Skip to main content

fsqlite_wal/
parallel_wal.rs

1//! Parallel WAL coordinator (D1: bd-3wop3.1).
2//!
3//! This module provides a lock-free parallel WAL write path using per-thread
4//! buffers and epoch-based group commit. It replaces the global WAL append
5//! mutex with cooperative per-thread buffering.
6//!
7//! # Architecture
8//!
9//! 1. Each writer thread appends WAL frames to its own buffer with NO global lock.
10//! 2. A background epoch ticker advances the global epoch every ~10ms.
11//! 3. On epoch advance, slot-local buffer locks make sealing wait for any
12//!    in-flight batch append to complete, then the previous epoch is flushed.
13//! 4. Commit durability: transaction waits until its epoch is durable.
14//!
15//! # Key Benefits
16//!
17//! - Eliminates the #1 contention point (global WAL append mutex).
18//! - WAL writes are now embarrassingly parallel.
19//! - Epoch mechanism provides natural group commit semantics (Silo/Aether pattern).
20
21use std::collections::{HashMap, HashSet, VecDeque};
22use std::env;
23use std::fs::{self, File, OpenOptions};
24use std::hash::BuildHasher;
25use std::io::{self, BufReader, BufWriter, Read, Seek, Write};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::{Arc, Mutex, OnceLock};
29use std::time::Duration;
30
31#[cfg(not(target_arch = "wasm32"))]
32use asupersync::runtime::{BlockingTaskHandle, RuntimeHandle};
33use fsqlite_types::{CommitSeq, PageNumber, TxnToken, cx::Cx, limits};
34
35use crate::group_commit::TransactionFrameBatchContext;
36use crate::per_core_buffer::{
37    AppendOutcome, BufferConfig, DEFAULT_BUFFER_SLOT_COUNT, EpochConfig, EpochFlushBatch,
38    EpochOrderCoordinator, WalRecord, thread_buffer_slot,
39};
40
41// ---------------------------------------------------------------------------
42// Configuration
43// ---------------------------------------------------------------------------
44
/// Tunables for the parallel WAL coordinator.
#[derive(Debug, Clone, Copy)]
pub struct ParallelWalConfig {
    /// How many buffer slots to allocate (typically 128 for 16 threads).
    pub slot_count: usize,
    /// Milliseconds between epoch advances (default: 10ms).
    pub epoch_interval_ms: u64,
    /// Capacity of each slot's buffer, in bytes (default: 4MB).
    pub buffer_capacity_bytes: usize,
}
55
56impl Default for ParallelWalConfig {
57    fn default() -> Self {
58        Self {
59            slot_count: DEFAULT_BUFFER_SLOT_COUNT,
60            epoch_interval_ms: 10,
61            buffer_capacity_bytes: 4 * 1024 * 1024,
62        }
63    }
64}
65
/// Operator-facing mode switch for the D1.a parallel WAL contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalOperatingMode {
    /// Keeps the deterministic parallel data plane enabled and lets the
    /// optional decision plane tune batching/flush behavior within declared
    /// safety bounds. This is the `Default` variant.
    #[default]
    Auto,
    /// Forces the compatibility-safe single ordered path.
    Conservative,
    /// Runs the conservative proof path alongside the parallel data plane and
    /// forces a downgrade if the two disagree.
    ShadowCompare,
}
80
/// The ordered work that cannot be removed even once per-core lane staging
/// has eliminated the global append bottleneck.
///
/// D1.a restricts this ordered residue to exactly three steps:
/// 1. assigning the commit sequence,
/// 2. making the commit certificate durable, and
/// 3. publishing pager visibility.
///
/// All work ahead of that point remains lane-local and parallelizable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalOrderedResidue {
    /// The only residue shape currently defined; also the `Default`.
    #[default]
    CommitCertificateThenPublish,
}
95
/// Deterministic causes for forcing the conservative/safe path.
///
/// `parallel_wal_fallback_reason_name` maps each variant to the snake_case
/// label used when reporting it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelWalFallbackReason {
    OperatorForced,
    LaneOverflow,
    CertificateGap,
    CertificateChecksumMismatch,
    PublicationMismatch,
    RecoveryGap,
    CheckpointConflict,
    ControllerEvidenceLost,
}
108
109/// Explicit operator control surface for the D1.a contract.
110///
111/// This is intentionally configuration-shaped rather than implementation-
112/// shaped so the later D1.b/D1.c/D1.d beads cannot reinterpret the runtime
113/// knobs ad hoc.
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct ParallelWalControlSurface {
116    /// `Auto`, `Conservative`, or `ShadowCompare`.
117    pub mode: ParallelWalOperatingMode,
118    /// Optional hard override for the number of active append lanes.
119    pub lane_count_override: Option<usize>,
120    /// Optional cap for helper/combine lanes assisting with flush work.
121    pub helper_lane_budget: Option<usize>,
122    /// Optional cap on batch bytes the decision plane may stage before sealing.
123    pub max_parallel_commit_bytes: Option<u64>,
124    /// Optional cap on how long a batch may wait before being sealed/flushed.
125    pub max_flush_delay_ms: Option<u64>,
126    /// Shadow-compare sampling rate in per-mille. `None` disables sampling.
127    pub shadow_compare_sampling_per_mille: Option<u16>,
128}
129
130impl Default for ParallelWalControlSurface {
131    fn default() -> Self {
132        Self {
133            mode: ParallelWalOperatingMode::Auto,
134            lane_count_override: None,
135            helper_lane_budget: None,
136            max_parallel_commit_bytes: None,
137            max_flush_delay_ms: None,
138            shadow_compare_sampling_per_mille: None,
139        }
140    }
141}
142
/// Schema version of the telemetry emitted for lane-local append staging.
pub const PARALLEL_WAL_LANE_POLICY_VERSION: &str = "thread_slot_v1";
/// Compatibility selector bundle that `bd-db300.7.5.3` / `G5.3` requires.
pub const PARALLEL_WAL_COMPATIBILITY_SELECTOR: &str = "wal_invariant,integrity_check,row_level";
/// Scenario id used in structured logs for queue submission.
pub const PARALLEL_WAL_STAGE_SCENARIO_ID: &str = "parallel_wal_lane_stage";
/// Scenario id used in structured logs for flush-time lane telemetry.
pub const PARALLEL_WAL_FLUSH_SCENARIO_ID: &str = "parallel_wal_lane_flush";
/// Upper bound on the lane count. Lane ids and commit-certificate lane counts
/// are stored as `u16`, so both the count itself and every generated id must
/// stay representable in that type.
const MAX_PARALLEL_WAL_LANE_COUNT: usize = u16::MAX as usize;
154
/// Outcome reported by shadow-compare lane validation.
///
/// `NotRun` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalShadowVerdict {
    #[default]
    NotRun,
    Clean,
    Diverged,
}
163
164/// Staged lane-local payload awaiting ordered flush consumption.
165#[derive(Debug, Clone)]
166pub struct ParallelWalLaneBatch<T> {
167    /// Monotonic identifier used to correlate queue submission with flush.
168    pub batch_id: u64,
169    /// Stable lane identity chosen for the submitting writer.
170    pub lane_id: u16,
171    /// Number of frames staged for this batch.
172    pub staged_frame_count: u32,
173    /// Time spent staging locally before queue submission.
174    pub staging_elapsed_ns: u64,
175    /// Shadow-compare outcome for this batch.
176    pub shadow_verdict: ParallelWalShadowVerdict,
177    /// Caller-owned payload preserved until the ordered residue consumes it.
178    pub payload: T,
179}
180
181/// Production lane-local staging state for ordinary parallel WAL appends.
182///
183/// This keeps batch ownership, backlog accounting, and same-lane drain order
184/// inside `fsqlite-wal` instead of scattering the logic across pager-only
185/// callers. The caller still owns the final ordered durability residue.
186#[derive(Debug)]
187pub struct ParallelWalLaneStager<T> {
188    control: ParallelWalControlSurface,
189    next_batch_id: AtomicU64,
190    lane_batches: Mutex<HashMap<u16, VecDeque<ParallelWalLaneBatch<T>>>>,
191    lane_backlog_frames: Mutex<HashMap<u16, usize>>,
192}
193
194impl<T> ParallelWalLaneStager<T> {
195    #[must_use]
196    pub fn new(control: ParallelWalControlSurface) -> Self {
197        Self {
198            control,
199            next_batch_id: AtomicU64::new(1),
200            lane_batches: Mutex::new(HashMap::new()),
201            lane_backlog_frames: Mutex::new(HashMap::new()),
202        }
203    }
204
205    #[must_use]
206    pub fn control(&self) -> &ParallelWalControlSurface {
207        &self.control
208    }
209
210    #[must_use]
211    pub fn next_batch_id(&self) -> u64 {
212        self.next_batch_id.fetch_add(1, Ordering::Relaxed)
213    }
214
215    #[must_use]
216    pub fn lane_count(&self) -> usize {
217        match self.control.mode {
218            ParallelWalOperatingMode::Conservative => 1,
219            _ => self
220                .control
221                .lane_count_override
222                .unwrap_or_else(default_parallel_wal_lane_count)
223                .clamp(1, MAX_PARALLEL_WAL_LANE_COUNT),
224        }
225    }
226
227    #[must_use]
228    pub fn current_lane_id(&self) -> u16 {
229        u16::try_from(thread_buffer_slot(self.lane_count()))
230            .expect("lane_count is clamped to the u16 lane-id range")
231    }
232
233    #[must_use]
234    pub fn current_lane_backlog(&self, lane_id: u16) -> usize {
235        self.lane_backlog_frames
236            .lock()
237            .unwrap_or_else(std::sync::PoisonError::into_inner)
238            .get(&lane_id)
239            .copied()
240            .unwrap_or(0)
241    }
242
243    pub fn record_batch(&self, batch: ParallelWalLaneBatch<T>) -> usize {
244        let lane_id = batch.lane_id;
245        let staged_frame_count = usize::try_from(batch.staged_frame_count).unwrap_or(0);
246
247        let mut lane_batches = self
248            .lane_batches
249            .lock()
250            .unwrap_or_else(std::sync::PoisonError::into_inner);
251        let mut lane_backlog = self
252            .lane_backlog_frames
253            .lock()
254            .unwrap_or_else(std::sync::PoisonError::into_inner);
255        lane_batches.entry(lane_id).or_default().push_back(batch);
256        let backlog = lane_backlog.entry(lane_id).or_insert(0);
257        *backlog = backlog.saturating_add(staged_frame_count);
258        *backlog
259    }
260
261    pub fn take_batches_for_flush(
262        &self,
263        contexts: &[TransactionFrameBatchContext],
264    ) -> Option<HashMap<u64, ParallelWalLaneBatch<T>>> {
265        let mut lane_batches = self
266            .lane_batches
267            .lock()
268            .unwrap_or_else(std::sync::PoisonError::into_inner);
269
270        let mut expected_offsets = HashMap::<u16, usize>::new();
271        for context in contexts {
272            let offset = expected_offsets.entry(context.lane_id).or_insert(0);
273            let candidate = lane_batches
274                .get(&context.lane_id)
275                .and_then(|queue| queue.get(*offset))
276                .filter(|candidate| candidate.batch_id == context.batch_id)?;
277            let _ = candidate;
278            *offset = offset.saturating_add(1);
279        }
280
281        let mut by_batch_id = HashMap::with_capacity(contexts.len());
282        let mut lane_backlog = self
283            .lane_backlog_frames
284            .lock()
285            .unwrap_or_else(std::sync::PoisonError::into_inner);
286        for context in contexts {
287            let candidate = lane_batches
288                .get_mut(&context.lane_id)
289                .and_then(VecDeque::pop_front)
290                .expect("verified lane-local batch must still exist");
291            let backlog = lane_backlog.entry(context.lane_id).or_insert(0);
292            *backlog = backlog.saturating_sub(
293                usize::try_from(candidate.staged_frame_count).unwrap_or(usize::MAX),
294            );
295            if *backlog == 0 {
296                lane_backlog.remove(&context.lane_id);
297            }
298            if lane_batches
299                .get(&context.lane_id)
300                .is_some_and(VecDeque::is_empty)
301            {
302                lane_batches.remove(&context.lane_id);
303            }
304            by_batch_id.insert(candidate.batch_id, candidate);
305        }
306
307        Some(by_batch_id)
308    }
309
310    /// Discard prepared payloads for batches that already fell back to the
311    /// raw ordered WAL path.
312    ///
313    /// Lane-local preparation is an optimization, not the durability record.
314    /// If a flusher cannot consume the prepared payloads for a group-commit
315    /// epoch, those payloads become stale as soon as the same batches are
316    /// appended via borrowed frame refs. Leaving them queued would make later
317    /// epochs see an old batch id at the front of the lane and permanently
318    /// fall back.
319    pub fn discard_batches_for_flush(&self, contexts: &[TransactionFrameBatchContext]) -> usize {
320        if contexts.is_empty() {
321            return 0;
322        }
323
324        let discard_ids = contexts
325            .iter()
326            .map(|context| context.batch_id)
327            .collect::<HashSet<_>>();
328        let mut removed_batches = 0_usize;
329        let mut removed_frames_by_lane = HashMap::<u16, usize>::new();
330
331        let mut lane_batches = self
332            .lane_batches
333            .lock()
334            .unwrap_or_else(std::sync::PoisonError::into_inner);
335        for (lane_id, queue) in lane_batches.iter_mut() {
336            let mut retained = VecDeque::with_capacity(queue.len());
337            while let Some(batch) = queue.pop_front() {
338                if discard_ids.contains(&batch.batch_id) {
339                    removed_batches = removed_batches.saturating_add(1);
340                    let removed_frames =
341                        usize::try_from(batch.staged_frame_count).unwrap_or(usize::MAX);
342                    let entry = removed_frames_by_lane.entry(*lane_id).or_insert(0);
343                    *entry = entry.saturating_add(removed_frames);
344                } else {
345                    retained.push_back(batch);
346                }
347            }
348            *queue = retained;
349        }
350        lane_batches.retain(|_, queue| !queue.is_empty());
351
352        if !removed_frames_by_lane.is_empty() {
353            let mut lane_backlog = self
354                .lane_backlog_frames
355                .lock()
356                .unwrap_or_else(std::sync::PoisonError::into_inner);
357            for (lane_id, removed_frames) in removed_frames_by_lane {
358                let backlog = lane_backlog.entry(lane_id).or_insert(0);
359                *backlog = backlog.saturating_sub(removed_frames);
360                if *backlog == 0 {
361                    lane_backlog.remove(&lane_id);
362                }
363            }
364        }
365
366        removed_batches
367    }
368}
369
370#[must_use]
371pub fn parallel_wal_mode_name(mode: ParallelWalOperatingMode) -> &'static str {
372    match mode {
373        ParallelWalOperatingMode::Auto => "auto",
374        ParallelWalOperatingMode::Conservative => "conservative",
375        ParallelWalOperatingMode::ShadowCompare => "shadow_compare",
376    }
377}
378
379#[must_use]
380pub fn parallel_wal_fallback_reason_name(
381    reason: Option<ParallelWalFallbackReason>,
382) -> &'static str {
383    match reason {
384        None => "none",
385        Some(ParallelWalFallbackReason::OperatorForced) => "operator_forced",
386        Some(ParallelWalFallbackReason::LaneOverflow) => "lane_overflow",
387        Some(ParallelWalFallbackReason::CertificateGap) => "certificate_gap",
388        Some(ParallelWalFallbackReason::CertificateChecksumMismatch) => {
389            "certificate_checksum_mismatch"
390        }
391        Some(ParallelWalFallbackReason::PublicationMismatch) => "publication_mismatch",
392        Some(ParallelWalFallbackReason::RecoveryGap) => "recovery_gap",
393        Some(ParallelWalFallbackReason::CheckpointConflict) => "checkpoint_conflict",
394        Some(ParallelWalFallbackReason::ControllerEvidenceLost) => "controller_evidence_lost",
395    }
396}
397
398#[must_use]
399pub fn parallel_wal_shadow_verdict_name(verdict: ParallelWalShadowVerdict) -> &'static str {
400    match verdict {
401        ParallelWalShadowVerdict::NotRun => "not_run",
402        ParallelWalShadowVerdict::Clean => "clean",
403        ParallelWalShadowVerdict::Diverged => "diverged",
404    }
405}
406
407/// Decide whether a batch should run the shadow-compare proof path.
408///
409/// `ShadowCompare` mode always compares. In `Auto`, operators can enable a
410/// deterministic sample using `shadow_compare_sampling_per_mille`; the first
411/// `N` batch ids in each 1000-batch window take the compare path.
412#[must_use]
413pub fn parallel_wal_should_shadow_compare(
414    control: &ParallelWalControlSurface,
415    batch_id: u64,
416) -> bool {
417    match control.mode {
418        ParallelWalOperatingMode::Conservative => false,
419        ParallelWalOperatingMode::ShadowCompare => true,
420        ParallelWalOperatingMode::Auto => {
421            control
422                .shadow_compare_sampling_per_mille
423                .is_some_and(|rate| {
424                    let rate = u64::from(rate.min(1_000));
425                    rate > 0 && batch_id.saturating_sub(1) % 1_000 < rate
426                })
427        }
428    }
429}
430
/// Default number of append lanes: the parallelism reported by
/// `std::thread::available_parallelism`, or 1 when that query fails.
///
/// The result is never below 1.
#[must_use]
pub fn default_parallel_wal_lane_count() -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get().max(1),
        Err(_) => 1,
    }
}
438
439#[must_use]
440pub fn resolve_parallel_wal_control_surface_from_env() -> ParallelWalControlSurface {
441    let mut control = ParallelWalControlSurface {
442        // Default runtime policy favors the raw ordered append path; operators
443        // can still opt into lane-local staging with FSQLITE_PARALLEL_WAL_MODE=auto.
444        mode: ParallelWalOperatingMode::Conservative,
445        ..ParallelWalControlSurface::default()
446    };
447
448    if let Ok(mode) = env::var("FSQLITE_PARALLEL_WAL_MODE") {
449        control.mode = match mode.trim().to_ascii_lowercase().as_str() {
450            "auto" => ParallelWalOperatingMode::Auto,
451            "conservative" | "serialized" | "single_lane" => ParallelWalOperatingMode::Conservative,
452            "shadow" | "shadow_compare" => ParallelWalOperatingMode::ShadowCompare,
453            _ => control.mode,
454        };
455    }
456    if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_LANES") {
457        if let Ok(value) = raw.trim().parse::<usize>() {
458            control.lane_count_override = Some(value.max(1));
459        }
460    }
461    if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_MAX_BATCH_BYTES") {
462        if let Ok(value) = raw.trim().parse::<u64>() {
463            control.max_parallel_commit_bytes = Some(value.max(1));
464        }
465    }
466    if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_MAX_FLUSH_DELAY_MS") {
467        if let Ok(value) = raw.trim().parse::<u64>() {
468            control.max_flush_delay_ms = Some(value);
469        }
470    }
471    if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_SHADOW_COMPARE_PER_MILLE") {
472        if let Ok(value) = raw.trim().parse::<u16>() {
473            control.shadow_compare_sampling_per_mille = Some(value);
474        }
475    }
476
477    control
478}
479
480/// Commit-certificate proof object for the parallel WAL data plane.
481///
482/// A commit becomes externally publishable only after the certificate is
483/// durably written. The certificate covers a contiguous commit-sequence range,
484/// the lanes that contributed to that range, and the pager-visible metadata
485/// that must be published once the ordered residue completes.
486#[derive(Debug, Clone, PartialEq, Eq)]
487pub struct ParallelWalCommitCertificate {
488    pub format_version: u16,
489    pub residue: ParallelWalOrderedResidue,
490    pub certificate_epoch: u64,
491    pub commit_seq_lo: CommitSeq,
492    pub commit_seq_hi: CommitSeq,
493    pub durable_segment_epoch: u64,
494    pub lane_count: u16,
495    pub lane_record_counts: Vec<u32>,
496    pub db_size_pages: u32,
497    pub page_set_size: u32,
498    pub certificate_crc32c: u32,
499    pub fallback_active: bool,
500}
501
502/// Trace schema shared by lane, combiner, checkpoint, recovery, and
503/// control-plane events.
504#[derive(Debug, Clone, PartialEq, Eq)]
505pub struct ParallelWalTraceRecord {
506    pub component: String,
507    pub trace_id: u64,
508    pub decision_id: Option<u64>,
509    pub mode: ParallelWalOperatingMode,
510    pub lane_id: Option<usize>,
511    pub epoch: Option<u64>,
512    pub commit_seq_lo: Option<CommitSeq>,
513    pub commit_seq_hi: Option<CommitSeq>,
514    pub checkpoint_epoch: Option<u64>,
515    pub recovery_epoch: Option<u64>,
516    pub fallback_active: bool,
517    pub fallback_reason: Option<ParallelWalFallbackReason>,
518    pub policy_id: Option<String>,
519    pub policy_version: Option<String>,
520}
521
/// An action the optional D1 decision plane (the controller) may take.
///
/// With the controller disabled the deterministic data plane is still
/// correct; these actions do nothing more than tune batching and lane budgets
/// inside the declared control surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelWalDecisionAction {
    KeepCurrent,
    SealEpochNow,
    IncreaseLaneBudget,
    DecreaseLaneBudget,
    ForceConservative,
}
535
536/// Decision-record schema for the optional D1 controller.
537#[derive(Debug, Clone, PartialEq, Eq)]
538pub struct ParallelWalDecisionRecord {
539    pub policy_id: String,
540    pub policy_version: String,
541    pub decision_id: u64,
542    pub action: ParallelWalDecisionAction,
543    pub confidence_bps: u16,
544    pub expected_loss_micros: u64,
545    pub top_evidence_terms: Vec<String>,
546    pub counterfactual_action: ParallelWalDecisionAction,
547    pub counterfactual_regret_micros: i64,
548    pub fallback_active: bool,
549}
550
551// ---------------------------------------------------------------------------
552// Segment File I/O (D1.6)
553// ---------------------------------------------------------------------------
554
555/// Magic number for parallel WAL segment files.
556const SEGMENT_MAGIC: u32 = 0x5057_414C; // "PWAL"
557
558/// Version of the segment file format.
559const SEGMENT_VERSION: u16 = 1;
560
561/// Segment file header size in bytes.
562const SEGMENT_HEADER_SIZE: usize = 24;
563
564/// Fixed record bytes for a record without `end_seq` and without page images.
565const SEGMENT_RECORD_MIN_SIZE: usize = 8 + 4 + 8 + 4 + 8 + 1 + 4 + 4;
566
567/// Largest supported page image in a segment record.
568const MAX_SEGMENT_RECORD_IMAGE_BYTES: usize = limits::MAX_PAGE_SIZE as usize;
569
570/// Largest record payload the segment reader will allocate from an on-disk length.
571const MAX_SEGMENT_RECORD_SIZE: usize =
572    SEGMENT_RECORD_MIN_SIZE + 8 + 2 * MAX_SEGMENT_RECORD_IMAGE_BYTES;
573
/// How aggressively segment files are fsynced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FsyncPolicy {
    /// fsync after every write: the safest and slowest choice, and the
    /// `Default`.
    #[default]
    Full,
    /// fsync only at epoch boundaries.
    Normal,
    /// Never fsync: the fastest and least safe choice.
    Off,
}
585
/// Header at the start of every segment file.
///
/// On-disk layout (24 bytes):
/// ```text
/// [0..4]   magic: u32 (0x5057414C = "PWAL")
/// [4..6]   version: u16
/// [6..8]   reserved: u16 (for alignment)
/// [8..16]  epoch: u64
/// [16..20] record_count: u32
/// [20..24] checksum: u32 (CRC32C of header fields 0..20)
/// ```
///
/// Only `epoch` and `record_count` are kept in memory.
#[derive(Debug, Clone, Copy)]
pub struct SegmentHeader {
    /// Epoch this segment belongs to.
    pub epoch: u64,
    /// How many records the segment holds.
    pub record_count: u32,
}
604
605impl SegmentHeader {
606    /// Create a new segment header.
607    #[must_use]
608    pub const fn new(epoch: u64, record_count: u32) -> Self {
609        Self {
610            epoch,
611            record_count,
612        }
613    }
614
615    /// Serialize the header to bytes.
616    #[must_use]
617    pub fn to_bytes(&self) -> [u8; SEGMENT_HEADER_SIZE] {
618        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
619        buf[0..4].copy_from_slice(&SEGMENT_MAGIC.to_le_bytes());
620        buf[4..6].copy_from_slice(&SEGMENT_VERSION.to_le_bytes());
621        // buf[6..8] reserved
622        buf[8..16].copy_from_slice(&self.epoch.to_le_bytes());
623        buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
624        // Compute CRC32C of bytes 0..20
625        let checksum = crc32c::crc32c(&buf[0..20]);
626        buf[20..24].copy_from_slice(&checksum.to_le_bytes());
627        buf
628    }
629
630    /// Parse a header from bytes.
631    pub fn from_bytes(buf: &[u8; SEGMENT_HEADER_SIZE]) -> Result<Self, String> {
632        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
633        if magic != SEGMENT_MAGIC {
634            return Err(format!("invalid segment magic: {magic:#x}"));
635        }
636        let version = u16::from_le_bytes([buf[4], buf[5]]);
637        if version != SEGMENT_VERSION {
638            return Err(format!("unsupported segment version: {version}"));
639        }
640        let epoch = u64::from_le_bytes([
641            buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
642        ]);
643        let record_count = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
644        let stored_checksum = u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]);
645        let computed_checksum = crc32c::crc32c(&buf[0..20]);
646        if stored_checksum != computed_checksum {
647            return Err(format!(
648                "segment header checksum mismatch: stored={stored_checksum:#x}, computed={computed_checksum:#x}"
649            ));
650        }
651        Ok(Self {
652            epoch,
653            record_count,
654        })
655    }
656}
657
/// Builds the path of the segment file for `epoch`, placed next to the
/// database file.
///
/// The file name is the database file name (converted lossily to UTF-8, or
/// `db` when the path has no file name) followed by `-wal-seg-` and the epoch
/// as 16 zero-padded lowercase hex digits.
#[must_use]
pub fn segment_path(db_path: &Path, epoch: u64) -> PathBuf {
    let file_name = match db_path.file_name() {
        Some(name) => name.to_string_lossy().to_string(),
        None => "db".to_string(),
    };

    let mut segment = db_path.to_path_buf();
    segment.set_file_name(format!("{file_name}-wal-seg-{epoch:016x}"));
    segment
}
668
/// Lists every segment file belonging to `db_path`, sorted by ascending epoch.
///
/// The directory containing the database is scanned for entries named
/// `<db file name>-wal-seg-<hex epoch>`, the shape produced by
/// `segment_path`. Entries whose suffix is not made up solely of hex digits,
/// or does not fit in a `u64`, are ignored.
///
/// # Errors
///
/// Returns any I/O error raised while reading the directory.
pub fn list_segments(db_path: &Path) -> io::Result<Vec<(u64, PathBuf)>> {
    // `Path::parent` yields `Some("")` for a bare relative name like
    // `test.db`, and `read_dir("")` fails, so an empty parent means the
    // current directory.
    let dir = db_path
        .parent()
        .filter(|parent| !parent.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let db_name = db_path
        .file_name()
        .map_or_else(|| "db".to_string(), |n| n.to_string_lossy().into_owned());
    let prefix = format!("{db_name}-wal-seg-");

    let mut segments = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name_str = name.to_string_lossy();
        let Some(epoch_hex) = name_str.strip_prefix(&prefix) else {
            continue;
        };
        // `from_str_radix` also accepts a leading `+`, which no segment
        // name ever contains; insist on hex digits only.
        if !epoch_hex.bytes().all(|b| b.is_ascii_hexdigit()) {
            continue;
        }
        if let Ok(epoch) = u64::from_str_radix(epoch_hex, 16) {
            segments.push((epoch, entry.path()));
        }
    }
    segments.sort_by_key(|(epoch, _)| *epoch);
    Ok(segments)
}
691
692/// Write a segment file for the given epoch batch.
693///
694/// The segment file contains:
695/// 1. Header with epoch and record count
696/// 2. Serialized records (length-prefixed bincode)
697///
698/// Returns the number of bytes written.
699pub fn write_segment(
700    db_path: &Path,
701    batch: &EpochFlushBatch,
702    fsync_policy: FsyncPolicy,
703) -> io::Result<usize> {
704    let path = segment_path(db_path, batch.epoch);
705
706    let ordered_records = ordered_segment_records(batch.epoch, &batch.records)?;
707    for record in &ordered_records {
708        validate_segment_record_images(record)
709            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
710    }
711    let record_count = u32::try_from(ordered_records.len()).map_err(|_| {
712        io::Error::new(
713            io::ErrorKind::InvalidInput,
714            format!(
715                "segment record count {} exceeds u32 header field",
716                ordered_records.len()
717            ),
718        )
719    })?;
720
721    let file = OpenOptions::new()
722        .write(true)
723        .create(true)
724        .truncate(true)
725        .open(&path)?;
726    let mut writer = BufWriter::new(file);
727
728    // Write header
729    let header = SegmentHeader::new(batch.epoch, record_count);
730    let header_bytes = header.to_bytes();
731    writer.write_all(&header_bytes)?;
732    let mut total_bytes = SEGMENT_HEADER_SIZE;
733
734    // Write records in canonical replay order so crash recovery is deterministic.
735    for record in &ordered_records {
736        let record_bytes =
737            serialize_record(record).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
738        let len = u32::try_from(record_bytes.len()).map_err(|_| {
739            io::Error::new(
740                io::ErrorKind::InvalidInput,
741                format!(
742                    "segment record length {} exceeds u32 length prefix",
743                    record_bytes.len()
744                ),
745            )
746        })?;
747        writer.write_all(&len.to_le_bytes())?;
748        writer.write_all(&record_bytes)?;
749        total_bytes += 4 + record_bytes.len();
750    }
751
752    writer.flush()?;
753
754    // Apply fsync policy
755    if fsync_policy == FsyncPolicy::Full || fsync_policy == FsyncPolicy::Normal {
756        writer.get_ref().sync_all()?;
757    }
758
759    Ok(total_bytes)
760}
761
762/// Read a segment file and return the records.
763pub fn read_segment(path: &Path) -> io::Result<(SegmentHeader, Vec<WalRecord>)> {
764    let file = File::open(path)?;
765    let mut reader = BufReader::new(file);
766
767    // Read header
768    let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
769    reader.read_exact(&mut header_buf)?;
770    let header = SegmentHeader::from_bytes(&header_buf)
771        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
772
773    let file_len = reader.get_ref().metadata()?.len();
774    let body_len = file_len.saturating_sub(SEGMENT_HEADER_SIZE as u64);
775    let min_record_on_disk_len = u64::try_from(4 + SEGMENT_RECORD_MIN_SIZE).map_err(|_| {
776        io::Error::new(
777            io::ErrorKind::InvalidData,
778            "minimum segment record length exceeds u64",
779        )
780    })?;
781    let max_possible_records = body_len / min_record_on_disk_len;
782    if u64::from(header.record_count) > max_possible_records {
783        return Err(io::Error::new(
784            io::ErrorKind::InvalidData,
785            format!(
786                "segment record count {} exceeds maximum possible {} for file length {}",
787                header.record_count, max_possible_records, file_len
788            ),
789        ));
790    }
791
792    // Read records
793    let record_capacity = usize::try_from(header.record_count).map_err(|_| {
794        io::Error::new(
795            io::ErrorKind::InvalidData,
796            format!(
797                "segment record count {} exceeds addressable size",
798                header.record_count
799            ),
800        )
801    })?;
802    let mut records = Vec::with_capacity(record_capacity);
803    for _ in 0..header.record_count {
804        let mut len_buf = [0u8; 4];
805        reader.read_exact(&mut len_buf)?;
806        let len = u32::from_le_bytes(len_buf) as usize;
807        if len > MAX_SEGMENT_RECORD_SIZE {
808            return Err(io::Error::new(
809                io::ErrorKind::InvalidData,
810                format!("segment record length {len} exceeds maximum {MAX_SEGMENT_RECORD_SIZE}"),
811            ));
812        }
813
814        let mut record_buf = vec![0u8; len];
815        reader.read_exact(&mut record_buf)?;
816        let record = deserialize_record(&record_buf)
817            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
818        records.push(record);
819    }
820    let consumed_len = reader.stream_position()?;
821    if consumed_len != file_len {
822        return Err(io::Error::new(
823            io::ErrorKind::InvalidData,
824            format!(
825                "segment has {} trailing bytes after declared records",
826                file_len.saturating_sub(consumed_len)
827            ),
828        ));
829    }
830
831    Ok((header, ordered_segment_records(header.epoch, &records)?))
832}
833
/// Remove a single segment file from disk.
///
/// # Errors
///
/// Returns whatever [`fs::remove_file`] reports, unchanged (for example when
/// `path` does not exist).
pub fn delete_segment(path: &Path) -> io::Result<()> {
    fs::remove_file(path)
}
838
839// ---------------------------------------------------------------------------
840// Segment Recovery (D1.7)
841// ---------------------------------------------------------------------------
842
/// Summary of one segment-recovery pass, as filled in by `recover_segments`.
#[derive(Debug, Clone)]
pub struct SegmentRecoveryResult {
    /// How many segment files were read successfully.
    pub segments_recovered: usize,
    /// Total number of records decoded from the recovered segments.
    pub records_applied: usize,
    /// Sum of the on-disk sizes of the recovered segment files, in bytes.
    pub bytes_read: u64,
    /// Epoch of each recovered segment, in the order they were processed.
    pub epochs: Vec<u64>,
    /// Segments that were not recovered: the first truncated/corrupt segment
    /// and every segment listed after it.
    pub partial_segments: Vec<PathBuf>,
}
857
/// Knobs controlling a segment-recovery pass. Both flags default to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SegmentRecoveryOptions {
    /// Remove the recovered segment files once the pass has finished.
    pub delete_after_recovery: bool,
    /// When a segment turns out to be corrupt, stop there and return the
    /// segments recovered so far instead of failing the whole recovery.
    pub skip_corrupt: bool,
}
867
868/// Recover all segments for a database.
869///
870/// This function:
871/// 1. Finds all segment files for the database.
872/// 2. Sorts them by epoch (ascending).
873/// 3. Reads and returns records from each segment.
874/// 4. Optionally deletes segments after recovery.
875///
876/// The caller is responsible for applying records to the database
877/// (updating page contents based on after_images).
878pub fn recover_segments(
879    db_path: &Path,
880    options: SegmentRecoveryOptions,
881) -> io::Result<(SegmentRecoveryResult, Vec<WalRecord>)> {
882    let segments = list_segments(db_path)?;
883
884    let mut result = SegmentRecoveryResult {
885        segments_recovered: 0,
886        records_applied: 0,
887        bytes_read: 0,
888        epochs: Vec::with_capacity(segments.len()),
889        partial_segments: Vec::new(),
890    };
891
892    let mut all_records = Vec::new();
893
894    for (segment_index, (epoch, path)) in segments.iter().enumerate() {
895        // Get file size for byte tracking
896        let metadata = fs::metadata(path)?;
897        let file_size = metadata.len();
898
899        // Try to read the segment
900        match read_segment(path) {
901            Ok((header, records)) => {
902                if header.epoch != *epoch {
903                    let error = io::Error::new(
904                        io::ErrorKind::InvalidData,
905                        format!(
906                            "segment {} has mismatched epoch: header={}, filename={}",
907                            path.display(),
908                            header.epoch,
909                            epoch
910                        ),
911                    );
912                    if options.skip_corrupt {
913                        eprintln!(
914                            "warning: stopping recovery at corrupt segment {}: {error}",
915                            path.display()
916                        );
917                        result.partial_segments.extend(
918                            segments[segment_index..]
919                                .iter()
920                                .map(|(_, path)| path.clone()),
921                        );
922                        break;
923                    }
924                    return Err(error);
925                }
926
927                result.segments_recovered += 1;
928                result.records_applied += records.len();
929                result.bytes_read += file_size;
930                result.epochs.push(*epoch);
931
932                all_records.extend(records);
933            }
934            Err(e) => {
935                if options.skip_corrupt {
936                    eprintln!(
937                        "warning: stopping recovery at corrupt segment {}: {e}",
938                        path.display()
939                    );
940                    result.partial_segments.extend(
941                        segments[segment_index..]
942                            .iter()
943                            .map(|(_, path)| path.clone()),
944                    );
945                    break;
946                }
947                return Err(e);
948            }
949        }
950    }
951
952    // Delete segments after successful recovery if requested
953    if options.delete_after_recovery {
954        for (_, path) in &segments {
955            // Skip segments that were partial/corrupt
956            if result.partial_segments.contains(path) {
957                continue;
958            }
959            if let Err(e) = delete_segment(path) {
960                eprintln!("warning: failed to delete segment {}: {e}", path.display());
961            }
962        }
963    }
964
965    Ok((result, EpochOrderCoordinator::recovery_order(&all_records)))
966}
967
968fn ordered_segment_records(epoch: u64, records: &[WalRecord]) -> io::Result<Vec<WalRecord>> {
969    let ordered = EpochOrderCoordinator::recovery_order(records);
970    if let Some(record) = ordered.iter().find(|record| record.epoch != epoch) {
971        return Err(io::Error::new(
972            io::ErrorKind::InvalidData,
973            format!(
974                "segment epoch {epoch} contains record from epoch {}",
975                record.epoch
976            ),
977        ));
978    }
979    Ok(ordered)
980}
981
982/// Recover segments and apply records to a page cache.
983///
984/// This is a higher-level recovery function that takes a mutable page
985/// map and applies after_images from recovered records. It returns
986/// the recovery result and the final page contents.
987///
988/// The page_contents map is keyed by page number and contains the
989/// current contents of each page. Records are applied in epoch order.
990pub fn recover_and_apply_segments(
991    db_path: &Path,
992    page_contents: &mut HashMap<u32, Vec<u8>, impl BuildHasher>,
993    options: SegmentRecoveryOptions,
994) -> io::Result<SegmentRecoveryResult> {
995    let (result, records) = recover_segments(db_path, options)?;
996
997    // Apply records in order (they're already sorted by epoch)
998    for record in records {
999        let page_id = record.page_id.get();
1000        if !record.after_image.is_empty() {
1001            page_contents.insert(page_id, record.after_image);
1002        }
1003    }
1004
1005    Ok(result)
1006}
1007
1008/// Get the maximum durable epoch from existing segment files.
1009///
1010/// This can be used to determine the recovery point after a crash.
1011/// Returns None if no segment files exist.
1012pub fn max_durable_epoch(db_path: &Path) -> io::Result<Option<u64>> {
1013    let segments = list_segments(db_path)?;
1014    Ok(segments.last().map(|(epoch, _)| *epoch))
1015}
1016
1017/// Clean up all segment files for a database.
1018///
1019/// This should be called after checkpoint when segments are no longer needed.
1020pub fn cleanup_segments(db_path: &Path) -> io::Result<usize> {
1021    let segments = list_segments(db_path)?;
1022    let count = segments.len();
1023    for (_, path) in segments {
1024        delete_segment(&path)?;
1025    }
1026    Ok(count)
1027}
1028
1029/// Serialize a WalRecord to bytes.
1030fn serialize_record(record: &WalRecord) -> Result<Vec<u8>, String> {
1031    // Simple binary format:
1032    // [8] txn_id
1033    // [4] txn_epoch
1034    // [8] record_epoch
1035    // [4] page_id
1036    // [8] begin_seq
1037    // [1] has_end_seq
1038    // [8] end_seq (if has_end_seq)
1039    // [4] before_image_len
1040    // [N] before_image
1041    // [4] after_image_len
1042    // [N] after_image
1043    validate_segment_record_images(record)?;
1044    let before_len = u32::try_from(record.before_image.len())
1045        .map_err(|_| "before_image length exceeds u32 length prefix".to_string())?;
1046    let after_len = u32::try_from(record.after_image.len())
1047        .map_err(|_| "after_image length exceeds u32 length prefix".to_string())?;
1048
1049    let mut buf = Vec::with_capacity(64 + record.before_image.len() + record.after_image.len());
1050
1051    buf.extend_from_slice(&record.txn_token.id.get().to_le_bytes());
1052    buf.extend_from_slice(&record.txn_token.epoch.get().to_le_bytes());
1053    buf.extend_from_slice(&record.epoch.to_le_bytes());
1054    buf.extend_from_slice(&record.page_id.get().to_le_bytes());
1055    buf.extend_from_slice(&record.begin_seq.get().to_le_bytes());
1056    if let Some(end_seq) = record.end_seq {
1057        buf.push(1);
1058        buf.extend_from_slice(&end_seq.get().to_le_bytes());
1059    } else {
1060        buf.push(0);
1061    }
1062    buf.extend_from_slice(&before_len.to_le_bytes());
1063    buf.extend_from_slice(&record.before_image);
1064    buf.extend_from_slice(&after_len.to_le_bytes());
1065    buf.extend_from_slice(&record.after_image);
1066
1067    Ok(buf)
1068}
1069
1070fn validate_segment_record_images(record: &WalRecord) -> Result<(), String> {
1071    validate_segment_image_len("before_image", record.before_image.len())?;
1072    validate_segment_image_len("after_image", record.after_image.len())
1073}
1074
1075fn validate_segment_image_len(field: &'static str, len: usize) -> Result<(), String> {
1076    if len > MAX_SEGMENT_RECORD_IMAGE_BYTES {
1077        return Err(format!(
1078            "{field} length {len} exceeds maximum {MAX_SEGMENT_RECORD_IMAGE_BYTES}"
1079        ));
1080    }
1081    Ok(())
1082}
1083
/// Borrow the next `len` bytes of `buf` starting at `*offset`.
///
/// On success `*offset` is advanced past the returned slice. On failure
/// `*offset` is left unchanged.
///
/// # Errors
///
/// - `"{field} offset overflow"` when `*offset + len` overflows `usize`.
/// - `"{field} truncated"` when the requested range is not inside `buf`.
fn read_record_bytes<'a>(
    buf: &'a [u8],
    offset: &mut usize,
    len: usize,
    field: &'static str,
) -> Result<&'a [u8], String> {
    let start = *offset;
    let Some(end) = start.checked_add(len) else {
        return Err(format!("{field} offset overflow"));
    };

    match buf.get(start..end) {
        Some(bytes) => {
            *offset = end;
            Ok(bytes)
        }
        None => Err(format!("{field} truncated")),
    }
}
1099
1100fn read_record_u32(buf: &[u8], offset: &mut usize, field: &'static str) -> Result<u32, String> {
1101    let bytes = read_record_bytes(buf, offset, 4, field)?;
1102    Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
1103}
1104
1105fn read_record_u64(buf: &[u8], offset: &mut usize, field: &'static str) -> Result<u64, String> {
1106    let bytes = read_record_bytes(buf, offset, 8, field)?;
1107    Ok(u64::from_le_bytes([
1108        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1109    ]))
1110}
1111
1112/// Deserialize a WalRecord from bytes.
1113fn deserialize_record(buf: &[u8]) -> Result<WalRecord, String> {
1114    if buf.len() < SEGMENT_RECORD_MIN_SIZE {
1115        return Err("record too short".to_string());
1116    }
1117
1118    let mut offset = 0;
1119
1120    let txn_id = read_record_u64(buf, &mut offset, "txn_id")?;
1121    let txn_epoch = read_record_u32(buf, &mut offset, "txn_epoch")?;
1122    let record_epoch = read_record_u64(buf, &mut offset, "record_epoch")?;
1123    let page_id = read_record_u32(buf, &mut offset, "page_id")?;
1124    let begin_seq = read_record_u64(buf, &mut offset, "begin_seq")?;
1125    let has_end_seq = *read_record_bytes(buf, &mut offset, 1, "end_seq flag")?
1126        .first()
1127        .ok_or_else(|| "end_seq flag truncated".to_string())?;
1128    let end_seq = if has_end_seq == 1 {
1129        let seq = read_record_u64(buf, &mut offset, "end_seq")?;
1130        Some(CommitSeq::new(seq))
1131    } else if has_end_seq == 0 {
1132        None
1133    } else {
1134        return Err(format!("invalid end_seq flag: {has_end_seq}"));
1135    };
1136    let before_len = read_record_u32(buf, &mut offset, "before_image length")? as usize;
1137    validate_segment_image_len("before_image", before_len)?;
1138    let before_image = read_record_bytes(buf, &mut offset, before_len, "before_image")?.to_vec();
1139    let after_len = read_record_u32(buf, &mut offset, "after_image length")? as usize;
1140    validate_segment_image_len("after_image", after_len)?;
1141    let after_image = read_record_bytes(buf, &mut offset, after_len, "after_image")?.to_vec();
1142    if offset != buf.len() {
1143        return Err(format!(
1144            "trailing bytes after WAL record: {}",
1145            buf.len().saturating_sub(offset)
1146        ));
1147    }
1148
1149    let txn_id = fsqlite_types::TxnId::new(txn_id).ok_or("invalid txn_id (zero)")?;
1150    let page_id = PageNumber::new(page_id).ok_or("invalid page_id (zero)")?;
1151
1152    Ok(WalRecord {
1153        txn_token: TxnToken::new(txn_id, fsqlite_types::TxnEpoch::new(txn_epoch)),
1154        epoch: record_epoch,
1155        page_id,
1156        begin_seq: CommitSeq::new(begin_seq),
1157        end_seq,
1158        before_image,
1159        after_image,
1160    })
1161}
1162
1163// ---------------------------------------------------------------------------
1164// WAL Frame for Parallel Submission
1165// ---------------------------------------------------------------------------
1166
1167/// A WAL frame submitted for parallel writing.
1168#[derive(Debug, Clone)]
1169pub struct ParallelWalFrame {
1170    /// Page number.
1171    pub page_number: PageNumber,
1172    /// Page data (owned copy for buffering).
1173    pub page_data: Vec<u8>,
1174    /// Database size in pages for commit frames, or 0 for non-commit frames.
1175    pub db_size_if_commit: u32,
1176}
1177
1178/// A batch of WAL frames from a single transaction.
1179#[derive(Debug, Clone)]
1180pub struct ParallelWalBatch {
1181    /// Transaction token identifying this batch.
1182    pub txn_token: TxnToken,
1183    /// Commit sequence assigned to this batch.
1184    pub commit_seq: CommitSeq,
1185    /// Frames in write order.
1186    pub frames: Vec<ParallelWalFrame>,
1187}
1188
1189impl ParallelWalBatch {
1190    /// Create a new batch from the given frames.
1191    #[must_use]
1192    pub fn new(txn_token: TxnToken, commit_seq: CommitSeq, frames: Vec<ParallelWalFrame>) -> Self {
1193        Self {
1194            txn_token,
1195            commit_seq,
1196            frames,
1197        }
1198    }
1199}
1200
1201// ---------------------------------------------------------------------------
1202// Parallel WAL Coordinator
1203// ---------------------------------------------------------------------------
1204
1205/// Per-database parallel WAL coordinator.
1206///
1207/// This coordinator manages per-thread WAL buffers and epoch-based flushing.
1208/// It replaces the global WAL append mutex with lock-free per-thread appends.
1209pub struct ParallelWalCoordinator {
1210    /// The epoch-based buffer coordinator (Arc for ticker thread sharing).
1211    inner: Arc<EpochOrderCoordinator>,
1212    /// Path to the database (for segment file naming).
1213    db_path: PathBuf,
1214    /// Configuration.
1215    config: ParallelWalConfig,
1216    /// Whether the coordinator is running (Arc for ticker thread sharing).
1217    running: Arc<AtomicBool>,
1218    /// Epoch batches drained from memory but not yet durably written.
1219    pending_batches: Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1220    /// Child cancellation scope for the background ticker task.
1221    ticker_cx: Mutex<Option<Cx>>,
1222    /// Epoch ticker handle (spawned on an asupersync runtime).
1223    #[cfg(not(target_arch = "wasm32"))]
1224    ticker_handle: Mutex<Option<BlockingTaskHandle>>,
1225}
1226
1227impl std::fmt::Debug for ParallelWalCoordinator {
1228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229        f.debug_struct("ParallelWalCoordinator")
1230            .field("db_path", &self.db_path)
1231            .field("config", &self.config)
1232            .field("running", &self.running.load(Ordering::Relaxed))
1233            .finish_non_exhaustive()
1234    }
1235}
1236
1237impl ParallelWalCoordinator {
1238    /// Create a new parallel WAL coordinator for the given database path.
1239    #[must_use]
1240    pub fn new(db_path: &Path, config: ParallelWalConfig) -> Self {
1241        let buffer_config = BufferConfig {
1242            capacity_bytes: config.buffer_capacity_bytes,
1243            ..BufferConfig::default()
1244        };
1245        let epoch_config = EpochConfig {
1246            advance_interval_ms: config.epoch_interval_ms,
1247        };
1248
1249        Self {
1250            inner: Arc::new(EpochOrderCoordinator::new(
1251                config.slot_count,
1252                buffer_config,
1253                epoch_config,
1254            )),
1255            db_path: db_path.to_path_buf(),
1256            config,
1257            running: Arc::new(AtomicBool::new(false)),
1258            pending_batches: Arc::new(Mutex::new(VecDeque::new())),
1259            ticker_cx: Mutex::new(None),
1260            #[cfg(not(target_arch = "wasm32"))]
1261            ticker_handle: Mutex::new(None),
1262        }
1263    }
1264
1265    /// Get the current epoch.
1266    #[must_use]
1267    pub fn current_epoch(&self) -> u64 {
1268        self.inner.current_epoch()
1269    }
1270
1271    /// Get the durable epoch (all epochs <= this are guaranteed durable).
1272    #[must_use]
1273    pub fn durable_epoch(&self) -> Option<u64> {
1274        self.inner.durable_epoch()
1275    }
1276
1277    /// Get the buffer slot index for the current thread.
1278    #[must_use]
1279    pub fn thread_slot(&self) -> usize {
1280        thread_buffer_slot(self.config.slot_count)
1281    }
1282
1283    /// Submit a WAL frame batch for the current thread.
1284    ///
1285    /// This method appends the batch's frames to the current thread's buffer
1286    /// with NO global lock. The batch will be flushed when the epoch advances.
1287    ///
1288    /// Returns the epoch in which the batch was submitted.
1289    pub fn submit_batch(&self, batch: ParallelWalBatch) -> Result<u64, String> {
1290        let slot = self.thread_slot();
1291        let epoch = self.inner.current_append_epoch();
1292        let records = batch
1293            .frames
1294            .into_iter()
1295            .map(|frame| WalRecord {
1296                txn_token: batch.txn_token,
1297                epoch,
1298                page_id: frame.page_number,
1299                begin_seq: batch.commit_seq,
1300                end_seq: Some(batch.commit_seq),
1301                before_image: Vec::new(), // WAL frames don't have before images
1302                after_image: frame.page_data,
1303            })
1304            .collect();
1305
1306        let outcome = self.inner.append_records_to_core(slot, records)?;
1307        if matches!(outcome, AppendOutcome::Blocked) {
1308            return Err("buffer blocked, fallback to serialized path".to_string());
1309        }
1310
1311        Ok(epoch)
1312    }
1313
1314    /// Wait until the given epoch is durable.
1315    ///
1316    /// This method blocks until all frames submitted in or before `epoch`
1317    /// have been flushed to disk.
1318    pub fn wait_for_epoch_durable(&self, epoch: u64, timeout: Duration) -> Result<(), String> {
1319        self.inner.wait_until_epoch_durable(epoch, timeout)
1320    }
1321
1322    /// Start the background epoch ticker on a caller-owned asupersync runtime.
1323    ///
1324    /// The ticker task advances the epoch at the configured interval (default 10ms),
1325    /// sealing and flushing all per-thread buffers. This implements the Silo/Aether
1326    /// group commit pattern where transactions wait for their epoch to become durable.
1327    #[cfg(not(target_arch = "wasm32"))]
1328    pub fn start_on_runtime(&self, runtime: &RuntimeHandle, parent_cx: &Cx) -> Result<(), String> {
1329        self.start_on_runtime_with_fsync(runtime, parent_cx, FsyncPolicy::default())
1330    }
1331
1332    /// Start the background epoch ticker with a specific fsync policy.
1333    #[cfg(not(target_arch = "wasm32"))]
1334    pub fn start_on_runtime_with_fsync(
1335        &self,
1336        runtime: &RuntimeHandle,
1337        parent_cx: &Cx,
1338        fsync_policy: FsyncPolicy,
1339    ) -> Result<(), String> {
1340        if self.running.load(Ordering::Acquire) {
1341            return Err("coordinator already running".to_string());
1342        }
1343
1344        let prior_ticker_cx = self
1345            .ticker_cx
1346            .lock()
1347            .unwrap_or_else(std::sync::PoisonError::into_inner)
1348            .take();
1349        if let Some(ticker_cx) = prior_ticker_cx {
1350            ticker_cx.cancel();
1351        }
1352        let prior_handle = self
1353            .ticker_handle
1354            .lock()
1355            .unwrap_or_else(std::sync::PoisonError::into_inner)
1356            .take();
1357        if let Some(handle) = prior_handle {
1358            handle.wait();
1359        }
1360        if self
1361            .running
1362            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
1363            .is_err()
1364        {
1365            return Err("coordinator already running".to_string());
1366        }
1367
1368        let ticker_cx = parent_cx.create_child();
1369
1370        // Clone Arc handles for the ticker task.
1371        let running = Arc::clone(&self.running);
1372        let inner = Arc::clone(&self.inner);
1373        let db_path = self.db_path.clone();
1374        let pending_batches = Arc::clone(&self.pending_batches);
1375        let interval = Duration::from_millis(self.config.epoch_interval_ms);
1376        let flush_timeout = Duration::from_millis(self.config.epoch_interval_ms * 10);
1377        let loop_cx = ticker_cx.clone();
1378
1379        let Some(handle) = runtime.spawn_blocking(move || {
1380            epoch_ticker_loop(
1381                running,
1382                inner,
1383                db_path,
1384                pending_batches,
1385                interval,
1386                flush_timeout,
1387                fsync_policy,
1388                loop_cx,
1389            );
1390        }) else {
1391            self.running.store(false, Ordering::Release);
1392            return Err(
1393                "failed to spawn epoch ticker task: runtime has no blocking pool".to_string(),
1394            );
1395        };
1396
1397        *self
1398            .ticker_cx
1399            .lock()
1400            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(ticker_cx);
1401
1402        let mut ticker_handle = self
1403            .ticker_handle
1404            .lock()
1405            .unwrap_or_else(std::sync::PoisonError::into_inner);
1406        *ticker_handle = Some(handle);
1407
1408        Ok(())
1409    }
1410
1411    /// Stop the background epoch ticker task.
1412    ///
1413    /// Signals the ticker to stop and waits for it to complete its current
1414    /// flush cycle before returning.
1415    pub fn stop(&self) {
1416        self.running.store(false, Ordering::Release);
1417        let prior_ticker_cx = self
1418            .ticker_cx
1419            .lock()
1420            .unwrap_or_else(std::sync::PoisonError::into_inner)
1421            .take();
1422        if let Some(ticker_cx) = prior_ticker_cx {
1423            ticker_cx.cancel();
1424        }
1425
1426        #[cfg(not(target_arch = "wasm32"))]
1427        let mut handle = self
1428            .ticker_handle
1429            .lock()
1430            .unwrap_or_else(std::sync::PoisonError::into_inner);
1431        #[cfg(not(target_arch = "wasm32"))]
1432        if let Some(h) = handle.take() {
1433            h.wait();
1434        }
1435    }
1436
1437    /// Check if the background epoch ticker is running.
1438    #[must_use]
1439    pub fn is_running(&self) -> bool {
1440        self.running.load(Ordering::Acquire)
1441    }
1442
1443    /// Manually advance the epoch and flush all buffers.
1444    ///
1445    /// This is used for testing or when no background ticker is running.
1446    pub fn advance_and_flush(&self, timeout: Duration) -> Result<u64, String> {
1447        flush_pending_batches(
1448            &self.pending_batches,
1449            &self.inner,
1450            &self.db_path,
1451            FsyncPolicy::default(),
1452        )?;
1453
1454        // Slot-level buffer locks serialize a batch append against sealing, so
1455        // the top-level coordinator can advance without waiting on inactive slots.
1456        let new_epoch = self.inner.advance_epoch_and_wait(&[], timeout)?;
1457
1458        let prev_epoch = new_epoch.saturating_sub(1);
1459        let batch = self.inner.flush_epoch(prev_epoch)?;
1460        if batch.records.is_empty() {
1461            self.inner.mark_epoch_durable(prev_epoch);
1462        } else {
1463            enqueue_flush_batch(&self.pending_batches, batch);
1464            flush_pending_batches(
1465                &self.pending_batches,
1466                &self.inner,
1467                &self.db_path,
1468                FsyncPolicy::default(),
1469            )?;
1470        }
1471
1472        Ok(new_epoch)
1473    }
1474}
1475
1476impl Drop for ParallelWalCoordinator {
1477    fn drop(&mut self) {
1478        self.stop();
1479    }
1480}
1481
1482fn enqueue_flush_batch(
1483    pending_batches: &Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1484    batch: EpochFlushBatch,
1485) {
1486    let mut pending = pending_batches
1487        .lock()
1488        .unwrap_or_else(std::sync::PoisonError::into_inner);
1489    pending.push_back(batch);
1490}
1491
1492fn flush_pending_batches(
1493    pending_batches: &Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1494    inner: &EpochOrderCoordinator,
1495    db_path: &Path,
1496    fsync_policy: FsyncPolicy,
1497) -> Result<(), String> {
1498    loop {
1499        let next_batch = {
1500            let mut pending = pending_batches
1501                .lock()
1502                .unwrap_or_else(std::sync::PoisonError::into_inner);
1503            pending.pop_front()
1504        };
1505
1506        let Some(batch) = next_batch else {
1507            return Ok(());
1508        };
1509
1510        if let Err(error) = write_segment(db_path, &batch, fsync_policy) {
1511            let epoch = batch.epoch;
1512            let mut pending = pending_batches
1513                .lock()
1514                .unwrap_or_else(std::sync::PoisonError::into_inner);
1515            pending.push_front(batch);
1516            return Err(format!("write_segment({epoch}) failed: {error}"));
1517        }
1518
1519        inner.mark_epoch_durable(batch.epoch);
1520    }
1521}
1522
1523// ---------------------------------------------------------------------------
1524// Epoch Ticker Loop
1525// ---------------------------------------------------------------------------
1526
1527/// Background task loop that advances epochs and flushes WAL buffers.
1528///
1529/// This implements an epoch-based group commit pattern:
1530/// 1. Sleep for the configured interval (default 10ms).
1531/// 2. Advance the global epoch.
1532/// 3. Flush any prior pending segment writes.
1533/// 4. Seal and drain the previous epoch's buffers.
1534/// 5. Write the batch to a segment file.
1535/// 6. Mark the epoch as durable.
1536///
1537/// The loop exits when `running` is cleared or the task `Cx` is cancelled.
1538#[allow(clippy::too_many_arguments)]
1539fn epoch_ticker_loop(
1540    running: Arc<AtomicBool>,
1541    inner: Arc<EpochOrderCoordinator>,
1542    db_path: PathBuf,
1543    pending_batches: Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1544    interval: Duration,
1545    flush_timeout: Duration,
1546    fsync_policy: FsyncPolicy,
1547    ticker_cx: Cx,
1548) {
1549    while running.load(Ordering::Acquire) {
1550        if ticker_cx.checkpoint().is_err() {
1551            break;
1552        }
1553
1554        // Sleep for the epoch interval.
1555        std::thread::sleep(interval);
1556
1557        // Check if we should stop before doing work.
1558        if !running.load(Ordering::Acquire) || ticker_cx.is_cancel_requested() {
1559            break;
1560        }
1561
1562        if let Err(error) = flush_pending_batches(&pending_batches, &inner, &db_path, fsync_policy)
1563        {
1564            eprintln!("epoch ticker: {error}");
1565            continue;
1566        }
1567
1568        // Slot-level buffer locking makes batch submission atomic relative to sealing,
1569        // so we can advance without stalling on globally inactive slots.
1570        match inner.advance_epoch_and_wait(&[], flush_timeout) {
1571            Ok(new_epoch) => {
1572                let prev_epoch = new_epoch.saturating_sub(1);
1573                match inner.flush_epoch(prev_epoch) {
1574                    Ok(batch) => {
1575                        if batch.records.is_empty() {
1576                            inner.mark_epoch_durable(prev_epoch);
1577                        } else {
1578                            enqueue_flush_batch(&pending_batches, batch);
1579                            if let Err(error) = flush_pending_batches(
1580                                &pending_batches,
1581                                &inner,
1582                                &db_path,
1583                                fsync_policy,
1584                            ) {
1585                                eprintln!("epoch ticker: {error}");
1586                            }
1587                        }
1588                    }
1589                    Err(error) => {
1590                        eprintln!("epoch ticker: flush_epoch({prev_epoch}) failed: {error}");
1591                    }
1592                }
1593            }
1594            Err(error) => {
1595                eprintln!("epoch ticker: advance_epoch_and_wait failed: {error}");
1596            }
1597        }
1598    }
1599
1600    running.store(false, Ordering::Release);
1601}
1602
1603// ---------------------------------------------------------------------------
1604// Global Coordinators Registry
1605// ---------------------------------------------------------------------------
1606
1607type CoordinatorRef = Arc<ParallelWalCoordinator>;
1608
1609static PARALLEL_WAL_COORDINATORS: OnceLock<Mutex<HashMap<PathBuf, CoordinatorRef>>> =
1610    OnceLock::new();
1611
1612/// Get or create a parallel WAL coordinator for the given database path.
1613pub fn parallel_wal_coordinator_for_path(db_path: &Path) -> CoordinatorRef {
1614    let coordinators = PARALLEL_WAL_COORDINATORS.get_or_init(|| Mutex::new(HashMap::new()));
1615    let mut coordinators = coordinators
1616        .lock()
1617        .unwrap_or_else(std::sync::PoisonError::into_inner);
1618
1619    Arc::clone(
1620        coordinators
1621            .entry(db_path.to_path_buf())
1622            .or_insert_with(|| {
1623                Arc::new(ParallelWalCoordinator::new(
1624                    db_path,
1625                    ParallelWalConfig::default(),
1626                ))
1627            }),
1628    )
1629}
1630
1631/// Remove a parallel WAL coordinator for the given database path.
1632pub fn remove_parallel_wal_coordinator(db_path: &Path) {
1633    if let Some(coordinators) = PARALLEL_WAL_COORDINATORS.get() {
1634        let mut coordinators = coordinators
1635            .lock()
1636            .unwrap_or_else(std::sync::PoisonError::into_inner);
1637        if let Some(coordinator) = coordinators.remove(db_path) {
1638            coordinator.stop();
1639        }
1640    }
1641}
1642
1643// ---------------------------------------------------------------------------
1644// Tests
1645// ---------------------------------------------------------------------------
1646
1647#[cfg(test)]
1648mod tests {
1649    use super::*;
1650    use asupersync::runtime::RuntimeBuilder;
1651    use std::path::PathBuf;
1652    use std::sync::{LazyLock, Mutex, MutexGuard};
1653
1654    use crate::per_core_buffer::reset_slot_counter;
1655
1656    static PARALLEL_WAL_LANE_TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
1657
1658    fn lane_test_guard() -> MutexGuard<'static, ()> {
1659        PARALLEL_WAL_LANE_TEST_LOCK
1660            .lock()
1661            .unwrap_or_else(std::sync::PoisonError::into_inner)
1662    }
1663
1664    fn test_runtime() -> asupersync::runtime::Runtime {
1665        RuntimeBuilder::current_thread()
1666            .blocking_threads(1, 1)
1667            .build()
1668            .expect("runtime should build")
1669    }
1670
1671    fn test_cx() -> Cx {
1672        Cx::default()
1673    }
1674
1675    fn sample_batch(txn_id: u64, commit_seq: u64) -> ParallelWalBatch {
1676        ParallelWalBatch::new(
1677            TxnToken::new(
1678                fsqlite_types::TxnId::new(txn_id).expect("txn id should be non-zero"),
1679                fsqlite_types::TxnEpoch::new(0),
1680            ),
1681            CommitSeq::new(commit_seq),
1682            vec![
1683                ParallelWalFrame {
1684                    page_number: PageNumber::new(7).expect("page should be non-zero"),
1685                    page_data: vec![0xAA; 16],
1686                    db_size_if_commit: 0,
1687                },
1688                ParallelWalFrame {
1689                    page_number: PageNumber::new(9).expect("page should be non-zero"),
1690                    page_data: vec![0xBB; 24],
1691                    db_size_if_commit: 12,
1692                },
1693            ],
1694        )
1695    }
1696
1697    fn sample_lane_batch(
1698        batch_id: u64,
1699        lane_id: u16,
1700        staged_frame_count: u32,
1701        payload: u32,
1702    ) -> ParallelWalLaneBatch<u32> {
1703        ParallelWalLaneBatch {
1704            batch_id,
1705            lane_id,
1706            staged_frame_count,
1707            staging_elapsed_ns: u64::from(staged_frame_count) * 10,
1708            shadow_verdict: ParallelWalShadowVerdict::NotRun,
1709            payload,
1710        }
1711    }
1712
1713    fn sample_lane_context(batch_id: u64, lane_id: u16) -> TransactionFrameBatchContext {
1714        TransactionFrameBatchContext {
1715            batch_id,
1716            lane_id,
1717            staged_frame_count: 1,
1718            staging_elapsed_ns: 10,
1719        }
1720    }
1721
1722    #[test]
1723    fn test_parallel_wal_coordinator_creation() {
1724        let path = PathBuf::from("/tmp/test.db");
1725        let coordinator = ParallelWalCoordinator::new(&path, ParallelWalConfig::default());
1726
1727        assert_eq!(coordinator.current_epoch(), 0);
1728        assert_eq!(coordinator.durable_epoch(), None);
1729    }
1730
1731    #[test]
1732    fn test_thread_slot_assignment() {
1733        let _guard = lane_test_guard();
1734        let path = PathBuf::from("/tmp/test.db");
1735        let config = ParallelWalConfig {
1736            slot_count: 4,
1737            ..ParallelWalConfig::default()
1738        };
1739        let coordinator = ParallelWalCoordinator::new(&path, config);
1740
1741        // Thread slot should be consistent for the same thread.
1742        let slot1 = coordinator.thread_slot();
1743        let slot2 = coordinator.thread_slot();
1744        assert_eq!(slot1, slot2);
1745        assert!(slot1 < 4);
1746    }
1747
1748    #[test]
1749    fn test_lane_stager_identity_is_stable_within_thread() {
1750        let _guard = lane_test_guard();
1751        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1752            mode: ParallelWalOperatingMode::Auto,
1753            lane_count_override: Some(4),
1754            ..ParallelWalControlSurface::default()
1755        });
1756
1757        let first = stager.current_lane_id();
1758        let second = stager.current_lane_id();
1759        assert_eq!(first, second);
1760        assert!(usize::from(first) < 4);
1761    }
1762
1763    #[test]
1764    fn test_lane_stager_reuses_lanes_after_worker_churn() {
1765        let _guard = lane_test_guard();
1766        reset_slot_counter();
1767
1768        let stager = Arc::new(ParallelWalLaneStager::<u32>::new(
1769            ParallelWalControlSurface {
1770                mode: ParallelWalOperatingMode::Auto,
1771                lane_count_override: Some(2),
1772                ..ParallelWalControlSurface::default()
1773            },
1774        ));
1775
1776        let spawn_wave = || {
1777            let mut lanes = Vec::new();
1778            for _ in 0..2 {
1779                let stager = Arc::clone(&stager);
1780                lanes.push(std::thread::spawn(move || stager.current_lane_id()));
1781            }
1782            let mut observed = lanes
1783                .into_iter()
1784                .map(|handle| handle.join().expect("lane thread should join"))
1785                .collect::<Vec<_>>();
1786            observed.sort_unstable();
1787            observed
1788        };
1789
1790        assert_eq!(spawn_wave(), vec![0, 1]);
1791        assert_eq!(spawn_wave(), vec![0, 1]);
1792    }
1793
1794    #[test]
1795    fn test_lane_stager_conservative_mode_collapses_to_single_lane() {
1796        let _guard = lane_test_guard();
1797        let stager = Arc::new(ParallelWalLaneStager::<u32>::new(
1798            ParallelWalControlSurface {
1799                mode: ParallelWalOperatingMode::Conservative,
1800                lane_count_override: Some(8),
1801                ..ParallelWalControlSurface::default()
1802            },
1803        ));
1804
1805        assert_eq!(stager.lane_count(), 1);
1806
1807        let mut lanes = Vec::new();
1808        for _ in 0..2 {
1809            let stager = Arc::clone(&stager);
1810            lanes.push(std::thread::spawn(move || stager.current_lane_id()));
1811        }
1812
1813        let observed = lanes
1814            .into_iter()
1815            .map(|handle| handle.join().expect("lane thread should join"))
1816            .collect::<Vec<_>>();
1817        assert_eq!(observed, vec![0, 0]);
1818    }
1819
1820    #[test]
1821    fn test_lane_stager_clamps_lane_count_to_lane_id_range() {
1822        let _guard = lane_test_guard();
1823        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1824            mode: ParallelWalOperatingMode::Auto,
1825            lane_count_override: Some(MAX_PARALLEL_WAL_LANE_COUNT + 1),
1826            ..ParallelWalControlSurface::default()
1827        });
1828
1829        assert_eq!(stager.lane_count(), MAX_PARALLEL_WAL_LANE_COUNT);
1830        assert_eq!(stager.lane_count(), usize::from(u16::MAX));
1831        assert!(usize::from(stager.current_lane_id()) < stager.lane_count());
1832    }
1833
1834    #[test]
1835    fn test_lane_stager_same_lane_order_mismatch_returns_none_without_drain() {
1836        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1837            mode: ParallelWalOperatingMode::Auto,
1838            lane_count_override: Some(2),
1839            ..ParallelWalControlSurface::default()
1840        });
1841
1842        assert_eq!(stager.record_batch(sample_lane_batch(10, 0, 1, 10)), 1);
1843        assert_eq!(stager.record_batch(sample_lane_batch(11, 0, 1, 11)), 2);
1844
1845        let out_of_order = [sample_lane_context(11, 0), sample_lane_context(10, 0)];
1846        assert!(stager.take_batches_for_flush(&out_of_order).is_none());
1847        assert_eq!(stager.current_lane_backlog(0), 2);
1848
1849        let in_order = [sample_lane_context(10, 0), sample_lane_context(11, 0)];
1850        let drained = stager
1851            .take_batches_for_flush(&in_order)
1852            .expect("verified in-order batches should drain");
1853        assert_eq!(drained.len(), 2);
1854        assert_eq!(drained.get(&10).map(|batch| batch.payload), Some(10));
1855        assert_eq!(drained.get(&11).map(|batch| batch.payload), Some(11));
1856        assert_eq!(stager.current_lane_backlog(0), 0);
1857    }
1858
1859    #[test]
1860    fn test_lane_stager_discard_batches_for_flush_removes_stale_payloads() {
1861        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1862            mode: ParallelWalOperatingMode::Auto,
1863            lane_count_override: Some(2),
1864            ..ParallelWalControlSurface::default()
1865        });
1866
1867        assert_eq!(stager.record_batch(sample_lane_batch(10, 0, 2, 10)), 2);
1868        assert_eq!(stager.record_batch(sample_lane_batch(11, 0, 3, 11)), 5);
1869        assert_eq!(stager.record_batch(sample_lane_batch(12, 0, 5, 12)), 10);
1870
1871        assert_eq!(
1872            stager.discard_batches_for_flush(&[sample_lane_context(11, 0)]),
1873            1
1874        );
1875        assert_eq!(
1876            stager.current_lane_backlog(0),
1877            7,
1878            "discarding a stale middle batch must subtract its staged frames without disturbing retained payloads"
1879        );
1880
1881        let retained = [sample_lane_context(10, 0), sample_lane_context(12, 0)];
1882        let drained = stager
1883            .take_batches_for_flush(&retained)
1884            .expect("discarded stale payload should not block later retained batches");
1885        assert_eq!(drained.len(), 2);
1886        assert_eq!(drained.get(&10).map(|batch| batch.payload), Some(10));
1887        assert_eq!(drained.get(&12).map(|batch| batch.payload), Some(12));
1888        assert_eq!(stager.current_lane_backlog(0), 0);
1889    }
1890
1891    #[test]
1892    fn test_lane_stager_discard_batches_for_flush_is_idempotent() {
1893        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1894            mode: ParallelWalOperatingMode::Auto,
1895            lane_count_override: Some(2),
1896            ..ParallelWalControlSurface::default()
1897        });
1898
1899        assert_eq!(stager.record_batch(sample_lane_batch(20, 1, 4, 20)), 4);
1900        let context = [sample_lane_context(20, 1)];
1901        assert_eq!(stager.discard_batches_for_flush(&context), 1);
1902        assert_eq!(stager.current_lane_backlog(1), 0);
1903        assert_eq!(
1904            stager.discard_batches_for_flush(&context),
1905            0,
1906            "discarding an already-flushed raw fallback batch should be a no-op"
1907        );
1908        assert_eq!(stager.current_lane_backlog(1), 0);
1909    }
1910
1911    #[test]
1912    fn test_lane_stager_discard_batches_for_flush_ignores_unknown_ids() {
1913        let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1914            mode: ParallelWalOperatingMode::Auto,
1915            lane_count_override: Some(2),
1916            ..ParallelWalControlSurface::default()
1917        });
1918
1919        assert_eq!(stager.record_batch(sample_lane_batch(30, 0, 2, 30)), 2);
1920        assert_eq!(
1921            stager.discard_batches_for_flush(&[sample_lane_context(99, 0)]),
1922            0
1923        );
1924        assert_eq!(stager.current_lane_backlog(0), 2);
1925
1926        let drained = stager
1927            .take_batches_for_flush(&[sample_lane_context(30, 0)])
1928            .expect("unknown discard must not perturb queued batches");
1929        assert_eq!(drained.get(&30).map(|batch| batch.payload), Some(30));
1930        assert_eq!(stager.current_lane_backlog(0), 0);
1931    }
1932
1933    #[test]
1934    fn test_auto_shadow_compare_sampling_is_deterministic_by_batch_window() {
1935        let control = ParallelWalControlSurface {
1936            mode: ParallelWalOperatingMode::Auto,
1937            shadow_compare_sampling_per_mille: Some(2),
1938            ..ParallelWalControlSurface::default()
1939        };
1940
1941        assert!(parallel_wal_should_shadow_compare(&control, 1));
1942        assert!(parallel_wal_should_shadow_compare(&control, 2));
1943        assert!(!parallel_wal_should_shadow_compare(&control, 3));
1944        assert!(parallel_wal_should_shadow_compare(&control, 1_001));
1945        assert!(parallel_wal_should_shadow_compare(&control, 1_002));
1946        assert!(!parallel_wal_should_shadow_compare(&control, 1_003));
1947    }
1948
1949    #[test]
1950    fn test_shadow_compare_mode_ignores_sampling_gate() {
1951        let control = ParallelWalControlSurface {
1952            mode: ParallelWalOperatingMode::ShadowCompare,
1953            shadow_compare_sampling_per_mille: Some(0),
1954            ..ParallelWalControlSurface::default()
1955        };
1956
1957        assert!(parallel_wal_should_shadow_compare(&control, 1));
1958        assert!(parallel_wal_should_shadow_compare(&control, 7));
1959    }
1960
1961    #[test]
1962    fn test_conservative_mode_never_runs_shadow_compare_sampling() {
1963        let control = ParallelWalControlSurface {
1964            mode: ParallelWalOperatingMode::Conservative,
1965            shadow_compare_sampling_per_mille: Some(1_000),
1966            ..ParallelWalControlSurface::default()
1967        };
1968
1969        assert!(!parallel_wal_should_shadow_compare(&control, 1));
1970        assert!(!parallel_wal_should_shadow_compare(&control, 1_000));
1971    }
1972
1973    #[test]
1974    fn test_global_coordinator_registry() {
1975        let path = PathBuf::from("/tmp/test_registry.db");
1976        let coord1 = parallel_wal_coordinator_for_path(&path);
1977        let coord2 = parallel_wal_coordinator_for_path(&path);
1978
1979        // Should return the same coordinator.
1980        assert!(Arc::ptr_eq(&coord1, &coord2));
1981
1982        // Cleanup.
1983        remove_parallel_wal_coordinator(&path);
1984    }
1985
1986    #[test]
1987    fn test_epoch_ticker_start_stop() {
1988        let path = PathBuf::from("/tmp/test_ticker.db");
1989        let config = ParallelWalConfig {
1990            slot_count: 4,
1991            epoch_interval_ms: 5, // Fast interval for testing
1992            ..ParallelWalConfig::default()
1993        };
1994        let coordinator = ParallelWalCoordinator::new(&path, config);
1995        let runtime = test_runtime();
1996        let cx = test_cx();
1997
1998        // Initially not running.
1999        assert!(!coordinator.is_running());
2000
2001        // Start the ticker.
2002        coordinator
2003            .start_on_runtime(&runtime.handle(), &cx)
2004            .expect("start should succeed");
2005        assert!(coordinator.is_running());
2006
2007        // Starting again should fail.
2008        assert!(
2009            coordinator
2010                .start_on_runtime(&runtime.handle(), &cx)
2011                .is_err()
2012        );
2013
2014        // Let the ticker run for a few epochs.
2015        std::thread::sleep(Duration::from_millis(25));
2016
2017        // Epoch should be accessible (exact count depends on timing).
2018        let _epoch = coordinator.current_epoch();
2019
2020        // Stop the ticker.
2021        coordinator.stop();
2022        assert!(!coordinator.is_running());
2023
2024        // Stopping again should be a no-op (idempotent).
2025        coordinator.stop();
2026        assert!(!coordinator.is_running());
2027    }
2028
2029    #[test]
2030    fn test_epoch_ticker_advances_epochs() {
2031        let path = PathBuf::from("/tmp/test_ticker_advance.db");
2032        let config = ParallelWalConfig {
2033            slot_count: 2,        // Small slot count for testing
2034            epoch_interval_ms: 5, // Fast interval for testing
2035            ..ParallelWalConfig::default()
2036        };
2037        let coordinator = ParallelWalCoordinator::new(&path, config);
2038        let runtime = test_runtime();
2039        let cx = test_cx();
2040
2041        let initial_epoch = coordinator.current_epoch();
2042
2043        // Start the ticker and wait for several epochs.
2044        coordinator
2045            .start_on_runtime(&runtime.handle(), &cx)
2046            .expect("start should succeed");
2047        std::thread::sleep(Duration::from_millis(50));
2048        coordinator.stop();
2049
2050        let final_epoch = coordinator.current_epoch();
2051
2052        assert!(
2053            final_epoch > initial_epoch,
2054            "epoch ticker should advance without stalling on inactive slots: initial={initial_epoch}, final={final_epoch}"
2055        );
2056    }
2057
2058    #[test]
2059    fn test_epoch_ticker_restart_after_parent_cancellation() {
2060        let path = PathBuf::from("/tmp/test_ticker_restart.db");
2061        let config = ParallelWalConfig {
2062            slot_count: 2,
2063            epoch_interval_ms: 5,
2064            ..ParallelWalConfig::default()
2065        };
2066        let coordinator = ParallelWalCoordinator::new(&path, config);
2067        let runtime = test_runtime();
2068        let parent_cx = test_cx();
2069
2070        coordinator
2071            .start_on_runtime(&runtime.handle(), &parent_cx)
2072            .expect("initial start should succeed");
2073        parent_cx.cancel();
2074        std::thread::sleep(Duration::from_millis(15));
2075
2076        let replacement_cx = test_cx();
2077        coordinator
2078            .start_on_runtime(&runtime.handle(), &replacement_cx)
2079            .expect("restart after parent cancellation should drain prior task");
2080        assert!(coordinator.is_running());
2081
2082        coordinator.stop();
2083        assert!(!coordinator.is_running());
2084    }
2085
2086    #[test]
2087    fn test_submit_batch_persists_actual_frame_payloads() {
2088        use tempfile::tempdir;
2089
2090        let _guard = lane_test_guard();
2091        let dir = tempdir().expect("create temp dir");
2092        let db_path = dir.path().join("submit_batch.db");
2093        let config = ParallelWalConfig {
2094            slot_count: 1,
2095            ..ParallelWalConfig::default()
2096        };
2097        let coordinator = ParallelWalCoordinator::new(&db_path, config);
2098
2099        let epoch = coordinator
2100            .submit_batch(sample_batch(11, 77))
2101            .expect("submit should succeed");
2102        assert_eq!(epoch, 0);
2103
2104        coordinator
2105            .advance_and_flush(Duration::from_millis(50))
2106            .expect("flush should succeed");
2107        assert_eq!(coordinator.durable_epoch(), Some(0));
2108
2109        let seg_path = segment_path(&db_path, 0);
2110        let (_, records) = read_segment(&seg_path).expect("segment should read back");
2111        assert_eq!(records.len(), 2);
2112        assert_eq!(records[0].txn_token.id.get(), 11);
2113        assert_eq!(records[0].begin_seq, CommitSeq::new(77));
2114        assert_eq!(records[0].page_id.get(), 7);
2115        assert_eq!(records[0].after_image, vec![0xAA; 16]);
2116        assert_eq!(records[1].page_id.get(), 9);
2117        assert_eq!(records[1].after_image, vec![0xBB; 24]);
2118    }
2119
2120    #[test]
2121    fn test_advance_and_flush_does_not_mark_epoch_durable_on_segment_write_failure() {
2122        use tempfile::tempdir;
2123
2124        let _guard = lane_test_guard();
2125        let dir = tempdir().expect("create temp dir");
2126        let db_path = dir.path().join("missing").join("write_failure.db");
2127        let config = ParallelWalConfig {
2128            slot_count: 1,
2129            ..ParallelWalConfig::default()
2130        };
2131        let coordinator = ParallelWalCoordinator::new(&db_path, config);
2132
2133        coordinator
2134            .submit_batch(sample_batch(21, 99))
2135            .expect("submit should succeed");
2136
2137        let error = coordinator
2138            .advance_and_flush(Duration::from_millis(50))
2139            .expect_err("flush should fail when the segment directory is missing");
2140        assert!(
2141            error.contains("write_segment(0) failed"),
2142            "error should preserve the failing epoch: {error}"
2143        );
2144        assert_eq!(
2145            coordinator.durable_epoch(),
2146            None,
2147            "failed segment writes must not be reported as durable"
2148        );
2149        assert!(
2150            coordinator
2151                .wait_for_epoch_durable(0, Duration::from_millis(10))
2152                .is_err(),
2153            "durability wait must keep blocking after a failed segment write"
2154        );
2155    }
2156
2157    // -------------------------------------------------------------------------
2158    // Segment File I/O Tests
2159    // -------------------------------------------------------------------------
2160
2161    #[test]
2162    fn test_segment_header_roundtrip() {
2163        let header = SegmentHeader::new(42, 100);
2164        let bytes = header.to_bytes();
2165        let parsed = SegmentHeader::from_bytes(&bytes).expect("should parse");
2166        assert_eq!(parsed.epoch, 42);
2167        assert_eq!(parsed.record_count, 100);
2168    }
2169
2170    #[test]
2171    fn test_segment_header_invalid_magic() {
2172        let mut bytes = [0u8; SEGMENT_HEADER_SIZE];
2173        bytes[0..4].copy_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
2174        let result = SegmentHeader::from_bytes(&bytes);
2175        assert!(result.is_err());
2176        assert!(result.unwrap_err().contains("invalid segment magic"));
2177    }
2178
2179    #[test]
2180    fn test_segment_header_checksum_mismatch() {
2181        let header = SegmentHeader::new(42, 100);
2182        let mut bytes = header.to_bytes();
2183        // Corrupt the epoch field
2184        bytes[8] ^= 0xFF;
2185        let result = SegmentHeader::from_bytes(&bytes);
2186        assert!(result.is_err());
2187        assert!(result.unwrap_err().contains("checksum mismatch"));
2188    }
2189
2190    #[test]
2191    fn test_segment_path_generation() {
2192        let db_path = PathBuf::from("/tmp/mydb.sqlite");
2193        let path = segment_path(&db_path, 0x1234_5678_9ABC_DEF0);
2194        assert_eq!(
2195            path.file_name().unwrap().to_str().unwrap(),
2196            "mydb.sqlite-wal-seg-123456789abcdef0"
2197        );
2198    }
2199
2200    #[test]
2201    fn test_segment_write_and_read() {
2202        use tempfile::tempdir;
2203
2204        let dir = tempdir().expect("create temp dir");
2205        let db_path = dir.path().join("test.db");
2206
2207        // Create a batch with some records
2208        let records = vec![
2209            WalRecord {
2210                txn_token: TxnToken::new(
2211                    fsqlite_types::TxnId::new(1).unwrap(),
2212                    fsqlite_types::TxnEpoch::new(0),
2213                ),
2214                epoch: 5,
2215                page_id: PageNumber::new(1).unwrap(),
2216                begin_seq: CommitSeq::new(100),
2217                end_seq: Some(CommitSeq::new(100)),
2218                before_image: vec![0u8; 32],
2219                after_image: vec![1u8; 32],
2220            },
2221            WalRecord {
2222                txn_token: TxnToken::new(
2223                    fsqlite_types::TxnId::new(2).unwrap(),
2224                    fsqlite_types::TxnEpoch::new(1),
2225                ),
2226                epoch: 5,
2227                page_id: PageNumber::new(2).unwrap(),
2228                begin_seq: CommitSeq::new(101),
2229                end_seq: None,
2230                before_image: Vec::new(),
2231                after_image: vec![2u8; 64],
2232            },
2233        ];
2234
2235        let batch = EpochFlushBatch {
2236            epoch: 5,
2237            records,
2238            records_per_core: vec![1, 1],
2239        };
2240
2241        // Write the segment
2242        let bytes_written =
2243            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2244        assert!(bytes_written > SEGMENT_HEADER_SIZE);
2245
2246        // Read it back
2247        let seg_path = segment_path(&db_path, 5);
2248        let (header, records) = read_segment(&seg_path).expect("read should succeed");
2249
2250        assert_eq!(header.epoch, 5);
2251        assert_eq!(header.record_count, 2);
2252        assert_eq!(records.len(), 2);
2253
2254        // Verify first record
2255        assert_eq!(records[0].txn_token.id.get(), 1);
2256        assert_eq!(records[0].page_id.get(), 1);
2257        assert_eq!(records[0].before_image.len(), 32);
2258        assert_eq!(records[0].after_image.len(), 32);
2259        assert_eq!(records[0].end_seq, Some(CommitSeq::new(100)));
2260
2261        // Verify second record
2262        assert_eq!(records[1].txn_token.id.get(), 2);
2263        assert_eq!(records[1].page_id.get(), 2);
2264        assert_eq!(records[1].before_image.len(), 0);
2265        assert_eq!(records[1].after_image.len(), 64);
2266        assert_eq!(records[1].end_seq, None);
2267
2268        // Cleanup
2269        delete_segment(&seg_path).expect("delete should succeed");
2270    }
2271
2272    #[test]
2273    fn test_deserialize_record_rejects_invalid_end_seq_flag() {
2274        let record = WalRecord {
2275            txn_token: TxnToken::new(
2276                fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2277                fsqlite_types::TxnEpoch::new(0),
2278            ),
2279            epoch: 5,
2280            page_id: PageNumber::new(1).expect("page should be non-zero"),
2281            begin_seq: CommitSeq::new(100),
2282            end_seq: None,
2283            before_image: Vec::new(),
2284            after_image: vec![0xAA; 8],
2285        };
2286        let mut bytes = serialize_record(&record).expect("sample record should serialize");
2287        let end_seq_flag_offset = 8 + 4 + 8 + 4 + 8;
2288        bytes[end_seq_flag_offset] = 2;
2289
2290        let error = deserialize_record(&bytes)
2291            .expect_err("invalid end_seq flag must reject corrupt record bytes");
2292        assert!(
2293            error.contains("invalid end_seq flag"),
2294            "unexpected error: {error}"
2295        );
2296    }
2297
2298    #[test]
2299    fn test_deserialize_record_rejects_trailing_bytes() {
2300        let record = WalRecord {
2301            txn_token: TxnToken::new(
2302                fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2303                fsqlite_types::TxnEpoch::new(0),
2304            ),
2305            epoch: 5,
2306            page_id: PageNumber::new(1).expect("page should be non-zero"),
2307            begin_seq: CommitSeq::new(100),
2308            end_seq: None,
2309            before_image: Vec::new(),
2310            after_image: vec![0xAA; 8],
2311        };
2312        let mut bytes = serialize_record(&record).expect("sample record should serialize");
2313        bytes.extend_from_slice(b"junk");
2314
2315        let error =
2316            deserialize_record(&bytes).expect_err("record decoder must reject trailing bytes");
2317        assert!(
2318            error.contains("trailing bytes"),
2319            "unexpected error: {error}"
2320        );
2321    }
2322
2323    #[test]
2324    fn test_read_segment_rejects_impossible_record_count_before_allocation() {
2325        use tempfile::tempdir;
2326
2327        let dir = tempdir().expect("create temp dir");
2328        let db_path = dir.path().join("impossible-count.db");
2329        let seg_path = segment_path(&db_path, 1);
2330        std::fs::write(&seg_path, SegmentHeader::new(1, u32::MAX).to_bytes())
2331            .expect("write corrupt segment header");
2332
2333        let error =
2334            read_segment(&seg_path).expect_err("impossible record count must fail before alloc");
2335        assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2336        assert!(
2337            error.to_string().contains("exceeds maximum possible"),
2338            "unexpected error: {error}"
2339        );
2340    }
2341
2342    #[test]
2343    fn test_read_segment_rejects_record_count_without_min_payload_space() {
2344        use tempfile::tempdir;
2345
2346        let dir = tempdir().expect("create temp dir");
2347        let db_path = dir.path().join("short-records.db");
2348        let seg_path = segment_path(&db_path, 1);
2349        let mut bytes = SegmentHeader::new(1, 2).to_bytes().to_vec();
2350        bytes.extend_from_slice(&0_u32.to_le_bytes());
2351        bytes.extend_from_slice(&0_u32.to_le_bytes());
2352        std::fs::write(&seg_path, bytes).expect("write corrupt segment");
2353
2354        let error = read_segment(&seg_path)
2355            .expect_err("record count must account for minimum record payload bytes");
2356        assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2357        assert!(
2358            error.to_string().contains("exceeds maximum possible"),
2359            "unexpected error: {error}"
2360        );
2361    }
2362
2363    #[test]
2364    fn test_read_segment_rejects_trailing_bytes_after_declared_records() {
2365        use tempfile::tempdir;
2366
2367        let dir = tempdir().expect("create temp dir");
2368        let db_path = dir.path().join("trailing-bytes.db");
2369        let batch = EpochFlushBatch {
2370            epoch: 1,
2371            records: vec![WalRecord {
2372                txn_token: TxnToken::new(
2373                    fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2374                    fsqlite_types::TxnEpoch::new(0),
2375                ),
2376                epoch: 1,
2377                page_id: PageNumber::new(1).expect("page should be non-zero"),
2378                begin_seq: CommitSeq::new(1),
2379                end_seq: Some(CommitSeq::new(1)),
2380                before_image: Vec::new(),
2381                after_image: vec![0xCC; 16],
2382            }],
2383            records_per_core: vec![1],
2384        };
2385        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2386
2387        let seg_path = segment_path(&db_path, 1);
2388        {
2389            use std::io::Write as _;
2390            let mut file = OpenOptions::new()
2391                .append(true)
2392                .open(&seg_path)
2393                .expect("open segment for append");
2394            file.write_all(b"junk").expect("append trailing bytes");
2395        }
2396
2397        let error =
2398            read_segment(&seg_path).expect_err("segment decoder must reject trailing bytes");
2399        assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2400        assert!(
2401            error.to_string().contains("trailing bytes"),
2402            "unexpected error: {error}"
2403        );
2404    }
2405
2406    #[test]
2407    fn test_read_segment_rejects_oversized_record_length_before_allocation() {
2408        use tempfile::tempdir;
2409
2410        let dir = tempdir().expect("create temp dir");
2411        let db_path = dir.path().join("oversized.db");
2412        let seg_path = segment_path(&db_path, 1);
2413        let mut bytes = Vec::new();
2414        bytes.extend_from_slice(&SegmentHeader::new(1, 1).to_bytes());
2415        bytes.extend_from_slice(&u32::MAX.to_le_bytes());
2416        std::fs::write(&seg_path, bytes).expect("write corrupt segment");
2417
2418        let error =
2419            read_segment(&seg_path).expect_err("oversized record length must fail before alloc");
2420        assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2421        assert!(
2422            error.to_string().contains("exceeds maximum"),
2423            "unexpected error: {error}"
2424        );
2425    }
2426
2427    #[test]
2428    fn test_segment_write_and_recovery_canonicalize_intra_epoch_order() {
2429        use tempfile::tempdir;
2430
2431        let dir = tempdir().expect("create temp dir");
2432        let db_path = dir.path().join("ordered.db");
2433        let page_id = PageNumber::new(1).unwrap();
2434
2435        let later = WalRecord {
2436            txn_token: TxnToken::new(
2437                fsqlite_types::TxnId::new(2).unwrap(),
2438                fsqlite_types::TxnEpoch::new(0),
2439            ),
2440            epoch: 7,
2441            page_id,
2442            begin_seq: CommitSeq::new(200),
2443            end_seq: Some(CommitSeq::new(200)),
2444            before_image: Vec::new(),
2445            after_image: vec![0x22; 8],
2446        };
2447        let earlier = WalRecord {
2448            txn_token: TxnToken::new(
2449                fsqlite_types::TxnId::new(1).unwrap(),
2450                fsqlite_types::TxnEpoch::new(0),
2451            ),
2452            epoch: 7,
2453            page_id,
2454            begin_seq: CommitSeq::new(100),
2455            end_seq: Some(CommitSeq::new(100)),
2456            before_image: Vec::new(),
2457            after_image: vec![0x11; 8],
2458        };
2459        let batch = EpochFlushBatch {
2460            epoch: 7,
2461            records: vec![later, earlier],
2462            records_per_core: vec![1, 1],
2463        };
2464
2465        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2466
2467        let seg_path = segment_path(&db_path, 7);
2468        let (_, records) = read_segment(&seg_path).expect("read should succeed");
2469        assert_eq!(records.len(), 2);
2470        assert_eq!(records[0].begin_seq, CommitSeq::new(100));
2471        assert_eq!(records[1].begin_seq, CommitSeq::new(200));
2472
2473        let mut page_contents = HashMap::new();
2474        recover_and_apply_segments(
2475            &db_path,
2476            &mut page_contents,
2477            SegmentRecoveryOptions::default(),
2478        )
2479        .expect("recovery should succeed");
2480        assert_eq!(
2481            page_contents.get(&page_id.get()),
2482            Some(&vec![0x22; 8]),
2483            "recovery must replay the later commit last even if the flushed batch arrived out of order"
2484        );
2485    }
2486
2487    #[test]
2488    fn test_write_segment_rejects_record_epoch_mismatch() {
2489        use tempfile::tempdir;
2490
2491        let dir = tempdir().expect("create temp dir");
2492        let db_path = dir.path().join("mismatch.db");
2493
2494        let batch = EpochFlushBatch {
2495            epoch: 5,
2496            records: vec![WalRecord {
2497                txn_token: TxnToken::new(
2498                    fsqlite_types::TxnId::new(1).unwrap(),
2499                    fsqlite_types::TxnEpoch::new(0),
2500                ),
2501                epoch: 4,
2502                page_id: PageNumber::new(1).unwrap(),
2503                begin_seq: CommitSeq::new(100),
2504                end_seq: Some(CommitSeq::new(100)),
2505                before_image: Vec::new(),
2506                after_image: vec![0xAB; 8],
2507            }],
2508            records_per_core: vec![1],
2509        };
2510
2511        let error = write_segment(&db_path, &batch, FsyncPolicy::Off)
2512            .expect_err("segment write must reject mixed-epoch records");
2513        assert!(
2514            error
2515                .to_string()
2516                .contains("segment epoch 5 contains record from epoch 4"),
2517            "unexpected error: {error}"
2518        );
2519        assert!(
2520            !segment_path(&db_path, 5).exists(),
2521            "failed validation must not create or truncate a segment file"
2522        );
2523    }
2524
2525    #[test]
2526    fn test_write_segment_rejects_oversized_page_image_before_create() {
2527        use tempfile::tempdir;
2528
2529        let dir = tempdir().expect("create temp dir");
2530        let db_path = dir.path().join("oversized-write.db");
2531        let batch = EpochFlushBatch {
2532            epoch: 5,
2533            records: vec![WalRecord {
2534                txn_token: TxnToken::new(
2535                    fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2536                    fsqlite_types::TxnEpoch::new(0),
2537                ),
2538                epoch: 5,
2539                page_id: PageNumber::new(1).expect("page should be non-zero"),
2540                begin_seq: CommitSeq::new(100),
2541                end_seq: Some(CommitSeq::new(100)),
2542                before_image: Vec::new(),
2543                after_image: vec![0xAB; MAX_SEGMENT_RECORD_IMAGE_BYTES + 1],
2544            }],
2545            records_per_core: vec![1],
2546        };
2547
2548        let error = write_segment(&db_path, &batch, FsyncPolicy::Off)
2549            .expect_err("segment write must reject oversized page images");
2550        assert_eq!(error.kind(), io::ErrorKind::InvalidInput);
2551        assert!(
2552            error.to_string().contains("after_image length"),
2553            "unexpected error: {error}"
2554        );
2555        assert!(
2556            !segment_path(&db_path, 5).exists(),
2557            "failed validation must not create or truncate a segment file"
2558        );
2559    }
2560
2561    #[test]
2562    fn test_list_segments() {
2563        use tempfile::tempdir;
2564
2565        let dir = tempdir().expect("create temp dir");
2566        let db_path = dir.path().join("test.db");
2567
2568        // Create a few empty segment files
2569        for epoch in [1u64, 5, 10, 2] {
2570            let batch = EpochFlushBatch {
2571                epoch,
2572                records: Vec::new(),
2573                records_per_core: Vec::new(),
2574            };
2575            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2576        }
2577
2578        // List segments
2579        let segments = list_segments(&db_path).expect("list should succeed");
2580        assert_eq!(segments.len(), 4);
2581
2582        // Should be sorted by epoch
2583        assert_eq!(segments[0].0, 1);
2584        assert_eq!(segments[1].0, 2);
2585        assert_eq!(segments[2].0, 5);
2586        assert_eq!(segments[3].0, 10);
2587
2588        // Cleanup
2589        for (_, path) in segments {
2590            delete_segment(&path).expect("delete should succeed");
2591        }
2592    }
2593
2594    // -------------------------------------------------------------------------
2595    // Segment Recovery Tests
2596    // -------------------------------------------------------------------------
2597
2598    #[test]
2599    fn test_recover_segments_basic() {
2600        use tempfile::tempdir;
2601
2602        let dir = tempdir().expect("create temp dir");
2603        let db_path = dir.path().join("test.db");
2604
2605        // Create segments for epochs 1, 2, 3
2606        for epoch in 1..=3u64 {
2607            let records = vec![WalRecord {
2608                txn_token: TxnToken::new(
2609                    fsqlite_types::TxnId::new(epoch).unwrap(),
2610                    fsqlite_types::TxnEpoch::new(0),
2611                ),
2612                epoch,
2613                page_id: PageNumber::new(epoch as u32).unwrap(),
2614                begin_seq: CommitSeq::new(epoch * 100),
2615                end_seq: Some(CommitSeq::new(epoch * 100)),
2616                before_image: Vec::new(),
2617                after_image: vec![epoch as u8; 32],
2618            }];
2619            let batch = EpochFlushBatch {
2620                epoch,
2621                records,
2622                records_per_core: vec![1],
2623            };
2624            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2625        }
2626
2627        // Recover segments
2628        let options = SegmentRecoveryOptions::default();
2629        let (result, records) =
2630            recover_segments(&db_path, options).expect("recovery should succeed");
2631
2632        assert_eq!(result.segments_recovered, 3);
2633        assert_eq!(result.records_applied, 3);
2634        assert_eq!(result.epochs, vec![1, 2, 3]);
2635        assert!(result.partial_segments.is_empty());
2636
2637        // Verify records are in epoch order
2638        assert_eq!(records.len(), 3);
2639        assert_eq!(records[0].epoch, 1);
2640        assert_eq!(records[1].epoch, 2);
2641        assert_eq!(records[2].epoch, 3);
2642
2643        // Cleanup
2644        cleanup_segments(&db_path).expect("cleanup should succeed");
2645    }
2646
2647    #[test]
2648    fn test_recover_segments_rejects_header_filename_epoch_mismatch() {
2649        use tempfile::tempdir;
2650
2651        let dir = tempdir().expect("create temp dir");
2652        let db_path = dir.path().join("rename.db");
2653        let batch = EpochFlushBatch {
2654            epoch: 5,
2655            records: vec![WalRecord {
2656                txn_token: TxnToken::new(
2657                    fsqlite_types::TxnId::new(1).unwrap(),
2658                    fsqlite_types::TxnEpoch::new(0),
2659                ),
2660                epoch: 5,
2661                page_id: PageNumber::new(1).unwrap(),
2662                begin_seq: CommitSeq::new(100),
2663                end_seq: Some(CommitSeq::new(100)),
2664                before_image: Vec::new(),
2665                after_image: vec![0xAA; 8],
2666            }],
2667            records_per_core: vec![1],
2668        };
2669        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2670
2671        let original = segment_path(&db_path, 5);
2672        let renamed = segment_path(&db_path, 3);
2673        std::fs::rename(&original, &renamed).expect("rename should succeed");
2674
2675        let error = recover_segments(&db_path, SegmentRecoveryOptions::default())
2676            .expect_err("recovery must fail closed on mismatched epoch metadata");
2677        assert!(
2678            error.to_string().contains("mismatched epoch"),
2679            "unexpected error: {error}"
2680        );
2681
2682        let (result, records) = recover_segments(
2683            &db_path,
2684            SegmentRecoveryOptions {
2685                skip_corrupt: true,
2686                ..Default::default()
2687            },
2688        )
2689        .expect("skip_corrupt should ignore the bad segment");
2690        assert_eq!(result.segments_recovered, 0);
2691        assert_eq!(result.partial_segments, vec![renamed]);
2692        assert!(records.is_empty());
2693    }
2694
2695    #[test]
2696    fn test_recover_and_apply_segments_skip_corrupt_stops_at_first_bad_epoch() {
2697        use tempfile::tempdir;
2698
2699        let dir = tempdir().expect("create temp dir");
2700        let db_path = dir.path().join("prefix.db");
2701
2702        for epoch in 1..=3u64 {
2703            let batch = EpochFlushBatch {
2704                epoch,
2705                records: vec![WalRecord {
2706                    txn_token: TxnToken::new(
2707                        fsqlite_types::TxnId::new(epoch).unwrap(),
2708                        fsqlite_types::TxnEpoch::new(0),
2709                    ),
2710                    epoch,
2711                    page_id: PageNumber::new(1).unwrap(),
2712                    begin_seq: CommitSeq::new(epoch * 100),
2713                    end_seq: Some(CommitSeq::new(epoch * 100)),
2714                    before_image: Vec::new(),
2715                    after_image: vec![epoch as u8; 16],
2716                }],
2717                records_per_core: vec![1],
2718            };
2719            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2720        }
2721
2722        let corrupt_epoch_path = segment_path(&db_path, 2);
2723        std::fs::write(&corrupt_epoch_path, [0xFF_u8; 8]).expect("corrupt write should succeed");
2724
2725        let mut page_contents = HashMap::new();
2726        let result = recover_and_apply_segments(
2727            &db_path,
2728            &mut page_contents,
2729            SegmentRecoveryOptions {
2730                skip_corrupt: true,
2731                ..Default::default()
2732            },
2733        )
2734        .expect("skip_corrupt should return the durable prefix");
2735
2736        assert_eq!(result.segments_recovered, 1);
2737        assert_eq!(result.records_applied, 1);
2738        assert_eq!(result.epochs, vec![1]);
2739        assert_eq!(
2740            result.partial_segments,
2741            vec![segment_path(&db_path, 2), segment_path(&db_path, 3)]
2742        );
2743
2744        let page = page_contents
2745            .get(&1)
2746            .expect("prefix recovery should apply the last durable epoch only");
2747        assert!(
2748            page.iter().all(|&byte| byte == 1),
2749            "recovery must stop before epoch 3 once epoch 2 is corrupt"
2750        );
2751    }
2752
2753    #[test]
2754    fn test_recover_and_apply_segments() {
2755        use tempfile::tempdir;
2756
2757        let dir = tempdir().expect("create temp dir");
2758        let db_path = dir.path().join("test.db");
2759
2760        // Create segments that update the same page multiple times
2761        let page_id = 1u32;
2762        for epoch in 1..=3u64 {
2763            let records = vec![WalRecord {
2764                txn_token: TxnToken::new(
2765                    fsqlite_types::TxnId::new(epoch).unwrap(),
2766                    fsqlite_types::TxnEpoch::new(0),
2767                ),
2768                epoch,
2769                page_id: PageNumber::new(page_id).unwrap(),
2770                begin_seq: CommitSeq::new(epoch * 100),
2771                end_seq: Some(CommitSeq::new(epoch * 100)),
2772                before_image: Vec::new(),
2773                after_image: vec![epoch as u8; 32], // Different content each epoch
2774            }];
2775            let batch = EpochFlushBatch {
2776                epoch,
2777                records,
2778                records_per_core: vec![1],
2779            };
2780            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2781        }
2782
2783        // Recover and apply to page cache
2784        let mut page_contents = HashMap::new();
2785        let options = SegmentRecoveryOptions {
2786            delete_after_recovery: true,
2787            ..Default::default()
2788        };
2789        let result = recover_and_apply_segments(&db_path, &mut page_contents, options)
2790            .expect("should succeed");
2791
2792        assert_eq!(result.segments_recovered, 3);
2793
2794        // Page should have the final epoch's contents (epoch 3)
2795        let page = page_contents.get(&page_id).expect("page should exist");
2796        assert_eq!(page.len(), 32);
2797        assert!(page.iter().all(|&b| b == 3), "should have epoch 3 content");
2798
2799        // Segments should be deleted
2800        let remaining = list_segments(&db_path).expect("list should succeed");
2801        assert!(
2802            remaining.is_empty(),
2803            "segments should be deleted after recovery"
2804        );
2805    }
2806
2807    #[test]
2808    fn test_max_durable_epoch() {
2809        use tempfile::tempdir;
2810
2811        let dir = tempdir().expect("create temp dir");
2812        let db_path = dir.path().join("test.db");
2813
2814        // Initially no segments
2815        let max = max_durable_epoch(&db_path).expect("should succeed");
2816        assert_eq!(max, None);
2817
2818        // Create segments
2819        for epoch in [5u64, 10, 3] {
2820            let batch = EpochFlushBatch {
2821                epoch,
2822                records: Vec::new(),
2823                records_per_core: Vec::new(),
2824            };
2825            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2826        }
2827
2828        // Max should be 10
2829        let max = max_durable_epoch(&db_path).expect("should succeed");
2830        assert_eq!(max, Some(10));
2831
2832        // Cleanup
2833        cleanup_segments(&db_path).expect("cleanup should succeed");
2834
2835        // Now max should be None again
2836        let max = max_durable_epoch(&db_path).expect("should succeed");
2837        assert_eq!(max, None);
2838    }
2839
2840    #[test]
2841    fn test_cleanup_segments() {
2842        use tempfile::tempdir;
2843
2844        let dir = tempdir().expect("create temp dir");
2845        let db_path = dir.path().join("test.db");
2846
2847        // Create segments
2848        for epoch in 1..=5u64 {
2849            let batch = EpochFlushBatch {
2850                epoch,
2851                records: Vec::new(),
2852                records_per_core: Vec::new(),
2853            };
2854            write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2855        }
2856
2857        // Verify segments exist
2858        let segments = list_segments(&db_path).expect("list should succeed");
2859        assert_eq!(segments.len(), 5);
2860
2861        // Cleanup
2862        let count = cleanup_segments(&db_path).expect("cleanup should succeed");
2863        assert_eq!(count, 5);
2864
2865        // Verify segments are gone
2866        let segments = list_segments(&db_path).expect("list should succeed");
2867        assert!(segments.is_empty());
2868    }
2869
2870    #[test]
2871    fn parallel_wal_config_default_values() {
2872        let cfg = ParallelWalConfig::default();
2873        assert_eq!(cfg.slot_count, DEFAULT_BUFFER_SLOT_COUNT);
2874        assert_eq!(cfg.epoch_interval_ms, 10);
2875        assert_eq!(cfg.buffer_capacity_bytes, 4 * 1024 * 1024);
2876        let copied = cfg;
2877        assert_eq!(copied.epoch_interval_ms, cfg.epoch_interval_ms);
2878        let dbg = format!("{cfg:?}");
2879        assert!(dbg.contains("ParallelWalConfig"));
2880    }
2881
2882    #[test]
2883    fn parallel_wal_fallback_reason_all_variants_debug_copy_eq() {
2884        let variants = [
2885            ParallelWalFallbackReason::OperatorForced,
2886            ParallelWalFallbackReason::LaneOverflow,
2887            ParallelWalFallbackReason::CertificateGap,
2888            ParallelWalFallbackReason::CertificateChecksumMismatch,
2889            ParallelWalFallbackReason::PublicationMismatch,
2890            ParallelWalFallbackReason::RecoveryGap,
2891            ParallelWalFallbackReason::CheckpointConflict,
2892            ParallelWalFallbackReason::ControllerEvidenceLost,
2893        ];
2894        for (i, v) in variants.iter().enumerate() {
2895            let copied = *v;
2896            assert_eq!(copied, *v);
2897            for (j, w) in variants.iter().enumerate() {
2898                assert_eq!(i == j, v == w);
2899            }
2900        }
2901        let dbg = format!("{:?}", ParallelWalFallbackReason::CertificateGap);
2902        assert!(dbg.contains("CertificateGap"));
2903    }
2904
2905    #[test]
2906    fn parallel_wal_control_surface_default_and_eq() {
2907        let def = ParallelWalControlSurface::default();
2908        assert_eq!(def.mode, ParallelWalOperatingMode::Auto);
2909        assert!(def.lane_count_override.is_none());
2910        assert!(def.helper_lane_budget.is_none());
2911        assert!(def.max_parallel_commit_bytes.is_none());
2912        assert!(def.max_flush_delay_ms.is_none());
2913        assert!(def.shadow_compare_sampling_per_mille.is_none());
2914        let other = ParallelWalControlSurface {
2915            mode: ParallelWalOperatingMode::Conservative,
2916            ..ParallelWalControlSurface::default()
2917        };
2918        assert_ne!(def, other);
2919        let dbg = format!("{def:?}");
2920        assert!(dbg.contains("ParallelWalControlSurface"));
2921    }
2922
2923    #[test]
2924    fn parallel_wal_ordered_residue_and_shadow_verdict_defaults() {
2925        let residue = ParallelWalOrderedResidue::default();
2926        assert_eq!(
2927            residue,
2928            ParallelWalOrderedResidue::CommitCertificateThenPublish
2929        );
2930        let copied = residue;
2931        assert_eq!(copied, residue);
2932
2933        let verdict = ParallelWalShadowVerdict::default();
2934        assert_eq!(verdict, ParallelWalShadowVerdict::NotRun);
2935        assert_ne!(verdict, ParallelWalShadowVerdict::Clean);
2936        assert_ne!(
2937            ParallelWalShadowVerdict::Clean,
2938            ParallelWalShadowVerdict::Diverged
2939        );
2940        let dbg = format!("{verdict:?}");
2941        assert!(dbg.contains("NotRun"));
2942    }
2943
2944    #[test]
2945    fn decision_action_all_variants_copy_eq() {
2946        let variants = [
2947            ParallelWalDecisionAction::KeepCurrent,
2948            ParallelWalDecisionAction::SealEpochNow,
2949            ParallelWalDecisionAction::IncreaseLaneBudget,
2950            ParallelWalDecisionAction::DecreaseLaneBudget,
2951            ParallelWalDecisionAction::ForceConservative,
2952        ];
2953        for (i, a) in variants.iter().enumerate() {
2954            let copied = *a;
2955            assert_eq!(copied, *a);
2956            for (j, b) in variants.iter().enumerate() {
2957                assert_eq!(i == j, a == b);
2958            }
2959        }
2960    }
2961
2962    #[test]
2963    fn fsync_policy_all_variants_and_default() {
2964        assert_eq!(FsyncPolicy::default(), FsyncPolicy::Full);
2965        let variants = [FsyncPolicy::Full, FsyncPolicy::Normal, FsyncPolicy::Off];
2966        for (i, a) in variants.iter().enumerate() {
2967            let copied = *a;
2968            assert_eq!(copied, *a);
2969            for (j, b) in variants.iter().enumerate() {
2970                assert_eq!(i == j, a == b);
2971            }
2972        }
2973        let dbg = format!("{:?}", FsyncPolicy::Off);
2974        assert!(dbg.contains("Off"));
2975    }
2976
2977    #[test]
2978    fn trace_record_clone_eq_debug() {
2979        let tr = ParallelWalTraceRecord {
2980            component: "test".into(),
2981            trace_id: 1,
2982            decision_id: None,
2983            mode: ParallelWalOperatingMode::Auto,
2984            lane_id: Some(0),
2985            epoch: Some(5),
2986            commit_seq_lo: None,
2987            commit_seq_hi: None,
2988            checkpoint_epoch: None,
2989            recovery_epoch: None,
2990            fallback_active: false,
2991            fallback_reason: None,
2992            policy_id: None,
2993            policy_version: None,
2994        };
2995        let cloned = tr.clone();
2996        assert_eq!(cloned, tr);
2997        let dbg = format!("{tr:?}");
2998        assert!(dbg.contains("ParallelWalTraceRecord"));
2999    }
3000
3001    #[test]
3002    fn commit_certificate_clone_eq_debug() {
3003        let cert = ParallelWalCommitCertificate {
3004            format_version: 1,
3005            residue: ParallelWalOrderedResidue::default(),
3006            certificate_epoch: 10,
3007            commit_seq_lo: CommitSeq::new(1),
3008            commit_seq_hi: CommitSeq::new(5),
3009            durable_segment_epoch: 9,
3010            lane_count: 4,
3011            lane_record_counts: vec![2, 3, 0, 1],
3012            db_size_pages: 100,
3013            page_set_size: 6,
3014            certificate_crc32c: 0,
3015            fallback_active: false,
3016        };
3017        let cloned = cert.clone();
3018        assert_eq!(cloned, cert);
3019        assert_eq!(cert.lane_count, 4);
3020        assert_eq!(cert.lane_record_counts.len(), 4);
3021        let dbg = format!("{cert:?}");
3022        assert!(dbg.contains("ParallelWalCommitCertificate"));
3023    }
3024}