Skip to main content

fsqlite_wal/
native_commit.rs

1//! Native mode commit protocol (§7.11).
2//!
3//! Decouples bulk durability (payload bytes) from ordering (marker stream).
4//! Writers persist `CommitCapsule` payloads concurrently via RaptorQ encoding,
5//! then submit to the [`WriteCoordinator`] which serializes only:
6//! validation + `commit_seq` allocation + [`CommitMarker`] append.
7//!
8//! Critical ordering (two fsync barriers, normative):
9//! ```text
10//! capsule symbols [written, not fsynced]
11//!   → CommitProof
12//!     → FSYNC_1 (pre-marker, group commit point)
13//!       → CommitMarker
14//!         → FSYNC_2 (post-marker)
15//!           → SHM publish
16//!             → client response
17//! ```
18//!
19//! Both fsyncs are mandatory:
20//! - FSYNC_1 prevents "committed marker, lost data" (worst case).
21//! - FSYNC_2 prevents "client thinks committed, marker not persisted."
22
23use fsqlite_types::sync_primitives::Instant;
24use std::collections::VecDeque;
25
26use fsqlite_types::{
27    CommitMarker, CommitProof, CommitSeq, ObjectId, OperatingMode, PageNumber, TxnToken,
28};
29use tracing::{debug, info, trace, warn};
30
31use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
32
33// ---------------------------------------------------------------------------
34// §7.11.1 Writer submission request
35// ---------------------------------------------------------------------------
36
/// A commit submission from a writer to the [`WriteCoordinator`] (§7.11.1 step 7).
///
/// Contains everything the coordinator needs to validate and commit the
/// transaction without decoding the full capsule.
///
/// Within this module the coordinator reads `write_set_pages` and `begin_seq`
/// for first-committer-wins validation, copies `witness_refs` into the
/// `CommitProof`, and embeds `capsule_object_id` in the `CommitMarker`.
/// The other fields are not read by the code in this file.
#[derive(Debug, Clone)]
pub struct CommitSubmission {
    /// Content-addressed identity of the persisted capsule.
    pub capsule_object_id: ObjectId,
    /// BLAKE3 digest of the capsule bytes (for coordinator validation).
    pub capsule_digest: [u8; 32],
    /// Page numbers touched by this transaction (no false negatives):
    /// only pages listed here are checked for conflicts.
    pub write_set_pages: Vec<PageNumber>,
    /// SSI witness evidence object refs.
    pub witness_refs: Vec<ObjectId>,
    /// SSI dependency edge object refs.
    pub edge_ids: Vec<ObjectId>,
    /// SSI merge witness object refs.
    pub merge_witness_ids: Vec<ObjectId>,
    /// Transaction identity.
    pub txn_token: TxnToken,
    /// Snapshot basis (the commit_seq at BEGIN time). A page whose latest
    /// recorded commit is newer than this value counts as a conflict.
    pub begin_seq: CommitSeq,
}
60
61// ---------------------------------------------------------------------------
62// §7.11.2 Commit result
63// ---------------------------------------------------------------------------
64
/// Result of a commit attempt.
///
/// `Committed` is the only success variant; every other variant is a
/// rejection and is also used as the `Err` payload of
/// [`WriteCoordinator::validate`] and [`WriteCoordinator::submit`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitResult {
    /// Successfully committed at the given sequence.
    Committed {
        /// Sequence number allocated to the commit.
        commit_seq: CommitSeq,
        /// Commit timestamp assigned by the coordinator (Unix nanoseconds).
        commit_time_unix_ns: u64,
    },
    /// Commit rejected: first-committer-wins conflict on these pages.
    ConflictFcw { conflicting_pages: Vec<PageNumber> },
    /// Commit rejected: SSI dangerous structure detected.
    ConflictSsi,
    /// Commit rejected: coordinator is shutting down.
    ShuttingDown,
}
80
81// ---------------------------------------------------------------------------
82// §7.11.2 Fsync barrier tracking
83// ---------------------------------------------------------------------------
84
/// Per-commit record of which of the two durability barriers have finished.
///
/// The derived `Default` yields the same value as [`FsyncBarriers::new`]:
/// both flags cleared.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct FsyncBarriers {
    /// FSYNC_1: capsule symbols + CommitProof durable before marker.
    pub fsync1_complete: bool,
    /// FSYNC_2: CommitMarker stream durable before client response.
    pub fsync2_complete: bool,
}

impl FsyncBarriers {
    /// Starting state: neither barrier has completed.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            fsync1_complete: false,
            fsync2_complete: false,
        }
    }

    /// `true` only when FSYNC_1 and FSYNC_2 are both done, i.e. it is safe
    /// to respond to the client.
    #[must_use]
    pub const fn all_complete(self) -> bool {
        matches!(
            self,
            FsyncBarriers {
                fsync1_complete: true,
                fsync2_complete: true,
            }
        )
    }
}
116
117// ---------------------------------------------------------------------------
118// §7.11.2 Group commit batch
119// ---------------------------------------------------------------------------
120
121/// A batch of pending commits that will share a single fsync.
122///
123/// Group commit amortizes the cost of fsync across multiple writers.
124/// The coordinator accumulates submissions until either the batch is full
125/// or a timeout expires, then issues a single fsync covering all of them.
126#[derive(Debug)]
127pub struct GroupCommitBatch {
128    /// Pending submissions awaiting commit.
129    pending: VecDeque<PendingCommit>,
130    /// Maximum batch size before forced flush.
131    max_batch_size: usize,
132}
133
134/// A submission plus its allocated commit-seq and proof, awaiting fsync.
135#[derive(Debug)]
136struct PendingCommit {
137    submission: CommitSubmission,
138    allocated_seq: CommitSeq,
139    allocated_time_ns: u64,
140    proof_object_id: ObjectId,
141    barriers: FsyncBarriers,
142}
143
144impl GroupCommitBatch {
145    /// Create a new batch with the given maximum size.
146    #[must_use]
147    pub fn new(max_batch_size: usize) -> Self {
148        Self {
149            pending: VecDeque::with_capacity(max_batch_size),
150            max_batch_size,
151        }
152    }
153
154    /// Number of pending commits in the batch.
155    #[must_use]
156    pub fn len(&self) -> usize {
157        self.pending.len()
158    }
159
160    /// Whether the batch is empty.
161    #[must_use]
162    pub fn is_empty(&self) -> bool {
163        self.pending.is_empty()
164    }
165
166    /// Whether the batch is full and should be flushed.
167    #[must_use]
168    pub fn is_full(&self) -> bool {
169        self.pending.len() >= self.max_batch_size
170    }
171
172    /// Add a pending commit to the batch.
173    fn push(&mut self, pending: PendingCommit) {
174        self.pending.push_back(pending);
175    }
176
177    /// Mark FSYNC_1 complete for all pending commits.
178    fn mark_fsync1_complete(&mut self) {
179        for pc in &mut self.pending {
180            pc.barriers.fsync1_complete = true;
181        }
182    }
183
184    /// Mark FSYNC_2 complete for all pending commits.
185    fn mark_fsync2_complete(&mut self) {
186        for pc in &mut self.pending {
187            pc.barriers.fsync2_complete = true;
188        }
189    }
190
191    /// Drain all fully-committed entries (both fsyncs complete).
192    fn drain_committed(&mut self) -> Vec<(CommitSubmission, CommitSeq, u64)> {
193        let mut committed = Vec::with_capacity(self.pending.len());
194        while let Some(front) = self.pending.front() {
195            if front.barriers.all_complete() {
196                let pc = self.pending.pop_front().expect("checked non-empty");
197                committed.push((pc.submission, pc.allocated_seq, pc.allocated_time_ns));
198            } else {
199                break;
200            }
201        }
202        committed
203    }
204}
205
206// ---------------------------------------------------------------------------
207// §7.11.2 Write Coordinator
208// ---------------------------------------------------------------------------
209
210/// Commit index: tracks which pages have been modified by recent commits.
211///
212/// Used for first-committer-wins (FCW) validation. Maps page numbers to
213/// the latest commit_seq that modified them.
214#[derive(Debug, Clone)]
215pub struct CommitIndex {
216    /// Page -> latest commit_seq that modified it.
217    entries: std::collections::HashMap<PageNumber, CommitSeq>,
218}
219
220impl CommitIndex {
221    /// Create an empty commit index.
222    #[must_use]
223    pub fn new() -> Self {
224        Self {
225            entries: std::collections::HashMap::new(),
226        }
227    }
228
229    /// Record that `pages` were modified at `seq`.
230    pub fn record_commit(&mut self, pages: &[PageNumber], seq: CommitSeq) {
231        for &page in pages {
232            self.entries
233                .entry(page)
234                .and_modify(|existing| {
235                    if seq > *existing {
236                        *existing = seq;
237                    }
238                })
239                .or_insert(seq);
240        }
241    }
242
243    /// Check for FCW conflicts: returns pages modified after `begin_seq`.
244    #[must_use]
245    pub fn check_conflicts(
246        &self,
247        write_set: &[PageNumber],
248        begin_seq: CommitSeq,
249    ) -> Vec<PageNumber> {
250        write_set
251            .iter()
252            .filter(|page| {
253                self.entries
254                    .get(page)
255                    .is_some_and(|&latest| latest > begin_seq)
256            })
257            .copied()
258            .collect()
259    }
260}
261
262impl Default for CommitIndex {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
/// The `WriteCoordinator` serializes the validation + commit_seq allocation +
/// marker append section of the native commit protocol (§7.11.2).
///
/// The serialized section (steps 2-8) MUST NOT observe cancellation once
/// `commit_seq` is allocated. The coordinator MUST NOT write page payloads
/// in the serialized section — only marker + proof.
///
/// All mutating methods take `&mut self`, so exclusive access is the
/// caller's responsibility; no locking is done inside this type.
#[derive(Debug)]
pub struct WriteCoordinator {
    /// Current operating mode.
    mode: OperatingMode,
    /// Monotonic commit sequence tip. Advanced at allocation time in
    /// `submit`, i.e. before the commit has passed its fsync barriers.
    commit_seq_tip: CommitSeq,
    /// Last assigned commit time (monotonic non-decreasing).
    last_commit_time_ns: u64,
    /// Commit index for FCW validation.
    commit_index: CommitIndex,
    /// Marker chain: ObjectId of the most recently built marker, `None`
    /// until the first marker is built by this coordinator instance.
    prev_marker_id: Option<ObjectId>,
    /// Group commit batch.
    batch: GroupCommitBatch,
    /// Whether the coordinator is shutting down.
    shutting_down: bool,
    /// Monotonic epoch counter: incremented once per group commit flush.
    epoch: u64,
}
293
294impl WriteCoordinator {
295    /// Create a new coordinator for the given mode.
296    ///
297    /// `initial_seq` is the highest committed sequence from recovery.
298    /// `group_commit_max` is the maximum batch size for group commit.
299    #[must_use]
300    pub fn new(mode: OperatingMode, initial_seq: CommitSeq, group_commit_max: usize) -> Self {
301        info!(
302            mode = %mode,
303            initial_seq = initial_seq.get(),
304            group_commit_max,
305            "WriteCoordinator initialized"
306        );
307        Self {
308            mode,
309            commit_seq_tip: initial_seq,
310            last_commit_time_ns: 0,
311            commit_index: CommitIndex::new(),
312            prev_marker_id: None,
313            batch: GroupCommitBatch::new(group_commit_max),
314            shutting_down: false,
315            epoch: 0,
316        }
317    }
318
    /// The operating mode this coordinator was constructed with.
    ///
    /// Nothing in this impl changes the mode after construction.
    #[must_use]
    pub const fn mode(&self) -> OperatingMode {
        self.mode
    }
324
    /// The current commit sequence tip (highest allocated).
    ///
    /// The tip advances as soon as `submit` allocates a sequence number, so
    /// it can be ahead of what has been drained as durably committed.
    #[must_use]
    pub const fn commit_seq_tip(&self) -> CommitSeq {
        self.commit_seq_tip
    }
330
331    /// Number of pending commits in the current group batch.
332    #[must_use]
333    pub fn pending_count(&self) -> usize {
334        self.batch.len()
335    }
336
    /// The current epoch (incremented once per non-empty group commit flush).
    ///
    /// Starts at 0; flushing an empty batch does not advance it.
    #[must_use]
    pub const fn current_epoch(&self) -> u64 {
        self.epoch
    }
342
    /// Initiate shutdown: new submissions will be rejected with
    /// `CommitResult::ShuttingDown`.
    ///
    /// Only sets the flag; commits already queued in the batch are left in
    /// place and can still be flushed. There is no way to clear the flag.
    pub fn initiate_shutdown(&mut self) {
        self.shutting_down = true;
    }
347
348    /// Step 1: Validate a submission (FCW + SSI re-validation).
349    ///
350    /// Returns `Ok(())` if validation passes, or `Err(CommitResult)` with
351    /// the rejection reason.
352    ///
353    /// The coordinator MUST NOT decode the entire capsule here — only check
354    /// the write-set summary against the commit index.
355    pub fn validate(&self, submission: &CommitSubmission) -> Result<(), CommitResult> {
356        if self.shutting_down {
357            GLOBAL_GROUP_COMMIT_METRICS.record_shutdown_rejection();
358            warn!(
359                phase = "validate",
360                "rejecting submission: coordinator shutting down"
361            );
362            return Err(CommitResult::ShuttingDown);
363        }
364
365        // FCW validation: check for page conflicts since begin_seq.
366        let conflicts = self
367            .commit_index
368            .check_conflicts(&submission.write_set_pages, submission.begin_seq);
369        if !conflicts.is_empty() {
370            GLOBAL_GROUP_COMMIT_METRICS.record_fcw_conflict();
371            debug!(
372                phase = "validate",
373                begin_seq = submission.begin_seq.get(),
374                conflict_count = conflicts.len(),
375                "FCW conflict detected"
376            );
377            return Err(CommitResult::ConflictFcw {
378                conflicting_pages: conflicts,
379            });
380        }
381
382        // SSI re-validation would check for dangerous structure here.
383        // For now, we accept (the writer already validated locally in step 2).
384        // Full SSI re-validation is deferred to the SSI witness plane
385        // implementation (bd-3t3.9.*).
386
387        Ok(())
388    }
389
390    /// Steps 2-8: Process a validated submission through the serialized section.
391    ///
392    /// This allocates a commit_seq, builds the proof and marker, and adds
393    /// the commit to the group batch. Returns the allocated commit result.
394    ///
395    /// The caller MUST call `flush_batch` to actually issue fsyncs and
396    /// complete the commits.
397    ///
398    /// # Errors
399    ///
400    /// Returns `Err` if validation fails (FCW or SSI conflict).
401    pub fn submit(
402        &mut self,
403        submission: CommitSubmission,
404        now_unix_ns: u64,
405    ) -> Result<CommitSeq, CommitResult> {
406        // Step 1: Validate
407        self.validate(&submission)?;
408
409        GLOBAL_GROUP_COMMIT_METRICS.record_submission();
410
411        // Step 2: Allocate gap-free commit_seq
412        let new_seq = self.commit_seq_tip.next();
413        let commit_time = now_unix_ns.max(self.last_commit_time_ns.wrapping_add(1));
414
415        // Step 3: Build CommitProof (persisted as ECS object)
416        let proof = CommitProof {
417            commit_seq: new_seq,
418            edges: Vec::new(), // Populated by SSI witness plane
419            evidence_refs: submission.witness_refs.clone(),
420        };
421        let proof_object_id = Self::derive_proof_object_id(&proof);
422
423        // Update coordinator state (inside serialized section)
424        self.commit_seq_tip = new_seq;
425        self.last_commit_time_ns = commit_time;
426        self.commit_index
427            .record_commit(&submission.write_set_pages, new_seq);
428
429        trace!(
430            target: "fsqlite_wal::native_commit",
431            phase = "submit",
432            commit_seq = new_seq.get(),
433            pages = submission.write_set_pages.len(),
434            begin_seq = submission.begin_seq.get(),
435            pending_batch = self.batch.len() + 1,
436            "allocated commit_seq"
437        );
438
439        // Add to group commit batch
440        self.batch.push(PendingCommit {
441            submission,
442            allocated_seq: new_seq,
443            allocated_time_ns: commit_time,
444            proof_object_id,
445            barriers: FsyncBarriers::new(),
446        });
447
448        Ok(new_seq)
449    }
450
451    /// Execute FSYNC_1 (pre-marker group commit point).
452    ///
453    /// Makes all pending capsule symbols AND CommitProof objects durable
454    /// BEFORE markers reference them. Without this barrier, NVMe write
455    /// reordering can make a marker durable while referents are not.
456    ///
457    /// Returns the number of commits covered by this fsync.
458    pub fn fsync1(&mut self) -> usize {
459        let count = self.batch.len();
460        self.batch.mark_fsync1_complete();
461        GLOBAL_GROUP_COMMIT_METRICS.record_fsync1();
462        debug!(
463            target: "fsqlite_wal::native_commit",
464            phase = "fsync1",
465            batch_size = count,
466            "pre-marker fsync complete"
467        );
468        count
469    }
470
471    /// Steps 5-6: Append markers and execute FSYNC_2 (post-marker).
472    ///
473    /// For each pending commit with FSYNC_1 complete, appends a
474    /// `CommitMarker` to the marker stream, then marks FSYNC_2 complete.
475    ///
476    /// Returns the markers that were appended.
477    pub fn append_markers_and_fsync2(&mut self) -> Vec<CommitMarker> {
478        let mut markers = Vec::with_capacity(self.batch.pending.len());
479
480        for pc in &mut self.batch.pending {
481            if pc.barriers.fsync1_complete && !pc.barriers.fsync2_complete {
482                // Step 5: Build and append CommitMarker
483                let marker = CommitMarker::new(
484                    pc.allocated_seq,
485                    pc.allocated_time_ns,
486                    pc.submission.capsule_object_id,
487                    pc.proof_object_id,
488                    self.prev_marker_id,
489                );
490
491                // Derive marker ObjectId for chain linking
492                let marker_bytes = marker.to_record_bytes();
493                let marker_oid = ObjectId::derive_from_canonical_bytes(&marker_bytes);
494                self.prev_marker_id = Some(marker_oid);
495
496                markers.push(marker);
497            }
498        }
499
500        // Step 6: FSYNC_2 on marker stream
501        self.batch.mark_fsync2_complete();
502        GLOBAL_GROUP_COMMIT_METRICS.record_fsync2();
503
504        debug!(
505            target: "fsqlite_wal::native_commit",
506            phase = "fsync2",
507            markers_appended = markers.len(),
508            "post-marker fsync complete"
509        );
510
511        markers
512    }
513
514    /// Steps 7-8: Drain committed entries and return results.
515    ///
516    /// Step 7 (SHM publish) is the caller's responsibility after receiving
517    /// these results — the coordinator only manages the serialized section.
518    pub fn drain_committed(&mut self) -> Vec<CommitResult> {
519        let drained = self.batch.drain_committed();
520        let batch_size = drained.len();
521        let results: Vec<CommitResult> = drained
522            .into_iter()
523            .map(|(_, seq, time)| {
524                info!(
525                    target: "fsqlite_wal::native_commit",
526                    commit_seq = seq.get(),
527                    durable = true,
528                    "commit published"
529                );
530                CommitResult::Committed {
531                    commit_seq: seq,
532                    commit_time_unix_ns: time,
533                }
534            })
535            .collect();
536
537        if batch_size > 0 {
538            info!(
539                target: "fsqlite_wal::native_commit",
540                group_size = batch_size,
541                "parallel_wal_commit group drained"
542            );
543        }
544
545        results
546    }
547
548    /// Flush the pending batch: fsync1, append markers, fsync2, drain.
549    ///
550    /// Wraps the full group commit cycle in a `parallel_wal_commit` tracing
551    /// span with epoch, group_size, and commit_seq range fields.
552    /// Returns the committed results and records metrics.
553    pub fn flush_batch(&mut self) -> Vec<CommitResult> {
554        let group_size = self.batch.len();
555        if group_size == 0 {
556            return Vec::new();
557        }
558
559        let start = Instant::now();
560        self.epoch += 1;
561        let epoch = self.epoch;
562
563        let span = tracing::info_span!(
564            target: "fsqlite_wal::native_commit",
565            "parallel_wal_commit",
566            epoch,
567            group_size,
568            frames_in_batch = group_size,
569        );
570        let _guard = span.enter();
571
572        let fsync1_count = self.fsync1();
573        let markers = self.append_markers_and_fsync2();
574        let results = self.drain_committed();
575
576        #[allow(clippy::cast_possible_truncation)]
577        let latency_us = start.elapsed().as_micros() as u64;
578        GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(group_size as u64, latency_us);
579
580        info!(
581            target: "fsqlite_wal::native_commit",
582            epoch,
583            group_size,
584            fsync1_count,
585            markers_appended = markers.len(),
586            committed = results.len(),
587            latency_us,
588            "parallel_wal_commit complete"
589        );
590
591        results
592    }
593
594    /// Convenience: submit, fsync1, append markers, fsync2, drain.
595    ///
596    /// Processes a single submission through the entire protocol.
597    /// In production, submissions are batched; this is for testing and
598    /// single-commit workloads.
599    pub fn submit_and_commit(
600        &mut self,
601        submission: CommitSubmission,
602        now_unix_ns: u64,
603    ) -> CommitResult {
604        match self.submit(submission, now_unix_ns) {
605            Ok(_seq) => {
606                let mut results = self.flush_batch();
607                results.pop().unwrap_or(CommitResult::ShuttingDown)
608            }
609            Err(result) => result,
610        }
611    }
612
613    /// Derive a deterministic ObjectId for a CommitProof.
614    fn derive_proof_object_id(proof: &CommitProof) -> ObjectId {
615        let mut canonical =
616            Vec::with_capacity(16 + 8 + proof.edges.len() * 16 + proof.evidence_refs.len() * 32);
617        canonical.extend_from_slice(b"fsqlite:proof:v1");
618        canonical.extend_from_slice(&proof.commit_seq.get().to_le_bytes());
619        for edge in &proof.edges {
620            canonical.extend_from_slice(&edge.from.get().to_le_bytes());
621            canonical.extend_from_slice(&edge.to.get().to_le_bytes());
622        }
623        for evidence in &proof.evidence_refs {
624            canonical.extend_from_slice(evidence.as_bytes());
625        }
626        ObjectId::derive_from_canonical_bytes(&canonical)
627    }
628}
629
630// ---------------------------------------------------------------------------
631// Tests
632// ---------------------------------------------------------------------------
633
634#[cfg(test)]
635mod tests {
636    use fsqlite_types::{CommitCapsule, TxnEpoch, TxnId};
637
638    use super::*;
639
640    fn make_oid(seed: u8) -> ObjectId {
641        ObjectId::from_bytes([seed; 16])
642    }
643
644    fn make_submission(pages: &[u32], begin_seq: u64, seed: u8) -> CommitSubmission {
645        let txn_id = TxnId::new(u64::from(seed) + 1).expect("valid txn id");
646        CommitSubmission {
647            capsule_object_id: make_oid(seed),
648            capsule_digest: [seed; 32],
649            write_set_pages: pages
650                .iter()
651                .map(|&p| PageNumber::new(p).expect("non-zero page"))
652                .collect(),
653            witness_refs: Vec::new(),
654            edge_ids: Vec::new(),
655            merge_witness_ids: Vec::new(),
656            txn_token: TxnToken::new(txn_id, TxnEpoch::new(1)),
657            begin_seq: CommitSeq::new(begin_seq),
658        }
659    }
660
    /// Test helper: acquire the crate's metrics test lock and return its
    /// guard; hold the guard for the duration of the test.
    ///
    /// Presumably this keeps tests that touch the process-wide
    /// `GLOBAL_GROUP_COMMIT_METRICS` from interleaving — confirm against
    /// `crate::metrics`. Panics if the lock is poisoned.
    fn group_commit_metrics_test_guard() -> std::sync::MutexGuard<'static, ()> {
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned")
    }
666
667    // ── bd-15jh test 1: test_compat_mode_wal_format ──
668
669    #[test]
670    fn test_compat_mode_wal_format() {
671        // Verify that compatibility mode is the default and WAL-based.
672        let mode = OperatingMode::default();
673        assert_eq!(mode, OperatingMode::Compatibility);
674        assert!(!mode.is_native());
675        assert!(mode.legacy_readers_allowed());
676        assert_eq!(mode.to_string(), "compatibility");
677
678        // PRAGMA parsing
679        assert_eq!(
680            OperatingMode::from_pragma("compatibility"),
681            Some(OperatingMode::Compatibility)
682        );
683        assert_eq!(
684            OperatingMode::from_pragma("compat"),
685            Some(OperatingMode::Compatibility)
686        );
687        assert_eq!(
688            OperatingMode::from_pragma("native"),
689            Some(OperatingMode::Native)
690        );
691        assert_eq!(
692            OperatingMode::from_pragma("NATIVE"),
693            Some(OperatingMode::Native)
694        );
695        assert!(OperatingMode::from_pragma("invalid").is_none());
696
697        // Coordinator in compat mode
698        let coord = WriteCoordinator::new(OperatingMode::Compatibility, CommitSeq::ZERO, 16);
699        assert_eq!(coord.mode(), OperatingMode::Compatibility);
700        assert_eq!(coord.commit_seq_tip(), CommitSeq::ZERO);
701    }
702
703    // ── bd-15jh test 2: test_native_mode_commit_capsule ──
704
705    #[test]
706    fn test_native_mode_commit_capsule() {
707        // Verify CommitCapsule is persisted before coordinator contact.
708        // In the protocol, the writer builds and persists the capsule (steps 1-6)
709        // before submitting to the coordinator (step 7).
710        let capsule = CommitCapsule {
711            object_id: make_oid(0x11),
712            snapshot_basis: CommitSeq::new(5),
713            intent_log: Vec::new(),
714            page_deltas: vec![
715                (PageNumber::new(10).unwrap(), vec![0xAA; 4096]),
716                (PageNumber::new(20).unwrap(), vec![0xBB; 4096]),
717            ],
718            read_set_digest: [0x01; 32],
719            write_set_digest: [0x02; 32],
720            read_witness_refs: vec![make_oid(0x30)],
721            write_witness_refs: vec![make_oid(0x31)],
722            dependency_edge_refs: Vec::new(),
723            merge_witness_refs: Vec::new(),
724        };
725
726        // Capsule has content before coordinator sees it
727        assert_eq!(capsule.snapshot_basis.get(), 5);
728        assert_eq!(capsule.page_deltas.len(), 2);
729        assert_eq!(capsule.read_witness_refs.len(), 1);
730
731        // Submission contains only the capsule_object_id (not the full capsule)
732        let submission = CommitSubmission {
733            capsule_object_id: capsule.object_id,
734            capsule_digest: [0xFF; 32],
735            write_set_pages: capsule.page_deltas.iter().map(|(pgno, _)| *pgno).collect(),
736            witness_refs: capsule.read_witness_refs.clone(),
737            edge_ids: Vec::new(),
738            merge_witness_ids: Vec::new(),
739            txn_token: TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(1)),
740            begin_seq: capsule.snapshot_basis,
741        };
742
743        assert_eq!(submission.capsule_object_id, capsule.object_id);
744        assert_eq!(submission.write_set_pages.len(), 2);
745    }
746
747    // ── bd-15jh test 3: test_native_marker_append ──
748
749    #[test]
750    fn test_native_marker_append() {
751        // Verify CommitMarker is appended atomically with correct wire format.
752        let marker = CommitMarker::new(
753            CommitSeq::new(42),
754            1_700_000_000_000_000_000,
755            make_oid(0x11),
756            make_oid(0x22),
757            Some(make_oid(0x33)),
758        );
759
760        // Wire format is exactly 88 bytes
761        let bytes = marker.to_record_bytes();
762        assert_eq!(
763            bytes.len(),
764            fsqlite_types::COMMIT_MARKER_RECORD_V1_SIZE,
765            "marker record must be exactly 88 bytes"
766        );
767
768        // Round-trip
769        let recovered =
770            CommitMarker::from_record_bytes(&bytes).expect("marker roundtrip must succeed");
771        assert_eq!(recovered.commit_seq, marker.commit_seq);
772        assert_eq!(recovered.commit_time_unix_ns, marker.commit_time_unix_ns);
773        assert_eq!(recovered.capsule_object_id, marker.capsule_object_id);
774        assert_eq!(recovered.proof_object_id, marker.proof_object_id);
775        assert_eq!(recovered.prev_marker, marker.prev_marker);
776        assert_eq!(recovered.integrity_hash, marker.integrity_hash);
777
778        // Integrity verification
779        assert!(marker.verify_integrity());
780
781        // Marker without prev (genesis)
782        let genesis = CommitMarker::new(
783            CommitSeq::new(1),
784            1_700_000_000_000_000_000,
785            make_oid(0x01),
786            make_oid(0x02),
787            None,
788        );
789        assert!(genesis.prev_marker.is_none());
790        assert!(genesis.verify_integrity());
791        let genesis_bytes = genesis.to_record_bytes();
792        let genesis_recovered = CommitMarker::from_record_bytes(&genesis_bytes).unwrap();
793        assert!(genesis_recovered.prev_marker.is_none());
794    }
795
796    // ── bd-15jh test 4: test_native_group_commit ──
797
798    #[test]
799    fn test_native_group_commit() {
800        let _metrics_guard = group_commit_metrics_test_guard();
801        // Multiple commits share a single fsync.
802        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
803
804        // Submit 5 writers to different pages
805        let base_time = 1_700_000_000_000_000_000_u64;
806        for i in 0..5u8 {
807            let pages = &[u32::from(i) * 10 + 1]; // pages 1, 11, 21, 31, 41
808            let sub = make_submission(pages, 0, i);
809            let seq = coord.submit(sub, base_time + u64::from(i)).unwrap();
810            assert_eq!(seq.get(), u64::from(i) + 1);
811        }
812
813        // All 5 pending
814        assert_eq!(coord.pending_count(), 5);
815
816        // Single FSYNC_1 covers all 5
817        let fsync1_count = coord.fsync1();
818        assert_eq!(fsync1_count, 5);
819
820        // Append markers + FSYNC_2
821        let markers = coord.append_markers_and_fsync2();
822        assert_eq!(markers.len(), 5);
823
824        // Verify marker chain linking
825        assert!(markers[0].prev_marker.is_none()); // first marker is genesis
826        for (i, marker) in markers.iter().enumerate().skip(1) {
827            assert!(
828                marker.prev_marker.is_some(),
829                "marker {i} should link to previous"
830            );
831        }
832
833        // All should drain as committed
834        let results = coord.drain_committed();
835        assert_eq!(results.len(), 5);
836        for (i, result) in results.iter().enumerate() {
837            match result {
838                CommitResult::Committed { commit_seq, .. } => {
839                    assert_eq!(commit_seq.get(), (i as u64) + 1);
840                }
841                other => unreachable!("expected Committed, got {other:?}"),
842            }
843        }
844
845        assert_eq!(coord.pending_count(), 0);
846    }
847
848    // ── bd-15jh test 5: test_native_crash_recovery ──
849
850    #[test]
851    fn test_native_crash_recovery() {
852        let _metrics_guard = group_commit_metrics_test_guard();
853        // Verify recovery at each step of the protocol.
854        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
855
856        // Simulate crash BEFORE fsync1: commit is not durable
857        let sub1 = make_submission(&[1], 0, 1);
858        let seq1 = coord.submit(sub1, 1_000_000).unwrap();
859        assert_eq!(seq1.get(), 1);
860        // Crash here: no fsync1, no marker. On recovery, commit_seq_tip
861        // would be restored from the marker stream (still at 0).
862        // The pending commit is lost (expected: capsule may exist but
863        // marker does not, so it's not committed).
864
865        // Reset to simulate recovery
866        let mut coord = WriteCoordinator::new(
867            OperatingMode::Native,
868            CommitSeq::ZERO, // recovered from marker stream tip
869            16,
870        );
871
872        // Simulate crash AFTER fsync1 but BEFORE marker append
873        let sub2 = make_submission(&[2], 0, 2);
874        let _seq2 = coord.submit(sub2, 2_000_000).unwrap();
875        let _fsync1 = coord.fsync1();
876        // Crash here: capsule + proof are durable (fsync1 done), but
877        // marker not appended. On recovery, commit is NOT committed
878        // (marker is the point of no return, not the proof).
879
880        // Reset again
881        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
882
883        // Simulate complete commit
884        let sub3 = make_submission(&[3], 0, 3);
885        let result = coord.submit_and_commit(sub3, 3_000_000);
886        assert!(
887            matches!(result, CommitResult::Committed { commit_seq, .. } if commit_seq.get() == 1),
888            "complete commit should succeed"
889        );
890
891        // After recovery, this commit IS visible (marker is durable)
892        assert_eq!(coord.commit_seq_tip().get(), 1);
893    }
894
895    // ── bd-15jh test 6: test_native_concurrent_writers ──
896
897    #[test]
898    fn test_native_concurrent_writers() {
899        let _metrics_guard = group_commit_metrics_test_guard();
900        // N writers commit in parallel without serialization on payload.
901        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);
902
903        // 10 writers touching disjoint pages
904        let base_time = 1_700_000_000_000_000_000_u64;
905        for i in 0..10u8 {
906            let page = u32::from(i) + 1; // pages 1..10
907            let sub = make_submission(&[page], 0, i);
908            let seq = coord.submit(sub, base_time + u64::from(i)).unwrap();
909            assert_eq!(
910                seq.get(),
911                u64::from(i) + 1,
912                "writer {i} should get sequential commit_seq"
913            );
914        }
915
916        // All submitted without conflict (different pages)
917        assert_eq!(coord.pending_count(), 10);
918
919        // Batch commit
920        coord.fsync1();
921        let markers = coord.append_markers_and_fsync2();
922        assert_eq!(markers.len(), 10);
923        let results = coord.drain_committed();
924        assert_eq!(results.len(), 10);
925
926        // Now test FCW conflict: writer 11 touches page 5 (already committed)
927        let conflicting = make_submission(&[5], 0, 11);
928        let result = coord.submit(conflicting, base_time + 100);
929        assert!(
930            matches!(result, Err(CommitResult::ConflictFcw { .. })),
931            "overlapping page should trigger FCW conflict"
932        );
933
934        // Writer 12 touches page 5 but with begin_seq >= the commit
935        let non_conflicting = make_submission(&[5], 5, 12);
936        let result = coord.submit(non_conflicting, base_time + 200);
937        assert!(
938            result.is_ok(),
939            "writer with updated begin_seq should not conflict"
940        );
941    }
942
943    // ── Additional protocol invariant tests ──
944
945    #[test]
946    fn test_coordinator_shutdown_rejects_submissions() {
947        let _metrics_guard = group_commit_metrics_test_guard();
948        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
949
950        coord.initiate_shutdown();
951
952        let sub = make_submission(&[1], 0, 1);
953        let result = coord.submit(sub, 1_000_000);
954        assert!(matches!(result, Err(CommitResult::ShuttingDown)));
955    }
956
957    #[test]
958    fn test_commit_seq_gap_free() {
959        let _metrics_guard = group_commit_metrics_test_guard();
960        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::new(100), 16);
961
962        for i in 0..5u8 {
963            let sub = make_submission(&[u32::from(i) + 1], 100, i);
964            let seq = coord.submit(sub, 1_000_000 + u64::from(i)).unwrap();
965            assert_eq!(seq.get(), 101 + u64::from(i), "commit_seq must be gap-free");
966        }
967    }
968
969    #[test]
970    fn test_commit_time_monotonic() {
971        let _metrics_guard = group_commit_metrics_test_guard();
972        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
973
974        // Submit with decreasing wall-clock times
975        let sub1 = make_submission(&[1], 0, 1);
976        let _seq1 = coord.submit(sub1, 1_000_000).unwrap();
977
978        let sub2 = make_submission(&[2], 0, 2);
979        let _seq2 = coord.submit(sub2, 500_000).unwrap(); // earlier timestamp!
980
981        // Drain and verify monotonicity
982        coord.fsync1();
983        coord.append_markers_and_fsync2();
984        let results = coord.drain_committed();
985
986        let times: Vec<u64> = results
987            .iter()
988            .filter_map(|r| {
989                if let CommitResult::Committed {
990                    commit_time_unix_ns,
991                    ..
992                } = r
993                {
994                    Some(*commit_time_unix_ns)
995                } else {
996                    None
997                }
998            })
999            .collect();
1000
1001        assert_eq!(times.len(), 2);
1002        assert!(
1003            times[0] < times[1],
1004            "commit times must be monotonically increasing: {times:?}"
1005        );
1006    }
1007
1008    #[test]
1009    fn test_marker_integrity_tamper_detection() {
1010        let marker = CommitMarker::new(
1011            CommitSeq::new(1),
1012            1_000_000,
1013            make_oid(0x11),
1014            make_oid(0x22),
1015            None,
1016        );
1017        assert!(marker.verify_integrity());
1018
1019        // Tamper with commit_seq
1020        let mut tampered = marker;
1021        tampered.commit_seq = CommitSeq::new(999);
1022        assert!(!tampered.verify_integrity());
1023    }
1024
1025    #[test]
1026    fn test_fsync_barriers_order() {
1027        let mut barriers = FsyncBarriers::new();
1028        assert!(!barriers.all_complete());
1029
1030        barriers.fsync1_complete = true;
1031        assert!(!barriers.all_complete());
1032
1033        barriers.fsync2_complete = true;
1034        assert!(barriers.all_complete());
1035    }
1036
1037    // ── bd-14m.2.1: Group commit observability metrics ──
1038
1039    #[test]
1040    fn test_group_commit_metrics_basic() {
1041        use crate::metrics::GroupCommitMetrics;
1042        let m = GroupCommitMetrics::new();
1043
1044        m.record_submission();
1045        m.record_submission();
1046        m.record_submission();
1047        m.record_group_commit(3, 500);
1048        m.record_fsync1();
1049        m.record_fsync2();
1050        m.record_fcw_conflict();
1051        m.record_ssi_conflict();
1052        m.record_shutdown_rejection();
1053
1054        let snap = m.snapshot();
1055        assert_eq!(snap.submissions_total, 3);
1056        assert_eq!(snap.group_commits_total, 1);
1057        assert_eq!(snap.group_commit_size_sum, 3);
1058        assert_eq!(snap.commit_latency_us_total, 500);
1059        assert_eq!(snap.fsync1_total, 1);
1060        assert_eq!(snap.fsync2_total, 1);
1061        assert_eq!(snap.fcw_conflicts_total, 1);
1062        assert_eq!(snap.ssi_conflicts_total, 1);
1063        assert_eq!(snap.shutdown_rejections_total, 1);
1064        assert_eq!(snap.avg_group_size(), 3);
1065        assert_eq!(snap.avg_commit_latency_us(), 500);
1066    }
1067
1068    #[test]
1069    fn test_group_commit_metrics_reset() {
1070        use crate::metrics::GroupCommitMetrics;
1071        let m = GroupCommitMetrics::new();
1072        m.record_submission();
1073        m.record_group_commit(1, 100);
1074        m.record_fsync1();
1075        m.record_fsync2();
1076        m.record_fcw_conflict();
1077        m.reset();
1078        let snap = m.snapshot();
1079        assert_eq!(snap.submissions_total, 0);
1080        assert_eq!(snap.group_commits_total, 0);
1081        assert_eq!(snap.fsync1_total, 0);
1082        assert_eq!(snap.fsync2_total, 0);
1083        assert_eq!(snap.fcw_conflicts_total, 0);
1084    }
1085
1086    #[test]
1087    fn test_group_commit_metrics_display() {
1088        use crate::metrics::GroupCommitMetrics;
1089        let m = GroupCommitMetrics::new();
1090        m.record_submission();
1091        m.record_group_commit(1, 200);
1092        m.record_fsync1();
1093        m.record_fsync2();
1094        let s = m.snapshot().to_string();
1095        assert!(s.contains("group_commits=1"));
1096        assert!(s.contains("submissions=1"));
1097        assert!(s.contains("fsync1=1"));
1098        assert!(s.contains("fsync2=1"));
1099    }
1100
1101    #[test]
1102    fn test_group_commit_metrics_avg_zero() {
1103        use crate::metrics::GroupCommitMetrics;
1104        let m = GroupCommitMetrics::new();
1105        let snap = m.snapshot();
1106        assert_eq!(snap.avg_group_size(), 0);
1107        assert_eq!(snap.avg_commit_latency_us(), 0);
1108        assert_eq!(snap.fsync_reduction_ratio(), 0);
1109    }
1110
1111    /// Deterministic proof that group commit achieves >5x fsync reduction.
1112    ///
1113    /// Without group commit: N commits × 2 fsyncs = 2N fsyncs.
1114    /// With group commit: N commits batched → 1 fsync1 + 1 fsync2 = 2 fsyncs.
1115    /// Reduction ratio: 2N / 2 = N (for N >= 6, ratio > 5x).
1116    #[test]
1117    fn test_fsync_reduction_proof_deterministic() {
1118        let _metrics_guard = group_commit_metrics_test_guard();
1119        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
1120
1121        // Reset global metrics for this test.
1122        GLOBAL_GROUP_COMMIT_METRICS.reset();
1123
1124        let n = 10_u8; // 10 concurrent writers
1125        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);
1126
1127        let base_time = 1_700_000_000_000_000_000_u64;
1128
1129        // Phase 1: Submit all N writers (disjoint pages)
1130        for i in 0..n {
1131            let page = u32::from(i) + 1;
1132            let sub = make_submission(&[page], 0, i);
1133            coord.submit(sub, base_time + u64::from(i)).unwrap();
1134        }
1135
1136        // Phase 2: Single batch flush (1 fsync1 + 1 fsync2)
1137        coord.fsync1();
1138        coord.append_markers_and_fsync2();
1139        let results = coord.drain_committed();
1140        assert_eq!(results.len(), usize::from(n));
1141
1142        // Record the group commit metric
1143        GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(u64::from(n), 0);
1144
1145        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
1146
1147        // Verify: N submissions, but only 1 fsync1 + 1 fsync2 = 2 fsyncs total
1148        assert_eq!(snap.submissions_total, u64::from(n));
1149        assert_eq!(snap.fsync1_total, 1, "only 1 FSYNC_1 for entire batch");
1150        assert_eq!(snap.fsync2_total, 1, "only 1 FSYNC_2 for entire batch");
1151
1152        // Without batching: each commit needs its own fsync1 + fsync2 = 2N fsyncs
1153        let unbatched_fsyncs = u64::from(n) * 2;
1154        let batched_fsyncs = snap.fsync1_total + snap.fsync2_total;
1155        let reduction = unbatched_fsyncs / batched_fsyncs;
1156
1157        assert!(
1158            reduction >= 5,
1159            "group commit must achieve >=5x fsync reduction: \
1160             {n} commits, unbatched={unbatched_fsyncs} fsyncs, \
1161             batched={batched_fsyncs} fsyncs, reduction={reduction}x"
1162        );
1163
1164        // Verify the snapshot ratio method agrees
1165        assert!(
1166            snap.fsync_reduction_ratio() >= 5,
1167            "fsync_reduction_ratio must be >= 5: got {}",
1168            snap.fsync_reduction_ratio()
1169        );
1170    }
1171
1172    /// Verify metrics are emitted during submit_and_commit convenience path.
1173    #[test]
1174    fn test_submit_and_commit_records_metrics() {
1175        let _metrics_guard = group_commit_metrics_test_guard();
1176        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
1177
1178        GLOBAL_GROUP_COMMIT_METRICS.reset();
1179
1180        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
1181        let sub = make_submission(&[1], 0, 1);
1182        let result = coord.submit_and_commit(sub, 1_000_000);
1183        assert!(matches!(result, CommitResult::Committed { .. }));
1184
1185        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
1186        assert_eq!(snap.submissions_total, 1);
1187        assert_eq!(snap.group_commits_total, 1);
1188        assert_eq!(snap.group_commit_size_sum, 1);
1189        assert_eq!(snap.fsync1_total, 1);
1190        assert_eq!(snap.fsync2_total, 1);
1191    }
1192
1193    /// Verify FCW conflict increments the metric counter.
1194    #[test]
1195    fn test_fcw_conflict_metric() {
1196        let _metrics_guard = group_commit_metrics_test_guard();
1197        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
1198
1199        GLOBAL_GROUP_COMMIT_METRICS.reset();
1200
1201        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
1202
1203        // First commit succeeds
1204        let sub1 = make_submission(&[1], 0, 1);
1205        coord.submit_and_commit(sub1, 1_000_000);
1206
1207        // Second commit to same page with stale begin_seq fails
1208        let sub2 = make_submission(&[1], 0, 2);
1209        let result = coord.submit(sub2, 2_000_000);
1210        assert!(matches!(result, Err(CommitResult::ConflictFcw { .. })));
1211
1212        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
1213        assert_eq!(snap.fcw_conflicts_total, 1);
1214    }
1215
1216    /// Verify shutdown rejection increments the metric counter.
1217    #[test]
1218    fn test_shutdown_rejection_metric() {
1219        let _metrics_guard = group_commit_metrics_test_guard();
1220        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
1221
1222        GLOBAL_GROUP_COMMIT_METRICS.reset();
1223
1224        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
1225        coord.initiate_shutdown();
1226
1227        let sub = make_submission(&[1], 0, 1);
1228        let result = coord.submit(sub, 1_000_000);
1229        assert!(matches!(result, Err(CommitResult::ShuttingDown)));
1230
1231        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
1232        assert_eq!(snap.shutdown_rejections_total, 1);
1233    }
1234
1235    // ── bd-14m.2: Parallel WAL epoch and flush_batch ──
1236
1237    /// Verify flush_batch increments epoch and records metrics.
1238    #[test]
1239    fn test_flush_batch_epoch_tracking() {
1240        let _metrics_guard = group_commit_metrics_test_guard();
1241        use crate::metrics::GLOBAL_GROUP_COMMIT_METRICS;
1242
1243        GLOBAL_GROUP_COMMIT_METRICS.reset();
1244
1245        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 32);
1246        assert_eq!(coord.current_epoch(), 0);
1247
1248        // First batch: 3 commits
1249        let base_time = 1_700_000_000_000_000_000_u64;
1250        for i in 0..3u8 {
1251            let sub = make_submission(&[u32::from(i) + 1], 0, i);
1252            coord.submit(sub, base_time + u64::from(i)).unwrap();
1253        }
1254        let results = coord.flush_batch();
1255        assert_eq!(results.len(), 3);
1256        assert_eq!(coord.current_epoch(), 1);
1257
1258        // Second batch: 2 commits
1259        for i in 3..5u8 {
1260            let sub = make_submission(&[u32::from(i) + 10], 3, i);
1261            coord.submit(sub, base_time + 100 + u64::from(i)).unwrap();
1262        }
1263        let results = coord.flush_batch();
1264        assert_eq!(results.len(), 2);
1265        assert_eq!(coord.current_epoch(), 2);
1266
1267        let snap = GLOBAL_GROUP_COMMIT_METRICS.snapshot();
1268        assert_eq!(snap.group_commits_total, 2);
1269        assert_eq!(snap.group_commit_size_sum, 5); // 3 + 2
1270        assert_eq!(snap.submissions_total, 5);
1271        assert_eq!(snap.fsync1_total, 2);
1272        assert_eq!(snap.fsync2_total, 2);
1273    }
1274
1275    /// flush_batch on empty batch is a no-op.
1276    #[test]
1277    fn test_flush_batch_empty_noop() {
1278        let _metrics_guard = group_commit_metrics_test_guard();
1279        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
1280        let results = coord.flush_batch();
1281        assert!(results.is_empty());
1282        assert_eq!(coord.current_epoch(), 0); // epoch not incremented
1283    }
1284
1285    /// Verify submit_and_commit delegates through flush_batch path.
1286    #[test]
1287    fn test_submit_and_commit_uses_flush_batch_epoch() {
1288        let _metrics_guard = group_commit_metrics_test_guard();
1289        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 16);
1290        let sub = make_submission(&[1], 0, 1);
1291        let result = coord.submit_and_commit(sub, 1_000_000);
1292        assert!(matches!(result, CommitResult::Committed { .. }));
1293        assert_eq!(coord.current_epoch(), 1);
1294    }
1295
1296    #[test]
1297    fn test_group_commit_batch_accessors() {
1298        let batch = GroupCommitBatch::new(4);
1299        assert!(batch.is_empty());
1300        assert_eq!(batch.len(), 0);
1301        assert!(!batch.is_full());
1302    }
1303
1304    #[test]
1305    fn test_fsync_barriers_default_equals_new() {
1306        let a = FsyncBarriers::new();
1307        let b = FsyncBarriers::default();
1308        assert_eq!(a.fsync1_complete, b.fsync1_complete);
1309        assert_eq!(a.fsync2_complete, b.fsync2_complete);
1310        assert!(!a.all_complete());
1311    }
1312
1313    #[test]
1314    fn test_commit_index_record_and_conflict_check() {
1315        let mut idx = CommitIndex::new();
1316        let p1 = PageNumber::new(1).unwrap();
1317        let p2 = PageNumber::new(2).unwrap();
1318        let p3 = PageNumber::new(3).unwrap();
1319
1320        idx.record_commit(&[p1, p2], CommitSeq::new(5));
1321        idx.record_commit(&[p2, p3], CommitSeq::new(10));
1322
1323        let conflicts = idx.check_conflicts(&[p1, p2, p3], CommitSeq::new(7));
1324        assert!(conflicts.contains(&p2), "p2 modified at seq 10 > 7");
1325        assert!(conflicts.contains(&p3), "p3 modified at seq 10 > 7");
1326        assert!(!conflicts.contains(&p1), "p1 last modified at seq 5 <= 7");
1327
1328        assert!(
1329            idx.check_conflicts(&[p1, p2], CommitSeq::new(10))
1330                .is_empty()
1331        );
1332    }
1333
1334    #[test]
1335    fn commit_result_debug_clone_eq_all_variants() {
1336        let committed = CommitResult::Committed {
1337            commit_seq: CommitSeq::new(1),
1338            commit_time_unix_ns: 42,
1339        };
1340        let dbg = format!("{committed:?}");
1341        assert!(dbg.contains("Committed"));
1342        assert_eq!(committed.clone(), committed);
1343
1344        let fcw = CommitResult::ConflictFcw {
1345            conflicting_pages: vec![PageNumber::new(5).unwrap()],
1346        };
1347        assert_eq!(fcw.clone(), fcw);
1348        assert_ne!(fcw, committed);
1349
1350        let ssi = CommitResult::ConflictSsi;
1351        assert_eq!(ssi.clone(), ssi);
1352
1353        let shutdown = CommitResult::ShuttingDown;
1354        assert_eq!(shutdown.clone(), shutdown);
1355        assert_ne!(ssi, shutdown);
1356    }
1357
1358    #[test]
1359    fn commit_submission_debug_and_clone() {
1360        let sub = make_submission(&[1, 2], 5, 7);
1361        let dbg = format!("{sub:?}");
1362        assert!(dbg.contains("CommitSubmission"));
1363        let cloned = sub.clone();
1364        assert_eq!(cloned.write_set_pages.len(), 2);
1365        assert_eq!(cloned.begin_seq, CommitSeq::new(5));
1366        assert_eq!(cloned.capsule_digest, [7u8; 32]);
1367    }
1368
1369    #[test]
1370    fn fsync_barriers_debug_clone_copy() {
1371        let mut b = FsyncBarriers::new();
1372        b.fsync1_complete = true;
1373        let dbg = format!("{b:?}");
1374        assert!(dbg.contains("FsyncBarriers"));
1375        let copied = b;
1376        assert_eq!(copied, b);
1377        assert!(copied.fsync1_complete);
1378        assert!(!copied.fsync2_complete);
1379    }
1380
1381    #[test]
1382    fn group_commit_batch_is_full_at_max() {
1383        let _metrics_guard = group_commit_metrics_test_guard();
1384        let mut coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::ZERO, 3);
1385        let base = 1_000_000_u64;
1386        for i in 0..3u8 {
1387            let sub = make_submission(&[u32::from(i) + 1], 0, i);
1388            coord.submit(sub, base + u64::from(i)).unwrap();
1389        }
1390        assert!(coord.batch.is_full());
1391        assert_eq!(coord.batch.len(), 3);
1392    }
1393
1394    #[test]
1395    fn test_commit_index_default_equals_new() {
1396        let a = CommitIndex::new();
1397        let b = CommitIndex::default();
1398        assert!(
1399            a.check_conflicts(&[PageNumber::new(1).unwrap()], CommitSeq::ZERO)
1400                .is_empty()
1401        );
1402        assert!(
1403            b.check_conflicts(&[PageNumber::new(1).unwrap()], CommitSeq::ZERO)
1404                .is_empty()
1405        );
1406    }
1407
1408    #[test]
1409    fn commit_submission_debug_clone() {
1410        let sub = make_submission(&[1, 2, 3], 0, 0xAA);
1411        let cloned = sub.clone();
1412        assert_eq!(cloned.capsule_digest, sub.capsule_digest);
1413        assert_eq!(cloned.write_set_pages.len(), 3);
1414        assert_eq!(cloned.begin_seq, CommitSeq::new(0));
1415        let dbg = format!("{sub:?}");
1416        assert!(dbg.contains("CommitSubmission"));
1417    }
1418
1419    #[test]
1420    fn commit_result_all_variants_debug_clone_eq() {
1421        let committed = CommitResult::Committed {
1422            commit_seq: CommitSeq::new(5),
1423            commit_time_unix_ns: 1_000,
1424        };
1425        let fcw = CommitResult::ConflictFcw {
1426            conflicting_pages: vec![PageNumber::new(1).unwrap()],
1427        };
1428        let ssi = CommitResult::ConflictSsi;
1429        let shutdown = CommitResult::ShuttingDown;
1430        assert_eq!(committed.clone(), committed);
1431        assert_eq!(fcw.clone(), fcw);
1432        assert_eq!(ssi.clone(), ssi);
1433        assert_eq!(shutdown.clone(), shutdown);
1434        assert_ne!(committed, fcw);
1435        assert_ne!(ssi, shutdown);
1436        let dbg = format!("{committed:?}");
1437        assert!(dbg.contains("Committed"));
1438    }
1439
1440    #[test]
1441    fn write_coordinator_accessors_on_fresh_instance() {
1442        let coord = WriteCoordinator::new(OperatingMode::Native, CommitSeq::new(10), 8);
1443        assert_eq!(coord.mode(), OperatingMode::Native);
1444        assert_eq!(coord.commit_seq_tip(), CommitSeq::new(10));
1445        assert_eq!(coord.pending_count(), 0);
1446        assert_eq!(coord.current_epoch(), 0);
1447    }
1448
1449    #[test]
1450    fn group_commit_batch_empty_boundary() {
1451        let batch = GroupCommitBatch::new(4);
1452        assert_eq!(batch.len(), 0);
1453        assert!(batch.is_empty());
1454        assert!(!batch.is_full());
1455    }
1456}