Skip to main content

fsqlite_wal/
per_core_buffer.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex, TryLockError};
4use std::time::{Duration, Instant};
5
6#[cfg(test)]
7use asupersync::runtime::{Runtime, RuntimeBuilder, spawn_blocking};
8#[cfg(test)]
9use asupersync::time::{sleep, wall_now};
10use fsqlite_types::{CommitSeq, PageNumber, TxnEpoch, TxnId, TxnToken};
11
/// Default per-lane WAL buffer capacity in bytes.
///
/// Each `PerCoreWalBuffer` has two lanes (active and flushing) that are each
/// bounded by this budget, as measured by `WalRecord::encoded_len`, so a pool
/// of N cores can stage roughly `N × 2 × 4 MiB` of record data before records
/// spill into the overflow queue. Override it through
/// `BufferConfig::capacity_bytes`: smaller values suit memory-constrained
/// systems, larger values reduce flush frequency.
const DEFAULT_BUFFER_CAPACITY_BYTES: usize = 4 * 1024 * 1024;
/// Default overflow size, in bytes, beyond which a buffer latches the
/// serialized-drain fallback. The overflow queue itself is not capped at this
/// size; crossing it only triggers the fallback.
const DEFAULT_OVERFLOW_FALLBACK_BYTES: usize = 8 * 1024 * 1024;
/// Fixed metadata charge added to the before/after image sizes when a record
/// is accounted against a lane's capacity.
const RECORD_FIXED_OVERHEAD_BYTES: usize = 48;
/// Default interval, in milliseconds, between epoch advances that flush
/// batched WAL entries.
const DEFAULT_EPOCH_ADVANCE_INTERVAL_MS: u64 = 10;

/// Default number of buffer slots.
///
/// Slots are handed out round-robin from a global counter (see
/// `thread_buffer_slot`), so with 128 slots the first 128 threads to request a
/// slot each get a distinct one; sharing only starts with the 129th thread.
/// Systems with fewer cores can use a smaller count to reduce memory overhead.
pub const DEFAULT_BUFFER_SLOT_COUNT: usize = 128;
29
/// Global source of per-thread slot numbers. Each thread takes the next value
/// exactly once, on its first call to `thread_buffer_slot`.
static NEXT_BUFFER_SLOT: AtomicUsize = AtomicUsize::new(0);

std::thread_local! {
    /// Raw slot number this thread claimed from `NEXT_BUFFER_SLOT`. It is stored
    /// unreduced; `thread_buffer_slot` applies the modulo on every read.
    static THREAD_BUFFER_SLOT: usize = NEXT_BUFFER_SLOT.fetch_add(1, Ordering::Relaxed);
}

/// Maps the calling thread onto one of `slot_count` buffer slots.
///
/// The raw slot number is fixed for the lifetime of the thread, so the result
/// is stable for a given `slot_count`. Raw numbers are handed out sequentially,
/// so threads get distinct slots until more than `slot_count` of them have
/// asked, after which slots wrap around.
///
/// # Panics
///
/// Panics if `slot_count` is zero (remainder by zero).
#[inline]
pub fn thread_buffer_slot(slot_count: usize) -> usize {
    let raw_slot = THREAD_BUFFER_SLOT.with(|claimed| *claimed);
    raw_slot % slot_count
}
49
/// Test-only: rewinds the global slot counter to zero so threads that have not
/// yet claimed a slot are numbered from 0 again. Threads that already claimed
/// a slot keep it, because their value is cached in a thread-local.
#[cfg(test)]
pub fn reset_slot_counter() {
    let _previous = NEXT_BUFFER_SLOT.swap(0, Ordering::SeqCst);
}
55
/// What a per-core WAL buffer does when a record does not fit in its active lane.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OverflowPolicy {
    /// Reject the record; the append reports `AppendOutcome::Blocked`.
    BlockWriter,
    /// Queue the record in a side overflow list that is moved back into the
    /// active lane after a flush completes.
    AllocateOverflow,
}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub struct BufferConfig {
64    pub capacity_bytes: usize,
65    pub overflow_policy: OverflowPolicy,
66    pub overflow_fallback_bytes: usize,
67}
68
69impl Default for BufferConfig {
70    fn default() -> Self {
71        Self {
72            capacity_bytes: DEFAULT_BUFFER_CAPACITY_BYTES,
73            overflow_policy: OverflowPolicy::AllocateOverflow,
74            overflow_fallback_bytes: DEFAULT_OVERFLOW_FALLBACK_BYTES,
75        }
76    }
77}
78
/// Lifecycle state of one lane of a per-core WAL buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BufferState {
    /// Accepting appends (for the flush lane: idle).
    Writable,
    /// Closed to appends and tagged with its epoch; waiting for `begin_flush`.
    Sealed { epoch: u64 },
    /// Holding the batch of `epoch` that is being flushed.
    Flushing { epoch: u64 },
}
85
/// Result of appending to a per-core WAL buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppendOutcome {
    /// The record(s) went into the active lane.
    Appended,
    /// At least one record was queued in the overflow list.
    QueuedOverflow,
    /// Nothing was appended; the caller must retry or fall back.
    Blocked,
}
92
/// Whether a per-core WAL buffer can keep operating in parallel or needs a
/// serialized drain.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FallbackDecision {
    /// No fallback condition is latched.
    ContinueParallel,
    /// The buffer latched a fallback condition and should be drained serially.
    ForceSerializedDrain,
}
98
99#[derive(Debug, Clone)]
100pub struct WalRecord {
101    pub txn_token: TxnToken,
102    pub epoch: u64,
103    pub page_id: PageNumber,
104    pub begin_seq: CommitSeq,
105    pub end_seq: Option<CommitSeq>,
106    pub before_image: Vec<u8>,
107    pub after_image: Vec<u8>,
108}
109
110impl WalRecord {
111    fn encoded_len(&self) -> usize {
112        let metadata_guard = self.txn_token.id.get()
113            ^ u64::from(self.txn_token.epoch.get())
114            ^ u64::from(self.page_id.get())
115            ^ self.epoch
116            ^ self.begin_seq.get()
117            ^ self.end_seq.map_or(0, CommitSeq::get);
118
119        let metadata_bytes = if metadata_guard == u64::MAX {
120            RECORD_FIXED_OVERHEAD_BYTES + 1
121        } else {
122            RECORD_FIXED_OVERHEAD_BYTES
123        };
124
125        metadata_bytes
126            .saturating_add(self.before_image.len())
127            .saturating_add(self.after_image.len())
128    }
129}
130
131#[derive(Debug, Clone)]
132struct BufferLane {
133    state: BufferState,
134    bytes_used: usize,
135    records: Vec<WalRecord>,
136}
137
138impl BufferLane {
139    fn new_writable() -> Self {
140        Self {
141            state: BufferState::Writable,
142            bytes_used: 0,
143            records: Vec::new(),
144        }
145    }
146}
147
148#[derive(Debug)]
149pub struct PerCoreWalBuffer {
150    config: BufferConfig,
151    active: BufferLane,
152    flush_lane: BufferLane,
153    overflow: VecDeque<WalRecord>,
154    overflow_bytes: usize,
155    fallback_latched: bool,
156}
157
158impl PerCoreWalBuffer {
159    pub fn new(_core_id: usize, config: BufferConfig) -> Self {
160        Self {
161            config,
162            active: BufferLane::new_writable(),
163            flush_lane: BufferLane::new_writable(),
164            overflow: VecDeque::new(),
165            overflow_bytes: 0,
166            fallback_latched: false,
167        }
168    }
169
170    pub fn append(&mut self, record: WalRecord) -> AppendOutcome {
171        if self.active.state != BufferState::Writable {
172            return AppendOutcome::Blocked;
173        }
174
175        let needed = record.encoded_len();
176        if needed > self.config.capacity_bytes {
177            self.fallback_latched = true;
178            return AppendOutcome::Blocked;
179        }
180
181        if self.active.bytes_used.saturating_add(needed) <= self.config.capacity_bytes {
182            self.active.bytes_used = self.active.bytes_used.saturating_add(needed);
183            self.active.records.push(record);
184            return AppendOutcome::Appended;
185        }
186
187        match self.config.overflow_policy {
188            OverflowPolicy::BlockWriter => AppendOutcome::Blocked,
189            OverflowPolicy::AllocateOverflow => {
190                self.overflow_bytes = self.overflow_bytes.saturating_add(needed);
191                self.overflow.push_back(record);
192                if self.overflow_bytes > self.config.overflow_fallback_bytes {
193                    self.fallback_latched = true;
194                }
195                AppendOutcome::QueuedOverflow
196            }
197        }
198    }
199
200    pub fn append_batch(&mut self, records: Vec<WalRecord>) -> AppendOutcome {
201        if self.active.state != BufferState::Writable {
202            return AppendOutcome::Blocked;
203        }
204
205        let mut total_needed = 0usize;
206        for record in &records {
207            let needed = record.encoded_len();
208            if needed > self.config.capacity_bytes {
209                self.fallback_latched = true;
210                return AppendOutcome::Blocked;
211            }
212            total_needed = total_needed.saturating_add(needed);
213        }
214
215        if self.config.overflow_policy == OverflowPolicy::BlockWriter
216            && self.active.bytes_used.saturating_add(total_needed) > self.config.capacity_bytes
217        {
218            return AppendOutcome::Blocked;
219        }
220
221        let mut queued_overflow = false;
222        for record in records {
223            match self.append(record) {
224                AppendOutcome::Appended => {}
225                AppendOutcome::QueuedOverflow => queued_overflow = true,
226                AppendOutcome::Blocked => return AppendOutcome::Blocked,
227            }
228        }
229
230        if queued_overflow {
231            AppendOutcome::QueuedOverflow
232        } else {
233            AppendOutcome::Appended
234        }
235    }
236
237    pub fn seal_active(&mut self, epoch: u64) -> Result<(), &'static str> {
238        if self.active.state != BufferState::Writable {
239            return Err("active lane is not writable");
240        }
241        self.active.state = BufferState::Sealed { epoch };
242        Ok(())
243    }
244
245    pub fn begin_flush(&mut self) -> Result<usize, &'static str> {
246        let BufferState::Sealed { epoch } = self.active.state else {
247            return Err("active lane must be sealed before flush");
248        };
249
250        if self.flush_lane.state != BufferState::Writable {
251            return Err("flush lane must be writable before flush");
252        }
253        if !self.flush_lane.records.is_empty() || self.flush_lane.bytes_used != 0 {
254            return Err("flush lane must be empty before flush");
255        }
256
257        std::mem::swap(&mut self.active, &mut self.flush_lane);
258        self.flush_lane.state = BufferState::Flushing { epoch };
259        Ok(self.flush_lane.records.len())
260    }
261
262    pub fn complete_flush(&mut self) -> Result<(), &'static str> {
263        if !matches!(self.flush_lane.state, BufferState::Flushing { .. }) {
264            return Err("flush lane is not in flushing state");
265        }
266
267        self.flush_lane.records.clear();
268        self.flush_lane.bytes_used = 0;
269        self.flush_lane.state = BufferState::Writable;
270        self.drain_overflow_into_active();
271        Ok(())
272    }
273
274    pub fn fallback_decision(&self) -> FallbackDecision {
275        if self.fallback_latched {
276            FallbackDecision::ForceSerializedDrain
277        } else {
278            FallbackDecision::ContinueParallel
279        }
280    }
281
282    pub fn force_serialized_drain(&mut self) -> Vec<WalRecord> {
283        let mut drained = Vec::with_capacity(
284            self.active
285                .records
286                .len()
287                .saturating_add(self.flush_lane.records.len())
288                .saturating_add(self.overflow.len()),
289        );
290        drained.append(&mut self.active.records);
291        drained.append(&mut self.flush_lane.records);
292        drained.extend(self.overflow.drain(..));
293
294        self.active.bytes_used = 0;
295        self.active.state = BufferState::Writable;
296
297        self.flush_lane.bytes_used = 0;
298        self.flush_lane.state = BufferState::Writable;
299
300        self.overflow_bytes = 0;
301        self.fallback_latched = false;
302        drained
303    }
304
305    pub fn active_state(&self) -> BufferState {
306        self.active.state
307    }
308
309    pub fn flush_state(&self) -> BufferState {
310        self.flush_lane.state
311    }
312
313    pub fn active_len(&self) -> usize {
314        self.active.records.len()
315    }
316
317    pub fn flush_len(&self) -> usize {
318        self.flush_lane.records.len()
319    }
320
321    pub fn overflow_len(&self) -> usize {
322        self.overflow.len()
323    }
324
325    fn drain_overflow_into_active(&mut self) {
326        if self.active.state != BufferState::Writable {
327            return;
328        }
329
330        while let Some(front) = self.overflow.front() {
331            let needed = front.encoded_len();
332            if self.active.bytes_used.saturating_add(needed) > self.config.capacity_bytes {
333                break;
334            }
335
336            let Some(record) = self.overflow.pop_front() else {
337                break;
338            };
339            self.overflow_bytes = self.overflow_bytes.saturating_sub(needed);
340            self.active.bytes_used = self.active.bytes_used.saturating_add(needed);
341            self.active.records.push(record);
342        }
343
344        if self.overflow.is_empty() {
345            self.fallback_latched = false;
346        }
347    }
348}
349
350#[derive(Debug)]
351struct BufferCell {
352    inner: Mutex<PerCoreWalBuffer>,
353    contention_events: AtomicU64,
354}
355
356impl BufferCell {
357    fn new(core_id: usize, config: BufferConfig) -> Self {
358        Self {
359            inner: Mutex::new(PerCoreWalBuffer::new(core_id, config)),
360            contention_events: AtomicU64::new(0),
361        }
362    }
363
364    fn append(&self, record: WalRecord) -> AppendOutcome {
365        match self.inner.try_lock() {
366            Ok(mut guard) => guard.append(record),
367            Err(TryLockError::WouldBlock) => {
368                self.contention_events.fetch_add(1, Ordering::Relaxed);
369                let mut guard = self
370                    .inner
371                    .lock()
372                    .unwrap_or_else(std::sync::PoisonError::into_inner);
373                guard.append(record)
374            }
375            Err(TryLockError::Poisoned(poisoned)) => {
376                let mut guard = poisoned.into_inner();
377                guard.append(record)
378            }
379        }
380    }
381
382    fn append_batch(&self, records: Vec<WalRecord>) -> AppendOutcome {
383        match self.inner.try_lock() {
384            Ok(mut guard) => guard.append_batch(records),
385            Err(TryLockError::WouldBlock) => {
386                self.contention_events.fetch_add(1, Ordering::Relaxed);
387                let mut guard = self
388                    .inner
389                    .lock()
390                    .unwrap_or_else(std::sync::PoisonError::into_inner);
391                guard.append_batch(records)
392            }
393            Err(TryLockError::Poisoned(poisoned)) => {
394                let mut guard = poisoned.into_inner();
395                guard.append_batch(records)
396            }
397        }
398    }
399
400    fn contention_events(&self) -> u64 {
401        self.contention_events.load(Ordering::Relaxed)
402    }
403}
404
405#[derive(Debug)]
406pub struct PerCoreWalBufferPool {
407    cells: Vec<BufferCell>,
408}
409
410impl PerCoreWalBufferPool {
411    pub fn new(core_count: usize, config: BufferConfig) -> Self {
412        assert!(core_count > 0, "core_count must be > 0");
413        let mut cells = Vec::with_capacity(core_count);
414        for core_id in 0..core_count {
415            cells.push(BufferCell::new(core_id, config));
416        }
417        Self { cells }
418    }
419
420    pub fn append_to_core(
421        &self,
422        core_id: usize,
423        record: WalRecord,
424    ) -> Result<AppendOutcome, String> {
425        let Some(cell) = self.cells.get(core_id) else {
426            return Err(format!(
427                "invalid core_id={core_id}; available cores={}",
428                self.cells.len()
429            ));
430        };
431        Ok(cell.append(record))
432    }
433
434    pub fn append_batch_to_core(
435        &self,
436        core_id: usize,
437        records: Vec<WalRecord>,
438    ) -> Result<AppendOutcome, String> {
439        let Some(cell) = self.cells.get(core_id) else {
440            return Err(format!(
441                "invalid core_id={core_id}; available cores={}",
442                self.cells.len()
443            ));
444        };
445        Ok(cell.append_batch(records))
446    }
447
448    pub fn contention_events_total(&self) -> u64 {
449        self.cells
450            .iter()
451            .map(BufferCell::contention_events)
452            .sum::<u64>()
453    }
454
455    pub fn core_count(&self) -> usize {
456        self.cells.len()
457    }
458
459    pub fn seal_active_for_epoch(&self, epoch: u64) -> Result<usize, String> {
460        let mut sealed_lanes = 0_usize;
461
462        for (core_id, cell) in self.cells.iter().enumerate() {
463            let did_seal = {
464                let mut guard = cell
465                    .inner
466                    .lock()
467                    .unwrap_or_else(std::sync::PoisonError::into_inner);
468                if guard.active_state() == BufferState::Writable && guard.active_len() > 0 {
469                    guard.seal_active(epoch).map_err(|error| {
470                        format!(
471                            "core {core_id}: failed to seal active lane for epoch {epoch}: {error}"
472                        )
473                    })?;
474                    true
475                } else {
476                    false
477                }
478            };
479
480            if did_seal {
481                sealed_lanes = sealed_lanes.saturating_add(1);
482            }
483        }
484
485        Ok(sealed_lanes)
486    }
487
488    pub fn flush_epoch_batches(&self, epoch: u64) -> Result<(Vec<WalRecord>, Vec<usize>), String> {
489        let mut all_records = Vec::new();
490        let mut records_per_core = Vec::with_capacity(self.cells.len());
491
492        for (core_id, cell) in self.cells.iter().enumerate() {
493            let core_records = {
494                let mut guard = cell
495                    .inner
496                    .lock()
497                    .unwrap_or_else(std::sync::PoisonError::into_inner);
498
499                if matches!(guard.active_state(), BufferState::Sealed { epoch: sealed_epoch } if sealed_epoch == epoch)
500                {
501                    guard.begin_flush().map_err(|error| {
502                        format!("core {core_id}: begin_flush failed for epoch {epoch}: {error}")
503                    })?;
504                }
505
506                let should_drain = matches!(
507                    guard.flush_state(),
508                    BufferState::Flushing {
509                        epoch: flushing_epoch
510                    } if flushing_epoch == epoch
511                );
512
513                let core_records = if should_drain {
514                    // Records without an end_seq represent aborted writes and must never
515                    // participate in durable replay ordering.
516                    guard
517                        .flush_lane
518                        .records
519                        .retain(|record| record.end_seq.is_some());
520
521                    if guard
522                        .flush_lane
523                        .records
524                        .iter()
525                        .any(|record| record.epoch != epoch)
526                    {
527                        return Err(format!(
528                            "core {core_id}: epoch boundary straddle detected in flush lane for epoch {epoch}"
529                        ));
530                    }
531
532                    let core_records = std::mem::take(&mut guard.flush_lane.records);
533                    guard.flush_lane.bytes_used = 0;
534                    guard.flush_lane.state = BufferState::Writable;
535                    guard.drain_overflow_into_active();
536                    Some(core_records)
537                } else {
538                    None
539                };
540
541                drop(guard);
542                core_records
543            };
544
545            if let Some(core_records) = core_records {
546                records_per_core.push(core_records.len());
547                all_records.extend(core_records);
548            } else {
549                records_per_core.push(0);
550            }
551        }
552
553        Ok((all_records, records_per_core))
554    }
555}
556
557#[derive(Debug, Clone, Copy, PartialEq, Eq)]
558pub struct EpochConfig {
559    pub advance_interval_ms: u64,
560}
561
562impl Default for EpochConfig {
563    fn default() -> Self {
564        Self {
565            advance_interval_ms: DEFAULT_EPOCH_ADVANCE_INTERVAL_MS,
566        }
567    }
568}
569
570#[derive(Debug, Clone)]
571pub struct EpochFlushBatch {
572    pub epoch: u64,
573    pub records: Vec<WalRecord>,
574    pub records_per_core: Vec<usize>,
575}
576
577impl EpochFlushBatch {
578    pub fn total_records(&self) -> usize {
579        self.records.len()
580    }
581}
582
583#[derive(Debug)]
584struct EpochWaitState {
585    observed_epochs: Vec<u64>,
586    durable_epoch: Option<u64>,
587}
588
589#[derive(Debug)]
590pub struct EpochOrderCoordinator {
591    pool: Arc<PerCoreWalBufferPool>,
592    current_epoch: AtomicU64,
593    append_epoch: AtomicU64,
594    advance_lock: Mutex<()>,
595    wait_state: Mutex<EpochWaitState>,
596    wait_cv: Condvar,
597    config: EpochConfig,
598}
599
600impl EpochOrderCoordinator {
601    pub fn new(core_count: usize, buffer_config: BufferConfig, config: EpochConfig) -> Self {
602        let pool = Arc::new(PerCoreWalBufferPool::new(core_count, buffer_config));
603        let wait_state = EpochWaitState {
604            observed_epochs: vec![0; core_count],
605            durable_epoch: None,
606        };
607        Self {
608            pool,
609            current_epoch: AtomicU64::new(0),
610            append_epoch: AtomicU64::new(0),
611            advance_lock: Mutex::new(()),
612            wait_state: Mutex::new(wait_state),
613            wait_cv: Condvar::new(),
614            config,
615        }
616    }
617
618    pub fn current_epoch(&self) -> u64 {
619        self.current_epoch.load(Ordering::SeqCst)
620    }
621
622    pub fn current_append_epoch(&self) -> u64 {
623        self.append_epoch.load(Ordering::SeqCst)
624    }
625
626    pub fn durable_epoch(&self) -> Option<u64> {
627        let guard = self
628            .wait_state
629            .lock()
630            .unwrap_or_else(std::sync::PoisonError::into_inner);
631        guard.durable_epoch
632    }
633
634    pub fn epoch_advance_interval(&self) -> Duration {
635        Duration::from_millis(self.config.advance_interval_ms)
636    }
637
638    pub fn observe_epoch(&self, core_id: usize) -> Result<u64, String> {
639        let observed_epoch = self.current_epoch();
640        let mut guard = self
641            .wait_state
642            .lock()
643            .unwrap_or_else(std::sync::PoisonError::into_inner);
644        let Some(slot) = guard.observed_epochs.get_mut(core_id) else {
645            return Err(format!(
646                "invalid core_id={core_id}; available cores={}",
647                guard.observed_epochs.len()
648            ));
649        };
650        *slot = (*slot).max(observed_epoch);
651        drop(guard);
652        self.wait_cv.notify_all();
653        Ok(observed_epoch)
654    }
655
656    pub fn append_to_core(
657        &self,
658        core_id: usize,
659        begin_seq: u64,
660        payload_len: usize,
661    ) -> Result<AppendOutcome, String> {
662        let mut record = make_record(core_id, begin_seq, payload_len);
663        record.epoch = self.current_append_epoch();
664        self.pool.append_to_core(core_id, record)
665    }
666
667    pub fn append_records_to_core(
668        &self,
669        core_id: usize,
670        records: Vec<WalRecord>,
671    ) -> Result<AppendOutcome, String> {
672        self.pool.append_batch_to_core(core_id, records)
673    }
674
675    pub fn advance_epoch_and_wait(
676        &self,
677        active_cores: &[usize],
678        timeout: Duration,
679    ) -> Result<u64, String> {
680        let _advance_guard = self
681            .advance_lock
682            .lock()
683            .unwrap_or_else(std::sync::PoisonError::into_inner);
684        let core_count = self.pool.core_count();
685        for &core_id in active_cores {
686            if core_id >= core_count {
687                return Err(format!(
688                    "invalid active core_id={core_id}; available cores={core_count}"
689                ));
690            }
691        }
692
693        let previous_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
694        let next_epoch = previous_epoch.saturating_add(1);
695        let deadline = Instant::now() + timeout;
696        let mut guard = self
697            .wait_state
698            .lock()
699            .unwrap_or_else(std::sync::PoisonError::into_inner);
700
701        while active_cores
702            .iter()
703            .any(|core_id| guard.observed_epochs[*core_id] < next_epoch)
704        {
705            let now = Instant::now();
706            if now >= deadline {
707                self.current_epoch.store(previous_epoch, Ordering::SeqCst);
708                return Err(format!(
709                    "epoch fence timed out waiting for active cores to observe epoch {next_epoch}"
710                ));
711            }
712
713            let remaining = deadline.saturating_duration_since(now);
714            let (next_guard, wait_result) = self
715                .wait_cv
716                .wait_timeout(guard, remaining)
717                .unwrap_or_else(std::sync::PoisonError::into_inner);
718            guard = next_guard;
719            if wait_result.timed_out()
720                && active_cores
721                    .iter()
722                    .any(|core_id| guard.observed_epochs[*core_id] < next_epoch)
723            {
724                self.current_epoch.store(previous_epoch, Ordering::SeqCst);
725                return Err(format!(
726                    "epoch fence timed out waiting for active cores to observe epoch {next_epoch}"
727                ));
728            }
729        }
730
731        drop(guard);
732        self.pool.seal_active_for_epoch(previous_epoch)?;
733        self.append_epoch.store(next_epoch, Ordering::SeqCst);
734        Ok(next_epoch)
735    }
736
737    pub fn flush_epoch(&self, epoch: u64) -> Result<EpochFlushBatch, String> {
738        let (records, records_per_core) = self.pool.flush_epoch_batches(epoch)?;
739        Ok(EpochFlushBatch {
740            epoch,
741            records,
742            records_per_core,
743        })
744    }
745
746    pub fn mark_epoch_durable(&self, epoch: u64) {
747        let mut guard = self
748            .wait_state
749            .lock()
750            .unwrap_or_else(std::sync::PoisonError::into_inner);
751        guard.durable_epoch = Some(
752            guard
753                .durable_epoch
754                .map_or(epoch, |existing| existing.max(epoch)),
755        );
756        drop(guard);
757        self.wait_cv.notify_all();
758    }
759
760    pub fn wait_until_epoch_durable(&self, epoch: u64, timeout: Duration) -> Result<(), String> {
761        let deadline = Instant::now() + timeout;
762        let mut guard = self
763            .wait_state
764            .lock()
765            .unwrap_or_else(std::sync::PoisonError::into_inner);
766        while guard
767            .durable_epoch
768            .is_none_or(|durable_epoch| durable_epoch < epoch)
769        {
770            let now = Instant::now();
771            if now >= deadline {
772                return Err(format!("timeout while waiting for durable epoch {epoch}"));
773            }
774            let remaining = deadline.saturating_duration_since(now);
775            let (next_guard, wait_result) = self
776                .wait_cv
777                .wait_timeout(guard, remaining)
778                .unwrap_or_else(std::sync::PoisonError::into_inner);
779            guard = next_guard;
780            if wait_result.timed_out()
781                && guard
782                    .durable_epoch
783                    .is_none_or(|durable_epoch| durable_epoch < epoch)
784            {
785                return Err(format!("timeout while waiting for durable epoch {epoch}"));
786            }
787        }
788        Ok(())
789    }
790
791    pub fn recovery_order(records: &[WalRecord]) -> Vec<WalRecord> {
792        let mut ordered = records.to_vec();
793        ordered.sort_by_key(|record| {
794            (
795                record.epoch,
796                record.begin_seq.get(),
797                record.txn_token.id.get(),
798                record.page_id.get(),
799            )
800        });
801        ordered
802    }
803}
804
805fn make_record(core_id: usize, seq: u64, payload_len: usize) -> WalRecord {
806    let core_u64 = u64::try_from(core_id).expect("core id should fit into u64");
807    let txn_id_raw = core_u64.saturating_mul(1_000_000).saturating_add(seq + 1);
808    let txn_id = TxnId::new(txn_id_raw).expect("txn id should be non-zero");
809
810    let page_raw = u32::try_from(core_id + 1).expect("core id should fit into u32");
811    let page_id = PageNumber::new(page_raw).expect("page id should be non-zero");
812
813    WalRecord {
814        txn_token: TxnToken::new(txn_id, TxnEpoch::new(1)),
815        epoch: seq,
816        page_id,
817        begin_seq: CommitSeq::new(seq),
818        end_seq: Some(CommitSeq::new(seq.saturating_add(1))),
819        before_image: vec![0x10; payload_len],
820        after_image: vec![0x20; payload_len],
821    }
822}
823
/// Single-threaded async runtime for tests, with 8 blocking-pool threads.
#[cfg(test)]
fn test_runtime() -> Runtime {
    let built = RuntimeBuilder::current_thread()
        .blocking_threads(8, 8)
        .build();
    built.expect("runtime build")
}
831
/// Walks a buffer through Writable → Sealed → Flushing → Writable and checks
/// that the active lane keeps accepting appends while the previous batch flushes.
#[test]
fn bd_ncivz_1_state_machine_double_buffering() {
    let mut buffer = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );

    assert_eq!(buffer.active_state(), BufferState::Writable);
    assert_eq!(buffer.flush_state(), BufferState::Writable);

    for seq in [1_u64, 2] {
        assert_eq!(buffer.append(make_record(0, seq, 64)), AppendOutcome::Appended);
    }
    assert_eq!(buffer.active_len(), 2);

    buffer.seal_active(7).expect("active lane should seal");
    assert_eq!(buffer.active_state(), BufferState::Sealed { epoch: 7 });

    let in_flight = buffer.begin_flush().expect("sealed lane should flush");
    assert_eq!(in_flight, 2);
    assert_eq!(buffer.flush_state(), BufferState::Flushing { epoch: 7 });
    assert_eq!(buffer.active_state(), BufferState::Writable);

    // Writers may continue while the previous batch is in flight.
    assert_eq!(buffer.append(make_record(0, 3, 64)), AppendOutcome::Appended);
    assert_eq!(buffer.active_len(), 1);

    buffer.complete_flush().expect("flushing lane should complete");
    assert_eq!(buffer.flush_state(), BufferState::Writable);
    assert_eq!(buffer.flush_len(), 0);
    assert_eq!(buffer.active_len(), 1);
    assert_eq!(buffer.fallback_decision(), FallbackDecision::ContinueParallel);
}
878
/// A sealed active lane must reject appends and keep its contents intact
/// through `begin_flush`.
#[test]
fn bd_ncivz_1_sealed_lane_rejects_mutating_appends() {
    let mut buffer = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );

    let seeded = make_record(0, 1, 64);
    let seeded_txn = seeded.txn_token.id.get();
    let seeded_end = seeded.end_seq;
    assert_eq!(buffer.append(seeded), AppendOutcome::Appended);
    buffer
        .seal_active(3)
        .expect("active lane should seal before flush");

    let rejected = buffer.append(make_record(0, 2, 64));
    assert_eq!(
        rejected,
        AppendOutcome::Blocked,
        "sealed lane must reject mutating appends"
    );

    let moved = buffer
        .begin_flush()
        .expect("sealed records should move into flush lane");
    assert_eq!(moved, 1);
    assert_eq!(buffer.flush_lane.records.len(), 1);

    let Some(flushed) = buffer.flush_lane.records.first() else {
        unreachable!("flush lane length was asserted above");
    };
    assert_eq!(
        flushed.txn_token.id.get(),
        seeded_txn,
        "flushed lane contents must remain unchanged after blocked append"
    );
    assert_eq!(
        flushed.end_seq, seeded_end,
        "commit metadata must remain intact while lane is sealed"
    );
}
916
/// Under `BlockWriter`, a record that does not fit is refused without
/// touching the overflow queue or latching the fallback.
#[test]
fn bd_ncivz_1_overflow_block_writer_policy() {
    let mut buffer = PerCoreWalBuffer::new(
        1,
        BufferConfig {
            capacity_bytes: 160,
            overflow_policy: OverflowPolicy::BlockWriter,
            overflow_fallback_bytes: 320,
        },
    );

    let outcomes: Vec<AppendOutcome> = [1_u64, 2]
        .into_iter()
        .map(|seq| buffer.append(make_record(1, seq, 48)))
        .collect();
    assert_eq!(outcomes[0], AppendOutcome::Appended);
    assert_eq!(outcomes[1], AppendOutcome::Blocked);

    assert_eq!(buffer.overflow_len(), 0);
    assert_eq!(buffer.fallback_decision(), FallbackDecision::ContinueParallel);
}
937
#[test]
fn bd_ncivz_1_overflow_allocate_triggers_deterministic_fallback() {
    // After the active lane fills, further records queue as overflow. Once
    // queued overflow passes `overflow_fallback_bytes`, a serialized drain is
    // latched. Draining everything empties every lane and clears the latch.
    let mut lane = PerCoreWalBuffer::new(
        2,
        BufferConfig {
            capacity_bytes: 192,
            overflow_policy: OverflowPolicy::AllocateOverflow,
            overflow_fallback_bytes: 170,
        },
    );

    let expected = [
        AppendOutcome::Appended,
        AppendOutcome::QueuedOverflow,
        AppendOutcome::QueuedOverflow,
    ];
    for (seq, want) in (1..=3).zip(expected) {
        assert_eq!(lane.append(make_record(2, seq, 64)), want);
    }

    assert_eq!(
        lane.fallback_decision(),
        FallbackDecision::ForceSerializedDrain
    );
    assert_eq!(lane.overflow_len(), 2);

    let everything = lane.force_serialized_drain();
    assert_eq!(everything.len(), 3);
    assert_eq!(lane.active_len(), 0);
    assert_eq!(lane.flush_len(), 0);
    assert_eq!(lane.overflow_len(), 0);
    assert_eq!(lane.fallback_decision(), FallbackDecision::ContinueParallel);
}
976
/// Eight writers, one per core index, append 400 records each to their own
/// core concurrently. Afterwards the pool must report zero contention events.
#[test]
fn bd_ncivz_1_per_core_pool_concurrent_writers_no_contention() {
    let pool = Arc::new(PerCoreWalBufferPool::new(8, BufferConfig::default()));
    let records_per_core = 400_u64;
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        let mut writer_tasks = Vec::new();
        // One task per core; each task touches only its own `core_id`, so no
        // two writers ever target the same lane.
        for core_id in 0..8_usize {
            let pool_ref = Arc::clone(&pool);
            writer_tasks.push(
                handle
                    .try_spawn(async move {
                        // The append loop is synchronous, so it runs on the
                        // blocking pool rather than inside the async task.
                        spawn_blocking(move || {
                            for seq in 0..records_per_core {
                                let record = make_record(core_id, seq, 64);
                                let outcome = pool_ref
                                    .append_to_core(core_id, record)
                                    .expect("core index should exist");
                                // Default config: a writer may spill to overflow
                                // but must never be told to block.
                                assert!(
                                    matches!(
                                        outcome,
                                        AppendOutcome::Appended | AppendOutcome::QueuedOverflow
                                    ),
                                    "append outcome should not block"
                                );
                            }
                        })
                        // NOTE(review): the awaited result is discarded; whether a
                        // panic inside the blocking closure fails this test depends
                        // on how `spawn_blocking` propagates panics — confirm.
                        .await;
                    })
                    .expect("writer task should spawn"),
            );
        }

        // Wait for every writer before checking the contention counter.
        for writer_task in writer_tasks {
            writer_task.await;
        }
    });

    // Presumably counts lane-lock collisions (the module imports
    // `TryLockError`) — TODO confirm against `contention_events_total`.
    assert_eq!(pool.contention_events_total(), 0);
}
1019
#[test]
fn bd_ncivz_2_epoch_counter_defaults_to_10ms_interval() {
    // A freshly built coordinator starts at epoch 0, uses the default 10 ms
    // advance cadence, and sizes its pool from the requested core count.
    let epochs = EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    let expected_interval = Duration::from_millis(10);

    assert_eq!(epochs.current_epoch(), 0);
    assert_eq!(epochs.epoch_advance_interval(), expected_interval);
    assert_eq!(epochs.pool.core_count(), 2);
}
1031
/// `advance_epoch_and_wait` over active cores `[0, 1]` must not return until
/// both cores have observed again after the advance starts. It then yields
/// the new epoch (1), which is also the coordinator's current epoch.
#[test]
fn bd_ncivz_2_epoch_fence_waits_for_active_core_observation() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        2,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    // Initial observations; the argument is a core index, per the expect
    // messages.
    coordinator
        .observe_epoch(0)
        .expect("core 0 should be valid");
    coordinator
        .observe_epoch(1)
        .expect("core 1 should be valid");
    let runtime = test_runtime();
    let handle = runtime.handle();

    let next_epoch = runtime.block_on(async {
        // Observer: after a short delay, re-observe on both cores. That
        // should release the fence started below.
        let observer_ref = Arc::clone(&coordinator);
        let observer = handle
            .try_spawn(async move {
                sleep(wall_now(), Duration::from_millis(8)).await;
                observer_ref
                    .observe_epoch(0)
                    .expect("core 0 observation should succeed");
                observer_ref
                    .observe_epoch(1)
                    .expect("core 1 observation should succeed");
            })
            .expect("observer task should spawn");

        // The fence call is synchronous, so it runs on the blocking pool.
        // The 200 ms timeout gives ample margin over the 8 ms observer delay.
        let fence_ref = Arc::clone(&coordinator);
        let next_epoch = spawn_blocking(move || {
            fence_ref.advance_epoch_and_wait(&[0, 1], Duration::from_millis(200))
        })
        .await
        .expect("fence should complete after observations");
        observer.await;
        next_epoch
    });

    assert_eq!(next_epoch, 1);
    assert_eq!(coordinator.current_epoch(), 1);
}
1075
/// While an epoch advance is pending (epoch 1 published, fence not yet
/// released), the append epoch must stay at 0. A record appended in that
/// window must therefore be flushed with epoch 0, not epoch 1.
#[test]
fn bd_ncivz_2_append_during_epoch_fence_stays_in_previous_epoch_lane() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        1,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    coordinator
        .append_to_core(0, 1, 64)
        .expect("initial append should succeed");

    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        // Start the advance in the background. It waits on core 0, which has
        // not observed since the advance began.
        let advance_ref = Arc::clone(&coordinator);
        let advance = handle
            .try_spawn(async move {
                spawn_blocking(move || {
                    advance_ref.advance_epoch_and_wait(&[0], Duration::from_millis(200))
                })
                .await
            })
            .expect("advance task should spawn");

        // Spin until the advance publishes epoch 1, or give up at the deadline.
        // NOTE(review): this busy-waits on the `block_on` thread. It relies on
        // the spawned advance task being driven by another thread — confirm
        // that `test_runtime()` provides worker threads.
        let deadline = Instant::now() + Duration::from_millis(200);
        while coordinator.current_epoch() != 1 {
            assert!(
                Instant::now() < deadline,
                "advance should publish epoch 1 before timing out"
            );
            std::thread::yield_now();
        }
        // Fence window: the published epoch is 1, but appends still target 0.
        assert_eq!(
            coordinator.current_append_epoch(),
            0,
            "writers must keep appending to the previous epoch until sealing completes"
        );

        coordinator
            .append_to_core(0, 2, 64)
            .expect("append during fence window should still succeed");
        // Observing on core 0 releases the fence so the advance can finish.
        coordinator
            .observe_epoch(0)
            .expect("core 0 observation should release the fence");

        let next_epoch = advance.await.expect("advance should complete");
        assert_eq!(next_epoch, 1);
    });

    // Both records (before and during the fence window) belong to epoch 0.
    let batch = coordinator
        .flush_epoch(0)
        .expect("flush should not see an epoch straddle");
    assert_eq!(batch.total_records(), 2);
    assert_eq!(batch.records_per_core, vec![2]);
    assert!(
        batch.records.iter().all(|record| record.epoch == 0),
        "records appended during the fence window must stay in epoch 0: {:?}",
        batch
            .records
            .iter()
            .map(|record| record.epoch)
            .collect::<Vec<_>>()
    );
}
1144
/// Group commit: records appended on two cores during epoch 0 are flushed as
/// one batch after the advance. The batch reports per-core counts `[2, 1]`.
/// The epoch becomes durable only after `mark_epoch_durable`.
#[test]
fn bd_ncivz_2_group_commit_flushes_epoch_across_cores() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        2,
        BufferConfig::default(),
        EpochConfig::default(),
    ));

    coordinator
        .observe_epoch(0)
        .expect("core 0 should be valid");
    coordinator
        .observe_epoch(1)
        .expect("core 1 should be valid");

    // Two records on core 0 and one on core 1, all in epoch 0.
    assert_eq!(
        coordinator
            .append_to_core(0, 1, 64)
            .expect("append on core 0 should succeed"),
        AppendOutcome::Appended
    );
    assert_eq!(
        coordinator
            .append_to_core(0, 2, 64)
            .expect("append on core 0 should succeed"),
        AppendOutcome::Appended
    );
    assert_eq!(
        coordinator
            .append_to_core(1, 3, 64)
            .expect("append on core 1 should succeed"),
        AppendOutcome::Appended
    );
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        // Observer re-observes both cores after 6 ms to release the fence below.
        let observer_ref = Arc::clone(&coordinator);
        let observer = handle
            .try_spawn(async move {
                sleep(wall_now(), Duration::from_millis(6)).await;
                observer_ref
                    .observe_epoch(0)
                    .expect("core 0 observation should succeed");
                observer_ref
                    .observe_epoch(1)
                    .expect("core 1 observation should succeed");
            })
            .expect("observer task should spawn");

        let advance_ref = Arc::clone(&coordinator);
        spawn_blocking(move || {
            advance_ref.advance_epoch_and_wait(&[0, 1], Duration::from_millis(200))
        })
        .await
        .expect("epoch advance should succeed");
        observer.await;

        // Records from both cores come back in a single epoch-0 batch;
        // `records_per_core` is indexed by core id.
        let batch = coordinator
            .flush_epoch(0)
            .expect("epoch 0 flush should succeed");
        assert_eq!(batch.epoch, 0);
        assert_eq!(batch.records_per_core, vec![2, 1]);
        assert_eq!(batch.total_records(), 3);
        assert!(batch.records.iter().all(|record| record.epoch == 0));

        // Flushing alone does not make the epoch durable; an explicit mark does.
        assert_eq!(coordinator.durable_epoch(), None);
        coordinator.mark_epoch_durable(0);
        assert_eq!(coordinator.durable_epoch(), Some(0));
        coordinator
            .wait_until_epoch_durable(0, Duration::from_millis(25))
            .expect("durability wait should pass");
    });
}
1219
/// A writer waiting in `wait_until_epoch_durable(0, ..)` must stay blocked
/// through the advance and flush. It returns successfully only after
/// `mark_epoch_durable(0)`.
#[test]
fn bd_ncivz_2_writers_block_until_epoch_is_durable() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        1,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    coordinator
        .append_to_core(0, 1, 64)
        .expect("append should succeed");
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        // Background waiter with a generous 600 ms timeout.
        let waiter_ref = Arc::clone(&coordinator);
        let waiter = handle
            .try_spawn(async move {
                spawn_blocking(move || {
                    waiter_ref.wait_until_epoch_durable(0, Duration::from_millis(600))
                })
                .await
            })
            .expect("waiter task should spawn");

        // After 30 ms nothing has been flushed yet, so the waiter is pending.
        sleep(wall_now(), Duration::from_millis(30)).await;
        assert!(
            !waiter.is_finished(),
            "writer should still be waiting before flush"
        );

        // Seal epoch 0 without waiting on any core, then flush it.
        let advance_ref = Arc::clone(&coordinator);
        spawn_blocking(move || advance_ref.advance_epoch_and_wait(&[], Duration::from_millis(25)))
            .await
            .expect("advancing with no active fence set should succeed");
        assert_eq!(
            coordinator
                .flush_epoch(0)
                .expect("flush should succeed")
                .total_records(),
            1
        );
        // Flushing does not by itself mark durability; the explicit mark
        // below is what should release the waiter.
        assert_eq!(coordinator.durable_epoch(), None);
        coordinator.mark_epoch_durable(0);

        waiter.await.expect("epoch should become durable");
    });
}
1270
#[test]
fn bd_ncivz_2_recovery_replays_in_epoch_order() {
    // Records from two cores arrive interleaved across epochs; recovery must
    // replay every epoch-1 record before any epoch-2 record.
    let with_epoch = |core, seq, epoch| {
        let mut record = make_record(core, seq, 16);
        record.epoch = epoch;
        record
    };
    let input = [
        with_epoch(0, 10, 2),
        with_epoch(1, 2, 1),
        with_epoch(0, 3, 1),
        with_epoch(1, 9, 2),
    ];

    let ordered = EpochOrderCoordinator::recovery_order(&input);
    let epochs: Vec<_> = ordered.iter().map(|record| record.epoch).collect();
    assert!(epochs.windows(2).all(|pair| pair[0] <= pair[1]));
    assert_eq!(&epochs[..4], &[1, 1, 2, 2]);
}
1293
#[test]
fn bd_ncivz_2_epoch_fence_timeout_when_active_core_not_observed() {
    // Core 1 never observes, so waiting on it must time out with a
    // descriptive error.
    let epochs = EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    let outcome = epochs.advance_epoch_and_wait(&[1], Duration::from_millis(20));
    let error = outcome.expect_err("fence must timeout without active core observation");
    assert!(
        error.contains("timed out"),
        "error should describe fence timeout: {error}"
    );
}
1306
#[test]
fn bd_ncivz_2_epoch_fence_timeout_does_not_poison_next_advance() {
    // A fence that times out must leave both the published epoch and the
    // append epoch at 0, so a later advance still seals epoch 0 intact.
    let epochs = EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());

    epochs
        .append_to_core(0, 1, 64)
        .expect("initial append should succeed");

    // Core 0 never observes, so this advance times out.
    let error = epochs
        .advance_epoch_and_wait(&[0], Duration::from_millis(20))
        .expect_err("fence must timeout without active core observation");
    assert!(
        error.contains("timed out"),
        "error should describe fence timeout: {error}"
    );

    let published = epochs.current_epoch();
    assert_eq!(
        published, 0,
        "timed-out epoch advance must roll back the published epoch"
    );
    let appending = epochs.current_append_epoch();
    assert_eq!(
        appending, 0,
        "timed-out epoch advance must not move the append epoch"
    );

    epochs
        .append_to_core(0, 2, 64)
        .expect("append after timeout should still use epoch 0");
    // The retry waits on no cores, so it is expected to succeed.
    epochs
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("retry advance should succeed");

    let batch = epochs
        .flush_epoch(0)
        .expect("retry advance should seal the original epoch");
    assert_eq!(batch.total_records(), 2);
    let labels: Vec<_> = batch.records.iter().map(|record| record.epoch).collect();
    assert!(
        labels.iter().all(|&epoch| epoch == 0),
        "timed-out fence must not relabel epoch-0 records: {:?}",
        labels
    );
}
1355
#[test]
fn bd_ncivz_2_fence_detects_epoch_boundary_straddle() {
    // Put records tagged with epochs 0 and 1 into one lane and seal it for
    // epoch 0. The flush must refuse the mixed batch.
    let pool = PerCoreWalBufferPool::new(1, BufferConfig::default());

    for (seq, epoch) in [(1, 0), (2, 1)] {
        let mut record = make_record(0, seq, 64);
        record.epoch = epoch;
        let outcome = pool
            .append_to_core(0, record)
            .expect("append should succeed");
        assert_eq!(outcome, AppendOutcome::Appended);
    }

    pool.seal_active_for_epoch(0)
        .expect("sealing should succeed");

    let error = pool
        .flush_epoch_batches(0)
        .expect_err("mixed epochs in one flush lane must fail");
    assert!(
        error.contains("straddle"),
        "error should report epoch straddle: {error}"
    );
}
1385
#[test]
fn bd_ncivz_2_commits_across_epochs_preserve_serial_epoch_order() {
    // Commit one record per epoch on a single core, then check that recovery
    // replays the concatenated batches monotonically by (epoch, begin_seq).
    let epochs = EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());
    let mut combined = Vec::new();

    let rounds = [
        (0, 1, "epoch 0 flush should succeed"),
        (1, 2, "epoch 1 flush should succeed"),
    ];
    for (epoch, txn, flush_msg) in rounds {
        epochs
            .observe_epoch(0)
            .expect("core 0 observation should succeed");
        epochs
            .append_to_core(0, txn, 64)
            .expect("append should succeed");
        epochs
            .advance_epoch_and_wait(&[], Duration::from_millis(25))
            .expect("advance should succeed");
        let batch = epochs.flush_epoch(epoch).expect(flush_msg);
        combined.extend(batch.records);
    }

    let ordered = EpochOrderCoordinator::recovery_order(&combined);
    let monotonic = ordered.windows(2).all(|pair| {
        let (earlier, later) = (&pair[0], &pair[1]);
        earlier.epoch < later.epoch
            || (earlier.epoch == later.epoch
                && earlier.begin_seq.get() <= later.begin_seq.get())
    });
    assert!(
        monotonic,
        "recovery ordering should be monotonic by epoch and begin_seq"
    );
}
1429
#[test]
fn bd_ncivz_2_abort_cleanup_drops_non_committed_records() {
    // An uncommitted record (end_seq = None) placed in the active lane must be
    // pruned by the flush, leaving only the committed record for recovery.
    let epochs = EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());
    epochs
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    epochs
        .append_to_core(0, 1, 64)
        .expect("committed append should succeed");

    let aborted = {
        let mut record = make_record(0, 2, 64);
        record.epoch = epochs.current_epoch();
        record.end_seq = None;
        record
    };
    let aborted_txn_id = aborted.txn_token.id.get();
    let outcome = epochs
        .pool
        .append_to_core(0, aborted)
        .expect("aborted append should still enter active lane");
    assert_eq!(outcome, AppendOutcome::Appended);

    epochs
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("epoch advance should succeed");
    let batch = epochs
        .flush_epoch(0)
        .expect("flush should prune aborted records");

    assert_eq!(batch.total_records(), 1);
    let committed_only = batch.records.iter().all(|record| record.end_seq.is_some());
    assert!(committed_only, "all flushed records must be committed");
    let aborted_absent = !batch
        .records
        .iter()
        .any(|record| record.txn_token.id.get() == aborted_txn_id);
    assert!(aborted_absent, "aborted record must not survive cleanup");

    let ordered = EpochOrderCoordinator::recovery_order(&batch.records);
    assert_eq!(ordered.len(), 1);
    assert!(
        ordered[0].end_seq.is_some(),
        "recovery input should only contain committed records"
    );
}
1481
// -- Unit tests: buffer state machine, pool, epoch coordinator, config, and derived traits --
1483
#[test]
fn seal_active_on_already_sealed_returns_error() {
    // Sealing is one-shot: a second seal of the same lane must be rejected.
    let mut lane = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    lane.seal_active(1).expect("first seal");
    let second = lane.seal_active(2);
    assert!(second.is_err());
}
1494
#[test]
fn begin_flush_on_writable_returns_error() {
    // Nothing has been sealed, so starting a flush must fail.
    let mut lane = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    let attempt = lane.begin_flush();
    assert!(attempt.is_err());
}
1504
#[test]
fn complete_flush_on_non_flushing_returns_error() {
    // No flush was ever started, so completing one must fail.
    let mut lane = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    let attempt = lane.complete_flush();
    assert!(attempt.is_err());
}
1514
#[test]
fn force_serialized_drain_collects_all_lanes() {
    // Put one record in the flush lane and another in the active lane, then
    // check that a serialized drain returns both and resets every lane.
    let config = BufferConfig {
        capacity_bytes: 640,
        overflow_policy: OverflowPolicy::AllocateOverflow,
        overflow_fallback_bytes: 4096,
    };
    let mut buf = PerCoreWalBuffer::new(0, config);

    // Check each append outcome, so a silently rejected record cannot make
    // the drain count below pass or fail for the wrong reason.
    assert_eq!(buf.append(make_record(0, 1, 64)), AppendOutcome::Appended);
    buf.seal_active(1).expect("active lane should seal");
    let moved = buf
        .begin_flush()
        .expect("sealed lane should move into the flush lane");
    assert_eq!(moved, 1, "the sealed record should enter the flush lane");

    // The second record goes in while the first is still in the flush lane.
    assert_eq!(buf.append(make_record(0, 2, 64)), AppendOutcome::Appended);

    let drained = buf.force_serialized_drain();
    assert_eq!(drained.len(), 2);
    assert_eq!(buf.active_state(), BufferState::Writable);
    assert_eq!(buf.flush_state(), BufferState::Writable);
    assert_eq!(buf.active_len(), 0);
    assert_eq!(buf.flush_len(), 0);
    assert_eq!(buf.overflow_len(), 0);
    assert_eq!(buf.fallback_decision(), FallbackDecision::ContinueParallel);
}
1539
#[test]
fn overflow_drains_into_active_after_complete_flush() {
    // Fill the active lane, spill a second record into overflow, then run a
    // full seal/flush cycle. The overflow record must move into the reopened
    // active lane rather than being lost.
    let config = BufferConfig {
        capacity_bytes: 160,
        overflow_policy: OverflowPolicy::AllocateOverflow,
        overflow_fallback_bytes: 4096,
    };
    let mut buf = PerCoreWalBuffer::new(0, config);

    assert_eq!(buf.append(make_record(0, 1, 48)), AppendOutcome::Appended);
    assert_eq!(
        buf.append(make_record(0, 2, 48)),
        AppendOutcome::QueuedOverflow
    );
    assert_eq!(buf.active_len(), 1);
    assert_eq!(buf.overflow_len(), 1);

    buf.seal_active(1).expect("active lane should seal");
    buf.begin_flush()
        .expect("sealed lane should begin flushing");
    buf.complete_flush()
        .expect("flushing lane should complete");

    // The old check `overflow_len() == 0 || active_len() > 0` also accepted
    // (0, 0), i.e. a dropped record. Pin the exact post-drain state instead.
    assert_eq!(
        buf.overflow_len(),
        0,
        "overflow should drain once the flush completes"
    );
    assert_eq!(
        buf.active_len(),
        1,
        "the drained overflow record must land in the active lane"
    );
    assert_eq!(buf.active_state(), BufferState::Writable);
}
1564
#[test]
fn append_record_exceeding_capacity_latches_fallback() {
    // The lane capacity (32 B) is below the fixed per-record overhead, so this
    // record can never fit. The append is refused and the serialized-drain
    // fallback is latched.
    let tiny = BufferConfig {
        capacity_bytes: 32,
        ..BufferConfig::default()
    };
    let mut lane = PerCoreWalBuffer::new(0, tiny);
    let oversized = make_record(0, 1, 128);
    assert_eq!(lane.append(oversized), AppendOutcome::Blocked);
    assert_eq!(
        lane.fallback_decision(),
        FallbackDecision::ForceSerializedDrain
    );
}
1579
#[test]
fn append_batch_with_oversized_single_record_latches_fallback() {
    // Batch variant of the oversized-record case: a one-element batch that
    // cannot fit is refused and latches the fallback.
    let mut lane = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 32,
            ..BufferConfig::default()
        },
    );
    let batch = vec![make_record(0, 1, 128)];
    let outcome = lane.append_batch(batch);
    assert_eq!(outcome, AppendOutcome::Blocked);
    assert_eq!(
        lane.fallback_decision(),
        FallbackDecision::ForceSerializedDrain
    );
}
1594
#[test]
fn append_batch_all_fit_returns_appended() {
    // Two small records fit easily in a 4 KiB lane, so both are appended.
    let mut lane = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 4096,
            ..BufferConfig::default()
        },
    );
    let pair = (1..=2)
        .map(|seq| make_record(0, seq, 32))
        .collect::<Vec<_>>();
    assert_eq!(lane.append_batch(pair), AppendOutcome::Appended);
    assert_eq!(lane.active_len(), 2);
}
1606
#[test]
fn recovery_order_sorts_by_epoch_then_seq() {
    // Three single-core records given out of order: recovery must order them
    // by epoch first and by begin_seq within an epoch.
    let mut r1 = make_record(0, 10, 16);
    r1.epoch = 2;
    let mut r2 = make_record(0, 5, 16);
    r2.epoch = 1;
    let mut r3 = make_record(0, 7, 16);
    r3.epoch = 2;

    let ordered = EpochOrderCoordinator::recovery_order(&[r1, r2.clone(), r3]);
    assert_eq!(ordered.len(), 3, "recovery must neither drop nor invent records");
    assert_eq!(ordered[0].epoch, 1);
    assert_eq!(ordered[0].begin_seq.get(), r2.begin_seq.get());
    assert!(ordered[1].epoch <= ordered[2].epoch);

    // Both remaining records are in epoch 2, so the tiebreak on begin_seq
    // decides their order.
    assert_eq!(ordered[1].epoch, 2);
    assert_eq!(ordered[2].epoch, 2);
    assert!(
        ordered[1].begin_seq.get() <= ordered[2].begin_seq.get(),
        "records within one epoch must replay in begin_seq order"
    );
}
1621
#[test]
fn buffer_config_default_values() {
    // Every default field comes straight from the module-level constants.
    let BufferConfig {
        capacity_bytes,
        overflow_fallback_bytes,
        overflow_policy,
    } = BufferConfig::default();
    assert_eq!(capacity_bytes, DEFAULT_BUFFER_CAPACITY_BYTES);
    assert_eq!(overflow_fallback_bytes, DEFAULT_OVERFLOW_FALLBACK_BYTES);
    assert_eq!(overflow_policy, OverflowPolicy::AllocateOverflow);
}
1632
#[test]
fn thread_buffer_slot_stable_within_thread() {
    // Two lookups on the same thread must agree and stay within range.
    let slots = 16;
    let first = thread_buffer_slot(slots);
    let again = thread_buffer_slot(slots);
    assert_eq!(first, again);
    assert!(first < slots);
}
1640
#[test]
fn pool_invalid_core_id_returns_error() {
    // A two-core pool rejects indices 2 and 99 and accepts index 0.
    let pool = PerCoreWalBufferPool::new(2, BufferConfig::default());
    for bad_core in [2, 99] {
        assert!(pool.append_to_core(bad_core, make_record(0, 1, 16)).is_err());
    }
    assert!(pool.append_to_core(0, make_record(0, 1, 16)).is_ok());
}
1648
#[test]
fn pool_core_count_and_contention_start_at_zero() {
    // A fresh pool reports its size and has recorded no contention yet.
    let fresh = PerCoreWalBufferPool::new(4, BufferConfig::default());
    let (cores, contention) = (fresh.core_count(), fresh.contention_events_total());
    assert_eq!(cores, 4);
    assert_eq!(contention, 0);
}
1655
#[test]
fn epoch_config_default_values() {
    // The default advance cadence is the module-level constant.
    let interval_ms = EpochConfig::default().advance_interval_ms;
    assert_eq!(interval_ms, DEFAULT_EPOCH_ADVANCE_INTERVAL_MS);
}
1661
#[test]
fn epoch_flush_batch_total_records() {
    // total_records reports 2 for a two-record batch and 0 for an empty one.
    let populated = EpochFlushBatch {
        epoch: 1,
        records: vec![make_record(0, 1, 8), make_record(0, 2, 8)],
        records_per_core: vec![2],
    };
    assert_eq!(populated.total_records(), 2);

    let empty = EpochFlushBatch {
        epoch: 0,
        records: Vec::new(),
        records_per_core: Vec::new(),
    };
    assert_eq!(empty.total_records(), 0);
}
1678
#[test]
fn wal_record_encoded_len_includes_images() {
    // A record built with size 100 is expected to carry at least 200 bytes
    // beyond the fixed overhead (presumably its before and after images). An
    // image-less record still pays the overhead and must encode smaller.
    let with_images = make_record(0, 1, 100).encoded_len();
    assert!(with_images >= RECORD_FIXED_OVERHEAD_BYTES + 200);

    let bare = make_record(0, 1, 0).encoded_len();
    assert!(bare >= RECORD_FIXED_OVERHEAD_BYTES);
    assert!(bare < with_images);
}
1688
#[test]
fn buffer_state_variant_equality() {
    // Equality distinguishes both the variant and the epoch it carries.
    let sealed_five = BufferState::Sealed { epoch: 5 };
    assert_eq!(BufferState::Writable, BufferState::Writable);
    assert_eq!(sealed_five, BufferState::Sealed { epoch: 5 });
    assert_ne!(sealed_five, BufferState::Sealed { epoch: 6 });
    assert_ne!(
        BufferState::Sealed { epoch: 1 },
        BufferState::Flushing { epoch: 1 }
    );

    let rendered = format!("{:?}", BufferState::Flushing { epoch: 3 });
    assert!(rendered.contains("Flushing"));
}
1707
#[test]
fn pool_append_batch_to_core_works() {
    // A batch lands on a valid core; an out-of-range core is rejected.
    let pool = PerCoreWalBufferPool::new(2, BufferConfig::default());
    let batch = vec![make_record(0, 1, 16), make_record(0, 2, 16)];
    let accepted = pool.append_batch_to_core(0, batch).unwrap();
    assert_eq!(accepted, AppendOutcome::Appended);

    let rejected = pool.append_batch_to_core(99, vec![make_record(0, 1, 16)]);
    assert!(rejected.is_err());
}
1719
#[test]
fn overflow_policy_debug_clone_copy_eq() {
    // The two policies differ, can be copied, and print their variant names.
    let (block, allocate) = (OverflowPolicy::BlockWriter, OverflowPolicy::AllocateOverflow);
    assert_ne!(block, allocate);

    // `block` is still used after this assignment, which requires Copy.
    let duplicate = block;
    assert_eq!(duplicate, block);

    assert!(format!("{block:?}").contains("BlockWriter"));
    assert!(format!("{allocate:?}").contains("AllocateOverflow"));
}
1732
#[test]
fn append_outcome_debug_clone_copy_eq() {
    // Every variant is Copy and equal to its copy; Debug names the variant.
    for variant in [
        AppendOutcome::Appended,
        AppendOutcome::QueuedOverflow,
        AppendOutcome::Blocked,
    ] {
        let duplicate = variant;
        assert_eq!(duplicate, variant);
    }

    let rendered = format!("{:?}", AppendOutcome::QueuedOverflow);
    assert!(rendered.contains("QueuedOverflow"));
    assert_ne!(AppendOutcome::Appended, AppendOutcome::Blocked);
}
1748
#[test]
fn fallback_decision_debug_clone_copy_eq() {
    // The two decisions differ, can be copied, and Debug names the variant.
    let (parallel, serialized) = (
        FallbackDecision::ContinueParallel,
        FallbackDecision::ForceSerializedDrain,
    );
    assert_ne!(parallel, serialized);

    let duplicate = parallel;
    assert_eq!(duplicate, parallel);

    assert!(format!("{serialized:?}").contains("ForceSerializedDrain"));
}
1759
#[test]
fn wal_record_debug_and_clone() {
    // Debug output names the type, and a clone matches the original field by
    // field, including the full contents of `before_image` and `after_image`.
    let r = make_record(0, 5, 32);
    let dbg = format!("{r:?}");
    assert!(dbg.contains("WalRecord"));
    let cloned = r.clone();
    assert_eq!(cloned.epoch, r.epoch);
    assert_eq!(cloned.page_id, r.page_id);
    // Compare the before-image bytes, not just their count, so a clone that
    // truncated or zeroed the image would be caught.
    assert_eq!(cloned.before_image.len(), r.before_image.len());
    assert_eq!(cloned.before_image, r.before_image);
    assert_eq!(cloned.after_image, r.after_image);
}
1771
#[test]
fn epoch_coordinator_mark_and_query_durable_epoch() {
    // The durable watermark starts unset and only moves forward.
    let durability =
        EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    assert!(durability.durable_epoch().is_none());

    durability.mark_epoch_durable(5);
    assert_eq!(durability.durable_epoch(), Some(5));

    // Marking an older epoch must not lower the watermark.
    durability.mark_epoch_durable(3);
    assert_eq!(durability.durable_epoch(), Some(5), "should keep max");

    durability.mark_epoch_durable(10);
    assert_eq!(durability.durable_epoch(), Some(10));
}
1783
#[test]
fn buffer_config_clone_copy_debug() {
    // BufferConfig is Copy and PartialEq, and its Debug output names the struct.
    let original = BufferConfig::default();
    let duplicate = original;
    assert_eq!(duplicate, original);
    assert!(format!("{original:?}").contains("BufferConfig"));
}
1792
#[test]
fn epoch_config_default_and_epoch_flush_batch_total() {
    let default_interval = EpochConfig::default().advance_interval_ms;
    assert_eq!(default_interval, DEFAULT_EPOCH_ADVANCE_INTERVAL_MS);

    // A batch with no records and zero per-core counts totals zero and keeps
    // its epoch label.
    let idle_batch = EpochFlushBatch {
        epoch: 5,
        records: Vec::new(),
        records_per_core: vec![0, 0],
    };
    assert_eq!(idle_batch.total_records(), 0);
    assert_eq!(idle_batch.epoch, 5);
}
1805
#[test]
fn epoch_order_coordinator_current_and_append_epoch() {
    // Both the published epoch and the append epoch start at zero.
    let coord = EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    let (published, appending) = (coord.current_epoch(), coord.current_append_epoch());
    assert_eq!(published, 0);
    assert_eq!(appending, 0);
}
1812
#[test]
fn per_core_buffer_pool_core_count() {
    // The pool reports the core count it was constructed with.
    let cores = PerCoreWalBufferPool::new(4, BufferConfig::default()).core_count();
    assert_eq!(cores, 4);
}