1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex, TryLockError};
4use std::time::{Duration, Instant};
5
6#[cfg(test)]
7use asupersync::runtime::{Runtime, RuntimeBuilder, spawn_blocking};
8#[cfg(test)]
9use asupersync::time::{sleep, wall_now};
10use fsqlite_types::{CommitSeq, PageNumber, TxnEpoch, TxnId, TxnToken};
11
/// Default per-lane byte budget (4 MiB) used by `BufferConfig::default`.
const DEFAULT_BUFFER_CAPACITY_BYTES: usize = 4 * 1024 * 1024;
/// Default overflow size (8 MiB) beyond which `OverflowPolicy::AllocateOverflow`
/// latches the serialized-drain fallback; used by `BufferConfig::default`.
const DEFAULT_OVERFLOW_FALLBACK_BYTES: usize = 8 * 1024 * 1024;
/// Fixed per-record charge added by `WalRecord::encoded_len` on top of both page images.
const RECORD_FIXED_OVERHEAD_BYTES: usize = 48;
/// Default epoch advance interval in milliseconds, used by `EpochConfig::default`.
const DEFAULT_EPOCH_ADVANCE_INTERVAL_MS: u64 = 10;

/// Default number of thread buffer slots.
/// NOTE(review): not referenced in this view; presumably intended as the `slot_count`
/// argument to `thread_buffer_slot` — confirm against callers.
pub const DEFAULT_BUFFER_SLOT_COUNT: usize = 128;
29
/// Global counter handing out one slot index to each thread, in first-use order.
static NEXT_BUFFER_SLOT: AtomicUsize = AtomicUsize::new(0);

std::thread_local! {
    /// This thread's slot index, drawn from `NEXT_BUFFER_SLOT` the first time it is read
    /// and fixed for the rest of the thread's life.
    static THREAD_BUFFER_SLOT: usize =
        NEXT_BUFFER_SLOT.fetch_add(1, Ordering::Relaxed);
}

/// Maps the calling thread to a buffer slot in `0..slot_count`.
///
/// Each thread receives a fixed index from a global counter the first time it calls this.
/// Repeated calls on the same thread with the same `slot_count` therefore return the same slot.
///
/// # Panics
///
/// Panics if `slot_count` is zero.
#[inline]
pub fn thread_buffer_slot(slot_count: usize) -> usize {
    // Without this, a zero count would fail later with an opaque remainder-by-zero panic.
    assert!(slot_count > 0, "slot_count must be > 0");
    THREAD_BUFFER_SLOT.with(|&slot| slot % slot_count)
}
49
/// Test-only: rewinds the global slot counter to 0.
///
/// Threads that have already read their thread-local slot keep the index they were given;
/// only threads that first read it after this call draw from the reset counter.
#[cfg(test)]
pub fn reset_slot_counter() {
    NEXT_BUFFER_SLOT.store(0, Ordering::SeqCst);
}
55
/// What `PerCoreWalBuffer::append` does with a record that does not fit the active lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject the append with `AppendOutcome::Blocked`.
    BlockWriter,
    /// Queue the record in the overflow spill (`AppendOutcome::QueuedOverflow`). The spill
    /// has no hard cap; exceeding `BufferConfig::overflow_fallback_bytes` only latches the
    /// serialized-drain fallback.
    AllocateOverflow,
}
61
/// Sizing and overflow behavior of one `PerCoreWalBuffer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferConfig {
    /// Byte budget of each lane, measured with `WalRecord::encoded_len`. A single record
    /// larger than this is always rejected and latches the fallback.
    pub capacity_bytes: usize,
    /// What an append does when the active lane is full.
    pub overflow_policy: OverflowPolicy,
    /// Overflow bytes beyond which the serialized-drain fallback is latched.
    pub overflow_fallback_bytes: usize,
}
68
69impl Default for BufferConfig {
70 fn default() -> Self {
71 Self {
72 capacity_bytes: DEFAULT_BUFFER_CAPACITY_BYTES,
73 overflow_policy: OverflowPolicy::AllocateOverflow,
74 overflow_fallback_bytes: DEFAULT_OVERFLOW_FALLBACK_BYTES,
75 }
76 }
77}
78
/// Lifecycle state of a buffer lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferState {
    /// Accepting appends (active lane), or idle and empty (flush lane).
    Writable,
    /// Closed to appends for `epoch`; waiting for `begin_flush` to move it to the flush lane.
    Sealed { epoch: u64 },
    /// Holding the batch for `epoch` while it is being flushed.
    Flushing { epoch: u64 },
}
85
/// Result of appending to a `PerCoreWalBuffer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendOutcome {
    /// The record was placed in the active lane.
    Appended,
    /// The record was queued in the overflow spill.
    QueuedOverflow,
    /// The record was rejected and not stored.
    Blocked,
}
92
/// Whether a buffer can keep operating in parallel or needs a serialized drain
/// (see `PerCoreWalBuffer::fallback_decision`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FallbackDecision {
    /// No fallback condition is latched.
    ContinueParallel,
    /// The fallback is latched; the caller is expected to call `force_serialized_drain`.
    ForceSerializedDrain,
}
98
/// One buffered WAL entry: a page's before/after images plus transaction and epoch metadata.
#[derive(Debug, Clone)]
pub struct WalRecord {
    /// Token of the transaction that produced the record.
    pub txn_token: TxnToken,
    /// Epoch the record was appended in. Epoch flushes reject lanes containing records
    /// from a different epoch.
    pub epoch: u64,
    /// Page the images belong to.
    pub page_id: PageNumber,
    /// Starting commit sequence; the second recovery sort key after `epoch`.
    pub begin_seq: CommitSeq,
    /// Ending (presumably commit) sequence. Records with `None` are discarded by
    /// `PerCoreWalBufferPool::flush_epoch_batches`.
    pub end_seq: Option<CommitSeq>,
    /// Page contents before the change.
    pub before_image: Vec<u8>,
    /// Page contents after the change.
    pub after_image: Vec<u8>,
}
109
110impl WalRecord {
111 fn encoded_len(&self) -> usize {
112 let metadata_guard = self.txn_token.id.get()
113 ^ u64::from(self.txn_token.epoch.get())
114 ^ u64::from(self.page_id.get())
115 ^ self.epoch
116 ^ self.begin_seq.get()
117 ^ self.end_seq.map_or(0, CommitSeq::get);
118
119 let metadata_bytes = if metadata_guard == u64::MAX {
120 RECORD_FIXED_OVERHEAD_BYTES + 1
121 } else {
122 RECORD_FIXED_OVERHEAD_BYTES
123 };
124
125 metadata_bytes
126 .saturating_add(self.before_image.len())
127 .saturating_add(self.after_image.len())
128 }
129}
130
/// One of the two lanes of a `PerCoreWalBuffer`.
#[derive(Debug, Clone)]
struct BufferLane {
    /// Lifecycle state of this lane.
    state: BufferState,
    /// Sum of `WalRecord::encoded_len` over `records`.
    bytes_used: usize,
    /// Buffered records in append order.
    records: Vec<WalRecord>,
}
137
138impl BufferLane {
139 fn new_writable() -> Self {
140 Self {
141 state: BufferState::Writable,
142 bytes_used: 0,
143 records: Vec::new(),
144 }
145 }
146}
147
/// A single core's double-buffered WAL staging area.
///
/// Writers append into `active`. Sealing the active lane and calling `begin_flush` swaps it
/// with `flush_lane`, so a flush can proceed while writers continue on the other lane.
/// Records that do not fit the active lane may spill into `overflow`, depending on
/// `BufferConfig::overflow_policy`.
#[derive(Debug)]
pub struct PerCoreWalBuffer {
    config: BufferConfig,
    /// Lane receiving appends, or sealed and waiting for `begin_flush`.
    active: BufferLane,
    /// Lane holding the batch being flushed; empty and `Writable` when idle.
    flush_lane: BufferLane,
    /// Records queued under `OverflowPolicy::AllocateOverflow` when the active lane was full.
    overflow: VecDeque<WalRecord>,
    /// Sum of `WalRecord::encoded_len` over `overflow`.
    overflow_bytes: usize,
    /// Set when a serialized drain is required (see `fallback_decision`).
    fallback_latched: bool,
}
157
158impl PerCoreWalBuffer {
159 pub fn new(_core_id: usize, config: BufferConfig) -> Self {
160 Self {
161 config,
162 active: BufferLane::new_writable(),
163 flush_lane: BufferLane::new_writable(),
164 overflow: VecDeque::new(),
165 overflow_bytes: 0,
166 fallback_latched: false,
167 }
168 }
169
170 pub fn append(&mut self, record: WalRecord) -> AppendOutcome {
171 if self.active.state != BufferState::Writable {
172 return AppendOutcome::Blocked;
173 }
174
175 let needed = record.encoded_len();
176 if needed > self.config.capacity_bytes {
177 self.fallback_latched = true;
178 return AppendOutcome::Blocked;
179 }
180
181 if self.active.bytes_used.saturating_add(needed) <= self.config.capacity_bytes {
182 self.active.bytes_used = self.active.bytes_used.saturating_add(needed);
183 self.active.records.push(record);
184 return AppendOutcome::Appended;
185 }
186
187 match self.config.overflow_policy {
188 OverflowPolicy::BlockWriter => AppendOutcome::Blocked,
189 OverflowPolicy::AllocateOverflow => {
190 self.overflow_bytes = self.overflow_bytes.saturating_add(needed);
191 self.overflow.push_back(record);
192 if self.overflow_bytes > self.config.overflow_fallback_bytes {
193 self.fallback_latched = true;
194 }
195 AppendOutcome::QueuedOverflow
196 }
197 }
198 }
199
200 pub fn append_batch(&mut self, records: Vec<WalRecord>) -> AppendOutcome {
201 if self.active.state != BufferState::Writable {
202 return AppendOutcome::Blocked;
203 }
204
205 let mut total_needed = 0usize;
206 for record in &records {
207 let needed = record.encoded_len();
208 if needed > self.config.capacity_bytes {
209 self.fallback_latched = true;
210 return AppendOutcome::Blocked;
211 }
212 total_needed = total_needed.saturating_add(needed);
213 }
214
215 if self.config.overflow_policy == OverflowPolicy::BlockWriter
216 && self.active.bytes_used.saturating_add(total_needed) > self.config.capacity_bytes
217 {
218 return AppendOutcome::Blocked;
219 }
220
221 let mut queued_overflow = false;
222 for record in records {
223 match self.append(record) {
224 AppendOutcome::Appended => {}
225 AppendOutcome::QueuedOverflow => queued_overflow = true,
226 AppendOutcome::Blocked => return AppendOutcome::Blocked,
227 }
228 }
229
230 if queued_overflow {
231 AppendOutcome::QueuedOverflow
232 } else {
233 AppendOutcome::Appended
234 }
235 }
236
237 pub fn seal_active(&mut self, epoch: u64) -> Result<(), &'static str> {
238 if self.active.state != BufferState::Writable {
239 return Err("active lane is not writable");
240 }
241 self.active.state = BufferState::Sealed { epoch };
242 Ok(())
243 }
244
245 pub fn begin_flush(&mut self) -> Result<usize, &'static str> {
246 let BufferState::Sealed { epoch } = self.active.state else {
247 return Err("active lane must be sealed before flush");
248 };
249
250 if self.flush_lane.state != BufferState::Writable {
251 return Err("flush lane must be writable before flush");
252 }
253 if !self.flush_lane.records.is_empty() || self.flush_lane.bytes_used != 0 {
254 return Err("flush lane must be empty before flush");
255 }
256
257 std::mem::swap(&mut self.active, &mut self.flush_lane);
258 self.flush_lane.state = BufferState::Flushing { epoch };
259 Ok(self.flush_lane.records.len())
260 }
261
262 pub fn complete_flush(&mut self) -> Result<(), &'static str> {
263 if !matches!(self.flush_lane.state, BufferState::Flushing { .. }) {
264 return Err("flush lane is not in flushing state");
265 }
266
267 self.flush_lane.records.clear();
268 self.flush_lane.bytes_used = 0;
269 self.flush_lane.state = BufferState::Writable;
270 self.drain_overflow_into_active();
271 Ok(())
272 }
273
274 pub fn fallback_decision(&self) -> FallbackDecision {
275 if self.fallback_latched {
276 FallbackDecision::ForceSerializedDrain
277 } else {
278 FallbackDecision::ContinueParallel
279 }
280 }
281
282 pub fn force_serialized_drain(&mut self) -> Vec<WalRecord> {
283 let mut drained = Vec::with_capacity(
284 self.active
285 .records
286 .len()
287 .saturating_add(self.flush_lane.records.len())
288 .saturating_add(self.overflow.len()),
289 );
290 drained.append(&mut self.active.records);
291 drained.append(&mut self.flush_lane.records);
292 drained.extend(self.overflow.drain(..));
293
294 self.active.bytes_used = 0;
295 self.active.state = BufferState::Writable;
296
297 self.flush_lane.bytes_used = 0;
298 self.flush_lane.state = BufferState::Writable;
299
300 self.overflow_bytes = 0;
301 self.fallback_latched = false;
302 drained
303 }
304
305 pub fn active_state(&self) -> BufferState {
306 self.active.state
307 }
308
309 pub fn flush_state(&self) -> BufferState {
310 self.flush_lane.state
311 }
312
313 pub fn active_len(&self) -> usize {
314 self.active.records.len()
315 }
316
317 pub fn flush_len(&self) -> usize {
318 self.flush_lane.records.len()
319 }
320
321 pub fn overflow_len(&self) -> usize {
322 self.overflow.len()
323 }
324
325 fn drain_overflow_into_active(&mut self) {
326 if self.active.state != BufferState::Writable {
327 return;
328 }
329
330 while let Some(front) = self.overflow.front() {
331 let needed = front.encoded_len();
332 if self.active.bytes_used.saturating_add(needed) > self.config.capacity_bytes {
333 break;
334 }
335
336 let Some(record) = self.overflow.pop_front() else {
337 break;
338 };
339 self.overflow_bytes = self.overflow_bytes.saturating_sub(needed);
340 self.active.bytes_used = self.active.bytes_used.saturating_add(needed);
341 self.active.records.push(record);
342 }
343
344 if self.overflow.is_empty() {
345 self.fallback_latched = false;
346 }
347 }
348}
349
/// A lock-protected `PerCoreWalBuffer` plus a count of contended lock acquisitions.
#[derive(Debug)]
struct BufferCell {
    inner: Mutex<PerCoreWalBuffer>,
    /// Incremented each time an append path's `try_lock` returned `WouldBlock`.
    contention_events: AtomicU64,
}
355
356impl BufferCell {
357 fn new(core_id: usize, config: BufferConfig) -> Self {
358 Self {
359 inner: Mutex::new(PerCoreWalBuffer::new(core_id, config)),
360 contention_events: AtomicU64::new(0),
361 }
362 }
363
364 fn append(&self, record: WalRecord) -> AppendOutcome {
365 match self.inner.try_lock() {
366 Ok(mut guard) => guard.append(record),
367 Err(TryLockError::WouldBlock) => {
368 self.contention_events.fetch_add(1, Ordering::Relaxed);
369 let mut guard = self
370 .inner
371 .lock()
372 .unwrap_or_else(std::sync::PoisonError::into_inner);
373 guard.append(record)
374 }
375 Err(TryLockError::Poisoned(poisoned)) => {
376 let mut guard = poisoned.into_inner();
377 guard.append(record)
378 }
379 }
380 }
381
382 fn append_batch(&self, records: Vec<WalRecord>) -> AppendOutcome {
383 match self.inner.try_lock() {
384 Ok(mut guard) => guard.append_batch(records),
385 Err(TryLockError::WouldBlock) => {
386 self.contention_events.fetch_add(1, Ordering::Relaxed);
387 let mut guard = self
388 .inner
389 .lock()
390 .unwrap_or_else(std::sync::PoisonError::into_inner);
391 guard.append_batch(records)
392 }
393 Err(TryLockError::Poisoned(poisoned)) => {
394 let mut guard = poisoned.into_inner();
395 guard.append_batch(records)
396 }
397 }
398 }
399
400 fn contention_events(&self) -> u64 {
401 self.contention_events.load(Ordering::Relaxed)
402 }
403}
404
/// A fixed set of per-core WAL buffers, indexed by core id.
#[derive(Debug)]
pub struct PerCoreWalBufferPool {
    /// One cell per core; never empty.
    cells: Vec<BufferCell>,
}
409
impl PerCoreWalBufferPool {
    /// Creates one buffer cell per core, all using `config`.
    ///
    /// # Panics
    ///
    /// Panics if `core_count` is zero.
    pub fn new(core_count: usize, config: BufferConfig) -> Self {
        assert!(core_count > 0, "core_count must be > 0");
        let mut cells = Vec::with_capacity(core_count);
        for core_id in 0..core_count {
            cells.push(BufferCell::new(core_id, config));
        }
        Self { cells }
    }

    /// Appends one record to the buffer of `core_id`.
    ///
    /// # Errors
    ///
    /// Returns an error message if `core_id` is out of range.
    pub fn append_to_core(
        &self,
        core_id: usize,
        record: WalRecord,
    ) -> Result<AppendOutcome, String> {
        let Some(cell) = self.cells.get(core_id) else {
            return Err(format!(
                "invalid core_id={core_id}; available cores={}",
                self.cells.len()
            ));
        };
        Ok(cell.append(record))
    }

    /// Appends a batch of records to the buffer of `core_id` under one lock acquisition.
    ///
    /// # Errors
    ///
    /// Returns an error message if `core_id` is out of range.
    pub fn append_batch_to_core(
        &self,
        core_id: usize,
        records: Vec<WalRecord>,
    ) -> Result<AppendOutcome, String> {
        let Some(cell) = self.cells.get(core_id) else {
            return Err(format!(
                "invalid core_id={core_id}; available cores={}",
                self.cells.len()
            ));
        };
        Ok(cell.append_batch(records))
    }

    /// Total contended lock acquisitions across all cells.
    pub fn contention_events_total(&self) -> u64 {
        self.cells
            .iter()
            .map(BufferCell::contention_events)
            .sum::<u64>()
    }

    /// Number of per-core buffers.
    pub fn core_count(&self) -> usize {
        self.cells.len()
    }

    /// Seals, for `epoch`, every core whose active lane is `Writable` and non-empty.
    ///
    /// Cores with an empty or already-sealed active lane are skipped. Cores are locked one
    /// at a time. Returns the number of lanes sealed.
    ///
    /// # Errors
    ///
    /// Returns the first `seal_active` failure; lanes sealed before it stay sealed.
    pub fn seal_active_for_epoch(&self, epoch: u64) -> Result<usize, String> {
        let mut sealed_lanes = 0_usize;

        for (core_id, cell) in self.cells.iter().enumerate() {
            let did_seal = {
                let mut guard = cell
                    .inner
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if guard.active_state() == BufferState::Writable && guard.active_len() > 0 {
                    guard.seal_active(epoch).map_err(|error| {
                        format!(
                            "core {core_id}: failed to seal active lane for epoch {epoch}: {error}"
                        )
                    })?;
                    true
                } else {
                    false
                }
            };

            if did_seal {
                sealed_lanes = sealed_lanes.saturating_add(1);
            }
        }

        Ok(sealed_lanes)
    }

    /// Drains every core's batch for `epoch`, concatenating the records in core-id order.
    ///
    /// Each core is processed under its own lock:
    /// 1. An active lane sealed for `epoch` is first moved into the flush lane
    ///    (`begin_flush`).
    /// 2. If the flush lane is `Flushing` for `epoch`, records without an `end_seq` are
    ///    discarded, the rest are taken, the flush lane is reset to `Writable`, and overflow
    ///    is drained into the active lane.
    ///
    /// Returns `(records, records_per_core)`. `records_per_core[i]` is 0 for cores with
    /// nothing to flush.
    ///
    /// # Errors
    ///
    /// Fails if `begin_flush` fails, or if a remaining flush-lane record's `epoch` differs
    /// from `epoch`. In the second case the offending lane keeps its filtered records and
    /// stays `Flushing`.
    /// NOTE(review): on any error, records already taken from earlier cores are dropped with
    /// the partial result, and those lanes have already been reset; they are not restored.
    pub fn flush_epoch_batches(&self, epoch: u64) -> Result<(Vec<WalRecord>, Vec<usize>), String> {
        let mut all_records = Vec::new();
        let mut records_per_core = Vec::with_capacity(self.cells.len());

        for (core_id, cell) in self.cells.iter().enumerate() {
            let core_records = {
                let mut guard = cell
                    .inner
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);

                // Promote a lane sealed for this epoch into the flush lane.
                if matches!(guard.active_state(), BufferState::Sealed { epoch: sealed_epoch } if sealed_epoch == epoch)
                {
                    guard.begin_flush().map_err(|error| {
                        format!("core {core_id}: begin_flush failed for epoch {epoch}: {error}")
                    })?;
                }

                // Also covers a lane left `Flushing` for this epoch by an earlier call.
                let should_drain = matches!(
                    guard.flush_state(),
                    BufferState::Flushing {
                        epoch: flushing_epoch
                    } if flushing_epoch == epoch
                );

                let core_records = if should_drain {
                    // Only records with an end sequence are emitted; the rest are dropped.
                    guard
                        .flush_lane
                        .records
                        .retain(|record| record.end_seq.is_some());

                    if guard
                        .flush_lane
                        .records
                        .iter()
                        .any(|record| record.epoch != epoch)
                    {
                        return Err(format!(
                            "core {core_id}: epoch boundary straddle detected in flush lane for epoch {epoch}"
                        ));
                    }

                    let core_records = std::mem::take(&mut guard.flush_lane.records);
                    guard.flush_lane.bytes_used = 0;
                    guard.flush_lane.state = BufferState::Writable;
                    guard.drain_overflow_into_active();
                    Some(core_records)
                } else {
                    None
                };

                drop(guard);
                core_records
            };

            if let Some(core_records) = core_records {
                records_per_core.push(core_records.len());
                all_records.extend(core_records);
            } else {
                records_per_core.push(0);
            }
        }

        Ok((all_records, records_per_core))
    }
}
556
/// Tuning for `EpochOrderCoordinator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EpochConfig {
    /// Epoch advance interval in milliseconds, reported by `epoch_advance_interval`.
    /// NOTE(review): the coordinator only reports this value; scheduling the advances is
    /// presumably the caller's job.
    pub advance_interval_ms: u64,
}
561
562impl Default for EpochConfig {
563 fn default() -> Self {
564 Self {
565 advance_interval_ms: DEFAULT_EPOCH_ADVANCE_INTERVAL_MS,
566 }
567 }
568}
569
/// Records drained for one epoch by `EpochOrderCoordinator::flush_epoch`.
#[derive(Debug, Clone)]
pub struct EpochFlushBatch {
    /// Epoch that was flushed.
    pub epoch: u64,
    /// Drained records, grouped core by core in core-id order.
    pub records: Vec<WalRecord>,
    /// Records contributed by each core, indexed by core id (0 when a core had nothing).
    pub records_per_core: Vec<usize>,
}
576
577impl EpochFlushBatch {
578 pub fn total_records(&self) -> usize {
579 self.records.len()
580 }
581}
582
/// State guarded by `EpochOrderCoordinator::wait_state` and signalled via `wait_cv`.
#[derive(Debug)]
struct EpochWaitState {
    /// Highest epoch each core has acknowledged, indexed by core id.
    observed_epochs: Vec<u64>,
    /// Highest epoch marked durable, if any.
    durable_epoch: Option<u64>,
}
588
/// Coordinates epoch fencing, sealing, flushing, and durability across a
/// `PerCoreWalBufferPool`.
#[derive(Debug)]
pub struct EpochOrderCoordinator {
    pool: Arc<PerCoreWalBufferPool>,
    /// Epoch published by the fence in `advance_epoch_and_wait`; cores acknowledge it via
    /// `observe_epoch`.
    current_epoch: AtomicU64,
    /// Epoch stamped on records by `append_to_core`; advanced only after sealing completes.
    append_epoch: AtomicU64,
    /// Serializes concurrent `advance_epoch_and_wait` callers.
    advance_lock: Mutex<()>,
    /// Per-core observations and the durable watermark.
    wait_state: Mutex<EpochWaitState>,
    /// Notified by `observe_epoch` and `mark_epoch_durable`.
    wait_cv: Condvar,
    config: EpochConfig,
}
599
600impl EpochOrderCoordinator {
601 pub fn new(core_count: usize, buffer_config: BufferConfig, config: EpochConfig) -> Self {
602 let pool = Arc::new(PerCoreWalBufferPool::new(core_count, buffer_config));
603 let wait_state = EpochWaitState {
604 observed_epochs: vec![0; core_count],
605 durable_epoch: None,
606 };
607 Self {
608 pool,
609 current_epoch: AtomicU64::new(0),
610 append_epoch: AtomicU64::new(0),
611 advance_lock: Mutex::new(()),
612 wait_state: Mutex::new(wait_state),
613 wait_cv: Condvar::new(),
614 config,
615 }
616 }
617
618 pub fn current_epoch(&self) -> u64 {
619 self.current_epoch.load(Ordering::SeqCst)
620 }
621
622 pub fn current_append_epoch(&self) -> u64 {
623 self.append_epoch.load(Ordering::SeqCst)
624 }
625
626 pub fn durable_epoch(&self) -> Option<u64> {
627 let guard = self
628 .wait_state
629 .lock()
630 .unwrap_or_else(std::sync::PoisonError::into_inner);
631 guard.durable_epoch
632 }
633
634 pub fn epoch_advance_interval(&self) -> Duration {
635 Duration::from_millis(self.config.advance_interval_ms)
636 }
637
638 pub fn observe_epoch(&self, core_id: usize) -> Result<u64, String> {
639 let observed_epoch = self.current_epoch();
640 let mut guard = self
641 .wait_state
642 .lock()
643 .unwrap_or_else(std::sync::PoisonError::into_inner);
644 let Some(slot) = guard.observed_epochs.get_mut(core_id) else {
645 return Err(format!(
646 "invalid core_id={core_id}; available cores={}",
647 guard.observed_epochs.len()
648 ));
649 };
650 *slot = (*slot).max(observed_epoch);
651 drop(guard);
652 self.wait_cv.notify_all();
653 Ok(observed_epoch)
654 }
655
656 pub fn append_to_core(
657 &self,
658 core_id: usize,
659 begin_seq: u64,
660 payload_len: usize,
661 ) -> Result<AppendOutcome, String> {
662 let mut record = make_record(core_id, begin_seq, payload_len);
663 record.epoch = self.current_append_epoch();
664 self.pool.append_to_core(core_id, record)
665 }
666
667 pub fn append_records_to_core(
668 &self,
669 core_id: usize,
670 records: Vec<WalRecord>,
671 ) -> Result<AppendOutcome, String> {
672 self.pool.append_batch_to_core(core_id, records)
673 }
674
675 pub fn advance_epoch_and_wait(
676 &self,
677 active_cores: &[usize],
678 timeout: Duration,
679 ) -> Result<u64, String> {
680 let _advance_guard = self
681 .advance_lock
682 .lock()
683 .unwrap_or_else(std::sync::PoisonError::into_inner);
684 let core_count = self.pool.core_count();
685 for &core_id in active_cores {
686 if core_id >= core_count {
687 return Err(format!(
688 "invalid active core_id={core_id}; available cores={core_count}"
689 ));
690 }
691 }
692
693 let previous_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
694 let next_epoch = previous_epoch.saturating_add(1);
695 let deadline = Instant::now() + timeout;
696 let mut guard = self
697 .wait_state
698 .lock()
699 .unwrap_or_else(std::sync::PoisonError::into_inner);
700
701 while active_cores
702 .iter()
703 .any(|core_id| guard.observed_epochs[*core_id] < next_epoch)
704 {
705 let now = Instant::now();
706 if now >= deadline {
707 self.current_epoch.store(previous_epoch, Ordering::SeqCst);
708 return Err(format!(
709 "epoch fence timed out waiting for active cores to observe epoch {next_epoch}"
710 ));
711 }
712
713 let remaining = deadline.saturating_duration_since(now);
714 let (next_guard, wait_result) = self
715 .wait_cv
716 .wait_timeout(guard, remaining)
717 .unwrap_or_else(std::sync::PoisonError::into_inner);
718 guard = next_guard;
719 if wait_result.timed_out()
720 && active_cores
721 .iter()
722 .any(|core_id| guard.observed_epochs[*core_id] < next_epoch)
723 {
724 self.current_epoch.store(previous_epoch, Ordering::SeqCst);
725 return Err(format!(
726 "epoch fence timed out waiting for active cores to observe epoch {next_epoch}"
727 ));
728 }
729 }
730
731 drop(guard);
732 self.pool.seal_active_for_epoch(previous_epoch)?;
733 self.append_epoch.store(next_epoch, Ordering::SeqCst);
734 Ok(next_epoch)
735 }
736
737 pub fn flush_epoch(&self, epoch: u64) -> Result<EpochFlushBatch, String> {
738 let (records, records_per_core) = self.pool.flush_epoch_batches(epoch)?;
739 Ok(EpochFlushBatch {
740 epoch,
741 records,
742 records_per_core,
743 })
744 }
745
746 pub fn mark_epoch_durable(&self, epoch: u64) {
747 let mut guard = self
748 .wait_state
749 .lock()
750 .unwrap_or_else(std::sync::PoisonError::into_inner);
751 guard.durable_epoch = Some(
752 guard
753 .durable_epoch
754 .map_or(epoch, |existing| existing.max(epoch)),
755 );
756 drop(guard);
757 self.wait_cv.notify_all();
758 }
759
760 pub fn wait_until_epoch_durable(&self, epoch: u64, timeout: Duration) -> Result<(), String> {
761 let deadline = Instant::now() + timeout;
762 let mut guard = self
763 .wait_state
764 .lock()
765 .unwrap_or_else(std::sync::PoisonError::into_inner);
766 while guard
767 .durable_epoch
768 .is_none_or(|durable_epoch| durable_epoch < epoch)
769 {
770 let now = Instant::now();
771 if now >= deadline {
772 return Err(format!("timeout while waiting for durable epoch {epoch}"));
773 }
774 let remaining = deadline.saturating_duration_since(now);
775 let (next_guard, wait_result) = self
776 .wait_cv
777 .wait_timeout(guard, remaining)
778 .unwrap_or_else(std::sync::PoisonError::into_inner);
779 guard = next_guard;
780 if wait_result.timed_out()
781 && guard
782 .durable_epoch
783 .is_none_or(|durable_epoch| durable_epoch < epoch)
784 {
785 return Err(format!("timeout while waiting for durable epoch {epoch}"));
786 }
787 }
788 Ok(())
789 }
790
791 pub fn recovery_order(records: &[WalRecord]) -> Vec<WalRecord> {
792 let mut ordered = records.to_vec();
793 ordered.sort_by_key(|record| {
794 (
795 record.epoch,
796 record.begin_seq.get(),
797 record.txn_token.id.get(),
798 record.page_id.get(),
799 )
800 });
801 ordered
802 }
803}
804
805fn make_record(core_id: usize, seq: u64, payload_len: usize) -> WalRecord {
806 let core_u64 = u64::try_from(core_id).expect("core id should fit into u64");
807 let txn_id_raw = core_u64.saturating_mul(1_000_000).saturating_add(seq + 1);
808 let txn_id = TxnId::new(txn_id_raw).expect("txn id should be non-zero");
809
810 let page_raw = u32::try_from(core_id + 1).expect("core id should fit into u32");
811 let page_id = PageNumber::new(page_raw).expect("page id should be non-zero");
812
813 WalRecord {
814 txn_token: TxnToken::new(txn_id, TxnEpoch::new(1)),
815 epoch: seq,
816 page_id,
817 begin_seq: CommitSeq::new(seq),
818 end_seq: Some(CommitSeq::new(seq.saturating_add(1))),
819 before_image: vec![0x10; payload_len],
820 after_image: vec![0x20; payload_len],
821 }
822}
823
/// Builds the current-thread runtime used by the async tests.
///
/// It is configured with `blocking_threads(8, 8)` so `spawn_blocking` work can run.
/// NOTE(review): the two arguments are presumably min/max blocking threads — confirm
/// against asupersync's `RuntimeBuilder`.
#[cfg(test)]
fn test_runtime() -> Runtime {
    RuntimeBuilder::current_thread()
        .blocking_threads(8, 8)
        .build()
        .expect("runtime build")
}
831
/// Walks one buffer through writable → sealed → flushing → writable while appends continue
/// on the swapped-in active lane.
#[test]
fn bd_ncivz_1_state_machine_double_buffering() {
    let mut buffer = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );

    // Both lanes start out writable.
    for state in [buffer.active_state(), buffer.flush_state()] {
        assert_eq!(state, BufferState::Writable);
    }

    for seq in [1_u64, 2] {
        assert_eq!(
            buffer.append(make_record(0, seq, 64)),
            AppendOutcome::Appended
        );
    }
    assert_eq!(buffer.active_len(), 2);

    // Seal, then swap the sealed records into the flush lane.
    buffer.seal_active(7).expect("active lane should seal");
    assert_eq!(buffer.active_state(), BufferState::Sealed { epoch: 7 });
    let moved = buffer.begin_flush().expect("sealed lane should flush");
    assert_eq!(moved, 2);
    assert_eq!(buffer.flush_state(), BufferState::Flushing { epoch: 7 });
    assert_eq!(buffer.active_state(), BufferState::Writable);

    // Writers keep going while the flush is in flight.
    assert_eq!(
        buffer.append(make_record(0, 3, 64)),
        AppendOutcome::Appended
    );
    assert_eq!(buffer.active_len(), 1);

    buffer
        .complete_flush()
        .expect("flushing lane should complete");
    assert_eq!(buffer.flush_state(), BufferState::Writable);
    assert_eq!(buffer.flush_len(), 0);
    assert_eq!(buffer.active_len(), 1);
    assert_eq!(
        buffer.fallback_decision(),
        FallbackDecision::ContinueParallel
    );
}
878
/// A sealed lane must reject appends and hand the original records to the flush lane
/// unchanged.
#[test]
fn bd_ncivz_1_sealed_lane_rejects_mutating_appends() {
    let mut buffer = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );

    let seeded = make_record(0, 1, 64);
    assert_eq!(buffer.append(seeded.clone()), AppendOutcome::Appended);
    buffer
        .seal_active(3)
        .expect("active lane should seal before flush");

    let late = make_record(0, 2, 64);
    assert_eq!(
        buffer.append(late),
        AppendOutcome::Blocked,
        "sealed lane must reject mutating appends"
    );

    let moved = buffer
        .begin_flush()
        .expect("sealed records should move into flush lane");
    assert_eq!(moved, 1);
    let flush_records = &buffer.flush_lane.records;
    assert_eq!(flush_records.len(), 1);

    // The only flushed record is the seeded one.
    let flushed = &flush_records[0];
    assert_eq!(
        flushed.txn_token.id.get(),
        seeded.txn_token.id.get(),
        "flushed lane contents must remain unchanged after blocked append"
    );
    assert_eq!(
        flushed.end_seq, seeded.end_seq,
        "commit metadata must remain intact while lane is sealed"
    );
}
916
/// Under `BlockWriter`, a record that does not fit is rejected rather than spilled, and no
/// fallback is latched.
#[test]
fn bd_ncivz_1_overflow_block_writer_policy() {
    let mut buffer = PerCoreWalBuffer::new(
        1,
        BufferConfig {
            overflow_policy: OverflowPolicy::BlockWriter,
            capacity_bytes: 160,
            overflow_fallback_bytes: 320,
        },
    );

    // The first record fits in the 160-byte lane; the second cannot.
    let outcomes: Vec<AppendOutcome> = (1..=2)
        .map(|seq| buffer.append(make_record(1, seq, 48)))
        .collect();
    assert_eq!(outcomes, [AppendOutcome::Appended, AppendOutcome::Blocked]);

    assert_eq!(buffer.overflow_len(), 0);
    assert_eq!(
        buffer.fallback_decision(),
        FallbackDecision::ContinueParallel
    );
}
937
/// Under `AllocateOverflow`, spilling past the fallback threshold latches
/// `ForceSerializedDrain`; a forced drain empties everything and clears the latch.
#[test]
fn bd_ncivz_1_overflow_allocate_triggers_deterministic_fallback() {
    let config = BufferConfig {
        overflow_fallback_bytes: 170,
        overflow_policy: OverflowPolicy::AllocateOverflow,
        capacity_bytes: 192,
    };
    let mut buffer = PerCoreWalBuffer::new(2, config);

    // One record fits the 192-byte lane; the next two spill past the 170-byte threshold.
    let expected = [
        AppendOutcome::Appended,
        AppendOutcome::QueuedOverflow,
        AppendOutcome::QueuedOverflow,
    ];
    for (seq, want) in (1_u64..).zip(expected) {
        assert_eq!(buffer.append(make_record(2, seq, 64)), want);
    }

    assert_eq!(
        buffer.fallback_decision(),
        FallbackDecision::ForceSerializedDrain
    );
    assert_eq!(buffer.overflow_len(), 2);

    let drained = buffer.force_serialized_drain();
    assert_eq!(drained.len(), 3);
    for remaining in [buffer.active_len(), buffer.flush_len(), buffer.overflow_len()] {
        assert_eq!(remaining, 0);
    }
    assert_eq!(
        buffer.fallback_decision(),
        FallbackDecision::ContinueParallel
    );
}
976
/// Eight writers, one per core index, each append 400 records concurrently. No two writers
/// share a cell, so no `try_lock` should ever report `WouldBlock`.
#[test]
fn bd_ncivz_1_per_core_pool_concurrent_writers_no_contention() {
    let pool = Arc::new(PerCoreWalBufferPool::new(8, BufferConfig::default()));
    let records_per_core = 400_u64;
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        let mut writer_tasks = Vec::new();
        for core_id in 0..8_usize {
            let pool_ref = Arc::clone(&pool);
            writer_tasks.push(
                handle
                    .try_spawn(async move {
                        // The synchronous append loop runs via `spawn_blocking`
                        // (presumably on the runtime's blocking pool).
                        spawn_blocking(move || {
                            for seq in 0..records_per_core {
                                let record = make_record(core_id, seq, 64);
                                let outcome = pool_ref
                                    .append_to_core(core_id, record)
                                    .expect("core index should exist");
                                assert!(
                                    matches!(
                                        outcome,
                                        AppendOutcome::Appended | AppendOutcome::QueuedOverflow
                                    ),
                                    "append outcome should not block"
                                );
                            }
                        })
                        .await;
                    })
                    .expect("writer task should spawn"),
            );
        }

        for writer_task in writer_tasks {
            writer_task.await;
        }
    });

    // Each cell had exactly one writer, so no acquisition should have been contended.
    assert_eq!(pool.contention_events_total(), 0);
}
1019
/// A new coordinator starts at epoch 0, reports the default 10 ms interval, and owns one
/// buffer per requested core.
#[test]
fn bd_ncivz_2_epoch_counter_defaults_to_10ms_interval() {
    let coordinator =
        EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    let interval = coordinator.epoch_advance_interval();

    assert_eq!(coordinator.current_epoch(), 0);
    assert_eq!(interval, Duration::from_millis(10));
    assert_eq!(coordinator.pool.core_count(), 2);
}
1031
/// `advance_epoch_and_wait` must not return until both active cores have observed the new
/// epoch. A delayed observer task supplies those observations.
#[test]
fn bd_ncivz_2_epoch_fence_waits_for_active_core_observation() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        2,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    // Both cores acknowledge epoch 0 up front.
    coordinator
        .observe_epoch(0)
        .expect("core 0 should be valid");
    coordinator
        .observe_epoch(1)
        .expect("core 1 should be valid");
    let runtime = test_runtime();
    let handle = runtime.handle();

    let next_epoch = runtime.block_on(async {
        let observer_ref = Arc::clone(&coordinator);
        // The 8 ms sleep is meant to let the fence publish epoch 1 before the observations.
        // NOTE(review): this ordering is timing-dependent.
        let observer = handle
            .try_spawn(async move {
                sleep(wall_now(), Duration::from_millis(8)).await;
                observer_ref
                    .observe_epoch(0)
                    .expect("core 0 observation should succeed");
                observer_ref
                    .observe_epoch(1)
                    .expect("core 1 observation should succeed");
            })
            .expect("observer task should spawn");

        let fence_ref = Arc::clone(&coordinator);
        let next_epoch = spawn_blocking(move || {
            fence_ref.advance_epoch_and_wait(&[0, 1], Duration::from_millis(200))
        })
        .await
        .expect("fence should complete after observations");
        observer.await;
        next_epoch
    });

    assert_eq!(next_epoch, 1);
    assert_eq!(coordinator.current_epoch(), 1);
}
1075
/// A record appended after epoch 1 is published, but before the fence releases, must still
/// carry epoch 0 and flush with the epoch-0 batch.
#[test]
fn bd_ncivz_2_append_during_epoch_fence_stays_in_previous_epoch_lane() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        1,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    coordinator
        .append_to_core(0, 1, 64)
        .expect("initial append should succeed");

    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        let advance_ref = Arc::clone(&coordinator);
        let advance = handle
            .try_spawn(async move {
                spawn_blocking(move || {
                    advance_ref.advance_epoch_and_wait(&[0], Duration::from_millis(200))
                })
                .await
            })
            .expect("advance task should spawn");

        // Spin until the fence has published epoch 1 and is waiting for core 0.
        let deadline = Instant::now() + Duration::from_millis(200);
        while coordinator.current_epoch() != 1 {
            assert!(
                Instant::now() < deadline,
                "advance should publish epoch 1 before timing out"
            );
            std::thread::yield_now();
        }
        // The append epoch only moves after sealing, so it is still 0 here.
        assert_eq!(
            coordinator.current_append_epoch(),
            0,
            "writers must keep appending to the previous epoch until sealing completes"
        );

        coordinator
            .append_to_core(0, 2, 64)
            .expect("append during fence window should still succeed");
        // Observing epoch 1 on the only active core releases the fence.
        coordinator
            .observe_epoch(0)
            .expect("core 0 observation should release the fence");

        let next_epoch = advance.await.expect("advance should complete");
        assert_eq!(next_epoch, 1);
    });

    // Both records, including the one appended mid-fence, belong to epoch 0.
    let batch = coordinator
        .flush_epoch(0)
        .expect("flush should not see an epoch straddle");
    assert_eq!(batch.total_records(), 2);
    assert_eq!(batch.records_per_core, vec![2]);
    assert!(
        batch.records.iter().all(|record| record.epoch == 0),
        "records appended during the fence window must stay in epoch 0: {:?}",
        batch
            .records
            .iter()
            .map(|record| record.epoch)
            .collect::<Vec<_>>()
    );
}
1144
/// After an epoch advance, `flush_epoch(0)` gathers the epoch-0 records from both cores,
/// and marking the epoch durable satisfies `wait_until_epoch_durable`.
#[test]
fn bd_ncivz_2_group_commit_flushes_epoch_across_cores() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        2,
        BufferConfig::default(),
        EpochConfig::default(),
    ));

    coordinator
        .observe_epoch(0)
        .expect("core 0 should be valid");
    coordinator
        .observe_epoch(1)
        .expect("core 1 should be valid");

    // Two records on core 0 and one on core 1, all in epoch 0.
    assert_eq!(
        coordinator
            .append_to_core(0, 1, 64)
            .expect("append on core 0 should succeed"),
        AppendOutcome::Appended
    );
    assert_eq!(
        coordinator
            .append_to_core(0, 2, 64)
            .expect("append on core 0 should succeed"),
        AppendOutcome::Appended
    );
    assert_eq!(
        coordinator
            .append_to_core(1, 3, 64)
            .expect("append on core 1 should succeed"),
        AppendOutcome::Appended
    );
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        let observer_ref = Arc::clone(&coordinator);
        // Delayed observations release the fence.
        // NOTE(review): relies on the 6 ms sleep letting the fence publish epoch 1 first.
        let observer = handle
            .try_spawn(async move {
                sleep(wall_now(), Duration::from_millis(6)).await;
                observer_ref
                    .observe_epoch(0)
                    .expect("core 0 observation should succeed");
                observer_ref
                    .observe_epoch(1)
                    .expect("core 1 observation should succeed");
            })
            .expect("observer task should spawn");

        let advance_ref = Arc::clone(&coordinator);
        spawn_blocking(move || {
            advance_ref.advance_epoch_and_wait(&[0, 1], Duration::from_millis(200))
        })
        .await
        .expect("epoch advance should succeed");
        observer.await;

        let batch = coordinator
            .flush_epoch(0)
            .expect("epoch 0 flush should succeed");
        assert_eq!(batch.epoch, 0);
        assert_eq!(batch.records_per_core, vec![2, 1]);
        assert_eq!(batch.total_records(), 3);
        assert!(batch.records.iter().all(|record| record.epoch == 0));

        // Durability is tracked separately from flushing and must be marked explicitly.
        assert_eq!(coordinator.durable_epoch(), None);
        coordinator.mark_epoch_durable(0);
        assert_eq!(coordinator.durable_epoch(), Some(0));
        coordinator
            .wait_until_epoch_durable(0, Duration::from_millis(25))
            .expect("durability wait should pass");
    });
}
1219
/// A writer waiting on `wait_until_epoch_durable(0, ..)` must stay blocked
/// while epoch 0 is advanced past and flushed. It is released only after
/// `mark_epoch_durable(0)`.
#[test]
fn bd_ncivz_2_writers_block_until_epoch_is_durable() {
    let coordinator = Arc::new(EpochOrderCoordinator::new(
        1,
        BufferConfig::default(),
        EpochConfig::default(),
    ));
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    coordinator
        .append_to_core(0, 1, 64)
        .expect("append should succeed");
    let runtime = test_runtime();
    let handle = runtime.handle();

    runtime.block_on(async {
        // The durability wait blocks, so it runs via spawn_blocking inside a
        // spawned task. The task's output is the wait's result.
        let waiter_ref = Arc::clone(&coordinator);
        let waiter = handle
            .try_spawn(async move {
                spawn_blocking(move || {
                    waiter_ref.wait_until_epoch_durable(0, Duration::from_millis(600))
                })
                .await
            })
            .expect("waiter task should spawn");

        // Nothing has been flushed yet, so the waiter must still be pending.
        sleep(wall_now(), Duration::from_millis(30)).await;
        assert!(
            !waiter.is_finished(),
            "writer should still be waiting before flush"
        );

        // An empty fence set lets the advance proceed without core observations.
        let advance_ref = Arc::clone(&coordinator);
        spawn_blocking(move || advance_ref.advance_epoch_and_wait(&[], Duration::from_millis(25)))
            .await
            .expect("advancing with no active fence set should succeed");
        assert_eq!(
            coordinator
                .flush_epoch(0)
                .expect("flush should succeed")
                .total_records(),
            1
        );
        // The flush alone does not advance the durable epoch.
        assert_eq!(coordinator.durable_epoch(), None);
        coordinator.mark_epoch_durable(0);

        // Marking epoch 0 durable must release the blocked writer.
        waiter.await.expect("epoch should become durable");
    });
}
1270
/// Records from two cores, supplied out of epoch order, come back from
/// `recovery_order` with every epoch-1 record ahead of every epoch-2 record.
#[test]
fn bd_ncivz_2_recovery_replays_in_epoch_order() {
    let with_epoch = |core, seq, epoch| {
        let mut record = make_record(core, seq, 16);
        record.epoch = epoch;
        record
    };
    let inputs = [
        with_epoch(0, 10, 2),
        with_epoch(1, 2, 1),
        with_epoch(0, 3, 1),
        with_epoch(1, 9, 2),
    ];

    let ordered = EpochOrderCoordinator::recovery_order(&inputs);

    let non_decreasing = ordered
        .windows(2)
        .all(|pair| pair[0].epoch <= pair[1].epoch);
    assert!(non_decreasing);
    for (index, &expected_epoch) in [1, 1, 2, 2].iter().enumerate() {
        assert_eq!(ordered[index].epoch, expected_epoch);
    }
}
1293
/// Fencing on core 1 when core 1 never observes the new epoch must fail with a
/// timeout error instead of waiting indefinitely.
#[test]
fn bd_ncivz_2_epoch_fence_timeout_when_active_core_not_observed() {
    let fence_timeout = Duration::from_millis(20);
    let coordinator =
        EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());

    let error = coordinator
        .advance_epoch_and_wait(&[1], fence_timeout)
        .expect_err("fence must timeout without active core observation");

    let mentions_timeout = error.contains("timed out");
    assert!(
        mentions_timeout,
        "error should describe fence timeout: {error}"
    );
}
1306
/// A fence timeout must roll back cleanly. Both the published and append
/// epochs stay at 0, later appends still land in epoch 0, and the next
/// successful advance seals both records into that original epoch.
#[test]
fn bd_ncivz_2_epoch_fence_timeout_does_not_poison_next_advance() {
    let coordinator =
        EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());

    coordinator
        .append_to_core(0, 1, 64)
        .expect("initial append should succeed");

    // Core 0 is fenced but never observes, so this advance has to time out.
    let error = coordinator
        .advance_epoch_and_wait(&[0], Duration::from_millis(20))
        .expect_err("fence must timeout without active core observation");
    assert!(
        error.contains("timed out"),
        "error should describe fence timeout: {error}"
    );

    // Neither epoch counter may have moved after the failed advance.
    let published = coordinator.current_epoch();
    assert_eq!(
        published,
        0,
        "timed-out epoch advance must roll back the published epoch"
    );
    let appending = coordinator.current_append_epoch();
    assert_eq!(
        appending,
        0,
        "timed-out epoch advance must not move the append epoch"
    );

    // The coordinator must remain fully usable after the timeout.
    coordinator
        .append_to_core(0, 2, 64)
        .expect("append after timeout should still use epoch 0");
    coordinator
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("retry advance should succeed");

    let batch = coordinator
        .flush_epoch(0)
        .expect("retry advance should seal the original epoch");
    assert_eq!(batch.total_records(), 2);
    let epochs: Vec<_> = batch.records.iter().map(|record| record.epoch).collect();
    assert!(
        epochs.iter().all(|&epoch| epoch == 0),
        "timed-out fence must not relabel epoch-0 records: {:?}",
        epochs
    );
}
1355
/// A single flush lane that holds records from epochs 0 and 1 must be
/// rejected by `flush_epoch_batches` with an epoch-straddle error.
#[test]
fn bd_ncivz_2_fence_detects_epoch_boundary_straddle() {
    let pool = PerCoreWalBufferPool::new(1, BufferConfig::default());

    // One record labelled epoch 0 and one labelled epoch 1, both on core 0.
    for (seq, epoch) in [(1, 0), (2, 1)] {
        let mut record = make_record(0, seq, 64);
        record.epoch = epoch;
        let outcome = pool
            .append_to_core(0, record)
            .expect("append should succeed");
        assert_eq!(outcome, AppendOutcome::Appended);
    }

    pool.seal_active_for_epoch(0)
        .expect("sealing should succeed");

    let error = pool
        .flush_epoch_batches(0)
        .expect_err("mixed epochs in one flush lane must fail");
    assert!(
        error.contains("straddle"),
        "error should report epoch straddle: {error}"
    );
}
1385
/// Two single-record commits in consecutive epochs (0, then 1) must come back
/// from `recovery_order` in epoch order. Within an epoch, records must be in
/// `begin_seq` order.
///
/// The record counts are asserted explicitly. Otherwise the `windows(2)`
/// ordering check would pass vacuously if either flush returned nothing.
#[test]
fn bd_ncivz_2_commits_across_epochs_preserve_serial_epoch_order() {
    let coordinator =
        EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");

    // First commit lands in epoch 0.
    coordinator
        .append_to_core(0, 1, 64)
        .expect("append should succeed");
    coordinator
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("advance should succeed");
    let first_batch = coordinator
        .flush_epoch(0)
        .expect("epoch 0 flush should succeed");
    assert_eq!(
        first_batch.total_records(),
        1,
        "epoch 0 must contribute exactly one record"
    );

    // Second commit lands in epoch 1.
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");
    coordinator
        .append_to_core(0, 2, 64)
        .expect("append should succeed");
    coordinator
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("advance should succeed");
    let second_batch = coordinator
        .flush_epoch(1)
        .expect("epoch 1 flush should succeed");
    assert_eq!(
        second_batch.total_records(),
        1,
        "epoch 1 must contribute exactly one record"
    );

    let mut combined = first_batch.records;
    combined.extend(second_batch.records);
    let ordered = EpochOrderCoordinator::recovery_order(&combined);
    assert_eq!(ordered.len(), 2, "recovery must keep both committed records");
    assert!(
        ordered.windows(2).all(|pair| {
            pair[0].epoch < pair[1].epoch
                || (pair[0].epoch == pair[1].epoch
                    && pair[0].begin_seq.get() <= pair[1].begin_seq.get())
        }),
        "recovery ordering should be monotonic by epoch and begin_seq"
    );
    // The commits were made in different epochs, so the order must be strict.
    assert!(
        ordered[0].epoch < ordered[1].epoch,
        "the epoch-0 commit must replay before the epoch-1 commit"
    );
}
1429
/// Records without an `end_seq` (uncommitted/aborted) are pruned when the
/// epoch is flushed. Neither the flushed batch nor the recovery input built
/// from it may contain them.
#[test]
fn bd_ncivz_2_abort_cleanup_drops_non_committed_records() {
    let coordinator =
        EpochOrderCoordinator::new(1, BufferConfig::default(), EpochConfig::default());
    coordinator
        .observe_epoch(0)
        .expect("core 0 observation should succeed");

    coordinator
        .append_to_core(0, 1, 64)
        .expect("committed append should succeed");

    // Hand-build an uncommitted record and push it straight into the pool,
    // bypassing the coordinator's append path.
    let aborted = {
        let mut record = make_record(0, 2, 64);
        record.epoch = coordinator.current_epoch();
        record.end_seq = None;
        record
    };
    let aborted_txn_id = aborted.txn_token.id.get();
    let outcome = coordinator
        .pool
        .append_to_core(0, aborted)
        .expect("aborted append should still enter active lane");
    assert_eq!(outcome, AppendOutcome::Appended);

    coordinator
        .advance_epoch_and_wait(&[], Duration::from_millis(25))
        .expect("epoch advance should succeed");
    let batch = coordinator
        .flush_epoch(0)
        .expect("flush should prune aborted records");

    assert_eq!(batch.total_records(), 1);
    let every_committed = batch.records.iter().all(|record| record.end_seq.is_some());
    assert!(every_committed, "all flushed records must be committed");
    let aborted_survived = batch
        .records
        .iter()
        .any(|record| record.txn_token.id.get() == aborted_txn_id);
    assert!(!aborted_survived, "aborted record must not survive cleanup");

    let ordered = EpochOrderCoordinator::recovery_order(&batch.records);
    assert_eq!(ordered.len(), 1);
    assert!(
        ordered[0].end_seq.is_some(),
        "recovery input should only contain committed records"
    );
}
1481
/// Sealing the active lane a second time, without flushing in between, fails.
#[test]
fn seal_active_on_already_sealed_returns_error() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    buf.seal_active(1).expect("first seal");
    let second_seal = buf.seal_active(2);
    assert!(second_seal.is_err());
}
1494
/// On a freshly constructed (writable, never sealed) buffer, `begin_flush` fails.
#[test]
fn begin_flush_on_writable_returns_error() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    let result = buf.begin_flush();
    assert!(result.is_err());
}
1504
/// On a freshly constructed buffer with no flush in progress, `complete_flush` fails.
#[test]
fn complete_flush_on_non_flushing_returns_error() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            ..BufferConfig::default()
        },
    );
    let result = buf.complete_flush();
    assert!(result.is_err());
}
1514
/// `force_serialized_drain` returns every buffered record, including one that
/// was sealed and put into flushing, and leaves the buffer empty, writable
/// and back on the parallel path.
#[test]
fn force_serialized_drain_collects_all_lanes() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 640,
            overflow_policy: OverflowPolicy::AllocateOverflow,
            overflow_fallback_bytes: 4096,
        },
    );

    // Record 1 is sealed and a flush is started on it.
    buf.append(make_record(0, 1, 64));
    buf.seal_active(1).unwrap();
    buf.begin_flush().unwrap();
    // Record 2 is appended while that flush is still in progress.
    buf.append(make_record(0, 2, 64));

    let drained = buf.force_serialized_drain();
    assert_eq!(drained.len(), 2);

    for state in [buf.active_state(), buf.flush_state()] {
        assert_eq!(state, BufferState::Writable);
    }
    for len in [buf.active_len(), buf.flush_len(), buf.overflow_len()] {
        assert_eq!(len, 0);
    }
    assert_eq!(buf.fallback_decision(), FallbackDecision::ContinueParallel);
}
1539
/// Runs a full flush cycle (seal → begin_flush → complete_flush) while one
/// record sits in the overflow queue. Afterwards the active lane must be
/// writable again, and the overflow must be empty or the active lane non-empty.
#[test]
fn overflow_drains_into_active_after_complete_flush() {
    // Small capacity: the assertions below show one 48-byte `make_record`
    // fits in the active lane and a second one spills into overflow.
    let config = BufferConfig {
        capacity_bytes: 160,
        overflow_policy: OverflowPolicy::AllocateOverflow,
        overflow_fallback_bytes: 4096,
    };
    let mut buf = PerCoreWalBuffer::new(0, config);

    buf.append(make_record(0, 1, 48));
    assert_eq!(
        buf.append(make_record(0, 2, 48)),
        AppendOutcome::QueuedOverflow
    );
    assert_eq!(buf.active_len(), 1);
    assert_eq!(buf.overflow_len(), 1);

    buf.seal_active(1).unwrap();
    buf.begin_flush().unwrap();
    buf.complete_flush().unwrap();

    // NOTE(review): this disjunction also passes if the overflow record was
    // dropped (overflow_len() == 0 with an empty active lane), so it does not
    // prove the record actually drained into the active lane. Consider
    // asserting exact lane counts once the drain semantics are confirmed.
    assert!(buf.overflow_len() == 0 || buf.active_len() > 0);
    assert_eq!(buf.active_state(), BufferState::Writable);
}
1564
/// A single record far larger than the buffer capacity cannot be buffered.
/// The append is blocked and the buffer latches into serialized-drain fallback.
#[test]
fn append_record_exceeding_capacity_latches_fallback() {
    let tiny = BufferConfig {
        capacity_bytes: 32,
        ..BufferConfig::default()
    };
    let mut buf = PerCoreWalBuffer::new(0, tiny);
    let oversized = make_record(0, 1, 128);

    assert_eq!(buf.append(oversized), AppendOutcome::Blocked);
    assert_eq!(buf.fallback_decision(), FallbackDecision::ForceSerializedDrain);
}
1579
/// `append_batch` with a single record larger than the buffer capacity behaves
/// like the single-record path. It reports `Blocked` and latches
/// `ForceSerializedDrain`.
#[test]
fn append_batch_with_oversized_single_record_latches_fallback() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 32,
            ..BufferConfig::default()
        },
    );
    let single_oversized = vec![make_record(0, 1, 128)];

    assert_eq!(buf.append_batch(single_oversized), AppendOutcome::Blocked);
    assert_eq!(buf.fallback_decision(), FallbackDecision::ForceSerializedDrain);
}
1594
/// A batch that fits entirely in the active lane is reported as `Appended`,
/// and every record lands in that lane.
#[test]
fn append_batch_all_fit_returns_appended() {
    let mut buf = PerCoreWalBuffer::new(
        0,
        BufferConfig {
            capacity_bytes: 4096,
            ..BufferConfig::default()
        },
    );

    let outcome = buf.append_batch(vec![make_record(0, 1, 32), make_record(0, 2, 32)]);
    assert_eq!(outcome, AppendOutcome::Appended);
    assert_eq!(buf.active_len(), 2);
}
1606
/// `recovery_order` sorts by epoch first and then by sequence within an epoch.
/// The lone epoch-1 record comes first. The two epoch-2 records follow in
/// non-decreasing `begin_seq` order.
#[test]
fn recovery_order_sorts_by_epoch_then_seq() {
    let mut r1 = make_record(0, 10, 16);
    r1.epoch = 2;
    let mut r2 = make_record(0, 5, 16);
    r2.epoch = 1;
    let mut r3 = make_record(0, 7, 16);
    r3.epoch = 2;

    let ordered = EpochOrderCoordinator::recovery_order(&[r1, r2.clone(), r3]);
    assert_eq!(ordered.len(), 3);

    // Epoch is the primary key: the epoch-1 record must lead.
    assert_eq!(ordered[0].epoch, 1);
    assert_eq!(ordered[0].begin_seq.get(), r2.begin_seq.get());

    // Both remaining records are in epoch 2, so comparing their epochs alone
    // is trivially true. The begin_seq comparison is what exercises the
    // "then seq" tie-break.
    assert_eq!(ordered[1].epoch, 2);
    assert_eq!(ordered[2].epoch, 2);
    assert!(
        ordered[1].begin_seq.get() <= ordered[2].begin_seq.get(),
        "records within an epoch must be ordered by begin_seq"
    );
}
1621
/// `BufferConfig::default()` uses the module's default capacity, the default
/// overflow fallback threshold and the `AllocateOverflow` policy.
#[test]
fn buffer_config_default_values() {
    let BufferConfig {
        capacity_bytes,
        overflow_policy,
        overflow_fallback_bytes,
    } = BufferConfig::default();

    assert_eq!(capacity_bytes, DEFAULT_BUFFER_CAPACITY_BYTES);
    assert_eq!(overflow_fallback_bytes, DEFAULT_OVERFLOW_FALLBACK_BYTES);
    assert_eq!(overflow_policy, OverflowPolicy::AllocateOverflow);
}
1632
/// Repeated calls from one thread return the same slot, and the slot is
/// always reduced into `0..slot_count`.
#[test]
fn thread_buffer_slot_stable_within_thread() {
    const SLOTS: usize = 16;
    let first = thread_buffer_slot(SLOTS);
    let again = thread_buffer_slot(SLOTS);

    assert_eq!(first, again);
    assert!(first < SLOTS);
}
1640
/// Core ids at or beyond the pool's core count are rejected. In-range ids are accepted.
#[test]
fn pool_invalid_core_id_returns_error() {
    let pool = PerCoreWalBufferPool::new(2, BufferConfig::default());

    for out_of_range in [2, 99] {
        let result = pool.append_to_core(out_of_range, make_record(0, 1, 16));
        assert!(result.is_err());
    }
    assert!(pool.append_to_core(0, make_record(0, 1, 16)).is_ok());
}
1648
/// A new pool reports its configured core count and zero contention events.
#[test]
fn pool_core_count_and_contention_start_at_zero() {
    let pool = PerCoreWalBufferPool::new(4, BufferConfig::default());
    let (cores, contention) = (pool.core_count(), pool.contention_events_total());

    assert_eq!(cores, 4);
    assert_eq!(contention, 0);
}
1655
/// `EpochConfig::default()` uses the module's default advance interval.
#[test]
fn epoch_config_default_values() {
    let EpochConfig {
        advance_interval_ms,
        ..
    } = EpochConfig::default();
    assert_eq!(advance_interval_ms, DEFAULT_EPOCH_ADVANCE_INTERVAL_MS);
}
1661
/// `total_records` counts the records held by a batch. It is 2 for a
/// two-record batch and 0 for an empty one.
#[test]
fn epoch_flush_batch_total_records() {
    let populated = EpochFlushBatch {
        epoch: 1,
        records: vec![make_record(0, 1, 8), make_record(0, 2, 8)],
        records_per_core: vec![2],
    };
    assert_eq!(populated.total_records(), 2);

    let empty = EpochFlushBatch {
        epoch: 0,
        records: Vec::new(),
        records_per_core: Vec::new(),
    };
    assert_eq!(empty.total_records(), 0);
}
1678
/// `encoded_len` covers at least the fixed per-record overhead plus the
/// payload images. A size-100 record costs at least overhead + 200 bytes
/// (presumably a before- and an after-image). A size-0 record still costs at
/// least the fixed overhead and is strictly smaller.
#[test]
fn wal_record_encoded_len_includes_images() {
    let large_len = make_record(0, 1, 100).encoded_len();
    let small_len = make_record(0, 1, 0).encoded_len();

    assert!(large_len >= RECORD_FIXED_OVERHEAD_BYTES + 200);
    assert!(small_len >= RECORD_FIXED_OVERHEAD_BYTES);
    assert!(small_len < large_len);
}
1688
/// `BufferState` equality compares both the variant and its epoch payload,
/// and the `Debug` output names the variant.
#[test]
fn buffer_state_variant_equality() {
    let sealed_five = BufferState::Sealed { epoch: 5 };

    assert_eq!(BufferState::Writable, BufferState::Writable);
    assert_eq!(sealed_five, BufferState::Sealed { epoch: 5 });
    assert_ne!(sealed_five, BufferState::Sealed { epoch: 6 });
    assert_ne!(
        BufferState::Sealed { epoch: 1 },
        BufferState::Flushing { epoch: 1 }
    );

    let rendered = format!("{:?}", BufferState::Flushing { epoch: 3 });
    assert!(rendered.contains("Flushing"));
}
1707
/// `append_batch_to_core` appends a batch to a valid core and rejects an
/// out-of-range core id.
#[test]
fn pool_append_batch_to_core_works() {
    let pool = PerCoreWalBufferPool::new(2, BufferConfig::default());

    let batch = vec![make_record(0, 1, 16), make_record(0, 2, 16)];
    assert_eq!(
        pool.append_batch_to_core(0, batch).unwrap(),
        AppendOutcome::Appended
    );

    let out_of_range = pool.append_batch_to_core(99, vec![make_record(0, 1, 16)]);
    assert!(out_of_range.is_err());
}
1719
/// `OverflowPolicy` variants are distinct and `Copy`, and each `Debug` output
/// contains its variant name.
#[test]
fn overflow_policy_debug_clone_copy_eq() {
    let block = OverflowPolicy::BlockWriter;
    let allocate = OverflowPolicy::AllocateOverflow;
    assert_ne!(block, allocate);

    // `block` remains usable after the copy, which is only possible for `Copy`.
    let duplicate = block;
    assert_eq!(duplicate, block);

    assert!(format!("{block:?}").contains("BlockWriter"));
    assert!(format!("{allocate:?}").contains("AllocateOverflow"));
}
1732
/// Every `AppendOutcome` variant is `Copy` and equal to its copy. `Debug`
/// output names the variant, and distinct variants compare unequal.
#[test]
fn append_outcome_debug_clone_copy_eq() {
    let variants = [
        AppendOutcome::Appended,
        AppendOutcome::QueuedOverflow,
        AppendOutcome::Blocked,
    ];
    for &variant in variants.iter() {
        let duplicate = variant;
        assert_eq!(duplicate, variant);
    }

    let rendered = format!("{:?}", AppendOutcome::QueuedOverflow);
    assert!(rendered.contains("QueuedOverflow"));
    assert_ne!(AppendOutcome::Appended, AppendOutcome::Blocked);
}
1748
/// `FallbackDecision` variants are distinct and `Copy`, and `Debug` output
/// names the variant.
#[test]
fn fallback_decision_debug_clone_copy_eq() {
    let parallel = FallbackDecision::ContinueParallel;
    let serialized = FallbackDecision::ForceSerializedDrain;
    assert_ne!(parallel, serialized);

    let duplicate = parallel;
    assert_eq!(duplicate, parallel);

    assert!(format!("{serialized:?}").contains("ForceSerializedDrain"));
}
1759
/// `WalRecord`'s `Debug` output names the type, and `clone` preserves the
/// epoch, page id and both images.
#[test]
fn wal_record_debug_and_clone() {
    let original = make_record(0, 5, 32);
    assert!(format!("{original:?}").contains("WalRecord"));

    let copy = original.clone();
    assert_eq!(copy.epoch, original.epoch);
    assert_eq!(copy.page_id, original.page_id);
    assert_eq!(copy.before_image.len(), original.before_image.len());
    assert_eq!(copy.after_image, original.after_image);
}
1771
/// The durable-epoch value starts unset and only moves forward. Marking an
/// older epoch after a newer one leaves the maximum in place.
#[test]
fn epoch_coordinator_mark_and_query_durable_epoch() {
    let coordinator =
        EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    assert_eq!(coordinator.durable_epoch(), None);

    coordinator.mark_epoch_durable(5);
    assert_eq!(coordinator.durable_epoch(), Some(5));

    // A stale (lower) mark must not move the value backwards.
    coordinator.mark_epoch_durable(3);
    assert_eq!(coordinator.durable_epoch(), Some(5), "should keep max");

    coordinator.mark_epoch_durable(10);
    assert_eq!(coordinator.durable_epoch(), Some(10));
}
1783
/// `BufferConfig` is `Copy` and comparable, and its `Debug` output names the type.
#[test]
fn buffer_config_clone_copy_debug() {
    let original = BufferConfig::default();
    let duplicate = original;
    assert_eq!(duplicate, original);
    assert!(format!("{original:?}").contains("BufferConfig"));
}
1792
/// Checks the default epoch advance interval. Also checks that an empty
/// two-core flush batch reports zero records and keeps its epoch label.
#[test]
fn epoch_config_default_and_epoch_flush_batch_total() {
    assert_eq!(
        EpochConfig::default().advance_interval_ms,
        DEFAULT_EPOCH_ADVANCE_INTERVAL_MS
    );

    let empty_two_core = EpochFlushBatch {
        epoch: 5,
        records: Vec::new(),
        records_per_core: vec![0, 0],
    };
    assert_eq!(empty_two_core.total_records(), 0);
    assert_eq!(empty_two_core.epoch, 5);
}
1805
/// A freshly constructed coordinator starts with both the published epoch and
/// the append epoch at 0.
#[test]
fn epoch_order_coordinator_current_and_append_epoch() {
    let coordinator =
        EpochOrderCoordinator::new(2, BufferConfig::default(), EpochConfig::default());
    let published = coordinator.current_epoch();
    let appending = coordinator.current_append_epoch();

    assert_eq!(published, 0);
    assert_eq!(appending, 0);
}
1812
/// `core_count` reports the number of cores the pool was built with.
#[test]
fn per_core_buffer_pool_core_count() {
    let pool = PerCoreWalBufferPool::new(4, BufferConfig::default());
    let reported = pool.core_count();
    assert_eq!(reported, 4);
}