use std::env;
use std::io::Write;
use std::path::Path;
use std::thread;
use std::time::{Duration, Instant};
use spg_engine::{Engine, QueryResult};
use crate::{
CommitResult, CommitTask, DEFAULT_COMMIT_GROUP_MAX, ServerState, parse_env_u64, parse_env_usize,
};
use crate::{observability, pubsub};
/// Bit 31 of a WAL frame's 32-bit little-endian length word. It is OR-ed into
/// the length of every CRC-framed record; `replay_wal_bytes` treats a length
/// word without it as a frame that carries no CRC.
pub(crate) const WAL_V2_SENTINEL: u32 = 0x8000_0000;
/// Bit 30 of the length word. When set together with `WAL_V2_SENTINEL`, the
/// frame is a v3 record and a one-byte type tag follows the CRC.
pub(crate) const WAL_V3_FLAG: u32 = 0x4000_0000;
/// Both marker bits combined; OR-ed into the payload length of every v3 frame.
pub(crate) const WAL_V3_SENTINEL: u32 = WAL_V2_SENTINEL | WAL_V3_FLAG;
/// Reports whether synchronous WAL commits have been switched off.
///
/// `SPG_SYNCHRONOUS_COMMIT` is read on the first call only and the answer is
/// cached for the rest of the process. The value is trimmed and compared
/// case-insensitively; `off`, `false` and `0` disable synchronous commit,
/// anything else (or an unset / unreadable variable) leaves it enabled.
pub(crate) fn synchronous_commit_disabled() -> bool {
    static DISABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *DISABLED.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(raw) => {
            let normalized = raw.trim().to_lowercase();
            matches!(normalized.as_str(), "off" | "false" | "0")
        }
        Err(_) => false,
    })
}
/// v3 type tag: payload is the raw UTF-8 SQL of one auto-committed statement.
pub(crate) const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v3 type tag: payload is an 8-byte little-endian byte offset; replay does
/// not count it as an applied record.
pub(crate) const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v3 type tag: payload is one algorithm byte followed by compressed SQL.
pub(crate) const WAL_V3_TYPE_COMPRESSED_SQL: u8 = 0x03;
/// Algorithm byte identifying LZSS inside a `WAL_V3_TYPE_COMPRESSED_SQL` payload.
pub(crate) const WAL_COMPRESS_ALGO_LZSS: u8 = 0x01;
/// Default minimum SQL length (in bytes) before compression is attempted;
/// used when `SPG_COMPRESSION_MIN_BYTES` is unset or unparsable.
pub(crate) const WAL_COMPRESS_MIN_BYTES: usize = 256;
pub(crate) fn encode_wal_record(sql: &str) -> std::io::Result<Vec<u8>> {
let len = u32::try_from(sql.len())
.map_err(|_| std::io::Error::other("SQL too large for WAL entry"))?;
if len & WAL_V2_SENTINEL != 0 {
return Err(std::io::Error::other(
"SQL byte count would alias the v4.37 WAL framing sentinel (≥ 2 GiB)",
));
}
let crc = spg_crypto::crc32::crc32(sql.as_bytes());
let mut entry = Vec::with_capacity(8 + sql.len());
entry.extend_from_slice(&(len | WAL_V2_SENTINEL).to_le_bytes());
entry.extend_from_slice(&crc.to_le_bytes());
entry.extend_from_slice(sql.as_bytes());
Ok(entry)
}
/// Frames `payload` as a v3 WAL record:
/// `[len | WAL_V3_SENTINEL : u32 LE][crc32(type_tag ‖ payload) : u32 LE][type_tag : u8][payload]`.
///
/// `len` counts the payload only (not the type tag). The CRC covers the type
/// tag followed by the payload.
///
/// # Errors
///
/// Fails when the payload length does not fit in a `u32` or would set either
/// of the two sentinel bits (i.e. is 1 GiB or larger).
pub(crate) fn encode_wal_v3_record(type_tag: u8, payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // Header layout: 4-byte length word, 4-byte CRC, then the CRC-covered body.
    const CRC_FIELD: std::ops::Range<usize> = 4..8;
    const BODY_START: usize = 8;

    let len = u32::try_from(payload.len())
        .map_err(|_| std::io::Error::other("WAL v3 payload too large"))?;
    if len & (WAL_V2_SENTINEL | WAL_V3_FLAG) != 0 {
        return Err(std::io::Error::other(
            "WAL v3 payload size would alias the v4.41 sentinel bits (≥ 1 GiB)",
        ));
    }
    let mut entry = Vec::with_capacity(9 + payload.len());
    entry.extend_from_slice(&(len | WAL_V3_SENTINEL).to_le_bytes());
    // Placeholder for the CRC; patched below once the body is in place.
    entry.extend_from_slice(&[0u8; 4]);
    entry.push(type_tag);
    entry.extend_from_slice(payload);
    // Tag and payload sit contiguously in the frame, so checksum them there
    // instead of building a second copy of the payload just for the CRC.
    let crc = spg_crypto::crc32::crc32(&entry[BODY_START..]);
    entry[CRC_FIELD].copy_from_slice(&crc.to_le_bytes());
    Ok(entry)
}
pub(crate) fn encode_wal_auto_commit_sql_metrics(
sql: &str,
metrics: &observability::Metrics,
) -> std::io::Result<Vec<u8>> {
use std::sync::atomic::Ordering;
let raw_len = sql.len() as u64;
metrics
.wal_bytes_uncompressed_in
.fetch_add(raw_len, Ordering::Relaxed);
let threshold = wal_compression_min_bytes();
if !wal_compression_enabled() || sql.len() < threshold {
let out = encode_wal_v3_record(WAL_V3_TYPE_AUTO_COMMIT_SQL, sql.as_bytes())?;
metrics
.wal_bytes_compressed_out
.fetch_add(out.len() as u64, Ordering::Relaxed);
return Ok(out);
}
let compressed = spg_crypto::lzss::compress(sql.as_bytes());
if compressed.len() + 1 >= sql.len() {
let out = encode_wal_v3_record(WAL_V3_TYPE_AUTO_COMMIT_SQL, sql.as_bytes())?;
metrics
.wal_bytes_compressed_out
.fetch_add(out.len() as u64, Ordering::Relaxed);
return Ok(out);
}
let mut payload = Vec::with_capacity(1 + compressed.len());
payload.push(WAL_COMPRESS_ALGO_LZSS);
payload.extend_from_slice(&compressed);
let out = encode_wal_v3_record(WAL_V3_TYPE_COMPRESSED_SQL, &payload)?;
metrics
.wal_bytes_compressed_out
.fetch_add(out.len() as u64, Ordering::Relaxed);
Ok(out)
}
/// Encodes `sql` as a v3 auto-commit frame, compressing it when worthwhile.
///
/// The SQL is written uncompressed (`WAL_V3_TYPE_AUTO_COMMIT_SQL`) when
/// compression is disabled, when the statement is shorter than the configured
/// threshold, or when the compressed form plus its one-byte algorithm prefix
/// would not be strictly smaller than the original. Otherwise the payload is
/// `[WAL_COMPRESS_ALGO_LZSS][compressed bytes]` under
/// `WAL_V3_TYPE_COMPRESSED_SQL`.
#[allow(dead_code)]
pub(crate) fn encode_wal_auto_commit_sql(sql: &str) -> std::io::Result<Vec<u8>> {
    let raw = sql.as_bytes();
    let uncompressed = || encode_wal_v3_record(WAL_V3_TYPE_AUTO_COMMIT_SQL, raw);

    let min_bytes = wal_compression_min_bytes();
    let eligible = wal_compression_enabled() && raw.len() >= min_bytes;
    if !eligible {
        return uncompressed();
    }

    let packed = spg_crypto::lzss::compress(raw);
    // +1 accounts for the algorithm byte that prefixes the compressed data.
    if packed.len() + 1 >= raw.len() {
        return uncompressed();
    }

    let packed_bytes: &[u8] = &packed;
    let payload = [std::slice::from_ref(&WAL_COMPRESS_ALGO_LZSS), packed_bytes].concat();
    encode_wal_v3_record(WAL_V3_TYPE_COMPRESSED_SQL, &payload)
}
pub(crate) fn wal_compression_min_bytes() -> usize {
static CHECKED: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
*CHECKED.get_or_init(|| {
std::env::var("SPG_COMPRESSION_MIN_BYTES")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(WAL_COMPRESS_MIN_BYTES)
})
}
/// Reports whether WAL compression is enabled.
///
/// Compression is on unless `SPG_WAL_COMPRESSION` is set to `none`
/// (ASCII case-insensitive). The variable is read once and the result cached.
pub(crate) fn wal_compression_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("SPG_WAL_COMPRESSION") {
        Ok(value) => !value.eq_ignore_ascii_case("none"),
        Err(_) => true,
    })
}
/// Size in bytes of an uncompressed v3 frame carrying `sql`: the SQL length
/// plus nine bytes of framing.
pub(crate) fn wal_v3_auto_commit_size(sql: &str) -> u64 {
    // 4-byte length word + 4-byte CRC + 1-byte type tag.
    const FRAME_OVERHEAD: u64 = 4 + 4 + 1;
    let payload_len = sql.len() as u64;
    FRAME_OVERHEAD + payload_len
}
/// Builds a v3 durability-checkpoint frame whose payload is `byte_offset`
/// as eight little-endian bytes.
pub(crate) fn encode_durability_marker(byte_offset: u64) -> std::io::Result<Vec<u8>> {
    let payload = byte_offset.to_le_bytes();
    encode_wal_v3_record(WAL_V3_TYPE_DURABILITY_CHECKPOINT, &payload)
}
/// Appends a durability-checkpoint frame to the WAL and syncs it to disk.
///
/// Returns the WAL length measured just before the marker was written, which
/// is also the offset recorded inside the marker. Returns `Ok(0)` without
/// doing anything when the server has no WAL.
///
/// # Errors
///
/// - the WAL mutex is poisoned;
/// - reading the WAL file metadata fails;
/// - `StorageFull` when the chaos quota would be exceeded by the marker, or
///   when free space on the WAL volume is below `wal_min_free_bytes`;
/// - the write or the subsequent `sync_data` fails.
pub(crate) fn append_durability_marker(state: &ServerState) -> std::io::Result<u64> {
let Some(wal_mutex) = state.wal.as_ref() else {
return Ok(0);
};
// Everything up to and including the write happens under the WAL lock; the
// guard is dropped at the end of this block, before the sync below.
let pre_marker_offset = {
let mut wal = wal_mutex
.lock()
.map_err(|_| std::io::Error::other("wal mutex poisoned"))?;
// Current file length = offset at which the marker frame will start.
let pre_marker_offset = wal.metadata()?.len();
let entry = encode_durability_marker(pre_marker_offset)?;
// Injected quota: refuse the write if it would push the file past the limit.
if let Some(quota) = state.chaos.wal_quota_bytes
&& pre_marker_offset.saturating_add(entry.len() as u64) > quota
{
return Err(std::io::Error::new(
std::io::ErrorKind::StorageFull,
format!(
"wal quota exceeded by durability marker: cur={pre_marker_offset} + {} > quota={quota}",
entry.len()
),
));
}
// Free-space water-mark; only checked when both a limit and a WAL path are configured.
if let Some(min_free) = state.limits.wal_min_free_bytes
&& let Some(wal_path) = state.wal_path.as_deref()
{
let free = wal_volume_free_bytes(wal_path)?;
if free < min_free {
return Err(std::io::Error::new(
std::io::ErrorKind::StorageFull,
format!(
"WAL volume below water-mark for durability marker: free={free} < SPG_WAL_MIN_FREE_BYTES={min_free}"
),
));
}
}
wal.write_all(&entry)?;
pre_marker_offset
};
// The marker is always synced here; unlike the SQL append paths this does
// not consult `synchronous_commit_disabled()`.
// NOTE(review): `wal_sync_clone` is presumably a second handle to the same
// file so the sync can run without holding the WAL mutex — confirm.
if let Some(sync_handle) = state.wal_sync_clone.as_ref() {
sync_handle.sync_data()?;
} else {
// No separate handle: re-acquire the WAL lock and sync through it.
let wal = wal_mutex
.lock()
.map_err(|_| std::io::Error::other("wal mutex poisoned"))?;
wal.sync_data()?;
}
Ok(pre_marker_offset)
}
/// Appends a group of pre-encoded v3 frames to the WAL with a single write.
///
/// The frames are concatenated and written under the WAL lock, followed by
/// one `sync_data` unless synchronous commit is disabled. When a tee path is
/// configured the same bytes are also appended there; a tee failure is only
/// logged to stderr. Does nothing when the server has no WAL or `entries`
/// is empty.
///
/// # Errors
///
/// A poisoned WAL mutex, `StorageFull` from the chaos quota or the free-space
/// water-mark, or any error from the write / sync.
pub(crate) fn append_wal_v3_group(state: &ServerState, entries: &[Vec<u8>]) -> std::io::Result<()> {
    let wal_mutex = match state.wal.as_ref() {
        Some(m) => m,
        None => return Ok(()),
    };
    if entries.is_empty() {
        return Ok(());
    }

    let batch: Vec<u8> = entries.concat();

    let mut file = wal_mutex
        .lock()
        .map_err(|_| std::io::Error::other("wal mutex poisoned"))?;

    if let Some(quota) = state.chaos.wal_quota_bytes {
        // A metadata failure is treated as an empty file for quota purposes.
        let current = match file.metadata() {
            Ok(meta) => meta.len(),
            Err(_) => 0,
        };
        let projected = current.saturating_add(batch.len() as u64);
        if projected > quota {
            return Err(std::io::Error::new(
                std::io::ErrorKind::StorageFull,
                format!(
                    "wal quota exceeded: cur={current} + {} > quota={quota} (SPG_FAIL_WAL_QUOTA_BYTES)",
                    batch.len()
                ),
            ));
        }
    }

    if let Some(min_free) = state.limits.wal_min_free_bytes {
        if let Some(wal_path) = state.wal_path.as_deref() {
            let free = wal_volume_free_bytes(wal_path)?;
            if free < min_free {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::StorageFull,
                    format!(
                        "WAL volume below water-mark: free={free} < SPG_WAL_MIN_FREE_BYTES={min_free}"
                    ),
                ));
            }
        }
    }

    file.write_all(&batch)?;
    if !synchronous_commit_disabled() {
        file.sync_data()?;
    }

    // Best-effort mirror; the commit has already succeeded at this point.
    if let Some(tee_path) = wal_tee_path() {
        append_to_tee(tee_path, &batch).unwrap_or_else(|e| {
            eprintln!("spg-server: WAL tee append to {tee_path:?} failed: {e}");
        });
    }
    Ok(())
}
/// Path of the optional WAL tee file, from `SPG_WAL_TEE_PATH`.
///
/// Read once and cached. Returns `None` when the variable is unset,
/// unreadable, or empty.
pub(crate) fn wal_tee_path() -> Option<&'static str> {
    static TEE: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();
    let configured = TEE.get_or_init(|| match env::var("SPG_WAL_TEE_PATH") {
        Ok(path) if !path.is_empty() => Some(path),
        _ => None,
    });
    configured.as_deref()
}
/// Appends `bytes` to the file at `path`, creating the file if it does not
/// exist. The file is opened and closed on every call.
pub(crate) fn append_to_tee(path: &str, bytes: &[u8]) -> std::io::Result<()> {
    let mut options = std::fs::OpenOptions::new();
    options.append(true).create(true);
    let mut tee = options.open(path)?;
    tee.write_all(bytes)
}
/// Builds a new `io::Error` with the same kind and display text as `e`.
/// (`io::Error` is not `Clone`; the original source chain is not carried over.)
pub(crate) fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let (kind, message) = (e.kind(), e.to_string());
    std::io::Error::new(kind, message)
}
/// Maximum number of tasks taken into one commit group: `SPG_COMMIT_GROUP_MAX`
/// when `parse_env_usize` yields a value, otherwise `DEFAULT_COMMIT_GROUP_MAX`.
pub(crate) fn commit_group_max() -> usize {
    let configured = parse_env_usize("SPG_COMMIT_GROUP_MAX");
    configured.unwrap_or(DEFAULT_COMMIT_GROUP_MAX)
}
/// Commit delay in microseconds from `SPG_COMMIT_DELAY_US`; `0` when
/// `parse_env_u64` yields no value.
pub(crate) fn commit_delay_us() -> u64 {
    let configured = parse_env_u64("SPG_COMMIT_DELAY_US");
    configured.unwrap_or(0)
}
/// Queues `task` for group commit and reports whether the caller became the
/// leader.
///
/// Returns `true` when no leader was active (the flag is set as part of this
/// call), `false` when a leader is already active.
///
/// # Panics
///
/// Panics if the commit-queue mutex is poisoned.
pub(crate) fn enqueue_commit_task(state: &ServerState, task: CommitTask) -> bool {
    let mut queue = state
        .commit_queue
        .lock()
        .expect("commit queue mutex poisoned");
    queue.pending.push_back(task);
    let became_leader = !queue.leader_active;
    queue.leader_active = true;
    became_leader
}
/// Runs the group-commit leader loop until the commit queue is empty.
///
/// Each iteration:
/// 1. optionally spins (yielding) for up to `commit_delay_us()` so more tasks
///    can accumulate, then drains up to `commit_group_max()` tasks;
/// 2. under the engine write lock, snapshots the catalog and runs every task
///    as `BEGIN` / statement / `COMMIT`. Tasks that fail at any step are
///    rolled back and acknowledged immediately;
/// 3. appends the WAL frames of all committed tasks with one group write;
/// 4. if that write failed, restores the catalog snapshot;
/// 5. acknowledges every committed task with the shared WAL outcome and, on
///    success, publishes its SQL via `pubsub::publish_sql`.
///
/// The function returns (clearing `leader_active`) when the queue is empty or
/// when the engine lock is poisoned.
///
/// # Panics
///
/// Panics if the commit-queue mutex is poisoned while draining.
#[allow(clippy::too_many_lines)]
pub(crate) fn run_leader_commit_round(state: &ServerState) {
    /// A task whose transaction committed in the engine and is waiting for
    /// the group WAL append.
    struct Prepared {
        task: CommitTask,
        result: QueryResult,
        wal_bytes: Vec<u8>,
    }
    let group_max = commit_group_max();
    let delay_us = commit_delay_us();
    loop {
        let group: Vec<CommitTask> = {
            let mut q = state
                .commit_queue
                .lock()
                .expect("commit queue mutex poisoned");
            if delay_us > 0 && q.pending.len() < group_max {
                // Release the queue lock while waiting so producers can enqueue.
                let deadline = Instant::now() + Duration::from_micros(delay_us);
                while q.pending.len() < group_max && Instant::now() < deadline {
                    drop(q);
                    thread::yield_now();
                    q = state
                        .commit_queue
                        .lock()
                        .expect("commit queue mutex poisoned");
                }
            }
            if q.pending.is_empty() {
                // Checked under the same lock that `enqueue_commit_task` takes.
                q.leader_active = false;
                return;
            }
            let take = q.pending.len().min(group_max);
            q.pending.drain(..take).collect()
        };
        let mut prepared: Vec<Prepared> = Vec::with_capacity(group.len());
        let pre_image: Option<spg_storage::Catalog> = {
            let Ok(mut engine) = state.engine.write() else {
                // Engine lock poisoned: dropping the group drops its ack handles.
                drop(group);
                if let Ok(mut q) = state.commit_queue.lock() {
                    q.leader_active = false;
                }
                return;
            };
            // Snapshot taken before any task runs; used to undo the whole
            // group if the WAL append fails.
            let pre = engine.catalog().clone();
            for task in group {
                let tx_id = engine.alloc_tx_id();
                if let Err(e) = engine.execute_in("BEGIN", tx_id) {
                    let _ = task.ack.send(CommitResult {
                        result: Err(e),
                        wal_outcome: Ok(()),
                    });
                    continue;
                }
                let exec_res = engine.execute_in_with_cancel(
                    &task.sql,
                    tx_id,
                    spg_engine::CancelToken::from_flag(&task.cancel_flag),
                );
                // Only `CommandOk` results are committed and logged; anything
                // else (error or other result kind) is rolled back and
                // returned to the caller as-is.
                let was_command_ok = matches!(exec_res, Ok(QueryResult::CommandOk { .. }));
                if !was_command_ok {
                    let _ = engine.execute_in("ROLLBACK", tx_id);
                    let _ = task.ack.send(CommitResult {
                        result: exec_res,
                        wal_outcome: Ok(()),
                    });
                    continue;
                }
                let wal_bytes = match encode_wal_auto_commit_sql_metrics(&task.sql, &state.metrics)
                {
                    Ok(b) => b,
                    Err(e) => {
                        let _ = engine.execute_in("ROLLBACK", tx_id);
                        let _ = task.ack.send(CommitResult {
                            result: exec_res,
                            wal_outcome: Err(e),
                        });
                        continue;
                    }
                };
                if let Err(e) = engine.execute_in("COMMIT", tx_id) {
                    let _ = engine.execute_in("ROLLBACK", tx_id);
                    let _ = task.ack.send(CommitResult {
                        result: Err(e),
                        wal_outcome: Ok(()),
                    });
                    continue;
                }
                prepared.push(Prepared {
                    task,
                    // `was_command_ok` above guarantees `exec_res` is `Ok`.
                    result: exec_res.unwrap(),
                    wal_bytes,
                });
            }
            if prepared.is_empty() { None } else { Some(pre) }
        };
        if prepared.is_empty() {
            continue;
        }
        // `wal_bytes` is not read again after this point, so move each buffer
        // out instead of cloning every frame.
        let entries: Vec<Vec<u8>> = prepared
            .iter_mut()
            .map(|p| std::mem::take(&mut p.wal_bytes))
            .collect();
        let wal_outcome: std::io::Result<()> = append_wal_v3_group(state, &entries);
        if wal_outcome.is_err()
            && let Some(pre) = pre_image
        {
            if let Ok(mut engine) = state.engine.write() {
                // Undo the in-memory effects of the whole group.
                engine.replace_catalog(pre);
            } else {
                drop(prepared);
                if let Ok(mut q) = state.commit_queue.lock() {
                    q.leader_active = false;
                }
                return;
            }
        }
        let wal_ok = wal_outcome.is_ok();
        for p in prepared {
            // `io::Error` is not `Clone`; every waiter gets an equivalent copy.
            let cloned_wal = match &wal_outcome {
                Ok(()) => Ok(()),
                Err(e) => Err(clone_io_err(e)),
            };
            if wal_ok {
                pubsub::publish_sql(&p.task.sql);
            }
            let _ = p.task.ack.send(CommitResult {
                result: Ok(p.result),
                wal_outcome: cloned_wal,
            });
        }
    }
}
/// Appends one statement to the WAL as a v2 frame.
///
/// Does nothing when the server has no WAL. The frame is encoded before the
/// WAL lock is taken; under the lock the chaos quota and the free-space
/// water-mark are checked, the frame is written, and the file is synced
/// unless synchronous commit is disabled.
///
/// # Errors
///
/// Encoding failure, a poisoned WAL mutex, `StorageFull` from either limit,
/// or any error from the write / sync.
pub(crate) fn append_wal(state: &ServerState, sql: &str) -> std::io::Result<()> {
    let wal_mutex = match state.wal.as_ref() {
        Some(m) => m,
        None => return Ok(()),
    };
    let entry = encode_wal_record(sql)?;

    let mut file = wal_mutex
        .lock()
        .map_err(|_| std::io::Error::other("wal mutex poisoned"))?;

    if let Some(quota) = state.chaos.wal_quota_bytes {
        // A metadata failure is treated as an empty file for quota purposes.
        let current = match file.metadata() {
            Ok(meta) => meta.len(),
            Err(_) => 0,
        };
        let projected = current.saturating_add(entry.len() as u64);
        if projected > quota {
            return Err(std::io::Error::new(
                std::io::ErrorKind::StorageFull,
                format!(
                    "wal quota exceeded: cur={current} + {} > quota={quota} (SPG_FAIL_WAL_QUOTA_BYTES)",
                    entry.len()
                ),
            ));
        }
    }

    if let Some(min_free) = state.limits.wal_min_free_bytes {
        if let Some(wal_path) = state.wal_path.as_deref() {
            let free = wal_volume_free_bytes(wal_path)?;
            if free < min_free {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::StorageFull,
                    format!(
                        "WAL volume below water-mark: free={free} < SPG_WAL_MIN_FREE_BYTES={min_free}"
                    ),
                ));
            }
        }
    }

    file.write_all(&entry)?;
    if synchronous_commit_disabled() {
        return Ok(());
    }
    file.sync_data()
}
/// Returns the free space, in bytes, available on the filesystem that
/// contains `path`, computed as `f_bavail * f_frsize` from `statvfs(3)`
/// (saturating on overflow).
///
/// # Errors
///
/// - `InvalidInput` when `path` contains an interior NUL byte and therefore
///   cannot be passed to the C API unchanged;
/// - the OS error reported by `statvfs` when the call fails.
// `unsafe_code`: the libc call below has no safe std equivalent.
// Cast lints: the `statvfs` field widths differ between targets.
#[allow(unsafe_code, clippy::cast_lossless, clippy::useless_conversion)]
pub(crate) fn wal_volume_free_bytes(path: &Path) -> std::io::Result<u64> {
    use std::os::unix::ffi::OsStrExt;
    // `CString::new` rejects interior NULs instead of letting the C side
    // silently stop at the first one and stat a different path.
    let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
    // SAFETY: `libc::statvfs` is a plain C struct of integer fields, for which
    // the all-zero bit pattern is a valid value.
    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
    // SAFETY: `c_path` is a valid NUL-terminated string that outlives the
    // call, and `stat` is a live, writable `statvfs` the callee fills in.
    let rc = unsafe { libc::statvfs(c_path.as_ptr(), &raw mut stat) };
    if rc != 0 {
        return Err(std::io::Error::last_os_error());
    }
    let bavail = stat.f_bavail as u64;
    let frsize = stat.f_frsize as u64;
    Ok(bavail.saturating_mul(frsize))
}
/// Executes one replayed statement, tolerating failure.
///
/// If the engine rejects `sql`, the statement and error are logged to stderr
/// together with `frame_off`, and replay carries on; nothing is returned to
/// the caller either way.
pub(crate) fn replay_execute_quarantining(engine: &mut Engine, sql: &str, frame_off: usize) {
    let Err(e) = engine.execute(sql) else {
        return;
    };
    eprintln!(
        "spg-server: WAL replay QUARANTINED statement at offset {frame_off} \
         (boot continues): {sql:?} rejected: {e:?}"
    );
}
pub(crate) fn dispatch_v3_record(
tag: u8,
payload: &[u8],
frame_off: usize,
engine: &mut Engine,
) -> std::io::Result<bool> {
match tag {
WAL_V3_TYPE_AUTO_COMMIT_SQL => {
let sql = core::str::from_utf8(payload).map_err(|_| {
std::io::Error::other("v3 auto_commit_sql payload has non-UTF-8 SQL")
})?;
replay_execute_quarantining(engine, sql, frame_off);
Ok(true)
}
WAL_V3_TYPE_COMPRESSED_SQL => {
if payload.is_empty() {
return Err(std::io::Error::other(format!(
"WAL compressed_sql at offset {frame_off}: empty payload"
)));
}
let algo = payload[0];
let compressed = &payload[1..];
let raw_bytes = match algo {
WAL_COMPRESS_ALGO_LZSS => spg_crypto::lzss::decompress(compressed).map_err(|e| {
std::io::Error::other(format!(
"WAL compressed_sql at offset {frame_off}: LZSS decompress failed: {e:?}"
))
})?,
other => {
return Err(std::io::Error::other(format!(
"WAL compressed_sql at offset {frame_off}: unknown algo byte {other:#04x}"
)));
}
};
let sql = core::str::from_utf8(&raw_bytes).map_err(|_| {
std::io::Error::other(format!(
"WAL compressed_sql at offset {frame_off}: decompressed bytes are not valid UTF-8"
))
})?;
replay_execute_quarantining(engine, sql, frame_off);
Ok(true)
}
WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
if payload.len() != 8 {
return Err(std::io::Error::other(format!(
"WAL durability_checkpoint at offset {frame_off} has {}-byte payload (expected 8)",
payload.len()
)));
}
let arr: [u8; 8] = payload.try_into().expect("checked len above");
let recorded_off = u64::from_le_bytes(arr);
let frame_off_u64 = frame_off as u64;
if recorded_off != frame_off_u64 {
eprintln!(
"spg-server: WAL durability_checkpoint at offset {frame_off} carries recorded_off={recorded_off} — possible WAL relocation; treating marker as no-op"
);
}
Ok(false)
}
other => Err(std::io::Error::other(format!(
"WAL v3 unknown type byte {other:#04x} at offset {frame_off} — refusing to replay"
))),
}
}
/// Replays every frame in `bytes` against `engine` and returns how many
/// records were applied.
///
/// Three frame layouts are recognised from the two high bits of the 32-bit
/// little-endian length word:
///
/// | bit 31 | bit 30 | layout                                   |
/// |--------|--------|------------------------------------------|
/// | 0      | any    | `[len][payload]` (no CRC)                |
/// | 1      | 0      | v2: `[len][crc][payload]`                |
/// | 1      | 1      | v3: `[len][crc][type tag][payload]`      |
///
/// A frame that is cut short (missing header bytes or payload) is logged and
/// ends replay without error; the truncated tail is dropped. SQL statements
/// the engine rejects are quarantined (logged) and replay continues.
/// Durability checkpoints are validated but not counted as applied.
///
/// # Errors
///
/// - CRC mismatch on a v2/v3 frame;
/// - non-UTF-8 SQL in a legacy or v2 frame;
/// - any error from `dispatch_v3_record`.
pub(crate) fn replay_wal_bytes(bytes: &[u8], engine: &mut Engine) -> std::io::Result<usize> {
    let mut cur = 0;
    let mut applied = 0usize;
    while cur < bytes.len() {
        if bytes.len() - cur < 4 {
            eprintln!(
                "spg-server: WAL truncated at offset {cur} (need 4-byte length, have {})",
                bytes.len() - cur
            );
            break;
        }
        let frame_off = cur;
        let len_arr: [u8; 4] = bytes[cur..cur + 4].try_into().expect("checked");
        let raw_len = u32::from_le_bytes(len_arr);
        cur += 4;
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        // Strip only the marker bits that belong to the detected layout.
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let len = (raw_len & len_mask) as usize;
        let expected_crc = if is_v2 {
            if bytes.len() - cur < 4 {
                eprintln!(
                    "spg-server: v2/v3 WAL truncated at offset {cur} (need 4-byte CRC, have {})",
                    bytes.len() - cur
                );
                break;
            }
            let crc_arr: [u8; 4] = bytes[cur..cur + 4].try_into().expect("checked");
            cur += 4;
            Some(u32::from_le_bytes(crc_arr))
        } else {
            None
        };
        // The CRC covers everything between the CRC field and the end of the
        // payload: just the payload for v2, type tag + payload for v3.
        let crc_start = cur;
        let v3_type_tag = if is_v3 {
            if bytes.len() - cur < 1 {
                eprintln!(
                    "spg-server: v3 WAL truncated at offset {cur} (need 1-byte type, have 0)"
                );
                break;
            }
            let t = bytes[cur];
            cur += 1;
            Some(t)
        } else {
            None
        };
        // `cur <= bytes.len()` holds here, so the subtraction cannot
        // underflow, and this form cannot overflow the way `cur + len` could
        // on a 32-bit target.
        if len > bytes.len() - cur {
            eprintln!("spg-server: WAL entry truncated (payload_len={len}) — dropping tail");
            break;
        }
        let payload_end = cur + len;
        let payload = &bytes[cur..payload_end];
        if let Some(expected) = expected_crc {
            // Tag and payload are already contiguous in the input, so the
            // checksum is computed in place without copying the payload.
            let actual = spg_crypto::crc32::crc32(&bytes[crc_start..payload_end]);
            if actual != expected {
                return Err(std::io::Error::other(format!(
                    "WAL CRC mismatch at offset {frame_off} (expected={expected:#010x}, computed={actual:#010x}, payload_len={len}) — corruption detected, refusing to replay"
                )));
            }
        }
        let count_as_applied = if let Some(tag) = v3_type_tag {
            dispatch_v3_record(tag, payload, frame_off, engine)?
        } else {
            let sql = core::str::from_utf8(payload)
                .map_err(|_| std::io::Error::other("WAL entry has non-UTF-8 SQL"))?;
            replay_execute_quarantining(engine, sql, frame_off);
            true
        };
        cur = payload_end;
        if count_as_applied {
            applied += 1;
        }
    }
    Ok(applied)
}
// Tests pinning the v3 durability-marker wire format and how replay treats
// marker frames.
#[cfg(test)]
mod wal_v3_durability_marker_tests {
use super::{
Engine, WAL_V2_SENTINEL, WAL_V3_FLAG, WAL_V3_SENTINEL, WAL_V3_TYPE_AUTO_COMMIT_SQL,
WAL_V3_TYPE_DURABILITY_CHECKPOINT, encode_durability_marker, encode_wal_v3_record,
replay_wal_bytes,
};
// Byte-level layout of a marker frame:
// 4 (length word) + 4 (CRC) + 1 (type tag) + 8 (u64 offset) = 17 bytes.
#[test]
fn durability_marker_frame_shape_pins_v3_wire() {
let bytes = encode_durability_marker(0x1234_5678).unwrap();
assert_eq!(bytes.len(), 17, "marker frame must be 17 bytes");
let raw_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
// Mask off both sentinel bits to recover the payload length.
let len_field = raw_len & !(WAL_V2_SENTINEL | WAL_V3_FLAG);
assert_eq!(len_field, 8, "marker payload is 8 bytes (the u64 offset)");
assert_eq!(
raw_len & WAL_V3_SENTINEL,
WAL_V3_SENTINEL,
"marker must carry v3 sentinel bits",
);
// Byte 8 is the type tag (bytes 4..8 hold the CRC).
assert_eq!(
bytes[8], WAL_V3_TYPE_DURABILITY_CHECKPOINT,
"type byte must be 0x02",
);
let offset = u64::from_le_bytes(bytes[9..17].try_into().unwrap());
assert_eq!(offset, 0x1234_5678, "payload echoes the offset arg");
}
// A stream made only of markers replays cleanly and applies nothing.
#[test]
fn replay_skips_durability_markers_and_does_not_increment_applied() {
let mut stream = Vec::new();
// Each marker is 17 bytes, so the recorded offsets 0 / 17 / 34 match the
// positions at which the frames actually start.
stream.extend_from_slice(&encode_durability_marker(0).unwrap());
stream.extend_from_slice(&encode_durability_marker(17).unwrap());
stream.extend_from_slice(&encode_durability_marker(34).unwrap());
let mut engine = Engine::new();
let applied = replay_wal_bytes(&stream, &mut engine).expect("replay must accept markers");
assert_eq!(applied, 0, "markers do not count as applied records");
}
// A marker placed between two SQL frames must not disturb cursor
// advancement: both statements are still found and applied.
#[test]
fn replay_mixes_sql_and_markers_advancing_cursor_correctly() {
let mut stream = Vec::new();
let create_a =
encode_wal_v3_record(WAL_V3_TYPE_AUTO_COMMIT_SQL, b"CREATE TABLE a (id INT)").unwrap();
let create_b =
encode_wal_v3_record(WAL_V3_TYPE_AUTO_COMMIT_SQL, b"CREATE TABLE b (id INT)").unwrap();
// The marker starts right after the first frame, so that is its offset.
let marker_off = create_a.len() as u64;
let marker = encode_durability_marker(marker_off).unwrap();
stream.extend_from_slice(&create_a);
stream.extend_from_slice(&marker);
stream.extend_from_slice(&create_b);
let mut engine = Engine::new();
let applied =
replay_wal_bytes(&stream, &mut engine).expect("mixed stream must replay cleanly");
assert_eq!(
applied, 2,
"two CREATE TABLEs applied; marker doesn't count"
);
}
// A checkpoint frame with a valid CRC but a 4-byte payload (instead of 8)
// must be rejected with an error that names the malformed marker.
#[test]
fn replay_rejects_marker_with_wrong_payload_length() {
let bad =
encode_wal_v3_record(WAL_V3_TYPE_DURABILITY_CHECKPOINT, &0u32.to_le_bytes()).unwrap();
let mut engine = Engine::new();
let err = replay_wal_bytes(&bad, &mut engine).expect_err("4-byte payload must error");
let msg = err.to_string();
assert!(
msg.contains("durability_checkpoint") && msg.contains("4-byte payload"),
"error message should name the malformed marker: got {msg:?}",
);
}
}