1use crate::WalGenerationIdentity;
38use fsqlite_types::sync_primitives::{Duration, Instant};
39use serde::{Deserialize, Serialize};
40use std::collections::VecDeque;
41use std::sync::{
42 LazyLock,
43 atomic::{AtomicBool, AtomicU64, Ordering},
44};
45
46use fsqlite_error::{FrankenError, Result};
47use fsqlite_types::cx::Cx;
48use fsqlite_vfs::{SyncKind, VfsFile};
49use tracing::{debug, info, trace};
50
51use crate::wal::{WalAppendFrameRef, WalFile};
52
/// Interprets the text of an environment variable as an on/off switch.
///
/// Surrounding whitespace is ignored. The flag counts as enabled when the
/// remaining text is exactly `1`, or is `true`, `yes` or `on` in any ASCII
/// letter case. Everything else (including the empty string) is disabled.
fn env_flag_enabled(value: &str) -> bool {
    let trimmed = value.trim();
    trimmed == "1"
        || ["true", "yes", "on"]
            .iter()
            .any(|word| trimmed.eq_ignore_ascii_case(word))
}
60
61#[must_use]
63pub fn detailed_consolidation_metrics_enabled() -> bool {
64 static ENABLED: LazyLock<bool> = LazyLock::new(|| {
65 std::env::var("FSQLITE_WAL_DETAILED_COMMIT_METRICS")
66 .is_ok_and(|value| env_flag_enabled(&value))
67 || std::env::var("FSQLITE_BENCH_PROFILE_INSERT")
68 .is_ok_and(|value| env_flag_enabled(&value))
69 });
70 *ENABLED
71}
72
/// Runtime override that forces commit-phase timing on, independent of the
/// environment-derived setting. Starts out `false`.
static COMMIT_PHASE_TIMING_ENABLED: AtomicBool = AtomicBool::new(false);

/// Sets the runtime commit-phase timing override and returns the value it had
/// before this call.
///
/// The swap uses relaxed ordering: the flag is a standalone switch and is not
/// used to publish any other data.
pub fn set_commit_phase_timing_enabled(enabled: bool) -> bool {
    COMMIT_PHASE_TIMING_ENABLED.swap(enabled, Ordering::Relaxed)
}
83
84#[must_use]
86pub fn commit_phase_timing_forced_enabled() -> bool {
87 COMMIT_PHASE_TIMING_ENABLED.load(Ordering::Relaxed)
88}
89
90#[must_use]
92pub fn commit_phase_timing_enabled() -> bool {
93 detailed_consolidation_metrics_enabled() || commit_phase_timing_forced_enabled()
94}
95
/// Tuning knobs for group-commit consolidation.
///
/// Use [`GroupCommitConfig::validated`] to normalise out-of-range values;
/// `GroupCommitConsolidator::new` does so automatically.
#[derive(Debug, Clone, Copy)]
pub struct GroupCommitConfig {
    /// Pending-frame threshold: once the consolidator holds at least this many
    /// pending frames it reports that a flush is due. A value of zero is
    /// raised to one by `validated`.
    pub max_group_size: usize,

    /// Longest time a group may keep filling, measured from its first batch,
    /// before a flush is due.
    pub max_group_delay: Duration,

    /// Upper bound for `max_group_delay`; `validated` clamps the delay to this
    /// value.
    pub max_group_delay_ceiling: Duration,
}
119
120impl Default for GroupCommitConfig {
121 fn default() -> Self {
122 Self {
123 max_group_size: 64,
124 max_group_delay: Duration::from_millis(1),
125 max_group_delay_ceiling: Duration::from_millis(10),
126 }
127 }
128}
129
130impl GroupCommitConfig {
131 #[must_use]
133 pub fn validated(mut self) -> Self {
134 if self.max_group_size == 0 {
135 self.max_group_size = 1;
136 }
137 if self.max_group_delay > self.max_group_delay_ceiling {
138 self.max_group_delay = self.max_group_delay_ceiling;
139 }
140 self
141 }
142}
143
/// A single page image submitted as one frame of a [`TransactionFrameBatch`].
#[derive(Debug, Clone)]
pub struct FrameSubmission {
    /// Number of the page this frame carries.
    pub page_number: u32,
    /// Raw bytes of the page.
    pub page_data: Vec<u8>,
    /// Commit marker: a batch is considered to end in a commit when its last
    /// frame has a value greater than zero here (see
    /// `TransactionFrameBatch::has_commit_frame`). Presumably the database
    /// size in pages after the commit — confirm against the WAL writer.
    pub db_size_if_commit: u32,
}
158
159#[derive(Debug, Clone)]
161pub struct TransactionFrameBatch {
162 pub frames: Vec<FrameSubmission>,
164 pub conflict_pages: Vec<u32>,
172 pub conflict_snapshot: Option<TransactionConflictSnapshot>,
177 pub context: TransactionFrameBatchContext,
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub struct TransactionConflictSnapshot {
184 pub generation: WalGenerationIdentity,
185 pub last_commit_frame: Option<usize>,
186 pub commit_count: u64,
187}
188
/// Caller-supplied bookkeeping carried along with a [`TransactionFrameBatch`].
///
/// All fields default to zero. The consolidator code visible in this file does
/// not interpret them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransactionFrameBatchContext {
    /// Identifier of the batch.
    pub batch_id: u64,
    /// Identifier of the lane the batch came from — semantics defined by the
    /// caller.
    pub lane_id: u16,
    /// Number of frames staged for the batch.
    pub staged_frame_count: u32,
    /// Time spent staging, in nanoseconds (per the `_ns` suffix).
    pub staging_elapsed_ns: u64,
}
201
202impl TransactionFrameBatch {
203 #[must_use]
205 pub fn new(frames: Vec<FrameSubmission>) -> Self {
206 Self {
207 frames,
208 conflict_pages: Vec::new(),
209 conflict_snapshot: None,
210 context: TransactionFrameBatchContext::default(),
211 }
212 }
213
214 #[must_use]
216 pub fn with_conflict_snapshot(
217 mut self,
218 conflict_pages: Vec<u32>,
219 conflict_snapshot: Option<TransactionConflictSnapshot>,
220 ) -> Self {
221 self.conflict_pages = conflict_pages;
222 self.conflict_snapshot = conflict_snapshot;
223 self
224 }
225
226 #[must_use]
228 pub fn with_context(mut self, context: TransactionFrameBatchContext) -> Self {
229 self.context = context;
230 self
231 }
232
233 #[must_use]
235 pub fn frame_count(&self) -> usize {
236 self.frames.len()
237 }
238
239 #[must_use]
241 pub fn has_commit_frame(&self) -> bool {
242 self.frames.last().is_some_and(|f| f.db_size_if_commit > 0)
243 }
244}
245
/// Phase of the group-commit consolidator's state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationPhase {
    /// Submitted batches are appended to the current pending group.
    Filling,
    /// A flush is in progress; batches submitted now are queued for the next
    /// epoch instead of the current group.
    Flushing,
    /// The previous flush has finished; the next submission moves the
    /// consolidator back to `Filling`.
    Complete,
}
260
/// Role assigned to the caller of `GroupCommitConsolidator::submit_batch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitOutcome {
    /// The batch was the first one in an empty pending group; the submitter is
    /// expected to drive the flush for that group.
    Flusher,
    /// The batch joined an existing group (or was queued for the next epoch);
    /// the submitter waits for another party to flush it.
    Waiter,
}
269
/// Number of most-recent samples each [`PhaseHistogram`] retains for
/// percentile computation.
const PHASE_HISTOGRAM_CAPACITY: usize = 4096;

/// Lock-free latency recorder built entirely from atomics.
///
/// Keeps a fixed-size ring of the latest samples (for percentiles) together
/// with running aggregates over every sample recorded since the last reset.
/// Values are named `_us`, i.e. they are expected to be microseconds.
pub struct PhaseHistogram {
    /// Ring buffer of the most recent `PHASE_HISTOGRAM_CAPACITY` samples.
    samples: Box<[AtomicU64]>,
    /// Monotonic write cursor; taken modulo the capacity to pick a ring slot.
    write_idx: AtomicU64,
    /// Largest sample recorded since the last reset.
    max_us: AtomicU64,
    /// Number of samples recorded since the last reset.
    count: AtomicU64,
    /// Sum of all samples recorded since the last reset.
    sum_us: AtomicU64,
    /// Decaying peak: each new sample replaces it with the larger of the
    /// sample and 15/16 of the previous value.
    recent_tail_us: AtomicU64,
}
296
297impl PhaseHistogram {
298 #[must_use]
299 pub fn new() -> Self {
300 let samples: Vec<AtomicU64> = std::iter::repeat_with(|| AtomicU64::new(0))
301 .take(PHASE_HISTOGRAM_CAPACITY)
302 .collect();
303 Self {
304 samples: samples.into_boxed_slice(),
305 write_idx: AtomicU64::new(0),
306 max_us: AtomicU64::new(0),
307 count: AtomicU64::new(0),
308 sum_us: AtomicU64::new(0),
309 recent_tail_us: AtomicU64::new(0),
310 }
311 }
312
313 pub fn record(&self, value_us: u64) {
315 let idx =
316 self.write_idx.fetch_add(1, Ordering::Relaxed) as usize % PHASE_HISTOGRAM_CAPACITY;
317 self.samples[idx].store(value_us, Ordering::Relaxed);
318 self.count.fetch_add(1, Ordering::Relaxed);
319 self.sum_us.fetch_add(value_us, Ordering::Relaxed);
320 let mut prev = self.max_us.load(Ordering::Relaxed);
322 while value_us > prev {
323 match self.max_us.compare_exchange_weak(
324 prev,
325 value_us,
326 Ordering::Relaxed,
327 Ordering::Relaxed,
328 ) {
329 Ok(_) => break,
330 Err(actual) => prev = actual,
331 }
332 }
333
334 let mut prev_tail = self.recent_tail_us.load(Ordering::Relaxed);
339 loop {
340 let decayed = prev_tail.saturating_mul(15) / 16;
341 let next_tail = value_us.max(decayed);
342 match self.recent_tail_us.compare_exchange_weak(
343 prev_tail,
344 next_tail,
345 Ordering::Relaxed,
346 Ordering::Relaxed,
347 ) {
348 Ok(_) => break,
349 Err(actual) => prev_tail = actual,
350 }
351 }
352 }
353
354 #[must_use]
356 pub fn recent_tail_us(&self) -> u64 {
357 self.recent_tail_us.load(Ordering::Relaxed)
358 }
359
360 #[must_use]
362 pub fn percentiles(&self) -> PhasePercentiles {
363 let total_count = self.count.load(Ordering::Relaxed);
364 let max = self.max_us.load(Ordering::Relaxed);
365 let sum = self.sum_us.load(Ordering::Relaxed);
366
367 if total_count == 0 {
368 return PhasePercentiles {
369 p50: 0,
370 p95: 0,
371 p99: 0,
372 max: 0,
373 count: 0,
374 mean_us: 0,
375 };
376 }
377
378 let n = total_count.min(PHASE_HISTOGRAM_CAPACITY as u64) as usize;
380 let mut buf = Vec::with_capacity(n);
381 for i in 0..n {
382 buf.push(self.samples[i].load(Ordering::Relaxed));
383 }
384 buf.sort_unstable();
385
386 let p = |pct: usize| -> u64 {
387 if buf.is_empty() {
388 return 0;
389 }
390 let idx = (pct * buf.len()) / 100;
391 buf[idx.min(buf.len() - 1)]
392 };
393
394 PhasePercentiles {
395 p50: p(50),
396 p95: p(95),
397 p99: p(99),
398 max,
399 count: total_count,
400 mean_us: sum / total_count,
401 }
402 }
403
404 pub fn reset(&self) {
406 for s in &self.samples {
407 s.store(0, Ordering::Relaxed);
408 }
409 self.write_idx.store(0, Ordering::Relaxed);
410 self.max_us.store(0, Ordering::Relaxed);
411 self.count.store(0, Ordering::Relaxed);
412 self.sum_us.store(0, Ordering::Relaxed);
413 self.recent_tail_us.store(0, Ordering::Relaxed);
414 }
415}
416
417impl Default for PhaseHistogram {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
425pub struct PhasePercentiles {
426 pub p50: u64,
427 pub p95: u64,
428 pub p99: u64,
429 pub max: u64,
430 pub count: u64,
431 pub mean_us: u64,
432}
433
/// Atomic tallies of wake-ups, one counter per wake reason.
///
/// NOTE(review): the code that increments these counters is not in view; the
/// per-field descriptions follow the field names and should be confirmed.
pub struct WakeReasonCounters {
    /// Wake-ups attributed to an explicit notification.
    pub notify: AtomicU64,
    /// Wake-ups attributed to a wait timing out.
    pub timeout: AtomicU64,
    /// Wake-ups attributed to a waiter taking over the flusher role.
    pub flusher_takeover: AtomicU64,
    /// Wake-ups attributed to a failed epoch.
    pub failed_epoch: AtomicU64,
    /// Wake-ups attributed to a busy retry.
    pub busy_retry: AtomicU64,
}
451
452impl WakeReasonCounters {
453 const fn new() -> Self {
454 Self {
455 notify: AtomicU64::new(0),
456 timeout: AtomicU64::new(0),
457 flusher_takeover: AtomicU64::new(0),
458 failed_epoch: AtomicU64::new(0),
459 busy_retry: AtomicU64::new(0),
460 }
461 }
462
463 #[must_use]
465 pub fn snapshot(&self) -> WakeReasonSnapshot {
466 WakeReasonSnapshot {
467 notify: self.notify.load(Ordering::Relaxed),
468 timeout: self.timeout.load(Ordering::Relaxed),
469 flusher_takeover: self.flusher_takeover.load(Ordering::Relaxed),
470 failed_epoch: self.failed_epoch.load(Ordering::Relaxed),
471 busy_retry: self.busy_retry.load(Ordering::Relaxed),
472 }
473 }
474
475 pub fn reset(&self) {
477 self.notify.store(0, Ordering::Relaxed);
478 self.timeout.store(0, Ordering::Relaxed);
479 self.flusher_takeover.store(0, Ordering::Relaxed);
480 self.failed_epoch.store(0, Ordering::Relaxed);
481 self.busy_retry.store(0, Ordering::Relaxed);
482 }
483}
484
485#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
487pub struct WakeReasonSnapshot {
488 pub notify: u64,
489 pub timeout: u64,
490 pub flusher_takeover: u64,
491 pub failed_epoch: u64,
492 pub busy_retry: u64,
493}
494
495impl WakeReasonSnapshot {
496 #[must_use]
498 pub fn total(&self) -> u64 {
499 self.notify + self.timeout + self.flusher_takeover + self.failed_epoch + self.busy_retry
500 }
501}
502
503pub struct ConsolidationMetrics {
509 pub groups_flushed: AtomicU64,
511 pub frames_consolidated: AtomicU64,
513 pub transactions_batched: AtomicU64,
515 pub fsyncs_total: AtomicU64,
517 pub flush_duration_us_total: AtomicU64,
519 pub wait_duration_us_total: AtomicU64,
521 pub max_group_size_observed: AtomicU64,
523 pub busy_retries: AtomicU64,
525
526 pub prepare_us_total: AtomicU64,
529 pub batch_build_us_total: AtomicU64,
531 pub conflict_snapshot_us_total: AtomicU64,
533 pub lane_prepare_us_total: AtomicU64,
535 pub consolidator_lock_wait_us_total: AtomicU64,
537 pub consolidator_flushing_wait_us_total: AtomicU64,
539 pub flusher_arrival_wait_us_total: AtomicU64,
541 pub inner_lock_wait_us_total: AtomicU64,
543 pub exclusive_lock_us_total: AtomicU64,
545 pub wal_append_us_total: AtomicU64,
547 pub flush_frame_prep_us_total: AtomicU64,
549 pub append_conflict_check_us_total: AtomicU64,
551 pub append_frames_us_total: AtomicU64,
553 pub wal_sync_us_total: AtomicU64,
555 pub waiter_epoch_wait_us_total: AtomicU64,
557 pub flusher_commits: AtomicU64,
559 pub waiter_commits: AtomicU64,
561 pub commit_phase_a_us_total: AtomicU64,
564 pub commit_phase_b_us_total: AtomicU64,
566 pub commit_phase_c1_us_total: AtomicU64,
568 pub commit_phase_c2_us_total: AtomicU64,
570 pub commit_phase_count: AtomicU64,
572
573 pub hist_consolidator_lock_wait: PhaseHistogram,
576 pub hist_arrival_wait: PhaseHistogram,
578 pub hist_wal_backend_lock_wait: PhaseHistogram,
580 pub hist_wal_append: PhaseHistogram,
582 pub hist_exclusive_lock: PhaseHistogram,
584 pub hist_waiter_epoch_wait: PhaseHistogram,
586 pub hist_phase_b: PhaseHistogram,
588 pub hist_wal_sync: PhaseHistogram,
590 pub hist_full_commit: PhaseHistogram,
592
593 pub wake_reasons: WakeReasonCounters,
596}
597
598impl ConsolidationMetrics {
599 #[must_use]
601 pub fn new() -> Self {
602 Self {
603 groups_flushed: AtomicU64::new(0),
604 frames_consolidated: AtomicU64::new(0),
605 transactions_batched: AtomicU64::new(0),
606 fsyncs_total: AtomicU64::new(0),
607 flush_duration_us_total: AtomicU64::new(0),
608 wait_duration_us_total: AtomicU64::new(0),
609 max_group_size_observed: AtomicU64::new(0),
610 busy_retries: AtomicU64::new(0),
611 prepare_us_total: AtomicU64::new(0),
613 batch_build_us_total: AtomicU64::new(0),
614 conflict_snapshot_us_total: AtomicU64::new(0),
615 lane_prepare_us_total: AtomicU64::new(0),
616 consolidator_lock_wait_us_total: AtomicU64::new(0),
617 consolidator_flushing_wait_us_total: AtomicU64::new(0),
618 flusher_arrival_wait_us_total: AtomicU64::new(0),
619 inner_lock_wait_us_total: AtomicU64::new(0),
620 exclusive_lock_us_total: AtomicU64::new(0),
621 wal_append_us_total: AtomicU64::new(0),
622 flush_frame_prep_us_total: AtomicU64::new(0),
623 append_conflict_check_us_total: AtomicU64::new(0),
624 append_frames_us_total: AtomicU64::new(0),
625 wal_sync_us_total: AtomicU64::new(0),
626 waiter_epoch_wait_us_total: AtomicU64::new(0),
627 flusher_commits: AtomicU64::new(0),
628 waiter_commits: AtomicU64::new(0),
629 commit_phase_a_us_total: AtomicU64::new(0),
630 commit_phase_b_us_total: AtomicU64::new(0),
631 commit_phase_c1_us_total: AtomicU64::new(0),
632 commit_phase_c2_us_total: AtomicU64::new(0),
633 commit_phase_count: AtomicU64::new(0),
634 hist_consolidator_lock_wait: PhaseHistogram::new(),
636 hist_arrival_wait: PhaseHistogram::new(),
637 hist_wal_backend_lock_wait: PhaseHistogram::new(),
638 hist_wal_append: PhaseHistogram::new(),
639 hist_exclusive_lock: PhaseHistogram::new(),
640 hist_waiter_epoch_wait: PhaseHistogram::new(),
641 hist_phase_b: PhaseHistogram::new(),
642 hist_wal_sync: PhaseHistogram::new(),
643 hist_full_commit: PhaseHistogram::new(),
644 wake_reasons: WakeReasonCounters::new(),
645 }
646 }
647
648 pub fn record_flush(&self, frames: u64, transactions: u64, duration_us: u64) {
650 self.groups_flushed.fetch_add(1, Ordering::Relaxed);
651 self.frames_consolidated
652 .fetch_add(frames, Ordering::Relaxed);
653 self.transactions_batched
654 .fetch_add(transactions, Ordering::Relaxed);
655 self.fsyncs_total.fetch_add(1, Ordering::Relaxed);
656 self.flush_duration_us_total
657 .fetch_add(duration_us, Ordering::Relaxed);
658 self.max_group_size_observed
660 .fetch_max(frames, Ordering::Relaxed);
661 }
662
663 pub fn record_wait(&self, duration_us: u64) {
665 self.wait_duration_us_total
666 .fetch_add(duration_us, Ordering::Relaxed);
667 }
668
669 pub fn record_busy_retry(&self) {
671 self.busy_retries.fetch_add(1, Ordering::Relaxed);
672 }
673
674 pub fn record_prepare_breakdown(
676 &self,
677 batch_build_us: u64,
678 conflict_snapshot_us: u64,
679 lane_prepare_us: u64,
680 ) {
681 self.batch_build_us_total
682 .fetch_add(batch_build_us, Ordering::Relaxed);
683 self.conflict_snapshot_us_total
684 .fetch_add(conflict_snapshot_us, Ordering::Relaxed);
685 self.lane_prepare_us_total
686 .fetch_add(lane_prepare_us, Ordering::Relaxed);
687 }
688
689 pub fn record_flush_breakdown(
691 &self,
692 flush_frame_prep_us: u64,
693 append_conflict_check_us: u64,
694 append_frames_us: u64,
695 ) {
696 self.flush_frame_prep_us_total
697 .fetch_add(flush_frame_prep_us, Ordering::Relaxed);
698 self.append_conflict_check_us_total
699 .fetch_add(append_conflict_check_us, Ordering::Relaxed);
700 self.append_frames_us_total
701 .fetch_add(append_frames_us, Ordering::Relaxed);
702 }
703
704 #[allow(clippy::too_many_arguments)]
706 pub fn record_phase_timing(
707 &self,
708 prepare_us: u64,
709 consolidator_lock_wait_us: u64,
710 consolidator_flushing_wait_us: u64,
711 is_flusher: bool,
712 flusher_arrival_wait_us: u64,
713 inner_lock_wait_us: u64,
714 exclusive_lock_us: u64,
715 wal_append_us: u64,
716 wal_sync_us: u64,
717 waiter_epoch_wait_us: u64,
718 ) {
719 self.prepare_us_total
720 .fetch_add(prepare_us, Ordering::Relaxed);
721 self.consolidator_lock_wait_us_total
722 .fetch_add(consolidator_lock_wait_us, Ordering::Relaxed);
723 self.consolidator_flushing_wait_us_total
724 .fetch_add(consolidator_flushing_wait_us, Ordering::Relaxed);
725
726 self.hist_consolidator_lock_wait
728 .record(consolidator_lock_wait_us);
729
730 if is_flusher {
731 self.flusher_arrival_wait_us_total
732 .fetch_add(flusher_arrival_wait_us, Ordering::Relaxed);
733 self.inner_lock_wait_us_total
734 .fetch_add(inner_lock_wait_us, Ordering::Relaxed);
735 self.exclusive_lock_us_total
736 .fetch_add(exclusive_lock_us, Ordering::Relaxed);
737 self.wal_append_us_total
738 .fetch_add(wal_append_us, Ordering::Relaxed);
739 self.wal_sync_us_total
740 .fetch_add(wal_sync_us, Ordering::Relaxed);
741 self.flusher_commits.fetch_add(1, Ordering::Relaxed);
742
743 self.hist_arrival_wait.record(flusher_arrival_wait_us);
745 self.hist_wal_backend_lock_wait.record(inner_lock_wait_us);
746 self.hist_wal_append.record(wal_append_us);
747 self.hist_exclusive_lock.record(exclusive_lock_us);
748 self.hist_wal_sync.record(wal_sync_us);
749 } else {
750 self.waiter_epoch_wait_us_total
751 .fetch_add(waiter_epoch_wait_us, Ordering::Relaxed);
752 self.waiter_commits.fetch_add(1, Ordering::Relaxed);
753
754 self.hist_waiter_epoch_wait.record(waiter_epoch_wait_us);
756 }
757
758 let phase_b_total = consolidator_lock_wait_us
760 + consolidator_flushing_wait_us
761 + if is_flusher {
762 flusher_arrival_wait_us
763 + inner_lock_wait_us
764 + exclusive_lock_us
765 + wal_append_us
766 + wal_sync_us
767 } else {
768 waiter_epoch_wait_us
769 };
770 self.hist_phase_b.record(phase_b_total);
771 }
772
773 pub fn record_commit_phases(
775 &self,
776 phase_a_us: u64,
777 phase_b_us: u64,
778 phase_c1_us: u64,
779 phase_c2_us: u64,
780 ) {
781 self.commit_phase_a_us_total
782 .fetch_add(phase_a_us, Ordering::Relaxed);
783 self.commit_phase_b_us_total
784 .fetch_add(phase_b_us, Ordering::Relaxed);
785 self.commit_phase_c1_us_total
786 .fetch_add(phase_c1_us, Ordering::Relaxed);
787 self.commit_phase_c2_us_total
788 .fetch_add(phase_c2_us, Ordering::Relaxed);
789 self.commit_phase_count.fetch_add(1, Ordering::Relaxed);
790
791 self.hist_full_commit
793 .record(phase_a_us + phase_b_us + phase_c1_us + phase_c2_us);
794 }
795
796 #[must_use]
798 pub fn snapshot(&self) -> ConsolidationMetricsSnapshot {
799 ConsolidationMetricsSnapshot {
800 groups_flushed: self.groups_flushed.load(Ordering::Relaxed),
801 frames_consolidated: self.frames_consolidated.load(Ordering::Relaxed),
802 transactions_batched: self.transactions_batched.load(Ordering::Relaxed),
803 fsyncs_total: self.fsyncs_total.load(Ordering::Relaxed),
804 flush_duration_us_total: self.flush_duration_us_total.load(Ordering::Relaxed),
805 wait_duration_us_total: self.wait_duration_us_total.load(Ordering::Relaxed),
806 max_group_size_observed: self.max_group_size_observed.load(Ordering::Relaxed),
807 busy_retries: self.busy_retries.load(Ordering::Relaxed),
808 prepare_us_total: self.prepare_us_total.load(Ordering::Relaxed),
810 batch_build_us_total: self.batch_build_us_total.load(Ordering::Relaxed),
811 conflict_snapshot_us_total: self.conflict_snapshot_us_total.load(Ordering::Relaxed),
812 lane_prepare_us_total: self.lane_prepare_us_total.load(Ordering::Relaxed),
813 consolidator_lock_wait_us_total: self
814 .consolidator_lock_wait_us_total
815 .load(Ordering::Relaxed),
816 consolidator_flushing_wait_us_total: self
817 .consolidator_flushing_wait_us_total
818 .load(Ordering::Relaxed),
819 flusher_arrival_wait_us_total: self
820 .flusher_arrival_wait_us_total
821 .load(Ordering::Relaxed),
822 inner_lock_wait_us_total: self.inner_lock_wait_us_total.load(Ordering::Relaxed),
823 exclusive_lock_us_total: self.exclusive_lock_us_total.load(Ordering::Relaxed),
824 wal_append_us_total: self.wal_append_us_total.load(Ordering::Relaxed),
825 flush_frame_prep_us_total: self.flush_frame_prep_us_total.load(Ordering::Relaxed),
826 append_conflict_check_us_total: self
827 .append_conflict_check_us_total
828 .load(Ordering::Relaxed),
829 append_frames_us_total: self.append_frames_us_total.load(Ordering::Relaxed),
830 wal_sync_us_total: self.wal_sync_us_total.load(Ordering::Relaxed),
831 waiter_epoch_wait_us_total: self.waiter_epoch_wait_us_total.load(Ordering::Relaxed),
832 flusher_commits: self.flusher_commits.load(Ordering::Relaxed),
833 waiter_commits: self.waiter_commits.load(Ordering::Relaxed),
834 commit_phase_a_us_total: self.commit_phase_a_us_total.load(Ordering::Relaxed),
835 commit_phase_b_us_total: self.commit_phase_b_us_total.load(Ordering::Relaxed),
836 commit_phase_c1_us_total: self.commit_phase_c1_us_total.load(Ordering::Relaxed),
837 commit_phase_c2_us_total: self.commit_phase_c2_us_total.load(Ordering::Relaxed),
838 commit_phase_count: self.commit_phase_count.load(Ordering::Relaxed),
839 hist_consolidator_lock_wait: self.hist_consolidator_lock_wait.percentiles(),
841 hist_arrival_wait: self.hist_arrival_wait.percentiles(),
842 hist_wal_backend_lock_wait: self.hist_wal_backend_lock_wait.percentiles(),
843 hist_wal_append: self.hist_wal_append.percentiles(),
844 hist_exclusive_lock: self.hist_exclusive_lock.percentiles(),
845 hist_waiter_epoch_wait: self.hist_waiter_epoch_wait.percentiles(),
846 hist_phase_b: self.hist_phase_b.percentiles(),
847 hist_wal_sync: self.hist_wal_sync.percentiles(),
848 hist_full_commit: self.hist_full_commit.percentiles(),
849 wake_reasons: self.wake_reasons.snapshot(),
850 }
851 }
852
853 pub fn reset(&self) {
855 self.groups_flushed.store(0, Ordering::Relaxed);
856 self.frames_consolidated.store(0, Ordering::Relaxed);
857 self.transactions_batched.store(0, Ordering::Relaxed);
858 self.fsyncs_total.store(0, Ordering::Relaxed);
859 self.flush_duration_us_total.store(0, Ordering::Relaxed);
860 self.wait_duration_us_total.store(0, Ordering::Relaxed);
861 self.max_group_size_observed.store(0, Ordering::Relaxed);
862 self.busy_retries.store(0, Ordering::Relaxed);
863 self.prepare_us_total.store(0, Ordering::Relaxed);
865 self.batch_build_us_total.store(0, Ordering::Relaxed);
866 self.conflict_snapshot_us_total.store(0, Ordering::Relaxed);
867 self.lane_prepare_us_total.store(0, Ordering::Relaxed);
868 self.consolidator_lock_wait_us_total
869 .store(0, Ordering::Relaxed);
870 self.consolidator_flushing_wait_us_total
871 .store(0, Ordering::Relaxed);
872 self.flusher_arrival_wait_us_total
873 .store(0, Ordering::Relaxed);
874 self.inner_lock_wait_us_total.store(0, Ordering::Relaxed);
875 self.exclusive_lock_us_total.store(0, Ordering::Relaxed);
876 self.wal_append_us_total.store(0, Ordering::Relaxed);
877 self.flush_frame_prep_us_total.store(0, Ordering::Relaxed);
878 self.append_conflict_check_us_total
879 .store(0, Ordering::Relaxed);
880 self.append_frames_us_total.store(0, Ordering::Relaxed);
881 self.wal_sync_us_total.store(0, Ordering::Relaxed);
882 self.waiter_epoch_wait_us_total.store(0, Ordering::Relaxed);
883 self.flusher_commits.store(0, Ordering::Relaxed);
884 self.waiter_commits.store(0, Ordering::Relaxed);
885 self.commit_phase_a_us_total.store(0, Ordering::Relaxed);
886 self.commit_phase_b_us_total.store(0, Ordering::Relaxed);
887 self.commit_phase_c1_us_total.store(0, Ordering::Relaxed);
888 self.commit_phase_c2_us_total.store(0, Ordering::Relaxed);
889 self.commit_phase_count.store(0, Ordering::Relaxed);
890 self.hist_consolidator_lock_wait.reset();
892 self.hist_arrival_wait.reset();
893 self.hist_wal_backend_lock_wait.reset();
894 self.hist_wal_append.reset();
895 self.hist_exclusive_lock.reset();
896 self.hist_waiter_epoch_wait.reset();
897 self.hist_phase_b.reset();
898 self.hist_wal_sync.reset();
899 self.hist_full_commit.reset();
900 self.wake_reasons.reset();
901 }
902}
903
904impl Default for ConsolidationMetrics {
905 fn default() -> Self {
906 Self::new()
907 }
908}
909
910#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
912pub struct ConsolidationMetricsSnapshot {
913 pub groups_flushed: u64,
914 pub frames_consolidated: u64,
915 pub transactions_batched: u64,
916 pub fsyncs_total: u64,
917 pub flush_duration_us_total: u64,
918 pub wait_duration_us_total: u64,
919 pub max_group_size_observed: u64,
920 pub busy_retries: u64,
921 pub prepare_us_total: u64,
923 pub batch_build_us_total: u64,
924 pub conflict_snapshot_us_total: u64,
925 pub lane_prepare_us_total: u64,
926 pub consolidator_lock_wait_us_total: u64,
927 pub consolidator_flushing_wait_us_total: u64,
928 pub flusher_arrival_wait_us_total: u64,
929 pub inner_lock_wait_us_total: u64,
930 pub exclusive_lock_us_total: u64,
931 pub wal_append_us_total: u64,
932 pub flush_frame_prep_us_total: u64,
933 pub append_conflict_check_us_total: u64,
934 pub append_frames_us_total: u64,
935 pub wal_sync_us_total: u64,
936 pub waiter_epoch_wait_us_total: u64,
937 pub flusher_commits: u64,
938 pub waiter_commits: u64,
939 pub commit_phase_a_us_total: u64,
941 pub commit_phase_b_us_total: u64,
942 pub commit_phase_c1_us_total: u64,
943 pub commit_phase_c2_us_total: u64,
944 pub commit_phase_count: u64,
945 pub hist_consolidator_lock_wait: PhasePercentiles,
947 pub hist_arrival_wait: PhasePercentiles,
948 pub hist_wal_backend_lock_wait: PhasePercentiles,
949 pub hist_wal_append: PhasePercentiles,
950 pub hist_exclusive_lock: PhasePercentiles,
951 pub hist_waiter_epoch_wait: PhasePercentiles,
952 pub hist_phase_b: PhasePercentiles,
953 pub hist_wal_sync: PhasePercentiles,
954 pub hist_full_commit: PhasePercentiles,
955 pub wake_reasons: WakeReasonSnapshot,
957}
958
959impl ConsolidationMetricsSnapshot {
960 #[must_use]
962 pub fn avg_group_size(&self) -> u64 {
963 self.frames_consolidated
964 .checked_div(self.groups_flushed)
965 .unwrap_or(0)
966 }
967
968 #[must_use]
970 pub fn avg_transactions_per_group(&self) -> u64 {
971 self.transactions_batched
972 .checked_div(self.groups_flushed)
973 .unwrap_or(0)
974 }
975
976 #[must_use]
978 pub fn avg_flush_duration_us(&self) -> u64 {
979 self.flush_duration_us_total
980 .checked_div(self.groups_flushed)
981 .unwrap_or(0)
982 }
983
984 #[must_use]
989 pub fn fsync_reduction_ratio(&self) -> u64 {
990 self.transactions_batched
991 .checked_div(self.fsyncs_total)
992 .unwrap_or(0)
993 }
994
995 #[must_use]
997 pub fn total_commits(&self) -> u64 {
998 self.flusher_commits.saturating_add(self.waiter_commits)
999 }
1000
1001 #[must_use]
1003 pub fn avg_prepare_us(&self) -> u64 {
1004 self.prepare_us_total
1005 .checked_div(self.total_commits())
1006 .unwrap_or(0)
1007 }
1008
1009 #[must_use]
1011 pub fn avg_consolidator_lock_wait_us(&self) -> u64 {
1012 self.consolidator_lock_wait_us_total
1013 .checked_div(self.total_commits())
1014 .unwrap_or(0)
1015 }
1016
1017 #[must_use]
1019 pub fn avg_wal_io_us(&self) -> u64 {
1020 self.wal_append_us_total
1021 .saturating_add(self.wal_sync_us_total)
1022 .checked_div(self.flusher_commits)
1023 .unwrap_or(0)
1024 }
1025
1026 #[must_use]
1028 pub fn avg_waiter_wait_us(&self) -> u64 {
1029 self.waiter_epoch_wait_us_total
1030 .checked_div(self.waiter_commits)
1031 .unwrap_or(0)
1032 }
1033
1034 #[must_use]
1040 pub fn flusher_lock_wait_us_total(&self) -> u64 {
1041 self.inner_lock_wait_us_total
1042 .saturating_add(self.exclusive_lock_us_total)
1043 .saturating_add(self.consolidator_flushing_wait_us_total)
1044 }
1045
1046 #[must_use]
1049 pub fn wal_service_us_total(&self) -> u64 {
1050 self.wal_append_us_total
1051 .saturating_add(self.wal_sync_us_total)
1052 }
1053
1054 #[must_use]
1058 #[allow(clippy::cast_precision_loss)]
1059 pub fn flusher_lock_wait_fraction(&self) -> f64 {
1060 let lock = self.flusher_lock_wait_us_total();
1061 let service = self.wal_service_us_total();
1062 let total = lock.saturating_add(service);
1063 if total == 0 {
1064 return 0.0;
1065 }
1066 lock as f64 / total as f64
1067 }
1068
1069 #[must_use]
1071 pub fn is_lock_topology_limited(&self) -> bool {
1072 self.flusher_lock_wait_us_total() > self.wal_service_us_total()
1073 }
1074
1075 #[must_use]
1077 pub fn phase_timing_report(&self) -> String {
1078 let total = self.total_commits();
1079 if total == 0 {
1080 return "no commits".to_string();
1081 }
1082
1083 let avg_prepare = self.avg_prepare_us();
1085 let avg_consol_lock = self.avg_consolidator_lock_wait_us();
1086 let avg_flushing_wait = self
1087 .consolidator_flushing_wait_us_total
1088 .checked_div(total)
1089 .unwrap_or(0);
1090
1091 let avg_arrival_wait = self
1093 .flusher_arrival_wait_us_total
1094 .checked_div(self.flusher_commits)
1095 .unwrap_or(0);
1096 let avg_inner_lock = self
1097 .inner_lock_wait_us_total
1098 .checked_div(self.flusher_commits)
1099 .unwrap_or(0);
1100 let avg_excl_lock = self
1101 .exclusive_lock_us_total
1102 .checked_div(self.flusher_commits)
1103 .unwrap_or(0);
1104 let avg_append = self
1105 .wal_append_us_total
1106 .checked_div(self.flusher_commits)
1107 .unwrap_or(0);
1108 let avg_sync = self
1109 .wal_sync_us_total
1110 .checked_div(self.flusher_commits)
1111 .unwrap_or(0);
1112
1113 let avg_epoch_wait = self.avg_waiter_wait_us();
1115
1116 format!(
1117 "commits: {} (flusher={}, waiter={})\n\
1118 per-commit avg:\n\
1119 ├─ prepare: {}µs\n\
1120 ├─ consolidator_lock_wait: {}µs\n\
1121 ├─ flushing_wait: {}µs\n\
1122 flusher path ({} commits):\n\
1123 ├─ arrival_wait: {}µs\n\
1124 ├─ inner_lock_wait: {}µs\n\
1125 ├─ exclusive_lock: {}µs\n\
1126 ├─ wal_append: {}µs\n\
1127 └─ wal_sync: {}µs (total WAL I/O: {}µs)\n\
1128 waiter path ({} commits):\n\
1129 └─ epoch_wait: {}µs\n\
1130 full commit path ({} commits):\n\
1131 ├─ phase_A (prepare+inner.lock): {}µs\n\
1132 ├─ phase_B (group_commit): {}µs\n\
1133 ├─ phase_C1 (post-commit+inner.lock): {}µs\n\
1134 └─ phase_C2 (publish): {}µs",
1135 total,
1136 self.flusher_commits,
1137 self.waiter_commits,
1138 avg_prepare,
1139 avg_consol_lock,
1140 avg_flushing_wait,
1141 self.flusher_commits,
1142 avg_arrival_wait,
1143 avg_inner_lock,
1144 avg_excl_lock,
1145 avg_append,
1146 avg_sync,
1147 avg_append + avg_sync,
1148 self.waiter_commits,
1149 avg_epoch_wait,
1150 self.commit_phase_count,
1151 self.commit_phase_a_us_total
1152 .checked_div(self.commit_phase_count)
1153 .unwrap_or(0),
1154 self.commit_phase_b_us_total
1155 .checked_div(self.commit_phase_count)
1156 .unwrap_or(0),
1157 self.commit_phase_c1_us_total
1158 .checked_div(self.commit_phase_count)
1159 .unwrap_or(0),
1160 self.commit_phase_c2_us_total
1161 .checked_div(self.commit_phase_count)
1162 .unwrap_or(0),
1163 )
1164 }
1165}
1166
1167impl std::fmt::Display for ConsolidationMetricsSnapshot {
1168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1169 write!(
1170 f,
1171 "groups={} frames={} txns={} fsyncs={} avg_group={} \
1172 avg_flush_us={} max_group={} busy_retries={} reduction={}x",
1173 self.groups_flushed,
1174 self.frames_consolidated,
1175 self.transactions_batched,
1176 self.fsyncs_total,
1177 self.avg_group_size(),
1178 self.avg_flush_duration_us(),
1179 self.max_group_size_observed,
1180 self.busy_retries,
1181 self.fsync_reduction_ratio(),
1182 )
1183 }
1184}
1185
1186pub static GLOBAL_CONSOLIDATION_METRICS: LazyLock<ConsolidationMetrics> =
1188 LazyLock::new(ConsolidationMetrics::new);
1189
/// Test-only mutex, created lazily. Presumably held by tests that touch
/// `GLOBAL_CONSOLIDATION_METRICS` so they do not interfere with each other —
/// confirm against the test suite.
#[cfg(test)]
pub(crate) static GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK: LazyLock<std::sync::Mutex<()>> =
    LazyLock::new(|| std::sync::Mutex::new(()));
1193
1194#[derive(Debug)]
1205pub struct GroupCommitConsolidator {
1206 phase: ConsolidationPhase,
1208 pending_batches: VecDeque<TransactionFrameBatch>,
1210 pending_frame_count: usize,
1212 config: GroupCommitConfig,
1214 filling_started: Option<Instant>,
1216 epoch: u64,
1218 completed_epoch: u64,
1220 next_epoch_batches: VecDeque<TransactionFrameBatch>,
1224 next_epoch_frame_count: usize,
1226 promoted_epoch_flusher_vacant: bool,
1229}
1230
1231impl GroupCommitConsolidator {
1232 #[must_use]
1234 pub fn new(config: GroupCommitConfig) -> Self {
1235 let config = config.validated();
1236 Self {
1237 phase: ConsolidationPhase::Filling,
1238 pending_batches: VecDeque::new(),
1239 pending_frame_count: 0,
1240 config,
1241 filling_started: None,
1242 epoch: 0,
1243 completed_epoch: 0,
1244 next_epoch_batches: VecDeque::new(),
1245 next_epoch_frame_count: 0,
1246 promoted_epoch_flusher_vacant: false,
1247 }
1248 }
1249
1250 #[must_use]
1252 pub const fn phase(&self) -> ConsolidationPhase {
1253 self.phase
1254 }
1255
1256 #[must_use]
1258 pub const fn epoch(&self) -> u64 {
1259 self.epoch
1260 }
1261
1262 #[must_use]
1264 pub const fn max_group_delay(&self) -> Duration {
1265 self.config.max_group_delay
1266 }
1267
1268 #[must_use]
1270 pub const fn pending_frame_count(&self) -> usize {
1271 self.pending_frame_count
1272 }
1273
1274 #[must_use]
1276 pub fn pending_batch_count(&self) -> usize {
1277 self.pending_batches.len()
1278 }
1279
1280 pub fn submit_batch(&mut self, batch: TransactionFrameBatch) -> Result<SubmitOutcome> {
1289 if self.phase == ConsolidationPhase::Flushing {
1294 self.next_epoch_frame_count += batch.frame_count();
1295 self.next_epoch_batches.push_back(batch);
1296
1297 trace!(
1298 target: "fsqlite_wal::group_commit",
1299 epoch = self.epoch,
1300 next_epoch_frames = self.next_epoch_frame_count,
1301 next_epoch_batches = self.next_epoch_batches.len(),
1302 "batch pipelined for next epoch (submitted during FLUSHING)"
1303 );
1304
1305 return Ok(SubmitOutcome::Waiter);
1308 }
1309
1310 if self.phase == ConsolidationPhase::Complete {
1312 self.transition_to_filling();
1313 }
1314
1315 let is_first = self.pending_batches.is_empty();
1316
1317 if is_first {
1318 self.filling_started = Some(Instant::now());
1319 self.promoted_epoch_flusher_vacant = false;
1320 }
1321
1322 self.pending_frame_count += batch.frame_count();
1323 self.pending_batches.push_back(batch);
1324
1325 let outcome = if is_first {
1326 SubmitOutcome::Flusher
1327 } else {
1328 SubmitOutcome::Waiter
1329 };
1330
1331 trace!(
1332 target: "fsqlite_wal::group_commit",
1333 epoch = self.epoch,
1334 pending_frames = self.pending_frame_count,
1335 pending_batches = self.pending_batches.len(),
1336 outcome = ?outcome,
1337 "batch submitted"
1338 );
1339
1340 Ok(outcome)
1341 }
1342
1343 #[must_use]
1349 pub fn should_flush_now(&self) -> bool {
1350 if self.pending_frame_count >= self.config.max_group_size {
1351 return true;
1352 }
1353 if let Some(started) = self.filling_started {
1354 if started.elapsed() >= self.config.max_group_delay {
1355 return true;
1356 }
1357 }
1358 false
1359 }
1360
1361 #[must_use]
1363 pub fn time_until_flush(&self) -> Duration {
1364 if self.pending_frame_count >= self.config.max_group_size {
1365 return Duration::ZERO;
1366 }
1367 self.filling_started
1368 .map_or(self.config.max_group_delay, |started| {
1369 self.config
1370 .max_group_delay
1371 .saturating_sub(started.elapsed())
1372 })
1373 }
1374
1375 #[must_use]
1377 pub fn fill_age(&self) -> Duration {
1378 self.filling_started
1379 .map_or(Duration::ZERO, |started| started.elapsed())
1380 }
1381
1382 pub fn begin_flush(&mut self) -> Result<Vec<TransactionFrameBatch>> {
1391 if self.phase != ConsolidationPhase::Filling {
1392 return Err(FrankenError::Internal(format!(
1393 "begin_flush called in {:?} phase, expected Filling",
1394 self.phase
1395 )));
1396 }
1397
1398 self.phase = ConsolidationPhase::Flushing;
1399 self.promoted_epoch_flusher_vacant = false;
1400 self.epoch += 1;
1401
1402 let batches: Vec<_> = self.pending_batches.drain(..).collect();
1403 let frame_count = self.pending_frame_count;
1404 self.pending_frame_count = 0;
1405
1406 debug!(
1407 target: "fsqlite_wal::group_commit",
1408 epoch = self.epoch,
1409 batches = batches.len(),
1410 frames = frame_count,
1411 "begin_flush: FILLING → FLUSHING"
1412 );
1413
1414 Ok(batches)
1415 }
1416
1417 pub fn complete_flush(&mut self) -> Result<bool> {
1427 if self.phase != ConsolidationPhase::Flushing {
1428 return Err(FrankenError::Internal(format!(
1429 "complete_flush called in {:?} phase, expected Flushing",
1430 self.phase
1431 )));
1432 }
1433
1434 self.completed_epoch = self.epoch;
1435 self.filling_started = None;
1436
1437 if self.next_epoch_batches.is_empty() {
1443 self.phase = ConsolidationPhase::Complete;
1444 self.promoted_epoch_flusher_vacant = false;
1445 debug!(
1446 target: "fsqlite_wal::group_commit",
1447 epoch = self.epoch,
1448 "complete_flush: FLUSHING → COMPLETE"
1449 );
1450 Ok(false)
1451 } else {
1452 let promoted_count = self.next_epoch_batches.len();
1453 let promoted_frames = self.next_epoch_frame_count;
1454 self.pending_batches = std::mem::take(&mut self.next_epoch_batches);
1455 self.pending_frame_count = self.next_epoch_frame_count;
1456 self.next_epoch_frame_count = 0;
1457 self.phase = ConsolidationPhase::Filling;
1458 self.filling_started = Some(Instant::now());
1459 self.promoted_epoch_flusher_vacant = true;
1460
1461 debug!(
1462 target: "fsqlite_wal::group_commit",
1463 epoch = self.epoch,
1464 promoted_batches = promoted_count,
1465 promoted_frames = promoted_frames,
1466 "complete_flush: FLUSHING → FILLING (epoch pipelining)"
1467 );
1468 Ok(true) }
1470 }
1471
1472 #[must_use]
1474 pub fn has_pipelined_batches(&self) -> bool {
1475 !self.next_epoch_batches.is_empty()
1476 }
1477
1478 #[must_use]
1481 pub const fn has_flusher_vacancy(&self) -> bool {
1482 self.promoted_epoch_flusher_vacant
1483 }
1484
1485 #[must_use]
1490 pub fn claim_flusher_vacancy(&mut self) -> bool {
1491 if self.phase == ConsolidationPhase::Filling
1492 && self.promoted_epoch_flusher_vacant
1493 && !self.pending_batches.is_empty()
1494 {
1495 self.promoted_epoch_flusher_vacant = false;
1496 return true;
1497 }
1498 false
1499 }
1500
1501 pub fn abort_flush(&mut self) -> Result<()> {
1510 if self.phase != ConsolidationPhase::Flushing {
1511 return Err(FrankenError::Internal(format!(
1512 "abort_flush called in {:?} phase, expected Flushing",
1513 self.phase
1514 )));
1515 }
1516
1517 if self.next_epoch_batches.is_empty() {
1521 self.phase = ConsolidationPhase::Complete;
1522 self.filling_started = None;
1523 self.promoted_epoch_flusher_vacant = false;
1524 } else {
1525 self.pending_batches = std::mem::take(&mut self.next_epoch_batches);
1526 self.pending_frame_count = self.next_epoch_frame_count;
1527 self.next_epoch_frame_count = 0;
1528 self.phase = ConsolidationPhase::Filling;
1529 self.filling_started = Some(Instant::now());
1530 self.promoted_epoch_flusher_vacant = true;
1531 }
1533
1534 debug!(
1535 target: "fsqlite_wal::group_commit",
1536 epoch = self.epoch,
1537 "abort_flush: FLUSHING → {:?}",
1538 self.phase
1539 );
1540
1541 Ok(())
1542 }
1543
1544 fn transition_to_filling(&mut self) {
1546 self.phase = ConsolidationPhase::Filling;
1547 self.filling_started = None;
1548 self.promoted_epoch_flusher_vacant = false;
1549 trace!(
1550 target: "fsqlite_wal::group_commit",
1551 epoch = self.epoch,
1552 "COMPLETE → FILLING"
1553 );
1554 }
1555
1556 #[must_use]
1558 pub const fn completed_epoch(&self) -> u64 {
1559 self.completed_epoch
1560 }
1561}
1562
1563pub fn write_consolidated_frames<F: VfsFile>(
1578 cx: &Cx,
1579 wal: &mut WalFile<F>,
1580 batches: &[TransactionFrameBatch],
1581) -> Result<usize> {
1582 let frame_size = wal.frame_size();
1583 let total_frames: usize = batches.iter().map(TransactionFrameBatch::frame_count).sum();
1584 if total_frames == 0 {
1585 return Ok(0);
1586 }
1587
1588 let total_bytes = total_frames
1589 .checked_mul(frame_size)
1590 .ok_or_else(|| FrankenError::Internal("frame batch size overflow".to_owned()))?;
1591 let frame_refs = batches.iter().flat_map(|batch| {
1592 batch.frames.iter().map(|frame| WalAppendFrameRef {
1593 page_number: frame.page_number,
1594 page_data: &frame.page_data,
1595 db_size_if_commit: frame.db_size_if_commit,
1596 })
1597 });
1598
1599 let span = tracing::info_span!(
1600 target: "fsqlite_wal::group_commit",
1601 "consolidated_write",
1602 total_frames,
1603 total_bytes,
1604 batches = batches.len(),
1605 );
1606 let _guard = span.enter();
1607
1608 wal.append_frame_iter(cx, total_frames, frame_refs)?;
1609 wal.durable_sync(cx, SyncKind::FullDurable)?;
1610 let bytes_written = u64::try_from(total_bytes).unwrap_or(u64::MAX);
1611
1612 info!(
1613 target: "fsqlite_wal::group_commit",
1614 frames_written = total_frames,
1615 bytes_written,
1616 batches = batches.len(),
1617 "consolidated write + fsync complete"
1618 );
1619
1620 Ok(total_frames)
1621}
1622
1623#[cfg(test)]
1628mod tests {
1629 use fsqlite_types::flags::VfsOpenFlags;
1630 use fsqlite_vfs::MemoryVfs;
1631 use fsqlite_vfs::traits::Vfs;
1632
1633 use super::*;
1634 use crate::checksum::WalSalts;
1635
    /// Page size passed to `WalFile::create` and used to size `sample_page` buffers.
    const PAGE_SIZE: u32 = 4096;
1637
    /// Returns a default `Cx` for driving WAL and VFS calls in tests.
    fn test_cx() -> Cx {
        Cx::default()
    }
1641
    /// Returns the fixed salt pair used when creating test WAL files.
    fn test_salts() -> WalSalts {
        WalSalts {
            salt1: 0xDEAD_BEEF,
            salt2: 0xCAFE_BABE,
        }
    }
1648
1649 fn sample_page(seed: u8) -> Vec<u8> {
1650 let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
1651 let mut page = vec![0u8; page_size];
1652 for (i, byte) in page.iter_mut().enumerate() {
1653 let reduced = u8::try_from(i % 251).expect("modulo fits u8");
1654 *byte = reduced ^ seed;
1655 }
1656 page
1657 }
1658
1659 fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
1660 let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
1661 let (file, _) = vfs
1662 .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
1663 .expect("open WAL file");
1664 file
1665 }
1666
    /// Guard that resets `GLOBAL_CONSOLIDATION_METRICS` when it is dropped.
    struct ResetGlobalConsolidationMetrics;

    impl Drop for ResetGlobalConsolidationMetrics {
        /// Clears the global metrics so later tests start from zero.
        fn drop(&mut self) {
            GLOBAL_CONSOLIDATION_METRICS.reset();
        }
    }
1674
    /// Runs `body` with exclusive access to freshly reset global metrics.
    ///
    /// The metrics are reset before `body` runs and again when this function's
    /// scope ends. Returns whatever `body` returns.
    fn with_global_consolidation_metrics<T>(body: impl FnOnce() -> T) -> T {
        let _guard = GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("global consolidation metrics test lock poisoned");
        // Declared after the lock guard: locals drop in reverse declaration
        // order, so the trailing reset happens while the lock is still held.
        let _reset = ResetGlobalConsolidationMetrics;
        GLOBAL_CONSOLIDATION_METRICS.reset();
        body()
    }
1683
1684 #[test]
1687 fn test_consolidator_initial_state() {
1688 let c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1689 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1690 assert_eq!(c.epoch(), 0);
1691 assert_eq!(c.pending_frame_count(), 0);
1692 assert_eq!(c.pending_batch_count(), 0);
1693 }
1694
1695 #[test]
1696 fn test_consolidator_first_writer_becomes_flusher() {
1697 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1698 let batch = TransactionFrameBatch::new(vec![FrameSubmission {
1699 page_number: 1,
1700 page_data: sample_page(0x01),
1701 db_size_if_commit: 0,
1702 }]);
1703 let outcome = c.submit_batch(batch).unwrap();
1704 assert_eq!(outcome, SubmitOutcome::Flusher);
1705 assert_eq!(c.pending_frame_count(), 1);
1706 assert_eq!(c.pending_batch_count(), 1);
1707 }
1708
1709 #[test]
1710 fn test_consolidator_second_writer_becomes_waiter() {
1711 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1712
1713 let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1714 page_number: 1,
1715 page_data: sample_page(0x01),
1716 db_size_if_commit: 0,
1717 }]);
1718 assert_eq!(c.submit_batch(batch1).unwrap(), SubmitOutcome::Flusher);
1719
1720 let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
1721 page_number: 2,
1722 page_data: sample_page(0x02),
1723 db_size_if_commit: 0,
1724 }]);
1725 assert_eq!(c.submit_batch(batch2).unwrap(), SubmitOutcome::Waiter);
1726 assert_eq!(c.pending_frame_count(), 2);
1727 assert_eq!(c.pending_batch_count(), 2);
1728 }
1729
1730 #[test]
1731 fn test_consolidator_filling_flushing_complete_cycle() {
1732 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1733
1734 for i in 0..3u8 {
1736 let batch = TransactionFrameBatch::new(vec![FrameSubmission {
1737 page_number: u32::from(i) + 1,
1738 page_data: sample_page(i),
1739 db_size_if_commit: if i == 2 { 3 } else { 0 },
1740 }]);
1741 c.submit_batch(batch).unwrap();
1742 }
1743 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1744 assert_eq!(c.pending_frame_count(), 3);
1745
1746 let batches = c.begin_flush().unwrap();
1748 assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1749 assert_eq!(batches.len(), 3);
1750 assert_eq!(c.epoch(), 1);
1751 assert_eq!(c.pending_frame_count(), 0);
1752
1753 let batch_extra = TransactionFrameBatch::new(vec![FrameSubmission {
1755 page_number: 10,
1756 page_data: sample_page(0x10),
1757 db_size_if_commit: 0,
1758 }]);
1759 assert_eq!(c.submit_batch(batch_extra).unwrap(), SubmitOutcome::Waiter);
1760
1761 let promoted = c.complete_flush().unwrap();
1763 assert!(promoted);
1764 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1765 assert_eq!(c.completed_epoch(), 1);
1766 assert_eq!(c.pending_batch_count(), 1);
1767 assert!(c.has_flusher_vacancy());
1768
1769 let batches = c.begin_flush().unwrap();
1771 assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1772 assert_eq!(c.epoch(), 2);
1773 assert_eq!(batches.len(), 1);
1774 }
1775
1776 #[test]
1777 fn test_consolidator_auto_transitions_complete_to_filling() {
1778 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1779
1780 let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1782 page_number: 1,
1783 page_data: sample_page(0x01),
1784 db_size_if_commit: 1,
1785 }]);
1786 c.submit_batch(batch1).unwrap();
1787 c.begin_flush().unwrap();
1788 c.complete_flush().unwrap();
1789 assert_eq!(c.phase(), ConsolidationPhase::Complete);
1790
1791 let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
1793 page_number: 2,
1794 page_data: sample_page(0x02),
1795 db_size_if_commit: 2,
1796 }]);
1797 let outcome = c.submit_batch(batch2).unwrap();
1798 assert_eq!(outcome, SubmitOutcome::Flusher);
1799 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1800 }
1801
1802 #[test]
1803 fn test_consolidator_should_flush_on_max_group_size() {
1804 let config = GroupCommitConfig {
1805 max_group_size: 3,
1806 ..GroupCommitConfig::default()
1807 };
1808 let mut c = GroupCommitConsolidator::new(config);
1809
1810 for i in 0..2u8 {
1812 c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1813 page_number: u32::from(i) + 1,
1814 page_data: sample_page(i),
1815 db_size_if_commit: 0,
1816 }]))
1817 .unwrap();
1818 }
1819 assert!(!c.should_flush_now());
1820
1821 c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1823 page_number: 3,
1824 page_data: sample_page(2),
1825 db_size_if_commit: 3,
1826 }]))
1827 .unwrap();
1828 assert!(c.should_flush_now());
1829 }
1830
1831 #[test]
1832 fn test_consolidator_begin_flush_errors_in_wrong_phase() {
1833 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1834
1835 c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1837 page_number: 1,
1838 page_data: sample_page(0x01),
1839 db_size_if_commit: 1,
1840 }]))
1841 .unwrap();
1842 c.begin_flush().unwrap();
1843
1844 assert!(c.begin_flush().is_err());
1846 }
1847
1848 #[test]
1849 fn test_consolidator_complete_flush_errors_in_wrong_phase() {
1850 let c = &mut GroupCommitConsolidator::new(GroupCommitConfig::default());
1851 assert!(c.complete_flush().is_err());
1853 }
1854
1855 #[test]
1856 fn test_consolidator_abort_flush_releases_epoch_and_allows_next_cycle() {
1857 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1858
1859 c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1860 page_number: 1,
1861 page_data: sample_page(0x01),
1862 db_size_if_commit: 1,
1863 }]))
1864 .unwrap();
1865 c.begin_flush().unwrap();
1866 c.abort_flush().unwrap();
1867 assert_eq!(c.phase(), ConsolidationPhase::Complete);
1868 assert_eq!(c.completed_epoch(), 0);
1869
1870 let outcome = c
1871 .submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1872 page_number: 2,
1873 page_data: sample_page(0x02),
1874 db_size_if_commit: 2,
1875 }]))
1876 .unwrap();
1877 assert_eq!(outcome, SubmitOutcome::Flusher);
1878 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1879 assert_eq!(c.pending_batch_count(), 1);
1880 assert_eq!(c.epoch(), 1);
1881 }
1882
1883 #[test]
1884 fn test_consolidator_promoted_epoch_exposes_flusher_takeover_claim_if_original_stops() {
1885 let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1886
1887 let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1888 page_number: 1,
1889 page_data: sample_page(0x01),
1890 db_size_if_commit: 1,
1891 }]);
1892 assert_eq!(c.submit_batch(batch1).unwrap(), SubmitOutcome::Flusher);
1893
1894 let _flushing_batches = c.begin_flush().unwrap();
1895 assert_eq!(c.epoch(), 1);
1896 assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1897
1898 let pipelined_batch = TransactionFrameBatch::new(vec![FrameSubmission {
1899 page_number: 2,
1900 page_data: sample_page(0x02),
1901 db_size_if_commit: 2,
1902 }]);
1903 assert_eq!(
1904 c.submit_batch(pipelined_batch).unwrap(),
1905 SubmitOutcome::Waiter
1906 );
1907
1908 let promoted = c.complete_flush().unwrap();
1909 assert!(
1910 promoted,
1911 "pipelined epoch must be promoted back to FILLING for the next flush"
1912 );
1913 assert_eq!(c.phase(), ConsolidationPhase::Filling);
1914 assert_eq!(c.pending_batch_count(), 1);
1915 assert_eq!(c.pending_frame_count(), 1);
1916 assert_eq!(c.epoch(), 1);
1917 assert_eq!(c.completed_epoch(), 1);
1918 assert!(c.has_flusher_vacancy());
1919
1920 let takeover_batch = TransactionFrameBatch::new(vec![FrameSubmission {
1921 page_number: 3,
1922 page_data: sample_page(0x03),
1923 db_size_if_commit: 3,
1924 }]);
1925 assert_eq!(
1926 c.submit_batch(takeover_batch).unwrap(),
1927 SubmitOutcome::Waiter,
1928 "promoted work stays queued until someone explicitly claims the flusher vacancy"
1929 );
1930 assert!(c.claim_flusher_vacancy());
1931 assert!(!c.has_flusher_vacancy());
1932 assert!(!c.claim_flusher_vacancy());
1933
1934 let takeover_batches = c.begin_flush().unwrap();
1935 assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1936 assert_eq!(c.epoch(), 2);
1937 assert_eq!(takeover_batches.len(), 2);
1938 }
1939
1940 #[test]
1943 fn test_consolidated_write_single_batch() {
1944 let cx = test_cx();
1945 let vfs = MemoryVfs::new();
1946 let file = open_wal_file(&vfs, &cx);
1947 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1948
1949 let batches = vec![TransactionFrameBatch::new(vec![
1950 FrameSubmission {
1951 page_number: 1,
1952 page_data: sample_page(0x01),
1953 db_size_if_commit: 0,
1954 },
1955 FrameSubmission {
1956 page_number: 2,
1957 page_data: sample_page(0x02),
1958 db_size_if_commit: 0,
1959 },
1960 FrameSubmission {
1961 page_number: 3,
1962 page_data: sample_page(0x03),
1963 db_size_if_commit: 3,
1964 },
1965 ])];
1966
1967 let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
1968 assert_eq!(written, 3);
1969 assert_eq!(wal.frame_count(), 3);
1970
1971 for i in 0..3u32 {
1973 let (header, data) = wal
1974 .read_frame(&cx, usize::try_from(i).unwrap())
1975 .expect("read frame");
1976 assert_eq!(header.page_number, i + 1);
1977 let seed = u8::try_from(i + 1).expect("fits");
1978 assert_eq!(data, sample_page(seed));
1979 }
1980
1981 let last_header = wal.read_frame_header(&cx, 2).expect("read header");
1983 assert!(last_header.is_commit());
1984 assert_eq!(last_header.db_size, 3);
1985
1986 wal.close(&cx).expect("close WAL");
1987 }
1988
1989 #[test]
1990 fn test_consolidated_write_multiple_batches() {
1991 let cx = test_cx();
1992 let vfs = MemoryVfs::new();
1993 let file = open_wal_file(&vfs, &cx);
1994 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1995
1996 let batches = vec![
1998 TransactionFrameBatch::new(vec![
1999 FrameSubmission {
2000 page_number: 10,
2001 page_data: sample_page(0x10),
2002 db_size_if_commit: 0,
2003 },
2004 FrameSubmission {
2005 page_number: 11,
2006 page_data: sample_page(0x11),
2007 db_size_if_commit: 11,
2008 },
2009 ]),
2010 TransactionFrameBatch::new(vec![
2011 FrameSubmission {
2012 page_number: 20,
2013 page_data: sample_page(0x20),
2014 db_size_if_commit: 0,
2015 },
2016 FrameSubmission {
2017 page_number: 21,
2018 page_data: sample_page(0x21),
2019 db_size_if_commit: 21,
2020 },
2021 ]),
2022 ];
2023
2024 let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2025 assert_eq!(written, 4);
2026 assert_eq!(wal.frame_count(), 4);
2027
2028 let expected_pages = [10, 11, 20, 21];
2030 for (i, &expected_page) in expected_pages.iter().enumerate() {
2031 let header = wal.read_frame_header(&cx, i).expect("read header");
2032 assert_eq!(header.page_number, expected_page);
2033 }
2034
2035 wal.close(&cx).expect("close WAL");
2036 }
2037
2038 #[test]
2039 fn test_consolidated_write_preserves_checksum_chain() {
2040 let cx = test_cx();
2041 let vfs = MemoryVfs::new();
2042 let file = open_wal_file(&vfs, &cx);
2043 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2044
2045 wal.append_frame(&cx, 1, &sample_page(0x01), 0)
2047 .expect("append");
2048 wal.append_frame(&cx, 2, &sample_page(0x02), 2)
2049 .expect("append commit");
2050 assert_eq!(wal.frame_count(), 2);
2051 let _checksum_after_2 = wal.running_checksum();
2052
2053 let batches = vec![TransactionFrameBatch::new(vec![
2055 FrameSubmission {
2056 page_number: 3,
2057 page_data: sample_page(0x03),
2058 db_size_if_commit: 0,
2059 },
2060 FrameSubmission {
2061 page_number: 4,
2062 page_data: sample_page(0x04),
2063 db_size_if_commit: 4,
2064 },
2065 ])];
2066
2067 let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2068 assert_eq!(written, 2);
2069 assert_eq!(wal.frame_count(), 4);
2070
2071 wal.close(&cx).expect("close WAL");
2073 let file2 = open_wal_file(&vfs, &cx);
2074 let wal2 = WalFile::open(&cx, file2).expect("reopen WAL");
2075 assert_eq!(
2076 wal2.frame_count(),
2077 4,
2078 "all 4 frames should be valid on reopen (checksum chain intact)"
2079 );
2080
2081 wal2.close(&cx).expect("close WAL");
2082 }
2083
2084 #[test]
2085 fn test_consolidated_write_empty_batch() {
2086 let cx = test_cx();
2087 let vfs = MemoryVfs::new();
2088 let file = open_wal_file(&vfs, &cx);
2089 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2090
2091 let written = write_consolidated_frames(&cx, &mut wal, &[]).expect("write empty");
2092 assert_eq!(written, 0);
2093 assert_eq!(wal.frame_count(), 0);
2094
2095 wal.close(&cx).expect("close WAL");
2096 }
2097
2098 #[test]
2099 fn test_consolidated_write_page_size_mismatch_rejected() {
2100 let cx = test_cx();
2101 let vfs = MemoryVfs::new();
2102 let file = open_wal_file(&vfs, &cx);
2103 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2104
2105 let batches = vec![TransactionFrameBatch::new(vec![FrameSubmission {
2106 page_number: 1,
2107 page_data: vec![0u8; 100], db_size_if_commit: 0,
2109 }])];
2110
2111 assert!(
2112 write_consolidated_frames(&cx, &mut wal, &batches).is_err(),
2113 "wrong page size should be rejected"
2114 );
2115
2116 wal.close(&cx).expect("close WAL");
2117 }
2118
2119 #[test]
2122 fn test_consolidation_metrics_basic() {
2123 let m = ConsolidationMetrics::new();
2124 m.record_flush(10, 3, 500);
2125 m.record_flush(20, 5, 1000);
2126 m.record_wait(100);
2127 m.record_busy_retry();
2128 m.record_busy_retry();
2129
2130 let snap = m.snapshot();
2131 assert_eq!(snap.groups_flushed, 2);
2132 assert_eq!(snap.frames_consolidated, 30);
2133 assert_eq!(snap.transactions_batched, 8);
2134 assert_eq!(snap.fsyncs_total, 2);
2135 assert_eq!(snap.flush_duration_us_total, 1500);
2136 assert_eq!(snap.wait_duration_us_total, 100);
2137 assert_eq!(snap.max_group_size_observed, 20);
2138 assert_eq!(snap.busy_retries, 2);
2139 assert_eq!(snap.avg_group_size(), 15);
2140 assert_eq!(snap.avg_transactions_per_group(), 4);
2141 assert_eq!(snap.avg_flush_duration_us(), 750);
2142 assert_eq!(snap.fsync_reduction_ratio(), 4);
2143 }
2144
2145 #[test]
2146 fn test_consolidation_metrics_reset() {
2147 let m = ConsolidationMetrics::new();
2148 m.record_flush(10, 3, 500);
2149 m.record_busy_retry();
2150 m.reset();
2151 let snap = m.snapshot();
2152 assert_eq!(snap.groups_flushed, 0);
2153 assert_eq!(snap.frames_consolidated, 0);
2154 assert_eq!(snap.busy_retries, 0);
2155 }
2156
2157 #[test]
2158 fn test_consolidation_metrics_display() {
2159 let m = ConsolidationMetrics::new();
2160 m.record_flush(10, 5, 500);
2161 m.record_busy_retry();
2162 let s = m.snapshot().to_string();
2163 assert!(s.contains("groups=1"));
2164 assert!(s.contains("frames=10"));
2165 assert!(s.contains("txns=5"));
2166 assert!(s.contains("busy_retries=1"));
2167 assert!(s.contains("reduction=5x"));
2168 }
2169
2170 #[test]
2171 fn test_consolidation_metrics_snapshot_serializes_phase_distributions() {
2172 let m = ConsolidationMetrics::new();
2173 m.record_phase_timing(10, 5, 2, true, 20, 3, 8, 50, 30, 0);
2174 m.record_phase_timing(11, 6, 2, false, 0, 0, 0, 0, 0, 100);
2175 m.wake_reasons.notify.fetch_add(1, Ordering::Relaxed);
2176 m.wake_reasons.timeout.fetch_add(2, Ordering::Relaxed);
2177
2178 let encoded =
2179 serde_json::to_value(m.snapshot()).expect("consolidation snapshot should serialize");
2180
2181 assert_eq!(
2182 encoded["hist_wal_append"]["count"].as_u64(),
2183 Some(1),
2184 "flusher histogram should serialize sample counts"
2185 );
2186 assert_eq!(
2187 encoded["hist_waiter_epoch_wait"]["count"].as_u64(),
2188 Some(1),
2189 "waiter histogram should serialize sample counts"
2190 );
2191 assert_eq!(
2192 encoded["wake_reasons"]["notify"].as_u64(),
2193 Some(1),
2194 "wake reasons should serialize nested counters"
2195 );
2196 assert_eq!(
2197 encoded["wake_reasons"]["timeout"].as_u64(),
2198 Some(2),
2199 "wake reasons should preserve all fields"
2200 );
2201 }
2202
2203 #[test]
2204 fn test_phase_histogram_recent_tail_decays_without_snapshot() {
2205 let h = PhaseHistogram::new();
2206 h.record(1_600);
2207 assert_eq!(h.recent_tail_us(), 1_600);
2208
2209 h.record(0);
2210 assert_eq!(
2211 h.recent_tail_us(),
2212 1_500,
2213 "recent tail should decay by one sixteenth per sample"
2214 );
2215
2216 h.record(2_000);
2217 assert_eq!(
2218 h.recent_tail_us(),
2219 2_000,
2220 "new spikes should replace the decayed tail immediately"
2221 );
2222
2223 h.reset();
2224 assert_eq!(h.recent_tail_us(), 0);
2225 }
2226
2227 #[test]
2233 fn test_fsync_reduction_deterministic_proof() {
2234 with_global_consolidation_metrics(|| {
2235 let n = 10_u64;
2236 GLOBAL_CONSOLIDATION_METRICS.record_flush(n * 2, n, 1000);
2237
2238 let snap = GLOBAL_CONSOLIDATION_METRICS.snapshot();
2239 assert_eq!(snap.fsyncs_total, 1);
2240 assert_eq!(snap.transactions_batched, n);
2241 assert_eq!(
2242 snap.fsync_reduction_ratio(),
2243 n,
2244 "10 transactions in 1 fsync = 10x reduction"
2245 );
2246 });
2247 }
2248
2249 #[test]
2252 fn test_config_validated_clamps_zero_group_size() {
2253 let config = GroupCommitConfig {
2254 max_group_size: 0,
2255 ..GroupCommitConfig::default()
2256 };
2257 let validated = config.validated();
2258 assert_eq!(validated.max_group_size, 1);
2259 }
2260
2261 #[test]
2262 fn test_config_validated_clamps_excessive_delay() {
2263 let config = GroupCommitConfig {
2264 max_group_delay: Duration::from_millis(100),
2265 max_group_delay_ceiling: Duration::from_millis(10),
2266 ..GroupCommitConfig::default()
2267 };
2268 let validated = config.validated();
2269 assert_eq!(validated.max_group_delay, Duration::from_millis(10));
2270 }
2271
2272 #[test]
2275 fn test_batch_has_commit_frame() {
2276 let batch_with_commit = TransactionFrameBatch::new(vec![
2277 FrameSubmission {
2278 page_number: 1,
2279 page_data: vec![],
2280 db_size_if_commit: 0,
2281 },
2282 FrameSubmission {
2283 page_number: 2,
2284 page_data: vec![],
2285 db_size_if_commit: 5,
2286 },
2287 ]);
2288 assert!(batch_with_commit.has_commit_frame());
2289
2290 let batch_without = TransactionFrameBatch::new(vec![FrameSubmission {
2291 page_number: 1,
2292 page_data: vec![],
2293 db_size_if_commit: 0,
2294 }]);
2295 assert!(!batch_without.has_commit_frame());
2296 }
2297
2298 #[test]
2301 fn test_full_consolidation_cycle_with_wal_write() {
2302 let cx = test_cx();
2303 let vfs = MemoryVfs::new();
2304 let file = open_wal_file(&vfs, &cx);
2305 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2306
2307 let mut consolidator = GroupCommitConsolidator::new(GroupCommitConfig {
2308 max_group_size: 10,
2309 ..GroupCommitConfig::default()
2310 });
2311
2312 let batch1 = TransactionFrameBatch::new(vec![
2314 FrameSubmission {
2315 page_number: 1,
2316 page_data: sample_page(0x01),
2317 db_size_if_commit: 0,
2318 },
2319 FrameSubmission {
2320 page_number: 2,
2321 page_data: sample_page(0x02),
2322 db_size_if_commit: 2,
2323 },
2324 ]);
2325 let outcome1 = consolidator.submit_batch(batch1).unwrap();
2326 assert_eq!(outcome1, SubmitOutcome::Flusher);
2327
2328 let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
2329 page_number: 3,
2330 page_data: sample_page(0x03),
2331 db_size_if_commit: 3,
2332 }]);
2333 let outcome2 = consolidator.submit_batch(batch2).unwrap();
2334 assert_eq!(outcome2, SubmitOutcome::Waiter);
2335
2336 let batch3 = TransactionFrameBatch::new(vec![
2337 FrameSubmission {
2338 page_number: 4,
2339 page_data: sample_page(0x04),
2340 db_size_if_commit: 0,
2341 },
2342 FrameSubmission {
2343 page_number: 5,
2344 page_data: sample_page(0x05),
2345 db_size_if_commit: 5,
2346 },
2347 ]);
2348 let outcome3 = consolidator.submit_batch(batch3).unwrap();
2349 assert_eq!(outcome3, SubmitOutcome::Waiter);
2350
2351 let batches = consolidator.begin_flush().unwrap();
2353 assert_eq!(batches.len(), 3);
2354
2355 let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2357 assert_eq!(written, 5);
2358
2359 consolidator.complete_flush().unwrap();
2361 assert_eq!(consolidator.phase(), ConsolidationPhase::Complete);
2362
2363 assert_eq!(wal.frame_count(), 5);
2365
2366 wal.close(&cx).expect("close WAL");
2368 let file2 = open_wal_file(&vfs, &cx);
2369 let wal2 = WalFile::open(&cx, file2).expect("reopen WAL");
2370 assert_eq!(wal2.frame_count(), 5, "all frames valid on reopen");
2371 wal2.close(&cx).expect("close WAL");
2372 }
2373
2374 #[test]
2377 fn phase_histogram_empty_returns_zeros() {
2378 let h = PhaseHistogram::new();
2379 let p = h.percentiles();
2380 assert_eq!(p.count, 0);
2381 assert_eq!(p.p50, 0);
2382 assert_eq!(p.p99, 0);
2383 assert_eq!(p.max, 0);
2384 assert_eq!(p.mean_us, 0);
2385 }
2386
2387 #[test]
2388 fn phase_histogram_single_sample() {
2389 let h = PhaseHistogram::new();
2390 h.record(42);
2391 let p = h.percentiles();
2392 assert_eq!(p.count, 1);
2393 assert_eq!(p.p50, 42);
2394 assert_eq!(p.p95, 42);
2395 assert_eq!(p.p99, 42);
2396 assert_eq!(p.max, 42);
2397 assert_eq!(p.mean_us, 42);
2398 }
2399
2400 #[test]
2401 fn phase_histogram_percentiles_ordered() {
2402 let h = PhaseHistogram::new();
2403 for i in 1..=100 {
2404 h.record(i);
2405 }
2406 let p = h.percentiles();
2407 assert_eq!(p.count, 100);
2408 assert!(p.p50 <= p.p95, "p50={} should be <= p95={}", p.p50, p.p95);
2409 assert!(p.p95 <= p.p99, "p95={} should be <= p99={}", p.p95, p.p99);
2410 assert!(p.p99 <= p.max, "p99={} should be <= max={}", p.p99, p.max);
2411 assert_eq!(p.max, 100);
2412 assert_eq!(p.mean_us, 50);
2413 }
2414
2415 #[test]
2416 fn phase_histogram_max_tracks_outlier() {
2417 let h = PhaseHistogram::new();
2418 for _ in 0..100 {
2419 h.record(10);
2420 }
2421 h.record(99999);
2422 let p = h.percentiles();
2423 assert_eq!(p.max, 99999);
2424 assert_eq!(p.p50, 10);
2425 }
2426
2427 #[test]
2428 fn phase_histogram_reset_clears_state() {
2429 let h = PhaseHistogram::new();
2430 for i in 0..50 {
2431 h.record(i * 10);
2432 }
2433 h.reset();
2434 let p = h.percentiles();
2435 assert_eq!(p.count, 0);
2436 assert_eq!(p.max, 0);
2437 }
2438
2439 #[test]
2440 fn phase_histogram_concurrent_writers_no_panic() {
2441 use std::sync::Arc;
2442 let h = Arc::new(PhaseHistogram::new());
2443 let barrier = Arc::new(std::sync::Barrier::new(4));
2444 let mut handles = Vec::new();
2445 for t in 0..4u64 {
2446 let h = Arc::clone(&h);
2447 let b = Arc::clone(&barrier);
2448 handles.push(std::thread::spawn(move || {
2449 b.wait();
2450 for i in 0..500 {
2451 h.record(t * 1000 + i);
2452 }
2453 }));
2454 }
2455 for handle in handles {
2456 handle.join().unwrap();
2457 }
2458 let p = h.percentiles();
2459 assert_eq!(p.count, 2000);
2460 assert!(p.max >= 3499);
2461 }
2462
/// Each wake-reason counter is bumped by a distinct amount; the snapshot must
/// report every counter individually and `total()` must be their sum.
#[test]
fn wake_reason_counters_track_all_reasons() {
    let counters = WakeReasonCounters::new();

    // Distinct increments so a mixed-up field would be detected.
    counters.notify.fetch_add(10, Ordering::Relaxed);
    counters.timeout.fetch_add(3, Ordering::Relaxed);
    counters.flusher_takeover.fetch_add(1, Ordering::Relaxed);
    counters.failed_epoch.fetch_add(2, Ordering::Relaxed);
    counters.busy_retry.fetch_add(5, Ordering::Relaxed);

    let observed = counters.snapshot();

    assert_eq!(observed.notify, 10);
    assert_eq!(observed.timeout, 3);
    assert_eq!(observed.flusher_takeover, 1);
    assert_eq!(observed.failed_epoch, 2);
    assert_eq!(observed.busy_retry, 5);

    // 10 + 3 + 1 + 2 + 5
    assert_eq!(observed.total(), 21);
}
2481
/// A snapshot taken after `reset` must total zero even though a counter was
/// incremented beforehand.
#[test]
fn wake_reason_reset_clears() {
    let counters = WakeReasonCounters::new();
    counters.notify.fetch_add(99, Ordering::Relaxed);

    counters.reset();

    assert_eq!(counters.snapshot().total(), 0);
}
2490
/// Records two batches of phase timings into the global metrics and checks
/// the per-phase histogram sample counts exposed by the snapshot.
#[test]
fn consolidation_metrics_snapshot_includes_distributions() {
    with_global_consolidation_metrics(|| {
        // Ten recordings with the boolean argument set to `true` and a zero
        // final argument.
        // NOTE(review): presumably the flusher path — confirm against
        // `record_phase_timing`'s signature.
        (0..10u64).for_each(|step| {
            GLOBAL_CONSOLIDATION_METRICS.record_phase_timing(
                10 + step,
                5 + step,
                2,
                true,
                20 + step,
                3 + step,
                8 + step,
                50 + step,
                30 + step,
                0,
            );
        });

        // Five recordings with the boolean argument set to `false`; only the
        // first three and the final argument are non-zero.
        // NOTE(review): presumably the waiter path — confirm.
        (0..5u64).for_each(|step| {
            GLOBAL_CONSOLIDATION_METRICS.record_phase_timing(
                10 + step,
                5 + step,
                2,
                false,
                0,
                0,
                0,
                0,
                0,
                100 + step,
            );
        });

        let snapshot = GLOBAL_CONSOLIDATION_METRICS.snapshot();

        // Histograms expected to receive a sample from all 15 recordings.
        assert_eq!(snapshot.hist_consolidator_lock_wait.count, 15);
        assert_eq!(snapshot.hist_phase_b.count, 15);
        // Histograms expected to receive samples from the first batch only.
        assert_eq!(snapshot.hist_arrival_wait.count, 10);
        assert_eq!(snapshot.hist_wal_append.count, 10);
        // Histogram expected to receive samples from the second batch only.
        assert_eq!(snapshot.hist_waiter_epoch_wait.count, 5);

        assert!(snapshot.hist_wal_append.p50 > 0);
        assert!(snapshot.hist_wal_append.max >= 59);
    });
}
2535
/// `TransactionConflictSnapshot` must be copyable, comparable by value, and
/// print its type name through `Debug`.
#[test]
fn transaction_conflict_snapshot_debug_clone_copy_eq() {
    let with_frame = TransactionConflictSnapshot {
        generation: WalGenerationIdentity {
            checkpoint_seq: 0,
            salts: WalSalts { salt1: 0, salt2: 0 },
        },
        last_commit_frame: Some(42),
        commit_count: 7,
    };

    // Copying leaves the original usable and equal to the copy.
    let duplicate = with_frame;
    assert_eq!(duplicate, with_frame);

    // Same generation and commit count; only the last commit frame differs.
    let without_frame = TransactionConflictSnapshot {
        last_commit_frame: None,
        ..with_frame
    };
    assert_ne!(with_frame, without_frame);

    let rendered = format!("{:?}", with_frame);
    assert!(rendered.contains("TransactionConflictSnapshot"));
}
2558
/// The default `TransactionFrameBatchContext` is all zeros, differs from a
/// populated one, and the type supports copy, equality, and `Debug`.
#[test]
fn transaction_frame_batch_context_default_and_eq() {
    let zeroed = TransactionFrameBatchContext::default();
    assert_eq!(zeroed.batch_id, 0);
    assert_eq!(zeroed.lane_id, 0);
    assert_eq!(zeroed.staged_frame_count, 0);
    assert_eq!(zeroed.staging_elapsed_ns, 0);

    let populated = TransactionFrameBatchContext {
        batch_id: 1,
        lane_id: 3,
        staged_frame_count: 10,
        staging_elapsed_ns: 500,
    };
    assert_ne!(zeroed, populated);

    // Copying leaves the original usable and equal to the copy.
    let duplicate = populated;
    assert_eq!(duplicate, populated);

    let rendered = format!("{:?}", zeroed);
    assert!(rendered.contains("TransactionFrameBatchContext"));
}
2578
/// Every `ConsolidationPhase` variant equals only itself, and `SubmitOutcome`
/// supports copy, equality, and `Debug`.
#[test]
fn consolidation_phase_and_submit_outcome_all_variants() {
    let all_phases = [
        ConsolidationPhase::Filling,
        ConsolidationPhase::Flushing,
        ConsolidationPhase::Complete,
    ];

    for (outer_idx, outer) in all_phases.iter().enumerate() {
        let duplicate = *outer;
        assert_eq!(duplicate, *outer);

        // Two entries compare equal exactly when they sit in the same slot.
        for (inner_idx, inner) in all_phases.iter().enumerate() {
            let same_slot = outer_idx == inner_idx;
            assert_eq!(same_slot, outer == inner);
        }
    }

    assert_ne!(SubmitOutcome::Flusher, SubmitOutcome::Waiter);

    let flusher = SubmitOutcome::Flusher;
    assert_eq!(flusher, SubmitOutcome::Flusher);

    let rendered = format!("{:?}", SubmitOutcome::Waiter);
    assert!(rendered.contains("Waiter"));
}
2599
/// Exercises the `TransactionFrameBatch` builder chain (`new`,
/// `with_conflict_snapshot`, `with_context`) and checks that each builder's
/// input is visible on the resulting batch.
#[test]
fn transaction_frame_batch_builders() {
    let frame = FrameSubmission {
        page_number: 5,
        page_data: vec![0u8; 16],
        // Zero here; the batch is asserted below to have no commit frame.
        db_size_if_commit: 0,
    };

    // `frame` is not used again, so it is moved into the batch instead of
    // being cloned (the clone only duplicated the page buffer).
    let batch = TransactionFrameBatch::new(vec![frame])
        .with_conflict_snapshot(vec![5, 10], None)
        .with_context(TransactionFrameBatchContext {
            batch_id: 99,
            lane_id: 2,
            staged_frame_count: 1,
            staging_elapsed_ns: 100,
        });

    assert_eq!(batch.frame_count(), 1);
    assert!(!batch.has_commit_frame());
    assert_eq!(batch.conflict_pages, vec![5, 10]);
    assert!(batch.conflict_snapshot.is_none());
    assert_eq!(batch.context.batch_id, 99);
    assert_eq!(batch.context.lane_id, 2);
}
2622
/// The default `GroupCommitConfig` carries the documented defaults, is
/// copyable, and prints its type name through `Debug`.
#[test]
fn group_commit_config_default_copy_debug() {
    let defaults = GroupCommitConfig::default();

    // Destructure a copy; `defaults` stays usable afterwards because the
    // type is `Copy`.
    let GroupCommitConfig {
        max_group_size,
        max_group_delay,
        max_group_delay_ceiling,
    } = defaults;
    assert_eq!(max_group_size, 64);
    assert_eq!(max_group_delay, Duration::from_millis(1));
    assert_eq!(max_group_delay_ceiling, Duration::from_millis(10));

    let rendered = format!("{:?}", defaults);
    assert!(rendered.contains("GroupCommitConfig"));
}
2633
/// Cloning a `FrameSubmission` preserves every field, and `Debug` output
/// names the type.
#[test]
fn frame_submission_debug_clone() {
    let submission = FrameSubmission {
        page_number: 42,
        page_data: vec![0xAB; 8],
        db_size_if_commit: 0,
    };

    let duplicate = submission.clone();
    assert_eq!(duplicate.page_number, 42);
    assert_eq!(duplicate.page_data.len(), 8);
    assert_eq!(duplicate.db_size_if_commit, 0);

    let rendered = format!("{:?}", submission);
    assert!(rendered.contains("FrameSubmission"));
}
2648
/// The default `PhasePercentiles` is copyable, equal to its copy, and has
/// zeroed `p50`, `p99`, `max`, and `count`.
#[test]
fn phase_percentiles_default_copy_eq() {
    let zeroed = PhasePercentiles::default();

    let duplicate = zeroed;
    assert_eq!(duplicate, zeroed);

    assert_eq!(zeroed.p50, 0);
    assert_eq!(zeroed.p99, 0);
    assert_eq!(zeroed.max, 0);
    assert_eq!(zeroed.count, 0);
}
2659
/// The default `WakeReasonSnapshot` totals zero, is copyable, and prints its
/// type name through `Debug`.
#[test]
fn wake_reason_snapshot_default_total_zero() {
    let empty = WakeReasonSnapshot::default();
    assert_eq!(empty.total(), 0);

    let duplicate = empty;
    assert_eq!(duplicate, empty);

    let rendered = format!("{:?}", empty);
    assert!(rendered.contains("WakeReasonSnapshot"));
}
2669}