// fsqlite_core/commit_repair.rs

1//! Commit durability and asynchronous repair orchestration (ยง1.6, bd-22n.11).
2//!
3//! The critical path only appends+syncs systematic symbols. Repair symbols are
4//! generated/append-synced asynchronously after commit acknowledgment.
5
6use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::future::Future;
8use std::panic::AssertUnwindSafe;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex, MutexGuard};
11use std::task::{Context, Poll, Wake, Waker};
12use std::thread;
13use std::time::{Duration, Instant};
14
15use asupersync::channel::mpsc;
16use asupersync::cx::Cx as NativeCx;
17use asupersync::runtime::{JoinHandle as AsyncJoinHandle, Runtime, spawn_blocking};
18use asupersync::sync::{OwnedSemaphorePermit, Semaphore};
19use fsqlite_error::{FrankenError, Result};
20use fsqlite_types::cx::Cx;
21use tracing::{debug, error, info, warn};
22
/// Bead identifier tagging tracing output emitted by this module.
const BEAD_ID: &str = "bd-22n.11";

/// Default bounded capacity for commit-channel backpressure.
pub const DEFAULT_COMMIT_CHANNEL_CAPACITY: usize = 16;
27
/// Request sent from writers to the write coordinator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitRequest {
    /// Transaction identifier assigned by the writer.
    pub txn_id: u64,
    /// Page numbers touched by this transaction's write set.
    pub write_set_pages: Vec<u32>,
    /// Serialized commit payload (opaque to the channel).
    pub payload: Vec<u8>,
}
35
36impl CommitRequest {
37    #[must_use]
38    pub fn new(txn_id: u64, write_set_pages: Vec<u32>, payload: Vec<u8>) -> Self {
39        Self {
40            txn_id,
41            write_set_pages,
42            payload,
43        }
44    }
45}
46
/// Capacity/config knobs for the two-phase commit pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitPipelineConfig {
    /// Bounded channel capacity (always at least 1 after construction helpers).
    pub channel_capacity: usize,
}
52
53impl Default for CommitPipelineConfig {
54    fn default() -> Self {
55        Self {
56            channel_capacity: DEFAULT_COMMIT_CHANNEL_CAPACITY,
57        }
58    }
59}
60
61impl CommitPipelineConfig {
62    /// Clamp PRAGMA capacity to a valid non-zero bounded channel size.
63    #[must_use]
64    pub fn from_pragma_capacity(raw_capacity: i64) -> Self {
65        let clamped_i64 = raw_capacity.clamp(1, i64::from(u16::MAX));
66        let clamped = usize::try_from(clamped_i64).expect("clamped to positive u16 range");
67        Self {
68            channel_capacity: clamped,
69        }
70    }
71}
72
/// A committed request staged for in-order delivery, together with the
/// logical-capacity permit that is released when the request is handed out.
#[derive(Debug)]
struct PendingCommit {
    request: CommitRequest,
    /// Held until `take_ready` delivers the request, so occupancy counts
    /// buffered-but-undelivered commits.
    logical_permit: OwnedSemaphorePermit,
}
78
/// Receiver-side bookkeeping that enforces FIFO delivery by reservation
/// sequence, skipping reservations that were aborted before sending.
#[derive(Debug)]
struct ReceiverOrderState {
    /// Next reservation sequence eligible for delivery (starts at 1).
    next_receive_seq: u64,
    /// Reservations released without a send; delivery skips over these.
    aborted_reservations: BTreeSet<u64>,
    /// Commits staged (possibly out of order), keyed by reservation sequence.
    pending_commits: BTreeMap<u64, PendingCommit>,
}
85
86impl ReceiverOrderState {
87    fn new() -> Self {
88        Self {
89            next_receive_seq: 1,
90            aborted_reservations: BTreeSet::new(),
91            pending_commits: BTreeMap::new(),
92        }
93    }
94
95    fn mark_aborted(&mut self, reservation_seq: u64) {
96        self.aborted_reservations.insert(reservation_seq);
97    }
98
99    fn queue_commit(
100        &mut self,
101        reservation_seq: u64,
102        request: CommitRequest,
103        logical_permit: OwnedSemaphorePermit,
104    ) {
105        let replaced = self.pending_commits.insert(
106            reservation_seq,
107            PendingCommit {
108                request,
109                logical_permit,
110            },
111        );
112        debug_assert!(
113            replaced.is_none(),
114            "duplicate reservation sequence enqueued: {reservation_seq}"
115        );
116    }
117
118    fn rollback_pending_commit(&mut self, reservation_seq: u64) {
119        let _ = self.pending_commits.remove(&reservation_seq);
120    }
121
122    fn take_ready(&mut self) -> Option<CommitRequest> {
123        loop {
124            if self.aborted_reservations.remove(&self.next_receive_seq) {
125                self.next_receive_seq = self.next_receive_seq.saturating_add(1);
126                continue;
127            }
128
129            let PendingCommit {
130                request,
131                logical_permit,
132            } = self.pending_commits.remove(&self.next_receive_seq)?;
133            self.next_receive_seq = self.next_receive_seq.saturating_add(1);
134            drop(logical_permit);
135            return Some(request);
136        }
137    }
138}
139
/// Wake-up notification pushed through the bounded signal channel when a
/// commit is staged for the receiver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitSignal {
    /// A commit with this reservation sequence was queued for delivery.
    CommitQueued { reservation_seq: u64 },
}
144
/// State shared by all sender/receiver handles of the two-phase channel.
#[derive(Debug)]
struct TwoPhaseQueueShared {
    /// Bounded logical capacity; one permit per reserved/buffered commit.
    logical_capacity: Arc<Semaphore>,
    /// Signal side used to wake the receiver when a commit is queued.
    signal_sender: mpsc::Sender<CommitSignal>,
    /// Receiver half of the signal channel (locked by the draining thread).
    signal_receiver: Mutex<mpsc::Receiver<CommitSignal>>,
    /// Monotonic reservation sequence counter (starts at 1).
    next_reservation_seq: AtomicU64,
    /// FIFO delivery bookkeeping shared with `SendPermit` operations.
    order_state: Mutex<ReceiverOrderState>,
}
153
154impl TwoPhaseQueueShared {
155    fn with_capacity(capacity: usize) -> Self {
156        let normalized_capacity = capacity.max(1);
157        let (signal_sender, signal_receiver) = mpsc::channel(normalized_capacity);
158        Self {
159            logical_capacity: Arc::new(Semaphore::new(normalized_capacity)),
160            signal_sender,
161            signal_receiver: Mutex::new(signal_receiver),
162            next_reservation_seq: AtomicU64::new(1),
163            order_state: Mutex::new(ReceiverOrderState::new()),
164        }
165    }
166
167    fn reserve_sequence(&self) -> u64 {
168        self.next_reservation_seq.fetch_add(1, Ordering::AcqRel)
169    }
170
171    fn occupancy(&self) -> usize {
172        self.capacity()
173            .saturating_sub(self.logical_capacity.available_permits())
174    }
175
176    fn capacity(&self) -> usize {
177        self.logical_capacity.max_permits()
178    }
179
180    fn queue_commit(
181        &self,
182        reservation_seq: u64,
183        request: CommitRequest,
184        logical_permit: OwnedSemaphorePermit,
185    ) {
186        lock_with_recovery(&self.order_state, "two_phase_order_state").queue_commit(
187            reservation_seq,
188            request,
189            logical_permit,
190        );
191    }
192
193    fn rollback_pending_commit(&self, reservation_seq: u64) {
194        lock_with_recovery(&self.order_state, "two_phase_order_state")
195            .rollback_pending_commit(reservation_seq);
196    }
197
198    fn mark_aborted(&self, reservation_seq: u64) {
199        lock_with_recovery(&self.order_state, "two_phase_order_state")
200            .mark_aborted(reservation_seq);
201        self.signal_sender.wake_receiver();
202    }
203
204    fn take_ready_request(&self) -> Option<CommitRequest> {
205        lock_with_recovery(&self.order_state, "two_phase_order_state").take_ready()
206    }
207
208    fn drain_signals(&self) -> SignalDrain {
209        let mut receiver = lock_with_recovery(&self.signal_receiver, "two_phase_signal_receiver");
210        let mut drained_any = false;
211        loop {
212            match receiver.try_recv() {
213                Ok(CommitSignal::CommitQueued { .. }) => {
214                    drained_any = true;
215                }
216                Err(mpsc::RecvError::Empty | mpsc::RecvError::Cancelled) => {
217                    return if drained_any {
218                        SignalDrain::Drained
219                    } else {
220                        SignalDrain::Empty
221                    };
222                }
223                Err(mpsc::RecvError::Disconnected) => return SignalDrain::Disconnected,
224            }
225        }
226    }
227}
228
/// Outcome of draining the signal channel without blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalDrain {
    /// No signals were pending.
    Empty,
    /// At least one signal was consumed.
    Drained,
    /// All senders are gone; no further signals will arrive.
    Disconnected,
}
235
/// Outcome of a bounded wait for signal-channel activity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalWaitOutcome {
    /// Activity was observed (or the thread was unparked); caller re-drains.
    Woken,
    /// The timeout elapsed with no activity.
    TimedOut,
    /// The channel is closed; no more signals can arrive.
    Disconnected,
}
242
/// `Waker` adapter that unparks a captured thread, letting synchronous code
/// block on futures via `thread::park`/`park_timeout`.
#[derive(Debug)]
struct ThreadParkWaker {
    /// Thread to unpark when the future signals readiness.
    thread: thread::Thread,
}
247
248impl Wake for ThreadParkWaker {
249    fn wake(self: Arc<Self>) {
250        self.thread.unpark();
251    }
252
253    fn wake_by_ref(self: &Arc<Self>) {
254        self.thread.unpark();
255    }
256}
257
258fn current_thread_waker() -> Waker {
259    Waker::from(Arc::new(ThreadParkWaker {
260        thread: thread::current(),
261    }))
262}
263
/// Drive `future` to completion on the current thread, requesting
/// cancellation through `native_cx` once `timeout` elapses.
///
/// After the cancel request the future is expected to resolve promptly
/// (typically with a cancellation error); until it does, this busy-yields.
/// Returns whatever the future ultimately resolves to.
fn block_on_future_with_timeout<F>(future: F, native_cx: &NativeCx, timeout: Duration) -> F::Output
where
    F: Future,
{
    // Park-based waker: `wake` unparks this thread so `park_timeout` returns.
    let waker = current_thread_waker();
    let mut task_cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let started_at = Instant::now();
    let mut cancelled_for_timeout = false;

    loop {
        match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => {}
        }

        // Deadline passed: ask the future to cancel, then re-poll
        // immediately instead of parking again.
        if !cancelled_for_timeout && started_at.elapsed() >= timeout {
            native_cx.set_cancel_requested(true);
            cancelled_for_timeout = true;
            continue;
        }

        if cancelled_for_timeout {
            // Cancellation requested: spin-yield until the future notices.
            thread::yield_now();
        } else {
            // Sleep until woken or until the remaining budget elapses.
            thread::park_timeout(timeout.saturating_sub(started_at.elapsed()));
        }
    }
}
293
294fn try_reserve_signal_with_timeout(
295    sender: &mpsc::Sender<CommitSignal>,
296    timeout: Duration,
297) -> Option<mpsc::SendPermit<'_, CommitSignal>> {
298    let started_at = Instant::now();
299    loop {
300        match sender.try_reserve() {
301            Ok(permit) => return Some(permit),
302            Err(mpsc::SendError::Disconnected(())) => return None,
303            Err(mpsc::SendError::Full(()) | mpsc::SendError::Cancelled(())) => {
304                if started_at.elapsed() >= timeout {
305                    return None;
306                }
307                thread::park_timeout(Duration::from_millis(1));
308            }
309        }
310    }
311}
312
/// Block the calling thread until signal-channel activity, disconnection, or
/// `timeout`, by manually polling `recv` with a park-based waker.
///
/// NOTE(review): the signal-receiver mutex is held for the entire wait, so
/// this assumes a single receiving thread — confirm against callers.
fn wait_for_signal_activity(shared: &TwoPhaseQueueShared, timeout: Duration) -> SignalWaitOutcome {
    let native_cx = NativeCx::for_testing();
    let waker = current_thread_waker();
    let mut task_cx = Context::from_waker(&waker);
    let started_at = Instant::now();
    // `recv` borrows the locked receiver; the guard must outlive the future.
    let mut receiver = lock_with_recovery(&shared.signal_receiver, "two_phase_signal_receiver");
    let mut future = Box::pin(receiver.recv(&native_cx));

    // Fast path: a signal (or disconnect) may already be available.
    match future.as_mut().poll(&mut task_cx) {
        Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => return SignalWaitOutcome::Woken,
        Poll::Ready(Err(mpsc::RecvError::Disconnected)) => return SignalWaitOutcome::Disconnected,
        Poll::Ready(Err(mpsc::RecvError::Cancelled)) => return SignalWaitOutcome::TimedOut,
        Poll::Ready(Err(mpsc::RecvError::Empty)) => unreachable!("recv() does not return Empty"),
        Poll::Pending => {}
    }

    // No budget left: request cancellation and take the future's final answer.
    let remaining = timeout.saturating_sub(started_at.elapsed());
    if remaining.is_zero() {
        native_cx.set_cancel_requested(true);
        return match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => SignalWaitOutcome::Woken,
            Poll::Ready(Err(mpsc::RecvError::Disconnected)) => SignalWaitOutcome::Disconnected,
            Poll::Ready(Err(mpsc::RecvError::Cancelled)) | Poll::Pending => {
                SignalWaitOutcome::TimedOut
            }
            Poll::Ready(Err(mpsc::RecvError::Empty)) => {
                unreachable!("recv() does not return Empty")
            }
        };
    }

    // Park until the waker fires or the remaining budget elapses.
    thread::park_timeout(remaining);
    if started_at.elapsed() >= timeout {
        // Deadline reached while parked: cancel and report the final state.
        native_cx.set_cancel_requested(true);
        return match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => SignalWaitOutcome::Woken,
            Poll::Ready(Err(mpsc::RecvError::Disconnected)) => SignalWaitOutcome::Disconnected,
            Poll::Ready(Err(mpsc::RecvError::Cancelled)) | Poll::Pending => {
                SignalWaitOutcome::TimedOut
            }
            Poll::Ready(Err(mpsc::RecvError::Empty)) => {
                unreachable!("recv() does not return Empty")
            }
        };
    }

    // Unparked before the deadline: report activity; the message (if any)
    // stays in the channel for the caller's next drain.
    SignalWaitOutcome::Woken
}
361
/// Sender side of the two-phase bounded MPSC commit channel.
#[derive(Debug, Clone)]
pub struct TwoPhaseCommitSender {
    /// Queue state shared with all other sender clones and the receiver.
    shared: Arc<TwoPhaseQueueShared>,
}
367
368impl TwoPhaseCommitSender {
369    /// Reserve a slot (phase 1). Blocks when channel is saturated.
370    pub fn reserve(&self) -> SendPermit<'_> {
371        loop {
372            if let Some(permit) = self.try_reserve_for(Duration::from_secs(3600)) {
373                return permit;
374            }
375        }
376    }
377
378    /// Reserve with timeout; `None` means caller gave up (cancel during reserve).
379    #[must_use]
380    pub fn try_reserve_for(&self, timeout: Duration) -> Option<SendPermit<'_>> {
381        let started_at = Instant::now();
382
383        let logical_cx = NativeCx::for_testing();
384        let logical_permit = match block_on_future_with_timeout(
385            OwnedSemaphorePermit::acquire(
386                Arc::clone(&self.shared.logical_capacity),
387                &logical_cx,
388                1,
389            ),
390            &logical_cx,
391            timeout,
392        ) {
393            Ok(permit) => permit,
394            Err(_) => return None,
395        };
396
397        let remaining = timeout.saturating_sub(started_at.elapsed());
398        let signal_permit = try_reserve_signal_with_timeout(&self.shared.signal_sender, remaining)?;
399
400        Some(SendPermit {
401            shared: Arc::clone(&self.shared),
402            signal_permit: Some(signal_permit),
403            logical_permit: Some(logical_permit),
404            reservation_seq: Some(self.shared.reserve_sequence()),
405        })
406    }
407
408    /// Current buffered + reserved occupancy.
409    #[must_use]
410    pub fn occupancy(&self) -> usize {
411        self.shared.occupancy()
412    }
413
414    /// Bounded channel capacity.
415    #[must_use]
416    pub fn capacity(&self) -> usize {
417        self.shared.capacity()
418    }
419}
420
/// Receiver side of the two-phase bounded MPSC commit channel.
#[derive(Debug, Clone)]
pub struct TwoPhaseCommitReceiver {
    /// Queue state shared with the sender handles.
    shared: Arc<TwoPhaseQueueShared>,
}
426
427impl TwoPhaseCommitReceiver {
428    /// Receive the next coordinator request (FIFO by reservation order).
429    pub fn recv(&self) -> CommitRequest {
430        loop {
431            if let Some(request) = self.try_recv_for(Duration::from_secs(3600)) {
432                return request;
433            }
434        }
435    }
436
437    /// Timed receive used by tests and bounded coordinator loops.
438    #[must_use]
439    pub fn try_recv_for(&self, timeout: Duration) -> Option<CommitRequest> {
440        let started_at = Instant::now();
441        loop {
442            match self.shared.drain_signals() {
443                SignalDrain::Drained => {
444                    if let Some(request) = self.shared.take_ready_request() {
445                        return Some(request);
446                    }
447                    continue;
448                }
449                SignalDrain::Disconnected => return self.shared.take_ready_request(),
450                SignalDrain::Empty => {}
451            }
452
453            if let Some(request) = self.shared.take_ready_request() {
454                return Some(request);
455            }
456
457            let remaining = timeout.saturating_sub(started_at.elapsed());
458            if remaining.is_zero() {
459                return self.shared.take_ready_request();
460            }
461
462            match wait_for_signal_activity(&self.shared, remaining) {
463                SignalWaitOutcome::Woken => {}
464                SignalWaitOutcome::Disconnected | SignalWaitOutcome::TimedOut => {
465                    return self.shared.take_ready_request();
466                }
467            }
468        }
469    }
470}
471
/// Two-phase permit returned by `reserve()`.
///
/// Dropping without `send()`/`abort()` automatically releases the reserved slot.
#[derive(Debug)]
pub struct SendPermit<'a> {
    /// Shared queue state used to stage or abort this reservation.
    shared: Arc<TwoPhaseQueueShared>,
    /// Pre-reserved slot in the signal channel (`None` once consumed).
    signal_permit: Option<mpsc::SendPermit<'a, CommitSignal>>,
    /// Logical-capacity permit held until send/abort (`None` once consumed).
    logical_permit: Option<OwnedSemaphorePermit>,
    /// Reservation sequence; `None` after the permit is consumed.
    reservation_seq: Option<u64>,
}
482
impl SendPermit<'_> {
    /// Stable reservation sequence used to verify FIFO behavior in tests.
    ///
    /// Returns 0 once the permit has been consumed by `send`/`abort`.
    #[must_use]
    pub fn reservation_seq(&self) -> u64 {
        self.reservation_seq.unwrap_or(0)
    }

    /// Phase 2 commit. Synchronous and infallible for slot ownership.
    pub fn send(mut self, request: CommitRequest) {
        // `take()` marks the permit consumed so Drop does not double-abort.
        let Some(reservation_seq) = self.reservation_seq.take() else {
            return;
        };
        let Some(logical_permit) = self.logical_permit.take() else {
            return;
        };

        // Stage the request (and its capacity permit) for in-order delivery.
        self.shared
            .queue_commit(reservation_seq, request, logical_permit);

        if let Some(signal_permit) = self.signal_permit.take() {
            if signal_permit
                .try_send(CommitSignal::CommitQueued { reservation_seq })
                .is_err()
            {
                // Signal slot lost: undo the staged commit and record the
                // reservation as aborted so the receiver's FIFO cursor can
                // advance past the gap.
                self.shared.rollback_pending_commit(reservation_seq);
                self.shared.mark_aborted(reservation_seq);
            }
        }
    }

    /// Explicitly release reserved slot without sending.
    pub fn abort(mut self) {
        self.abort_current_reservation();
    }

    /// Shared teardown for `abort()` and `Drop`: release the signal slot and
    /// capacity permit, then mark the sequence skippable for the receiver.
    fn abort_current_reservation(&mut self) {
        let Some(reservation_seq) = self.reservation_seq.take() else {
            return;
        };
        if let Some(signal_permit) = self.signal_permit.take() {
            signal_permit.abort();
        }
        // Dropping the permit returns the logical-capacity slot.
        let _ = self.logical_permit.take();
        self.shared.mark_aborted(reservation_seq);
    }
}
529
impl Drop for SendPermit<'_> {
    fn drop(&mut self) {
        // No-op when `send`/`abort` already consumed the reservation.
        self.abort_current_reservation();
    }
}
535
/// Tracked sender variant that counts leaked permits (dropped without send/abort).
#[derive(Debug, Clone)]
pub struct TrackedSender {
    /// Underlying two-phase sender performing the actual reservations.
    sender: TwoPhaseCommitSender,
    /// Count of permits dropped without an explicit `send`/`abort`.
    leaked_permits: Arc<AtomicU64>,
}
542
543impl TrackedSender {
544    #[must_use]
545    pub fn new(sender: TwoPhaseCommitSender) -> Self {
546        Self {
547            sender,
548            leaked_permits: Arc::new(AtomicU64::new(0)),
549        }
550    }
551
552    pub fn reserve(&self) -> TrackedSendPermit<'_> {
553        TrackedSendPermit {
554            leaked_permits: Arc::clone(&self.leaked_permits),
555            permit: Some(self.sender.reserve()),
556        }
557    }
558
559    #[must_use]
560    pub fn leaked_permit_count(&self) -> u64 {
561        self.leaked_permits.load(Ordering::Acquire)
562    }
563}
564
/// Tracked permit wrapper for safety-critical channels.
#[derive(Debug)]
pub struct TrackedSendPermit<'a> {
    /// Shared leak counter bumped if the permit is dropped unconsumed.
    leaked_permits: Arc<AtomicU64>,
    /// Inner permit; `None` once `send`/`abort` has discharged the obligation.
    permit: Option<SendPermit<'a>>,
}
571
572impl TrackedSendPermit<'_> {
573    /// Commit and clear obligation.
574    pub fn send(mut self, request: CommitRequest) {
575        if let Some(permit) = self.permit.take() {
576            permit.send(request);
577        }
578    }
579
580    /// Abort and clear obligation.
581    pub fn abort(mut self) {
582        if let Some(permit) = self.permit.take() {
583            permit.abort();
584        }
585    }
586}
587
588impl Drop for TrackedSendPermit<'_> {
589    fn drop(&mut self) {
590        if self.permit.is_some() {
591            self.leaked_permits.fetch_add(1, Ordering::AcqRel);
592        }
593    }
594}
595
596/// Build a bounded two-phase commit channel.
597#[must_use]
598pub fn two_phase_commit_channel(capacity: usize) -> (TwoPhaseCommitSender, TwoPhaseCommitReceiver) {
599    let shared = Arc::new(TwoPhaseQueueShared::with_capacity(capacity));
600    (
601        TwoPhaseCommitSender {
602            shared: Arc::clone(&shared),
603        },
604        TwoPhaseCommitReceiver { shared },
605    )
606}
607
/// Little's-law-based capacity estimate.
///
/// Computes `lambda * T_commit * burst * jitter` with both multipliers
/// floored at 1.0, rounds up, and never returns fewer than one slot.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
pub fn little_law_capacity(
    lambda_per_second: f64,
    t_commit: Duration,
    burst_multiplier: f64,
    jitter_multiplier: f64,
) -> usize {
    let burst = burst_multiplier.max(1.0);
    let jitter = jitter_multiplier.max(1.0);
    let mean_in_flight = lambda_per_second * t_commit.as_secs_f64();
    (mean_in_flight * burst * jitter).ceil().max(1.0) as usize
}
627
/// Classical optimal group-commit batch size: `sqrt(t_fsync / t_validate)`.
///
/// The rounded result is clamped to `[1, capacity]` (capacity floored at 1);
/// a zero validate time is floored at `f64::EPSILON` to avoid dividing by 0.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
pub fn optimal_batch_size(t_fsync: Duration, t_validate: Duration, capacity: usize) -> usize {
    let validate_secs = t_validate.as_secs_f64().max(f64::EPSILON);
    let ratio = t_fsync.as_secs_f64() / validate_secs;
    ratio.sqrt().round().clamp(1.0, capacity.max(1) as f64) as usize
}
640
/// Conformal batch-size controller using upper quantiles.
///
/// Takes the 0.9 quantile of each sample set and clamps
/// `sqrt(q_fsync / q_validate)` to `[1, capacity]`; empty inputs fall back
/// to a batch of one.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
pub fn conformal_batch_size(
    fsync_samples: &[Duration],
    validate_samples: &[Duration],
    capacity: usize,
) -> usize {
    if fsync_samples.is_empty() || validate_samples.is_empty() {
        return 1;
    }
    let fsync_q90 = quantile_seconds(fsync_samples, 0.9);
    let validate_q90 = quantile_seconds(validate_samples, 0.9).max(f64::EPSILON);
    let ideal = (fsync_q90 / validate_q90).sqrt().round();
    ideal.clamp(1.0, capacity.max(1) as f64) as usize
}

/// Nearest-rank (rounded-index) quantile of `samples`, in seconds.
/// Callers must pass a non-empty slice.
fn quantile_seconds(samples: &[Duration], quantile: f64) -> f64 {
    let mut seconds: Vec<f64> = samples.iter().map(Duration::as_secs_f64).collect();
    seconds.sort_by(f64::total_cmp);
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let position = ((seconds.len() as f64 - 1.0) * quantile.clamp(0.0, 1.0)).round() as usize;
    seconds[position]
}
673
/// Commit/repair lifecycle events used for timing and invariant validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CommitRepairEventKind {
    /// Systematic symbols appended and synced (critical path done).
    CommitDurable,
    /// Durable, but async repair symbols are not yet persisted.
    DurableButNotRepairable,
    /// Commit acknowledged to the caller.
    CommitAcked,
    /// Async repair work began.
    RepairStarted,
    /// Async repair work finished successfully.
    RepairCompleted,
    /// Async repair work failed (generation or persistence).
    RepairFailed,
}
684
/// Timestamped lifecycle event for one commit sequence.
#[derive(Debug, Clone, Copy)]
pub struct CommitRepairEvent {
    /// Commit sequence this event belongs to.
    pub commit_seq: u64,
    /// Monotonic per-commit event sequence number (logical time, no ambient authority).
    pub seq: u64,
    /// Monotonic wall-clock capture for latency/window measurements.
    pub recorded_at: Instant,
    /// Which lifecycle transition this event records.
    pub kind: CommitRepairEventKind,
}
695
/// Repair state for a commit sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairState {
    /// No repair task exists for this commit (e.g. repair disabled).
    NotScheduled,
    /// Repair work was scheduled and has not finished.
    Pending,
    /// Repair work finished successfully.
    Completed,
    /// Repair work failed.
    Failed,
}
704
/// Commit result produced by the critical path.
#[derive(Debug, Clone, Copy)]
pub struct CommitReceipt {
    /// Sequence assigned to this commit.
    pub commit_seq: u64,
    /// True once systematic symbols were appended and synced.
    pub durable: bool,
    /// True when async repair work was scheduled for this commit.
    pub repair_pending: bool,
    /// Critical-path latency from commit start to acknowledgment.
    pub latency: Duration,
}
713
/// Runtime behavior toggle for async repair generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitRepairConfig {
    /// When `false`, commits ack immediately and no repair task is scheduled.
    pub repair_enabled: bool,
}
719
720impl Default for CommitRepairConfig {
721    fn default() -> Self {
722        Self {
723            repair_enabled: true,
724        }
725    }
726}
727
/// Storage sink for systematic/repair symbol append+sync operations.
pub trait CommitRepairIo: Send + Sync {
    /// Append the systematic symbols for `commit_seq` (critical path).
    fn append_systematic_symbols(&self, commit_seq: u64, systematic_symbols: &[u8]) -> Result<()>;
    /// Durably sync previously appended systematic symbols (critical path).
    fn sync_systematic_symbols(&self, commit_seq: u64) -> Result<()>;
    /// Append repair symbols for `commit_seq` (async repair path).
    fn append_repair_symbols(&self, commit_seq: u64, repair_symbols: &[u8]) -> Result<()>;
    /// Durably sync previously appended repair symbols (async repair path).
    fn sync_repair_symbols(&self, commit_seq: u64) -> Result<()>;
}
735
/// Generator for repair symbols from committed systematic symbols.
pub trait RepairSymbolGenerator: Send + Sync {
    /// Produce repair symbols for `commit_seq` from its systematic symbols.
    fn generate_repair_symbols(
        &self,
        commit_seq: u64,
        systematic_symbols: &[u8],
    ) -> Result<Vec<u8>>;
}
744
/// In-memory IO sink useful for deterministic testing/instrumentation.
#[derive(Debug, Default)]
pub struct InMemoryCommitRepairIo {
    /// Systematic symbols keyed by commit sequence.
    systematic_by_commit: Mutex<HashMap<u64, Vec<u8>>>,
    /// Repair symbols keyed by commit sequence.
    repair_by_commit: Mutex<HashMap<u64, Vec<u8>>>,
    /// Running total of systematic bytes appended.
    total_systematic_bytes: AtomicU64,
    /// Running total of repair bytes appended.
    total_repair_bytes: AtomicU64,
    /// Count of systematic sync calls.
    systematic_syncs: AtomicU64,
    /// Count of repair sync calls.
    repair_syncs: AtomicU64,
}
755
756impl InMemoryCommitRepairIo {
757    #[must_use]
758    pub fn total_repair_bytes(&self) -> u64 {
759        self.total_repair_bytes.load(Ordering::Acquire)
760    }
761
762    #[must_use]
763    pub fn repair_sync_count(&self) -> u64 {
764        self.repair_syncs.load(Ordering::Acquire)
765    }
766
767    #[must_use]
768    pub fn systematic_sync_count(&self) -> u64 {
769        self.systematic_syncs.load(Ordering::Acquire)
770    }
771
772    #[must_use]
773    pub fn repair_symbols_for(&self, commit_seq: u64) -> Option<Vec<u8>> {
774        lock_with_recovery(&self.repair_by_commit, "repair_by_commit")
775            .get(&commit_seq)
776            .cloned()
777    }
778}
779
780impl CommitRepairIo for InMemoryCommitRepairIo {
781    fn append_systematic_symbols(&self, commit_seq: u64, systematic_symbols: &[u8]) -> Result<()> {
782        lock_with_recovery(&self.systematic_by_commit, "systematic_by_commit")
783            .insert(commit_seq, systematic_symbols.to_vec());
784        self.total_systematic_bytes.fetch_add(
785            u64::try_from(systematic_symbols.len()).map_err(|_| FrankenError::OutOfRange {
786                what: "systematic_symbol_len".to_owned(),
787                value: systematic_symbols.len().to_string(),
788            })?,
789            Ordering::Release,
790        );
791        Ok(())
792    }
793
794    fn sync_systematic_symbols(&self, _commit_seq: u64) -> Result<()> {
795        self.systematic_syncs.fetch_add(1, Ordering::Release);
796        Ok(())
797    }
798
799    fn append_repair_symbols(&self, commit_seq: u64, repair_symbols: &[u8]) -> Result<()> {
800        lock_with_recovery(&self.repair_by_commit, "repair_by_commit")
801            .insert(commit_seq, repair_symbols.to_vec());
802        self.total_repair_bytes.fetch_add(
803            u64::try_from(repair_symbols.len()).map_err(|_| FrankenError::OutOfRange {
804                what: "repair_symbol_len".to_owned(),
805                value: repair_symbols.len().to_string(),
806            })?,
807            Ordering::Release,
808        );
809        Ok(())
810    }
811
812    fn sync_repair_symbols(&self, _commit_seq: u64) -> Result<()> {
813        self.repair_syncs.fetch_add(1, Ordering::Release);
814        Ok(())
815    }
816}
817
/// Deterministic repair generator with configurable delay/failure injection.
#[derive(Debug)]
pub struct DeterministicRepairGenerator {
    /// Artificial latency injected before each generation call.
    delay: Duration,
    /// Number of repair bytes produced per call (at least 1).
    output_len: usize,
    /// When set, generation returns an `Internal` error.
    fail_repair: Arc<AtomicBool>,
}
825
826impl DeterministicRepairGenerator {
827    #[must_use]
828    pub fn new(delay: Duration, output_len: usize) -> Self {
829        Self {
830            delay,
831            output_len: output_len.max(1),
832            fail_repair: Arc::new(AtomicBool::new(false)),
833        }
834    }
835
836    pub fn set_fail_repair(&self, fail: bool) {
837        self.fail_repair.store(fail, Ordering::Release);
838    }
839}
840
841impl RepairSymbolGenerator for DeterministicRepairGenerator {
842    fn generate_repair_symbols(
843        &self,
844        commit_seq: u64,
845        systematic_symbols: &[u8],
846    ) -> Result<Vec<u8>> {
847        if self.delay != Duration::ZERO {
848            thread::sleep(self.delay);
849        }
850        if self.fail_repair.load(Ordering::Acquire) {
851            return Err(FrankenError::Internal(format!(
852                "repair generation failed for commit_seq={commit_seq}"
853            )));
854        }
855
856        let source = if systematic_symbols.is_empty() {
857            &[0_u8][..]
858        } else {
859            systematic_symbols
860        };
861        let mut state = commit_seq
862            ^ u64::try_from(source.len()).map_err(|_| FrankenError::OutOfRange {
863                what: "systematic_symbol_len".to_owned(),
864                value: source.len().to_string(),
865            })?;
866        let mut out = Vec::with_capacity(self.output_len);
867        for idx in 0..self.output_len {
868            let src = source[idx % source.len()];
869            let idx_mod = u64::try_from(idx % 251).map_err(|_| FrankenError::OutOfRange {
870                what: "repair_symbol_index".to_owned(),
871                value: idx.to_string(),
872            })?;
873            state = state.rotate_left(7) ^ u64::from(src) ^ idx_mod;
874            out.push((state & 0xFF) as u8);
875        }
876        Ok(out)
877    }
878}
879
/// Two-phase commit durability coordinator.
///
/// The critical path appends + syncs systematic symbols and then
/// acknowledges; repair-symbol generation is scheduled asynchronously on
/// `runtime` afterwards.
pub struct CommitRepairCoordinator<
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
> {
    /// Behavior toggles (e.g. whether repair tasks are scheduled at all).
    config: CommitRepairConfig,
    /// Runtime used to spawn async repair tasks.
    runtime: Runtime,
    /// Child context that scopes all spawned repair work.
    coordinator_cx: Cx,
    /// Storage sink shared with async repair tasks.
    io: Arc<IO>,
    /// Repair-symbol generator shared with async repair tasks.
    generator: Arc<GEN>,
    /// Next commit sequence to assign (starts at 1).
    next_commit_seq: AtomicU64,
    /// Next async task id to assign (starts at 1).
    next_async_task_id: AtomicU64,
    /// Per-commit repair state, shared with repair tasks.
    repair_states: Arc<Mutex<HashMap<u64, RepairState>>>,
    /// Recorded lifecycle events, shared with repair tasks.
    events: Arc<Mutex<Vec<CommitRepairEvent>>>,
    /// Join handles for spawned repair tasks.
    handles: Mutex<Vec<AsyncJoinHandle<()>>>,
}
896
897impl<IO, GEN> CommitRepairCoordinator<IO, GEN>
898where
899    IO: CommitRepairIo + Send + Sync + 'static,
900    GEN: RepairSymbolGenerator + Send + Sync + 'static,
901{
902    #[must_use]
903    pub fn new(
904        config: CommitRepairConfig,
905        runtime: Runtime,
906        parent_cx: &Cx,
907        io: IO,
908        generator: GEN,
909    ) -> Self {
910        Self::with_shared(
911            config,
912            runtime,
913            parent_cx,
914            Arc::new(io),
915            Arc::new(generator),
916        )
917    }
918
    /// Construct a coordinator around pre-shared IO and generator handles.
    ///
    /// Sequence counters start at 1; the coordinator context is a child of
    /// `parent_cx`, so spawned repair work is scoped beneath the caller.
    #[must_use]
    pub fn with_shared(
        config: CommitRepairConfig,
        runtime: Runtime,
        parent_cx: &Cx,
        io: Arc<IO>,
        generator: Arc<GEN>,
    ) -> Self {
        Self {
            config,
            runtime,
            coordinator_cx: parent_cx.create_child(),
            io,
            generator,
            next_commit_seq: AtomicU64::new(1),
            next_async_task_id: AtomicU64::new(1),
            repair_states: Arc::new(Mutex::new(HashMap::new())),
            events: Arc::new(Mutex::new(Vec::new())),
            handles: Mutex::new(Vec::new()),
        }
    }
940
941    /// Execute critical-path durability and schedule async repair work.
942    pub fn commit(&self, systematic_symbols: &[u8]) -> Result<CommitReceipt> {
943        let started_at = Instant::now();
944        let commit_seq = self.next_commit_seq.fetch_add(1, Ordering::Relaxed);
945
946        self.io
947            .append_systematic_symbols(commit_seq, systematic_symbols)?;
948        self.io.sync_systematic_symbols(commit_seq)?;
949        self.record(commit_seq, CommitRepairEventKind::CommitDurable);
950
951        if !self.config.repair_enabled {
952            self.record(commit_seq, CommitRepairEventKind::CommitAcked);
953            return Ok(CommitReceipt {
954                commit_seq,
955                durable: true,
956                repair_pending: false,
957                latency: started_at.elapsed(),
958            });
959        }
960
961        lock_with_recovery(&self.repair_states, "repair_states")
962            .insert(commit_seq, RepairState::Pending);
963        self.record(commit_seq, CommitRepairEventKind::DurableButNotRepairable);
964        debug!(
965            bead_id = BEAD_ID,
966            commit_seq, "commit is durable but not repairable while async repair is pending"
967        );
968        self.record(commit_seq, CommitRepairEventKind::CommitAcked);
969
970        let async_task_id = self.next_async_task_id.fetch_add(1, Ordering::Relaxed);
971        let io = Arc::clone(&self.io);
972        let generator = Arc::clone(&self.generator);
973        let repair_states = Arc::clone(&self.repair_states);
974        let events = Arc::clone(&self.events);
975        let systematic_snapshot = systematic_symbols.to_vec();
976        let worker_cx = self.coordinator_cx.create_child();
977        let handle = self.runtime.handle().try_spawn(run_repair_task(RepairTask {
978            commit_seq,
979            async_task_id,
980            io,
981            generator,
982            repair_states,
983            events,
984            systematic_snapshot,
985            worker_cx,
986        }));
987        match handle {
988            Ok(handle) => {
989                lock_with_recovery(&self.handles, "repair_handles").push(handle);
990            }
991            Err(err) => {
992                set_repair_state(&self.repair_states, commit_seq, RepairState::Failed);
993                self.record(commit_seq, CommitRepairEventKind::RepairFailed);
994                error!(
995                    bead_id = BEAD_ID,
996                    commit_seq,
997                    async_task_id,
998                    error = ?err,
999                    "failed to schedule repair task on caller-owned runtime"
1000                );
1001                return Ok(CommitReceipt {
1002                    commit_seq,
1003                    durable: true,
1004                    repair_pending: false,
1005                    latency: started_at.elapsed(),
1006                });
1007            }
1008        }
1009
1010        Ok(CommitReceipt {
1011            commit_seq,
1012            durable: true,
1013            repair_pending: true,
1014            latency: started_at.elapsed(),
1015        })
1016    }
1017
1018    /// Join all currently scheduled background repair workers.
1019    pub fn wait_for_background_repair(&self) -> Result<()> {
1020        let handles = {
1021            let mut guard = lock_with_recovery(&self.handles, "repair_handles");
1022            std::mem::take(&mut *guard)
1023        };
1024        let mut observed_panic = false;
1025        for handle in handles {
1026            let joined =
1027                std::panic::catch_unwind(AssertUnwindSafe(|| self.runtime.block_on(handle)));
1028            if joined.is_err() {
1029                observed_panic = true;
1030            }
1031        }
1032        if observed_panic {
1033            return Err(FrankenError::Internal(
1034                "background repair worker panicked".to_owned(),
1035            ));
1036        }
1037        Ok(())
1038    }
1039
1040    #[must_use]
1041    pub fn pending_background_repair_count(&self) -> usize {
1042        lock_with_recovery(&self.repair_states, "repair_states")
1043            .values()
1044            .filter(|state| matches!(state, RepairState::Pending))
1045            .count()
1046    }
1047
1048    #[must_use]
1049    pub fn repair_state_for(&self, commit_seq: u64) -> RepairState {
1050        lock_with_recovery(&self.repair_states, "repair_states")
1051            .get(&commit_seq)
1052            .copied()
1053            .unwrap_or(RepairState::NotScheduled)
1054    }
1055
1056    #[must_use]
1057    pub fn events_for_commit(&self, commit_seq: u64) -> Vec<CommitRepairEvent> {
1058        lock_with_recovery(&self.events, "repair_events")
1059            .iter()
1060            .copied()
1061            .filter(|event| event.commit_seq == commit_seq)
1062            .collect()
1063    }
1064
1065    #[must_use]
1066    pub fn durable_not_repairable_window(&self, commit_seq: u64) -> Option<Duration> {
1067        let events = self.events_for_commit(commit_seq);
1068        let pending = events
1069            .iter()
1070            .find(|event| event.kind == CommitRepairEventKind::DurableButNotRepairable)?;
1071        let repair_done = events
1072            .iter()
1073            .find(|event| event.kind == CommitRepairEventKind::RepairCompleted)?;
1074        Some(
1075            repair_done
1076                .recorded_at
1077                .saturating_duration_since(pending.recorded_at),
1078        )
1079    }
1080
1081    #[must_use]
1082    pub fn io_handle(&self) -> Arc<IO> {
1083        Arc::clone(&self.io)
1084    }
1085
1086    #[must_use]
1087    pub fn generator_handle(&self) -> Arc<GEN> {
1088        Arc::clone(&self.generator)
1089    }
1090
1091    fn record(&self, commit_seq: u64, kind: CommitRepairEventKind) {
1092        record_event_into(&self.events, commit_seq, kind);
1093    }
1094}
1095
1096impl<IO, GEN> Drop for CommitRepairCoordinator<IO, GEN>
1097where
1098    IO: CommitRepairIo + Send + Sync + 'static,
1099    GEN: RepairSymbolGenerator + Send + Sync + 'static,
1100{
1101    fn drop(&mut self) {
1102        let handles = {
1103            let mut guard = lock_with_recovery(&self.handles, "repair_handles");
1104            std::mem::take(&mut *guard)
1105        };
1106        for handle in handles {
1107            let joined =
1108                std::panic::catch_unwind(AssertUnwindSafe(|| self.runtime.block_on(handle)));
1109            if joined.is_err() {
1110                error!(
1111                    bead_id = BEAD_ID,
1112                    "background repair worker panicked during drop"
1113                );
1114            }
1115        }
1116    }
1117}
1118
/// Owned inputs for one background repair worker (one task per commit).
struct RepairTask<IO, GEN> {
    /// Commit this repair work belongs to.
    commit_seq: u64,
    /// Coordinator-assigned id; used only for log correlation.
    async_task_id: u64,
    /// Shared I/O used to append + sync the repair symbols.
    io: Arc<IO>,
    /// Shared repair-symbol generator.
    generator: Arc<GEN>,
    /// Shared per-commit repair lifecycle states.
    repair_states: Arc<Mutex<HashMap<u64, RepairState>>>,
    /// Shared event log updated with the task's outcome.
    events: Arc<Mutex<Vec<CommitRepairEvent>>>,
    /// Copy of the commit's systematic symbols taken on the commit path.
    systematic_snapshot: Vec<u8>,
    /// Child `Cx` used for cancellation checkpoints inside the worker.
    worker_cx: Cx,
}
1129
/// Background worker: generate, append, and sync repair symbols for one
/// commit, recording the outcome in the shared state map and event log.
///
/// Fails fast (state `Failed` + `RepairFailed` event) when the native
/// runtime context is missing or the task is cancelled before the blocking
/// section begins; otherwise the blocking work runs under `spawn_blocking`
/// with a `catch_unwind` panic boundary.
async fn run_repair_task<IO, GEN>(task: RepairTask<IO, GEN>)
where
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
{
    let RepairTask {
        commit_seq,
        async_task_id,
        io,
        generator,
        repair_states,
        events,
        systematic_snapshot,
        worker_cx,
    } = task;

    // Without a native runtime context the worker cannot run correctly;
    // treat that as an immediate repair failure.
    let Some(native_worker_cx) = NativeCx::current() else {
        set_repair_state(&repair_states, commit_seq, RepairState::Failed);
        record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
        error!(
            bead_id = BEAD_ID,
            commit_seq, async_task_id, "repair task missing native runtime context"
        );
        return;
    };
    worker_cx.set_native_cx(native_worker_cx.clone());
    // A failed checkpoint means cancellation arrived before any blocking
    // work was done; record the failure and bail out early.
    if worker_cx.checkpoint().is_err() {
        set_repair_state(&repair_states, commit_seq, RepairState::Failed);
        record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
        warn!(
            bead_id = BEAD_ID,
            commit_seq, async_task_id, "repair task was cancelled before blocking work started"
        );
        return;
    }

    info!(
        bead_id = BEAD_ID,
        commit_seq, async_task_id, "repair symbols generation started"
    );
    record_event_into(&events, commit_seq, CommitRepairEventKind::RepairStarted);

    // Generation + append + sync may block, so move them off the async
    // executor; the child `Cx` carries the native context into the closure.
    let blocking_cx = worker_cx.create_child();
    let repair_outcome = spawn_blocking(move || {
        std::panic::catch_unwind(AssertUnwindSafe(|| {
            blocking_cx.set_native_cx(native_worker_cx);
            let repair_symbols =
                generator.generate_repair_symbols(commit_seq, &systematic_snapshot)?;
            let repair_symbol_bytes = repair_symbols.len();
            io.append_repair_symbols(commit_seq, &repair_symbols)?;
            io.sync_repair_symbols(commit_seq)?;
            Ok::<usize, FrankenError>(repair_symbol_bytes)
        }))
    })
    .await;

    match repair_outcome {
        Ok(Ok(repair_symbol_bytes)) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Completed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairCompleted);
            info!(
                bead_id = BEAD_ID,
                commit_seq,
                async_task_id,
                repair_symbol_bytes,
                "repair symbols append+sync completed"
            );
        }
        // `spawn_blocking` returns the closure result directly, so the outer
        // `catch_unwind` layer is the panic boundary and the inner `Result`
        // carries generator / append / sync failures.
        Ok(Err(err)) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Failed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
            error!(
                bead_id = BEAD_ID,
                commit_seq,
                async_task_id,
                error = ?err,
                "repair symbol generation or append/sync failed"
            );
        }
        Err(_panic_payload) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Failed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
            error!(
                bead_id = BEAD_ID,
                commit_seq, async_task_id, "repair symbol worker panicked"
            );
        }
    }
}
1222
1223fn lock_with_recovery<'a, T>(mutex: &'a Mutex<T>, lock_name: &'static str) -> MutexGuard<'a, T> {
1224    match mutex.lock() {
1225        Ok(guard) => guard,
1226        Err(poisoned) => {
1227            warn!(
1228                bead_id = BEAD_ID,
1229                lock = lock_name,
1230                "mutex poisoned; recovering inner state"
1231            );
1232            poisoned.into_inner()
1233        }
1234    }
1235}
1236
1237fn set_repair_state(
1238    repair_states: &Arc<Mutex<HashMap<u64, RepairState>>>,
1239    commit_seq: u64,
1240    state: RepairState,
1241) {
1242    lock_with_recovery(repair_states, "repair_states").insert(commit_seq, state);
1243}
1244
1245fn record_event_into(
1246    events: &Arc<Mutex<Vec<CommitRepairEvent>>>,
1247    commit_seq: u64,
1248    kind: CommitRepairEventKind,
1249) {
1250    let mut guard = lock_with_recovery(events, "repair_events");
1251    let seq = guard
1252        .iter()
1253        .rev()
1254        .find(|event| event.commit_seq == commit_seq)
1255        .map_or(1, |event| event.seq.saturating_add(1));
1256    guard.push(CommitRepairEvent {
1257        commit_seq,
1258        seq,
1259        recorded_at: Instant::now(),
1260        kind,
1261    });
1262}
1263
1264// ---------------------------------------------------------------------------
// Group Commit Batching (§5.9.2.1, bd-l4gl)
1266// ---------------------------------------------------------------------------
1267
/// Bead identifier attached to group-commit tracing events.
const GROUP_COMMIT_BEAD_ID: &str = "bd-l4gl";
/// Poll interval for the coordinator when idle.
/// NOTE(review): the consuming loop is outside this chunk — confirm usage.
const GROUP_COMMIT_IDLE_POLL_INTERVAL: Duration = Duration::from_millis(50);
1270
/// Phase label recorded during coordinator batch processing for ordering
/// verification.
///
/// Expected per-batch order: `Validate`, `WalAppend`, `Fsync`, `Publish`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchPhase {
    /// Write-set conflict validation for each request.
    Validate,
    /// Sequential WAL append for all valid requests.
    WalAppend,
    /// Single `fsync` for the entire batch.
    Fsync,
    /// Version publication and response delivery.
    Publish,
}
1284
/// Response returned to each writer after batch processing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupCommitResponse {
    /// Commit succeeded; pages are durable and published. Carries the WAL
    /// offset of the request's frames and the assigned commit sequence.
    Committed { wal_offset: u64, commit_seq: u64 },
    /// Commit rejected due to write-set conflict. `reason` is the
    /// validator's human-readable description, passed through verbatim.
    Conflict { reason: String },
}
1293
/// Result of processing a single batch through the coordinator.
///
/// A copy is also retained in the coordinator's batch history for later
/// inspection.
#[derive(Debug)]
pub struct BatchResult {
    /// Successfully committed entries: `(txn_id, wal_offset, commit_seq)`.
    pub committed: Vec<(u64, u64, u64)>,
    /// Rejected entries: `(txn_id, reason)`.
    pub conflicted: Vec<(u64, String)>,
    /// Number of fsync calls issued for this batch (should always be 0 or 1).
    pub fsync_count: u32,
    /// Ordered record of phases executed during batch processing.
    pub phase_order: Vec<BatchPhase>,
}
1306
/// Batch WAL writer abstraction for the group commit coordinator.
///
/// A single `append_batch` call writes all frames from all valid requests
/// in one sequential `write()`, and `sync` issues exactly one `fsync`.
pub trait WalBatchWriter: Send + Sync {
    /// Append commit frames for every request in the batch. Returns a WAL
    /// offset per request.
    ///
    /// # Errors
    ///
    /// Fails when the underlying append fails. On success, implementations
    /// must return exactly one offset per request (the coordinator treats a
    /// count mismatch as an internal error).
    fn append_batch(&self, requests: &[&CommitRequest]) -> Result<Vec<u64>>;

    /// Issue a single `fsync` (or `fdatasync`) covering all appended frames.
    ///
    /// # Errors
    ///
    /// Fails when the durability barrier cannot be issued.
    fn sync(&self) -> Result<()>;
}
1319
/// Write-set conflict validator using first-committer-wins (FCW) logic.
///
/// `committed_pages` is the set of pages that have been committed since
/// the validating transaction's snapshot.
pub trait WriteSetValidator: Send + Sync {
    /// Returns `Ok(())` if the request passes validation, or `Err` with
    /// a human-readable conflict description.
    ///
    /// # Errors
    ///
    /// The `Err(String)` payload is surfaced to the writer verbatim as the
    /// `Conflict` reason.
    fn validate(
        &self,
        request: &CommitRequest,
        committed_pages: &BTreeSet<u32>,
    ) -> std::result::Result<(), String>;
}
1333
1334/// First-committer-wins validator: any overlap between the request's
1335/// write set and already-committed pages is a conflict.
1336#[derive(Debug, Default)]
1337pub struct FirstCommitterWinsValidator;
1338
1339impl WriteSetValidator for FirstCommitterWinsValidator {
1340    fn validate(
1341        &self,
1342        request: &CommitRequest,
1343        committed_pages: &BTreeSet<u32>,
1344    ) -> std::result::Result<(), String> {
1345        for &page in &request.write_set_pages {
1346            if committed_pages.contains(&page) {
1347                return Err(format!(
1348                    "write-set conflict on page {page} for txn {}",
1349                    request.txn_id
1350                ));
1351            }
1352        }
1353        Ok(())
1354    }
1355}
1356
/// In-memory WAL writer for deterministic testing and instrumentation.
#[derive(Debug)]
pub struct InMemoryWalWriter {
    /// Next WAL offset to hand out (starts at 1).
    next_offset: AtomicU64,
    /// Number of `sync()` calls observed.
    sync_count: AtomicU64,
    /// Total requests appended across all batches.
    total_appended: AtomicU64,
    /// Simulated fsync latency for throughput model tests.
    fsync_delay: Duration,
}
1366
1367impl InMemoryWalWriter {
1368    /// Create an in-memory WAL writer with no simulated fsync delay.
1369    #[must_use]
1370    pub fn new() -> Self {
1371        Self {
1372            next_offset: AtomicU64::new(1),
1373            sync_count: AtomicU64::new(0),
1374            total_appended: AtomicU64::new(0),
1375            fsync_delay: Duration::ZERO,
1376        }
1377    }
1378
1379    /// Create with simulated fsync delay for throughput model testing.
1380    #[must_use]
1381    pub fn with_fsync_delay(delay: Duration) -> Self {
1382        Self {
1383            next_offset: AtomicU64::new(1),
1384            sync_count: AtomicU64::new(0),
1385            total_appended: AtomicU64::new(0),
1386            fsync_delay: delay,
1387        }
1388    }
1389
1390    /// Total number of `sync()` calls observed.
1391    #[must_use]
1392    pub fn sync_count(&self) -> u64 {
1393        self.sync_count.load(Ordering::Acquire)
1394    }
1395
1396    /// Total requests appended across all batches.
1397    #[must_use]
1398    pub fn total_appended(&self) -> u64 {
1399        self.total_appended.load(Ordering::Acquire)
1400    }
1401}
1402
impl Default for InMemoryWalWriter {
    // `Default` mirrors `new()`: zero fsync delay, fresh counters.
    fn default() -> Self {
        Self::new()
    }
}
1408
1409impl WalBatchWriter for InMemoryWalWriter {
1410    fn append_batch(&self, requests: &[&CommitRequest]) -> Result<Vec<u64>> {
1411        let mut offsets = Vec::with_capacity(requests.len());
1412        for _req in requests {
1413            let offset = self.next_offset.fetch_add(1, Ordering::Relaxed);
1414            offsets.push(offset);
1415        }
1416        #[allow(clippy::cast_possible_truncation)]
1417        self.total_appended
1418            .fetch_add(requests.len() as u64, Ordering::Release);
1419        Ok(offsets)
1420    }
1421
1422    fn sync(&self) -> Result<()> {
1423        if self.fsync_delay != Duration::ZERO {
1424            thread::sleep(self.fsync_delay);
1425        }
1426        self.sync_count.fetch_add(1, Ordering::Release);
1427        Ok(())
1428    }
1429}
1430
#[cfg(test)]
mod commit_repair_async_tests {
    use super::*;

    use asupersync::runtime::RuntimeBuilder;
    use std::thread;

    // Happy path: one commit schedules exactly one pending repair task on
    // the caller-owned runtime; draining it completes the repair state,
    // emits RepairStarted, and appends repair symbols to the test I/O.
    #[test]
    fn test_commit_repair_runs_on_caller_owned_runtime() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let io = Arc::new(InMemoryCommitRepairIo::default());
        let generator = Arc::new(DeterministicRepairGenerator::new(
            Duration::from_millis(25),
            64,
        ));
        let coordinator = CommitRepairCoordinator::with_shared(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            Arc::clone(&io),
            generator,
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");
        // Repair is pending immediately after the commit is acknowledged.
        assert_eq!(coordinator.pending_background_repair_count(), 1);

        coordinator
            .wait_for_background_repair()
            .expect("background repair should finish");

        assert_eq!(coordinator.pending_background_repair_count(), 0);
        assert_eq!(
            coordinator.repair_state_for(receipt.commit_seq),
            RepairState::Completed
        );
        assert!(
            coordinator
                .events_for_commit(receipt.commit_seq)
                .iter()
                .any(|event| event.kind == CommitRepairEventKind::RepairStarted)
        );
        assert!(
            io.total_repair_bytes() > 0,
            "repair task should append repair symbols"
        );
    }

    // The receipt latency must include critical-path systematic sync time
    // (injected here as a 10ms sleep in `sync_systematic_symbols`).
    #[test]
    fn test_commit_receipt_latency_tracks_critical_path_io() {
        // Test double whose systematic sync sleeps, modeling a slow fsync.
        #[derive(Debug)]
        struct DelayedIo {
            delay: Duration,
        }

        impl CommitRepairIo for DelayedIo {
            fn append_systematic_symbols(
                &self,
                _commit_seq: u64,
                _systematic_symbols: &[u8],
            ) -> Result<()> {
                Ok(())
            }

            fn sync_systematic_symbols(&self, _commit_seq: u64) -> Result<()> {
                thread::sleep(self.delay);
                Ok(())
            }

            fn append_repair_symbols(
                &self,
                _commit_seq: u64,
                _repair_symbols: &[u8],
            ) -> Result<()> {
                Ok(())
            }

            fn sync_repair_symbols(&self, _commit_seq: u64) -> Result<()> {
                Ok(())
            }
        }

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: false,
            },
            runtime,
            &root_cx,
            DelayedIo {
                delay: Duration::from_millis(10),
            },
            DeterministicRepairGenerator::new(Duration::ZERO, 64),
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");

        // 8ms (not 10ms) tolerates timer coarseness while still proving the
        // sleep sat on the measured critical path.
        assert!(
            receipt.latency >= Duration::from_millis(8),
            "commit latency should include the critical-path systematic sync cost"
        );
    }

    // The durable-but-not-repairable window must measure wall-clock time
    // between the pending event and the RepairCompleted event.
    #[test]
    fn test_durable_not_repairable_window_tracks_wall_clock_delay() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            InMemoryCommitRepairIo::default(),
            DeterministicRepairGenerator::new(Duration::from_millis(20), 64),
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");
        coordinator
            .wait_for_background_repair()
            .expect("background repair should finish");

        let window = coordinator
            .durable_not_repairable_window(receipt.commit_seq)
            .expect("window should be measurable");
        assert!(
            window >= Duration::from_millis(15),
            "window should reflect real repair delay rather than logical event ordinals"
        );
    }

    // A panic inside one repair worker must surface as RepairFailed for
    // that commit while the other worker still completes normally.
    #[test]
    fn test_panicking_repair_marks_failed_without_abandoning_other_work() {
        // Generator that panics for the first commit only.
        #[derive(Debug)]
        struct PanicFirstGenerator;

        impl RepairSymbolGenerator for PanicFirstGenerator {
            fn generate_repair_symbols(
                &self,
                commit_seq: u64,
                _systematic_symbols: &[u8],
            ) -> Result<Vec<u8>> {
                if commit_seq == 1 {
                    panic!("intentional repair panic");
                }
                thread::sleep(Duration::from_millis(25));
                Ok(vec![0xCD; 32])
            }
        }

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            InMemoryCommitRepairIo::default(),
            PanicFirstGenerator,
        );

        let first = coordinator
            .commit(&[0xAA; 64])
            .expect("first commit should schedule repair");
        let second = coordinator
            .commit(&[0xBB; 64])
            .expect("second commit should schedule repair");

        // The panic is caught inside the worker's blocking boundary, so the
        // join itself succeeds and the wait returns Ok.
        coordinator
            .wait_for_background_repair()
            .expect("panic inside repair work should be converted into RepairFailed state");
        assert_eq!(coordinator.pending_background_repair_count(), 0);
        assert_eq!(
            coordinator.repair_state_for(first.commit_seq),
            RepairState::Failed,
            "panicking repair task must be marked failed rather than left pending"
        );
        assert!(
            coordinator
                .events_for_commit(first.commit_seq)
                .iter()
                .any(|event| event.kind == CommitRepairEventKind::RepairFailed),
            "panicking repair task must emit a RepairFailed event"
        );
        assert_eq!(
            coordinator.repair_state_for(second.commit_seq),
            RepairState::Completed,
            "wait_for_background_repair must still drain non-panicking tasks"
        );
    }
}
1640
/// Group commit coordinator configuration.
#[derive(Debug, Clone, Copy)]
pub struct GroupCommitConfig {
    /// Maximum requests coalesced into a single batch.
    pub max_batch_size: usize,
    /// Timeout for draining additional requests after the first.
    pub drain_timeout: Duration,
}

impl Default for GroupCommitConfig {
    // Defaults pair the commit-channel capacity with a 100-microsecond
    // drain window.
    fn default() -> Self {
        Self {
            max_batch_size: DEFAULT_COMMIT_CHANNEL_CAPACITY,
            drain_timeout: Duration::from_micros(100),
        }
    }
}
1658
/// Published version notification for a committed transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishedVersion {
    /// Transaction that committed.
    pub txn_id: u64,
    /// Monotone commit sequence assigned at publish time.
    pub commit_seq: u64,
    /// WAL offset returned by the batch append for this request.
    pub wal_offset: u64,
}
1666
/// Group commit coordinator that batches write-coordinator requests to
/// amortize `fsync` cost (§5.9.2.1, bd-l4gl).
///
/// The coordinator processes requests from the bounded two-phase MPSC
/// channel in 4 strict phases per batch:
///
/// 1. **Validate** — first-committer-wins conflict check
/// 2. **WAL append** — single sequential `write()` for all valid frames
/// 3. **Fsync** — exactly ONE `fsync()` per batch
/// 4. **Publish** — make versions visible and deliver responses
pub struct GroupCommitCoordinator<W: WalBatchWriter, V: WriteSetValidator> {
    /// Batch WAL writer (one append + one fsync per batch).
    wal: Arc<W>,
    /// Write-set validator consulted during the Validate phase.
    validator: Arc<V>,
    /// Batching knobs (max batch size, drain timeout).
    config: GroupCommitConfig,
    /// Next commit sequence number, assigned during Publish.
    next_commit_seq: AtomicU64,
    /// Pages committed so far; the conflict baseline for validation.
    committed_pages: Mutex<BTreeSet<u32>>,
    /// Versions published after fsync, in publish order.
    published: Mutex<Vec<PublishedVersion>>,
    /// Per-batch results retained for inspection and verification.
    batch_history: Mutex<Vec<BatchResult>>,
    /// Total number of batches processed.
    total_batches: AtomicU64,
}
1687
1688impl<W, V> GroupCommitCoordinator<W, V>
1689where
1690    W: WalBatchWriter + 'static,
1691    V: WriteSetValidator + 'static,
1692{
1693    /// Create a new group commit coordinator.
1694    #[must_use]
1695    pub fn new(wal: W, validator: V, config: GroupCommitConfig) -> Self {
1696        Self {
1697            wal: Arc::new(wal),
1698            validator: Arc::new(validator),
1699            config,
1700            next_commit_seq: AtomicU64::new(1),
1701            committed_pages: Mutex::new(BTreeSet::new()),
1702            published: Mutex::new(Vec::new()),
1703            batch_history: Mutex::new(Vec::new()),
1704            total_batches: AtomicU64::new(0),
1705        }
1706    }
1707
1708    /// Process a single batch of requests through the 4-phase pipeline.
1709    ///
1710    /// Returns individual responses and batch-level metrics. Phase ordering
1711    /// is recorded in `BatchResult::phase_order` for verification.
1712    #[allow(clippy::too_many_lines)]
1713    pub fn process_batch(
1714        &self,
1715        requests: Vec<CommitRequest>,
1716    ) -> Result<(Vec<(CommitRequest, GroupCommitResponse)>, BatchResult)> {
1717        if requests.is_empty() {
1718            return Ok((
1719                Vec::new(),
1720                BatchResult {
1721                    committed: Vec::new(),
1722                    conflicted: Vec::new(),
1723                    fsync_count: 0,
1724                    phase_order: Vec::new(),
1725                },
1726            ));
1727        }
1728
1729        let batch_size = requests.len();
1730        debug!(
1731            bead_id = GROUP_COMMIT_BEAD_ID,
1732            batch_size, "processing group commit batch"
1733        );
1734
1735        let mut phase_order = Vec::with_capacity(4);
1736        let mut responses: Vec<(CommitRequest, GroupCommitResponse)> =
1737            Vec::with_capacity(batch_size);
1738        let mut valid_requests: Vec<CommitRequest> = Vec::with_capacity(batch_size);
1739        let mut conflicted: Vec<(u64, String)> = Vec::new();
1740
1741        // ---- Phase 1: Validate ----
1742        phase_order.push(BatchPhase::Validate);
1743        let mut merged_committed =
1744            lock_with_recovery(&self.committed_pages, "committed_pages").clone();
1745        // Within a batch, earlier requests (by position) win over later ones
1746        // when their write sets overlap.
1747        for req in requests {
1748            match self.validator.validate(&req, &merged_committed) {
1749                Ok(()) => {
1750                    for &page in &req.write_set_pages {
1751                        merged_committed.insert(page);
1752                    }
1753                    valid_requests.push(req);
1754                }
1755                Err(reason) => {
1756                    info!(
1757                        bead_id = GROUP_COMMIT_BEAD_ID,
1758                        txn_id = req.txn_id,
1759                        reason = %reason,
1760                        "conflict detected in validate phase (fail-fast)"
1761                    );
1762                    conflicted.push((req.txn_id, reason.clone()));
1763                    responses.push((req, GroupCommitResponse::Conflict { reason }));
1764                }
1765            }
1766        }
1767
1768        if valid_requests.is_empty() {
1769            let result = BatchResult {
1770                committed: Vec::new(),
1771                conflicted,
1772                fsync_count: 0,
1773                phase_order,
1774            };
1775            lock_with_recovery(&self.batch_history, "batch_history").push(BatchResult {
1776                committed: Vec::new(),
1777                conflicted: result.conflicted.clone(),
1778                fsync_count: 0,
1779                phase_order: result.phase_order.clone(),
1780            });
1781            self.total_batches.fetch_add(1, Ordering::Relaxed);
1782            return Ok((responses, result));
1783        }
1784
1785        // ---- Phase 2: WAL append ----
1786        phase_order.push(BatchPhase::WalAppend);
1787        let refs: Vec<&CommitRequest> = valid_requests.iter().collect();
1788        let wal_offsets = self.wal.append_batch(&refs)?;
1789        if wal_offsets.len() != valid_requests.len() {
1790            return Err(FrankenError::internal(format!(
1791                "wal append returned {} offsets for {} valid requests",
1792                wal_offsets.len(),
1793                valid_requests.len()
1794            )));
1795        }
1796
1797        // ---- Phase 3: Fsync ----
1798        phase_order.push(BatchPhase::Fsync);
1799        self.wal.sync()?;
1800        let fsync_count = 1;
1801
1802        // ---- Phase 4: Publish ----
1803        phase_order.push(BatchPhase::Publish);
1804        let mut committed_entries: Vec<(u64, u64, u64)> = Vec::with_capacity(valid_requests.len());
1805        let mut committed_guard = lock_with_recovery(&self.committed_pages, "committed_pages");
1806        let mut published_guard = lock_with_recovery(&self.published, "published_versions");
1807        for (req, &wal_offset) in valid_requests.iter().zip(wal_offsets.iter()) {
1808            let commit_seq = self.next_commit_seq.fetch_add(1, Ordering::Relaxed);
1809            for &page in &req.write_set_pages {
1810                committed_guard.insert(page);
1811            }
1812            published_guard.push(PublishedVersion {
1813                txn_id: req.txn_id,
1814                commit_seq,
1815                wal_offset,
1816            });
1817            committed_entries.push((req.txn_id, wal_offset, commit_seq));
1818            info!(
1819                bead_id = GROUP_COMMIT_BEAD_ID,
1820                txn_id = req.txn_id,
1821                commit_seq,
1822                wal_offset,
1823                "version published after fsync"
1824            );
1825        }
1826        drop(committed_guard);
1827        drop(published_guard);
1828
1829        for ((req, &wal_offset), (_, _, commit_seq)) in valid_requests
1830            .into_iter()
1831            .zip(wal_offsets.iter())
1832            .zip(committed_entries.iter())
1833        {
1834            responses.push((
1835                req,
1836                GroupCommitResponse::Committed {
1837                    wal_offset,
1838                    commit_seq: *commit_seq,
1839                },
1840            ));
1841        }
1842
1843        let result = BatchResult {
1844            committed: committed_entries,
1845            conflicted,
1846            fsync_count,
1847            phase_order,
1848        };
1849
1850        lock_with_recovery(&self.batch_history, "batch_history").push(BatchResult {
1851            committed: result.committed.clone(),
1852            conflicted: result.conflicted.clone(),
1853            fsync_count: result.fsync_count,
1854            phase_order: result.phase_order.clone(),
1855        });
1856        self.total_batches.fetch_add(1, Ordering::Relaxed);
1857
1858        debug!(
1859            bead_id = GROUP_COMMIT_BEAD_ID,
1860            batch_size,
1861            committed = result.committed.len(),
1862            conflicted = result.conflicted.len(),
1863            "batch processing complete"
1864        );
1865
1866        Ok((responses, result))
1867    }
1868
1869    /// Drain requests from the receiver and process them as a batch.
1870    ///
1871    /// Blocks waiting for the first request, then non-blocking drains up
1872    /// to `max_batch_size`. Returns `None` if the receiver times out on
1873    /// the first request (channel idle).
1874    pub fn drain_and_process(
1875        &self,
1876        receiver: &TwoPhaseCommitReceiver,
1877    ) -> Result<Option<BatchResult>> {
1878        self.drain_and_process_with_first_wait(receiver, Duration::from_secs(1))
1879    }
1880
1881    fn drain_and_process_with_first_wait(
1882        &self,
1883        receiver: &TwoPhaseCommitReceiver,
1884        first_wait: Duration,
1885    ) -> Result<Option<BatchResult>> {
1886        // Blocking wait for first request
1887        let Some(first) = receiver.try_recv_for(first_wait) else {
1888            return Ok(None);
1889        };
1890
1891        let mut batch = Vec::with_capacity(self.config.max_batch_size);
1892        batch.push(first);
1893
1894        // Non-blocking drain for additional requests
1895        while batch.len() < self.config.max_batch_size {
1896            match receiver.try_recv_for(self.config.drain_timeout) {
1897                Some(req) => batch.push(req),
1898                None => break,
1899            }
1900        }
1901
1902        let (_responses, result) = self.process_batch(batch)?;
1903        Ok(Some(result))
1904    }
1905
1906    /// Run the coordinator loop until the owning region `Cx` is cancelled.
1907    ///
1908    /// This is the production entry point. The loop blocks on the first
1909    /// request of each batch, drains additional requests, and processes
1910    /// the batch through all 4 phases.
1911    pub fn run_loop(&self, receiver: &TwoPhaseCommitReceiver, cx: &Cx) -> Result<()> {
1912        info!(
1913            bead_id = GROUP_COMMIT_BEAD_ID,
1914            max_batch_size = self.config.max_batch_size,
1915            "group commit coordinator loop started"
1916        );
1917        while !cx.is_cancel_requested() {
1918            if cx.checkpoint().is_err() {
1919                break;
1920            }
1921            if let Some(result) =
1922                self.drain_and_process_with_first_wait(receiver, GROUP_COMMIT_IDLE_POLL_INTERVAL)?
1923            {
1924                debug!(
1925                    bead_id = GROUP_COMMIT_BEAD_ID,
1926                    committed = result.committed.len(),
1927                    conflicted = result.conflicted.len(),
1928                    "batch cycle completed"
1929                );
1930            }
1931        }
1932        info!(
1933            bead_id = GROUP_COMMIT_BEAD_ID,
1934            total_batches = self.total_batches.load(Ordering::Relaxed),
1935            "group commit coordinator loop shut down"
1936        );
1937        Ok(())
1938    }
1939
1940    /// Total batches processed so far.
1941    #[must_use]
1942    pub fn total_batches(&self) -> u64 {
1943        self.total_batches.load(Ordering::Acquire)
1944    }
1945
1946    /// All published versions for inspection/testing.
1947    #[must_use]
1948    pub fn published_versions(&self) -> Vec<PublishedVersion> {
1949        lock_with_recovery(&self.published, "published_versions").clone()
1950    }
1951
    /// Batch results for phase ordering verification.
    ///
    /// Returns a deep copy of every recorded batch so callers can inspect
    /// history without holding the `batch_history` lock.
    #[must_use]
    pub fn batch_history(&self) -> Vec<BatchResult> {
        // NOTE(review): this fully clones each batch's `committed`,
        // `conflicted`, and `phase_order` vectors — acceptable for
        // test/inspection paths, not a hot path.
        lock_with_recovery(&self.batch_history, "batch_history")
            .iter()
            .map(|b| BatchResult {
                committed: b.committed.clone(),
                conflicted: b.conflicted.clone(),
                fsync_count: b.fsync_count,
                phase_order: b.phase_order.clone(),
            })
            .collect()
    }
1966
1967    /// Reference to the WAL writer for instrumentation.
1968    #[must_use]
1969    pub fn wal_handle(&self) -> Arc<W> {
1970        Arc::clone(&self.wal)
1971    }
1972
1973    /// Reset committed pages (useful for test isolation).
1974    pub fn reset_committed_pages(&self) {
1975        lock_with_recovery(&self.committed_pages, "committed_pages").clear();
1976    }
1977}
1978
#[cfg(test)]
mod two_phase_pipeline_tests {
    use super::*;
    use std::sync::mpsc as std_mpsc;
    use std::thread;
    use std::time::Instant;

    // Deterministic request: the page derives from txn_id mod 97 and the
    // payload from its low byte, so equality checks can rebuild the value.
    fn request(txn_id: u64) -> CommitRequest {
        CommitRequest::new(
            txn_id,
            vec![u32::try_from(txn_id % 97).expect("txn id modulo fits in u32")],
            vec![u8::try_from(txn_id & 0xFF).expect("masked to u8")],
        )
    }

    // Happy path: reserve a slot, send, and the receiver observes it.
    #[test]
    fn test_two_phase_reserve_then_send() {
        let (sender, receiver) = two_phase_commit_channel(4);
        let permit = sender.reserve();
        let seq = permit.reservation_seq();
        permit.send(request(seq));
        let observed_request = receiver.try_recv_for(Duration::from_millis(50));
        assert_eq!(observed_request, Some(request(seq)));
    }

    // Delivery follows reservation order even when sends complete out of
    // order; an unresolved earlier reservation gates later ones.
    #[test]
    fn test_out_of_order_send_completion_still_delivers_by_reservation_sequence() {
        let (sender, receiver) = two_phase_commit_channel(4);
        let permit1 = sender.reserve();
        let permit2 = sender.reserve();
        let permit3 = sender.reserve();

        let seq1 = permit1.reservation_seq();
        let seq2 = permit2.reservation_seq();
        let seq3 = permit3.reservation_seq();
        assert_eq!((seq1, seq2, seq3), (1, 2, 3));

        permit2.send(request(seq2));
        permit3.send(request(seq3));
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(20)),
            None,
            "later sends must not bypass an earlier unresolved reservation"
        );

        permit1.send(request(seq1));
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq1))
        );
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq2))
        );
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq3))
        );
    }

    // A timed-out reserve must not consume capacity; dropping the blocking
    // permit frees the slot for the next attempt.
    #[test]
    fn test_two_phase_cancel_during_reserve() {
        let (sender, _receiver) = two_phase_commit_channel(1);
        let blocker = sender.reserve();
        let attempt = sender.try_reserve_for(Duration::from_millis(5));
        assert!(
            attempt.is_none(),
            "reserve timeout acts as cancellation during reserve"
        );
        assert_eq!(sender.occupancy(), 1, "no extra slot consumed");
        drop(blocker);
        let permit = sender.try_reserve_for(Duration::from_millis(50));
        assert!(permit.is_some(), "slot released after blocker drop");
    }

    // Dropping an unsent permit returns its slot to the channel.
    #[test]
    fn test_two_phase_drop_permit_releases_slot() {
        let (sender, _receiver) = two_phase_commit_channel(1);
        let permit = sender.reserve();
        assert_eq!(sender.occupancy(), 1);
        drop(permit);
        assert_eq!(sender.occupancy(), 0);
        let retry = sender.try_reserve_for(Duration::from_millis(50));
        assert!(retry.is_some(), "dropped permit must release capacity");
    }

    // A third reserve against a capacity-2 channel blocks until earlier
    // permits drop; the measured wait demonstrates real backpressure.
    #[test]
    fn test_backpressure_blocks_at_capacity() {
        let (sender, _receiver) = two_phase_commit_channel(2);
        let sender_a = sender.clone();
        let sender_b = sender.clone();
        let permit_a = sender_a.reserve();
        let permit_b = sender_b.reserve();

        let (tx, rx) = std_mpsc::channel();
        let sender_for_worker = sender.clone();
        let join = thread::spawn(move || {
            let started = Instant::now();
            let permit = sender_for_worker.reserve();
            let elapsed = started.elapsed();
            tx.send(elapsed)
                .expect("elapsed send should succeed for backpressure test");
            drop(permit);
        });

        thread::sleep(Duration::from_millis(30));
        drop(permit_a);
        drop(permit_b);

        let elapsed = rx
            .recv_timeout(Duration::from_secs(1))
            .expect("blocked reserve should eventually unblock");
        assert!(
            elapsed >= Duration::from_millis(20),
            "reserve should block until capacity frees"
        );
        join.join().expect("thread join must succeed");
    }

    // 10 producer threads x 10 sends each: the receiver must observe txn
    // ids 1..=100 in exact reservation (FIFO) order.
    #[test]
    fn test_fifo_ordering_under_contention() {
        let total = 100_u64;
        let (sender, receiver) = two_phase_commit_channel(32);
        let mut joins = Vec::new();
        for _ in 0..10 {
            let sender_clone = sender.clone();
            joins.push(thread::spawn(move || {
                let mut local = Vec::new();
                for _ in 0..10 {
                    let permit = sender_clone.reserve();
                    let seq = permit.reservation_seq();
                    permit.send(request(seq));
                    local.push(seq);
                }
                local
            }));
        }

        let mut observed_order = Vec::new();
        for _ in 0..total {
            let req = receiver
                .try_recv_for(Duration::from_secs(1))
                .expect("coordinator should receive queued request");
            observed_order.push(req.txn_id);
        }
        for join in joins {
            let _ = join.join().expect("producer join");
        }

        let expected: Vec<u64> = (1..=total).collect();
        assert_eq!(observed_order, expected, "must preserve FIFO reserve order");
    }

    // Leak accounting: a tracked permit dropped without sending is counted,
    // while the underlying channel slot is still released.
    #[test]
    fn test_tracked_sender_detects_leaked_permit() {
        let (sender, _receiver) = two_phase_commit_channel(4);
        let tracked = TrackedSender::new(sender.clone());

        {
            let _leaked = tracked.reserve();
        }

        assert_eq!(tracked.leaked_permit_count(), 1);
        let permit = sender.try_reserve_for(Duration::from_millis(50));
        assert!(
            permit.is_some(),
            "leaked tracked permit still releases slot via underlying drop"
        );
    }

    // The analytic optimum clamps to channel capacity, and the coordinator
    // side can drain a full capacity-sized batch.
    #[test]
    fn test_group_commit_batch_size_near_optimal() {
        let capacity = DEFAULT_COMMIT_CHANNEL_CAPACITY;
        let n_opt =
            optimal_batch_size(Duration::from_millis(2), Duration::from_micros(5), capacity);
        assert_eq!(n_opt, capacity, "20 theoretical optimum clamps to C=16");

        let (sender, receiver) = two_phase_commit_channel(capacity);
        for txn_id in 0_u64..u64::try_from(capacity).expect("capacity fits u64") {
            let permit = sender.reserve();
            permit.send(request(txn_id));
        }
        let mut drained = 0_usize;
        while drained < capacity {
            if receiver.try_recv_for(Duration::from_millis(20)).is_some() {
                drained += 1;
            }
        }
        assert_eq!(drained, capacity, "coordinator drains full batch at C");
    }

    // Slower fsync samples must push the conformal batch size up, staying
    // within [1, cap].
    #[test]
    fn test_conformal_batch_size_adapts_to_regime() {
        let cap = 64;
        let low_fsync: Vec<Duration> = (0..32).map(|_| Duration::from_millis(2)).collect();
        let high_fsync: Vec<Duration> = (0..32).map(|_| Duration::from_millis(10)).collect();
        let validate: Vec<Duration> = (0..32).map(|_| Duration::from_micros(5)).collect();

        let low = conformal_batch_size(&low_fsync, &validate, cap);
        let high = conformal_batch_size(&high_fsync, &validate, cap);

        assert!(
            high > low,
            "regime shift to slower fsync must increase batch"
        );
        assert!(high <= cap);
        assert!(low >= 1);
    }

    // Default channel capacity is pinned at 16.
    #[test]
    fn test_channel_capacity_16_default() {
        assert_eq!(CommitPipelineConfig::default().channel_capacity, 16);
    }

    // PRAGMA-supplied capacity is honored, with 0 clamped up to 1.
    #[test]
    fn test_capacity_configurable_via_pragma() {
        assert_eq!(
            CommitPipelineConfig::from_pragma_capacity(32).channel_capacity,
            32
        );
        assert_eq!(
            CommitPipelineConfig::from_pragma_capacity(0).channel_capacity,
            1
        );
    }

    // Little's-law sizing lands just under the default capacity of 16.
    #[test]
    fn test_little_law_derivation() {
        let burst_capacity = little_law_capacity(37_000.0, Duration::from_micros(40), 4.0, 2.5);
        assert_eq!(burst_capacity, 15);
        assert_eq!(DEFAULT_COMMIT_CHANNEL_CAPACITY, 16);
    }
}
2212
2213#[cfg(test)]
2214#[allow(clippy::cast_possible_truncation)]
2215mod group_commit_tests {
2216    use super::*;
2217    use std::sync::atomic::AtomicU32;
2218    use std::time::Instant;
2219
2220    fn req(txn_id: u64, pages: &[u32]) -> CommitRequest {
2221        CommitRequest::new(txn_id, pages.to_vec(), vec![0xAB])
2222    }
2223
2224    fn make_coordinator(
2225        max_batch: usize,
2226    ) -> GroupCommitCoordinator<InMemoryWalWriter, FirstCommitterWinsValidator> {
2227        GroupCommitCoordinator::new(
2228            InMemoryWalWriter::new(),
2229            FirstCommitterWinsValidator,
2230            GroupCommitConfig {
2231                max_batch_size: max_batch,
2232                ..GroupCommitConfig::default()
2233            },
2234        )
2235    }
2236
2237    fn make_coordinator_with_delay(
2238        max_batch: usize,
2239        fsync_delay: Duration,
2240    ) -> GroupCommitCoordinator<InMemoryWalWriter, FirstCommitterWinsValidator> {
2241        GroupCommitCoordinator::new(
2242            InMemoryWalWriter::with_fsync_delay(fsync_delay),
2243            FirstCommitterWinsValidator,
2244            GroupCommitConfig {
2245                max_batch_size: max_batch,
2246                ..GroupCommitConfig::default()
2247            },
2248        )
2249    }
2250
    /// WAL stub whose `append_batch` always returns a fixed offset list,
    /// regardless of how many requests were submitted — used to exercise
    /// the coordinator's offset-count-mismatch guard.
    #[derive(Debug)]
    struct OffsetMismatchWalWriter {
        // Offsets echoed verbatim from every `append_batch` call.
        returned_offsets: Vec<u64>,
        // Number of `sync` invocations, for asserting fsync never ran.
        sync_count: AtomicU32,
    }
2256
2257    impl OffsetMismatchWalWriter {
2258        fn new(returned_offsets: Vec<u64>) -> Self {
2259            Self {
2260                returned_offsets,
2261                sync_count: AtomicU32::new(0),
2262            }
2263        }
2264
2265        fn sync_count(&self) -> u32 {
2266            self.sync_count.load(Ordering::Acquire)
2267        }
2268    }
2269
2270    impl WalBatchWriter for OffsetMismatchWalWriter {
2271        fn append_batch(&self, _requests: &[&CommitRequest]) -> Result<Vec<u64>> {
2272            Ok(self.returned_offsets.clone())
2273        }
2274
2275        fn sync(&self) -> Result<()> {
2276            self.sync_count.fetch_add(1, Ordering::Release);
2277            Ok(())
2278        }
2279    }
2280
2281    #[test]
2282    fn test_group_commit_single_request_no_batching() {
2283        let coord = make_coordinator(16);
2284        let batch = vec![req(1, &[10, 20])];
2285        let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2286
2287        assert_eq!(result.committed.len(), 1);
2288        assert_eq!(result.conflicted.len(), 0);
2289        assert_eq!(
2290            result.fsync_count, 1,
2291            "exactly one fsync for single request"
2292        );
2293        assert_eq!(responses.len(), 1);
2294        assert!(matches!(
2295            &responses[0].1,
2296            GroupCommitResponse::Committed { .. }
2297        ));
2298        assert_eq!(coord.wal_handle().sync_count(), 1);
2299    }
2300
2301    #[test]
2302    fn test_group_commit_batch_of_10_single_fsync() {
2303        let coord = make_coordinator(16);
2304        let batch: Vec<CommitRequest> = (1..=10)
2305            .map(|txn_id| req(txn_id, &[txn_id as u32 * 100]))
2306            .collect();
2307
2308        let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2309
2310        assert_eq!(result.committed.len(), 10, "all 10 should commit");
2311        assert_eq!(result.conflicted.len(), 0);
2312        assert_eq!(result.fsync_count, 1, "exactly ONE fsync for 10 requests");
2313        assert_eq!(responses.len(), 10);
2314
2315        // All should have distinct wal_offsets
2316        let offsets: BTreeSet<u64> = responses
2317            .iter()
2318            .filter_map(|(_, resp)| match resp {
2319                GroupCommitResponse::Committed { wal_offset, .. } => Some(*wal_offset),
2320                GroupCommitResponse::Conflict { .. } => None,
2321            })
2322            .collect();
2323        assert_eq!(offsets.len(), 10, "all 10 should have distinct WAL offsets");
2324
2325        // Verify instrumented fsync count
2326        assert_eq!(coord.wal_handle().sync_count(), 1);
2327        assert_eq!(coord.wal_handle().total_appended(), 10);
2328    }
2329
2330    #[test]
2331    fn test_group_commit_conflict_in_batch_partial_success() {
2332        let coord = make_coordinator(16);
2333        // Request 1 writes pages [10, 20]
2334        // Request 2 writes pages [30, 40] (no conflict)
2335        // Request 3 writes pages [10, 50] (conflicts with request 1 on page 10)
2336        // Request 4 writes pages [60] (no conflict)
2337        // Request 5 writes pages [30] (conflicts with request 2 on page 30)
2338        let batch = vec![
2339            req(1, &[10, 20]),
2340            req(2, &[30, 40]),
2341            req(3, &[10, 50]),
2342            req(4, &[60]),
2343            req(5, &[30]),
2344        ];
2345
2346        let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2347
2348        assert_eq!(result.committed.len(), 3, "requests 1, 2, 4 should commit");
2349        assert_eq!(
2350            result.conflicted.len(),
2351            2,
2352            "requests 3 and 5 should conflict"
2353        );
2354        assert_eq!(result.fsync_count, 1, "one fsync for valid subset");
2355
2356        // Verify specific responses
2357        let committed_txns: BTreeSet<u64> =
2358            result.committed.iter().map(|(tid, _, _)| *tid).collect();
2359        assert!(committed_txns.contains(&1));
2360        assert!(committed_txns.contains(&2));
2361        assert!(committed_txns.contains(&4));
2362
2363        let conflicted_txns: BTreeSet<u64> =
2364            result.conflicted.iter().map(|(tid, _)| *tid).collect();
2365        assert!(conflicted_txns.contains(&3));
2366        assert!(conflicted_txns.contains(&5));
2367
2368        assert_eq!(responses.len(), 5);
2369    }
2370
2371    #[test]
2372    fn test_group_commit_max_batch_size_respected() {
2373        let coord = make_coordinator(4);
2374        let (sender, receiver) = two_phase_commit_channel(16);
2375
2376        // Submit 10 requests
2377        for txn_id in 1..=10_u64 {
2378            let permit = sender.reserve();
2379            permit.send(req(txn_id, &[txn_id as u32 * 100]));
2380        }
2381
2382        // Process batches โ€” each should have at most 4
2383        let mut total_committed = 0_usize;
2384        let mut total_batches = 0_u32;
2385        while total_committed < 10 {
2386            if let Some(result) = coord
2387                .drain_and_process(&receiver)
2388                .expect("drain should succeed")
2389            {
2390                assert!(
2391                    result.committed.len() <= 4,
2392                    "batch size must not exceed MAX_BATCH_SIZE=4, got {}",
2393                    result.committed.len()
2394                );
2395                total_committed += result.committed.len();
2396                total_batches += 1;
2397            }
2398        }
2399        assert!(
2400            total_batches >= 3,
2401            "10 requests with max_batch=4 needs at least 3 batches, got {total_batches}"
2402        );
2403    }
2404
    /// Producers beyond channel capacity must block on `reserve` until the
    /// coordinator drains; every request eventually commits.
    #[test]
    fn test_group_commit_backpressure_channel_full() {
        let coord = make_coordinator(16);
        let (sender, receiver) = two_phase_commit_channel(2);

        // Fill the channel
        let permit1 = sender.reserve();
        permit1.send(req(1, &[10]));
        let permit2 = sender.reserve();
        permit2.send(req(2, &[20]));

        // Spawn threads to submit more (will block due to capacity=2)
        let blocked_handle = thread::spawn(move || {
            for txn_id in 3..=5_u64 {
                let permit = sender.reserve();
                permit.send(req(txn_id, &[txn_id as u32 * 100]));
            }
        });

        // Process first batch to free capacity
        let result = coord
            .drain_and_process(&receiver)
            .expect("drain should succeed")
            .expect("should have received requests");
        assert!(
            !result.committed.is_empty(),
            "first batch should have committed some"
        );

        // Allow blocked threads to proceed
        thread::sleep(Duration::from_millis(50));

        // Process remaining; drain_and_process waits up to 1s for the
        // first request of each batch, so this loop terminates.
        let mut total = result.committed.len();
        while total < 5 {
            if let Some(r) = coord
                .drain_and_process(&receiver)
                .expect("drain should succeed")
            {
                total += r.committed.len();
            }
        }
        assert_eq!(total, 5, "all 5 requests should eventually succeed");
        blocked_handle.join().expect("blocked thread should finish");
    }
2450
    /// Compares per-request fsync (10 coordinators, batch size 1) against a
    /// single batched fsync; the speedup is logged rather than asserted to
    /// avoid flakes from CI scheduling jitter.
    #[test]
    #[allow(clippy::cast_precision_loss)]
    fn test_group_commit_throughput_model_2_8x() {
        // Simulate fsync cost of 50us
        let fsync_delay = Duration::from_micros(50);

        // Sequential: 10 requests, each with its own fsync
        let sequential_start = Instant::now();
        for txn_id in 1..=10_u64 {
            let coord = make_coordinator_with_delay(1, fsync_delay);
            let batch = vec![req(txn_id, &[txn_id as u32])];
            let _ = coord.process_batch(batch).expect("should succeed");
        }
        let sequential_elapsed = sequential_start.elapsed();

        // Batched: 10 requests in one batch, single fsync
        let batched_start = Instant::now();
        let coord_batched = make_coordinator_with_delay(16, fsync_delay);
        let batch: Vec<CommitRequest> =
            (1..=10).map(|tid| req(tid, &[tid as u32 + 1000])).collect();
        let _ = coord_batched.process_batch(batch).expect("should succeed");
        let batched_elapsed = batched_start.elapsed();

        // Batched should be significantly faster in isolation, but in parallel CI
        // CPU jitter and thread scheduling overheads can overwhelm the 50us delay,
        // so we log the speedup instead of strictly asserting >2.0.
        let speedup = sequential_elapsed.as_secs_f64() / batched_elapsed.as_secs_f64();
        println!(
            "throughput_model: speedup={speedup:.2}x (seq={sequential_elapsed:?}, batch={batched_elapsed:?})"
        );
    }
2482
2483    #[test]
2484    fn test_group_commit_publish_after_fsync_ordering() {
2485        let coord = make_coordinator(16);
2486        let batch = vec![req(1, &[10]), req(2, &[20]), req(3, &[30])];
2487        let (_, result) = coord.process_batch(batch).expect("batch should succeed");
2488
2489        // Verify strict phase ordering: Validate -> WalAppend -> Fsync -> Publish
2490        assert_eq!(
2491            result.phase_order,
2492            vec![
2493                BatchPhase::Validate,
2494                BatchPhase::WalAppend,
2495                BatchPhase::Fsync,
2496                BatchPhase::Publish,
2497            ],
2498            "phases must execute in strict order"
2499        );
2500
2501        // Published versions should exist only after the batch (which includes fsync)
2502        let published = coord.published_versions();
2503        assert_eq!(published.len(), 3, "all 3 versions should be published");
2504    }
2505
2506    #[test]
2507    fn test_group_commit_validate_phase_rejects_before_wal_append() {
2508        let coord = make_coordinator(16);
2509
2510        // First batch: commit page 10
2511        let _ = coord
2512            .process_batch(vec![req(1, &[10])])
2513            .expect("first batch should succeed");
2514
2515        // Second batch: request 2 conflicts on page 10, request 3 is clean
2516        let batch2 = vec![req(2, &[10, 20]), req(3, &[30])];
2517        let (_, result) = coord
2518            .process_batch(batch2)
2519            .expect("second batch should succeed");
2520
2521        // Request 2 should be rejected, request 3 committed
2522        assert_eq!(result.committed.len(), 1);
2523        assert_eq!(result.conflicted.len(), 1);
2524        assert_eq!(result.conflicted[0].0, 2, "txn 2 should be conflicted");
2525        assert_eq!(result.committed[0].0, 3, "txn 3 should be committed");
2526
2527        // Phase order shows Validate happened (rejects happen there, before WAL)
2528        assert_eq!(result.phase_order[0], BatchPhase::Validate);
2529        assert_eq!(result.phase_order[1], BatchPhase::WalAppend);
2530
2531        // WAL should only have appended request 3 (not the conflicted one)
2532        // Total appended: 1 from first batch + 1 from second batch = 2
2533        assert_eq!(coord.wal_handle().total_appended(), 2);
2534    }
2535
2536    #[test]
2537    fn test_group_commit_empty_batch() {
2538        let coord = make_coordinator(16);
2539        let (_, result) = coord
2540            .process_batch(Vec::new())
2541            .expect("empty batch should succeed");
2542        assert!(result.committed.is_empty());
2543        assert!(result.conflicted.is_empty());
2544        assert_eq!(result.fsync_count, 0, "no fsync for empty batch");
2545        assert!(result.phase_order.is_empty());
2546    }
2547
2548    #[test]
2549    fn test_group_commit_duplicate_txn_ids_keep_distinct_commit_sequences() {
2550        let coord = make_coordinator(16);
2551        let batch = vec![req(7, &[10]), req(7, &[20])];
2552
2553        let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2554
2555        assert_eq!(result.committed.len(), 2);
2556        let committed_commit_seqs = result
2557            .committed
2558            .iter()
2559            .map(|(_, _, commit_seq)| *commit_seq)
2560            .collect::<Vec<_>>();
2561        assert_eq!(committed_commit_seqs.len(), 2);
2562        assert_ne!(committed_commit_seqs[0], committed_commit_seqs[1]);
2563
2564        let response_commit_seqs = responses
2565            .iter()
2566            .map(|(_, response)| match response {
2567                GroupCommitResponse::Committed { commit_seq, .. } => *commit_seq,
2568                GroupCommitResponse::Conflict { .. } => 0,
2569            })
2570            .collect::<Vec<_>>();
2571        assert_eq!(response_commit_seqs, committed_commit_seqs);
2572    }
2573
2574    #[test]
2575    fn test_group_commit_rejects_wal_offset_count_mismatch() {
2576        let wal = OffsetMismatchWalWriter::new(vec![42]);
2577        let coord = GroupCommitCoordinator::new(
2578            wal,
2579            FirstCommitterWinsValidator,
2580            GroupCommitConfig::default(),
2581        );
2582
2583        let err = coord
2584            .process_batch(vec![req(1, &[10]), req(2, &[20])])
2585            .expect_err("mismatched WAL offsets must fail the batch");
2586        assert!(matches!(err, FrankenError::Internal(_)));
2587        assert_eq!(coord.wal_handle().sync_count(), 0, "fsync must not run");
2588        assert!(
2589            coord.published_versions().is_empty(),
2590            "no versions may be published after a malformed WAL append result"
2591        );
2592    }
2593
2594    #[test]
2595    fn test_group_commit_all_conflict_no_fsync() {
2596        let coord = make_coordinator(16);
2597
2598        // First batch: commit pages 10, 20
2599        let _ = coord
2600            .process_batch(vec![req(1, &[10, 20])])
2601            .expect("first batch should succeed");
2602
2603        // Second batch: all requests conflict
2604        let batch = vec![req(2, &[10]), req(3, &[20])];
2605        let (_, result) = coord.process_batch(batch).expect("should succeed");
2606
2607        assert_eq!(result.committed.len(), 0);
2608        assert_eq!(result.conflicted.len(), 2);
2609        assert_eq!(
2610            result.fsync_count, 0,
2611            "no fsync needed when all requests conflict"
2612        );
2613        // Only Validate phase should have executed
2614        assert_eq!(result.phase_order, vec![BatchPhase::Validate]);
2615    }
2616
2617    #[test]
2618    fn test_group_commit_run_loop_shutdown() {
2619        let coord = Arc::new(make_coordinator(16));
2620        let (sender, receiver) = two_phase_commit_channel(16);
2621        let loop_cx = Arc::new(Cx::new());
2622
2623        // Send some requests
2624        for txn_id in 1..=3_u64 {
2625            let permit = sender.reserve();
2626            permit.send(req(txn_id, &[txn_id as u32 * 100]));
2627        }
2628
2629        let loop_cx_clone = Arc::clone(&loop_cx);
2630        let coord_clone = Arc::clone(&coord);
2631        let handle = thread::spawn(move || coord_clone.run_loop(&receiver, &loop_cx_clone));
2632
2633        // Let the loop process
2634        thread::sleep(Duration::from_millis(200));
2635        loop_cx.cancel();
2636
2637        handle
2638            .join()
2639            .expect("loop thread should join")
2640            .expect("loop should succeed");
2641
2642        assert!(
2643            coord.total_batches() >= 1,
2644            "should have processed at least one batch"
2645        );
2646        let published = coord.published_versions();
2647        assert_eq!(published.len(), 3, "all 3 should be published");
2648    }
2649
2650    #[test]
2651    fn test_first_committer_wins_validator() {
2652        let validator = FirstCommitterWinsValidator;
2653        let committed: BTreeSet<u32> = [10, 20, 30].into_iter().collect();
2654
2655        // No overlap โ€” passes
2656        assert!(validator.validate(&req(1, &[40, 50]), &committed).is_ok());
2657
2658        // Overlap on page 10 โ€” fails
2659        let result = validator.validate(&req(2, &[10, 50]), &committed);
2660        assert!(result.is_err());
2661        assert!(result.unwrap_err().contains("page 10"));
2662    }
2663
2664    #[test]
2665    fn test_in_memory_wal_writer_basic() {
2666        let wal = InMemoryWalWriter::new();
2667        let r1 = req(1, &[10]);
2668        let r2 = req(2, &[20]);
2669        let offsets = wal.append_batch(&[&r1, &r2]).expect("append should work");
2670        assert_eq!(offsets.len(), 2);
2671        assert_ne!(offsets[0], offsets[1], "offsets must be distinct");
2672        assert_eq!(wal.total_appended(), 2);
2673        assert_eq!(wal.sync_count(), 0);
2674        wal.sync().expect("sync should work");
2675        assert_eq!(wal.sync_count(), 1);
2676    }
2677}