1use std::collections::{HashMap, HashSet, VecDeque};
22use std::env;
23use std::fs::{self, File, OpenOptions};
24use std::hash::BuildHasher;
25use std::io::{self, BufReader, BufWriter, Read, Seek, Write};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::{Arc, Mutex, OnceLock};
29use std::time::Duration;
30
31#[cfg(not(target_arch = "wasm32"))]
32use asupersync::runtime::{BlockingTaskHandle, RuntimeHandle};
33use fsqlite_types::{CommitSeq, PageNumber, TxnToken, cx::Cx, limits};
34
35use crate::group_commit::TransactionFrameBatchContext;
36use crate::per_core_buffer::{
37 AppendOutcome, BufferConfig, DEFAULT_BUFFER_SLOT_COUNT, EpochConfig, EpochFlushBatch,
38 EpochOrderCoordinator, WalRecord, thread_buffer_slot,
39};
40
/// Tunables consumed by `ParallelWalCoordinator::new`.
///
/// The coordinator forwards `slot_count` to the `EpochOrderCoordinator`,
/// `epoch_interval_ms` to `EpochConfig::advance_interval_ms`, and
/// `buffer_capacity_bytes` to `BufferConfig::capacity_bytes`.
#[derive(Debug, Clone, Copy)]
pub struct ParallelWalConfig {
    /// Number of buffer slots; also the modulus used when mapping the calling
    /// thread to a slot via `thread_buffer_slot`.
    pub slot_count: usize,
    /// Epoch advance interval, in milliseconds.
    pub epoch_interval_ms: u64,
    /// Buffer capacity, in bytes, handed to the buffer configuration.
    pub buffer_capacity_bytes: usize,
}
55
56impl Default for ParallelWalConfig {
57 fn default() -> Self {
58 Self {
59 slot_count: DEFAULT_BUFFER_SLOT_COUNT,
60 epoch_interval_ms: 10,
61 buffer_capacity_bytes: 4 * 1024 * 1024,
62 }
63 }
64}
65
/// Operating mode selected for the parallel WAL path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalOperatingMode {
    /// Default mode: the lane count comes from the override or the detected
    /// parallelism, and shadow comparison is sampled per-mille when configured.
    #[default]
    Auto,
    /// Single-lane mode: `ParallelWalLaneStager::lane_count` reports 1 and
    /// shadow comparison is never requested.
    Conservative,
    /// Like `Auto` for lane selection, but every batch is shadow-compared.
    ShadowCompare,
}
80
/// Ordered step recorded in a `ParallelWalCommitCertificate`.
///
/// NOTE(review): presumably names the part of the commit that must stay
/// strictly ordered; confirm against the certificate producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalOrderedResidue {
    /// The only residue currently defined (and the default).
    #[default]
    CommitCertificateThenPublish,
}
95
/// Reason the parallel WAL path fell back.
///
/// Stable snake_case names for each variant are produced by
/// `parallel_wal_fallback_reason_name`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelWalFallbackReason {
    /// Reported as `operator_forced`.
    OperatorForced,
    /// Reported as `lane_overflow`.
    LaneOverflow,
    /// Reported as `certificate_gap`.
    CertificateGap,
    /// Reported as `certificate_checksum_mismatch`.
    CertificateChecksumMismatch,
    /// Reported as `publication_mismatch`.
    PublicationMismatch,
    /// Reported as `recovery_gap`.
    RecoveryGap,
    /// Reported as `checkpoint_conflict`.
    CheckpointConflict,
    /// Reported as `controller_evidence_lost`.
    ControllerEvidenceLost,
}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct ParallelWalControlSurface {
116 pub mode: ParallelWalOperatingMode,
118 pub lane_count_override: Option<usize>,
120 pub helper_lane_budget: Option<usize>,
122 pub max_parallel_commit_bytes: Option<u64>,
124 pub max_flush_delay_ms: Option<u64>,
126 pub shadow_compare_sampling_per_mille: Option<u16>,
128}
129
130impl Default for ParallelWalControlSurface {
131 fn default() -> Self {
132 Self {
133 mode: ParallelWalOperatingMode::Auto,
134 lane_count_override: None,
135 helper_lane_budget: None,
136 max_parallel_commit_bytes: None,
137 max_flush_delay_ms: None,
138 shadow_compare_sampling_per_mille: None,
139 }
140 }
141}
142
/// Version tag of the lane-assignment policy.
pub const PARALLEL_WAL_LANE_POLICY_VERSION: &str = "thread_slot_v1";

/// Comma-separated compatibility selector string.
/// NOTE(review): the consumer of this selector is not visible here.
pub const PARALLEL_WAL_COMPATIBILITY_SELECTOR: &str = "wal_invariant,integrity_check,row_level";

/// Scenario identifier for the lane staging step.
pub const PARALLEL_WAL_STAGE_SCENARIO_ID: &str = "parallel_wal_lane_stage";

/// Scenario identifier for the lane flush step.
pub const PARALLEL_WAL_FLUSH_SCENARIO_ID: &str = "parallel_wal_lane_flush";

/// Upper bound on the lane count, chosen so every lane index fits in the
/// `u16` lane-id type.
const MAX_PARALLEL_WAL_LANE_COUNT: usize = u16::MAX as usize;
154
/// Outcome of a shadow comparison attached to a staged lane batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelWalShadowVerdict {
    /// No comparison was performed (the default); reported as `not_run`.
    #[default]
    NotRun,
    /// Reported as `clean`.
    Clean,
    /// Reported as `diverged`.
    Diverged,
}
163
164#[derive(Debug, Clone)]
166pub struct ParallelWalLaneBatch<T> {
167 pub batch_id: u64,
169 pub lane_id: u16,
171 pub staged_frame_count: u32,
173 pub staging_elapsed_ns: u64,
175 pub shadow_verdict: ParallelWalShadowVerdict,
177 pub payload: T,
179}
180
181#[derive(Debug)]
187pub struct ParallelWalLaneStager<T> {
188 control: ParallelWalControlSurface,
189 next_batch_id: AtomicU64,
190 lane_batches: Mutex<HashMap<u16, VecDeque<ParallelWalLaneBatch<T>>>>,
191 lane_backlog_frames: Mutex<HashMap<u16, usize>>,
192}
193
194impl<T> ParallelWalLaneStager<T> {
195 #[must_use]
196 pub fn new(control: ParallelWalControlSurface) -> Self {
197 Self {
198 control,
199 next_batch_id: AtomicU64::new(1),
200 lane_batches: Mutex::new(HashMap::new()),
201 lane_backlog_frames: Mutex::new(HashMap::new()),
202 }
203 }
204
205 #[must_use]
206 pub fn control(&self) -> &ParallelWalControlSurface {
207 &self.control
208 }
209
210 #[must_use]
211 pub fn next_batch_id(&self) -> u64 {
212 self.next_batch_id.fetch_add(1, Ordering::Relaxed)
213 }
214
215 #[must_use]
216 pub fn lane_count(&self) -> usize {
217 match self.control.mode {
218 ParallelWalOperatingMode::Conservative => 1,
219 _ => self
220 .control
221 .lane_count_override
222 .unwrap_or_else(default_parallel_wal_lane_count)
223 .clamp(1, MAX_PARALLEL_WAL_LANE_COUNT),
224 }
225 }
226
227 #[must_use]
228 pub fn current_lane_id(&self) -> u16 {
229 u16::try_from(thread_buffer_slot(self.lane_count()))
230 .expect("lane_count is clamped to the u16 lane-id range")
231 }
232
233 #[must_use]
234 pub fn current_lane_backlog(&self, lane_id: u16) -> usize {
235 self.lane_backlog_frames
236 .lock()
237 .unwrap_or_else(std::sync::PoisonError::into_inner)
238 .get(&lane_id)
239 .copied()
240 .unwrap_or(0)
241 }
242
243 pub fn record_batch(&self, batch: ParallelWalLaneBatch<T>) -> usize {
244 let lane_id = batch.lane_id;
245 let staged_frame_count = usize::try_from(batch.staged_frame_count).unwrap_or(0);
246
247 let mut lane_batches = self
248 .lane_batches
249 .lock()
250 .unwrap_or_else(std::sync::PoisonError::into_inner);
251 let mut lane_backlog = self
252 .lane_backlog_frames
253 .lock()
254 .unwrap_or_else(std::sync::PoisonError::into_inner);
255 lane_batches.entry(lane_id).or_default().push_back(batch);
256 let backlog = lane_backlog.entry(lane_id).or_insert(0);
257 *backlog = backlog.saturating_add(staged_frame_count);
258 *backlog
259 }
260
261 pub fn take_batches_for_flush(
262 &self,
263 contexts: &[TransactionFrameBatchContext],
264 ) -> Option<HashMap<u64, ParallelWalLaneBatch<T>>> {
265 let mut lane_batches = self
266 .lane_batches
267 .lock()
268 .unwrap_or_else(std::sync::PoisonError::into_inner);
269
270 let mut expected_offsets = HashMap::<u16, usize>::new();
271 for context in contexts {
272 let offset = expected_offsets.entry(context.lane_id).or_insert(0);
273 let candidate = lane_batches
274 .get(&context.lane_id)
275 .and_then(|queue| queue.get(*offset))
276 .filter(|candidate| candidate.batch_id == context.batch_id)?;
277 let _ = candidate;
278 *offset = offset.saturating_add(1);
279 }
280
281 let mut by_batch_id = HashMap::with_capacity(contexts.len());
282 let mut lane_backlog = self
283 .lane_backlog_frames
284 .lock()
285 .unwrap_or_else(std::sync::PoisonError::into_inner);
286 for context in contexts {
287 let candidate = lane_batches
288 .get_mut(&context.lane_id)
289 .and_then(VecDeque::pop_front)
290 .expect("verified lane-local batch must still exist");
291 let backlog = lane_backlog.entry(context.lane_id).or_insert(0);
292 *backlog = backlog.saturating_sub(
293 usize::try_from(candidate.staged_frame_count).unwrap_or(usize::MAX),
294 );
295 if *backlog == 0 {
296 lane_backlog.remove(&context.lane_id);
297 }
298 if lane_batches
299 .get(&context.lane_id)
300 .is_some_and(VecDeque::is_empty)
301 {
302 lane_batches.remove(&context.lane_id);
303 }
304 by_batch_id.insert(candidate.batch_id, candidate);
305 }
306
307 Some(by_batch_id)
308 }
309
310 pub fn discard_batches_for_flush(&self, contexts: &[TransactionFrameBatchContext]) -> usize {
320 if contexts.is_empty() {
321 return 0;
322 }
323
324 let discard_ids = contexts
325 .iter()
326 .map(|context| context.batch_id)
327 .collect::<HashSet<_>>();
328 let mut removed_batches = 0_usize;
329 let mut removed_frames_by_lane = HashMap::<u16, usize>::new();
330
331 let mut lane_batches = self
332 .lane_batches
333 .lock()
334 .unwrap_or_else(std::sync::PoisonError::into_inner);
335 for (lane_id, queue) in lane_batches.iter_mut() {
336 let mut retained = VecDeque::with_capacity(queue.len());
337 while let Some(batch) = queue.pop_front() {
338 if discard_ids.contains(&batch.batch_id) {
339 removed_batches = removed_batches.saturating_add(1);
340 let removed_frames =
341 usize::try_from(batch.staged_frame_count).unwrap_or(usize::MAX);
342 let entry = removed_frames_by_lane.entry(*lane_id).or_insert(0);
343 *entry = entry.saturating_add(removed_frames);
344 } else {
345 retained.push_back(batch);
346 }
347 }
348 *queue = retained;
349 }
350 lane_batches.retain(|_, queue| !queue.is_empty());
351
352 if !removed_frames_by_lane.is_empty() {
353 let mut lane_backlog = self
354 .lane_backlog_frames
355 .lock()
356 .unwrap_or_else(std::sync::PoisonError::into_inner);
357 for (lane_id, removed_frames) in removed_frames_by_lane {
358 let backlog = lane_backlog.entry(lane_id).or_insert(0);
359 *backlog = backlog.saturating_sub(removed_frames);
360 if *backlog == 0 {
361 lane_backlog.remove(&lane_id);
362 }
363 }
364 }
365
366 removed_batches
367 }
368}
369
370#[must_use]
371pub fn parallel_wal_mode_name(mode: ParallelWalOperatingMode) -> &'static str {
372 match mode {
373 ParallelWalOperatingMode::Auto => "auto",
374 ParallelWalOperatingMode::Conservative => "conservative",
375 ParallelWalOperatingMode::ShadowCompare => "shadow_compare",
376 }
377}
378
379#[must_use]
380pub fn parallel_wal_fallback_reason_name(
381 reason: Option<ParallelWalFallbackReason>,
382) -> &'static str {
383 match reason {
384 None => "none",
385 Some(ParallelWalFallbackReason::OperatorForced) => "operator_forced",
386 Some(ParallelWalFallbackReason::LaneOverflow) => "lane_overflow",
387 Some(ParallelWalFallbackReason::CertificateGap) => "certificate_gap",
388 Some(ParallelWalFallbackReason::CertificateChecksumMismatch) => {
389 "certificate_checksum_mismatch"
390 }
391 Some(ParallelWalFallbackReason::PublicationMismatch) => "publication_mismatch",
392 Some(ParallelWalFallbackReason::RecoveryGap) => "recovery_gap",
393 Some(ParallelWalFallbackReason::CheckpointConflict) => "checkpoint_conflict",
394 Some(ParallelWalFallbackReason::ControllerEvidenceLost) => "controller_evidence_lost",
395 }
396}
397
398#[must_use]
399pub fn parallel_wal_shadow_verdict_name(verdict: ParallelWalShadowVerdict) -> &'static str {
400 match verdict {
401 ParallelWalShadowVerdict::NotRun => "not_run",
402 ParallelWalShadowVerdict::Clean => "clean",
403 ParallelWalShadowVerdict::Diverged => "diverged",
404 }
405}
406
407#[must_use]
413pub fn parallel_wal_should_shadow_compare(
414 control: &ParallelWalControlSurface,
415 batch_id: u64,
416) -> bool {
417 match control.mode {
418 ParallelWalOperatingMode::Conservative => false,
419 ParallelWalOperatingMode::ShadowCompare => true,
420 ParallelWalOperatingMode::Auto => {
421 control
422 .shadow_compare_sampling_per_mille
423 .is_some_and(|rate| {
424 let rate = u64::from(rate.min(1_000));
425 rate > 0 && batch_id.saturating_sub(1) % 1_000 < rate
426 })
427 }
428 }
429}
430
/// Returns the default lane count: the parallelism reported by
/// `std::thread::available_parallelism`, or 1 when it cannot be determined.
#[must_use]
pub fn default_parallel_wal_lane_count() -> usize {
    // `NonZeroUsize::get` is always >= 1, so no further clamping is needed.
    std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get)
}
438
439#[must_use]
440pub fn resolve_parallel_wal_control_surface_from_env() -> ParallelWalControlSurface {
441 let mut control = ParallelWalControlSurface {
442 mode: ParallelWalOperatingMode::Conservative,
445 ..ParallelWalControlSurface::default()
446 };
447
448 if let Ok(mode) = env::var("FSQLITE_PARALLEL_WAL_MODE") {
449 control.mode = match mode.trim().to_ascii_lowercase().as_str() {
450 "auto" => ParallelWalOperatingMode::Auto,
451 "conservative" | "serialized" | "single_lane" => ParallelWalOperatingMode::Conservative,
452 "shadow" | "shadow_compare" => ParallelWalOperatingMode::ShadowCompare,
453 _ => control.mode,
454 };
455 }
456 if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_LANES") {
457 if let Ok(value) = raw.trim().parse::<usize>() {
458 control.lane_count_override = Some(value.max(1));
459 }
460 }
461 if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_MAX_BATCH_BYTES") {
462 if let Ok(value) = raw.trim().parse::<u64>() {
463 control.max_parallel_commit_bytes = Some(value.max(1));
464 }
465 }
466 if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_MAX_FLUSH_DELAY_MS") {
467 if let Ok(value) = raw.trim().parse::<u64>() {
468 control.max_flush_delay_ms = Some(value);
469 }
470 }
471 if let Ok(raw) = env::var("FSQLITE_PARALLEL_WAL_SHADOW_COMPARE_PER_MILLE") {
472 if let Ok(value) = raw.trim().parse::<u16>() {
473 control.shadow_compare_sampling_per_mille = Some(value);
474 }
475 }
476
477 control
478}
479
480#[derive(Debug, Clone, PartialEq, Eq)]
487pub struct ParallelWalCommitCertificate {
488 pub format_version: u16,
489 pub residue: ParallelWalOrderedResidue,
490 pub certificate_epoch: u64,
491 pub commit_seq_lo: CommitSeq,
492 pub commit_seq_hi: CommitSeq,
493 pub durable_segment_epoch: u64,
494 pub lane_count: u16,
495 pub lane_record_counts: Vec<u32>,
496 pub db_size_pages: u32,
497 pub page_set_size: u32,
498 pub certificate_crc32c: u32,
499 pub fallback_active: bool,
500}
501
502#[derive(Debug, Clone, PartialEq, Eq)]
505pub struct ParallelWalTraceRecord {
506 pub component: String,
507 pub trace_id: u64,
508 pub decision_id: Option<u64>,
509 pub mode: ParallelWalOperatingMode,
510 pub lane_id: Option<usize>,
511 pub epoch: Option<u64>,
512 pub commit_seq_lo: Option<CommitSeq>,
513 pub commit_seq_hi: Option<CommitSeq>,
514 pub checkpoint_epoch: Option<u64>,
515 pub recovery_epoch: Option<u64>,
516 pub fallback_active: bool,
517 pub fallback_reason: Option<ParallelWalFallbackReason>,
518 pub policy_id: Option<String>,
519 pub policy_version: Option<String>,
520}
521
/// Action carried by a `ParallelWalDecisionRecord`.
///
/// NOTE(review): the controller that chooses between these actions is not
/// visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelWalDecisionAction {
    /// Keep the current configuration.
    KeepCurrent,
    /// Seal the current epoch now.
    SealEpochNow,
    /// Increase the lane budget.
    IncreaseLaneBudget,
    /// Decrease the lane budget.
    DecreaseLaneBudget,
    /// Force conservative operation.
    ForceConservative,
}
535
536#[derive(Debug, Clone, PartialEq, Eq)]
538pub struct ParallelWalDecisionRecord {
539 pub policy_id: String,
540 pub policy_version: String,
541 pub decision_id: u64,
542 pub action: ParallelWalDecisionAction,
543 pub confidence_bps: u16,
544 pub expected_loss_micros: u64,
545 pub top_evidence_terms: Vec<String>,
546 pub counterfactual_action: ParallelWalDecisionAction,
547 pub counterfactual_regret_micros: i64,
548 pub fallback_active: bool,
549}
550
/// Magic number at the start of every segment header. Read most-significant
/// byte first it spells ASCII `PWAL`; the header stores it little-endian.
const SEGMENT_MAGIC: u32 = 0x5057_414C;

/// Segment format version accepted by `SegmentHeader::from_bytes`.
const SEGMENT_VERSION: u16 = 1;

/// Encoded header size in bytes: magic (4) + version (2) + 2 bytes left zero
/// + epoch (8) + record count (4) + CRC32C (4).
const SEGMENT_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 4;

/// Smallest serialized record: txn id (8) + txn epoch (4) + record epoch (8)
/// + page id (4) + begin seq (8) + end-seq flag (1) + two image length
/// prefixes (4 + 4), with no end seq and empty images.
const SEGMENT_RECORD_MIN_SIZE: usize = 8 + 4 + 8 + 4 + 8 + 1 + 4 + 4;
566
567const MAX_SEGMENT_RECORD_IMAGE_BYTES: usize = limits::MAX_PAGE_SIZE as usize;
569
570const MAX_SEGMENT_RECORD_SIZE: usize =
572 SEGMENT_RECORD_MIN_SIZE + 8 + 2 * MAX_SEGMENT_RECORD_IMAGE_BYTES;
573
/// Durability policy applied after a segment file has been written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FsyncPolicy {
    /// Default: `write_segment` calls `sync_all` on the segment file.
    #[default]
    Full,
    /// `write_segment` currently also calls `sync_all` for this policy.
    Normal,
    /// `write_segment` flushes its buffer but does not sync the file.
    Off,
}
585
/// Decoded form of the fixed-size header at the start of a segment file.
#[derive(Debug, Clone, Copy)]
pub struct SegmentHeader {
    /// Epoch the segment belongs to.
    pub epoch: u64,
    /// Number of records that follow the header.
    pub record_count: u32,
}
604
605impl SegmentHeader {
606 #[must_use]
608 pub const fn new(epoch: u64, record_count: u32) -> Self {
609 Self {
610 epoch,
611 record_count,
612 }
613 }
614
615 #[must_use]
617 pub fn to_bytes(&self) -> [u8; SEGMENT_HEADER_SIZE] {
618 let mut buf = [0u8; SEGMENT_HEADER_SIZE];
619 buf[0..4].copy_from_slice(&SEGMENT_MAGIC.to_le_bytes());
620 buf[4..6].copy_from_slice(&SEGMENT_VERSION.to_le_bytes());
621 buf[8..16].copy_from_slice(&self.epoch.to_le_bytes());
623 buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
624 let checksum = crc32c::crc32c(&buf[0..20]);
626 buf[20..24].copy_from_slice(&checksum.to_le_bytes());
627 buf
628 }
629
630 pub fn from_bytes(buf: &[u8; SEGMENT_HEADER_SIZE]) -> Result<Self, String> {
632 let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
633 if magic != SEGMENT_MAGIC {
634 return Err(format!("invalid segment magic: {magic:#x}"));
635 }
636 let version = u16::from_le_bytes([buf[4], buf[5]]);
637 if version != SEGMENT_VERSION {
638 return Err(format!("unsupported segment version: {version}"));
639 }
640 let epoch = u64::from_le_bytes([
641 buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
642 ]);
643 let record_count = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
644 let stored_checksum = u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]);
645 let computed_checksum = crc32c::crc32c(&buf[0..20]);
646 if stored_checksum != computed_checksum {
647 return Err(format!(
648 "segment header checksum mismatch: stored={stored_checksum:#x}, computed={computed_checksum:#x}"
649 ));
650 }
651 Ok(Self {
652 epoch,
653 record_count,
654 })
655 }
656}
657
/// Returns the segment file path for `epoch`, placed next to `db_path`.
///
/// The file name is `<db file name>-wal-seg-<epoch as 16 lowercase hex
/// digits>`; `"db"` stands in when `db_path` has no file name.
#[must_use]
pub fn segment_path(db_path: &Path, epoch: u64) -> PathBuf {
    let base_name = match db_path.file_name() {
        Some(name) => name.to_string_lossy().to_string(),
        None => "db".to_string(),
    };
    db_path.with_file_name(format!("{base_name}-wal-seg-{epoch:016x}"))
}
668
/// Lists the segment files that belong to `db_path`, sorted by ascending
/// epoch.
///
/// A directory entry counts as a segment when its name is
/// `<db file name>-wal-seg-<hex epoch>` and the epoch part consists solely of
/// hexadecimal digits that fit in a `u64`. Names with a sign prefix (which
/// `u64::from_str_radix` would otherwise accept) are ignored.
///
/// The directory scanned is the parent of `db_path`. A bare relative path
/// such as `test.db` has an *empty* parent, so the current directory (`.`) is
/// scanned in that case, as it is when there is no parent at all.
///
/// # Errors
///
/// Returns any I/O error raised while reading the directory.
pub fn list_segments(db_path: &Path) -> io::Result<Vec<(u64, PathBuf)>> {
    // `Path::new("test.db").parent()` is `Some("")`, and reading "" fails, so
    // treat an empty parent the same as a missing one.
    let dir = db_path
        .parent()
        .filter(|parent| !parent.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let db_name = db_path
        .file_name()
        .map_or_else(|| "db".to_string(), |n| n.to_string_lossy().to_string());
    let prefix = format!("{db_name}-wal-seg-");

    let mut segments = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name_str = name.to_string_lossy();
        let Some(epoch_hex) = name_str.strip_prefix(&prefix) else {
            continue;
        };
        // Reject anything that is not pure hex (e.g. "+ff"); an empty suffix
        // passes this check but fails the parse below.
        if !epoch_hex.bytes().all(|byte| byte.is_ascii_hexdigit()) {
            continue;
        }
        if let Ok(epoch) = u64::from_str_radix(epoch_hex, 16) {
            segments.push((epoch, entry.path()));
        }
    }
    segments.sort_by_key(|(epoch, _)| *epoch);
    Ok(segments)
}
691
692pub fn write_segment(
700 db_path: &Path,
701 batch: &EpochFlushBatch,
702 fsync_policy: FsyncPolicy,
703) -> io::Result<usize> {
704 let path = segment_path(db_path, batch.epoch);
705
706 let ordered_records = ordered_segment_records(batch.epoch, &batch.records)?;
707 for record in &ordered_records {
708 validate_segment_record_images(record)
709 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
710 }
711 let record_count = u32::try_from(ordered_records.len()).map_err(|_| {
712 io::Error::new(
713 io::ErrorKind::InvalidInput,
714 format!(
715 "segment record count {} exceeds u32 header field",
716 ordered_records.len()
717 ),
718 )
719 })?;
720
721 let file = OpenOptions::new()
722 .write(true)
723 .create(true)
724 .truncate(true)
725 .open(&path)?;
726 let mut writer = BufWriter::new(file);
727
728 let header = SegmentHeader::new(batch.epoch, record_count);
730 let header_bytes = header.to_bytes();
731 writer.write_all(&header_bytes)?;
732 let mut total_bytes = SEGMENT_HEADER_SIZE;
733
734 for record in &ordered_records {
736 let record_bytes =
737 serialize_record(record).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
738 let len = u32::try_from(record_bytes.len()).map_err(|_| {
739 io::Error::new(
740 io::ErrorKind::InvalidInput,
741 format!(
742 "segment record length {} exceeds u32 length prefix",
743 record_bytes.len()
744 ),
745 )
746 })?;
747 writer.write_all(&len.to_le_bytes())?;
748 writer.write_all(&record_bytes)?;
749 total_bytes += 4 + record_bytes.len();
750 }
751
752 writer.flush()?;
753
754 if fsync_policy == FsyncPolicy::Full || fsync_policy == FsyncPolicy::Normal {
756 writer.get_ref().sync_all()?;
757 }
758
759 Ok(total_bytes)
760}
761
762pub fn read_segment(path: &Path) -> io::Result<(SegmentHeader, Vec<WalRecord>)> {
764 let file = File::open(path)?;
765 let mut reader = BufReader::new(file);
766
767 let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
769 reader.read_exact(&mut header_buf)?;
770 let header = SegmentHeader::from_bytes(&header_buf)
771 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
772
773 let file_len = reader.get_ref().metadata()?.len();
774 let body_len = file_len.saturating_sub(SEGMENT_HEADER_SIZE as u64);
775 let min_record_on_disk_len = u64::try_from(4 + SEGMENT_RECORD_MIN_SIZE).map_err(|_| {
776 io::Error::new(
777 io::ErrorKind::InvalidData,
778 "minimum segment record length exceeds u64",
779 )
780 })?;
781 let max_possible_records = body_len / min_record_on_disk_len;
782 if u64::from(header.record_count) > max_possible_records {
783 return Err(io::Error::new(
784 io::ErrorKind::InvalidData,
785 format!(
786 "segment record count {} exceeds maximum possible {} for file length {}",
787 header.record_count, max_possible_records, file_len
788 ),
789 ));
790 }
791
792 let record_capacity = usize::try_from(header.record_count).map_err(|_| {
794 io::Error::new(
795 io::ErrorKind::InvalidData,
796 format!(
797 "segment record count {} exceeds addressable size",
798 header.record_count
799 ),
800 )
801 })?;
802 let mut records = Vec::with_capacity(record_capacity);
803 for _ in 0..header.record_count {
804 let mut len_buf = [0u8; 4];
805 reader.read_exact(&mut len_buf)?;
806 let len = u32::from_le_bytes(len_buf) as usize;
807 if len > MAX_SEGMENT_RECORD_SIZE {
808 return Err(io::Error::new(
809 io::ErrorKind::InvalidData,
810 format!("segment record length {len} exceeds maximum {MAX_SEGMENT_RECORD_SIZE}"),
811 ));
812 }
813
814 let mut record_buf = vec![0u8; len];
815 reader.read_exact(&mut record_buf)?;
816 let record = deserialize_record(&record_buf)
817 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
818 records.push(record);
819 }
820 let consumed_len = reader.stream_position()?;
821 if consumed_len != file_len {
822 return Err(io::Error::new(
823 io::ErrorKind::InvalidData,
824 format!(
825 "segment has {} trailing bytes after declared records",
826 file_len.saturating_sub(consumed_len)
827 ),
828 ));
829 }
830
831 Ok((header, ordered_segment_records(header.epoch, &records)?))
832}
833
/// Deletes the segment file at `path`.
///
/// # Errors
///
/// Returns the error from `fs::remove_file`, e.g. when the file does not
/// exist.
pub fn delete_segment(path: &Path) -> io::Result<()> {
    let removal = fs::remove_file(path);
    removal
}
838
/// Summary produced by `recover_segments`.
#[derive(Debug, Clone)]
pub struct SegmentRecoveryResult {
    /// Number of segments that were read successfully.
    pub segments_recovered: usize,
    /// Total number of records taken from the recovered segments.
    pub records_applied: usize,
    /// Sum of the file sizes of the recovered segments, in bytes.
    pub bytes_read: u64,
    /// Epochs of the recovered segments, in ascending order.
    pub epochs: Vec<u64>,
    /// With `skip_corrupt`: the first unreadable segment and every segment
    /// after it. These are neither recovered nor deleted.
    pub partial_segments: Vec<PathBuf>,
}
857
/// Options controlling `recover_segments`; both flags default to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SegmentRecoveryOptions {
    /// Delete segment files after recovery, except those reported as partial.
    pub delete_after_recovery: bool,
    /// On an unreadable or mismatched segment, stop recovery there and report
    /// it (and all later segments) as partial instead of returning an error.
    pub skip_corrupt: bool,
}
867
868pub fn recover_segments(
879 db_path: &Path,
880 options: SegmentRecoveryOptions,
881) -> io::Result<(SegmentRecoveryResult, Vec<WalRecord>)> {
882 let segments = list_segments(db_path)?;
883
884 let mut result = SegmentRecoveryResult {
885 segments_recovered: 0,
886 records_applied: 0,
887 bytes_read: 0,
888 epochs: Vec::with_capacity(segments.len()),
889 partial_segments: Vec::new(),
890 };
891
892 let mut all_records = Vec::new();
893
894 for (segment_index, (epoch, path)) in segments.iter().enumerate() {
895 let metadata = fs::metadata(path)?;
897 let file_size = metadata.len();
898
899 match read_segment(path) {
901 Ok((header, records)) => {
902 if header.epoch != *epoch {
903 let error = io::Error::new(
904 io::ErrorKind::InvalidData,
905 format!(
906 "segment {} has mismatched epoch: header={}, filename={}",
907 path.display(),
908 header.epoch,
909 epoch
910 ),
911 );
912 if options.skip_corrupt {
913 eprintln!(
914 "warning: stopping recovery at corrupt segment {}: {error}",
915 path.display()
916 );
917 result.partial_segments.extend(
918 segments[segment_index..]
919 .iter()
920 .map(|(_, path)| path.clone()),
921 );
922 break;
923 }
924 return Err(error);
925 }
926
927 result.segments_recovered += 1;
928 result.records_applied += records.len();
929 result.bytes_read += file_size;
930 result.epochs.push(*epoch);
931
932 all_records.extend(records);
933 }
934 Err(e) => {
935 if options.skip_corrupt {
936 eprintln!(
937 "warning: stopping recovery at corrupt segment {}: {e}",
938 path.display()
939 );
940 result.partial_segments.extend(
941 segments[segment_index..]
942 .iter()
943 .map(|(_, path)| path.clone()),
944 );
945 break;
946 }
947 return Err(e);
948 }
949 }
950 }
951
952 if options.delete_after_recovery {
954 for (_, path) in &segments {
955 if result.partial_segments.contains(path) {
957 continue;
958 }
959 if let Err(e) = delete_segment(path) {
960 eprintln!("warning: failed to delete segment {}: {e}", path.display());
961 }
962 }
963 }
964
965 Ok((result, EpochOrderCoordinator::recovery_order(&all_records)))
966}
967
968fn ordered_segment_records(epoch: u64, records: &[WalRecord]) -> io::Result<Vec<WalRecord>> {
969 let ordered = EpochOrderCoordinator::recovery_order(records);
970 if let Some(record) = ordered.iter().find(|record| record.epoch != epoch) {
971 return Err(io::Error::new(
972 io::ErrorKind::InvalidData,
973 format!(
974 "segment epoch {epoch} contains record from epoch {}",
975 record.epoch
976 ),
977 ));
978 }
979 Ok(ordered)
980}
981
982pub fn recover_and_apply_segments(
991 db_path: &Path,
992 page_contents: &mut HashMap<u32, Vec<u8>, impl BuildHasher>,
993 options: SegmentRecoveryOptions,
994) -> io::Result<SegmentRecoveryResult> {
995 let (result, records) = recover_segments(db_path, options)?;
996
997 for record in records {
999 let page_id = record.page_id.get();
1000 if !record.after_image.is_empty() {
1001 page_contents.insert(page_id, record.after_image);
1002 }
1003 }
1004
1005 Ok(result)
1006}
1007
1008pub fn max_durable_epoch(db_path: &Path) -> io::Result<Option<u64>> {
1013 let segments = list_segments(db_path)?;
1014 Ok(segments.last().map(|(epoch, _)| *epoch))
1015}
1016
1017pub fn cleanup_segments(db_path: &Path) -> io::Result<usize> {
1021 let segments = list_segments(db_path)?;
1022 let count = segments.len();
1023 for (_, path) in segments {
1024 delete_segment(&path)?;
1025 }
1026 Ok(count)
1027}
1028
1029fn serialize_record(record: &WalRecord) -> Result<Vec<u8>, String> {
1031 validate_segment_record_images(record)?;
1044 let before_len = u32::try_from(record.before_image.len())
1045 .map_err(|_| "before_image length exceeds u32 length prefix".to_string())?;
1046 let after_len = u32::try_from(record.after_image.len())
1047 .map_err(|_| "after_image length exceeds u32 length prefix".to_string())?;
1048
1049 let mut buf = Vec::with_capacity(64 + record.before_image.len() + record.after_image.len());
1050
1051 buf.extend_from_slice(&record.txn_token.id.get().to_le_bytes());
1052 buf.extend_from_slice(&record.txn_token.epoch.get().to_le_bytes());
1053 buf.extend_from_slice(&record.epoch.to_le_bytes());
1054 buf.extend_from_slice(&record.page_id.get().to_le_bytes());
1055 buf.extend_from_slice(&record.begin_seq.get().to_le_bytes());
1056 if let Some(end_seq) = record.end_seq {
1057 buf.push(1);
1058 buf.extend_from_slice(&end_seq.get().to_le_bytes());
1059 } else {
1060 buf.push(0);
1061 }
1062 buf.extend_from_slice(&before_len.to_le_bytes());
1063 buf.extend_from_slice(&record.before_image);
1064 buf.extend_from_slice(&after_len.to_le_bytes());
1065 buf.extend_from_slice(&record.after_image);
1066
1067 Ok(buf)
1068}
1069
1070fn validate_segment_record_images(record: &WalRecord) -> Result<(), String> {
1071 validate_segment_image_len("before_image", record.before_image.len())?;
1072 validate_segment_image_len("after_image", record.after_image.len())
1073}
1074
1075fn validate_segment_image_len(field: &'static str, len: usize) -> Result<(), String> {
1076 if len > MAX_SEGMENT_RECORD_IMAGE_BYTES {
1077 return Err(format!(
1078 "{field} length {len} exceeds maximum {MAX_SEGMENT_RECORD_IMAGE_BYTES}"
1079 ));
1080 }
1081 Ok(())
1082}
1083
/// Returns the `len` bytes of `buf` starting at `*offset` and advances
/// `*offset` past them.
///
/// # Errors
///
/// Returns `"<field> offset overflow"` when `*offset + len` overflows and
/// `"<field> truncated"` when the range lies outside `buf`. `*offset` is left
/// unchanged on error.
fn read_record_bytes<'a>(
    buf: &'a [u8],
    offset: &mut usize,
    len: usize,
    field: &'static str,
) -> Result<&'a [u8], String> {
    let start = *offset;
    let Some(end) = start.checked_add(len) else {
        return Err(format!("{field} offset overflow"));
    };
    match buf.get(start..end) {
        Some(bytes) => {
            *offset = end;
            Ok(bytes)
        }
        None => Err(format!("{field} truncated")),
    }
}
1099
1100fn read_record_u32(buf: &[u8], offset: &mut usize, field: &'static str) -> Result<u32, String> {
1101 let bytes = read_record_bytes(buf, offset, 4, field)?;
1102 Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
1103}
1104
1105fn read_record_u64(buf: &[u8], offset: &mut usize, field: &'static str) -> Result<u64, String> {
1106 let bytes = read_record_bytes(buf, offset, 8, field)?;
1107 Ok(u64::from_le_bytes([
1108 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1109 ]))
1110}
1111
1112fn deserialize_record(buf: &[u8]) -> Result<WalRecord, String> {
1114 if buf.len() < SEGMENT_RECORD_MIN_SIZE {
1115 return Err("record too short".to_string());
1116 }
1117
1118 let mut offset = 0;
1119
1120 let txn_id = read_record_u64(buf, &mut offset, "txn_id")?;
1121 let txn_epoch = read_record_u32(buf, &mut offset, "txn_epoch")?;
1122 let record_epoch = read_record_u64(buf, &mut offset, "record_epoch")?;
1123 let page_id = read_record_u32(buf, &mut offset, "page_id")?;
1124 let begin_seq = read_record_u64(buf, &mut offset, "begin_seq")?;
1125 let has_end_seq = *read_record_bytes(buf, &mut offset, 1, "end_seq flag")?
1126 .first()
1127 .ok_or_else(|| "end_seq flag truncated".to_string())?;
1128 let end_seq = if has_end_seq == 1 {
1129 let seq = read_record_u64(buf, &mut offset, "end_seq")?;
1130 Some(CommitSeq::new(seq))
1131 } else if has_end_seq == 0 {
1132 None
1133 } else {
1134 return Err(format!("invalid end_seq flag: {has_end_seq}"));
1135 };
1136 let before_len = read_record_u32(buf, &mut offset, "before_image length")? as usize;
1137 validate_segment_image_len("before_image", before_len)?;
1138 let before_image = read_record_bytes(buf, &mut offset, before_len, "before_image")?.to_vec();
1139 let after_len = read_record_u32(buf, &mut offset, "after_image length")? as usize;
1140 validate_segment_image_len("after_image", after_len)?;
1141 let after_image = read_record_bytes(buf, &mut offset, after_len, "after_image")?.to_vec();
1142 if offset != buf.len() {
1143 return Err(format!(
1144 "trailing bytes after WAL record: {}",
1145 buf.len().saturating_sub(offset)
1146 ));
1147 }
1148
1149 let txn_id = fsqlite_types::TxnId::new(txn_id).ok_or("invalid txn_id (zero)")?;
1150 let page_id = PageNumber::new(page_id).ok_or("invalid page_id (zero)")?;
1151
1152 Ok(WalRecord {
1153 txn_token: TxnToken::new(txn_id, fsqlite_types::TxnEpoch::new(txn_epoch)),
1154 epoch: record_epoch,
1155 page_id,
1156 begin_seq: CommitSeq::new(begin_seq),
1157 end_seq,
1158 before_image,
1159 after_image,
1160 })
1161}
1162
1163#[derive(Debug, Clone)]
1169pub struct ParallelWalFrame {
1170 pub page_number: PageNumber,
1172 pub page_data: Vec<u8>,
1174 pub db_size_if_commit: u32,
1176}
1177
1178#[derive(Debug, Clone)]
1180pub struct ParallelWalBatch {
1181 pub txn_token: TxnToken,
1183 pub commit_seq: CommitSeq,
1185 pub frames: Vec<ParallelWalFrame>,
1187}
1188
1189impl ParallelWalBatch {
1190 #[must_use]
1192 pub fn new(txn_token: TxnToken, commit_seq: CommitSeq, frames: Vec<ParallelWalFrame>) -> Self {
1193 Self {
1194 txn_token,
1195 commit_seq,
1196 frames,
1197 }
1198 }
1199}
1200
1201pub struct ParallelWalCoordinator {
1210 inner: Arc<EpochOrderCoordinator>,
1212 db_path: PathBuf,
1214 config: ParallelWalConfig,
1216 running: Arc<AtomicBool>,
1218 pending_batches: Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1220 ticker_cx: Mutex<Option<Cx>>,
1222 #[cfg(not(target_arch = "wasm32"))]
1224 ticker_handle: Mutex<Option<BlockingTaskHandle>>,
1225}
1226
1227impl std::fmt::Debug for ParallelWalCoordinator {
1228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229 f.debug_struct("ParallelWalCoordinator")
1230 .field("db_path", &self.db_path)
1231 .field("config", &self.config)
1232 .field("running", &self.running.load(Ordering::Relaxed))
1233 .finish_non_exhaustive()
1234 }
1235}
1236
1237impl ParallelWalCoordinator {
1238 #[must_use]
1240 pub fn new(db_path: &Path, config: ParallelWalConfig) -> Self {
1241 let buffer_config = BufferConfig {
1242 capacity_bytes: config.buffer_capacity_bytes,
1243 ..BufferConfig::default()
1244 };
1245 let epoch_config = EpochConfig {
1246 advance_interval_ms: config.epoch_interval_ms,
1247 };
1248
1249 Self {
1250 inner: Arc::new(EpochOrderCoordinator::new(
1251 config.slot_count,
1252 buffer_config,
1253 epoch_config,
1254 )),
1255 db_path: db_path.to_path_buf(),
1256 config,
1257 running: Arc::new(AtomicBool::new(false)),
1258 pending_batches: Arc::new(Mutex::new(VecDeque::new())),
1259 ticker_cx: Mutex::new(None),
1260 #[cfg(not(target_arch = "wasm32"))]
1261 ticker_handle: Mutex::new(None),
1262 }
1263 }
1264
1265 #[must_use]
1267 pub fn current_epoch(&self) -> u64 {
1268 self.inner.current_epoch()
1269 }
1270
1271 #[must_use]
1273 pub fn durable_epoch(&self) -> Option<u64> {
1274 self.inner.durable_epoch()
1275 }
1276
1277 #[must_use]
1279 pub fn thread_slot(&self) -> usize {
1280 thread_buffer_slot(self.config.slot_count)
1281 }
1282
1283 pub fn submit_batch(&self, batch: ParallelWalBatch) -> Result<u64, String> {
1290 let slot = self.thread_slot();
1291 let epoch = self.inner.current_append_epoch();
1292 let records = batch
1293 .frames
1294 .into_iter()
1295 .map(|frame| WalRecord {
1296 txn_token: batch.txn_token,
1297 epoch,
1298 page_id: frame.page_number,
1299 begin_seq: batch.commit_seq,
1300 end_seq: Some(batch.commit_seq),
1301 before_image: Vec::new(), after_image: frame.page_data,
1303 })
1304 .collect();
1305
1306 let outcome = self.inner.append_records_to_core(slot, records)?;
1307 if matches!(outcome, AppendOutcome::Blocked) {
1308 return Err("buffer blocked, fallback to serialized path".to_string());
1309 }
1310
1311 Ok(epoch)
1312 }
1313
1314 pub fn wait_for_epoch_durable(&self, epoch: u64, timeout: Duration) -> Result<(), String> {
1319 self.inner.wait_until_epoch_durable(epoch, timeout)
1320 }
1321
1322 #[cfg(not(target_arch = "wasm32"))]
1328 pub fn start_on_runtime(&self, runtime: &RuntimeHandle, parent_cx: &Cx) -> Result<(), String> {
1329 self.start_on_runtime_with_fsync(runtime, parent_cx, FsyncPolicy::default())
1330 }
1331
1332 #[cfg(not(target_arch = "wasm32"))]
1334 pub fn start_on_runtime_with_fsync(
1335 &self,
1336 runtime: &RuntimeHandle,
1337 parent_cx: &Cx,
1338 fsync_policy: FsyncPolicy,
1339 ) -> Result<(), String> {
1340 if self.running.load(Ordering::Acquire) {
1341 return Err("coordinator already running".to_string());
1342 }
1343
1344 let prior_ticker_cx = self
1345 .ticker_cx
1346 .lock()
1347 .unwrap_or_else(std::sync::PoisonError::into_inner)
1348 .take();
1349 if let Some(ticker_cx) = prior_ticker_cx {
1350 ticker_cx.cancel();
1351 }
1352 let prior_handle = self
1353 .ticker_handle
1354 .lock()
1355 .unwrap_or_else(std::sync::PoisonError::into_inner)
1356 .take();
1357 if let Some(handle) = prior_handle {
1358 handle.wait();
1359 }
1360 if self
1361 .running
1362 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
1363 .is_err()
1364 {
1365 return Err("coordinator already running".to_string());
1366 }
1367
1368 let ticker_cx = parent_cx.create_child();
1369
1370 let running = Arc::clone(&self.running);
1372 let inner = Arc::clone(&self.inner);
1373 let db_path = self.db_path.clone();
1374 let pending_batches = Arc::clone(&self.pending_batches);
1375 let interval = Duration::from_millis(self.config.epoch_interval_ms);
1376 let flush_timeout = Duration::from_millis(self.config.epoch_interval_ms * 10);
1377 let loop_cx = ticker_cx.clone();
1378
1379 let Some(handle) = runtime.spawn_blocking(move || {
1380 epoch_ticker_loop(
1381 running,
1382 inner,
1383 db_path,
1384 pending_batches,
1385 interval,
1386 flush_timeout,
1387 fsync_policy,
1388 loop_cx,
1389 );
1390 }) else {
1391 self.running.store(false, Ordering::Release);
1392 return Err(
1393 "failed to spawn epoch ticker task: runtime has no blocking pool".to_string(),
1394 );
1395 };
1396
1397 *self
1398 .ticker_cx
1399 .lock()
1400 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(ticker_cx);
1401
1402 let mut ticker_handle = self
1403 .ticker_handle
1404 .lock()
1405 .unwrap_or_else(std::sync::PoisonError::into_inner);
1406 *ticker_handle = Some(handle);
1407
1408 Ok(())
1409 }
1410
1411 pub fn stop(&self) {
1416 self.running.store(false, Ordering::Release);
1417 let prior_ticker_cx = self
1418 .ticker_cx
1419 .lock()
1420 .unwrap_or_else(std::sync::PoisonError::into_inner)
1421 .take();
1422 if let Some(ticker_cx) = prior_ticker_cx {
1423 ticker_cx.cancel();
1424 }
1425
1426 #[cfg(not(target_arch = "wasm32"))]
1427 let mut handle = self
1428 .ticker_handle
1429 .lock()
1430 .unwrap_or_else(std::sync::PoisonError::into_inner);
1431 #[cfg(not(target_arch = "wasm32"))]
1432 if let Some(h) = handle.take() {
1433 h.wait();
1434 }
1435 }
1436
1437 #[must_use]
1439 pub fn is_running(&self) -> bool {
1440 self.running.load(Ordering::Acquire)
1441 }
1442
1443 pub fn advance_and_flush(&self, timeout: Duration) -> Result<u64, String> {
1447 flush_pending_batches(
1448 &self.pending_batches,
1449 &self.inner,
1450 &self.db_path,
1451 FsyncPolicy::default(),
1452 )?;
1453
1454 let new_epoch = self.inner.advance_epoch_and_wait(&[], timeout)?;
1457
1458 let prev_epoch = new_epoch.saturating_sub(1);
1459 let batch = self.inner.flush_epoch(prev_epoch)?;
1460 if batch.records.is_empty() {
1461 self.inner.mark_epoch_durable(prev_epoch);
1462 } else {
1463 enqueue_flush_batch(&self.pending_batches, batch);
1464 flush_pending_batches(
1465 &self.pending_batches,
1466 &self.inner,
1467 &self.db_path,
1468 FsyncPolicy::default(),
1469 )?;
1470 }
1471
1472 Ok(new_epoch)
1473 }
1474}
1475
1476impl Drop for ParallelWalCoordinator {
1477 fn drop(&mut self) {
1478 self.stop();
1479 }
1480}
1481
1482fn enqueue_flush_batch(
1483 pending_batches: &Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1484 batch: EpochFlushBatch,
1485) {
1486 let mut pending = pending_batches
1487 .lock()
1488 .unwrap_or_else(std::sync::PoisonError::into_inner);
1489 pending.push_back(batch);
1490}
1491
1492fn flush_pending_batches(
1493 pending_batches: &Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1494 inner: &EpochOrderCoordinator,
1495 db_path: &Path,
1496 fsync_policy: FsyncPolicy,
1497) -> Result<(), String> {
1498 loop {
1499 let next_batch = {
1500 let mut pending = pending_batches
1501 .lock()
1502 .unwrap_or_else(std::sync::PoisonError::into_inner);
1503 pending.pop_front()
1504 };
1505
1506 let Some(batch) = next_batch else {
1507 return Ok(());
1508 };
1509
1510 if let Err(error) = write_segment(db_path, &batch, fsync_policy) {
1511 let epoch = batch.epoch;
1512 let mut pending = pending_batches
1513 .lock()
1514 .unwrap_or_else(std::sync::PoisonError::into_inner);
1515 pending.push_front(batch);
1516 return Err(format!("write_segment({epoch}) failed: {error}"));
1517 }
1518
1519 inner.mark_epoch_durable(batch.epoch);
1520 }
1521}
1522
1523#[allow(clippy::too_many_arguments)]
1539fn epoch_ticker_loop(
1540 running: Arc<AtomicBool>,
1541 inner: Arc<EpochOrderCoordinator>,
1542 db_path: PathBuf,
1543 pending_batches: Arc<Mutex<VecDeque<EpochFlushBatch>>>,
1544 interval: Duration,
1545 flush_timeout: Duration,
1546 fsync_policy: FsyncPolicy,
1547 ticker_cx: Cx,
1548) {
1549 while running.load(Ordering::Acquire) {
1550 if ticker_cx.checkpoint().is_err() {
1551 break;
1552 }
1553
1554 std::thread::sleep(interval);
1556
1557 if !running.load(Ordering::Acquire) || ticker_cx.is_cancel_requested() {
1559 break;
1560 }
1561
1562 if let Err(error) = flush_pending_batches(&pending_batches, &inner, &db_path, fsync_policy)
1563 {
1564 eprintln!("epoch ticker: {error}");
1565 continue;
1566 }
1567
1568 match inner.advance_epoch_and_wait(&[], flush_timeout) {
1571 Ok(new_epoch) => {
1572 let prev_epoch = new_epoch.saturating_sub(1);
1573 match inner.flush_epoch(prev_epoch) {
1574 Ok(batch) => {
1575 if batch.records.is_empty() {
1576 inner.mark_epoch_durable(prev_epoch);
1577 } else {
1578 enqueue_flush_batch(&pending_batches, batch);
1579 if let Err(error) = flush_pending_batches(
1580 &pending_batches,
1581 &inner,
1582 &db_path,
1583 fsync_policy,
1584 ) {
1585 eprintln!("epoch ticker: {error}");
1586 }
1587 }
1588 }
1589 Err(error) => {
1590 eprintln!("epoch ticker: flush_epoch({prev_epoch}) failed: {error}");
1591 }
1592 }
1593 }
1594 Err(error) => {
1595 eprintln!("epoch ticker: advance_epoch_and_wait failed: {error}");
1596 }
1597 }
1598 }
1599
1600 running.store(false, Ordering::Release);
1601}
1602
/// Shared handle to a coordinator, as stored in and returned from the registry.
type CoordinatorRef = Arc<ParallelWalCoordinator>;

/// Process-wide registry mapping a database path to its shared coordinator.
/// Initialised lazily on first lookup; keys are the paths exactly as passed in.
static PARALLEL_WAL_COORDINATORS: OnceLock<Mutex<HashMap<PathBuf, CoordinatorRef>>> =
    OnceLock::new();
1611
1612pub fn parallel_wal_coordinator_for_path(db_path: &Path) -> CoordinatorRef {
1614 let coordinators = PARALLEL_WAL_COORDINATORS.get_or_init(|| Mutex::new(HashMap::new()));
1615 let mut coordinators = coordinators
1616 .lock()
1617 .unwrap_or_else(std::sync::PoisonError::into_inner);
1618
1619 Arc::clone(
1620 coordinators
1621 .entry(db_path.to_path_buf())
1622 .or_insert_with(|| {
1623 Arc::new(ParallelWalCoordinator::new(
1624 db_path,
1625 ParallelWalConfig::default(),
1626 ))
1627 }),
1628 )
1629}
1630
1631pub fn remove_parallel_wal_coordinator(db_path: &Path) {
1633 if let Some(coordinators) = PARALLEL_WAL_COORDINATORS.get() {
1634 let mut coordinators = coordinators
1635 .lock()
1636 .unwrap_or_else(std::sync::PoisonError::into_inner);
1637 if let Some(coordinator) = coordinators.remove(db_path) {
1638 coordinator.stop();
1639 }
1640 }
1641}
1642
1643#[cfg(test)]
1648mod tests {
1649 use super::*;
1650 use asupersync::runtime::RuntimeBuilder;
1651 use std::path::PathBuf;
1652 use std::sync::{LazyLock, Mutex, MutexGuard};
1653
1654 use crate::per_core_buffer::reset_slot_counter;
1655
    /// Lock acquired through `lane_test_guard`; tests that hold the returned
    /// guard cannot run at the same time as each other.
    static PARALLEL_WAL_LANE_TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
1657
1658 fn lane_test_guard() -> MutexGuard<'static, ()> {
1659 PARALLEL_WAL_LANE_TEST_LOCK
1660 .lock()
1661 .unwrap_or_else(std::sync::PoisonError::into_inner)
1662 }
1663
1664 fn test_runtime() -> asupersync::runtime::Runtime {
1665 RuntimeBuilder::current_thread()
1666 .blocking_threads(1, 1)
1667 .build()
1668 .expect("runtime should build")
1669 }
1670
1671 fn test_cx() -> Cx {
1672 Cx::default()
1673 }
1674
1675 fn sample_batch(txn_id: u64, commit_seq: u64) -> ParallelWalBatch {
1676 ParallelWalBatch::new(
1677 TxnToken::new(
1678 fsqlite_types::TxnId::new(txn_id).expect("txn id should be non-zero"),
1679 fsqlite_types::TxnEpoch::new(0),
1680 ),
1681 CommitSeq::new(commit_seq),
1682 vec![
1683 ParallelWalFrame {
1684 page_number: PageNumber::new(7).expect("page should be non-zero"),
1685 page_data: vec![0xAA; 16],
1686 db_size_if_commit: 0,
1687 },
1688 ParallelWalFrame {
1689 page_number: PageNumber::new(9).expect("page should be non-zero"),
1690 page_data: vec![0xBB; 24],
1691 db_size_if_commit: 12,
1692 },
1693 ],
1694 )
1695 }
1696
1697 fn sample_lane_batch(
1698 batch_id: u64,
1699 lane_id: u16,
1700 staged_frame_count: u32,
1701 payload: u32,
1702 ) -> ParallelWalLaneBatch<u32> {
1703 ParallelWalLaneBatch {
1704 batch_id,
1705 lane_id,
1706 staged_frame_count,
1707 staging_elapsed_ns: u64::from(staged_frame_count) * 10,
1708 shadow_verdict: ParallelWalShadowVerdict::NotRun,
1709 payload,
1710 }
1711 }
1712
1713 fn sample_lane_context(batch_id: u64, lane_id: u16) -> TransactionFrameBatchContext {
1714 TransactionFrameBatchContext {
1715 batch_id,
1716 lane_id,
1717 staged_frame_count: 1,
1718 staging_elapsed_ns: 10,
1719 }
1720 }
1721
1722 #[test]
1723 fn test_parallel_wal_coordinator_creation() {
1724 let path = PathBuf::from("/tmp/test.db");
1725 let coordinator = ParallelWalCoordinator::new(&path, ParallelWalConfig::default());
1726
1727 assert_eq!(coordinator.current_epoch(), 0);
1728 assert_eq!(coordinator.durable_epoch(), None);
1729 }
1730
1731 #[test]
1732 fn test_thread_slot_assignment() {
1733 let _guard = lane_test_guard();
1734 let path = PathBuf::from("/tmp/test.db");
1735 let config = ParallelWalConfig {
1736 slot_count: 4,
1737 ..ParallelWalConfig::default()
1738 };
1739 let coordinator = ParallelWalCoordinator::new(&path, config);
1740
1741 let slot1 = coordinator.thread_slot();
1743 let slot2 = coordinator.thread_slot();
1744 assert_eq!(slot1, slot2);
1745 assert!(slot1 < 4);
1746 }
1747
1748 #[test]
1749 fn test_lane_stager_identity_is_stable_within_thread() {
1750 let _guard = lane_test_guard();
1751 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1752 mode: ParallelWalOperatingMode::Auto,
1753 lane_count_override: Some(4),
1754 ..ParallelWalControlSurface::default()
1755 });
1756
1757 let first = stager.current_lane_id();
1758 let second = stager.current_lane_id();
1759 assert_eq!(first, second);
1760 assert!(usize::from(first) < 4);
1761 }
1762
1763 #[test]
1764 fn test_lane_stager_reuses_lanes_after_worker_churn() {
1765 let _guard = lane_test_guard();
1766 reset_slot_counter();
1767
1768 let stager = Arc::new(ParallelWalLaneStager::<u32>::new(
1769 ParallelWalControlSurface {
1770 mode: ParallelWalOperatingMode::Auto,
1771 lane_count_override: Some(2),
1772 ..ParallelWalControlSurface::default()
1773 },
1774 ));
1775
1776 let spawn_wave = || {
1777 let mut lanes = Vec::new();
1778 for _ in 0..2 {
1779 let stager = Arc::clone(&stager);
1780 lanes.push(std::thread::spawn(move || stager.current_lane_id()));
1781 }
1782 let mut observed = lanes
1783 .into_iter()
1784 .map(|handle| handle.join().expect("lane thread should join"))
1785 .collect::<Vec<_>>();
1786 observed.sort_unstable();
1787 observed
1788 };
1789
1790 assert_eq!(spawn_wave(), vec![0, 1]);
1791 assert_eq!(spawn_wave(), vec![0, 1]);
1792 }
1793
1794 #[test]
1795 fn test_lane_stager_conservative_mode_collapses_to_single_lane() {
1796 let _guard = lane_test_guard();
1797 let stager = Arc::new(ParallelWalLaneStager::<u32>::new(
1798 ParallelWalControlSurface {
1799 mode: ParallelWalOperatingMode::Conservative,
1800 lane_count_override: Some(8),
1801 ..ParallelWalControlSurface::default()
1802 },
1803 ));
1804
1805 assert_eq!(stager.lane_count(), 1);
1806
1807 let mut lanes = Vec::new();
1808 for _ in 0..2 {
1809 let stager = Arc::clone(&stager);
1810 lanes.push(std::thread::spawn(move || stager.current_lane_id()));
1811 }
1812
1813 let observed = lanes
1814 .into_iter()
1815 .map(|handle| handle.join().expect("lane thread should join"))
1816 .collect::<Vec<_>>();
1817 assert_eq!(observed, vec![0, 0]);
1818 }
1819
1820 #[test]
1821 fn test_lane_stager_clamps_lane_count_to_lane_id_range() {
1822 let _guard = lane_test_guard();
1823 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1824 mode: ParallelWalOperatingMode::Auto,
1825 lane_count_override: Some(MAX_PARALLEL_WAL_LANE_COUNT + 1),
1826 ..ParallelWalControlSurface::default()
1827 });
1828
1829 assert_eq!(stager.lane_count(), MAX_PARALLEL_WAL_LANE_COUNT);
1830 assert_eq!(stager.lane_count(), usize::from(u16::MAX));
1831 assert!(usize::from(stager.current_lane_id()) < stager.lane_count());
1832 }
1833
1834 #[test]
1835 fn test_lane_stager_same_lane_order_mismatch_returns_none_without_drain() {
1836 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1837 mode: ParallelWalOperatingMode::Auto,
1838 lane_count_override: Some(2),
1839 ..ParallelWalControlSurface::default()
1840 });
1841
1842 assert_eq!(stager.record_batch(sample_lane_batch(10, 0, 1, 10)), 1);
1843 assert_eq!(stager.record_batch(sample_lane_batch(11, 0, 1, 11)), 2);
1844
1845 let out_of_order = [sample_lane_context(11, 0), sample_lane_context(10, 0)];
1846 assert!(stager.take_batches_for_flush(&out_of_order).is_none());
1847 assert_eq!(stager.current_lane_backlog(0), 2);
1848
1849 let in_order = [sample_lane_context(10, 0), sample_lane_context(11, 0)];
1850 let drained = stager
1851 .take_batches_for_flush(&in_order)
1852 .expect("verified in-order batches should drain");
1853 assert_eq!(drained.len(), 2);
1854 assert_eq!(drained.get(&10).map(|batch| batch.payload), Some(10));
1855 assert_eq!(drained.get(&11).map(|batch| batch.payload), Some(11));
1856 assert_eq!(stager.current_lane_backlog(0), 0);
1857 }
1858
1859 #[test]
1860 fn test_lane_stager_discard_batches_for_flush_removes_stale_payloads() {
1861 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1862 mode: ParallelWalOperatingMode::Auto,
1863 lane_count_override: Some(2),
1864 ..ParallelWalControlSurface::default()
1865 });
1866
1867 assert_eq!(stager.record_batch(sample_lane_batch(10, 0, 2, 10)), 2);
1868 assert_eq!(stager.record_batch(sample_lane_batch(11, 0, 3, 11)), 5);
1869 assert_eq!(stager.record_batch(sample_lane_batch(12, 0, 5, 12)), 10);
1870
1871 assert_eq!(
1872 stager.discard_batches_for_flush(&[sample_lane_context(11, 0)]),
1873 1
1874 );
1875 assert_eq!(
1876 stager.current_lane_backlog(0),
1877 7,
1878 "discarding a stale middle batch must subtract its staged frames without disturbing retained payloads"
1879 );
1880
1881 let retained = [sample_lane_context(10, 0), sample_lane_context(12, 0)];
1882 let drained = stager
1883 .take_batches_for_flush(&retained)
1884 .expect("discarded stale payload should not block later retained batches");
1885 assert_eq!(drained.len(), 2);
1886 assert_eq!(drained.get(&10).map(|batch| batch.payload), Some(10));
1887 assert_eq!(drained.get(&12).map(|batch| batch.payload), Some(12));
1888 assert_eq!(stager.current_lane_backlog(0), 0);
1889 }
1890
1891 #[test]
1892 fn test_lane_stager_discard_batches_for_flush_is_idempotent() {
1893 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1894 mode: ParallelWalOperatingMode::Auto,
1895 lane_count_override: Some(2),
1896 ..ParallelWalControlSurface::default()
1897 });
1898
1899 assert_eq!(stager.record_batch(sample_lane_batch(20, 1, 4, 20)), 4);
1900 let context = [sample_lane_context(20, 1)];
1901 assert_eq!(stager.discard_batches_for_flush(&context), 1);
1902 assert_eq!(stager.current_lane_backlog(1), 0);
1903 assert_eq!(
1904 stager.discard_batches_for_flush(&context),
1905 0,
1906 "discarding an already-flushed raw fallback batch should be a no-op"
1907 );
1908 assert_eq!(stager.current_lane_backlog(1), 0);
1909 }
1910
1911 #[test]
1912 fn test_lane_stager_discard_batches_for_flush_ignores_unknown_ids() {
1913 let stager = ParallelWalLaneStager::<u32>::new(ParallelWalControlSurface {
1914 mode: ParallelWalOperatingMode::Auto,
1915 lane_count_override: Some(2),
1916 ..ParallelWalControlSurface::default()
1917 });
1918
1919 assert_eq!(stager.record_batch(sample_lane_batch(30, 0, 2, 30)), 2);
1920 assert_eq!(
1921 stager.discard_batches_for_flush(&[sample_lane_context(99, 0)]),
1922 0
1923 );
1924 assert_eq!(stager.current_lane_backlog(0), 2);
1925
1926 let drained = stager
1927 .take_batches_for_flush(&[sample_lane_context(30, 0)])
1928 .expect("unknown discard must not perturb queued batches");
1929 assert_eq!(drained.get(&30).map(|batch| batch.payload), Some(30));
1930 assert_eq!(stager.current_lane_backlog(0), 0);
1931 }
1932
1933 #[test]
1934 fn test_auto_shadow_compare_sampling_is_deterministic_by_batch_window() {
1935 let control = ParallelWalControlSurface {
1936 mode: ParallelWalOperatingMode::Auto,
1937 shadow_compare_sampling_per_mille: Some(2),
1938 ..ParallelWalControlSurface::default()
1939 };
1940
1941 assert!(parallel_wal_should_shadow_compare(&control, 1));
1942 assert!(parallel_wal_should_shadow_compare(&control, 2));
1943 assert!(!parallel_wal_should_shadow_compare(&control, 3));
1944 assert!(parallel_wal_should_shadow_compare(&control, 1_001));
1945 assert!(parallel_wal_should_shadow_compare(&control, 1_002));
1946 assert!(!parallel_wal_should_shadow_compare(&control, 1_003));
1947 }
1948
1949 #[test]
1950 fn test_shadow_compare_mode_ignores_sampling_gate() {
1951 let control = ParallelWalControlSurface {
1952 mode: ParallelWalOperatingMode::ShadowCompare,
1953 shadow_compare_sampling_per_mille: Some(0),
1954 ..ParallelWalControlSurface::default()
1955 };
1956
1957 assert!(parallel_wal_should_shadow_compare(&control, 1));
1958 assert!(parallel_wal_should_shadow_compare(&control, 7));
1959 }
1960
1961 #[test]
1962 fn test_conservative_mode_never_runs_shadow_compare_sampling() {
1963 let control = ParallelWalControlSurface {
1964 mode: ParallelWalOperatingMode::Conservative,
1965 shadow_compare_sampling_per_mille: Some(1_000),
1966 ..ParallelWalControlSurface::default()
1967 };
1968
1969 assert!(!parallel_wal_should_shadow_compare(&control, 1));
1970 assert!(!parallel_wal_should_shadow_compare(&control, 1_000));
1971 }
1972
1973 #[test]
1974 fn test_global_coordinator_registry() {
1975 let path = PathBuf::from("/tmp/test_registry.db");
1976 let coord1 = parallel_wal_coordinator_for_path(&path);
1977 let coord2 = parallel_wal_coordinator_for_path(&path);
1978
1979 assert!(Arc::ptr_eq(&coord1, &coord2));
1981
1982 remove_parallel_wal_coordinator(&path);
1984 }
1985
    /// Walks the ticker through start, a rejected double start, and two stops,
    /// checking `is_running` after each step.
    #[test]
    fn test_epoch_ticker_start_stop() {
        let path = PathBuf::from("/tmp/test_ticker.db");
        let config = ParallelWalConfig {
            slot_count: 4,
            // Short interval so several ticks fit into the sleep below.
            epoch_interval_ms: 5,
            ..ParallelWalConfig::default()
        };
        let coordinator = ParallelWalCoordinator::new(&path, config);
        let runtime = test_runtime();
        let cx = test_cx();

        // Not running until explicitly started.
        assert!(!coordinator.is_running());

        coordinator
            .start_on_runtime(&runtime.handle(), &cx)
            .expect("start should succeed");
        assert!(coordinator.is_running());

        // A second start while running must be rejected.
        assert!(
            coordinator
                .start_on_runtime(&runtime.handle(), &cx)
                .is_err()
        );

        // Give the ticker time for a few iterations.
        std::thread::sleep(Duration::from_millis(25));

        // Only checks that reading the epoch while ticking does not panic.
        let _epoch = coordinator.current_epoch();

        coordinator.stop();
        assert!(!coordinator.is_running());

        // Stopping again is harmless.
        coordinator.stop();
        assert!(!coordinator.is_running());
    }
2028
    /// With no batches submitted, a running ticker must still move the
    /// current epoch forward.
    #[test]
    fn test_epoch_ticker_advances_epochs() {
        let path = PathBuf::from("/tmp/test_ticker_advance.db");
        let config = ParallelWalConfig {
            slot_count: 2,
            // 5 ms ticks against the 50 ms sleep below leave room for several advances.
            epoch_interval_ms: 5,
            ..ParallelWalConfig::default()
        };
        let coordinator = ParallelWalCoordinator::new(&path, config);
        let runtime = test_runtime();
        let cx = test_cx();

        let initial_epoch = coordinator.current_epoch();

        coordinator
            .start_on_runtime(&runtime.handle(), &cx)
            .expect("start should succeed");
        std::thread::sleep(Duration::from_millis(50));
        coordinator.stop();

        // Read after stop so the value no longer changes underneath the assertion.
        let final_epoch = coordinator.current_epoch();

        assert!(
            final_epoch > initial_epoch,
            "epoch ticker should advance without stalling on inactive slots: initial={initial_epoch}, final={final_epoch}"
        );
    }
2057
    /// After the parent context of a running ticker is cancelled, the
    /// coordinator can be started again under a fresh context.
    #[test]
    fn test_epoch_ticker_restart_after_parent_cancellation() {
        let path = PathBuf::from("/tmp/test_ticker_restart.db");
        let config = ParallelWalConfig {
            slot_count: 2,
            epoch_interval_ms: 5,
            ..ParallelWalConfig::default()
        };
        let coordinator = ParallelWalCoordinator::new(&path, config);
        let runtime = test_runtime();
        let parent_cx = test_cx();

        coordinator
            .start_on_runtime(&runtime.handle(), &parent_cx)
            .expect("initial start should succeed");
        // Cancel the parent and wait a few tick intervals so the ticker has a
        // chance to observe it and exit on its own.
        parent_cx.cancel();
        std::thread::sleep(Duration::from_millis(15));

        let replacement_cx = test_cx();
        coordinator
            .start_on_runtime(&runtime.handle(), &replacement_cx)
            .expect("restart after parent cancellation should drain prior task");
        assert!(coordinator.is_running());

        coordinator.stop();
        assert!(!coordinator.is_running());
    }
2085
2086 #[test]
2087 fn test_submit_batch_persists_actual_frame_payloads() {
2088 use tempfile::tempdir;
2089
2090 let _guard = lane_test_guard();
2091 let dir = tempdir().expect("create temp dir");
2092 let db_path = dir.path().join("submit_batch.db");
2093 let config = ParallelWalConfig {
2094 slot_count: 1,
2095 ..ParallelWalConfig::default()
2096 };
2097 let coordinator = ParallelWalCoordinator::new(&db_path, config);
2098
2099 let epoch = coordinator
2100 .submit_batch(sample_batch(11, 77))
2101 .expect("submit should succeed");
2102 assert_eq!(epoch, 0);
2103
2104 coordinator
2105 .advance_and_flush(Duration::from_millis(50))
2106 .expect("flush should succeed");
2107 assert_eq!(coordinator.durable_epoch(), Some(0));
2108
2109 let seg_path = segment_path(&db_path, 0);
2110 let (_, records) = read_segment(&seg_path).expect("segment should read back");
2111 assert_eq!(records.len(), 2);
2112 assert_eq!(records[0].txn_token.id.get(), 11);
2113 assert_eq!(records[0].begin_seq, CommitSeq::new(77));
2114 assert_eq!(records[0].page_id.get(), 7);
2115 assert_eq!(records[0].after_image, vec![0xAA; 16]);
2116 assert_eq!(records[1].page_id.get(), 9);
2117 assert_eq!(records[1].after_image, vec![0xBB; 24]);
2118 }
2119
2120 #[test]
2121 fn test_advance_and_flush_does_not_mark_epoch_durable_on_segment_write_failure() {
2122 use tempfile::tempdir;
2123
2124 let _guard = lane_test_guard();
2125 let dir = tempdir().expect("create temp dir");
2126 let db_path = dir.path().join("missing").join("write_failure.db");
2127 let config = ParallelWalConfig {
2128 slot_count: 1,
2129 ..ParallelWalConfig::default()
2130 };
2131 let coordinator = ParallelWalCoordinator::new(&db_path, config);
2132
2133 coordinator
2134 .submit_batch(sample_batch(21, 99))
2135 .expect("submit should succeed");
2136
2137 let error = coordinator
2138 .advance_and_flush(Duration::from_millis(50))
2139 .expect_err("flush should fail when the segment directory is missing");
2140 assert!(
2141 error.contains("write_segment(0) failed"),
2142 "error should preserve the failing epoch: {error}"
2143 );
2144 assert_eq!(
2145 coordinator.durable_epoch(),
2146 None,
2147 "failed segment writes must not be reported as durable"
2148 );
2149 assert!(
2150 coordinator
2151 .wait_for_epoch_durable(0, Duration::from_millis(10))
2152 .is_err(),
2153 "durability wait must keep blocking after a failed segment write"
2154 );
2155 }
2156
2157 #[test]
2162 fn test_segment_header_roundtrip() {
2163 let header = SegmentHeader::new(42, 100);
2164 let bytes = header.to_bytes();
2165 let parsed = SegmentHeader::from_bytes(&bytes).expect("should parse");
2166 assert_eq!(parsed.epoch, 42);
2167 assert_eq!(parsed.record_count, 100);
2168 }
2169
2170 #[test]
2171 fn test_segment_header_invalid_magic() {
2172 let mut bytes = [0u8; SEGMENT_HEADER_SIZE];
2173 bytes[0..4].copy_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
2174 let result = SegmentHeader::from_bytes(&bytes);
2175 assert!(result.is_err());
2176 assert!(result.unwrap_err().contains("invalid segment magic"));
2177 }
2178
2179 #[test]
2180 fn test_segment_header_checksum_mismatch() {
2181 let header = SegmentHeader::new(42, 100);
2182 let mut bytes = header.to_bytes();
2183 bytes[8] ^= 0xFF;
2185 let result = SegmentHeader::from_bytes(&bytes);
2186 assert!(result.is_err());
2187 assert!(result.unwrap_err().contains("checksum mismatch"));
2188 }
2189
2190 #[test]
2191 fn test_segment_path_generation() {
2192 let db_path = PathBuf::from("/tmp/mydb.sqlite");
2193 let path = segment_path(&db_path, 0x1234_5678_9ABC_DEF0);
2194 assert_eq!(
2195 path.file_name().unwrap().to_str().unwrap(),
2196 "mydb.sqlite-wal-seg-123456789abcdef0"
2197 );
2198 }
2199
2200 #[test]
2201 fn test_segment_write_and_read() {
2202 use tempfile::tempdir;
2203
2204 let dir = tempdir().expect("create temp dir");
2205 let db_path = dir.path().join("test.db");
2206
2207 let records = vec![
2209 WalRecord {
2210 txn_token: TxnToken::new(
2211 fsqlite_types::TxnId::new(1).unwrap(),
2212 fsqlite_types::TxnEpoch::new(0),
2213 ),
2214 epoch: 5,
2215 page_id: PageNumber::new(1).unwrap(),
2216 begin_seq: CommitSeq::new(100),
2217 end_seq: Some(CommitSeq::new(100)),
2218 before_image: vec![0u8; 32],
2219 after_image: vec![1u8; 32],
2220 },
2221 WalRecord {
2222 txn_token: TxnToken::new(
2223 fsqlite_types::TxnId::new(2).unwrap(),
2224 fsqlite_types::TxnEpoch::new(1),
2225 ),
2226 epoch: 5,
2227 page_id: PageNumber::new(2).unwrap(),
2228 begin_seq: CommitSeq::new(101),
2229 end_seq: None,
2230 before_image: Vec::new(),
2231 after_image: vec![2u8; 64],
2232 },
2233 ];
2234
2235 let batch = EpochFlushBatch {
2236 epoch: 5,
2237 records,
2238 records_per_core: vec![1, 1],
2239 };
2240
2241 let bytes_written =
2243 write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
2244 assert!(bytes_written > SEGMENT_HEADER_SIZE);
2245
2246 let seg_path = segment_path(&db_path, 5);
2248 let (header, records) = read_segment(&seg_path).expect("read should succeed");
2249
2250 assert_eq!(header.epoch, 5);
2251 assert_eq!(header.record_count, 2);
2252 assert_eq!(records.len(), 2);
2253
2254 assert_eq!(records[0].txn_token.id.get(), 1);
2256 assert_eq!(records[0].page_id.get(), 1);
2257 assert_eq!(records[0].before_image.len(), 32);
2258 assert_eq!(records[0].after_image.len(), 32);
2259 assert_eq!(records[0].end_seq, Some(CommitSeq::new(100)));
2260
2261 assert_eq!(records[1].txn_token.id.get(), 2);
2263 assert_eq!(records[1].page_id.get(), 2);
2264 assert_eq!(records[1].before_image.len(), 0);
2265 assert_eq!(records[1].after_image.len(), 64);
2266 assert_eq!(records[1].end_seq, None);
2267
2268 delete_segment(&seg_path).expect("delete should succeed");
2270 }
2271
2272 #[test]
2273 fn test_deserialize_record_rejects_invalid_end_seq_flag() {
2274 let record = WalRecord {
2275 txn_token: TxnToken::new(
2276 fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2277 fsqlite_types::TxnEpoch::new(0),
2278 ),
2279 epoch: 5,
2280 page_id: PageNumber::new(1).expect("page should be non-zero"),
2281 begin_seq: CommitSeq::new(100),
2282 end_seq: None,
2283 before_image: Vec::new(),
2284 after_image: vec![0xAA; 8],
2285 };
2286 let mut bytes = serialize_record(&record).expect("sample record should serialize");
2287 let end_seq_flag_offset = 8 + 4 + 8 + 4 + 8;
2288 bytes[end_seq_flag_offset] = 2;
2289
2290 let error = deserialize_record(&bytes)
2291 .expect_err("invalid end_seq flag must reject corrupt record bytes");
2292 assert!(
2293 error.contains("invalid end_seq flag"),
2294 "unexpected error: {error}"
2295 );
2296 }
2297
2298 #[test]
2299 fn test_deserialize_record_rejects_trailing_bytes() {
2300 let record = WalRecord {
2301 txn_token: TxnToken::new(
2302 fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
2303 fsqlite_types::TxnEpoch::new(0),
2304 ),
2305 epoch: 5,
2306 page_id: PageNumber::new(1).expect("page should be non-zero"),
2307 begin_seq: CommitSeq::new(100),
2308 end_seq: None,
2309 before_image: Vec::new(),
2310 after_image: vec![0xAA; 8],
2311 };
2312 let mut bytes = serialize_record(&record).expect("sample record should serialize");
2313 bytes.extend_from_slice(b"junk");
2314
2315 let error =
2316 deserialize_record(&bytes).expect_err("record decoder must reject trailing bytes");
2317 assert!(
2318 error.contains("trailing bytes"),
2319 "unexpected error: {error}"
2320 );
2321 }
2322
2323 #[test]
2324 fn test_read_segment_rejects_impossible_record_count_before_allocation() {
2325 use tempfile::tempdir;
2326
2327 let dir = tempdir().expect("create temp dir");
2328 let db_path = dir.path().join("impossible-count.db");
2329 let seg_path = segment_path(&db_path, 1);
2330 std::fs::write(&seg_path, SegmentHeader::new(1, u32::MAX).to_bytes())
2331 .expect("write corrupt segment header");
2332
2333 let error =
2334 read_segment(&seg_path).expect_err("impossible record count must fail before alloc");
2335 assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2336 assert!(
2337 error.to_string().contains("exceeds maximum possible"),
2338 "unexpected error: {error}"
2339 );
2340 }
2341
2342 #[test]
2343 fn test_read_segment_rejects_record_count_without_min_payload_space() {
2344 use tempfile::tempdir;
2345
2346 let dir = tempdir().expect("create temp dir");
2347 let db_path = dir.path().join("short-records.db");
2348 let seg_path = segment_path(&db_path, 1);
2349 let mut bytes = SegmentHeader::new(1, 2).to_bytes().to_vec();
2350 bytes.extend_from_slice(&0_u32.to_le_bytes());
2351 bytes.extend_from_slice(&0_u32.to_le_bytes());
2352 std::fs::write(&seg_path, bytes).expect("write corrupt segment");
2353
2354 let error = read_segment(&seg_path)
2355 .expect_err("record count must account for minimum record payload bytes");
2356 assert_eq!(error.kind(), io::ErrorKind::InvalidData);
2357 assert!(
2358 error.to_string().contains("exceeds maximum possible"),
2359 "unexpected error: {error}"
2360 );
2361 }
2362
/// Writes a one-record segment through `write_segment`, appends four stray
/// bytes to the file, and expects `read_segment` to fail with `InvalidData`
/// and a "trailing bytes" message.
#[test]
fn test_read_segment_rejects_trailing_bytes_after_declared_records() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("trailing-bytes.db");

    let txn_token = TxnToken::new(
        fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
        fsqlite_types::TxnEpoch::new(0),
    );
    let record = WalRecord {
        txn_token,
        epoch: 1,
        page_id: PageNumber::new(1).expect("page should be non-zero"),
        begin_seq: CommitSeq::new(1),
        end_seq: Some(CommitSeq::new(1)),
        before_image: Vec::new(),
        after_image: vec![0xCC; 16],
    };
    let batch = EpochFlushBatch {
        epoch: 1,
        records: vec![record],
        records_per_core: vec![1],
    };
    write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");

    let seg_path = segment_path(&db_path, 1);
    {
        // Scoped so the append handle is closed before the segment is re-read.
        use std::io::Write as _;
        let mut appender = OpenOptions::new()
            .append(true)
            .open(&seg_path)
            .expect("open segment for append");
        appender.write_all(b"junk").expect("append trailing bytes");
    }

    let error =
        read_segment(&seg_path).expect_err("segment decoder must reject trailing bytes");
    assert_eq!(error.kind(), io::ErrorKind::InvalidData);

    let message = error.to_string();
    assert!(
        message.contains("trailing bytes"),
        "unexpected error: {error}"
    );
}
2405
/// A segment whose header is followed by a little-endian `u32::MAX` word must
/// make `read_segment` fail with `InvalidData` and an "exceeds maximum"
/// message.
#[test]
fn test_read_segment_rejects_oversized_record_length_before_allocation() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let database = scratch.path().join("oversized.db");
    let seg_path = segment_path(&database, 1);

    // Header bytes followed by a single `u32::MAX` word and nothing else.
    let mut corrupt = Vec::new();
    let header_bytes = SegmentHeader::new(1, 1).to_bytes();
    corrupt.extend_from_slice(&header_bytes);
    corrupt.extend_from_slice(&u32::MAX.to_le_bytes());
    std::fs::write(&seg_path, corrupt).expect("write corrupt segment");

    let error =
        read_segment(&seg_path).expect_err("oversized record length must fail before alloc");
    assert_eq!(error.kind(), io::ErrorKind::InvalidData);

    let message = error.to_string();
    assert!(
        message.contains("exceeds maximum"),
        "unexpected error: {error}"
    );
}
2426
/// Hands `write_segment` a batch whose two same-page records are listed with
/// the higher `begin_seq` first, then checks that `read_segment` returns them
/// in ascending `begin_seq` order and that recovery leaves the page holding
/// the image of the record with the higher sequence.
#[test]
fn test_segment_write_and_recovery_canonicalize_intra_epoch_order() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("ordered.db");
    let page_id = PageNumber::new(1).unwrap();

    // Both records share epoch 7 and the same page; only txn id, sequence and
    // fill byte differ.
    let make_record = |txn: u64, seq: u64, fill: u8| WalRecord {
        txn_token: TxnToken::new(
            fsqlite_types::TxnId::new(txn).unwrap(),
            fsqlite_types::TxnEpoch::new(0),
        ),
        epoch: 7,
        page_id,
        begin_seq: CommitSeq::new(seq),
        end_seq: Some(CommitSeq::new(seq)),
        before_image: Vec::new(),
        after_image: vec![fill; 8],
    };
    let later = make_record(2, 200, 0x22);
    let earlier = make_record(1, 100, 0x11);

    // Deliberately submit the later commit first.
    let batch = EpochFlushBatch {
        epoch: 7,
        records: vec![later, earlier],
        records_per_core: vec![1, 1],
    };
    write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");

    let (_, records) = read_segment(&segment_path(&db_path, 7)).expect("read should succeed");
    assert_eq!(records.len(), 2);
    assert_eq!(records[0].begin_seq, CommitSeq::new(100));
    assert_eq!(records[1].begin_seq, CommitSeq::new(200));

    let mut page_contents = HashMap::new();
    let options = SegmentRecoveryOptions::default();
    recover_and_apply_segments(&db_path, &mut page_contents, options)
        .expect("recovery should succeed");

    let recovered_page = page_contents.get(&page_id.get());
    assert_eq!(
        recovered_page,
        Some(&vec![0x22; 8]),
        "recovery must replay the later commit last even if the flushed batch arrived out of order"
    );
}
2486
/// A batch for epoch 5 that carries a record stamped with epoch 4 must be
/// refused by `write_segment`, and no segment file for epoch 5 may exist
/// afterwards.
#[test]
fn test_write_segment_rejects_record_epoch_mismatch() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("mismatch.db");

    let txn_token = TxnToken::new(
        fsqlite_types::TxnId::new(1).unwrap(),
        fsqlite_types::TxnEpoch::new(0),
    );
    // The record's epoch (4) intentionally disagrees with the batch epoch (5).
    let stray_record = WalRecord {
        txn_token,
        epoch: 4,
        page_id: PageNumber::new(1).unwrap(),
        begin_seq: CommitSeq::new(100),
        end_seq: Some(CommitSeq::new(100)),
        before_image: Vec::new(),
        after_image: vec![0xAB; 8],
    };
    let batch = EpochFlushBatch {
        epoch: 5,
        records: vec![stray_record],
        records_per_core: vec![1],
    };

    let error = write_segment(&db_path, &batch, FsyncPolicy::Off)
        .expect_err("segment write must reject mixed-epoch records");
    let message = error.to_string();
    assert!(
        message.contains("segment epoch 5 contains record from epoch 4"),
        "unexpected error: {error}"
    );

    let would_be_segment = segment_path(&db_path, 5);
    assert!(
        !would_be_segment.exists(),
        "failed validation must not create or truncate a segment file"
    );
}
2524
/// A record whose `after_image` is one byte longer than
/// `MAX_SEGMENT_RECORD_IMAGE_BYTES` must make `write_segment` fail with
/// `InvalidInput`, mention "after_image length", and leave no segment file
/// for the batch epoch.
#[test]
fn test_write_segment_rejects_oversized_page_image_before_create() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("oversized-write.db");

    let txn_token = TxnToken::new(
        fsqlite_types::TxnId::new(1).expect("txn id should be non-zero"),
        fsqlite_types::TxnEpoch::new(0),
    );
    let oversized_record = WalRecord {
        txn_token,
        epoch: 5,
        page_id: PageNumber::new(1).expect("page should be non-zero"),
        begin_seq: CommitSeq::new(100),
        end_seq: Some(CommitSeq::new(100)),
        before_image: Vec::new(),
        // One byte past the allowed image size.
        after_image: vec![0xAB; MAX_SEGMENT_RECORD_IMAGE_BYTES + 1],
    };
    let batch = EpochFlushBatch {
        epoch: 5,
        records: vec![oversized_record],
        records_per_core: vec![1],
    };

    let error = write_segment(&db_path, &batch, FsyncPolicy::Off)
        .expect_err("segment write must reject oversized page images");
    assert_eq!(error.kind(), io::ErrorKind::InvalidInput);

    let message = error.to_string();
    assert!(
        message.contains("after_image length"),
        "unexpected error: {error}"
    );

    let would_be_segment = segment_path(&db_path, 5);
    assert!(
        !would_be_segment.exists(),
        "failed validation must not create or truncate a segment file"
    );
}
2560
/// Writes empty segments for epochs 1, 5, 10 and 2 (in that order) and checks
/// that `list_segments` reports four entries ordered 1, 2, 5, 10, then
/// deletes each listed segment.
#[test]
fn test_list_segments() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("test.db");

    // Epochs are written out of order on purpose.
    let written_epochs = [1u64, 5, 10, 2];
    for epoch in written_epochs {
        let empty_batch = EpochFlushBatch {
            epoch,
            records: Vec::new(),
            records_per_core: Vec::new(),
        };
        write_segment(&db_path, &empty_batch, FsyncPolicy::Off).expect("write should succeed");
    }

    let segments = list_segments(&db_path).expect("list should succeed");
    assert_eq!(segments.len(), 4);

    // The length check above guarantees the zip visits all four entries.
    for ((listed_epoch, _), expected_epoch) in segments.iter().zip([1, 2, 5, 10]) {
        assert_eq!(*listed_epoch, expected_epoch);
    }

    for (_, segment_file) in segments {
        delete_segment(&segment_file).expect("delete should succeed");
    }
}
2593
/// Writes one single-record segment for each of epochs 1 through 3 and checks
/// that `recover_segments` with default options reports three segments, three
/// records, epochs `[1, 2, 3]`, no partial segments, and returns the records
/// in epoch order. Finishes by calling `cleanup_segments`.
#[test]
fn test_recover_segments_basic() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("test.db");

    for epoch in 1..=3u64 {
        let txn_token = TxnToken::new(
            fsqlite_types::TxnId::new(epoch).unwrap(),
            fsqlite_types::TxnEpoch::new(0),
        );
        // Page number and fill byte are both derived from the epoch (1..=3).
        let record = WalRecord {
            txn_token,
            epoch,
            page_id: PageNumber::new(epoch as u32).unwrap(),
            begin_seq: CommitSeq::new(epoch * 100),
            end_seq: Some(CommitSeq::new(epoch * 100)),
            before_image: Vec::new(),
            after_image: vec![epoch as u8; 32],
        };
        let batch = EpochFlushBatch {
            epoch,
            records: vec![record],
            records_per_core: vec![1],
        };
        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
    }

    let (result, records) = recover_segments(&db_path, SegmentRecoveryOptions::default())
        .expect("recovery should succeed");

    assert_eq!(result.segments_recovered, 3);
    assert_eq!(result.records_applied, 3);
    assert_eq!(result.epochs, vec![1, 2, 3]);
    assert!(result.partial_segments.is_empty());

    assert_eq!(records.len(), 3);
    // The length check above guarantees the zip visits all three records.
    for (record, expected_epoch) in records.iter().zip(1..=3u64) {
        assert_eq!(record.epoch, expected_epoch);
    }

    cleanup_segments(&db_path).expect("cleanup should succeed");
}
2646
/// Writes a segment for epoch 5, renames the file to the epoch-3 path, and
/// checks two things: default recovery fails with a "mismatched epoch"
/// message, and recovery with `skip_corrupt: true` succeeds with zero
/// recovered segments, the renamed file listed as partial, and no records.
#[test]
fn test_recover_segments_rejects_header_filename_epoch_mismatch() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("rename.db");

    let txn_token = TxnToken::new(
        fsqlite_types::TxnId::new(1).unwrap(),
        fsqlite_types::TxnEpoch::new(0),
    );
    let record = WalRecord {
        txn_token,
        epoch: 5,
        page_id: PageNumber::new(1).unwrap(),
        begin_seq: CommitSeq::new(100),
        end_seq: Some(CommitSeq::new(100)),
        before_image: Vec::new(),
        after_image: vec![0xAA; 8],
    };
    let batch = EpochFlushBatch {
        epoch: 5,
        records: vec![record],
        records_per_core: vec![1],
    };
    write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");

    // Move the epoch-5 file to the epoch-3 file name.
    let original = segment_path(&db_path, 5);
    let renamed = segment_path(&db_path, 3);
    std::fs::rename(&original, &renamed).expect("rename should succeed");

    let strict_options = SegmentRecoveryOptions::default();
    let error = recover_segments(&db_path, strict_options)
        .expect_err("recovery must fail closed on mismatched epoch metadata");
    let message = error.to_string();
    assert!(
        message.contains("mismatched epoch"),
        "unexpected error: {error}"
    );

    let lenient_options = SegmentRecoveryOptions {
        skip_corrupt: true,
        ..Default::default()
    };
    let (result, records) = recover_segments(&db_path, lenient_options)
        .expect("skip_corrupt should ignore the bad segment");
    assert_eq!(result.segments_recovered, 0);
    assert_eq!(result.partial_segments, vec![renamed]);
    assert!(records.is_empty());
}
2694
/// Writes segments for epochs 1 through 3 that all target page 1, overwrites
/// the epoch-2 file with eight `0xFF` bytes, and recovers with
/// `skip_corrupt: true`. Only epoch 1 must be applied; the epoch-2 and
/// epoch-3 paths must both be reported as partial segments.
#[test]
fn test_recover_and_apply_segments_skip_corrupt_stops_at_first_bad_epoch() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("prefix.db");

    for epoch in 1..=3u64 {
        let txn_token = TxnToken::new(
            fsqlite_types::TxnId::new(epoch).unwrap(),
            fsqlite_types::TxnEpoch::new(0),
        );
        // Every epoch rewrites page 1 with a fill byte equal to the epoch.
        let record = WalRecord {
            txn_token,
            epoch,
            page_id: PageNumber::new(1).unwrap(),
            begin_seq: CommitSeq::new(epoch * 100),
            end_seq: Some(CommitSeq::new(epoch * 100)),
            before_image: Vec::new(),
            after_image: vec![epoch as u8; 16],
        };
        let batch = EpochFlushBatch {
            epoch,
            records: vec![record],
            records_per_core: vec![1],
        };
        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
    }

    // Replace the middle segment with bytes that are not a valid segment.
    let corrupt_epoch_path = segment_path(&db_path, 2);
    std::fs::write(&corrupt_epoch_path, [0xFF_u8; 8]).expect("corrupt write should succeed");

    let mut page_contents = HashMap::new();
    let lenient_options = SegmentRecoveryOptions {
        skip_corrupt: true,
        ..Default::default()
    };
    let result = recover_and_apply_segments(&db_path, &mut page_contents, lenient_options)
        .expect("skip_corrupt should return the durable prefix");

    assert_eq!(result.segments_recovered, 1);
    assert_eq!(result.records_applied, 1);
    assert_eq!(result.epochs, vec![1]);

    let expected_partial: Vec<_> = [2, 3]
        .into_iter()
        .map(|epoch| segment_path(&db_path, epoch))
        .collect();
    assert_eq!(result.partial_segments, expected_partial);

    let page = page_contents
        .get(&1)
        .expect("prefix recovery should apply the last durable epoch only");
    let only_epoch_one_bytes = page.iter().all(|&byte| byte == 1);
    assert!(
        only_epoch_one_bytes,
        "recovery must stop before epoch 3 once epoch 2 is corrupt"
    );
}
2752
/// Writes three epochs that each overwrite page 1, recovers with
/// `delete_after_recovery: true`, and checks that the page ends up with the
/// 32-byte epoch-3 image and that no segments are listed afterwards.
#[test]
fn test_recover_and_apply_segments() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("test.db");

    let page_id = 1u32;
    for epoch in 1..=3u64 {
        let txn_token = TxnToken::new(
            fsqlite_types::TxnId::new(epoch).unwrap(),
            fsqlite_types::TxnEpoch::new(0),
        );
        // Each epoch fills the same page with its own epoch number.
        let record = WalRecord {
            txn_token,
            epoch,
            page_id: PageNumber::new(page_id).unwrap(),
            begin_seq: CommitSeq::new(epoch * 100),
            end_seq: Some(CommitSeq::new(epoch * 100)),
            before_image: Vec::new(),
            after_image: vec![epoch as u8; 32],
        };
        let batch = EpochFlushBatch {
            epoch,
            records: vec![record],
            records_per_core: vec![1],
        };
        write_segment(&db_path, &batch, FsyncPolicy::Off).expect("write should succeed");
    }

    let mut page_contents = HashMap::new();
    let result = recover_and_apply_segments(
        &db_path,
        &mut page_contents,
        SegmentRecoveryOptions {
            delete_after_recovery: true,
            ..Default::default()
        },
    )
    .expect("should succeed");

    assert_eq!(result.segments_recovered, 3);

    let page = page_contents.get(&page_id).expect("page should exist");
    assert_eq!(page.len(), 32);
    let holds_epoch_three = page.iter().all(|&b| b == 3);
    assert!(holds_epoch_three, "should have epoch 3 content");

    let remaining = list_segments(&db_path).expect("list should succeed");
    assert!(
        remaining.is_empty(),
        "segments should be deleted after recovery"
    );
}
2806
/// `max_durable_epoch` must return `None` before any segment is written,
/// `Some(10)` after segments for epochs 5, 10 and 3 are written, and `None`
/// again once `cleanup_segments` has run.
#[test]
fn test_max_durable_epoch() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("test.db");

    // Nothing written yet.
    let before_writes = max_durable_epoch(&db_path).expect("should succeed");
    assert_eq!(before_writes, None);

    let written_epochs = [5u64, 10, 3];
    for epoch in written_epochs {
        let empty_batch = EpochFlushBatch {
            epoch,
            records: Vec::new(),
            records_per_core: Vec::new(),
        };
        write_segment(&db_path, &empty_batch, FsyncPolicy::Off).expect("write should succeed");
    }

    let after_writes = max_durable_epoch(&db_path).expect("should succeed");
    assert_eq!(after_writes, Some(10));

    cleanup_segments(&db_path).expect("cleanup should succeed");

    let after_cleanup = max_durable_epoch(&db_path).expect("should succeed");
    assert_eq!(after_cleanup, None);
}
2839
/// Writes empty segments for epochs 1 through 5, then checks that
/// `cleanup_segments` returns 5 and that `list_segments` is empty afterwards.
#[test]
fn test_cleanup_segments() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("test.db");

    for epoch in 1..=5u64 {
        let empty_batch = EpochFlushBatch {
            epoch,
            records: Vec::new(),
            records_per_core: Vec::new(),
        };
        write_segment(&db_path, &empty_batch, FsyncPolicy::Off).expect("write should succeed");
    }

    let before_cleanup = list_segments(&db_path).expect("list should succeed");
    assert_eq!(before_cleanup.len(), 5);

    let removed = cleanup_segments(&db_path).expect("cleanup should succeed");
    assert_eq!(removed, 5);

    let after_cleanup = list_segments(&db_path).expect("list should succeed");
    assert!(after_cleanup.is_empty());
}
2869
/// Pins the default `ParallelWalConfig` field values and exercises its
/// by-value copy and `Debug` output.
#[test]
fn parallel_wal_config_default_values() {
    let defaults = ParallelWalConfig::default();
    assert_eq!(defaults.slot_count, DEFAULT_BUFFER_SLOT_COUNT);
    assert_eq!(defaults.epoch_interval_ms, 10);
    assert_eq!(defaults.buffer_capacity_bytes, 4 * 1024 * 1024);

    // `defaults` stays usable after the assignment, so the value was copied.
    let duplicate = defaults;
    assert_eq!(duplicate.epoch_interval_ms, defaults.epoch_interval_ms);

    let rendered = format!("{defaults:?}");
    assert!(rendered.contains("ParallelWalConfig"));
}
2881
/// Checks, for the eight listed `ParallelWalFallbackReason` variants, that a
/// copy equals its source, that two variants compare equal exactly when they
/// are at the same index, and that `Debug` output names the variant.
#[test]
fn parallel_wal_fallback_reason_all_variants_debug_copy_eq() {
    let reasons = [
        ParallelWalFallbackReason::OperatorForced,
        ParallelWalFallbackReason::LaneOverflow,
        ParallelWalFallbackReason::CertificateGap,
        ParallelWalFallbackReason::CertificateChecksumMismatch,
        ParallelWalFallbackReason::PublicationMismatch,
        ParallelWalFallbackReason::RecoveryGap,
        ParallelWalFallbackReason::CheckpointConflict,
        ParallelWalFallbackReason::ControllerEvidenceLost,
    ];

    for (outer_idx, outer) in reasons.iter().enumerate() {
        let duplicate = *outer;
        assert_eq!(duplicate, *outer);

        // Equality must hold on the diagonal of the comparison matrix only.
        for (inner_idx, inner) in reasons.iter().enumerate() {
            assert_eq!(outer_idx == inner_idx, outer == inner);
        }
    }

    let rendered = format!("{:?}", ParallelWalFallbackReason::CertificateGap);
    assert!(rendered.contains("CertificateGap"));
}
2904
/// The default `ParallelWalControlSurface` must use `Auto` mode with every
/// checked optional field unset, must differ from a surface whose mode is
/// `Conservative`, and must name its type in `Debug` output.
#[test]
fn parallel_wal_control_surface_default_and_eq() {
    let baseline = ParallelWalControlSurface::default();
    assert_eq!(baseline.mode, ParallelWalOperatingMode::Auto);
    assert!(baseline.lane_count_override.is_none());
    assert!(baseline.helper_lane_budget.is_none());
    assert!(baseline.max_parallel_commit_bytes.is_none());
    assert!(baseline.max_flush_delay_ms.is_none());
    assert!(baseline.shadow_compare_sampling_per_mille.is_none());

    // Changing only the mode is enough to break equality.
    let conservative = ParallelWalControlSurface {
        mode: ParallelWalOperatingMode::Conservative,
        ..ParallelWalControlSurface::default()
    };
    assert_ne!(baseline, conservative);

    let rendered = format!("{baseline:?}");
    assert!(rendered.contains("ParallelWalControlSurface"));
}
2922
/// Pins the defaults of `ParallelWalOrderedResidue`
/// (`CommitCertificateThenPublish`) and `ParallelWalShadowVerdict` (`NotRun`),
/// and checks that the shadow verdict variants compared here are distinct.
#[test]
fn parallel_wal_ordered_residue_and_shadow_verdict_defaults() {
    let default_residue = ParallelWalOrderedResidue::default();
    assert_eq!(
        default_residue,
        ParallelWalOrderedResidue::CommitCertificateThenPublish
    );
    let residue_copy = default_residue;
    assert_eq!(residue_copy, default_residue);

    let default_verdict = ParallelWalShadowVerdict::default();
    assert_eq!(default_verdict, ParallelWalShadowVerdict::NotRun);
    assert_ne!(default_verdict, ParallelWalShadowVerdict::Clean);
    assert_ne!(
        ParallelWalShadowVerdict::Clean,
        ParallelWalShadowVerdict::Diverged
    );

    let rendered = format!("{default_verdict:?}");
    assert!(rendered.contains("NotRun"));
}
2943
/// Checks, for the five listed `ParallelWalDecisionAction` variants, that a
/// copy equals its source and that two variants compare equal exactly when
/// they are at the same index.
#[test]
fn decision_action_all_variants_copy_eq() {
    let actions = [
        ParallelWalDecisionAction::KeepCurrent,
        ParallelWalDecisionAction::SealEpochNow,
        ParallelWalDecisionAction::IncreaseLaneBudget,
        ParallelWalDecisionAction::DecreaseLaneBudget,
        ParallelWalDecisionAction::ForceConservative,
    ];

    for (outer_idx, outer) in actions.iter().enumerate() {
        let duplicate = *outer;
        assert_eq!(duplicate, *outer);

        // Equality must hold on the diagonal of the comparison matrix only.
        for (inner_idx, inner) in actions.iter().enumerate() {
            assert_eq!(outer_idx == inner_idx, outer == inner);
        }
    }
}
2961
/// The default `FsyncPolicy` must be `Full`. The three listed variants must
/// copy cleanly and compare equal only to themselves, and `Debug` output for
/// `Off` must contain "Off".
#[test]
fn fsync_policy_all_variants_and_default() {
    assert_eq!(FsyncPolicy::default(), FsyncPolicy::Full);

    let policies = [FsyncPolicy::Full, FsyncPolicy::Normal, FsyncPolicy::Off];
    for (outer_idx, outer) in policies.iter().enumerate() {
        let duplicate = *outer;
        assert_eq!(duplicate, *outer);

        // Equality must hold on the diagonal of the comparison matrix only.
        for (inner_idx, inner) in policies.iter().enumerate() {
            assert_eq!(outer_idx == inner_idx, outer == inner);
        }
    }

    let rendered = format!("{:?}", FsyncPolicy::Off);
    assert!(rendered.contains("Off"));
}
2976
/// Builds a `ParallelWalTraceRecord` and checks that a clone compares equal
/// to it and that `Debug` output names the type.
#[test]
fn trace_record_clone_eq_debug() {
    let original = ParallelWalTraceRecord {
        component: "test".into(),
        trace_id: 1,
        decision_id: None,
        mode: ParallelWalOperatingMode::Auto,
        lane_id: Some(0),
        epoch: Some(5),
        commit_seq_lo: None,
        commit_seq_hi: None,
        checkpoint_epoch: None,
        recovery_epoch: None,
        fallback_active: false,
        fallback_reason: None,
        policy_id: None,
        policy_version: None,
    };

    let duplicate = original.clone();
    assert_eq!(duplicate, original);

    let rendered = format!("{original:?}");
    assert!(rendered.contains("ParallelWalTraceRecord"));
}
3000
/// Builds a four-lane `ParallelWalCommitCertificate` and checks that a clone
/// compares equal to it, that the lane fields hold the values supplied, and
/// that `Debug` output names the type.
#[test]
fn commit_certificate_clone_eq_debug() {
    let original = ParallelWalCommitCertificate {
        format_version: 1,
        residue: ParallelWalOrderedResidue::default(),
        certificate_epoch: 10,
        commit_seq_lo: CommitSeq::new(1),
        commit_seq_hi: CommitSeq::new(5),
        durable_segment_epoch: 9,
        lane_count: 4,
        lane_record_counts: vec![2, 3, 0, 1],
        db_size_pages: 100,
        page_set_size: 6,
        certificate_crc32c: 0,
        fallback_active: false,
    };

    let duplicate = original.clone();
    assert_eq!(duplicate, original);

    // One per-lane count was supplied for each of the four lanes.
    assert_eq!(original.lane_count, 4);
    assert_eq!(original.lane_record_counts.len(), 4);

    let rendered = format!("{original:?}");
    assert!(rendered.contains("ParallelWalCommitCertificate"));
}
3024}