1use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::future::Future;
8use std::panic::AssertUnwindSafe;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex, MutexGuard};
11use std::task::{Context, Poll, Wake, Waker};
12use std::thread;
13use std::time::{Duration, Instant};
14
15use asupersync::channel::mpsc;
16use asupersync::cx::Cx as NativeCx;
17use asupersync::runtime::{JoinHandle as AsyncJoinHandle, Runtime, spawn_blocking};
18use asupersync::sync::{OwnedSemaphorePermit, Semaphore};
19use fsqlite_error::{FrankenError, Result};
20use fsqlite_types::cx::Cx;
21use tracing::{debug, error, info, warn};
22
// Bead (work-item) identifier used to tag every log record from this module.
const BEAD_ID: &str = "bd-22n.11";

/// Default logical capacity of the two-phase commit channel.
pub const DEFAULT_COMMIT_CHANNEL_CAPACITY: usize = 16;
27
/// A transaction's commit payload as handed to the commit pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitRequest {
    /// Identifier of the committing transaction.
    pub txn_id: u64,
    /// Page numbers touched by the transaction's write set.
    pub write_set_pages: Vec<u32>,
    /// Serialized commit payload bytes.
    pub payload: Vec<u8>,
}
35
36impl CommitRequest {
37 #[must_use]
38 pub fn new(txn_id: u64, write_set_pages: Vec<u32>, payload: Vec<u8>) -> Self {
39 Self {
40 txn_id,
41 write_set_pages,
42 payload,
43 }
44 }
45}
46
/// Tunables for the commit pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitPipelineConfig {
    /// Capacity of the two-phase commit channel (max in-flight commits).
    pub channel_capacity: usize,
}
52
53impl Default for CommitPipelineConfig {
54 fn default() -> Self {
55 Self {
56 channel_capacity: DEFAULT_COMMIT_CHANNEL_CAPACITY,
57 }
58 }
59}
60
61impl CommitPipelineConfig {
62 #[must_use]
64 pub fn from_pragma_capacity(raw_capacity: i64) -> Self {
65 let clamped_i64 = raw_capacity.clamp(1, i64::from(u16::MAX));
66 let clamped = usize::try_from(clamped_i64).expect("clamped to positive u16 range");
67 Self {
68 channel_capacity: clamped,
69 }
70 }
71}
72
/// A commit queued by its sender, waiting for in-order release.
#[derive(Debug)]
struct PendingCommit {
    request: CommitRequest,
    // Held until the receiver consumes the commit, so logical occupancy
    // covers commits that are queued but not yet delivered.
    logical_permit: OwnedSemaphorePermit,
}
78
/// Receiver-side bookkeeping that releases commits strictly in reservation
/// order, skipping reservations whose senders aborted.
#[derive(Debug)]
struct ReceiverOrderState {
    // Next reservation sequence the receiver is allowed to release.
    next_receive_seq: u64,
    // Reservation sequences that were aborted and must be skipped.
    aborted_reservations: BTreeSet<u64>,
    // Commits that arrived (possibly out of order), keyed by reservation seq.
    pending_commits: BTreeMap<u64, PendingCommit>,
}
85
impl ReceiverOrderState {
    /// Starts at reservation sequence 1 with nothing queued or aborted.
    fn new() -> Self {
        Self {
            next_receive_seq: 1,
            aborted_reservations: BTreeSet::new(),
            pending_commits: BTreeMap::new(),
        }
    }

    /// Marks `reservation_seq` as aborted so `take_ready` will skip it.
    fn mark_aborted(&mut self, reservation_seq: u64) {
        self.aborted_reservations.insert(reservation_seq);
    }

    /// Stores a commit under its reservation sequence together with the
    /// logical-capacity permit that is released when the commit is consumed.
    fn queue_commit(
        &mut self,
        reservation_seq: u64,
        request: CommitRequest,
        logical_permit: OwnedSemaphorePermit,
    ) {
        let replaced = self.pending_commits.insert(
            reservation_seq,
            PendingCommit {
                request,
                logical_permit,
            },
        );
        // Each reservation sequence is handed out exactly once; a collision
        // here indicates a protocol bug upstream.
        debug_assert!(
            replaced.is_none(),
            "duplicate reservation sequence enqueued: {reservation_seq}"
        );
    }

    /// Removes a queued commit (used when signalling the receiver failed);
    /// dropping the entry also drops its logical permit.
    fn rollback_pending_commit(&mut self, reservation_seq: u64) {
        let _ = self.pending_commits.remove(&reservation_seq);
    }

    /// Returns the next in-order commit if present, first skipping any
    /// aborted reservations; `None` when the next sequence is not ready yet.
    fn take_ready(&mut self) -> Option<CommitRequest> {
        loop {
            // Aborted reservations are consumed and skipped over.
            if self.aborted_reservations.remove(&self.next_receive_seq) {
                self.next_receive_seq = self.next_receive_seq.saturating_add(1);
                continue;
            }

            let PendingCommit {
                request,
                logical_permit,
            } = self.pending_commits.remove(&self.next_receive_seq)?;
            self.next_receive_seq = self.next_receive_seq.saturating_add(1);
            // Releasing the permit frees one slot of logical capacity.
            drop(logical_permit);
            return Some(request);
        }
    }
}
139
/// Wake-up messages sent from permit holders to the receiver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitSignal {
    /// A commit was queued under the given reservation sequence.
    CommitQueued { reservation_seq: u64 },
}
144
/// State shared between all sender clones and the receiver.
#[derive(Debug)]
struct TwoPhaseQueueShared {
    // Bounds the number of in-flight reservations/queued commits.
    logical_capacity: Arc<Semaphore>,
    signal_sender: mpsc::Sender<CommitSignal>,
    // Receiver half of the signal channel; behind a lock because the public
    // receiver handle derives `Clone`.
    signal_receiver: Mutex<mpsc::Receiver<CommitSignal>>,
    // Monotonic source of reservation sequence numbers (starts at 1).
    next_reservation_seq: AtomicU64,
    order_state: Mutex<ReceiverOrderState>,
}
153
impl TwoPhaseQueueShared {
    /// Builds shared state with at least one slot of logical capacity and a
    /// signal channel of the same size.
    fn with_capacity(capacity: usize) -> Self {
        let normalized_capacity = capacity.max(1);
        let (signal_sender, signal_receiver) = mpsc::channel(normalized_capacity);
        Self {
            logical_capacity: Arc::new(Semaphore::new(normalized_capacity)),
            signal_sender,
            signal_receiver: Mutex::new(signal_receiver),
            next_reservation_seq: AtomicU64::new(1),
            order_state: Mutex::new(ReceiverOrderState::new()),
        }
    }

    /// Hands out the next reservation sequence (monotonic, starting at 1).
    fn reserve_sequence(&self) -> u64 {
        self.next_reservation_seq.fetch_add(1, Ordering::AcqRel)
    }

    /// Logical slots currently held by reservations or queued commits.
    fn occupancy(&self) -> usize {
        self.capacity()
            .saturating_sub(self.logical_capacity.available_permits())
    }

    /// Total logical capacity of the queue.
    fn capacity(&self) -> usize {
        self.logical_capacity.max_permits()
    }

    /// Stores a commit under its reservation sequence for in-order release.
    fn queue_commit(
        &self,
        reservation_seq: u64,
        request: CommitRequest,
        logical_permit: OwnedSemaphorePermit,
    ) {
        lock_with_recovery(&self.order_state, "two_phase_order_state").queue_commit(
            reservation_seq,
            request,
            logical_permit,
        );
    }

    /// Removes a previously queued commit (used when signalling failed).
    fn rollback_pending_commit(&self, reservation_seq: u64) {
        lock_with_recovery(&self.order_state, "two_phase_order_state")
            .rollback_pending_commit(reservation_seq);
    }

    /// Marks a reservation as aborted and nudges the receiver so it can
    /// advance past the hole in the sequence.
    fn mark_aborted(&self, reservation_seq: u64) {
        lock_with_recovery(&self.order_state, "two_phase_order_state")
            .mark_aborted(reservation_seq);
        self.signal_sender.wake_receiver();
    }

    /// Pops the next in-order commit, if one is ready.
    fn take_ready_request(&self) -> Option<CommitRequest> {
        lock_with_recovery(&self.order_state, "two_phase_order_state").take_ready()
    }

    /// Non-blocking drain of all pending wake-up signals.
    fn drain_signals(&self) -> SignalDrain {
        let mut receiver = lock_with_recovery(&self.signal_receiver, "two_phase_signal_receiver");
        let mut drained_any = false;
        loop {
            match receiver.try_recv() {
                Ok(CommitSignal::CommitQueued { .. }) => {
                    drained_any = true;
                }
                // `Cancelled` is treated like `Empty`: nothing consumable now.
                Err(mpsc::RecvError::Empty | mpsc::RecvError::Cancelled) => {
                    return if drained_any {
                        SignalDrain::Drained
                    } else {
                        SignalDrain::Empty
                    };
                }
                Err(mpsc::RecvError::Disconnected) => return SignalDrain::Disconnected,
            }
        }
    }
}
228
/// Result of draining the signal channel without blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalDrain {
    /// No signals were pending.
    Empty,
    /// At least one signal was consumed.
    Drained,
    /// All senders are gone.
    Disconnected,
}
235
/// Result of a bounded wait for signal-channel activity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalWaitOutcome {
    /// Activity (or a spurious wake-up) occurred before the deadline.
    Woken,
    /// The deadline elapsed first.
    TimedOut,
    /// All senders are gone.
    Disconnected,
}
242
/// Waker that unparks a captured thread, letting blocking code poll futures.
#[derive(Debug)]
struct ThreadParkWaker {
    thread: thread::Thread,
}
247
248impl Wake for ThreadParkWaker {
249 fn wake(self: Arc<Self>) {
250 self.thread.unpark();
251 }
252
253 fn wake_by_ref(self: &Arc<Self>) {
254 self.thread.unpark();
255 }
256}
257
258fn current_thread_waker() -> Waker {
259 Waker::from(Arc::new(ThreadParkWaker {
260 thread: thread::current(),
261 }))
262}
263
/// Polls `future` to completion on the current thread, requesting
/// cancellation through `native_cx` once `timeout` elapses.
///
/// Note the future still runs to completion after the deadline: the timeout
/// only sets the cancel flag so a cooperative future can finish promptly
/// (e.g. by resolving with a cancellation error).
fn block_on_future_with_timeout<F>(future: F, native_cx: &NativeCx, timeout: Duration) -> F::Output
where
    F: Future,
{
    let waker = current_thread_waker();
    let mut task_cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let started_at = Instant::now();
    let mut cancelled_for_timeout = false;

    loop {
        match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => {}
        }

        if !cancelled_for_timeout && started_at.elapsed() >= timeout {
            // Deadline hit: request cancellation, then poll again right away
            // so the future can observe the flag.
            native_cx.set_cancel_requested(true);
            cancelled_for_timeout = true;
            continue;
        }

        if cancelled_for_timeout {
            // Cancellation already requested: spin politely until the
            // future acknowledges it.
            thread::yield_now();
        } else {
            // Sleep until the waker fires or the remaining budget runs out.
            thread::park_timeout(timeout.saturating_sub(started_at.elapsed()));
        }
    }
}
293
/// Polls `sender.try_reserve()` until a permit is available, returning
/// `None` on disconnect or once `timeout` elapses.
fn try_reserve_signal_with_timeout(
    sender: &mpsc::Sender<CommitSignal>,
    timeout: Duration,
) -> Option<mpsc::SendPermit<'_, CommitSignal>> {
    let started_at = Instant::now();
    loop {
        match sender.try_reserve() {
            Ok(permit) => return Some(permit),
            Err(mpsc::SendError::Disconnected(())) => return None,
            Err(mpsc::SendError::Full(()) | mpsc::SendError::Cancelled(())) => {
                if started_at.elapsed() >= timeout {
                    return None;
                }
                // Short nap between retries: keeps latency low without
                // busy-spinning the CPU.
                thread::park_timeout(Duration::from_millis(1));
            }
        }
    }
}
312
/// Blocks the calling thread until a commit signal arrives, the channel
/// disconnects, or `timeout` elapses.
///
/// Holds the signal-receiver lock for the whole wait; NOTE(review): this
/// assumes a single logical receiver — confirm no concurrent callers.
fn wait_for_signal_activity(shared: &TwoPhaseQueueShared, timeout: Duration) -> SignalWaitOutcome {
    let native_cx = NativeCx::for_testing();
    let waker = current_thread_waker();
    let mut task_cx = Context::from_waker(&waker);
    let started_at = Instant::now();
    let mut receiver = lock_with_recovery(&shared.signal_receiver, "two_phase_signal_receiver");
    let mut future = Box::pin(receiver.recv(&native_cx));

    // First poll: a signal may already be queued.
    match future.as_mut().poll(&mut task_cx) {
        Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => return SignalWaitOutcome::Woken,
        Poll::Ready(Err(mpsc::RecvError::Disconnected)) => return SignalWaitOutcome::Disconnected,
        Poll::Ready(Err(mpsc::RecvError::Cancelled)) => return SignalWaitOutcome::TimedOut,
        Poll::Ready(Err(mpsc::RecvError::Empty)) => unreachable!("recv() does not return Empty"),
        Poll::Pending => {}
    }

    let remaining = timeout.saturating_sub(started_at.elapsed());
    if remaining.is_zero() {
        // No budget left: cancel the recv and translate its final state.
        native_cx.set_cancel_requested(true);
        return match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => SignalWaitOutcome::Woken,
            Poll::Ready(Err(mpsc::RecvError::Disconnected)) => SignalWaitOutcome::Disconnected,
            Poll::Ready(Err(mpsc::RecvError::Cancelled)) | Poll::Pending => {
                SignalWaitOutcome::TimedOut
            }
            Poll::Ready(Err(mpsc::RecvError::Empty)) => {
                unreachable!("recv() does not return Empty")
            }
        };
    }

    // Park until the waker fires or the budget runs out.
    thread::park_timeout(remaining);
    if started_at.elapsed() >= timeout {
        native_cx.set_cancel_requested(true);
        return match future.as_mut().poll(&mut task_cx) {
            Poll::Ready(Ok(CommitSignal::CommitQueued { .. })) => SignalWaitOutcome::Woken,
            Poll::Ready(Err(mpsc::RecvError::Disconnected)) => SignalWaitOutcome::Disconnected,
            Poll::Ready(Err(mpsc::RecvError::Cancelled)) | Poll::Pending => {
                SignalWaitOutcome::TimedOut
            }
            Poll::Ready(Err(mpsc::RecvError::Empty)) => {
                unreachable!("recv() does not return Empty")
            }
        };
    }

    // Unparked before the deadline: report activity. The caller re-checks
    // state and loops, so a spurious wake-up here is harmless.
    SignalWaitOutcome::Woken
}
361
/// Cloneable sending half of the two-phase commit channel.
#[derive(Debug, Clone)]
pub struct TwoPhaseCommitSender {
    shared: Arc<TwoPhaseQueueShared>,
}
367
impl TwoPhaseCommitSender {
    /// Reserves a queue slot, blocking (in hour-long waits) until one is
    /// available.
    pub fn reserve(&self) -> SendPermit<'_> {
        loop {
            if let Some(permit) = self.try_reserve_for(Duration::from_secs(3600)) {
                return permit;
            }
        }
    }

    /// Attempts to reserve a queue slot within `timeout`.
    ///
    /// Acquires a logical-capacity permit first, then a signal-channel
    /// permit with the remaining budget. Returns `None` if either step times
    /// out or the channel disconnects; partially acquired permits are
    /// released on drop.
    #[must_use]
    pub fn try_reserve_for(&self, timeout: Duration) -> Option<SendPermit<'_>> {
        let started_at = Instant::now();

        let logical_cx = NativeCx::for_testing();
        let logical_permit = match block_on_future_with_timeout(
            OwnedSemaphorePermit::acquire(
                Arc::clone(&self.shared.logical_capacity),
                &logical_cx,
                1,
            ),
            &logical_cx,
            timeout,
        ) {
            Ok(permit) => permit,
            Err(_) => return None,
        };

        let remaining = timeout.saturating_sub(started_at.elapsed());
        let signal_permit = try_reserve_signal_with_timeout(&self.shared.signal_sender, remaining)?;

        Some(SendPermit {
            shared: Arc::clone(&self.shared),
            signal_permit: Some(signal_permit),
            logical_permit: Some(logical_permit),
            // The reservation sequence fixes this commit's release order.
            reservation_seq: Some(self.shared.reserve_sequence()),
        })
    }

    /// Logical slots currently in use.
    #[must_use]
    pub fn occupancy(&self) -> usize {
        self.shared.occupancy()
    }

    /// Total logical capacity of the queue.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.shared.capacity()
    }
}
420
/// Receiving half of the two-phase commit channel; delivers commits in
/// reservation order.
#[derive(Debug, Clone)]
pub struct TwoPhaseCommitReceiver {
    shared: Arc<TwoPhaseQueueShared>,
}
426
impl TwoPhaseCommitReceiver {
    /// Receives the next in-order commit, blocking (in hour-long waits)
    /// until one becomes ready.
    pub fn recv(&self) -> CommitRequest {
        loop {
            if let Some(request) = self.try_recv_for(Duration::from_secs(3600)) {
                return request;
            }
        }
    }

    /// Attempts to receive the next in-order commit within `timeout`.
    #[must_use]
    pub fn try_recv_for(&self, timeout: Duration) -> Option<CommitRequest> {
        let started_at = Instant::now();
        loop {
            match self.shared.drain_signals() {
                SignalDrain::Drained => {
                    if let Some(request) = self.shared.take_ready_request() {
                        return Some(request);
                    }
                    // Signals arrived but the next sequence isn't queued yet
                    // (out-of-order send); keep draining/waiting.
                    continue;
                }
                // All senders gone: return whatever is already releasable.
                SignalDrain::Disconnected => return self.shared.take_ready_request(),
                SignalDrain::Empty => {}
            }

            // An abort only wakes the receiver without a signal message, so
            // readiness is re-checked even when the channel was empty.
            if let Some(request) = self.shared.take_ready_request() {
                return Some(request);
            }

            let remaining = timeout.saturating_sub(started_at.elapsed());
            if remaining.is_zero() {
                return self.shared.take_ready_request();
            }

            match wait_for_signal_activity(&self.shared, remaining) {
                SignalWaitOutcome::Woken => {}
                SignalWaitOutcome::Disconnected | SignalWaitOutcome::TimedOut => {
                    return self.shared.take_ready_request();
                }
            }
        }
    }
}
471
/// A reserved queue slot: holds a logical-capacity permit, a signal-channel
/// permit, and a reservation sequence until `send`/`abort`/drop resolves it.
#[derive(Debug)]
pub struct SendPermit<'a> {
    shared: Arc<TwoPhaseQueueShared>,
    signal_permit: Option<mpsc::SendPermit<'a, CommitSignal>>,
    logical_permit: Option<OwnedSemaphorePermit>,
    // None once the permit has been consumed (sent or aborted).
    reservation_seq: Option<u64>,
}
482
impl SendPermit<'_> {
    /// Reservation sequence backing this permit (0 once consumed).
    #[must_use]
    pub fn reservation_seq(&self) -> u64 {
        self.reservation_seq.unwrap_or(0)
    }

    /// Publishes `request` under this permit's reservation sequence.
    ///
    /// The logical permit travels with the queued commit and is released
    /// when the receiver consumes it. If the wake-up signal cannot be sent,
    /// the queued commit is rolled back and the reservation marked aborted
    /// so the receiver does not stall on the gap.
    pub fn send(mut self, request: CommitRequest) {
        let Some(reservation_seq) = self.reservation_seq.take() else {
            return;
        };
        let Some(logical_permit) = self.logical_permit.take() else {
            return;
        };

        self.shared
            .queue_commit(reservation_seq, request, logical_permit);

        if let Some(signal_permit) = self.signal_permit.take() {
            if signal_permit
                .try_send(CommitSignal::CommitQueued { reservation_seq })
                .is_err()
            {
                self.shared.rollback_pending_commit(reservation_seq);
                self.shared.mark_aborted(reservation_seq);
            }
        }
    }

    /// Explicitly abandons the reservation (equivalent to dropping).
    pub fn abort(mut self) {
        self.abort_current_reservation();
    }

    /// Releases all held resources and records the reservation as aborted so
    /// the receiver can skip it. Idempotent.
    fn abort_current_reservation(&mut self) {
        let Some(reservation_seq) = self.reservation_seq.take() else {
            return;
        };
        if let Some(signal_permit) = self.signal_permit.take() {
            signal_permit.abort();
        }
        // Dropping the logical permit frees one capacity slot.
        let _ = self.logical_permit.take();
        self.shared.mark_aborted(reservation_seq);
    }
}
529
impl Drop for SendPermit<'_> {
    /// A permit dropped without `send` counts as an abort, so the receiver
    /// never stalls waiting on this reservation sequence.
    fn drop(&mut self) {
        self.abort_current_reservation();
    }
}
535
/// Sender wrapper that counts permits leaked (dropped without an explicit
/// `send` or `abort`).
#[derive(Debug, Clone)]
pub struct TrackedSender {
    sender: TwoPhaseCommitSender,
    leaked_permits: Arc<AtomicU64>,
}
542
impl TrackedSender {
    /// Wraps `sender`, starting with a leak count of zero.
    #[must_use]
    pub fn new(sender: TwoPhaseCommitSender) -> Self {
        Self {
            sender,
            leaked_permits: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Reserves a slot; the returned permit records a leak if it is dropped
    /// without an explicit `send` or `abort`.
    pub fn reserve(&self) -> TrackedSendPermit<'_> {
        TrackedSendPermit {
            leaked_permits: Arc::clone(&self.leaked_permits),
            permit: Some(self.sender.reserve()),
        }
    }

    /// Number of permits dropped without an explicit `send`/`abort`.
    #[must_use]
    pub fn leaked_permit_count(&self) -> u64 {
        self.leaked_permits.load(Ordering::Acquire)
    }
}
564
/// Permit wrapper that records a leak if dropped while still armed.
#[derive(Debug)]
pub struct TrackedSendPermit<'a> {
    leaked_permits: Arc<AtomicU64>,
    // None once the inner permit has been explicitly sent or aborted.
    permit: Option<SendPermit<'a>>,
}
571
572impl TrackedSendPermit<'_> {
573 pub fn send(mut self, request: CommitRequest) {
575 if let Some(permit) = self.permit.take() {
576 permit.send(request);
577 }
578 }
579
580 pub fn abort(mut self) {
582 if let Some(permit) = self.permit.take() {
583 permit.abort();
584 }
585 }
586}
587
impl Drop for TrackedSendPermit<'_> {
    /// Counts the drop as a leak when the inner permit was never explicitly
    /// consumed; the inner permit's own drop still aborts the reservation.
    fn drop(&mut self) {
        if self.permit.is_some() {
            self.leaked_permits.fetch_add(1, Ordering::AcqRel);
        }
    }
}
595
596#[must_use]
598pub fn two_phase_commit_channel(capacity: usize) -> (TwoPhaseCommitSender, TwoPhaseCommitReceiver) {
599 let shared = Arc::new(TwoPhaseQueueShared::with_capacity(capacity));
600 (
601 TwoPhaseCommitSender {
602 shared: Arc::clone(&shared),
603 },
604 TwoPhaseCommitReceiver { shared },
605 )
606}
607
/// Sizes the commit channel via Little's law (`L = lambda * T`), inflated by
/// burst and jitter multipliers (each floored at 1.0), rounded up, with a
/// minimum capacity of 1.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
pub fn little_law_capacity(
    lambda_per_second: f64,
    t_commit: Duration,
    burst_multiplier: f64,
    jitter_multiplier: f64,
) -> usize {
    let burst = burst_multiplier.max(1.0);
    let jitter = jitter_multiplier.max(1.0);
    let occupancy = lambda_per_second * t_commit.as_secs_f64() * burst * jitter;
    occupancy.ceil().max(1.0) as usize
}
627
/// Optimal group-commit batch size: `sqrt(T_fsync / T_validate)`, rounded,
/// clamped into `1..=capacity` (capacity floored at 1). A zero validate
/// time is replaced by `f64::EPSILON` to avoid division by zero.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
pub fn optimal_batch_size(t_fsync: Duration, t_validate: Duration, capacity: usize) -> usize {
    let validate_secs = t_validate.as_secs_f64().max(f64::EPSILON);
    let ideal = (t_fsync.as_secs_f64() / validate_secs).sqrt().round();
    let upper = capacity.max(1) as f64;
    ideal.clamp(1.0, upper) as usize
}
640
641#[must_use]
643#[allow(
644 clippy::cast_possible_truncation,
645 clippy::cast_sign_loss,
646 clippy::cast_precision_loss
647)]
648pub fn conformal_batch_size(
649 fsync_samples: &[Duration],
650 validate_samples: &[Duration],
651 capacity: usize,
652) -> usize {
653 if fsync_samples.is_empty() || validate_samples.is_empty() {
654 return 1;
655 }
656 let q_fsync = quantile_seconds(fsync_samples, 0.9);
657 let q_validate = quantile_seconds(validate_samples, 0.9).max(f64::EPSILON);
658 let raw = (q_fsync / q_validate).sqrt().round();
659 raw.clamp(1.0, capacity.max(1) as f64) as usize
660}
661
/// Returns the `quantile` (clamped to `[0, 1]`) of `samples` in seconds
/// using nearest-rank on the sorted values. Panics on an empty slice;
/// callers guard against that.
fn quantile_seconds(samples: &[Duration], quantile: f64) -> f64 {
    let mut seconds: Vec<f64> = samples.iter().map(Duration::as_secs_f64).collect();
    // total_cmp gives a total order even in the presence of NaN.
    seconds.sort_by(f64::total_cmp);
    let q = quantile.clamp(0.0, 1.0);
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let rank = ((seconds.len() as f64 - 1.0) * q).round() as usize;
    seconds[rank]
}
673
/// Lifecycle milestones recorded for each commit's repair pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CommitRepairEventKind {
    /// Systematic symbols appended and synced.
    CommitDurable,
    /// Durable, but repair symbols do not exist yet.
    DurableButNotRepairable,
    /// Commit acknowledged to the caller.
    CommitAcked,
    /// Background repair generation began.
    RepairStarted,
    /// Repair symbols appended and synced.
    RepairCompleted,
    /// Repair failed (generation, I/O, scheduling, cancellation, or panic).
    RepairFailed,
}
684
/// One timestamped entry in the per-commit repair event log.
#[derive(Debug, Clone, Copy)]
pub struct CommitRepairEvent {
    /// Commit this event belongs to.
    pub commit_seq: u64,
    /// 1-based position within this commit's event stream.
    pub seq: u64,
    /// When the event was recorded.
    pub recorded_at: Instant,
    /// What happened.
    pub kind: CommitRepairEventKind,
}
695
/// Progress of a commit's background repair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairState {
    /// No repair task exists for this commit.
    NotScheduled,
    /// A repair task is queued or running.
    Pending,
    /// Repair symbols were persisted successfully.
    Completed,
    /// Repair could not be scheduled or its worker failed.
    Failed,
}
704
/// What the caller learns when a commit returns.
#[derive(Debug, Clone, Copy)]
pub struct CommitReceipt {
    /// Sequence number assigned to this commit.
    pub commit_seq: u64,
    /// True once systematic symbols are appended and synced.
    pub durable: bool,
    /// True while repair symbols are still being generated in background.
    pub repair_pending: bool,
    /// Wall-clock time spent inside the commit call.
    pub latency: Duration,
}
713
/// Configuration for the commit/repair coordinator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitRepairConfig {
    /// Whether background repair-symbol generation is scheduled per commit.
    pub repair_enabled: bool,
}
719
720impl Default for CommitRepairConfig {
721 fn default() -> Self {
722 Self {
723 repair_enabled: true,
724 }
725 }
726}
727
/// Sink for commit symbol persistence (systematic and repair streams).
pub trait CommitRepairIo: Send + Sync {
    /// Appends the systematic symbols for `commit_seq`.
    fn append_systematic_symbols(&self, commit_seq: u64, systematic_symbols: &[u8]) -> Result<()>;
    /// Makes previously appended systematic symbols durable.
    fn sync_systematic_symbols(&self, commit_seq: u64) -> Result<()>;
    /// Appends the repair symbols for `commit_seq`.
    fn append_repair_symbols(&self, commit_seq: u64, repair_symbols: &[u8]) -> Result<()>;
    /// Makes previously appended repair symbols durable.
    fn sync_repair_symbols(&self, commit_seq: u64) -> Result<()>;
}
735
/// Produces repair symbols for a commit's systematic payload.
pub trait RepairSymbolGenerator: Send + Sync {
    /// Returns the repair symbols derived from `systematic_symbols`.
    fn generate_repair_symbols(
        &self,
        commit_seq: u64,
        systematic_symbols: &[u8],
    ) -> Result<Vec<u8>>;
}
744
/// In-memory implementation of `CommitRepairIo` (useful as a test double):
/// records appends per commit and counts bytes and sync calls.
#[derive(Debug, Default)]
pub struct InMemoryCommitRepairIo {
    systematic_by_commit: Mutex<HashMap<u64, Vec<u8>>>,
    repair_by_commit: Mutex<HashMap<u64, Vec<u8>>>,
    total_systematic_bytes: AtomicU64,
    total_repair_bytes: AtomicU64,
    systematic_syncs: AtomicU64,
    repair_syncs: AtomicU64,
}
755
756impl InMemoryCommitRepairIo {
757 #[must_use]
758 pub fn total_repair_bytes(&self) -> u64 {
759 self.total_repair_bytes.load(Ordering::Acquire)
760 }
761
762 #[must_use]
763 pub fn repair_sync_count(&self) -> u64 {
764 self.repair_syncs.load(Ordering::Acquire)
765 }
766
767 #[must_use]
768 pub fn systematic_sync_count(&self) -> u64 {
769 self.systematic_syncs.load(Ordering::Acquire)
770 }
771
772 #[must_use]
773 pub fn repair_symbols_for(&self, commit_seq: u64) -> Option<Vec<u8>> {
774 lock_with_recovery(&self.repair_by_commit, "repair_by_commit")
775 .get(&commit_seq)
776 .cloned()
777 }
778}
779
impl CommitRepairIo for InMemoryCommitRepairIo {
    /// Stores the systematic symbols for `commit_seq` and adds their length
    /// to the running byte total.
    ///
    /// # Errors
    /// Returns `OutOfRange` if the length does not fit in `u64`; note the
    /// map insert has already happened by that point.
    fn append_systematic_symbols(&self, commit_seq: u64, systematic_symbols: &[u8]) -> Result<()> {
        lock_with_recovery(&self.systematic_by_commit, "systematic_by_commit")
            .insert(commit_seq, systematic_symbols.to_vec());
        self.total_systematic_bytes.fetch_add(
            u64::try_from(systematic_symbols.len()).map_err(|_| FrankenError::OutOfRange {
                what: "systematic_symbol_len".to_owned(),
                value: systematic_symbols.len().to_string(),
            })?,
            Ordering::Release,
        );
        Ok(())
    }

    /// Counts a systematic sync; nothing to flush for the in-memory fake.
    fn sync_systematic_symbols(&self, _commit_seq: u64) -> Result<()> {
        self.systematic_syncs.fetch_add(1, Ordering::Release);
        Ok(())
    }

    /// Stores the repair symbols for `commit_seq` and adds their length to
    /// the running byte total.
    ///
    /// # Errors
    /// Returns `OutOfRange` if the length does not fit in `u64`.
    fn append_repair_symbols(&self, commit_seq: u64, repair_symbols: &[u8]) -> Result<()> {
        lock_with_recovery(&self.repair_by_commit, "repair_by_commit")
            .insert(commit_seq, repair_symbols.to_vec());
        self.total_repair_bytes.fetch_add(
            u64::try_from(repair_symbols.len()).map_err(|_| FrankenError::OutOfRange {
                what: "repair_symbol_len".to_owned(),
                value: repair_symbols.len().to_string(),
            })?,
            Ordering::Release,
        );
        Ok(())
    }

    /// Counts a repair sync; nothing to flush for the in-memory fake.
    fn sync_repair_symbols(&self, _commit_seq: u64) -> Result<()> {
        self.repair_syncs.fetch_add(1, Ordering::Release);
        Ok(())
    }
}
817
/// Deterministic repair-symbol generator with configurable delay, fixed
/// output size, and injectable failure (useful as a test double).
#[derive(Debug)]
pub struct DeterministicRepairGenerator {
    // Artificial latency applied to every generation call.
    delay: Duration,
    // Number of repair bytes produced per call (at least 1).
    output_len: usize,
    // When set, generation returns an error instead of symbols.
    fail_repair: Arc<AtomicBool>,
}
825
826impl DeterministicRepairGenerator {
827 #[must_use]
828 pub fn new(delay: Duration, output_len: usize) -> Self {
829 Self {
830 delay,
831 output_len: output_len.max(1),
832 fail_repair: Arc::new(AtomicBool::new(false)),
833 }
834 }
835
836 pub fn set_fail_repair(&self, fail: bool) {
837 self.fail_repair.store(fail, Ordering::Release);
838 }
839}
840
impl RepairSymbolGenerator for DeterministicRepairGenerator {
    /// Deterministically derives `output_len` repair bytes from `commit_seq`
    /// and the systematic payload via a rotate-xor mixer.
    ///
    /// # Errors
    /// Fails when failure injection is armed via `set_fail_repair(true)`;
    /// the `OutOfRange` paths are practically unreachable on 64-bit targets.
    fn generate_repair_symbols(
        &self,
        commit_seq: u64,
        systematic_symbols: &[u8],
    ) -> Result<Vec<u8>> {
        // Optional artificial latency to exercise async scheduling.
        if self.delay != Duration::ZERO {
            thread::sleep(self.delay);
        }
        if self.fail_repair.load(Ordering::Acquire) {
            return Err(FrankenError::Internal(format!(
                "repair generation failed for commit_seq={commit_seq}"
            )));
        }

        // An empty payload is replaced by a single zero byte so the mixing
        // loop always has at least one source byte.
        let source = if systematic_symbols.is_empty() {
            &[0_u8][..]
        } else {
            systematic_symbols
        };
        let mut state = commit_seq
            ^ u64::try_from(source.len()).map_err(|_| FrankenError::OutOfRange {
                what: "systematic_symbol_len".to_owned(),
                value: source.len().to_string(),
            })?;
        let mut out = Vec::with_capacity(self.output_len);
        for idx in 0..self.output_len {
            let src = source[idx % source.len()];
            // NOTE(review): 251 appears chosen as a prime near 256 to
            // decorrelate the index contribution — confirm intent.
            let idx_mod = u64::try_from(idx % 251).map_err(|_| FrankenError::OutOfRange {
                what: "repair_symbol_index".to_owned(),
                value: idx.to_string(),
            })?;
            state = state.rotate_left(7) ^ u64::from(src) ^ idx_mod;
            out.push((state & 0xFF) as u8);
        }
        Ok(out)
    }
}
879
/// Coordinates durable commits with asynchronous repair-symbol generation:
/// commits are acknowledged as soon as systematic symbols are synced, while
/// repair symbols are produced by background tasks on a caller-owned runtime.
pub struct CommitRepairCoordinator<
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
> {
    config: CommitRepairConfig,
    // Caller-owned runtime used to spawn and join repair tasks.
    runtime: Runtime,
    // Parent context for all repair workers; children inherit cancellation.
    coordinator_cx: Cx,
    io: Arc<IO>,
    generator: Arc<GEN>,
    // Monotonic commit sequence, starting at 1.
    next_commit_seq: AtomicU64,
    // Diagnostic id for spawned repair tasks.
    next_async_task_id: AtomicU64,
    repair_states: Arc<Mutex<HashMap<u64, RepairState>>>,
    events: Arc<Mutex<Vec<CommitRepairEvent>>>,
    // Join handles of in-flight repair tasks.
    handles: Mutex<Vec<AsyncJoinHandle<()>>>,
}
896
impl<IO, GEN> CommitRepairCoordinator<IO, GEN>
where
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
{
    /// Builds a coordinator that takes ownership of its I/O and generator.
    #[must_use]
    pub fn new(
        config: CommitRepairConfig,
        runtime: Runtime,
        parent_cx: &Cx,
        io: IO,
        generator: GEN,
    ) -> Self {
        Self::with_shared(
            config,
            runtime,
            parent_cx,
            Arc::new(io),
            Arc::new(generator),
        )
    }

    /// Builds a coordinator around shared I/O and generator handles.
    #[must_use]
    pub fn with_shared(
        config: CommitRepairConfig,
        runtime: Runtime,
        parent_cx: &Cx,
        io: Arc<IO>,
        generator: Arc<GEN>,
    ) -> Self {
        Self {
            config,
            runtime,
            coordinator_cx: parent_cx.create_child(),
            io,
            generator,
            next_commit_seq: AtomicU64::new(1),
            next_async_task_id: AtomicU64::new(1),
            repair_states: Arc::new(Mutex::new(HashMap::new())),
            events: Arc::new(Mutex::new(Vec::new())),
            handles: Mutex::new(Vec::new()),
        }
    }

    /// Durably appends `systematic_symbols`, acknowledges the commit, and
    /// (when enabled) schedules asynchronous repair-symbol generation.
    ///
    /// # Errors
    /// Propagates failures from the systematic append/sync. A failure to
    /// *schedule* the repair task is not an error: it is logged, the repair
    /// state is set to `Failed`, and the receipt reports
    /// `repair_pending: false`.
    pub fn commit(&self, systematic_symbols: &[u8]) -> Result<CommitReceipt> {
        let started_at = Instant::now();
        let commit_seq = self.next_commit_seq.fetch_add(1, Ordering::Relaxed);

        // Phase 1: make the commit durable before acknowledging anything.
        self.io
            .append_systematic_symbols(commit_seq, systematic_symbols)?;
        self.io.sync_systematic_symbols(commit_seq)?;
        self.record(commit_seq, CommitRepairEventKind::CommitDurable);

        if !self.config.repair_enabled {
            self.record(commit_seq, CommitRepairEventKind::CommitAcked);
            return Ok(CommitReceipt {
                commit_seq,
                durable: true,
                repair_pending: false,
                latency: started_at.elapsed(),
            });
        }

        // Phase 2: acknowledge immediately, then repair in the background.
        lock_with_recovery(&self.repair_states, "repair_states")
            .insert(commit_seq, RepairState::Pending);
        self.record(commit_seq, CommitRepairEventKind::DurableButNotRepairable);
        debug!(
            bead_id = BEAD_ID,
            commit_seq, "commit is durable but not repairable while async repair is pending"
        );
        self.record(commit_seq, CommitRepairEventKind::CommitAcked);

        let async_task_id = self.next_async_task_id.fetch_add(1, Ordering::Relaxed);
        let io = Arc::clone(&self.io);
        let generator = Arc::clone(&self.generator);
        let repair_states = Arc::clone(&self.repair_states);
        let events = Arc::clone(&self.events);
        // The task works on its own copy of the symbols.
        let systematic_snapshot = systematic_symbols.to_vec();
        let worker_cx = self.coordinator_cx.create_child();
        let handle = self.runtime.handle().try_spawn(run_repair_task(RepairTask {
            commit_seq,
            async_task_id,
            io,
            generator,
            repair_states,
            events,
            systematic_snapshot,
            worker_cx,
        }));
        match handle {
            Ok(handle) => {
                lock_with_recovery(&self.handles, "repair_handles").push(handle);
            }
            Err(err) => {
                // Scheduling failed: the commit stays durable, but repair is
                // recorded as failed rather than silently dropped.
                set_repair_state(&self.repair_states, commit_seq, RepairState::Failed);
                self.record(commit_seq, CommitRepairEventKind::RepairFailed);
                error!(
                    bead_id = BEAD_ID,
                    commit_seq,
                    async_task_id,
                    error = ?err,
                    "failed to schedule repair task on caller-owned runtime"
                );
                return Ok(CommitReceipt {
                    commit_seq,
                    durable: true,
                    repair_pending: false,
                    latency: started_at.elapsed(),
                });
            }
        }

        Ok(CommitReceipt {
            commit_seq,
            durable: true,
            repair_pending: true,
            latency: started_at.elapsed(),
        })
    }

    /// Joins all outstanding repair tasks.
    ///
    /// # Errors
    /// Returns an internal error if any background worker panicked while
    /// being joined; all handles are drained either way.
    pub fn wait_for_background_repair(&self) -> Result<()> {
        let handles = {
            let mut guard = lock_with_recovery(&self.handles, "repair_handles");
            std::mem::take(&mut *guard)
        };
        let mut observed_panic = false;
        for handle in handles {
            let joined =
                std::panic::catch_unwind(AssertUnwindSafe(|| self.runtime.block_on(handle)));
            if joined.is_err() {
                observed_panic = true;
            }
        }
        if observed_panic {
            return Err(FrankenError::Internal(
                "background repair worker panicked".to_owned(),
            ));
        }
        Ok(())
    }

    /// Number of commits whose repair is still pending.
    #[must_use]
    pub fn pending_background_repair_count(&self) -> usize {
        lock_with_recovery(&self.repair_states, "repair_states")
            .values()
            .filter(|state| matches!(state, RepairState::Pending))
            .count()
    }

    /// Repair state recorded for `commit_seq` (`NotScheduled` if unknown).
    #[must_use]
    pub fn repair_state_for(&self, commit_seq: u64) -> RepairState {
        lock_with_recovery(&self.repair_states, "repair_states")
            .get(&commit_seq)
            .copied()
            .unwrap_or(RepairState::NotScheduled)
    }

    /// All recorded events for `commit_seq`, in recording order.
    #[must_use]
    pub fn events_for_commit(&self, commit_seq: u64) -> Vec<CommitRepairEvent> {
        lock_with_recovery(&self.events, "repair_events")
            .iter()
            .copied()
            .filter(|event| event.commit_seq == commit_seq)
            .collect()
    }

    /// Wall-clock span between "durable but not repairable" and "repair
    /// completed" for `commit_seq`, if both events were recorded.
    #[must_use]
    pub fn durable_not_repairable_window(&self, commit_seq: u64) -> Option<Duration> {
        let events = self.events_for_commit(commit_seq);
        let pending = events
            .iter()
            .find(|event| event.kind == CommitRepairEventKind::DurableButNotRepairable)?;
        let repair_done = events
            .iter()
            .find(|event| event.kind == CommitRepairEventKind::RepairCompleted)?;
        Some(
            repair_done
                .recorded_at
                .saturating_duration_since(pending.recorded_at),
        )
    }

    /// Shared handle to the underlying I/O sink.
    #[must_use]
    pub fn io_handle(&self) -> Arc<IO> {
        Arc::clone(&self.io)
    }

    /// Shared handle to the repair-symbol generator.
    #[must_use]
    pub fn generator_handle(&self) -> Arc<GEN> {
        Arc::clone(&self.generator)
    }

    /// Appends an event for `commit_seq` to the shared event log.
    fn record(&self, commit_seq: u64, kind: CommitRepairEventKind) {
        record_event_into(&self.events, commit_seq, kind);
    }
}
1095
impl<IO, GEN> Drop for CommitRepairCoordinator<IO, GEN>
where
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
{
    /// Joins any still-outstanding repair tasks so background work cannot
    /// outlive the coordinator. Worker panics are logged rather than
    /// propagated, since panicking out of `drop` could abort the process.
    fn drop(&mut self) {
        let handles = {
            let mut guard = lock_with_recovery(&self.handles, "repair_handles");
            std::mem::take(&mut *guard)
        };
        for handle in handles {
            let joined =
                std::panic::catch_unwind(AssertUnwindSafe(|| self.runtime.block_on(handle)));
            if joined.is_err() {
                error!(
                    bead_id = BEAD_ID,
                    "background repair worker panicked during drop"
                );
            }
        }
    }
}
1118
/// Everything a background repair task needs, bundled for spawning.
struct RepairTask<IO, GEN> {
    commit_seq: u64,
    // Diagnostic id correlating log lines for this task.
    async_task_id: u64,
    io: Arc<IO>,
    generator: Arc<GEN>,
    repair_states: Arc<Mutex<HashMap<u64, RepairState>>>,
    events: Arc<Mutex<Vec<CommitRepairEvent>>>,
    // Copy of the systematic symbols taken at commit time.
    systematic_snapshot: Vec<u8>,
    worker_cx: Cx,
}
1129
/// Background task that generates and persists repair symbols for one
/// commit, updating the shared state map and event log as it progresses.
async fn run_repair_task<IO, GEN>(task: RepairTask<IO, GEN>)
where
    IO: CommitRepairIo + Send + Sync + 'static,
    GEN: RepairSymbolGenerator + Send + Sync + 'static,
{
    let RepairTask {
        commit_seq,
        async_task_id,
        io,
        generator,
        repair_states,
        events,
        systematic_snapshot,
        worker_cx,
    } = task;

    // The task must be running inside the native runtime to get its context.
    let Some(native_worker_cx) = NativeCx::current() else {
        set_repair_state(&repair_states, commit_seq, RepairState::Failed);
        record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
        error!(
            bead_id = BEAD_ID,
            commit_seq, async_task_id, "repair task missing native runtime context"
        );
        return;
    };
    worker_cx.set_native_cx(native_worker_cx.clone());
    // Bail out if cancellation was requested before any work started.
    if worker_cx.checkpoint().is_err() {
        set_repair_state(&repair_states, commit_seq, RepairState::Failed);
        record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
        warn!(
            bead_id = BEAD_ID,
            commit_seq, async_task_id, "repair task was cancelled before blocking work started"
        );
        return;
    }

    info!(
        bead_id = BEAD_ID,
        commit_seq, async_task_id, "repair symbols generation started"
    );
    record_event_into(&events, commit_seq, CommitRepairEventKind::RepairStarted);

    // Generation plus append/sync runs on the blocking pool; panics are
    // caught so they can be reported as repair failures below.
    let blocking_cx = worker_cx.create_child();
    let repair_outcome = spawn_blocking(move || {
        std::panic::catch_unwind(AssertUnwindSafe(|| {
            blocking_cx.set_native_cx(native_worker_cx);
            let repair_symbols =
                generator.generate_repair_symbols(commit_seq, &systematic_snapshot)?;
            let repair_symbol_bytes = repair_symbols.len();
            io.append_repair_symbols(commit_seq, &repair_symbols)?;
            io.sync_repair_symbols(commit_seq)?;
            Ok::<usize, FrankenError>(repair_symbol_bytes)
        }))
    })
    .await;

    match repair_outcome {
        Ok(Ok(repair_symbol_bytes)) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Completed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairCompleted);
            info!(
                bead_id = BEAD_ID,
                commit_seq,
                async_task_id,
                repair_symbol_bytes,
                "repair symbols append+sync completed"
            );
        }
        Ok(Err(err)) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Failed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
            error!(
                bead_id = BEAD_ID,
                commit_seq,
                async_task_id,
                error = ?err,
                "repair symbol generation or append/sync failed"
            );
        }
        Err(_panic_payload) => {
            set_repair_state(&repair_states, commit_seq, RepairState::Failed);
            record_event_into(&events, commit_seq, CommitRepairEventKind::RepairFailed);
            error!(
                bead_id = BEAD_ID,
                commit_seq, async_task_id, "repair symbol worker panicked"
            );
        }
    }
}
1222
1223fn lock_with_recovery<'a, T>(mutex: &'a Mutex<T>, lock_name: &'static str) -> MutexGuard<'a, T> {
1224 match mutex.lock() {
1225 Ok(guard) => guard,
1226 Err(poisoned) => {
1227 warn!(
1228 bead_id = BEAD_ID,
1229 lock = lock_name,
1230 "mutex poisoned; recovering inner state"
1231 );
1232 poisoned.into_inner()
1233 }
1234 }
1235}
1236
1237fn set_repair_state(
1238 repair_states: &Arc<Mutex<HashMap<u64, RepairState>>>,
1239 commit_seq: u64,
1240 state: RepairState,
1241) {
1242 lock_with_recovery(repair_states, "repair_states").insert(commit_seq, state);
1243}
1244
1245fn record_event_into(
1246 events: &Arc<Mutex<Vec<CommitRepairEvent>>>,
1247 commit_seq: u64,
1248 kind: CommitRepairEventKind,
1249) {
1250 let mut guard = lock_with_recovery(events, "repair_events");
1251 let seq = guard
1252 .iter()
1253 .rev()
1254 .find(|event| event.commit_seq == commit_seq)
1255 .map_or(1, |event| event.seq.saturating_add(1));
1256 guard.push(CommitRepairEvent {
1257 commit_seq,
1258 seq,
1259 recorded_at: Instant::now(),
1260 kind,
1261 });
1262}
1263
/// Bead identifier attached to group-commit tracing/log events for correlation.
const GROUP_COMMIT_BEAD_ID: &str = "bd-l4gl";
/// First-wait used by `run_loop` when the channel is idle, bounding how long
/// the coordinator blocks before re-checking for cancellation.
const GROUP_COMMIT_IDLE_POLL_INTERVAL: Duration = Duration::from_millis(50);
1270
/// Phases of a group-commit batch, recorded in the order they execute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchPhase {
    /// Write-set validation of each request against already-committed pages.
    Validate,
    /// Appending the surviving (valid) requests to the WAL.
    WalAppend,
    /// The single durability sync covering the whole batch.
    Fsync,
    /// Publishing committed versions after the sync completes.
    Publish,
}
1284
/// Per-request outcome reported back from a processed group-commit batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupCommitResponse {
    /// The request was durably committed at `wal_offset` with `commit_seq`.
    Committed { wal_offset: u64, commit_seq: u64 },
    /// The request failed validation; `reason` is the human-readable conflict.
    Conflict { reason: String },
}
1293
1294#[derive(Debug)]
1296pub struct BatchResult {
1297 pub committed: Vec<(u64, u64, u64)>,
1299 pub conflicted: Vec<(u64, String)>,
1301 pub fsync_count: u32,
1303 pub phase_order: Vec<BatchPhase>,
1305}
1306
/// Batched WAL sink used by the group-commit coordinator.
pub trait WalBatchWriter: Send + Sync {
    /// Appends every request in the batch and returns one WAL offset per request.
    ///
    /// # Errors
    /// Returns an error if the underlying append fails.
    fn append_batch(&self, requests: &[&CommitRequest]) -> Result<Vec<u64>>;

    /// Durably syncs all previously appended records.
    ///
    /// # Errors
    /// Returns an error if the sync fails.
    fn sync(&self) -> Result<()>;
}
1319
/// Detects write-set conflicts between a commit request and already-committed pages.
pub trait WriteSetValidator: Send + Sync {
    /// Returns `Ok(())` if the request may commit against `committed_pages`.
    ///
    /// # Errors
    /// Returns a human-readable reason string describing the conflict.
    fn validate(
        &self,
        request: &CommitRequest,
        committed_pages: &BTreeSet<u32>,
    ) -> std::result::Result<(), String>;
}
1333
/// First-committer-wins validator: a request conflicts when any page in its
/// write set is already present in the committed-pages set.
#[derive(Debug, Default)]
pub struct FirstCommitterWinsValidator;
1338
1339impl WriteSetValidator for FirstCommitterWinsValidator {
1340 fn validate(
1341 &self,
1342 request: &CommitRequest,
1343 committed_pages: &BTreeSet<u32>,
1344 ) -> std::result::Result<(), String> {
1345 for &page in &request.write_set_pages {
1346 if committed_pages.contains(&page) {
1347 return Err(format!(
1348 "write-set conflict on page {page} for txn {}",
1349 request.txn_id
1350 ));
1351 }
1352 }
1353 Ok(())
1354 }
1355}
1356
/// In-memory `WalBatchWriter` test double: hands out monotonically increasing
/// offsets and tracks append/sync activity with atomic counters.
#[derive(Debug)]
pub struct InMemoryWalWriter {
    // Next WAL offset to assign (starts at 1).
    next_offset: AtomicU64,
    // Number of completed sync() calls.
    sync_count: AtomicU64,
    // Total requests appended across all batches.
    total_appended: AtomicU64,
    // Optional artificial sleep inside sync() to simulate fsync latency.
    fsync_delay: Duration,
}
1366
1367impl InMemoryWalWriter {
1368 #[must_use]
1370 pub fn new() -> Self {
1371 Self {
1372 next_offset: AtomicU64::new(1),
1373 sync_count: AtomicU64::new(0),
1374 total_appended: AtomicU64::new(0),
1375 fsync_delay: Duration::ZERO,
1376 }
1377 }
1378
1379 #[must_use]
1381 pub fn with_fsync_delay(delay: Duration) -> Self {
1382 Self {
1383 next_offset: AtomicU64::new(1),
1384 sync_count: AtomicU64::new(0),
1385 total_appended: AtomicU64::new(0),
1386 fsync_delay: delay,
1387 }
1388 }
1389
1390 #[must_use]
1392 pub fn sync_count(&self) -> u64 {
1393 self.sync_count.load(Ordering::Acquire)
1394 }
1395
1396 #[must_use]
1398 pub fn total_appended(&self) -> u64 {
1399 self.total_appended.load(Ordering::Acquire)
1400 }
1401}
1402
impl Default for InMemoryWalWriter {
    /// Equivalent to [`InMemoryWalWriter::new`]: zero fsync delay.
    fn default() -> Self {
        Self::new()
    }
}
1408
1409impl WalBatchWriter for InMemoryWalWriter {
1410 fn append_batch(&self, requests: &[&CommitRequest]) -> Result<Vec<u64>> {
1411 let mut offsets = Vec::with_capacity(requests.len());
1412 for _req in requests {
1413 let offset = self.next_offset.fetch_add(1, Ordering::Relaxed);
1414 offsets.push(offset);
1415 }
1416 #[allow(clippy::cast_possible_truncation)]
1417 self.total_appended
1418 .fetch_add(requests.len() as u64, Ordering::Release);
1419 Ok(offsets)
1420 }
1421
1422 fn sync(&self) -> Result<()> {
1423 if self.fsync_delay != Duration::ZERO {
1424 thread::sleep(self.fsync_delay);
1425 }
1426 self.sync_count.fetch_add(1, Ordering::Release);
1427 Ok(())
1428 }
1429}
1430
#[cfg(test)]
mod commit_repair_async_tests {
    use super::*;

    use asupersync::runtime::RuntimeBuilder;
    use std::thread;

    // Happy path: a commit schedules exactly one background repair on the
    // caller-owned runtime; waiting drains it to Completed, a RepairStarted
    // event is recorded, and repair bytes reach the shared IO.
    #[test]
    fn test_commit_repair_runs_on_caller_owned_runtime() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let io = Arc::new(InMemoryCommitRepairIo::default());
        let generator = Arc::new(DeterministicRepairGenerator::new(
            Duration::from_millis(25),
            64,
        ));
        let coordinator = CommitRepairCoordinator::with_shared(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            Arc::clone(&io),
            generator,
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");
        assert_eq!(coordinator.pending_background_repair_count(), 1);

        coordinator
            .wait_for_background_repair()
            .expect("background repair should finish");

        assert_eq!(coordinator.pending_background_repair_count(), 0);
        assert_eq!(
            coordinator.repair_state_for(receipt.commit_seq),
            RepairState::Completed
        );
        assert!(
            coordinator
                .events_for_commit(receipt.commit_seq)
                .iter()
                .any(|event| event.kind == CommitRepairEventKind::RepairStarted)
        );
        assert!(
            io.total_repair_bytes() > 0,
            "repair task should append repair symbols"
        );
    }

    // Receipt latency must include critical-path IO: the stub's systematic
    // sync sleeps 10ms, so the receipt should report at least ~8ms (slack for
    // timer granularity).
    #[test]
    fn test_commit_receipt_latency_tracks_critical_path_io() {
        // IO stub whose systematic-symbol sync sleeps; repair-side calls are no-ops.
        #[derive(Debug)]
        struct DelayedIo {
            delay: Duration,
        }

        impl CommitRepairIo for DelayedIo {
            fn append_systematic_symbols(
                &self,
                _commit_seq: u64,
                _systematic_symbols: &[u8],
            ) -> Result<()> {
                Ok(())
            }

            fn sync_systematic_symbols(&self, _commit_seq: u64) -> Result<()> {
                thread::sleep(self.delay);
                Ok(())
            }

            fn append_repair_symbols(
                &self,
                _commit_seq: u64,
                _repair_symbols: &[u8],
            ) -> Result<()> {
                Ok(())
            }

            fn sync_repair_symbols(&self, _commit_seq: u64) -> Result<()> {
                Ok(())
            }
        }

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: false,
            },
            runtime,
            &root_cx,
            DelayedIo {
                delay: Duration::from_millis(10),
            },
            DeterministicRepairGenerator::new(Duration::ZERO, 64),
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");

        assert!(
            receipt.latency >= Duration::from_millis(8),
            "commit latency should include the critical-path systematic sync cost"
        );
    }

    // The durable-not-repairable window must be wall-clock based: with a 20ms
    // repair generator the measured window should be at least ~15ms.
    #[test]
    fn test_durable_not_repairable_window_tracks_wall_clock_delay() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            InMemoryCommitRepairIo::default(),
            DeterministicRepairGenerator::new(Duration::from_millis(20), 64),
        );

        let receipt = coordinator
            .commit(&[0xAB; 256])
            .expect("commit should succeed");
        coordinator
            .wait_for_background_repair()
            .expect("background repair should finish");

        let window = coordinator
            .durable_not_repairable_window(receipt.commit_seq)
            .expect("window should be measurable");
        assert!(
            window >= Duration::from_millis(15),
            "window should reflect real repair delay rather than logical event ordinals"
        );
    }

    // A panic in one repair task must become RepairState::Failed (plus a
    // RepairFailed event) without stopping other repair tasks from draining.
    #[test]
    fn test_panicking_repair_marks_failed_without_abandoning_other_work() {
        // Generator that panics for commit_seq 1 and succeeds (slowly) otherwise.
        #[derive(Debug)]
        struct PanicFirstGenerator;

        impl RepairSymbolGenerator for PanicFirstGenerator {
            fn generate_repair_symbols(
                &self,
                commit_seq: u64,
                _systematic_symbols: &[u8],
            ) -> Result<Vec<u8>> {
                if commit_seq == 1 {
                    panic!("intentional repair panic");
                }
                thread::sleep(Duration::from_millis(25));
                Ok(vec![0xCD; 32])
            }
        }

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("commit repair runtime");
        let root_cx = Cx::new();
        let coordinator = CommitRepairCoordinator::new(
            CommitRepairConfig {
                repair_enabled: true,
            },
            runtime,
            &root_cx,
            InMemoryCommitRepairIo::default(),
            PanicFirstGenerator,
        );

        let first = coordinator
            .commit(&[0xAA; 64])
            .expect("first commit should schedule repair");
        let second = coordinator
            .commit(&[0xBB; 64])
            .expect("second commit should schedule repair");

        coordinator
            .wait_for_background_repair()
            .expect("panic inside repair work should be converted into RepairFailed state");
        assert_eq!(coordinator.pending_background_repair_count(), 0);
        assert_eq!(
            coordinator.repair_state_for(first.commit_seq),
            RepairState::Failed,
            "panicking repair task must be marked failed rather than left pending"
        );
        assert!(
            coordinator
                .events_for_commit(first.commit_seq)
                .iter()
                .any(|event| event.kind == CommitRepairEventKind::RepairFailed),
            "panicking repair task must emit a RepairFailed event"
        );
        assert_eq!(
            coordinator.repair_state_for(second.commit_seq),
            RepairState::Completed,
            "wait_for_background_repair must still drain non-panicking tasks"
        );
    }
}
1640
/// Tuning knobs for the group-commit coordinator.
#[derive(Debug, Clone, Copy)]
pub struct GroupCommitConfig {
    /// Maximum number of requests processed in one batch.
    pub max_batch_size: usize,
    /// Per-request wait while draining the channel after the first request arrives.
    pub drain_timeout: Duration,
}
1649
impl Default for GroupCommitConfig {
    /// Defaults: batch up to the default channel capacity (16) and keep
    /// draining for 100µs per additional request before processing.
    fn default() -> Self {
        Self {
            max_batch_size: DEFAULT_COMMIT_CHANNEL_CAPACITY,
            drain_timeout: Duration::from_micros(100),
        }
    }
}
1658
/// A transaction version made visible only after its batch's fsync completed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishedVersion {
    /// Transaction identifier from the originating commit request.
    pub txn_id: u64,
    /// Monotonically increasing commit sequence assigned at publish time.
    pub commit_seq: u64,
    /// WAL offset returned by the batch append for this request.
    pub wal_offset: u64,
}
1666
/// Coordinates group commit: validates write sets, batches WAL appends,
/// amortizes a single fsync per batch, and publishes versions afterwards.
pub struct GroupCommitCoordinator<W: WalBatchWriter, V: WriteSetValidator> {
    // Shared WAL sink for batched appends and syncs.
    wal: Arc<W>,
    // Write-set conflict detector (e.g. first-committer-wins).
    validator: Arc<V>,
    // Batch sizing and drain tuning.
    config: GroupCommitConfig,
    // Next commit sequence number to assign (starts at 1).
    next_commit_seq: AtomicU64,
    // Pages committed so far; baseline for conflict validation.
    committed_pages: Mutex<BTreeSet<u32>>,
    // Versions made visible after fsync, in publish order.
    published: Mutex<Vec<PublishedVersion>>,
    // Per-batch results retained for inspection.
    batch_history: Mutex<Vec<BatchResult>>,
    // Count of processed batches (including all-conflict batches).
    total_batches: AtomicU64,
}
1687
impl<W, V> GroupCommitCoordinator<W, V>
where
    W: WalBatchWriter + 'static,
    V: WriteSetValidator + 'static,
{
    /// Creates a coordinator over the given WAL writer and validator with
    /// empty committed-page, published-version, and history state.
    #[must_use]
    pub fn new(wal: W, validator: V, config: GroupCommitConfig) -> Self {
        Self {
            wal: Arc::new(wal),
            validator: Arc::new(validator),
            config,
            next_commit_seq: AtomicU64::new(1),
            committed_pages: Mutex::new(BTreeSet::new()),
            published: Mutex::new(Vec::new()),
            batch_history: Mutex::new(Vec::new()),
            total_batches: AtomicU64::new(0),
        }
    }

    /// Processes one batch through the four phases: validate each request
    /// (fail-fast, conflicts within the batch count too), append the valid
    /// subset to the WAL, issue exactly one fsync, then publish versions.
    ///
    /// Returns per-request responses (conflicts first, then commits in batch
    /// order) plus the aggregate `BatchResult`. An empty input returns empty
    /// results without touching the WAL or counters.
    ///
    /// # Errors
    /// Propagates WAL append/sync failures, and reports an internal error if
    /// the WAL returns a different number of offsets than requests appended.
    #[allow(clippy::too_many_lines)]
    pub fn process_batch(
        &self,
        requests: Vec<CommitRequest>,
    ) -> Result<(Vec<(CommitRequest, GroupCommitResponse)>, BatchResult)> {
        if requests.is_empty() {
            return Ok((
                Vec::new(),
                BatchResult {
                    committed: Vec::new(),
                    conflicted: Vec::new(),
                    fsync_count: 0,
                    phase_order: Vec::new(),
                },
            ));
        }

        let batch_size = requests.len();
        debug!(
            bead_id = GROUP_COMMIT_BEAD_ID,
            batch_size, "processing group commit batch"
        );

        let mut phase_order = Vec::with_capacity(4);
        let mut responses: Vec<(CommitRequest, GroupCommitResponse)> =
            Vec::with_capacity(batch_size);
        let mut valid_requests: Vec<CommitRequest> = Vec::with_capacity(batch_size);
        let mut conflicted: Vec<(u64, String)> = Vec::new();

        phase_order.push(BatchPhase::Validate);
        // Validate against a snapshot of committed pages; pages from earlier
        // valid requests in this batch are merged in so intra-batch conflicts
        // are also detected.
        let mut merged_committed =
            lock_with_recovery(&self.committed_pages, "committed_pages").clone();
        for req in requests {
            match self.validator.validate(&req, &merged_committed) {
                Ok(()) => {
                    for &page in &req.write_set_pages {
                        merged_committed.insert(page);
                    }
                    valid_requests.push(req);
                }
                Err(reason) => {
                    info!(
                        bead_id = GROUP_COMMIT_BEAD_ID,
                        txn_id = req.txn_id,
                        reason = %reason,
                        "conflict detected in validate phase (fail-fast)"
                    );
                    conflicted.push((req.txn_id, reason.clone()));
                    responses.push((req, GroupCommitResponse::Conflict { reason }));
                }
            }
        }

        // All-conflict batch: skip WAL and fsync entirely, but still record
        // the batch in history and bump the batch counter.
        if valid_requests.is_empty() {
            let result = BatchResult {
                committed: Vec::new(),
                conflicted,
                fsync_count: 0,
                phase_order,
            };
            lock_with_recovery(&self.batch_history, "batch_history").push(BatchResult {
                committed: Vec::new(),
                conflicted: result.conflicted.clone(),
                fsync_count: 0,
                phase_order: result.phase_order.clone(),
            });
            self.total_batches.fetch_add(1, Ordering::Relaxed);
            return Ok((responses, result));
        }

        phase_order.push(BatchPhase::WalAppend);
        let refs: Vec<&CommitRequest> = valid_requests.iter().collect();
        let wal_offsets = self.wal.append_batch(&refs)?;
        // Defensive check: downstream zips offsets with requests one-to-one.
        if wal_offsets.len() != valid_requests.len() {
            return Err(FrankenError::internal(format!(
                "wal append returned {} offsets for {} valid requests",
                wal_offsets.len(),
                valid_requests.len()
            )));
        }

        phase_order.push(BatchPhase::Fsync);
        // Exactly one sync covers the whole batch — the group-commit payoff.
        self.wal.sync()?;
        let fsync_count = 1;

        phase_order.push(BatchPhase::Publish);
        // Publish only after the fsync succeeded: assign commit sequences,
        // fold write sets into the committed-pages baseline, and expose the
        // versions. Both locks are held across the loop so the batch publishes
        // atomically with respect to readers of either structure.
        let mut committed_entries: Vec<(u64, u64, u64)> = Vec::with_capacity(valid_requests.len());
        let mut committed_guard = lock_with_recovery(&self.committed_pages, "committed_pages");
        let mut published_guard = lock_with_recovery(&self.published, "published_versions");
        for (req, &wal_offset) in valid_requests.iter().zip(wal_offsets.iter()) {
            let commit_seq = self.next_commit_seq.fetch_add(1, Ordering::Relaxed);
            for &page in &req.write_set_pages {
                committed_guard.insert(page);
            }
            published_guard.push(PublishedVersion {
                txn_id: req.txn_id,
                commit_seq,
                wal_offset,
            });
            committed_entries.push((req.txn_id, wal_offset, commit_seq));
            info!(
                bead_id = GROUP_COMMIT_BEAD_ID,
                txn_id = req.txn_id,
                commit_seq,
                wal_offset,
                "version published after fsync"
            );
        }
        drop(committed_guard);
        drop(published_guard);

        // Build the per-request Committed responses after releasing the locks.
        for ((req, &wal_offset), (_, _, commit_seq)) in valid_requests
            .into_iter()
            .zip(wal_offsets.iter())
            .zip(committed_entries.iter())
        {
            responses.push((
                req,
                GroupCommitResponse::Committed {
                    wal_offset,
                    commit_seq: *commit_seq,
                },
            ));
        }

        let result = BatchResult {
            committed: committed_entries,
            conflicted,
            fsync_count,
            phase_order,
        };

        // Field-by-field copy into history (BatchResult has no derived Clone).
        lock_with_recovery(&self.batch_history, "batch_history").push(BatchResult {
            committed: result.committed.clone(),
            conflicted: result.conflicted.clone(),
            fsync_count: result.fsync_count,
            phase_order: result.phase_order.clone(),
        });
        self.total_batches.fetch_add(1, Ordering::Relaxed);

        debug!(
            bead_id = GROUP_COMMIT_BEAD_ID,
            batch_size,
            committed = result.committed.len(),
            conflicted = result.conflicted.len(),
            "batch processing complete"
        );

        Ok((responses, result))
    }

    /// Drains up to `max_batch_size` requests (waiting up to 1s for the first)
    /// and processes them as one batch; `None` if nothing arrived in time.
    ///
    /// # Errors
    /// Propagates `process_batch` failures.
    pub fn drain_and_process(
        &self,
        receiver: &TwoPhaseCommitReceiver,
    ) -> Result<Option<BatchResult>> {
        self.drain_and_process_with_first_wait(receiver, Duration::from_secs(1))
    }

    // Core drain loop: block up to `first_wait` for the initial request, then
    // keep pulling with the (short) drain_timeout until the batch fills or the
    // channel goes quiet.
    fn drain_and_process_with_first_wait(
        &self,
        receiver: &TwoPhaseCommitReceiver,
        first_wait: Duration,
    ) -> Result<Option<BatchResult>> {
        let Some(first) = receiver.try_recv_for(first_wait) else {
            return Ok(None);
        };

        let mut batch = Vec::with_capacity(self.config.max_batch_size);
        batch.push(first);

        while batch.len() < self.config.max_batch_size {
            match receiver.try_recv_for(self.config.drain_timeout) {
                Some(req) => batch.push(req),
                None => break,
            }
        }

        let (_responses, result) = self.process_batch(batch)?;
        Ok(Some(result))
    }

    /// Runs the coordinator loop until `cx` is cancelled, processing batches
    /// as they arrive. The short idle first-wait bounds how long cancellation
    /// can go unobserved on an empty channel.
    ///
    /// # Errors
    /// Propagates batch-processing failures.
    pub fn run_loop(&self, receiver: &TwoPhaseCommitReceiver, cx: &Cx) -> Result<()> {
        info!(
            bead_id = GROUP_COMMIT_BEAD_ID,
            max_batch_size = self.config.max_batch_size,
            "group commit coordinator loop started"
        );
        while !cx.is_cancel_requested() {
            if cx.checkpoint().is_err() {
                break;
            }
            if let Some(result) =
                self.drain_and_process_with_first_wait(receiver, GROUP_COMMIT_IDLE_POLL_INTERVAL)?
            {
                debug!(
                    bead_id = GROUP_COMMIT_BEAD_ID,
                    committed = result.committed.len(),
                    conflicted = result.conflicted.len(),
                    "batch cycle completed"
                );
            }
        }
        info!(
            bead_id = GROUP_COMMIT_BEAD_ID,
            total_batches = self.total_batches.load(Ordering::Relaxed),
            "group commit coordinator loop shut down"
        );
        Ok(())
    }

    /// Total number of batches processed (including all-conflict batches).
    #[must_use]
    pub fn total_batches(&self) -> u64 {
        self.total_batches.load(Ordering::Acquire)
    }

    /// Snapshot of all versions published so far, in publish order.
    #[must_use]
    pub fn published_versions(&self) -> Vec<PublishedVersion> {
        lock_with_recovery(&self.published, "published_versions").clone()
    }

    /// Snapshot of per-batch results (field-by-field copies; BatchResult has
    /// no derived Clone).
    #[must_use]
    pub fn batch_history(&self) -> Vec<BatchResult> {
        lock_with_recovery(&self.batch_history, "batch_history")
            .iter()
            .map(|b| BatchResult {
                committed: b.committed.clone(),
                conflicted: b.conflicted.clone(),
                fsync_count: b.fsync_count,
                phase_order: b.phase_order.clone(),
            })
            .collect()
    }

    /// Shared handle to the underlying WAL writer (e.g. for test inspection).
    #[must_use]
    pub fn wal_handle(&self) -> Arc<W> {
        Arc::clone(&self.wal)
    }

    /// Clears the committed-pages baseline so subsequent batches validate
    /// against an empty set.
    pub fn reset_committed_pages(&self) {
        lock_with_recovery(&self.committed_pages, "committed_pages").clear();
    }
}
1978
#[cfg(test)]
mod two_phase_pipeline_tests {
    use super::*;
    use std::sync::mpsc as std_mpsc;
    use std::thread;
    use std::time::Instant;

    // Builds a small deterministic request whose page and payload derive from txn_id.
    fn request(txn_id: u64) -> CommitRequest {
        CommitRequest::new(
            txn_id,
            vec![u32::try_from(txn_id % 97).expect("txn id modulo fits in u32")],
            vec![u8::try_from(txn_id & 0xFF).expect("masked to u8")],
        )
    }

    // Basic reserve -> send -> receive round trip through the two-phase channel.
    #[test]
    fn test_two_phase_reserve_then_send() {
        let (sender, receiver) = two_phase_commit_channel(4);
        let permit = sender.reserve();
        let seq = permit.reservation_seq();
        permit.send(request(seq));
        let observed_request = receiver.try_recv_for(Duration::from_millis(50));
        assert_eq!(observed_request, Some(request(seq)));
    }

    // Delivery order follows reservation sequence, not send-completion order:
    // sends for later reservations are held until the earlier one resolves.
    #[test]
    fn test_out_of_order_send_completion_still_delivers_by_reservation_sequence() {
        let (sender, receiver) = two_phase_commit_channel(4);
        let permit1 = sender.reserve();
        let permit2 = sender.reserve();
        let permit3 = sender.reserve();

        let seq1 = permit1.reservation_seq();
        let seq2 = permit2.reservation_seq();
        let seq3 = permit3.reservation_seq();
        assert_eq!((seq1, seq2, seq3), (1, 2, 3));

        permit2.send(request(seq2));
        permit3.send(request(seq3));
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(20)),
            None,
            "later sends must not bypass an earlier unresolved reservation"
        );

        permit1.send(request(seq1));
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq1))
        );
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq2))
        );
        assert_eq!(
            receiver.try_recv_for(Duration::from_millis(50)),
            Some(request(seq3))
        );
    }

    // A timed-out reserve consumes no capacity; dropping the blocking permit
    // frees the slot for a subsequent reserve.
    #[test]
    fn test_two_phase_cancel_during_reserve() {
        let (sender, _receiver) = two_phase_commit_channel(1);
        let blocker = sender.reserve();
        let attempt = sender.try_reserve_for(Duration::from_millis(5));
        assert!(
            attempt.is_none(),
            "reserve timeout acts as cancellation during reserve"
        );
        assert_eq!(sender.occupancy(), 1, "no extra slot consumed");
        drop(blocker);
        let permit = sender.try_reserve_for(Duration::from_millis(50));
        assert!(permit.is_some(), "slot released after blocker drop");
    }

    // Dropping an unused permit returns its slot to the channel.
    #[test]
    fn test_two_phase_drop_permit_releases_slot() {
        let (sender, _receiver) = two_phase_commit_channel(1);
        let permit = sender.reserve();
        assert_eq!(sender.occupancy(), 1);
        drop(permit);
        assert_eq!(sender.occupancy(), 0);
        let retry = sender.try_reserve_for(Duration::from_millis(50));
        assert!(retry.is_some(), "dropped permit must release capacity");
    }

    // With capacity 2 fully reserved, a third reserve blocks until a permit
    // drops; the worker measures how long it was parked.
    #[test]
    fn test_backpressure_blocks_at_capacity() {
        let (sender, _receiver) = two_phase_commit_channel(2);
        let sender_a = sender.clone();
        let sender_b = sender.clone();
        let permit_a = sender_a.reserve();
        let permit_b = sender_b.reserve();

        let (tx, rx) = std_mpsc::channel();
        let sender_for_worker = sender.clone();
        let join = thread::spawn(move || {
            let started = Instant::now();
            let permit = sender_for_worker.reserve();
            let elapsed = started.elapsed();
            tx.send(elapsed)
                .expect("elapsed send should succeed for backpressure test");
            drop(permit);
        });

        thread::sleep(Duration::from_millis(30));
        drop(permit_a);
        drop(permit_b);

        let elapsed = rx
            .recv_timeout(Duration::from_secs(1))
            .expect("blocked reserve should eventually unblock");
        assert!(
            elapsed >= Duration::from_millis(20),
            "reserve should block until capacity frees"
        );
        join.join().expect("thread join must succeed");
    }

    // 10 producer threads x 10 requests each: the receiver must observe
    // txn_ids 1..=100 in exact reservation (FIFO) order.
    #[test]
    fn test_fifo_ordering_under_contention() {
        let total = 100_u64;
        let (sender, receiver) = two_phase_commit_channel(32);
        let mut joins = Vec::new();
        for _ in 0..10 {
            let sender_clone = sender.clone();
            joins.push(thread::spawn(move || {
                let mut local = Vec::new();
                for _ in 0..10 {
                    let permit = sender_clone.reserve();
                    let seq = permit.reservation_seq();
                    permit.send(request(seq));
                    local.push(seq);
                }
                local
            }));
        }

        let mut observed_order = Vec::new();
        for _ in 0..total {
            let req = receiver
                .try_recv_for(Duration::from_secs(1))
                .expect("coordinator should receive queued request");
            observed_order.push(req.txn_id);
        }
        for join in joins {
            let _ = join.join().expect("producer join");
        }

        let expected: Vec<u64> = (1..=total).collect();
        assert_eq!(observed_order, expected, "must preserve FIFO reserve order");
    }

    // A tracked permit dropped without send counts as leaked, while the
    // underlying permit drop still returns the slot.
    #[test]
    fn test_tracked_sender_detects_leaked_permit() {
        let (sender, _receiver) = two_phase_commit_channel(4);
        let tracked = TrackedSender::new(sender.clone());

        {
            let _leaked = tracked.reserve();
        }

        assert_eq!(tracked.leaked_permit_count(), 1);
        let permit = sender.try_reserve_for(Duration::from_millis(50));
        assert!(
            permit.is_some(),
            "leaked tracked permit still releases slot via underlying drop"
        );
    }

    // The analytic optimum clamps to channel capacity, and a full channel's
    // worth of requests drains completely.
    #[test]
    fn test_group_commit_batch_size_near_optimal() {
        let capacity = DEFAULT_COMMIT_CHANNEL_CAPACITY;
        let n_opt =
            optimal_batch_size(Duration::from_millis(2), Duration::from_micros(5), capacity);
        assert_eq!(n_opt, capacity, "20 theoretical optimum clamps to C=16");

        let (sender, receiver) = two_phase_commit_channel(capacity);
        for txn_id in 0_u64..u64::try_from(capacity).expect("capacity fits u64") {
            let permit = sender.reserve();
            permit.send(request(txn_id));
        }
        let mut drained = 0_usize;
        while drained < capacity {
            if receiver.try_recv_for(Duration::from_millis(20)).is_some() {
                drained += 1;
            }
        }
        assert_eq!(drained, capacity, "coordinator drains full batch at C");
    }

    // Slower fsync samples must push the conformal batch size up, within bounds.
    #[test]
    fn test_conformal_batch_size_adapts_to_regime() {
        let cap = 64;
        let low_fsync: Vec<Duration> = (0..32).map(|_| Duration::from_millis(2)).collect();
        let high_fsync: Vec<Duration> = (0..32).map(|_| Duration::from_millis(10)).collect();
        let validate: Vec<Duration> = (0..32).map(|_| Duration::from_micros(5)).collect();

        let low = conformal_batch_size(&low_fsync, &validate, cap);
        let high = conformal_batch_size(&high_fsync, &validate, cap);

        assert!(
            high > low,
            "regime shift to slower fsync must increase batch"
        );
        assert!(high <= cap);
        assert!(low >= 1);
    }

    // Default pipeline capacity is pinned at 16.
    #[test]
    fn test_channel_capacity_16_default() {
        assert_eq!(CommitPipelineConfig::default().channel_capacity, 16);
    }

    // Pragma values pass through when in range and clamp to a minimum of 1.
    #[test]
    fn test_capacity_configurable_via_pragma() {
        assert_eq!(
            CommitPipelineConfig::from_pragma_capacity(32).channel_capacity,
            32
        );
        assert_eq!(
            CommitPipelineConfig::from_pragma_capacity(0).channel_capacity,
            1
        );
    }

    // Little's-law sizing for the stated workload yields 15, consistent with
    // the default capacity of 16.
    #[test]
    fn test_little_law_derivation() {
        let burst_capacity = little_law_capacity(37_000.0, Duration::from_micros(40), 4.0, 2.5);
        assert_eq!(burst_capacity, 15);
        assert_eq!(DEFAULT_COMMIT_CHANNEL_CAPACITY, 16);
    }
}
2212
2213#[cfg(test)]
2214#[allow(clippy::cast_possible_truncation)]
2215mod group_commit_tests {
2216 use super::*;
2217 use std::sync::atomic::AtomicU32;
2218 use std::time::Instant;
2219
2220 fn req(txn_id: u64, pages: &[u32]) -> CommitRequest {
2221 CommitRequest::new(txn_id, pages.to_vec(), vec![0xAB])
2222 }
2223
2224 fn make_coordinator(
2225 max_batch: usize,
2226 ) -> GroupCommitCoordinator<InMemoryWalWriter, FirstCommitterWinsValidator> {
2227 GroupCommitCoordinator::new(
2228 InMemoryWalWriter::new(),
2229 FirstCommitterWinsValidator,
2230 GroupCommitConfig {
2231 max_batch_size: max_batch,
2232 ..GroupCommitConfig::default()
2233 },
2234 )
2235 }
2236
2237 fn make_coordinator_with_delay(
2238 max_batch: usize,
2239 fsync_delay: Duration,
2240 ) -> GroupCommitCoordinator<InMemoryWalWriter, FirstCommitterWinsValidator> {
2241 GroupCommitCoordinator::new(
2242 InMemoryWalWriter::with_fsync_delay(fsync_delay),
2243 FirstCommitterWinsValidator,
2244 GroupCommitConfig {
2245 max_batch_size: max_batch,
2246 ..GroupCommitConfig::default()
2247 },
2248 )
2249 }
2250
2251 #[derive(Debug)]
2252 struct OffsetMismatchWalWriter {
2253 returned_offsets: Vec<u64>,
2254 sync_count: AtomicU32,
2255 }
2256
2257 impl OffsetMismatchWalWriter {
2258 fn new(returned_offsets: Vec<u64>) -> Self {
2259 Self {
2260 returned_offsets,
2261 sync_count: AtomicU32::new(0),
2262 }
2263 }
2264
2265 fn sync_count(&self) -> u32 {
2266 self.sync_count.load(Ordering::Acquire)
2267 }
2268 }
2269
2270 impl WalBatchWriter for OffsetMismatchWalWriter {
2271 fn append_batch(&self, _requests: &[&CommitRequest]) -> Result<Vec<u64>> {
2272 Ok(self.returned_offsets.clone())
2273 }
2274
2275 fn sync(&self) -> Result<()> {
2276 self.sync_count.fetch_add(1, Ordering::Release);
2277 Ok(())
2278 }
2279 }
2280
2281 #[test]
2282 fn test_group_commit_single_request_no_batching() {
2283 let coord = make_coordinator(16);
2284 let batch = vec![req(1, &[10, 20])];
2285 let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2286
2287 assert_eq!(result.committed.len(), 1);
2288 assert_eq!(result.conflicted.len(), 0);
2289 assert_eq!(
2290 result.fsync_count, 1,
2291 "exactly one fsync for single request"
2292 );
2293 assert_eq!(responses.len(), 1);
2294 assert!(matches!(
2295 &responses[0].1,
2296 GroupCommitResponse::Committed { .. }
2297 ));
2298 assert_eq!(coord.wal_handle().sync_count(), 1);
2299 }
2300
2301 #[test]
2302 fn test_group_commit_batch_of_10_single_fsync() {
2303 let coord = make_coordinator(16);
2304 let batch: Vec<CommitRequest> = (1..=10)
2305 .map(|txn_id| req(txn_id, &[txn_id as u32 * 100]))
2306 .collect();
2307
2308 let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2309
2310 assert_eq!(result.committed.len(), 10, "all 10 should commit");
2311 assert_eq!(result.conflicted.len(), 0);
2312 assert_eq!(result.fsync_count, 1, "exactly ONE fsync for 10 requests");
2313 assert_eq!(responses.len(), 10);
2314
2315 let offsets: BTreeSet<u64> = responses
2317 .iter()
2318 .filter_map(|(_, resp)| match resp {
2319 GroupCommitResponse::Committed { wal_offset, .. } => Some(*wal_offset),
2320 GroupCommitResponse::Conflict { .. } => None,
2321 })
2322 .collect();
2323 assert_eq!(offsets.len(), 10, "all 10 should have distinct WAL offsets");
2324
2325 assert_eq!(coord.wal_handle().sync_count(), 1);
2327 assert_eq!(coord.wal_handle().total_appended(), 10);
2328 }
2329
2330 #[test]
2331 fn test_group_commit_conflict_in_batch_partial_success() {
2332 let coord = make_coordinator(16);
2333 let batch = vec![
2339 req(1, &[10, 20]),
2340 req(2, &[30, 40]),
2341 req(3, &[10, 50]),
2342 req(4, &[60]),
2343 req(5, &[30]),
2344 ];
2345
2346 let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2347
2348 assert_eq!(result.committed.len(), 3, "requests 1, 2, 4 should commit");
2349 assert_eq!(
2350 result.conflicted.len(),
2351 2,
2352 "requests 3 and 5 should conflict"
2353 );
2354 assert_eq!(result.fsync_count, 1, "one fsync for valid subset");
2355
2356 let committed_txns: BTreeSet<u64> =
2358 result.committed.iter().map(|(tid, _, _)| *tid).collect();
2359 assert!(committed_txns.contains(&1));
2360 assert!(committed_txns.contains(&2));
2361 assert!(committed_txns.contains(&4));
2362
2363 let conflicted_txns: BTreeSet<u64> =
2364 result.conflicted.iter().map(|(tid, _)| *tid).collect();
2365 assert!(conflicted_txns.contains(&3));
2366 assert!(conflicted_txns.contains(&5));
2367
2368 assert_eq!(responses.len(), 5);
2369 }
2370
2371 #[test]
2372 fn test_group_commit_max_batch_size_respected() {
2373 let coord = make_coordinator(4);
2374 let (sender, receiver) = two_phase_commit_channel(16);
2375
2376 for txn_id in 1..=10_u64 {
2378 let permit = sender.reserve();
2379 permit.send(req(txn_id, &[txn_id as u32 * 100]));
2380 }
2381
2382 let mut total_committed = 0_usize;
2384 let mut total_batches = 0_u32;
2385 while total_committed < 10 {
2386 if let Some(result) = coord
2387 .drain_and_process(&receiver)
2388 .expect("drain should succeed")
2389 {
2390 assert!(
2391 result.committed.len() <= 4,
2392 "batch size must not exceed MAX_BATCH_SIZE=4, got {}",
2393 result.committed.len()
2394 );
2395 total_committed += result.committed.len();
2396 total_batches += 1;
2397 }
2398 }
2399 assert!(
2400 total_batches >= 3,
2401 "10 requests with max_batch=4 needs at least 3 batches, got {total_batches}"
2402 );
2403 }
2404
2405 #[test]
2406 fn test_group_commit_backpressure_channel_full() {
2407 let coord = make_coordinator(16);
2408 let (sender, receiver) = two_phase_commit_channel(2);
2409
2410 let permit1 = sender.reserve();
2412 permit1.send(req(1, &[10]));
2413 let permit2 = sender.reserve();
2414 permit2.send(req(2, &[20]));
2415
2416 let blocked_handle = thread::spawn(move || {
2418 for txn_id in 3..=5_u64 {
2419 let permit = sender.reserve();
2420 permit.send(req(txn_id, &[txn_id as u32 * 100]));
2421 }
2422 });
2423
2424 let result = coord
2426 .drain_and_process(&receiver)
2427 .expect("drain should succeed")
2428 .expect("should have received requests");
2429 assert!(
2430 !result.committed.is_empty(),
2431 "first batch should have committed some"
2432 );
2433
2434 thread::sleep(Duration::from_millis(50));
2436
2437 let mut total = result.committed.len();
2439 while total < 5 {
2440 if let Some(r) = coord
2441 .drain_and_process(&receiver)
2442 .expect("drain should succeed")
2443 {
2444 total += r.committed.len();
2445 }
2446 }
2447 assert_eq!(total, 5, "all 5 requests should eventually succeed");
2448 blocked_handle.join().expect("blocked thread should finish");
2449 }
2450
2451 #[test]
2452 #[allow(clippy::cast_precision_loss)]
2453 fn test_group_commit_throughput_model_2_8x() {
2454 let fsync_delay = Duration::from_micros(50);
2456
2457 let sequential_start = Instant::now();
2459 for txn_id in 1..=10_u64 {
2460 let coord = make_coordinator_with_delay(1, fsync_delay);
2461 let batch = vec![req(txn_id, &[txn_id as u32])];
2462 let _ = coord.process_batch(batch).expect("should succeed");
2463 }
2464 let sequential_elapsed = sequential_start.elapsed();
2465
2466 let batched_start = Instant::now();
2468 let coord_batched = make_coordinator_with_delay(16, fsync_delay);
2469 let batch: Vec<CommitRequest> =
2470 (1..=10).map(|tid| req(tid, &[tid as u32 + 1000])).collect();
2471 let _ = coord_batched.process_batch(batch).expect("should succeed");
2472 let batched_elapsed = batched_start.elapsed();
2473
2474 let speedup = sequential_elapsed.as_secs_f64() / batched_elapsed.as_secs_f64();
2478 println!(
2479 "throughput_model: speedup={speedup:.2}x (seq={sequential_elapsed:?}, batch={batched_elapsed:?})"
2480 );
2481 }
2482
2483 #[test]
2484 fn test_group_commit_publish_after_fsync_ordering() {
2485 let coord = make_coordinator(16);
2486 let batch = vec![req(1, &[10]), req(2, &[20]), req(3, &[30])];
2487 let (_, result) = coord.process_batch(batch).expect("batch should succeed");
2488
2489 assert_eq!(
2491 result.phase_order,
2492 vec![
2493 BatchPhase::Validate,
2494 BatchPhase::WalAppend,
2495 BatchPhase::Fsync,
2496 BatchPhase::Publish,
2497 ],
2498 "phases must execute in strict order"
2499 );
2500
2501 let published = coord.published_versions();
2503 assert_eq!(published.len(), 3, "all 3 versions should be published");
2504 }
2505
2506 #[test]
2507 fn test_group_commit_validate_phase_rejects_before_wal_append() {
2508 let coord = make_coordinator(16);
2509
2510 let _ = coord
2512 .process_batch(vec![req(1, &[10])])
2513 .expect("first batch should succeed");
2514
2515 let batch2 = vec![req(2, &[10, 20]), req(3, &[30])];
2517 let (_, result) = coord
2518 .process_batch(batch2)
2519 .expect("second batch should succeed");
2520
2521 assert_eq!(result.committed.len(), 1);
2523 assert_eq!(result.conflicted.len(), 1);
2524 assert_eq!(result.conflicted[0].0, 2, "txn 2 should be conflicted");
2525 assert_eq!(result.committed[0].0, 3, "txn 3 should be committed");
2526
2527 assert_eq!(result.phase_order[0], BatchPhase::Validate);
2529 assert_eq!(result.phase_order[1], BatchPhase::WalAppend);
2530
2531 assert_eq!(coord.wal_handle().total_appended(), 2);
2534 }
2535
2536 #[test]
2537 fn test_group_commit_empty_batch() {
2538 let coord = make_coordinator(16);
2539 let (_, result) = coord
2540 .process_batch(Vec::new())
2541 .expect("empty batch should succeed");
2542 assert!(result.committed.is_empty());
2543 assert!(result.conflicted.is_empty());
2544 assert_eq!(result.fsync_count, 0, "no fsync for empty batch");
2545 assert!(result.phase_order.is_empty());
2546 }
2547
2548 #[test]
2549 fn test_group_commit_duplicate_txn_ids_keep_distinct_commit_sequences() {
2550 let coord = make_coordinator(16);
2551 let batch = vec![req(7, &[10]), req(7, &[20])];
2552
2553 let (responses, result) = coord.process_batch(batch).expect("batch should succeed");
2554
2555 assert_eq!(result.committed.len(), 2);
2556 let committed_commit_seqs = result
2557 .committed
2558 .iter()
2559 .map(|(_, _, commit_seq)| *commit_seq)
2560 .collect::<Vec<_>>();
2561 assert_eq!(committed_commit_seqs.len(), 2);
2562 assert_ne!(committed_commit_seqs[0], committed_commit_seqs[1]);
2563
2564 let response_commit_seqs = responses
2565 .iter()
2566 .map(|(_, response)| match response {
2567 GroupCommitResponse::Committed { commit_seq, .. } => *commit_seq,
2568 GroupCommitResponse::Conflict { .. } => 0,
2569 })
2570 .collect::<Vec<_>>();
2571 assert_eq!(response_commit_seqs, committed_commit_seqs);
2572 }
2573
2574 #[test]
2575 fn test_group_commit_rejects_wal_offset_count_mismatch() {
2576 let wal = OffsetMismatchWalWriter::new(vec![42]);
2577 let coord = GroupCommitCoordinator::new(
2578 wal,
2579 FirstCommitterWinsValidator,
2580 GroupCommitConfig::default(),
2581 );
2582
2583 let err = coord
2584 .process_batch(vec![req(1, &[10]), req(2, &[20])])
2585 .expect_err("mismatched WAL offsets must fail the batch");
2586 assert!(matches!(err, FrankenError::Internal(_)));
2587 assert_eq!(coord.wal_handle().sync_count(), 0, "fsync must not run");
2588 assert!(
2589 coord.published_versions().is_empty(),
2590 "no versions may be published after a malformed WAL append result"
2591 );
2592 }
2593
2594 #[test]
2595 fn test_group_commit_all_conflict_no_fsync() {
2596 let coord = make_coordinator(16);
2597
2598 let _ = coord
2600 .process_batch(vec![req(1, &[10, 20])])
2601 .expect("first batch should succeed");
2602
2603 let batch = vec![req(2, &[10]), req(3, &[20])];
2605 let (_, result) = coord.process_batch(batch).expect("should succeed");
2606
2607 assert_eq!(result.committed.len(), 0);
2608 assert_eq!(result.conflicted.len(), 2);
2609 assert_eq!(
2610 result.fsync_count, 0,
2611 "no fsync needed when all requests conflict"
2612 );
2613 assert_eq!(result.phase_order, vec![BatchPhase::Validate]);
2615 }
2616
2617 #[test]
2618 fn test_group_commit_run_loop_shutdown() {
2619 let coord = Arc::new(make_coordinator(16));
2620 let (sender, receiver) = two_phase_commit_channel(16);
2621 let loop_cx = Arc::new(Cx::new());
2622
2623 for txn_id in 1..=3_u64 {
2625 let permit = sender.reserve();
2626 permit.send(req(txn_id, &[txn_id as u32 * 100]));
2627 }
2628
2629 let loop_cx_clone = Arc::clone(&loop_cx);
2630 let coord_clone = Arc::clone(&coord);
2631 let handle = thread::spawn(move || coord_clone.run_loop(&receiver, &loop_cx_clone));
2632
2633 thread::sleep(Duration::from_millis(200));
2635 loop_cx.cancel();
2636
2637 handle
2638 .join()
2639 .expect("loop thread should join")
2640 .expect("loop should succeed");
2641
2642 assert!(
2643 coord.total_batches() >= 1,
2644 "should have processed at least one batch"
2645 );
2646 let published = coord.published_versions();
2647 assert_eq!(published.len(), 3, "all 3 should be published");
2648 }
2649
2650 #[test]
2651 fn test_first_committer_wins_validator() {
2652 let validator = FirstCommitterWinsValidator;
2653 let committed: BTreeSet<u32> = [10, 20, 30].into_iter().collect();
2654
2655 assert!(validator.validate(&req(1, &[40, 50]), &committed).is_ok());
2657
2658 let result = validator.validate(&req(2, &[10, 50]), &committed);
2660 assert!(result.is_err());
2661 assert!(result.unwrap_err().contains("page 10"));
2662 }
2663
2664 #[test]
2665 fn test_in_memory_wal_writer_basic() {
2666 let wal = InMemoryWalWriter::new();
2667 let r1 = req(1, &[10]);
2668 let r2 = req(2, &[20]);
2669 let offsets = wal.append_batch(&[&r1, &r2]).expect("append should work");
2670 assert_eq!(offsets.len(), 2);
2671 assert_ne!(offsets[0], offsets[1], "offsets must be distinct");
2672 assert_eq!(wal.total_appended(), 2);
2673 assert_eq!(wal.sync_count(), 0);
2674 wal.sync().expect("sync should work");
2675 assert_eq!(wal.sync_count(), 1);
2676 }
2677}