1use fsqlite_types::sync_primitives::Instant;
24use std::collections::VecDeque;
25
26use fsqlite_types::{
27 CommitMarker, CommitProof, CommitSeq, ObjectId, OperatingMode, PageNumber, TxnToken,
28};
29use tracing::{debug, info, trace, warn};
30
31use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
32
/// A transaction's request to commit, handed to [`WriteCoordinator::submit`].
///
/// The coordinator checks `write_set_pages` against its commit index relative to
/// `begin_seq` (first-committer-wins), allocates a `CommitSeq`, and queues the
/// commit for group commit.
#[derive(Debug, Clone)]
pub struct CommitSubmission {
    /// Object id of the transaction's commit capsule; copied into the commit marker.
    pub capsule_object_id: ObjectId,
    /// 32-byte digest of the capsule. NOTE(review): not read by this module — confirm its consumer.
    pub capsule_digest: [u8; 32],
    /// Pages written by the transaction; checked for FCW conflicts and then
    /// recorded in the commit index at the allocated sequence.
    pub write_set_pages: Vec<PageNumber>,
    /// Witness object ids; copied into the commit proof's `evidence_refs`.
    pub witness_refs: Vec<ObjectId>,
    /// Dependency-edge object ids. NOTE(review): not read by this module.
    pub edge_ids: Vec<ObjectId>,
    /// Merge-witness object ids. NOTE(review): not read by this module.
    pub merge_witness_ids: Vec<ObjectId>,
    /// Token identifying the submitting transaction (transaction id + epoch).
    pub txn_token: TxnToken,
    /// Snapshot sequence the transaction began at; any written page committed
    /// at a later sequence is an FCW conflict.
    pub begin_seq: CommitSeq,
}
60
/// Outcome of a commit attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitResult {
    /// The commit passed both fsync barriers and was published.
    Committed {
        /// Sequence number allocated to the commit at submit time.
        commit_seq: CommitSeq,
        /// Commit timestamp in nanoseconds, clamped at submit time so it does
        /// not fall behind the previous commit's timestamp.
        commit_time_unix_ns: u64,
    },
    /// First-committer-wins conflict: these pages were committed at a sequence
    /// later than the submitter's `begin_seq`.
    ConflictFcw { conflicting_pages: Vec<PageNumber> },
    /// Serializable-snapshot-isolation conflict. NOTE(review): not produced by this module.
    ConflictSsi,
    /// The coordinator is shutting down and rejected the submission.
    ShuttingDown,
}
80
/// Durability progress of a single pending commit through the two-fsync protocol.
///
/// A commit may only be published once it has passed both barriers:
/// the pre-marker fsync (`fsync1_complete`) and the post-marker fsync
/// (`fsync2_complete`). The default value has neither barrier passed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct FsyncBarriers {
    /// The pre-marker fsync has completed.
    pub fsync1_complete: bool,
    /// The marker append and post-marker fsync have completed.
    pub fsync2_complete: bool,
}

impl FsyncBarriers {
    /// Returns barriers with neither fsync done (same value as `Default::default()`).
    #[must_use]
    pub const fn new() -> Self {
        let not_yet = false;
        Self {
            fsync1_complete: not_yet,
            fsync2_complete: not_yet,
        }
    }

    /// Returns `true` only when both barriers have been passed.
    #[must_use]
    pub const fn all_complete(self) -> bool {
        let Self {
            fsync1_complete,
            fsync2_complete,
        } = self;
        fsync1_complete && fsync2_complete
    }
}
116
/// FIFO queue of commits that have been allocated a sequence number but not
/// yet published, plus a size threshold used by `is_full`.
#[derive(Debug)]
pub struct GroupCommitBatch {
    /// Pending commits, in the order they were pushed by `WriteCoordinator::submit`
    /// (which is also increasing `CommitSeq` order).
    pending: VecDeque<PendingCommit>,
    /// Threshold at which `is_full` reports `true`; `push` does not enforce it.
    max_batch_size: usize,
}
133
/// One queued commit: the original submission plus everything allocated for it at submit time.
#[derive(Debug)]
struct PendingCommit {
    /// The submission as received from the caller.
    submission: CommitSubmission,
    /// Sequence number allocated by `WriteCoordinator::submit`.
    allocated_seq: CommitSeq,
    /// Commit timestamp (ns) allocated by `WriteCoordinator::submit`.
    allocated_time_ns: u64,
    /// Object id of the derived commit proof; written into the commit marker.
    proof_object_id: ObjectId,
    /// Which fsync barriers this commit has passed so far.
    barriers: FsyncBarriers,
}
143
144impl GroupCommitBatch {
145 #[must_use]
147 pub fn new(max_batch_size: usize) -> Self {
148 Self {
149 pending: VecDeque::with_capacity(max_batch_size),
150 max_batch_size,
151 }
152 }
153
154 #[must_use]
156 pub fn len(&self) -> usize {
157 self.pending.len()
158 }
159
160 #[must_use]
162 pub fn is_empty(&self) -> bool {
163 self.pending.is_empty()
164 }
165
166 #[must_use]
168 pub fn is_full(&self) -> bool {
169 self.pending.len() >= self.max_batch_size
170 }
171
172 fn push(&mut self, pending: PendingCommit) {
174 self.pending.push_back(pending);
175 }
176
177 fn mark_fsync1_complete(&mut self) {
179 for pc in &mut self.pending {
180 pc.barriers.fsync1_complete = true;
181 }
182 }
183
184 fn mark_fsync2_complete(&mut self) {
186 for pc in &mut self.pending {
187 pc.barriers.fsync2_complete = true;
188 }
189 }
190
191 fn drain_committed(&mut self) -> Vec<(CommitSubmission, CommitSeq, u64)> {
193 let mut committed = Vec::with_capacity(self.pending.len());
194 while let Some(front) = self.pending.front() {
195 if front.barriers.all_complete() {
196 let pc = self.pending.pop_front().expect("checked non-empty");
197 committed.push((pc.submission, pc.allocated_seq, pc.allocated_time_ns));
198 } else {
199 break;
200 }
201 }
202 committed
203 }
204}
205
206#[derive(Debug, Clone)]
215pub struct CommitIndex {
216 entries: std::collections::HashMap<PageNumber, CommitSeq>,
218}
219
220impl CommitIndex {
221 #[must_use]
223 pub fn new() -> Self {
224 Self {
225 entries: std::collections::HashMap::new(),
226 }
227 }
228
229 pub fn record_commit(&mut self, pages: &[PageNumber], seq: CommitSeq) {
231 for &page in pages {
232 self.entries
233 .entry(page)
234 .and_modify(|existing| {
235 if seq > *existing {
236 *existing = seq;
237 }
238 })
239 .or_insert(seq);
240 }
241 }
242
243 #[must_use]
245 pub fn check_conflicts(
246 &self,
247 write_set: &[PageNumber],
248 begin_seq: CommitSeq,
249 ) -> Vec<PageNumber> {
250 write_set
251 .iter()
252 .filter(|page| {
253 self.entries
254 .get(page)
255 .is_some_and(|&latest| latest > begin_seq)
256 })
257 .copied()
258 .collect()
259 }
260}
261
262impl Default for CommitIndex {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
/// Coordinates commits: validates submissions (shutdown and first-committer-wins),
/// allocates `CommitSeq`s and monotonic commit timestamps, chains commit markers,
/// and publishes commits in group batches once both fsync barriers have passed.
#[derive(Debug)]
pub struct WriteCoordinator {
    /// Operating mode given at construction. NOTE(review): only reported via `mode()` here.
    mode: OperatingMode,
    /// Highest sequence allocated so far, including commits that are not yet durable.
    commit_seq_tip: CommitSeq,
    /// Timestamp given to the most recent allocation; keeps commit times from going backwards.
    last_commit_time_ns: u64,
    /// Latest allocated sequence per page, used for FCW validation.
    commit_index: CommitIndex,
    /// Object id of the most recently generated commit marker (`None` before the first).
    prev_marker_id: Option<ObjectId>,
    /// Commits that have been allocated but not yet published.
    batch: GroupCommitBatch,
    /// Set by `initiate_shutdown`; new submissions are rejected afterwards.
    shutting_down: bool,
    /// Number of non-empty `flush_batch` calls so far.
    epoch: u64,
}
293
294impl WriteCoordinator {
295 #[must_use]
300 pub fn new(mode: OperatingMode, initial_seq: CommitSeq, group_commit_max: usize) -> Self {
301 info!(
302 mode = %mode,
303 initial_seq = initial_seq.get(),
304 group_commit_max,
305 "WriteCoordinator initialized"
306 );
307 Self {
308 mode,
309 commit_seq_tip: initial_seq,
310 last_commit_time_ns: 0,
311 commit_index: CommitIndex::new(),
312 prev_marker_id: None,
313 batch: GroupCommitBatch::new(group_commit_max),
314 shutting_down: false,
315 epoch: 0,
316 }
317 }
318
319 #[must_use]
321 pub const fn mode(&self) -> OperatingMode {
322 self.mode
323 }
324
325 #[must_use]
327 pub const fn commit_seq_tip(&self) -> CommitSeq {
328 self.commit_seq_tip
329 }
330
331 #[must_use]
333 pub fn pending_count(&self) -> usize {
334 self.batch.len()
335 }
336
337 #[must_use]
339 pub const fn current_epoch(&self) -> u64 {
340 self.epoch
341 }
342
343 pub fn initiate_shutdown(&mut self) {
345 self.shutting_down = true;
346 }
347
348 pub fn validate(&self, submission: &CommitSubmission) -> Result<(), CommitResult> {
356 if self.shutting_down {
357 GLOBAL_GROUP_COMMIT_METRICS.record_shutdown_rejection();
358 warn!(
359 phase = "validate",
360 "rejecting submission: coordinator shutting down"
361 );
362 return Err(CommitResult::ShuttingDown);
363 }
364
365 let conflicts = self
367 .commit_index
368 .check_conflicts(&submission.write_set_pages, submission.begin_seq);
369 if !conflicts.is_empty() {
370 GLOBAL_GROUP_COMMIT_METRICS.record_fcw_conflict();
371 debug!(
372 phase = "validate",
373 begin_seq = submission.begin_seq.get(),
374 conflict_count = conflicts.len(),
375 "FCW conflict detected"
376 );
377 return Err(CommitResult::ConflictFcw {
378 conflicting_pages: conflicts,
379 });
380 }
381
382 Ok(())
388 }
389
390 pub fn submit(
402 &mut self,
403 submission: CommitSubmission,
404 now_unix_ns: u64,
405 ) -> Result<CommitSeq, CommitResult> {
406 self.validate(&submission)?;
408
409 GLOBAL_GROUP_COMMIT_METRICS.record_submission();
410
411 let new_seq = self.commit_seq_tip.next();
413 let commit_time = now_unix_ns.max(self.last_commit_time_ns.wrapping_add(1));
414
415 let proof = CommitProof {
417 commit_seq: new_seq,
418 edges: Vec::new(), evidence_refs: submission.witness_refs.clone(),
420 };
421 let proof_object_id = Self::derive_proof_object_id(&proof);
422
423 self.commit_seq_tip = new_seq;
425 self.last_commit_time_ns = commit_time;
426 self.commit_index
427 .record_commit(&submission.write_set_pages, new_seq);
428
429 trace!(
430 target: "fsqlite_wal::native_commit",
431 phase = "submit",
432 commit_seq = new_seq.get(),
433 pages = submission.write_set_pages.len(),
434 begin_seq = submission.begin_seq.get(),
435 pending_batch = self.batch.len() + 1,
436 "allocated commit_seq"
437 );
438
439 self.batch.push(PendingCommit {
441 submission,
442 allocated_seq: new_seq,
443 allocated_time_ns: commit_time,
444 proof_object_id,
445 barriers: FsyncBarriers::new(),
446 });
447
448 Ok(new_seq)
449 }
450
451 pub fn fsync1(&mut self) -> usize {
459 let count = self.batch.len();
460 self.batch.mark_fsync1_complete();
461 GLOBAL_GROUP_COMMIT_METRICS.record_fsync1();
462 debug!(
463 target: "fsqlite_wal::native_commit",
464 phase = "fsync1",
465 batch_size = count,
466 "pre-marker fsync complete"
467 );
468 count
469 }
470
471 pub fn append_markers_and_fsync2(&mut self) -> Vec<CommitMarker> {
478 let mut markers = Vec::with_capacity(self.batch.pending.len());
479
480 for pc in &mut self.batch.pending {
481 if pc.barriers.fsync1_complete && !pc.barriers.fsync2_complete {
482 let marker = CommitMarker::new(
484 pc.allocated_seq,
485 pc.allocated_time_ns,
486 pc.submission.capsule_object_id,
487 pc.proof_object_id,
488 self.prev_marker_id,
489 );
490
491 let marker_bytes = marker.to_record_bytes();
493 let marker_oid = ObjectId::derive_from_canonical_bytes(&marker_bytes);
494 self.prev_marker_id = Some(marker_oid);
495
496 markers.push(marker);
497 }
498 }
499
500 self.batch.mark_fsync2_complete();
502 GLOBAL_GROUP_COMMIT_METRICS.record_fsync2();
503
504 debug!(
505 target: "fsqlite_wal::native_commit",
506 phase = "fsync2",
507 markers_appended = markers.len(),
508 "post-marker fsync complete"
509 );
510
511 markers
512 }
513
514 pub fn drain_committed(&mut self) -> Vec<CommitResult> {
519 let drained = self.batch.drain_committed();
520 let batch_size = drained.len();
521 let results: Vec<CommitResult> = drained
522 .into_iter()
523 .map(|(_, seq, time)| {
524 info!(
525 target: "fsqlite_wal::native_commit",
526 commit_seq = seq.get(),
527 durable = true,
528 "commit published"
529 );
530 CommitResult::Committed {
531 commit_seq: seq,
532 commit_time_unix_ns: time,
533 }
534 })
535 .collect();
536
537 if batch_size > 0 {
538 info!(
539 target: "fsqlite_wal::native_commit",
540 group_size = batch_size,
541 "parallel_wal_commit group drained"
542 );
543 }
544
545 results
546 }
547
548 pub fn flush_batch(&mut self) -> Vec<CommitResult> {
554 let group_size = self.batch.len();
555 if group_size == 0 {
556 return Vec::new();
557 }
558
559 let start = Instant::now();
560 self.epoch += 1;
561 let epoch = self.epoch;
562
563 let span = tracing::info_span!(
564 target: "fsqlite_wal::native_commit",
565 "parallel_wal_commit",
566 epoch,
567 group_size,
568 frames_in_batch = group_size,
569 );
570 let _guard = span.enter();
571
572 let fsync1_count = self.fsync1();
573 let markers = self.append_markers_and_fsync2();
574 let results = self.drain_committed();
575
576 #[allow(clippy::cast_possible_truncation)]
577 let latency_us = start.elapsed().as_micros() as u64;
578 GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(group_size as u64, latency_us);
579
580 info!(
581 target: "fsqlite_wal::native_commit",
582 epoch,
583 group_size,
584 fsync1_count,
585 markers_appended = markers.len(),
586 committed = results.len(),
587 latency_us,
588 "parallel_wal_commit complete"
589 );
590
591 results
592 }
593
594 pub fn submit_and_commit(
600 &mut self,
601 submission: CommitSubmission,
602 now_unix_ns: u64,
603 ) -> CommitResult {
604 match self.submit(submission, now_unix_ns) {
605 Ok(_seq) => {
606 let mut results = self.flush_batch();
607 results.pop().unwrap_or(CommitResult::ShuttingDown)
608 }
609 Err(result) => result,
610 }
611 }
612
613 fn derive_proof_object_id(proof: &CommitProof) -> ObjectId {
615 let mut canonical =
616 Vec::with_capacity(16 + 8 + proof.edges.len() * 16 + proof.evidence_refs.len() * 32);
617 canonical.extend_from_slice(b"fsqlite:proof:v1");
618 canonical.extend_from_slice(&proof.commit_seq.get().to_le_bytes());
619 for edge in &proof.edges {
620 canonical.extend_from_slice(&edge.from.get().to_le_bytes());
621 canonical.extend_from_slice(&edge.to.get().to_le_bytes());
622 }
623 for evidence in &proof.evidence_refs {
624 canonical.extend_from_slice(evidence.as_bytes());
625 }
626 ObjectId::derive_from_canonical_bytes(&canonical)
627 }
628}
629
#[cfg(test)]
mod tests {
    use fsqlite_types::{CommitCapsule, TxnEpoch, TxnId};

    use super::*;

    /// Object id whose 16 bytes all equal `seed`.
    fn make_oid(seed: u8) -> ObjectId {
        ObjectId::from_bytes([seed; 16])
    }

    /// Submission writing `pages` from snapshot `begin_seq`. `seed` fixes the
    /// capsule id and digest and yields transaction id `seed + 1`.
    fn make_submission(pages: &[u32], begin_seq: u64, seed: u8) -> CommitSubmission {
        let txn_id = TxnId::new(u64::from(seed) + 1).expect("valid txn id");
        CommitSubmission {
            capsule_object_id: make_oid(seed),
            capsule_digest: [seed; 32],
            write_set_pages: pages
                .iter()
                .map(|&p| PageNumber::new(p).expect("non-zero page"))
                .collect(),
            witness_refs: Vec::new(),
            edge_ids: Vec::new(),
            merge_witness_ids: Vec::new(),
            txn_token: TxnToken::new(txn_id, TxnEpoch::new(1)),
            begin_seq: CommitSeq::new(begin_seq),
        }
    }

    /// Serializes tests that exercise the coordinator or read/reset the
    /// process-global `GLOBAL_GROUP_COMMIT_METRICS`, so counts do not interleave.
    fn group_commit_metrics_test_guard() -> std::sync::MutexGuard<'static, ()> {
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned")
    }

    /// Compatibility is the default mode, PRAGMA names parse, and a coordinator
    /// reports the mode and tip it was built with.
    #[test]
    fn test_compat_mode_wal_format() {
        let mode = OperatingMode::default();
        assert_eq!(mode, OperatingMode::Compatibility);
        assert!(!mode.is_native());
        assert!(mode.legacy_readers_allowed());
        assert_eq!(mode.to_string(), "compatibility");

        // Long and short names are accepted; "NATIVE" shows uppercase also parses.
        assert_eq!(
            OperatingMode::from_pragma("compatibility"),
            Some(OperatingMode::Compatibility)
        );
        assert_eq!(
            OperatingMode::from_pragma("compat"),
            Some(OperatingMode::Compatibility)
        );
        assert_eq!(
            OperatingMode::from_pragma("native"),
            Some(OperatingMode::Native)
        );
        assert_eq!(
            OperatingMode::from_pragma("NATIVE"),
            Some(OperatingMode::Native)
        );
        assert!(OperatingMode::from_pragma("invalid").is_none());

        let coord = WriteCoordinator::new(OperatingMode::Compatibility, CommitSeq::ZERO, 16);
        assert_eq!(coord.mode(), OperatingMode::Compatibility);
        assert_eq!(coord.commit_seq_tip(), CommitSeq::ZERO);
    }

    /// A CommitCapsule's pages, witnesses and snapshot basis map onto a CommitSubmission.
    #[test]
    fn test_native_mode_commit_capsule() {
        let capsule = CommitCapsule {
            object_id: make_oid(0x11),
            snapshot_basis: CommitSeq::new(5),
            intent_log: Vec::new(),
            page_deltas: vec![
                (PageNumber::new(10).unwrap(), vec![0xAA; 4096]),
                (PageNumber::new(20).unwrap(), vec![0xBB; 4096]),
            ],
            read_set_digest: [0x01; 32],
            write_set_digest: [0x02; 32],
            read_witness_refs: vec![make_oid(0x30)],
            write_witness_refs: vec![make_oid(0x31)],
            dependency_edge_refs: Vec::new(),
            merge_witness_refs: Vec::new(),
        };

        assert_eq!(capsule.snapshot_basis.get(), 5);
        assert_eq!(capsule.page_deltas.len(), 2);
        assert_eq!(capsule.read_witness_refs.len(), 1);

        let submission = CommitSubmission {
            capsule_object_id: capsule.object_id,
            capsule_digest: [0xFF; 32],
            write_set_pages: capsule.page_deltas.iter().map(|(pgno, _)| *pgno).collect(),
            witness_refs: capsule.read_witness_refs.clone(),
            edge_ids: Vec::new(),
            merge_witness_ids: Vec::new(),
            txn_token: TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(1)),
            begin_seq: capsule.snapshot_basis,
        };

        assert_eq!(submission.capsule_object_id, capsule.object_id);
        assert_eq!(submission.write_set_pages.len(), 2);
    }

    /// Commit markers have a fixed-size record that round-trips and verifies,
    /// both with and without a predecessor link.
    #[test]
    fn test_native_marker_append() {
        let marker = CommitMarker::new(
            CommitSeq::new(42),
            1_700_000_000_000_000_000,
            make_oid(0x11),
            make_oid(0x22),
            Some(make_oid(0x33)),
        );

        let bytes = marker.to_record_bytes();
        assert_eq!(
            bytes.len(),
            fsqlite_types::COMMIT_MARKER_RECORD_V1_SIZE,
            "marker record must be exactly 88 bytes"
        );

        let recovered =
            CommitMarker::from_record_bytes(&bytes).expect("marker roundtrip must succeed");
        assert_eq!(recovered.commit_seq, marker.commit_seq);
        assert_eq!(recovered.commit_time_unix_ns, marker.commit_time_unix_ns);
        assert_eq!(recovered.capsule_object_id, marker.capsule_object_id);
        assert_eq!(recovered.proof_object_id, marker.proof_object_id);
        assert_eq!(recovered.prev_marker, marker.prev_marker);
        assert_eq!(recovered.integrity_hash, marker.integrity_hash);

        assert!(marker.verify_integrity());

        // A first ("genesis") marker has no predecessor; that must survive the round trip.
        let genesis = CommitMarker::new(
            CommitSeq::new(1),
            1_700_000_000_000_000_000,
            make_oid(0x01),
            make_oid(0x02),
            None,
        );
        assert!(genesis.prev_marker.is_none());
        assert!(genesis.verify_integrity());
        let genesis_bytes = genesis.to_record_bytes();
        let genesis_recovered = CommitMarker::from_record_bytes(&genesis_bytes).unwrap();
        assert!(genesis_recovered.prev_marker.is_none());
    }

    /// Five submissions are driven through fsync1, marker/fsync2 and drain as one group.
    #[test]
    fn test_native_group_commit() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        let base_time = 1_700_000_000_000_000_000_u64;
        for i in 0..5u8 {
            // Disjoint pages (1, 11, 21, ...) so no submission conflicts.
            let pages = &[u32::from(i) * 10 + 1];
            let sub = make_submission(pages, 0, i);
            let seq = coord.submit(sub, base_time + u64::from(i)).unwrap();
            assert_eq!(seq.get(), u64::from(i) + 1);
        }

        assert_eq!(coord.pending_count(), 5);

        let fsync1_count = coord.fsync1();
        assert_eq!(fsync1_count, 5);

        let markers = coord.append_markers_and_fsync2();
        assert_eq!(markers.len(), 5);

        // Fresh coordinator: the first marker has no predecessor, the rest chain.
        assert!(markers[0].prev_marker.is_none());
        for (i, marker) in markers.iter().enumerate().skip(1) {
            assert!(
                marker.prev_marker.is_some(),
                "marker {i} should link to previous"
            );
        }

        let results = coord.drain_committed();
        assert_eq!(results.len(), 5);
        for (i, result) in results.iter().enumerate() {
            match result {
                CommitResult::Committed { commit_seq, .. } => {
                    assert_eq!(commit_seq.get(), (i as u64) + 1);
                }
                other => unreachable!("expected Committed, got {other:?}"),
            }
        }

        assert_eq!(coord.pending_count(), 0);
    }

    /// Simulated crashes (dropping the coordinator) at two points before
    /// durability, then a restart from `CommitSeq::ZERO` and a complete commit.
    #[test]
    fn test_native_crash_recovery() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        // Dropped after submit, before fsync1.
        let sub1 = make_submission(&[1], 0, 1);
        let seq1 = coord.submit(sub1, 1_000_000).unwrap();
        assert_eq!(seq1.get(), 1);
        let mut coord = WriteCoordinator::new(
            OperatingMode::Native,
            CommitSeq::ZERO,
            16,
        );

        // Dropped after fsync1, before markers/fsync2.
        let sub2 = make_submission(&[2], 0, 2);
        let _seq2 = coord.submit(sub2, 2_000_000).unwrap();
        let _fsync1 = coord.fsync1();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        // The restarted coordinator allocates seq 1 again and completes the commit.
        let sub3 = make_submission(&[3], 0, 3);
        let result = coord.submit_and_commit(sub3, 3_000_000);
        assert!(
            matches!(result, CommitResult::Committed { commit_seq, .. } if commit_seq.get() == 1),
            "complete commit should succeed"
        );

        assert_eq!(coord.commit_seq_tip().get(), 1);
    }

    /// Ten writers on disjoint pages get sequential seqs; afterwards a writer with
    /// a stale snapshot conflicts (FCW) while one with a fresh snapshot does not.
    #[test]
    fn test_native_concurrent_writers() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);

        let base_time = 1_700_000_000_000_000_000_u64;
        for i in 0..10u8 {
            let page = u32::from(i) + 1;
            let sub = make_submission(&[page], 0, i);
            let seq = coord.submit(sub, base_time + u64::from(i)).unwrap();
            assert_eq!(
                seq.get(),
                u64::from(i) + 1,
                "writer {i} should get sequential commit_seq"
            );
        }

        assert_eq!(coord.pending_count(), 10);

        coord.fsync1();
        let markers = coord.append_markers_and_fsync2();
        assert_eq!(markers.len(), 10);
        let results = coord.drain_committed();
        assert_eq!(results.len(), 10);

        // Page 5 was committed at seq 5, after this writer's snapshot at 0.
        let conflicting = make_submission(&[5], 0, 11);
        let result = coord.submit(conflicting, base_time + 100);
        assert!(
            matches!(result, Err(CommitResult::ConflictFcw { .. })),
            "overlapping page should trigger FCW conflict"
        );

        // A snapshot at seq 5 already includes that write, so no conflict.
        let non_conflicting = make_submission(&[5], 5, 12);
        let result = coord.submit(non_conflicting, base_time + 200);
        assert!(
            result.is_ok(),
            "writer with updated begin_seq should not conflict"
        );
    }

    /// After `initiate_shutdown`, submissions are rejected with `ShuttingDown`.
    #[test]
    fn test_coordinator_shutdown_rejects_submissions() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        coord.initiate_shutdown();

        let sub = make_submission(&[1], 0, 1);
        let result = coord.submit(sub, 1_000_000);
        assert!(matches!(result, Err(CommitResult::ShuttingDown)));
    }

    /// Allocated sequences continue from the initial tip without gaps.
    #[test]
    fn test_commit_seq_gap_free() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::new(100), 16);

        for i in 0..5u8 {
            let sub = make_submission(&[u32::from(i) + 1], 100, i);
            let seq = coord.submit(sub, 1_000_000 + u64::from(i)).unwrap();
            assert_eq!(seq.get(), 101 + u64::from(i), "commit_seq must be gap-free");
        }
    }

    /// Commit times stay strictly increasing even when the supplied clock goes backwards.
    #[test]
    fn test_commit_time_monotonic() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        let sub1 = make_submission(&[1], 0, 1);
        let _seq1 = coord.submit(sub1, 1_000_000).unwrap();

        // Second submission supplies an earlier "now".
        let sub2 = make_submission(&[2], 0, 2);
        let _seq2 = coord.submit(sub2, 500_000).unwrap();
        coord.fsync1();
        coord.append_markers_and_fsync2();
        let results = coord.drain_committed();

        let times: Vec<u64> = results
            .iter()
            .filter_map(|r| {
                if let CommitResult::Committed {
                    commit_time_unix_ns,
                    ..
                } = r
                {
                    Some(*commit_time_unix_ns)
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(times.len(), 2);
        assert!(
            times[0] < times[1],
            "commit times must be monotonically increasing: {times:?}"
        );
    }

    /// Changing a field after construction makes `verify_integrity` fail.
    #[test]
    fn test_marker_integrity_tamper_detection() {
        let marker = CommitMarker::new(
            CommitSeq::new(1),
            1_000_000,
            make_oid(0x11),
            make_oid(0x22),
            None,
        );
        assert!(marker.verify_integrity());

        let mut tampered = marker;
        tampered.commit_seq = CommitSeq::new(999);
        assert!(!tampered.verify_integrity());
    }

    /// `all_complete` requires both barriers.
    #[test]
    fn test_fsync_barriers_order() {
        let mut barriers = FsyncBarriers::new();
        assert!(!barriers.all_complete());

        barriers.fsync1_complete = true;
        assert!(!barriers.all_complete());

        barriers.fsync2_complete = true;
        assert!(barriers.all_complete());
    }

    /// Every counter on a local `GroupCommitMetrics` is reflected in its snapshot.
    #[test]
    fn test_group_commit_metrics_basic() {
        use crate::metrics::GroupCommitMetrics;
        let m = GroupCommitMetrics::new();

        m.record_submission();
        m.record_submission();
        m.record_submission();
        m.record_group_commit(3, 500);
        m.record_fsync1();
        m.record_fsync2();
        m.record_fcw_conflict();
        m.record_ssi_conflict();
        m.record_shutdown_rejection();

        let snap = m.snapshot();
        assert_eq!(snap.submissions_total, 3);
        assert_eq!(snap.group_commits_total, 1);
        assert_eq!(snap.group_commit_size_sum, 3);
        assert_eq!(snap.commit_latency_us_total, 500);
        assert_eq!(snap.fsync1_total, 1);
        assert_eq!(snap.fsync2_total, 1);
        assert_eq!(snap.fcw_conflicts_total, 1);
        assert_eq!(snap.ssi_conflicts_total, 1);
        assert_eq!(snap.shutdown_rejections_total, 1);
        assert_eq!(snap.avg_group_size(), 3);
        assert_eq!(snap.avg_commit_latency_us(), 500);
    }

    /// `reset` zeroes the counters.
    #[test]
    fn test_group_commit_metrics_reset() {
        use crate::metrics::GroupCommitMetrics;
        let m = GroupCommitMetrics::new();
        m.record_submission();
        m.record_group_commit(1, 100);
        m.record_fsync1();
        m.record_fsync2();
        m.record_fcw_conflict();
        m.reset();
        let snap = m.snapshot();
        assert_eq!(snap.submissions_total, 0);
        assert_eq!(snap.group_commits_total, 0);
        assert_eq!(snap.fsync1_total, 0);
        assert_eq!(snap.fsync2_total, 0);
        assert_eq!(snap.fcw_conflicts_total, 0);
    }

    /// The snapshot's `Display` output includes the main counters.
    #[test]
    fn test_group_commit_metrics_display() {
        use crate::metrics::GroupCommitMetrics;
        let m = GroupCommitMetrics::new();
        m.record_submission();
        m.record_group_commit(1, 200);
        m.record_fsync1();
        m.record_fsync2();
        let s = m.snapshot().to_string();
        assert!(s.contains("group_commits=1"));
        assert!(s.contains("submissions=1"));
        assert!(s.contains("fsync1=1"));
        assert!(s.contains("fsync2=1"));
    }

    /// Averages and ratios report 0 instead of dividing by zero when empty.
    #[test]
    fn test_group_commit_metrics_avg_zero() {
        use crate::metrics::GroupCommitMetrics;
        let m = GroupCommitMetrics::new();
        let snap = m.snapshot();
        assert_eq!(snap.avg_group_size(), 0);
        assert_eq!(snap.avg_commit_latency_us(), 0);
        assert_eq!(snap.fsync_reduction_ratio(), 0);
    }

    /// Ten commits in one group cost one fsync1 and one fsync2, i.e. at least a
    /// 5x reduction versus two fsyncs per commit.
    #[test]
    fn test_fsync_reduction_proof_deterministic() {
        let _metrics_guard = group_commit_metrics_test_guard();
        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;

        GLOBAL_GROUP_COMMIT_METRICS.reset();

        let n = 10_u8;
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);

        let base_time = 1_700_000_000_000_000_000_u64;

        for i in 0..n {
            let page = u32::from(i) + 1;
            let sub = make_submission(&[page], 0, i);
            coord.submit(sub, base_time + u64::from(i)).unwrap();
        }

        coord.fsync1();
        coord.append_markers_and_fsync2();
        let results = coord.drain_committed();
        assert_eq!(results.len(), usize::from(n));

        // The phases were driven by hand (not via flush_batch), so the group
        // commit itself is recorded manually.
        GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(u64::from(n), 0);

        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();

        assert_eq!(snap.submissions_total, u64::from(n));
        assert_eq!(snap.fsync1_total, 1, "only 1 FSYNC_1 for entire batch");
        assert_eq!(snap.fsync2_total, 1, "only 1 FSYNC_2 for entire batch");

        // Baseline: every commit doing its own FSYNC_1 and FSYNC_2.
        let unbatched_fsyncs = u64::from(n) * 2;
        let batched_fsyncs = snap.fsync1_total + snap.fsync2_total;
        let reduction = unbatched_fsyncs / batched_fsyncs;

        assert!(
            reduction >= 5,
            "group commit must achieve >=5x fsync reduction: \
             {n} commits, unbatched={unbatched_fsyncs} fsyncs, \
             batched={batched_fsyncs} fsyncs, reduction={reduction}x"
        );

        assert!(
            snap.fsync_reduction_ratio() >= 5,
            "fsync_reduction_ratio must be >= 5: got {}",
            snap.fsync_reduction_ratio()
        );
    }

    /// `submit_and_commit` records one submission, one group commit and one of each fsync.
    #[test]
    fn test_submit_and_commit_records_metrics() {
        let _metrics_guard = group_commit_metrics_test_guard();
        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;

        GLOBAL_GROUP_COMMIT_METRICS.reset();

        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
        let sub = make_submission(&[1], 0, 1);
        let result = coord.submit_and_commit(sub, 1_000_000);
        assert!(matches!(result, CommitResult::Committed { .. }));

        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
        assert_eq!(snap.submissions_total, 1);
        assert_eq!(snap.group_commits_total, 1);
        assert_eq!(snap.group_commit_size_sum, 1);
        assert_eq!(snap.fsync1_total, 1);
        assert_eq!(snap.fsync2_total, 1);
    }

    /// An FCW rejection increments the conflict counter.
    #[test]
    fn test_fcw_conflict_metric() {
        let _metrics_guard = group_commit_metrics_test_guard();
        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;

        GLOBAL_GROUP_COMMIT_METRICS.reset();

        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);

        let sub1 = make_submission(&[1], 0, 1);
        coord.submit_and_commit(sub1, 1_000_000);

        // Same page, stale snapshot.
        let sub2 = make_submission(&[1], 0, 2);
        let result = coord.submit(sub2, 2_000_000);
        assert!(matches!(result, Err(CommitResult::ConflictFcw { .. })));

        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
        assert_eq!(snap.fcw_conflicts_total, 1);
    }

    /// A shutdown rejection increments its counter.
    #[test]
    fn test_shutdown_rejection_metric() {
        let _metrics_guard = group_commit_metrics_test_guard();
        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;

        GLOBAL_GROUP_COMMIT_METRICS.reset();

        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
        coord.initiate_shutdown();

        let sub = make_submission(&[1], 0, 1);
        let result = coord.submit(sub, 1_000_000);
        assert!(matches!(result, Err(CommitResult::ShuttingDown)));

        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
        assert_eq!(snap.shutdown_rejections_total, 1);
    }

    /// Each non-empty `flush_batch` advances the epoch by one and records one group commit.
    #[test]
    fn test_flush_batch_epoch_tracking() {
        let _metrics_guard = group_commit_metrics_test_guard();
        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;

        GLOBAL_GROUP_COMMIT_METRICS.reset();

        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);
        assert_eq!(coord.current_epoch(), 0);

        let base_time = 1_700_000_000_000_000_000_u64;
        for i in 0..3u8 {
            let sub = make_submission(&[u32::from(i) + 1], 0, i);
            coord.submit(sub, base_time + u64::from(i)).unwrap();
        }
        let results = coord.flush_batch();
        assert_eq!(results.len(), 3);
        assert_eq!(coord.current_epoch(), 1);

        // Second batch on pages not touched by the first (13, 14).
        for i in 3..5u8 {
            let sub = make_submission(&[u32::from(i) + 10], 3, i);
            coord.submit(sub, base_time + 100 + u64::from(i)).unwrap();
        }
        let results = coord.flush_batch();
        assert_eq!(results.len(), 2);
        assert_eq!(coord.current_epoch(), 2);

        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
        assert_eq!(snap.group_commits_total, 2);
        assert_eq!(snap.group_commit_size_sum, 5);
        assert_eq!(snap.submissions_total, 5);
        assert_eq!(snap.fsync1_total, 2);
        assert_eq!(snap.fsync2_total, 2);
    }

    /// Flushing an empty batch returns nothing and does not advance the epoch.
    #[test]
    fn test_flush_batch_empty_noop() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
        let results = coord.flush_batch();
        assert!(results.is_empty());
        assert_eq!(coord.current_epoch(), 0);
    }

    /// `submit_and_commit` goes through `flush_batch`, so it advances the epoch.
    #[test]
    fn test_submit_and_commit_uses_flush_batch_epoch() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
        let sub = make_submission(&[1], 0, 1);
        let result = coord.submit_and_commit(sub, 1_000_000);
        assert!(matches!(result, CommitResult::Committed { .. }));
        assert_eq!(coord.current_epoch(), 1);
    }

    /// A new batch is empty and not full.
    #[test]
    fn test_group_commit_batch_accessors() {
        let batch = GroupCommitBatch::new(4);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert!(!batch.is_full());
    }

    /// `FsyncBarriers::default()` matches `FsyncBarriers::new()`.
    #[test]
    fn test_fsync_barriers_default_equals_new() {
        let a = FsyncBarriers::new();
        let b = FsyncBarriers::default();
        assert_eq!(a.fsync1_complete, b.fsync1_complete);
        assert_eq!(a.fsync2_complete, b.fsync2_complete);
        assert!(!a.all_complete());
    }

    /// The index keeps the newest seq per page, and conflicts are pages newer than `begin_seq`.
    #[test]
    fn test_commit_index_record_and_conflict_check() {
        let mut idx = CommitIndex::new();
        let p1 = PageNumber::new(1).unwrap();
        let p2 = PageNumber::new(2).unwrap();
        let p3 = PageNumber::new(3).unwrap();

        idx.record_commit(&[p1, p2], CommitSeq::new(5));
        idx.record_commit(&[p2, p3], CommitSeq::new(10));

        let conflicts = idx.check_conflicts(&[p1, p2, p3], CommitSeq::new(7));
        assert!(conflicts.contains(&p2), "p2 modified at seq 10 > 7");
        assert!(conflicts.contains(&p3), "p3 modified at seq 10 > 7");
        assert!(!conflicts.contains(&p1), "p1 last modified at seq 5 <= 7");

        // A snapshot at the latest seq sees every write, so nothing conflicts.
        assert!(
            idx.check_conflicts(&[p1, p2], CommitSeq::new(10))
                .is_empty()
        );
    }

    /// Derived Debug/Clone/PartialEq work for every `CommitResult` variant.
    #[test]
    fn commit_result_debug_clone_eq_all_variants() {
        let committed = CommitResult::Committed {
            commit_seq: CommitSeq::new(1),
            commit_time_unix_ns: 42,
        };
        let dbg = format!("{committed:?}");
        assert!(dbg.contains("Committed"));
        assert_eq!(committed.clone(), committed);

        let fcw = CommitResult::ConflictFcw {
            conflicting_pages: vec![PageNumber::new(5).unwrap()],
        };
        assert_eq!(fcw.clone(), fcw);
        assert_ne!(fcw, committed);

        let ssi = CommitResult::ConflictSsi;
        assert_eq!(ssi.clone(), ssi);

        let shutdown = CommitResult::ShuttingDown;
        assert_eq!(shutdown.clone(), shutdown);
        assert_ne!(ssi, shutdown);
    }

    /// `CommitSubmission` Debug output names the type, and clones keep their fields.
    #[test]
    fn commit_submission_debug_and_clone() {
        let sub = make_submission(&[1, 2], 5, 7);
        let dbg = format!("{sub:?}");
        assert!(dbg.contains("CommitSubmission"));
        let cloned = sub.clone();
        assert_eq!(cloned.write_set_pages.len(), 2);
        assert_eq!(cloned.begin_seq, CommitSeq::new(5));
        assert_eq!(cloned.capsule_digest, [7u8; 32]);
    }

    /// `FsyncBarriers` is Copy and Debug-printable.
    #[test]
    fn fsync_barriers_debug_clone_copy() {
        let mut b = FsyncBarriers::new();
        b.fsync1_complete = true;
        let dbg = format!("{b:?}");
        assert!(dbg.contains("FsyncBarriers"));
        let copied = b;
        assert_eq!(copied, b);
        assert!(copied.fsync1_complete);
        assert!(!copied.fsync2_complete);
    }

    /// The batch reports full once it holds `group_commit_max` commits.
    #[test]
    fn group_commit_batch_is_full_at_max() {
        let _metrics_guard = group_commit_metrics_test_guard();
        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 3);
        let base = 1_000_000_u64;
        for i in 0..3u8 {
            let sub = make_submission(&[u32::from(i) + 1], 0, i);
            coord.submit(sub, base + u64::from(i)).unwrap();
        }
        assert!(coord.batch.is_full());
        assert_eq!(coord.batch.len(), 3);
    }

    /// Both `CommitIndex::new()` and `CommitIndex::default()` start empty.
    #[test]
    fn test_commit_index_default_equals_new() {
        let a = CommitIndex::new();
        let b = CommitIndex::default();
        assert!(
            a.check_conflicts(&[PageNumber::new(1).unwrap()], CommitSeq::ZERO)
                .is_empty()
        );
        assert!(
            b.check_conflicts(&[PageNumber::new(1).unwrap()], CommitSeq::ZERO)
                .is_empty()
        );
    }

    /// Clones preserve digest, pages and snapshot; Debug names the type.
    #[test]
    fn commit_submission_debug_clone() {
        let sub = make_submission(&[1, 2, 3], 0, 0xAA);
        let cloned = sub.clone();
        assert_eq!(cloned.capsule_digest, sub.capsule_digest);
        assert_eq!(cloned.write_set_pages.len(), 3);
        assert_eq!(cloned.begin_seq, CommitSeq::new(0));
        let dbg = format!("{sub:?}");
        assert!(dbg.contains("CommitSubmission"));
    }

    /// Equality distinguishes variants; clones compare equal.
    #[test]
    fn commit_result_all_variants_debug_clone_eq() {
        let committed = CommitResult::Committed {
            commit_seq: CommitSeq::new(5),
            commit_time_unix_ns: 1_000,
        };
        let fcw = CommitResult::ConflictFcw {
            conflicting_pages: vec![PageNumber::new(1).unwrap()],
        };
        let ssi = CommitResult::ConflictSsi;
        let shutdown = CommitResult::ShuttingDown;
        assert_eq!(committed.clone(), committed);
        assert_eq!(fcw.clone(), fcw);
        assert_eq!(ssi.clone(), ssi);
        assert_eq!(shutdown.clone(), shutdown);
        assert_ne!(committed, fcw);
        assert_ne!(ssi, shutdown);
        let dbg = format!("{committed:?}");
        assert!(dbg.contains("Committed"));
    }

    /// A fresh coordinator reports its construction arguments and an empty state.
    #[test]
    fn write_coordinator_accessors_on_fresh_instance() {
        let coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::new(10), 8);
        assert_eq!(coord.mode(), OperatingMode::Native);
        assert_eq!(coord.commit_seq_tip(), CommitSeq::new(10));
        assert_eq!(coord.pending_count(), 0);
        assert_eq!(coord.current_epoch(), 0);
    }

    /// Empty-batch boundary: zero length, empty, not full.
    #[test]
    fn group_commit_batch_empty_boundary() {
        let batch = GroupCommitBatch::new(4);
        assert_eq!(batch.len(), 0);
        assert!(batch.is_empty());
        assert!(!batch.is_full());
    }
}