Skip to main content

fsqlite_wal/
group_commit.rs

1//! Group commit with consolidation for WAL frame writes (bd-ncivz.3).
2//!
3//! Amortizes `fsync` overhead across multiple concurrent transactions by
4//! batching WAL frame writes into a single I/O + fsync operation.
5//!
6//! # Consolidation Protocol
7//!
8//! Writers submit sealed frame batches to a consolidation queue.
9//! The protocol transitions through three phases:
10//!
11//! ```text
12//! FILLING ──▶ FLUSHING ──▶ COMPLETE ──▶ FILLING (next epoch)
13//! ```
14//!
15//! - **FILLING**: Accepting new frame batches from writers.
16//! - **FLUSHING**: The flusher (first writer to arrive) writes all accumulated
17//!   frames to the WAL file via a single consolidated I/O, then fsyncs.
18//! - **COMPLETE**: All waiters are notified; committed frames are durable.
19//!
20//! The first writer to enter a FILLING phase becomes the *flusher*.
21//! Subsequent writers add their frames and park on a condvar. When the
22//! flusher decides to flush (batch full OR max delay exceeded), it writes
23//! all accumulated frames, fsyncs once, and wakes all parked writers.
24//!
25//! # I/O Optimization
26//!
27//! Consolidated writes serialize all frame buffers into a single contiguous
28//! write to the WAL file, avoiding per-frame syscall overhead. The single
29//! `fsync` after the batch write makes all frames durable atomically.
30//!
31//! # Tuning
32//!
33//! - `max_group_size`: Maximum frames per group before forced flush (default: 64).
//! - `max_group_delay`: Maximum time to wait for additional writers before
//!   flushing (default: 1ms). Clamped to `max_group_delay_ceiling`
//!   (default: 10ms) so that batching adds only bounded tail latency.
36
37use crate::WalGenerationIdentity;
38use fsqlite_types::sync_primitives::{Duration, Instant};
39use serde::{Deserialize, Serialize};
40use std::collections::VecDeque;
41use std::sync::{
42    LazyLock,
43    atomic::{AtomicBool, AtomicU64, Ordering},
44};
45
46use fsqlite_error::{FrankenError, Result};
47use fsqlite_types::cx::Cx;
48use fsqlite_vfs::{SyncKind, VfsFile};
49use tracing::{debug, info, trace};
50
51use crate::wal::{WalAppendFrameRef, WalFile};
52
/// Interpret an environment-variable value as a boolean switch.
///
/// Leading and trailing whitespace is ignored. The value counts as enabled
/// when it is exactly `1`, or when it equals `true`, `yes`, or `on` under
/// ASCII case-insensitive comparison. Anything else, including the empty
/// string, is treated as disabled.
fn env_flag_enabled(value: &str) -> bool {
    let trimmed = value.trim();
    trimmed == "1"
        || ["true", "yes", "on"]
            .iter()
            .any(|accepted| trimmed.eq_ignore_ascii_case(accepted))
}
60
61/// Whether expensive per-substep consolidation timing is enabled.
62#[must_use]
63pub fn detailed_consolidation_metrics_enabled() -> bool {
64    static ENABLED: LazyLock<bool> = LazyLock::new(|| {
65        std::env::var("FSQLITE_WAL_DETAILED_COMMIT_METRICS")
66            .is_ok_and(|value| env_flag_enabled(&value))
67            || std::env::var("FSQLITE_BENCH_PROFILE_INSERT")
68                .is_ok_and(|value| env_flag_enabled(&value))
69    });
70    *ENABLED
71}
72
/// Process-wide switch that lets a profiling caller force commit phase timing on.
static COMMIT_PHASE_TIMING_ENABLED: AtomicBool = AtomicBool::new(false);

/// Turn WAL commit phase timing on or off for an explicit profiling window.
///
/// Ordinary commits keep maintaining correctness-critical WAL state and the
/// cheap frame counters either way; they only need to sample the wall clock
/// for every phase while a caller is collecting a hot-path profile or
/// detailed WAL metrics.
///
/// Returns the value the switch held before this call, so a caller can put
/// it back once its profiling window is over.
pub fn set_commit_phase_timing_enabled(enabled: bool) -> bool {
    let switch = &COMMIT_PHASE_TIMING_ENABLED;
    switch.swap(enabled, Ordering::Relaxed)
}

/// Report whether a profiling caller has explicitly switched commit phase
/// timing on via [`set_commit_phase_timing_enabled`].
#[must_use]
pub fn commit_phase_timing_forced_enabled() -> bool {
    let switch = &COMMIT_PHASE_TIMING_ENABLED;
    switch.load(Ordering::Relaxed)
}
89
90/// Whether commit phase timing should sample `Instant::now()`.
91#[must_use]
92pub fn commit_phase_timing_enabled() -> bool {
93    detailed_consolidation_metrics_enabled() || commit_phase_timing_forced_enabled()
94}
95
96// ---------------------------------------------------------------------------
97// Configuration
98// ---------------------------------------------------------------------------
99
/// Tunables governing how WAL frame writes are consolidated into groups.
#[derive(Debug, Clone, Copy)]
pub struct GroupCommitConfig {
    /// Frame-count threshold at which a consolidated group is force-flushed.
    ///
    /// Default: 64 frames (~260 KB at 4 KB page size).
    pub max_group_size: usize,

    /// How long a group may wait for additional writers before it is flushed.
    ///
    /// Default: 1ms. Bounded to ensure tail latency stays under 10ms.
    pub max_group_delay: Duration,

    /// Hard upper limit that `max_group_delay` is clamped to.
    ///
    /// Default: 10ms. This is the absolute upper bound on commit latency
    /// added by group commit batching.
    pub max_group_delay_ceiling: Duration,
}

impl Default for GroupCommitConfig {
    /// 64 frames per group, 1ms batching delay, 10ms delay ceiling.
    fn default() -> Self {
        let max_group_delay = Duration::from_millis(1);
        let max_group_delay_ceiling = Duration::from_millis(10);
        Self {
            max_group_size: 64,
            max_group_delay,
            max_group_delay_ceiling,
        }
    }
}

impl GroupCommitConfig {
    /// Return a copy of this configuration with out-of-range values clamped.
    ///
    /// A `max_group_size` of zero is raised to one, and a `max_group_delay`
    /// above `max_group_delay_ceiling` is lowered to the ceiling. The ceiling
    /// itself is passed through untouched.
    #[must_use]
    pub fn validated(self) -> Self {
        let ceiling = self.max_group_delay_ceiling;
        let max_group_size = if self.max_group_size == 0 {
            1
        } else {
            self.max_group_size
        };
        let max_group_delay = if self.max_group_delay > ceiling {
            ceiling
        } else {
            self.max_group_delay
        };
        Self {
            max_group_size,
            max_group_delay,
            max_group_delay_ceiling: ceiling,
        }
    }
}
143
144// ---------------------------------------------------------------------------
145// Frame submission
146// ---------------------------------------------------------------------------
147
/// A single WAL frame submitted for consolidated writing.
///
/// Owns its page image, so a submission can be queued independently of the
/// buffer it was built from.
#[derive(Debug, Clone)]
pub struct FrameSubmission {
    /// Database page number this frame writes.
    pub page_number: u32,
    /// Page data (must be exactly `page_size` bytes).
    pub page_data: Vec<u8>,
    /// Database size in pages for commit frames, or 0 for non-commit frames.
    ///
    /// A non-zero value is what marks a frame as a commit frame; see
    /// `TransactionFrameBatch::has_commit_frame`.
    pub db_size_if_commit: u32,
}
158
/// A batch of frames from a single transaction, submitted atomically.
///
/// Built with `TransactionFrameBatch::new`, which leaves the conflict
/// metadata empty and the staging context at its default; the `with_*`
/// builders attach them.
#[derive(Debug, Clone)]
pub struct TransactionFrameBatch {
    /// Frames belonging to this transaction, in write order.
    pub frames: Vec<FrameSubmission>,
    /// Pages that must obey first-committer-wins against committed WAL frames
    /// newer than this transaction's read snapshot.
    ///
    /// The process-local MVCC registry catches conflicts between connections
    /// in the same process. These fields carry the same commit intent through
    /// the process-global WAL group-commit queue so the eventual flusher can
    /// also reject stale cross-process commits under the WAL append gate.
    pub conflict_pages: Vec<u32>,
    /// WAL visibility snapshot pinned when the submitting transaction began.
    /// If another process has committed any of `conflict_pages` after this
    /// horizon, the batch must fail with BUSY_SNAPSHOT instead of appending a
    /// stale page image that could hide the other writer's committed rows.
    ///
    /// `None` when no snapshot was attached to the batch.
    pub conflict_snapshot: Option<TransactionConflictSnapshot>,
    /// Lane-local staging context captured before group-commit submission.
    pub context: TransactionFrameBatchContext,
}
180
/// WAL conflict horizon captured by a submitting transaction.
///
/// Carried in `TransactionFrameBatch::conflict_snapshot`. A plain `Copy`
/// value, compared field-by-field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransactionConflictSnapshot {
    // NOTE(review): presumably identifies the WAL generation the snapshot was
    // pinned against — confirm against `WalGenerationIdentity`.
    pub generation: WalGenerationIdentity,
    // NOTE(review): presumably the last commit frame visible at snapshot
    // time, with `None` meaning no commit frame was visible — confirm the
    // indexing convention with the WAL reader.
    pub last_commit_frame: Option<usize>,
    // NOTE(review): presumably the number of commits observed in the WAL at
    // snapshot time — confirm against the code that fills this in.
    pub commit_count: u64,
}
188
/// Lane-local staging context attached to a transaction batch.
///
/// The derived `Default` yields an all-zero context, which is what
/// `TransactionFrameBatch::new` installs until `with_context` replaces it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransactionFrameBatchContext {
    /// Monotonic identifier used to correlate the batch with lane-local staging.
    pub batch_id: u64,
    /// Stable lane identity chosen for the submitting writer.
    pub lane_id: u16,
    /// Number of frames locally staged for this batch.
    pub staged_frame_count: u32,
    /// Time spent in local staging before queue submission
    /// (nanoseconds, per the `_ns` suffix).
    pub staging_elapsed_ns: u64,
}
201
202impl TransactionFrameBatch {
203    /// Create a new batch with the given frames.
204    #[must_use]
205    pub fn new(frames: Vec<FrameSubmission>) -> Self {
206        Self {
207            frames,
208            conflict_pages: Vec::new(),
209            conflict_snapshot: None,
210            context: TransactionFrameBatchContext::default(),
211        }
212    }
213
214    /// Attach cross-process conflict metadata for the submitting transaction.
215    #[must_use]
216    pub fn with_conflict_snapshot(
217        mut self,
218        conflict_pages: Vec<u32>,
219        conflict_snapshot: Option<TransactionConflictSnapshot>,
220    ) -> Self {
221        self.conflict_pages = conflict_pages;
222        self.conflict_snapshot = conflict_snapshot;
223        self
224    }
225
226    /// Attach lane-local staging context to this batch.
227    #[must_use]
228    pub fn with_context(mut self, context: TransactionFrameBatchContext) -> Self {
229        self.context = context;
230        self
231    }
232
233    /// Number of frames in this batch.
234    #[must_use]
235    pub fn frame_count(&self) -> usize {
236        self.frames.len()
237    }
238
239    /// Whether this batch contains a commit frame (last frame has `db_size > 0`).
240    #[must_use]
241    pub fn has_commit_frame(&self) -> bool {
242        self.frames.last().is_some_and(|f| f.db_size_if_commit > 0)
243    }
244}
245
246// ---------------------------------------------------------------------------
247// Consolidation phase state machine
248// ---------------------------------------------------------------------------
249
/// Phase of the consolidation protocol.
///
/// Per the module documentation, an epoch cycles
/// `Filling -> Flushing -> Complete` and then starts over at `Filling`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationPhase {
    /// Accepting new frame batches. The first writer becomes the flusher.
    Filling,
    /// The flusher is writing all accumulated frames to the WAL and fsyncing.
    Flushing,
    /// All frames in this epoch are durable. Waiters may proceed.
    Complete,
}
260
/// Outcome of submitting a transaction batch for consolidated writing.
///
/// Tells the submitting writer which of the two protocol roles it holds for
/// the current epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitOutcome {
    /// This writer became the flusher and should call `flush_group`.
    Flusher,
    /// This writer's frames were accepted; it should wait for flush completion.
    Waiter,
}
269
270// ---------------------------------------------------------------------------
271// Lock-free phase histogram (bd-db300.3.8.1)
272// ---------------------------------------------------------------------------
273
/// Number of sample slots in each `PhaseHistogram` ring buffer.
const PHASE_HISTOGRAM_CAPACITY: usize = 4096;

/// Fixed-capacity ring buffer for lock-free percentile estimation.
///
/// Writers atomically advance `write_idx` and store samples. Readers
/// snapshot the ring and sort to extract percentiles. The ring overwrites
/// old samples when full, giving a rolling window of the last N observations.
/// This is intentionally simple — no locks, no allocations on the hot path.
pub struct PhaseHistogram {
    /// Circular sample buffer.
    samples: Box<[AtomicU64]>,
    /// Monotonic write index (wraps via modulo).
    write_idx: AtomicU64,
    /// Running max (updated atomically on each record).
    max_us: AtomicU64,
    /// Total samples recorded (not capped by ring size).
    count: AtomicU64,
    /// Running sum for mean computation.
    sum_us: AtomicU64,
    /// Cheap decaying tail estimate for hot-path policy decisions.
    recent_tail_us: AtomicU64,
}
296
297impl PhaseHistogram {
298    #[must_use]
299    pub fn new() -> Self {
300        let samples: Vec<AtomicU64> = std::iter::repeat_with(|| AtomicU64::new(0))
301            .take(PHASE_HISTOGRAM_CAPACITY)
302            .collect();
303        Self {
304            samples: samples.into_boxed_slice(),
305            write_idx: AtomicU64::new(0),
306            max_us: AtomicU64::new(0),
307            count: AtomicU64::new(0),
308            sum_us: AtomicU64::new(0),
309            recent_tail_us: AtomicU64::new(0),
310        }
311    }
312
313    /// Record a single sample (microseconds). Lock-free.
314    pub fn record(&self, value_us: u64) {
315        let idx =
316            self.write_idx.fetch_add(1, Ordering::Relaxed) as usize % PHASE_HISTOGRAM_CAPACITY;
317        self.samples[idx].store(value_us, Ordering::Relaxed);
318        self.count.fetch_add(1, Ordering::Relaxed);
319        self.sum_us.fetch_add(value_us, Ordering::Relaxed);
320        // Update max.
321        let mut prev = self.max_us.load(Ordering::Relaxed);
322        while value_us > prev {
323            match self.max_us.compare_exchange_weak(
324                prev,
325                value_us,
326                Ordering::Relaxed,
327                Ordering::Relaxed,
328            ) {
329                Ok(_) => break,
330                Err(actual) => prev = actual,
331            }
332        }
333
334        // Hot-path consumers need a cheap tail-pressure signal but must not
335        // call `percentiles()`, which copies and sorts the whole ring. Maintain
336        // a conservative decaying maximum: spikes influence a few subsequent
337        // decisions, then fade without requiring a telemetry snapshot.
338        let mut prev_tail = self.recent_tail_us.load(Ordering::Relaxed);
339        loop {
340            let decayed = prev_tail.saturating_mul(15) / 16;
341            let next_tail = value_us.max(decayed);
342            match self.recent_tail_us.compare_exchange_weak(
343                prev_tail,
344                next_tail,
345                Ordering::Relaxed,
346                Ordering::Relaxed,
347            ) {
348                Ok(_) => break,
349                Err(actual) => prev_tail = actual,
350            }
351        }
352    }
353
354    /// Return a constant-time recent tail estimate for scheduler hot paths.
355    #[must_use]
356    pub fn recent_tail_us(&self) -> u64 {
357        self.recent_tail_us.load(Ordering::Relaxed)
358    }
359
360    /// Snapshot percentiles: returns (p50, p95, p99, max, count, mean_us).
361    #[must_use]
362    pub fn percentiles(&self) -> PhasePercentiles {
363        let total_count = self.count.load(Ordering::Relaxed);
364        let max = self.max_us.load(Ordering::Relaxed);
365        let sum = self.sum_us.load(Ordering::Relaxed);
366
367        if total_count == 0 {
368            return PhasePercentiles {
369                p50: 0,
370                p95: 0,
371                p99: 0,
372                max: 0,
373                count: 0,
374                mean_us: 0,
375            };
376        }
377
378        // Copy samples into a local vec and sort.
379        let n = total_count.min(PHASE_HISTOGRAM_CAPACITY as u64) as usize;
380        let mut buf = Vec::with_capacity(n);
381        for i in 0..n {
382            buf.push(self.samples[i].load(Ordering::Relaxed));
383        }
384        buf.sort_unstable();
385
386        let p = |pct: usize| -> u64 {
387            if buf.is_empty() {
388                return 0;
389            }
390            let idx = (pct * buf.len()) / 100;
391            buf[idx.min(buf.len() - 1)]
392        };
393
394        PhasePercentiles {
395            p50: p(50),
396            p95: p(95),
397            p99: p(99),
398            max,
399            count: total_count,
400            mean_us: sum / total_count,
401        }
402    }
403
404    /// Reset all samples and counters.
405    pub fn reset(&self) {
406        for s in &self.samples {
407            s.store(0, Ordering::Relaxed);
408        }
409        self.write_idx.store(0, Ordering::Relaxed);
410        self.max_us.store(0, Ordering::Relaxed);
411        self.count.store(0, Ordering::Relaxed);
412        self.sum_us.store(0, Ordering::Relaxed);
413        self.recent_tail_us.store(0, Ordering::Relaxed);
414    }
415}
416
417impl Default for PhaseHistogram {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
/// Percentile snapshot from a `PhaseHistogram`.
///
/// All values are in the unit the histogram was fed (microseconds for
/// `PhaseHistogram::record`). The derived `Default` is the all-zero snapshot
/// of an empty histogram.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct PhasePercentiles {
    /// Median of the samples currently held in the ring.
    pub p50: u64,
    /// 95th percentile of the samples currently held in the ring.
    pub p95: u64,
    /// 99th percentile of the samples currently held in the ring.
    pub p99: u64,
    /// Largest sample recorded since the last reset (not limited to the ring).
    pub max: u64,
    /// Total samples recorded since the last reset (not capped by ring size).
    pub count: u64,
    /// Integer mean of all samples recorded since the last reset.
    pub mean_us: u64,
}
433
434// ---------------------------------------------------------------------------
435// Wake-reason accounting (bd-db300.3.8.1)
436// ---------------------------------------------------------------------------
437
/// Tracks why a waiter woke up during epoch wait.
///
/// Each field is an independent atomic counter; `snapshot` copies them into
/// a plain `WakeReasonSnapshot` and `reset` zeroes them.
pub struct WakeReasonCounters {
    /// Woken by condvar notify (normal completion).
    pub notify: AtomicU64,
    /// Woken by condvar timeout.
    pub timeout: AtomicU64,
    /// Waiter took over as flusher (flusher died or slow).
    pub flusher_takeover: AtomicU64,
    /// Woken to observe a failed epoch.
    pub failed_epoch: AtomicU64,
    /// Woken but must busy-retry (spurious or race).
    pub busy_retry: AtomicU64,
}
451
452impl WakeReasonCounters {
453    const fn new() -> Self {
454        Self {
455            notify: AtomicU64::new(0),
456            timeout: AtomicU64::new(0),
457            flusher_takeover: AtomicU64::new(0),
458            failed_epoch: AtomicU64::new(0),
459            busy_retry: AtomicU64::new(0),
460        }
461    }
462
463    /// Snapshot all counters.
464    #[must_use]
465    pub fn snapshot(&self) -> WakeReasonSnapshot {
466        WakeReasonSnapshot {
467            notify: self.notify.load(Ordering::Relaxed),
468            timeout: self.timeout.load(Ordering::Relaxed),
469            flusher_takeover: self.flusher_takeover.load(Ordering::Relaxed),
470            failed_epoch: self.failed_epoch.load(Ordering::Relaxed),
471            busy_retry: self.busy_retry.load(Ordering::Relaxed),
472        }
473    }
474
475    /// Reset all counters.
476    pub fn reset(&self) {
477        self.notify.store(0, Ordering::Relaxed);
478        self.timeout.store(0, Ordering::Relaxed);
479        self.flusher_takeover.store(0, Ordering::Relaxed);
480        self.failed_epoch.store(0, Ordering::Relaxed);
481        self.busy_retry.store(0, Ordering::Relaxed);
482    }
483}
484
/// Point-in-time snapshot of wake reasons.
///
/// Plain-value mirror of `WakeReasonCounters`, field for field, suitable for
/// serialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct WakeReasonSnapshot {
    /// Wakes caused by a condvar notify.
    pub notify: u64,
    /// Wakes caused by a condvar timeout.
    pub timeout: u64,
    /// Wakes in which the waiter took over as flusher.
    pub flusher_takeover: u64,
    /// Wakes that observed a failed epoch.
    pub failed_epoch: u64,
    /// Wakes that required a busy-retry.
    pub busy_retry: u64,
}
494
495impl WakeReasonSnapshot {
496    /// Total wake events.
497    #[must_use]
498    pub fn total(&self) -> u64 {
499        self.notify + self.timeout + self.flusher_takeover + self.failed_epoch + self.busy_retry
500    }
501}
502
503// ---------------------------------------------------------------------------
504// Consolidation metrics
505// ---------------------------------------------------------------------------
506
/// Atomic counters for group commit consolidation observability.
///
/// The `*_us_total` fields are running sums in microseconds; the `hist_*`
/// fields keep a rolling distribution of the same kind of measurement. All
/// of them are fed by the `record_*` methods on this type.
pub struct ConsolidationMetrics {
    /// Total groups flushed.
    pub groups_flushed: AtomicU64,
    /// Total frames written via consolidated groups.
    pub frames_consolidated: AtomicU64,
    /// Total transactions batched.
    pub transactions_batched: AtomicU64,
    /// Total fsync operations (one per group).
    pub fsyncs_total: AtomicU64,
    /// Total time spent flushing (microseconds).
    pub flush_duration_us_total: AtomicU64,
    /// Total time writers spent waiting for flush (microseconds).
    pub wait_duration_us_total: AtomicU64,
    /// Maximum group size observed, measured in frames.
    pub max_group_size_observed: AtomicU64,
    /// Total busy retries during flush (exponential backoff).
    pub busy_retries: AtomicU64,

    // ── Phase timing instrumentation ──
    /// Time building batch before entering consolidator (microseconds).
    pub prepare_us_total: AtomicU64,
    /// Time cloning staged pages into an owned group-commit batch (microseconds).
    pub batch_build_us_total: AtomicU64,
    /// Time pinning WAL conflict snapshot and attaching metadata (microseconds).
    pub conflict_snapshot_us_total: AtomicU64,
    /// Time spent preparing lane-local WAL frame bytes (microseconds).
    pub lane_prepare_us_total: AtomicU64,
    /// Time waiting to acquire consolidator.lock() (microseconds).
    pub consolidator_lock_wait_us_total: AtomicU64,
    /// Time waiting while consolidator phase == FLUSHING (microseconds).
    pub consolidator_flushing_wait_us_total: AtomicU64,
    /// Time flusher spends waiting for more batches (microseconds).
    /// Accumulated for flusher commits only.
    pub flusher_arrival_wait_us_total: AtomicU64,
    /// Time waiting to acquire inner.lock() (microseconds).
    /// Accumulated for flusher commits only.
    pub inner_lock_wait_us_total: AtomicU64,
    /// Time acquiring EXCLUSIVE file lock (microseconds).
    /// Accumulated for flusher commits only.
    pub exclusive_lock_us_total: AtomicU64,
    /// Time in WAL append_frames (microseconds).
    /// Accumulated for flusher commits only, via `record_phase_timing`.
    pub wal_append_us_total: AtomicU64,
    /// Time preparing flusher frame refs and prepared batches (microseconds).
    pub flush_frame_prep_us_total: AtomicU64,
    /// Time checking stale WAL conflicts immediately before append (microseconds).
    pub append_conflict_check_us_total: AtomicU64,
    /// Time spent in the WAL append call itself (microseconds).
    /// Accumulated via `record_flush_breakdown`.
    pub append_frames_us_total: AtomicU64,
    /// Time in WAL sync/fsync (microseconds).
    /// Accumulated for flusher commits only.
    pub wal_sync_us_total: AtomicU64,
    /// Time waiters spend waiting for epoch completion (microseconds).
    /// Accumulated for waiter commits only.
    pub waiter_epoch_wait_us_total: AtomicU64,
    /// Count of commits that took flusher role.
    pub flusher_commits: AtomicU64,
    /// Count of commits that took waiter role.
    pub waiter_commits: AtomicU64,
    // ── Full commit path phase timing ──
    /// Phase A: prepare under inner.lock (microseconds).
    pub commit_phase_a_us_total: AtomicU64,
    /// Phase B: WAL group commit (microseconds).
    pub commit_phase_b_us_total: AtomicU64,
    /// Phase C1: post-commit metadata under inner.lock (microseconds).
    pub commit_phase_c1_us_total: AtomicU64,
    /// Phase C2: publish to snapshot plane (microseconds).
    pub commit_phase_c2_us_total: AtomicU64,
    /// Total commits with phase timing recorded.
    pub commit_phase_count: AtomicU64,

    // ── Per-phase distribution histograms (bd-db300.3.8.1) ──
    /// Distribution: consolidator lock wait.
    pub hist_consolidator_lock_wait: PhaseHistogram,
    /// Distribution: arrival wait (flusher only).
    pub hist_arrival_wait: PhaseHistogram,
    /// Distribution: WAL backend (inner) lock wait.
    pub hist_wal_backend_lock_wait: PhaseHistogram,
    /// Distribution: WAL append_frames.
    pub hist_wal_append: PhaseHistogram,
    /// Distribution: exclusive file lock acquisition.
    pub hist_exclusive_lock: PhaseHistogram,
    /// Distribution: waiter epoch wait.
    pub hist_waiter_epoch_wait: PhaseHistogram,
    /// Distribution: full Phase B (group commit path, flusher + waiter).
    pub hist_phase_b: PhaseHistogram,
    /// Distribution: WAL sync/fsync.
    pub hist_wal_sync: PhaseHistogram,
    /// Distribution: full commit (phase A + B + C1 + C2).
    pub hist_full_commit: PhaseHistogram,

    // ── Wake-reason accounting (bd-db300.3.8.1) ──
    /// Why waiters woke up during epoch wait.
    pub wake_reasons: WakeReasonCounters,
}
597
598impl ConsolidationMetrics {
599    /// Create zeroed metrics.
600    #[must_use]
601    pub fn new() -> Self {
602        Self {
603            groups_flushed: AtomicU64::new(0),
604            frames_consolidated: AtomicU64::new(0),
605            transactions_batched: AtomicU64::new(0),
606            fsyncs_total: AtomicU64::new(0),
607            flush_duration_us_total: AtomicU64::new(0),
608            wait_duration_us_total: AtomicU64::new(0),
609            max_group_size_observed: AtomicU64::new(0),
610            busy_retries: AtomicU64::new(0),
611            // Phase timing
612            prepare_us_total: AtomicU64::new(0),
613            batch_build_us_total: AtomicU64::new(0),
614            conflict_snapshot_us_total: AtomicU64::new(0),
615            lane_prepare_us_total: AtomicU64::new(0),
616            consolidator_lock_wait_us_total: AtomicU64::new(0),
617            consolidator_flushing_wait_us_total: AtomicU64::new(0),
618            flusher_arrival_wait_us_total: AtomicU64::new(0),
619            inner_lock_wait_us_total: AtomicU64::new(0),
620            exclusive_lock_us_total: AtomicU64::new(0),
621            wal_append_us_total: AtomicU64::new(0),
622            flush_frame_prep_us_total: AtomicU64::new(0),
623            append_conflict_check_us_total: AtomicU64::new(0),
624            append_frames_us_total: AtomicU64::new(0),
625            wal_sync_us_total: AtomicU64::new(0),
626            waiter_epoch_wait_us_total: AtomicU64::new(0),
627            flusher_commits: AtomicU64::new(0),
628            waiter_commits: AtomicU64::new(0),
629            commit_phase_a_us_total: AtomicU64::new(0),
630            commit_phase_b_us_total: AtomicU64::new(0),
631            commit_phase_c1_us_total: AtomicU64::new(0),
632            commit_phase_c2_us_total: AtomicU64::new(0),
633            commit_phase_count: AtomicU64::new(0),
634            // Phase histograms (bd-db300.3.8.1)
635            hist_consolidator_lock_wait: PhaseHistogram::new(),
636            hist_arrival_wait: PhaseHistogram::new(),
637            hist_wal_backend_lock_wait: PhaseHistogram::new(),
638            hist_wal_append: PhaseHistogram::new(),
639            hist_exclusive_lock: PhaseHistogram::new(),
640            hist_waiter_epoch_wait: PhaseHistogram::new(),
641            hist_phase_b: PhaseHistogram::new(),
642            hist_wal_sync: PhaseHistogram::new(),
643            hist_full_commit: PhaseHistogram::new(),
644            wake_reasons: WakeReasonCounters::new(),
645        }
646    }
647
648    /// Record a completed group flush.
649    pub fn record_flush(&self, frames: u64, transactions: u64, duration_us: u64) {
650        self.groups_flushed.fetch_add(1, Ordering::Relaxed);
651        self.frames_consolidated
652            .fetch_add(frames, Ordering::Relaxed);
653        self.transactions_batched
654            .fetch_add(transactions, Ordering::Relaxed);
655        self.fsyncs_total.fetch_add(1, Ordering::Relaxed);
656        self.flush_duration_us_total
657            .fetch_add(duration_us, Ordering::Relaxed);
658        // Update max group size.
659        self.max_group_size_observed
660            .fetch_max(frames, Ordering::Relaxed);
661    }
662
663    /// Record waiter wait time.
664    pub fn record_wait(&self, duration_us: u64) {
665        self.wait_duration_us_total
666            .fetch_add(duration_us, Ordering::Relaxed);
667    }
668
669    /// Record a flush retry triggered by a transient busy error.
670    pub fn record_busy_retry(&self) {
671        self.busy_retries.fetch_add(1, Ordering::Relaxed);
672    }
673
674    /// Record pre-queue commit preparation breakdown.
675    pub fn record_prepare_breakdown(
676        &self,
677        batch_build_us: u64,
678        conflict_snapshot_us: u64,
679        lane_prepare_us: u64,
680    ) {
681        self.batch_build_us_total
682            .fetch_add(batch_build_us, Ordering::Relaxed);
683        self.conflict_snapshot_us_total
684            .fetch_add(conflict_snapshot_us, Ordering::Relaxed);
685        self.lane_prepare_us_total
686            .fetch_add(lane_prepare_us, Ordering::Relaxed);
687    }
688
689    /// Record flusher-side frame preparation and append breakdown.
690    pub fn record_flush_breakdown(
691        &self,
692        flush_frame_prep_us: u64,
693        append_conflict_check_us: u64,
694        append_frames_us: u64,
695    ) {
696        self.flush_frame_prep_us_total
697            .fetch_add(flush_frame_prep_us, Ordering::Relaxed);
698        self.append_conflict_check_us_total
699            .fetch_add(append_conflict_check_us, Ordering::Relaxed);
700        self.append_frames_us_total
701            .fetch_add(append_frames_us, Ordering::Relaxed);
702    }
703
704    /// Record phase timing for a commit operation.
705    #[allow(clippy::too_many_arguments)]
706    pub fn record_phase_timing(
707        &self,
708        prepare_us: u64,
709        consolidator_lock_wait_us: u64,
710        consolidator_flushing_wait_us: u64,
711        is_flusher: bool,
712        flusher_arrival_wait_us: u64,
713        inner_lock_wait_us: u64,
714        exclusive_lock_us: u64,
715        wal_append_us: u64,
716        wal_sync_us: u64,
717        waiter_epoch_wait_us: u64,
718    ) {
719        self.prepare_us_total
720            .fetch_add(prepare_us, Ordering::Relaxed);
721        self.consolidator_lock_wait_us_total
722            .fetch_add(consolidator_lock_wait_us, Ordering::Relaxed);
723        self.consolidator_flushing_wait_us_total
724            .fetch_add(consolidator_flushing_wait_us, Ordering::Relaxed);
725
726        // bd-db300.3.8.1: record to per-phase histograms.
727        self.hist_consolidator_lock_wait
728            .record(consolidator_lock_wait_us);
729
730        if is_flusher {
731            self.flusher_arrival_wait_us_total
732                .fetch_add(flusher_arrival_wait_us, Ordering::Relaxed);
733            self.inner_lock_wait_us_total
734                .fetch_add(inner_lock_wait_us, Ordering::Relaxed);
735            self.exclusive_lock_us_total
736                .fetch_add(exclusive_lock_us, Ordering::Relaxed);
737            self.wal_append_us_total
738                .fetch_add(wal_append_us, Ordering::Relaxed);
739            self.wal_sync_us_total
740                .fetch_add(wal_sync_us, Ordering::Relaxed);
741            self.flusher_commits.fetch_add(1, Ordering::Relaxed);
742
743            // bd-db300.3.8.1: flusher-specific histograms.
744            self.hist_arrival_wait.record(flusher_arrival_wait_us);
745            self.hist_wal_backend_lock_wait.record(inner_lock_wait_us);
746            self.hist_wal_append.record(wal_append_us);
747            self.hist_exclusive_lock.record(exclusive_lock_us);
748            self.hist_wal_sync.record(wal_sync_us);
749        } else {
750            self.waiter_epoch_wait_us_total
751                .fetch_add(waiter_epoch_wait_us, Ordering::Relaxed);
752            self.waiter_commits.fetch_add(1, Ordering::Relaxed);
753
754            // bd-db300.3.8.1: waiter-specific histogram.
755            self.hist_waiter_epoch_wait.record(waiter_epoch_wait_us);
756        }
757
758        // bd-db300.3.8.1: Phase B total = consolidator_lock + flushing_wait + flusher/waiter work.
759        let phase_b_total = consolidator_lock_wait_us
760            + consolidator_flushing_wait_us
761            + if is_flusher {
762                flusher_arrival_wait_us
763                    + inner_lock_wait_us
764                    + exclusive_lock_us
765                    + wal_append_us
766                    + wal_sync_us
767            } else {
768                waiter_epoch_wait_us
769            };
770        self.hist_phase_b.record(phase_b_total);
771    }
772
773    /// Record full commit path phase timing.
774    pub fn record_commit_phases(
775        &self,
776        phase_a_us: u64,
777        phase_b_us: u64,
778        phase_c1_us: u64,
779        phase_c2_us: u64,
780    ) {
781        self.commit_phase_a_us_total
782            .fetch_add(phase_a_us, Ordering::Relaxed);
783        self.commit_phase_b_us_total
784            .fetch_add(phase_b_us, Ordering::Relaxed);
785        self.commit_phase_c1_us_total
786            .fetch_add(phase_c1_us, Ordering::Relaxed);
787        self.commit_phase_c2_us_total
788            .fetch_add(phase_c2_us, Ordering::Relaxed);
789        self.commit_phase_count.fetch_add(1, Ordering::Relaxed);
790
791        // bd-db300.3.8.1: full commit distribution.
792        self.hist_full_commit
793            .record(phase_a_us + phase_b_us + phase_c1_us + phase_c2_us);
794    }
795
796    /// Take a point-in-time snapshot.
797    #[must_use]
798    pub fn snapshot(&self) -> ConsolidationMetricsSnapshot {
799        ConsolidationMetricsSnapshot {
800            groups_flushed: self.groups_flushed.load(Ordering::Relaxed),
801            frames_consolidated: self.frames_consolidated.load(Ordering::Relaxed),
802            transactions_batched: self.transactions_batched.load(Ordering::Relaxed),
803            fsyncs_total: self.fsyncs_total.load(Ordering::Relaxed),
804            flush_duration_us_total: self.flush_duration_us_total.load(Ordering::Relaxed),
805            wait_duration_us_total: self.wait_duration_us_total.load(Ordering::Relaxed),
806            max_group_size_observed: self.max_group_size_observed.load(Ordering::Relaxed),
807            busy_retries: self.busy_retries.load(Ordering::Relaxed),
808            // Phase timing
809            prepare_us_total: self.prepare_us_total.load(Ordering::Relaxed),
810            batch_build_us_total: self.batch_build_us_total.load(Ordering::Relaxed),
811            conflict_snapshot_us_total: self.conflict_snapshot_us_total.load(Ordering::Relaxed),
812            lane_prepare_us_total: self.lane_prepare_us_total.load(Ordering::Relaxed),
813            consolidator_lock_wait_us_total: self
814                .consolidator_lock_wait_us_total
815                .load(Ordering::Relaxed),
816            consolidator_flushing_wait_us_total: self
817                .consolidator_flushing_wait_us_total
818                .load(Ordering::Relaxed),
819            flusher_arrival_wait_us_total: self
820                .flusher_arrival_wait_us_total
821                .load(Ordering::Relaxed),
822            inner_lock_wait_us_total: self.inner_lock_wait_us_total.load(Ordering::Relaxed),
823            exclusive_lock_us_total: self.exclusive_lock_us_total.load(Ordering::Relaxed),
824            wal_append_us_total: self.wal_append_us_total.load(Ordering::Relaxed),
825            flush_frame_prep_us_total: self.flush_frame_prep_us_total.load(Ordering::Relaxed),
826            append_conflict_check_us_total: self
827                .append_conflict_check_us_total
828                .load(Ordering::Relaxed),
829            append_frames_us_total: self.append_frames_us_total.load(Ordering::Relaxed),
830            wal_sync_us_total: self.wal_sync_us_total.load(Ordering::Relaxed),
831            waiter_epoch_wait_us_total: self.waiter_epoch_wait_us_total.load(Ordering::Relaxed),
832            flusher_commits: self.flusher_commits.load(Ordering::Relaxed),
833            waiter_commits: self.waiter_commits.load(Ordering::Relaxed),
834            commit_phase_a_us_total: self.commit_phase_a_us_total.load(Ordering::Relaxed),
835            commit_phase_b_us_total: self.commit_phase_b_us_total.load(Ordering::Relaxed),
836            commit_phase_c1_us_total: self.commit_phase_c1_us_total.load(Ordering::Relaxed),
837            commit_phase_c2_us_total: self.commit_phase_c2_us_total.load(Ordering::Relaxed),
838            commit_phase_count: self.commit_phase_count.load(Ordering::Relaxed),
839            // Per-phase distributions (bd-db300.3.8.1)
840            hist_consolidator_lock_wait: self.hist_consolidator_lock_wait.percentiles(),
841            hist_arrival_wait: self.hist_arrival_wait.percentiles(),
842            hist_wal_backend_lock_wait: self.hist_wal_backend_lock_wait.percentiles(),
843            hist_wal_append: self.hist_wal_append.percentiles(),
844            hist_exclusive_lock: self.hist_exclusive_lock.percentiles(),
845            hist_waiter_epoch_wait: self.hist_waiter_epoch_wait.percentiles(),
846            hist_phase_b: self.hist_phase_b.percentiles(),
847            hist_wal_sync: self.hist_wal_sync.percentiles(),
848            hist_full_commit: self.hist_full_commit.percentiles(),
849            wake_reasons: self.wake_reasons.snapshot(),
850        }
851    }
852
853    /// Reset all counters to zero.
854    pub fn reset(&self) {
855        self.groups_flushed.store(0, Ordering::Relaxed);
856        self.frames_consolidated.store(0, Ordering::Relaxed);
857        self.transactions_batched.store(0, Ordering::Relaxed);
858        self.fsyncs_total.store(0, Ordering::Relaxed);
859        self.flush_duration_us_total.store(0, Ordering::Relaxed);
860        self.wait_duration_us_total.store(0, Ordering::Relaxed);
861        self.max_group_size_observed.store(0, Ordering::Relaxed);
862        self.busy_retries.store(0, Ordering::Relaxed);
863        // Phase timing
864        self.prepare_us_total.store(0, Ordering::Relaxed);
865        self.batch_build_us_total.store(0, Ordering::Relaxed);
866        self.conflict_snapshot_us_total.store(0, Ordering::Relaxed);
867        self.lane_prepare_us_total.store(0, Ordering::Relaxed);
868        self.consolidator_lock_wait_us_total
869            .store(0, Ordering::Relaxed);
870        self.consolidator_flushing_wait_us_total
871            .store(0, Ordering::Relaxed);
872        self.flusher_arrival_wait_us_total
873            .store(0, Ordering::Relaxed);
874        self.inner_lock_wait_us_total.store(0, Ordering::Relaxed);
875        self.exclusive_lock_us_total.store(0, Ordering::Relaxed);
876        self.wal_append_us_total.store(0, Ordering::Relaxed);
877        self.flush_frame_prep_us_total.store(0, Ordering::Relaxed);
878        self.append_conflict_check_us_total
879            .store(0, Ordering::Relaxed);
880        self.append_frames_us_total.store(0, Ordering::Relaxed);
881        self.wal_sync_us_total.store(0, Ordering::Relaxed);
882        self.waiter_epoch_wait_us_total.store(0, Ordering::Relaxed);
883        self.flusher_commits.store(0, Ordering::Relaxed);
884        self.waiter_commits.store(0, Ordering::Relaxed);
885        self.commit_phase_a_us_total.store(0, Ordering::Relaxed);
886        self.commit_phase_b_us_total.store(0, Ordering::Relaxed);
887        self.commit_phase_c1_us_total.store(0, Ordering::Relaxed);
888        self.commit_phase_c2_us_total.store(0, Ordering::Relaxed);
889        self.commit_phase_count.store(0, Ordering::Relaxed);
890        // Histograms and wake reasons (bd-db300.3.8.1)
891        self.hist_consolidator_lock_wait.reset();
892        self.hist_arrival_wait.reset();
893        self.hist_wal_backend_lock_wait.reset();
894        self.hist_wal_append.reset();
895        self.hist_exclusive_lock.reset();
896        self.hist_waiter_epoch_wait.reset();
897        self.hist_phase_b.reset();
898        self.hist_wal_sync.reset();
899        self.hist_full_commit.reset();
900        self.wake_reasons.reset();
901    }
902}
903
/// `Default` simply delegates to [`ConsolidationMetrics::new`], so a default
/// instance is indistinguishable from one built with `new()`.
impl Default for ConsolidationMetrics {
    /// Build a metrics instance via `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
909
/// Point-in-time snapshot of consolidation metrics.
///
/// Plain-data copy of the live counters, produced by
/// `ConsolidationMetrics::snapshot`. Fields ending in `_us_total` are
/// cumulative microsecond totals; the `avg_*` helpers on this type divide
/// them by the matching event count.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsolidationMetricsSnapshot {
    /// Number of groups flushed; divisor for the per-group averages.
    pub groups_flushed: u64,
    /// Total frames across flushed groups (numerator of `avg_group_size`).
    pub frames_consolidated: u64,
    /// Total transactions across flushed groups.
    pub transactions_batched: u64,
    /// Number of fsyncs; divisor of `fsync_reduction_ratio`.
    pub fsyncs_total: u64,
    /// Cumulative flush duration, microseconds.
    pub flush_duration_us_total: u64,
    /// Cumulative wait duration, microseconds.
    // NOTE(review): the recording site is not visible here; presumably time
    // spent by parked waiters — confirm.
    pub wait_duration_us_total: u64,
    /// Largest group size observed.
    // NOTE(review): unit (frames vs. batches) is not visible here — confirm.
    pub max_group_size_observed: u64,
    /// Number of busy retries recorded.
    pub busy_retries: u64,
    // Phase timing (all in microseconds)
    pub prepare_us_total: u64,
    pub batch_build_us_total: u64,
    pub conflict_snapshot_us_total: u64,
    pub lane_prepare_us_total: u64,
    pub consolidator_lock_wait_us_total: u64,
    pub consolidator_flushing_wait_us_total: u64,
    pub flusher_arrival_wait_us_total: u64,
    pub inner_lock_wait_us_total: u64,
    pub exclusive_lock_us_total: u64,
    pub wal_append_us_total: u64,
    pub flush_frame_prep_us_total: u64,
    pub append_conflict_check_us_total: u64,
    pub append_frames_us_total: u64,
    pub wal_sync_us_total: u64,
    pub waiter_epoch_wait_us_total: u64,
    /// Commits recorded on the flusher path.
    pub flusher_commits: u64,
    /// Commits recorded on the waiter path.
    pub waiter_commits: u64,
    // Full commit path phases
    // Accumulated by `ConsolidationMetrics::record_commit_phases`;
    // `commit_phase_count` is the number of such recordings.
    pub commit_phase_a_us_total: u64,
    pub commit_phase_b_us_total: u64,
    pub commit_phase_c1_us_total: u64,
    pub commit_phase_c2_us_total: u64,
    pub commit_phase_count: u64,
    // ── Per-phase distributions (bd-db300.3.8.1) ──
    // Each value is the `percentiles()` summary of the matching histogram at
    // snapshot time.
    pub hist_consolidator_lock_wait: PhasePercentiles,
    pub hist_arrival_wait: PhasePercentiles,
    pub hist_wal_backend_lock_wait: PhasePercentiles,
    pub hist_wal_append: PhasePercentiles,
    pub hist_exclusive_lock: PhasePercentiles,
    pub hist_waiter_epoch_wait: PhasePercentiles,
    pub hist_phase_b: PhasePercentiles,
    pub hist_wal_sync: PhasePercentiles,
    pub hist_full_commit: PhasePercentiles,
    // ── Wake reasons (bd-db300.3.8.1) ──
    pub wake_reasons: WakeReasonSnapshot,
}
958
959impl ConsolidationMetricsSnapshot {
960    /// Average frames per group, or 0 if no groups flushed.
961    #[must_use]
962    pub fn avg_group_size(&self) -> u64 {
963        self.frames_consolidated
964            .checked_div(self.groups_flushed)
965            .unwrap_or(0)
966    }
967
968    /// Average transactions per group, or 0 if no groups flushed.
969    #[must_use]
970    pub fn avg_transactions_per_group(&self) -> u64 {
971        self.transactions_batched
972            .checked_div(self.groups_flushed)
973            .unwrap_or(0)
974    }
975
976    /// Average flush duration in microseconds, or 0 if no groups flushed.
977    #[must_use]
978    pub fn avg_flush_duration_us(&self) -> u64 {
979        self.flush_duration_us_total
980            .checked_div(self.groups_flushed)
981            .unwrap_or(0)
982    }
983
984    /// Fsync reduction ratio: transactions_batched / fsyncs_total.
985    ///
986    /// Without group commit, each transaction needs its own fsync.
987    /// With group commit, N transactions share 1 fsync.
988    #[must_use]
989    pub fn fsync_reduction_ratio(&self) -> u64 {
990        self.transactions_batched
991            .checked_div(self.fsyncs_total)
992            .unwrap_or(0)
993    }
994
995    /// Total commits (flusher + waiter).
996    #[must_use]
997    pub fn total_commits(&self) -> u64 {
998        self.flusher_commits.saturating_add(self.waiter_commits)
999    }
1000
1001    /// Average prepare time per commit (microseconds).
1002    #[must_use]
1003    pub fn avg_prepare_us(&self) -> u64 {
1004        self.prepare_us_total
1005            .checked_div(self.total_commits())
1006            .unwrap_or(0)
1007    }
1008
1009    /// Average consolidator lock wait per commit (microseconds).
1010    #[must_use]
1011    pub fn avg_consolidator_lock_wait_us(&self) -> u64 {
1012        self.consolidator_lock_wait_us_total
1013            .checked_div(self.total_commits())
1014            .unwrap_or(0)
1015    }
1016
1017    /// Average WAL I/O time per flusher (microseconds).
1018    #[must_use]
1019    pub fn avg_wal_io_us(&self) -> u64 {
1020        self.wal_append_us_total
1021            .saturating_add(self.wal_sync_us_total)
1022            .checked_div(self.flusher_commits)
1023            .unwrap_or(0)
1024    }
1025
1026    /// Average waiter epoch wait time (microseconds).
1027    #[must_use]
1028    pub fn avg_waiter_wait_us(&self) -> u64 {
1029        self.waiter_epoch_wait_us_total
1030            .checked_div(self.waiter_commits)
1031            .unwrap_or(0)
1032    }
1033
1034    // ── bd-db300.3.8.2: lock-wait vs WAL-service split ──
1035
1036    /// Total flusher lock-wait time (inner_lock + exclusive_lock +
1037    /// flushing_wait), microseconds. This is time spent WAITING to acquire
1038    /// the WAL backend write path, NOT doing WAL I/O.
1039    #[must_use]
1040    pub fn flusher_lock_wait_us_total(&self) -> u64 {
1041        self.inner_lock_wait_us_total
1042            .saturating_add(self.exclusive_lock_us_total)
1043            .saturating_add(self.consolidator_flushing_wait_us_total)
1044    }
1045
1046    /// Total WAL service time (append + sync), microseconds. This is time
1047    /// spent doing actual WAL I/O AFTER acquiring the write lock.
1048    #[must_use]
1049    pub fn wal_service_us_total(&self) -> u64 {
1050        self.wal_append_us_total
1051            .saturating_add(self.wal_sync_us_total)
1052    }
1053
1054    /// Lock-wait fraction of total flusher phase-B time (0.0–1.0).
1055    /// Values > 0.5 indicate the regime is lock-topology-limited, not
1056    /// I/O-limited.
1057    #[must_use]
1058    #[allow(clippy::cast_precision_loss)]
1059    pub fn flusher_lock_wait_fraction(&self) -> f64 {
1060        let lock = self.flusher_lock_wait_us_total();
1061        let service = self.wal_service_us_total();
1062        let total = lock.saturating_add(service);
1063        if total == 0 {
1064            return 0.0;
1065        }
1066        lock as f64 / total as f64
1067    }
1068
1069    /// Whether the flusher is lock-topology-limited (lock wait > service).
1070    #[must_use]
1071    pub fn is_lock_topology_limited(&self) -> bool {
1072        self.flusher_lock_wait_us_total() > self.wal_service_us_total()
1073    }
1074
1075    /// Generate detailed phase timing report.
1076    #[must_use]
1077    pub fn phase_timing_report(&self) -> String {
1078        let total = self.total_commits();
1079        if total == 0 {
1080            return "no commits".to_string();
1081        }
1082
1083        // Calculate per-commit averages
1084        let avg_prepare = self.avg_prepare_us();
1085        let avg_consol_lock = self.avg_consolidator_lock_wait_us();
1086        let avg_flushing_wait = self
1087            .consolidator_flushing_wait_us_total
1088            .checked_div(total)
1089            .unwrap_or(0);
1090
1091        // Flusher-only metrics (per flusher)
1092        let avg_arrival_wait = self
1093            .flusher_arrival_wait_us_total
1094            .checked_div(self.flusher_commits)
1095            .unwrap_or(0);
1096        let avg_inner_lock = self
1097            .inner_lock_wait_us_total
1098            .checked_div(self.flusher_commits)
1099            .unwrap_or(0);
1100        let avg_excl_lock = self
1101            .exclusive_lock_us_total
1102            .checked_div(self.flusher_commits)
1103            .unwrap_or(0);
1104        let avg_append = self
1105            .wal_append_us_total
1106            .checked_div(self.flusher_commits)
1107            .unwrap_or(0);
1108        let avg_sync = self
1109            .wal_sync_us_total
1110            .checked_div(self.flusher_commits)
1111            .unwrap_or(0);
1112
1113        // Waiter-only metrics
1114        let avg_epoch_wait = self.avg_waiter_wait_us();
1115
1116        format!(
1117            "commits: {} (flusher={}, waiter={})\n\
1118             per-commit avg:\n\
1119             ├─ prepare: {}µs\n\
1120             ├─ consolidator_lock_wait: {}µs\n\
1121             ├─ flushing_wait: {}µs\n\
1122             flusher path ({} commits):\n\
1123             ├─ arrival_wait: {}µs\n\
1124             ├─ inner_lock_wait: {}µs\n\
1125             ├─ exclusive_lock: {}µs\n\
1126             ├─ wal_append: {}µs\n\
1127             └─ wal_sync: {}µs (total WAL I/O: {}µs)\n\
1128             waiter path ({} commits):\n\
1129             └─ epoch_wait: {}µs\n\
1130             full commit path ({} commits):\n\
1131             ├─ phase_A (prepare+inner.lock): {}µs\n\
1132             ├─ phase_B (group_commit): {}µs\n\
1133             ├─ phase_C1 (post-commit+inner.lock): {}µs\n\
1134             └─ phase_C2 (publish): {}µs",
1135            total,
1136            self.flusher_commits,
1137            self.waiter_commits,
1138            avg_prepare,
1139            avg_consol_lock,
1140            avg_flushing_wait,
1141            self.flusher_commits,
1142            avg_arrival_wait,
1143            avg_inner_lock,
1144            avg_excl_lock,
1145            avg_append,
1146            avg_sync,
1147            avg_append + avg_sync,
1148            self.waiter_commits,
1149            avg_epoch_wait,
1150            self.commit_phase_count,
1151            self.commit_phase_a_us_total
1152                .checked_div(self.commit_phase_count)
1153                .unwrap_or(0),
1154            self.commit_phase_b_us_total
1155                .checked_div(self.commit_phase_count)
1156                .unwrap_or(0),
1157            self.commit_phase_c1_us_total
1158                .checked_div(self.commit_phase_count)
1159                .unwrap_or(0),
1160            self.commit_phase_c2_us_total
1161                .checked_div(self.commit_phase_count)
1162                .unwrap_or(0),
1163        )
1164    }
1165}
1166
1167impl std::fmt::Display for ConsolidationMetricsSnapshot {
1168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1169        write!(
1170            f,
1171            "groups={} frames={} txns={} fsyncs={} avg_group={} \
1172             avg_flush_us={} max_group={} busy_retries={} reduction={}x",
1173            self.groups_flushed,
1174            self.frames_consolidated,
1175            self.transactions_batched,
1176            self.fsyncs_total,
1177            self.avg_group_size(),
1178            self.avg_flush_duration_us(),
1179            self.max_group_size_observed,
1180            self.busy_retries,
1181            self.fsync_reduction_ratio(),
1182        )
1183    }
1184}
1185
/// Global consolidation metrics singleton.
///
/// Lazily constructed with `ConsolidationMetrics::new` on first access.
pub static GLOBAL_CONSOLIDATION_METRICS: LazyLock<ConsolidationMetrics> =
    LazyLock::new(ConsolidationMetrics::new);

/// Crate-private mutex that exists only in test builds.
// NOTE(review): presumably held by tests that read or reset
// `GLOBAL_CONSOLIDATION_METRICS` so they do not interfere with each other —
// confirm against the test modules.
#[cfg(test)]
pub(crate) static GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK: LazyLock<std::sync::Mutex<()>> =
    LazyLock::new(|| std::sync::Mutex::new(()));
1193
1194// ---------------------------------------------------------------------------
1195// Group commit consolidator (single-threaded core)
1196// ---------------------------------------------------------------------------
1197
/// The group commit consolidator accumulates frame batches from concurrent
/// writers and flushes them to the WAL file in consolidated groups.
///
/// This struct manages the FILLING→FLUSHING→COMPLETE state machine.
/// It is designed to be held behind a `Mutex` and accessed by concurrent
/// writers through [`GroupCommitQueue`]. The struct itself performs no
/// locking and no I/O; it only tracks phase, queues, and epoch counters.
#[derive(Debug)]
pub struct GroupCommitConsolidator {
    /// Current consolidation phase.
    phase: ConsolidationPhase,
    /// Accumulated frame batches in the current FILLING phase.
    pending_batches: VecDeque<TransactionFrameBatch>,
    /// Total number of frames across all pending batches.
    pending_frame_count: usize,
    /// Configuration (stored after `GroupCommitConfig::validated`).
    config: GroupCommitConfig,
    /// When the current FILLING phase started (for max_group_delay).
    /// `None` until the first batch of an epoch arrives; set to the promotion
    /// time when pipelined batches are promoted into a new FILLING epoch.
    filling_started: Option<Instant>,
    /// Monotonic epoch counter: incremented once per `begin_flush`.
    epoch: u64,
    /// Epoch value at the most recent `complete_flush` (for waiter
    /// synchronization). Not updated by `abort_flush`.
    completed_epoch: u64,
    /// Epoch pipelining: batches submitted during FLUSHING phase, queued
    /// for the next epoch. This eliminates the flushing_wait bottleneck —
    /// submitters are not rejected or made to wait for the flush to finish
    /// before their batch is accepted.
    next_epoch_batches: VecDeque<TransactionFrameBatch>,
    /// Total frames across next_epoch_batches.
    next_epoch_frame_count: usize,
    /// Whether a promoted epoch in FILLING currently has pending work but no
    /// explicitly claimed flusher because the previous flusher may have stopped.
    promoted_epoch_flusher_vacant: bool,
}
1230
1231impl GroupCommitConsolidator {
1232    /// Create a new consolidator with the given configuration.
1233    #[must_use]
1234    pub fn new(config: GroupCommitConfig) -> Self {
1235        let config = config.validated();
1236        Self {
1237            phase: ConsolidationPhase::Filling,
1238            pending_batches: VecDeque::new(),
1239            pending_frame_count: 0,
1240            config,
1241            filling_started: None,
1242            epoch: 0,
1243            completed_epoch: 0,
1244            next_epoch_batches: VecDeque::new(),
1245            next_epoch_frame_count: 0,
1246            promoted_epoch_flusher_vacant: false,
1247        }
1248    }
1249
    /// Current consolidation phase.
    ///
    /// A freshly constructed consolidator reports `Filling`.
    #[must_use]
    pub const fn phase(&self) -> ConsolidationPhase {
        self.phase
    }
1255
    /// Current epoch.
    ///
    /// Starts at 0 and is incremented by each successful `begin_flush`.
    #[must_use]
    pub const fn epoch(&self) -> u64 {
        self.epoch
    }
1261
    /// Maximum time a batch may remain in the filling epoch before flush.
    ///
    /// Read from the validated configuration stored at construction.
    #[must_use]
    pub const fn max_group_delay(&self) -> Duration {
        self.config.max_group_delay
    }
1267
    /// Number of pending frames in the current FILLING phase.
    ///
    /// Does not include frames of batches pipelined for the next epoch.
    #[must_use]
    pub const fn pending_frame_count(&self) -> usize {
        self.pending_frame_count
    }
1273
    /// Number of pending transaction batches in the current epoch's queue.
    ///
    /// Batches pipelined for the next epoch are held in a separate queue and
    /// are not counted here.
    #[must_use]
    pub fn pending_batch_count(&self) -> usize {
        self.pending_batches.len()
    }
1279
1280    /// Submit a transaction's frame batch for consolidation.
1281    ///
1282    /// Returns `Flusher` if this writer should call `flush_group`, or
1283    /// `Waiter` if this writer should wait for the flush to complete.
1284    ///
1285    /// # Errors
1286    ///
1287    /// Returns `Err` if the consolidator is in an unexpected phase.
1288    pub fn submit_batch(&mut self, batch: TransactionFrameBatch) -> Result<SubmitOutcome> {
1289        // ── Epoch pipelining: accept submissions during FLUSHING ──
1290        // Instead of blocking, queue batches for the next epoch. This
1291        // eliminates the flushing_wait bottleneck entirely — threads
1292        // never block waiting for a flush to complete.
1293        if self.phase == ConsolidationPhase::Flushing {
1294            self.next_epoch_frame_count += batch.frame_count();
1295            self.next_epoch_batches.push_back(batch);
1296
1297            trace!(
1298                target: "fsqlite_wal::group_commit",
1299                epoch = self.epoch,
1300                next_epoch_frames = self.next_epoch_frame_count,
1301                next_epoch_batches = self.next_epoch_batches.len(),
1302                "batch pipelined for next epoch (submitted during FLUSHING)"
1303            );
1304
1305            // Always a Waiter — the next epoch's flusher will be elected
1306            // when complete_flush() promotes these batches.
1307            return Ok(SubmitOutcome::Waiter);
1308        }
1309
1310        // If we're in COMPLETE, transition to new FILLING epoch.
1311        if self.phase == ConsolidationPhase::Complete {
1312            self.transition_to_filling();
1313        }
1314
1315        let is_first = self.pending_batches.is_empty();
1316
1317        if is_first {
1318            self.filling_started = Some(Instant::now());
1319            self.promoted_epoch_flusher_vacant = false;
1320        }
1321
1322        self.pending_frame_count += batch.frame_count();
1323        self.pending_batches.push_back(batch);
1324
1325        let outcome = if is_first {
1326            SubmitOutcome::Flusher
1327        } else {
1328            SubmitOutcome::Waiter
1329        };
1330
1331        trace!(
1332            target: "fsqlite_wal::group_commit",
1333            epoch = self.epoch,
1334            pending_frames = self.pending_frame_count,
1335            pending_batches = self.pending_batches.len(),
1336            outcome = ?outcome,
1337            "batch submitted"
1338        );
1339
1340        Ok(outcome)
1341    }
1342
1343    /// Check whether the flusher should flush now.
1344    ///
1345    /// Returns `true` if:
1346    /// - The batch is full (`pending_frame_count >= max_group_size`), OR
1347    /// - The max group delay has been exceeded.
1348    #[must_use]
1349    pub fn should_flush_now(&self) -> bool {
1350        if self.pending_frame_count >= self.config.max_group_size {
1351            return true;
1352        }
1353        if let Some(started) = self.filling_started {
1354            if started.elapsed() >= self.config.max_group_delay {
1355                return true;
1356            }
1357        }
1358        false
1359    }
1360
1361    /// Time remaining before the flusher must flush (for sleep/wait).
1362    #[must_use]
1363    pub fn time_until_flush(&self) -> Duration {
1364        if self.pending_frame_count >= self.config.max_group_size {
1365            return Duration::ZERO;
1366        }
1367        self.filling_started
1368            .map_or(self.config.max_group_delay, |started| {
1369                self.config
1370                    .max_group_delay
1371                    .saturating_sub(started.elapsed())
1372            })
1373    }
1374
1375    /// Age of the current FILLING epoch from the first submitted batch.
1376    #[must_use]
1377    pub fn fill_age(&self) -> Duration {
1378        self.filling_started
1379            .map_or(Duration::ZERO, |started| started.elapsed())
1380    }
1381
1382    /// Transition to FLUSHING phase and take ownership of the pending batches.
1383    ///
1384    /// Returns the batches to be written and the page size needed for
1385    /// frame construction.
1386    ///
1387    /// # Errors
1388    ///
1389    /// Returns `Err` if not in FILLING phase.
1390    pub fn begin_flush(&mut self) -> Result<Vec<TransactionFrameBatch>> {
1391        if self.phase != ConsolidationPhase::Filling {
1392            return Err(FrankenError::Internal(format!(
1393                "begin_flush called in {:?} phase, expected Filling",
1394                self.phase
1395            )));
1396        }
1397
1398        self.phase = ConsolidationPhase::Flushing;
1399        self.promoted_epoch_flusher_vacant = false;
1400        self.epoch += 1;
1401
1402        let batches: Vec<_> = self.pending_batches.drain(..).collect();
1403        let frame_count = self.pending_frame_count;
1404        self.pending_frame_count = 0;
1405
1406        debug!(
1407            target: "fsqlite_wal::group_commit",
1408            epoch = self.epoch,
1409            batches = batches.len(),
1410            frames = frame_count,
1411            "begin_flush: FILLING → FLUSHING"
1412        );
1413
1414        Ok(batches)
1415    }
1416
1417    /// Mark the current flush as complete. Waiters can now proceed.
1418    ///
1419    /// # Errors
1420    ///
1421    /// Returns `Err` if not in FLUSHING phase.
1422    /// Returns `true` if pipelined batches were promoted and the caller
1423    /// should flush again. If the original flusher does not continue,
1424    /// a fresh submitter may explicitly claim the promoted epoch via
1425    /// [`Self::claim_flusher_vacancy`].
1426    pub fn complete_flush(&mut self) -> Result<bool> {
1427        if self.phase != ConsolidationPhase::Flushing {
1428            return Err(FrankenError::Internal(format!(
1429                "complete_flush called in {:?} phase, expected Flushing",
1430                self.phase
1431            )));
1432        }
1433
1434        self.completed_epoch = self.epoch;
1435        self.filling_started = None;
1436
1437        // ── Epoch pipelining: promote next-epoch batches ──
1438        // If threads submitted during FLUSHING, their batches are in
1439        // next_epoch_batches. Promote them to pending_batches and
1440        // transition directly to FILLING (skipping COMPLETE) so the
1441        // current flusher can immediately begin_flush() again.
1442        if self.next_epoch_batches.is_empty() {
1443            self.phase = ConsolidationPhase::Complete;
1444            self.promoted_epoch_flusher_vacant = false;
1445            debug!(
1446                target: "fsqlite_wal::group_commit",
1447                epoch = self.epoch,
1448                "complete_flush: FLUSHING → COMPLETE"
1449            );
1450            Ok(false)
1451        } else {
1452            let promoted_count = self.next_epoch_batches.len();
1453            let promoted_frames = self.next_epoch_frame_count;
1454            self.pending_batches = std::mem::take(&mut self.next_epoch_batches);
1455            self.pending_frame_count = self.next_epoch_frame_count;
1456            self.next_epoch_frame_count = 0;
1457            self.phase = ConsolidationPhase::Filling;
1458            self.filling_started = Some(Instant::now());
1459            self.promoted_epoch_flusher_vacant = true;
1460
1461            debug!(
1462                target: "fsqlite_wal::group_commit",
1463                epoch = self.epoch,
1464                promoted_batches = promoted_count,
1465                promoted_frames = promoted_frames,
1466                "complete_flush: FLUSHING → FILLING (epoch pipelining)"
1467            );
1468            Ok(true) // Caller must flush again
1469        }
1470    }
1471
    /// Whether pipelined batches are waiting for the next epoch.
    ///
    /// These are batches submitted while the phase was `Flushing`; they stay
    /// in the next-epoch queue until `complete_flush` or `abort_flush`
    /// promotes them.
    #[must_use]
    pub fn has_pipelined_batches(&self) -> bool {
        !self.next_epoch_batches.is_empty()
    }
1477
    /// Whether a promoted epoch in `Filling` currently needs an explicit
    /// flusher claim before a replacement flusher takes over.
    ///
    /// The flag is raised when `complete_flush` or `abort_flush` promotes
    /// pipelined batches, and lowered by `begin_flush`,
    /// `claim_flusher_vacancy`, or the first submission into an empty epoch.
    #[must_use]
    pub const fn has_flusher_vacancy(&self) -> bool {
        self.promoted_epoch_flusher_vacant
    }
1484
1485    /// Claim the promoted-epoch flusher vacancy after the original flusher
1486    /// stopped before calling `begin_flush()` again.
1487    ///
1488    /// Returns `true` if the caller successfully claimed the vacancy.
1489    #[must_use]
1490    pub fn claim_flusher_vacancy(&mut self) -> bool {
1491        if self.phase == ConsolidationPhase::Filling
1492            && self.promoted_epoch_flusher_vacant
1493            && !self.pending_batches.is_empty()
1494        {
1495            self.promoted_epoch_flusher_vacant = false;
1496            return true;
1497        }
1498        false
1499    }
1500
1501    /// Abort the current flush after the flusher observed an I/O error.
1502    ///
1503    /// This transitions the state machine out of `Flushing` so waiters can be
1504    /// released with the epoch-level failure published by the caller.
1505    ///
1506    /// # Errors
1507    ///
1508    /// Returns `Err` if not in `Flushing` phase.
1509    pub fn abort_flush(&mut self) -> Result<()> {
1510        if self.phase != ConsolidationPhase::Flushing {
1511            return Err(FrankenError::Internal(format!(
1512                "abort_flush called in {:?} phase, expected Flushing",
1513                self.phase
1514            )));
1515        }
1516
1517        // On abort, promote pipelined batches the same way as
1518        // complete_flush — those transactions weren't part of the
1519        // failed flush, so they should be retried in the next epoch.
1520        if self.next_epoch_batches.is_empty() {
1521            self.phase = ConsolidationPhase::Complete;
1522            self.filling_started = None;
1523            self.promoted_epoch_flusher_vacant = false;
1524        } else {
1525            self.pending_batches = std::mem::take(&mut self.next_epoch_batches);
1526            self.pending_frame_count = self.next_epoch_frame_count;
1527            self.next_epoch_frame_count = 0;
1528            self.phase = ConsolidationPhase::Filling;
1529            self.filling_started = Some(Instant::now());
1530            self.promoted_epoch_flusher_vacant = true;
1531            // Keep filling_started set — promoted batches need the timeout
1532        }
1533
1534        debug!(
1535            target: "fsqlite_wal::group_commit",
1536            epoch = self.epoch,
1537            "abort_flush: FLUSHING → {:?}",
1538            self.phase
1539        );
1540
1541        Ok(())
1542    }
1543
1544    /// Transition from COMPLETE to FILLING for the next epoch.
1545    fn transition_to_filling(&mut self) {
1546        self.phase = ConsolidationPhase::Filling;
1547        self.filling_started = None;
1548        self.promoted_epoch_flusher_vacant = false;
1549        trace!(
1550            target: "fsqlite_wal::group_commit",
1551            epoch = self.epoch,
1552            "COMPLETE → FILLING"
1553        );
1554    }
1555
    /// The completed epoch counter (for waiter synchronization).
    ///
    /// Returns the current value of the `completed_epoch` field without
    /// modifying the consolidator.
    #[must_use]
    pub const fn completed_epoch(&self) -> u64 {
        self.completed_epoch
    }
1561}
1562
1563// ---------------------------------------------------------------------------
1564// Batch frame writer
1565// ---------------------------------------------------------------------------
1566
1567/// Write a consolidated batch of frames to the WAL file.
1568///
1569/// Serializes all frames into a single contiguous buffer and writes it
1570/// in one `write` call, then fsyncs. This amortizes syscall overhead
1571/// and ensures all frames in the group become durable atomically.
1572///
1573/// Updates the WAL file's `running_checksum` and `frame_count` for each
1574/// frame in the batch, maintaining the checksum chain invariant.
1575///
1576/// Returns the number of frames written.
1577pub fn write_consolidated_frames<F: VfsFile>(
1578    cx: &Cx,
1579    wal: &mut WalFile<F>,
1580    batches: &[TransactionFrameBatch],
1581) -> Result<usize> {
1582    let frame_size = wal.frame_size();
1583    let total_frames: usize = batches.iter().map(TransactionFrameBatch::frame_count).sum();
1584    if total_frames == 0 {
1585        return Ok(0);
1586    }
1587
1588    let total_bytes = total_frames
1589        .checked_mul(frame_size)
1590        .ok_or_else(|| FrankenError::Internal("frame batch size overflow".to_owned()))?;
1591    let frame_refs = batches.iter().flat_map(|batch| {
1592        batch.frames.iter().map(|frame| WalAppendFrameRef {
1593            page_number: frame.page_number,
1594            page_data: &frame.page_data,
1595            db_size_if_commit: frame.db_size_if_commit,
1596        })
1597    });
1598
1599    let span = tracing::info_span!(
1600        target: "fsqlite_wal::group_commit",
1601        "consolidated_write",
1602        total_frames,
1603        total_bytes,
1604        batches = batches.len(),
1605    );
1606    let _guard = span.enter();
1607
1608    wal.append_frame_iter(cx, total_frames, frame_refs)?;
1609    wal.durable_sync(cx, SyncKind::FullDurable)?;
1610    let bytes_written = u64::try_from(total_bytes).unwrap_or(u64::MAX);
1611
1612    info!(
1613        target: "fsqlite_wal::group_commit",
1614        frames_written = total_frames,
1615        bytes_written,
1616        batches = batches.len(),
1617        "consolidated write + fsync complete"
1618    );
1619
1620    Ok(total_frames)
1621}
1622
1623// ---------------------------------------------------------------------------
1624// Tests
1625// ---------------------------------------------------------------------------
1626
1627#[cfg(test)]
1628mod tests {
1629    use fsqlite_types::flags::VfsOpenFlags;
1630    use fsqlite_vfs::MemoryVfs;
1631    use fsqlite_vfs::traits::Vfs;
1632
1633    use super::*;
1634    use crate::checksum::WalSalts;
1635
1636    const PAGE_SIZE: u32 = 4096;
1637
1638    fn test_cx() -> Cx {
1639        Cx::default()
1640    }
1641
1642    fn test_salts() -> WalSalts {
1643        WalSalts {
1644            salt1: 0xDEAD_BEEF,
1645            salt2: 0xCAFE_BABE,
1646        }
1647    }
1648
1649    fn sample_page(seed: u8) -> Vec<u8> {
1650        let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
1651        let mut page = vec![0u8; page_size];
1652        for (i, byte) in page.iter_mut().enumerate() {
1653            let reduced = u8::try_from(i % 251).expect("modulo fits u8");
1654            *byte = reduced ^ seed;
1655        }
1656        page
1657    }
1658
1659    fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
1660        let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
1661        let (file, _) = vfs
1662            .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
1663            .expect("open WAL file");
1664        file
1665    }
1666
    /// Drop guard that resets `GLOBAL_CONSOLIDATION_METRICS` when it goes out
    /// of scope, so a test that touched the global metrics leaves them
    /// cleared.
    struct ResetGlobalConsolidationMetrics;

    impl Drop for ResetGlobalConsolidationMetrics {
        /// Reset the global consolidation metrics.
        fn drop(&mut self) {
            GLOBAL_CONSOLIDATION_METRICS.reset();
        }
    }
1674
    /// Run `body` while holding the global-metrics test lock.
    ///
    /// The global metrics are reset before `body` runs and reset again when
    /// the `ResetGlobalConsolidationMetrics` guard is dropped on the way out.
    /// Panics if the test lock is poisoned.
    fn with_global_consolidation_metrics<T>(body: impl FnOnce() -> T) -> T {
        let _guard = GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("global consolidation metrics test lock poisoned");
        // Declared after the lock guard, so it is dropped first: the final
        // reset happens while the lock is still held.
        let _reset = ResetGlobalConsolidationMetrics;
        GLOBAL_CONSOLIDATION_METRICS.reset();
        body()
    }
1683
1684    // ── Consolidator state machine tests ──
1685
1686    #[test]
1687    fn test_consolidator_initial_state() {
1688        let c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1689        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1690        assert_eq!(c.epoch(), 0);
1691        assert_eq!(c.pending_frame_count(), 0);
1692        assert_eq!(c.pending_batch_count(), 0);
1693    }
1694
1695    #[test]
1696    fn test_consolidator_first_writer_becomes_flusher() {
1697        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1698        let batch = TransactionFrameBatch::new(vec![FrameSubmission {
1699            page_number: 1,
1700            page_data: sample_page(0x01),
1701            db_size_if_commit: 0,
1702        }]);
1703        let outcome = c.submit_batch(batch).unwrap();
1704        assert_eq!(outcome, SubmitOutcome::Flusher);
1705        assert_eq!(c.pending_frame_count(), 1);
1706        assert_eq!(c.pending_batch_count(), 1);
1707    }
1708
1709    #[test]
1710    fn test_consolidator_second_writer_becomes_waiter() {
1711        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1712
1713        let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1714            page_number: 1,
1715            page_data: sample_page(0x01),
1716            db_size_if_commit: 0,
1717        }]);
1718        assert_eq!(c.submit_batch(batch1).unwrap(), SubmitOutcome::Flusher);
1719
1720        let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
1721            page_number: 2,
1722            page_data: sample_page(0x02),
1723            db_size_if_commit: 0,
1724        }]);
1725        assert_eq!(c.submit_batch(batch2).unwrap(), SubmitOutcome::Waiter);
1726        assert_eq!(c.pending_frame_count(), 2);
1727        assert_eq!(c.pending_batch_count(), 2);
1728    }
1729
1730    #[test]
1731    fn test_consolidator_filling_flushing_complete_cycle() {
1732        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1733
1734        // Submit 3 batches.
1735        for i in 0..3u8 {
1736            let batch = TransactionFrameBatch::new(vec![FrameSubmission {
1737                page_number: u32::from(i) + 1,
1738                page_data: sample_page(i),
1739                db_size_if_commit: if i == 2 { 3 } else { 0 },
1740            }]);
1741            c.submit_batch(batch).unwrap();
1742        }
1743        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1744        assert_eq!(c.pending_frame_count(), 3);
1745
1746        // Begin flush: FILLING → FLUSHING.
1747        let batches = c.begin_flush().unwrap();
1748        assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1749        assert_eq!(batches.len(), 3);
1750        assert_eq!(c.epoch(), 1);
1751        assert_eq!(c.pending_frame_count(), 0);
1752
1753        // Pipelined submissions during FLUSHING become waiters for the next epoch.
1754        let batch_extra = TransactionFrameBatch::new(vec![FrameSubmission {
1755            page_number: 10,
1756            page_data: sample_page(0x10),
1757            db_size_if_commit: 0,
1758        }]);
1759        assert_eq!(c.submit_batch(batch_extra).unwrap(), SubmitOutcome::Waiter);
1760
1761        // Complete flush: FLUSHING → FILLING with a promoted next epoch.
1762        let promoted = c.complete_flush().unwrap();
1763        assert!(promoted);
1764        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1765        assert_eq!(c.completed_epoch(), 1);
1766        assert_eq!(c.pending_batch_count(), 1);
1767        assert!(c.has_flusher_vacancy());
1768
1769        // The original flusher may continue immediately without an explicit claim.
1770        let batches = c.begin_flush().unwrap();
1771        assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1772        assert_eq!(c.epoch(), 2);
1773        assert_eq!(batches.len(), 1);
1774    }
1775
1776    #[test]
1777    fn test_consolidator_auto_transitions_complete_to_filling() {
1778        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1779
1780        // First cycle.
1781        let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1782            page_number: 1,
1783            page_data: sample_page(0x01),
1784            db_size_if_commit: 1,
1785        }]);
1786        c.submit_batch(batch1).unwrap();
1787        c.begin_flush().unwrap();
1788        c.complete_flush().unwrap();
1789        assert_eq!(c.phase(), ConsolidationPhase::Complete);
1790
1791        // Second submission auto-transitions to FILLING.
1792        let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
1793            page_number: 2,
1794            page_data: sample_page(0x02),
1795            db_size_if_commit: 2,
1796        }]);
1797        let outcome = c.submit_batch(batch2).unwrap();
1798        assert_eq!(outcome, SubmitOutcome::Flusher);
1799        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1800    }
1801
1802    #[test]
1803    fn test_consolidator_should_flush_on_max_group_size() {
1804        let config = GroupCommitConfig {
1805            max_group_size: 3,
1806            ..GroupCommitConfig::default()
1807        };
1808        let mut c = GroupCommitConsolidator::new(config);
1809
1810        // Submit 2 frames — should not flush yet.
1811        for i in 0..2u8 {
1812            c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1813                page_number: u32::from(i) + 1,
1814                page_data: sample_page(i),
1815                db_size_if_commit: 0,
1816            }]))
1817            .unwrap();
1818        }
1819        assert!(!c.should_flush_now());
1820
1821        // Submit 3rd frame — should flush now.
1822        c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1823            page_number: 3,
1824            page_data: sample_page(2),
1825            db_size_if_commit: 3,
1826        }]))
1827        .unwrap();
1828        assert!(c.should_flush_now());
1829    }
1830
1831    #[test]
1832    fn test_consolidator_begin_flush_errors_in_wrong_phase() {
1833        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1834
1835        // Submit and begin flush.
1836        c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1837            page_number: 1,
1838            page_data: sample_page(0x01),
1839            db_size_if_commit: 1,
1840        }]))
1841        .unwrap();
1842        c.begin_flush().unwrap();
1843
1844        // Cannot begin flush again in FLUSHING phase.
1845        assert!(c.begin_flush().is_err());
1846    }
1847
1848    #[test]
1849    fn test_consolidator_complete_flush_errors_in_wrong_phase() {
1850        let c = &mut GroupCommitConsolidator::new(GroupCommitConfig::default());
1851        // Cannot complete flush in FILLING phase.
1852        assert!(c.complete_flush().is_err());
1853    }
1854
1855    #[test]
1856    fn test_consolidator_abort_flush_releases_epoch_and_allows_next_cycle() {
1857        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1858
1859        c.submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1860            page_number: 1,
1861            page_data: sample_page(0x01),
1862            db_size_if_commit: 1,
1863        }]))
1864        .unwrap();
1865        c.begin_flush().unwrap();
1866        c.abort_flush().unwrap();
1867        assert_eq!(c.phase(), ConsolidationPhase::Complete);
1868        assert_eq!(c.completed_epoch(), 0);
1869
1870        let outcome = c
1871            .submit_batch(TransactionFrameBatch::new(vec![FrameSubmission {
1872                page_number: 2,
1873                page_data: sample_page(0x02),
1874                db_size_if_commit: 2,
1875            }]))
1876            .unwrap();
1877        assert_eq!(outcome, SubmitOutcome::Flusher);
1878        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1879        assert_eq!(c.pending_batch_count(), 1);
1880        assert_eq!(c.epoch(), 1);
1881    }
1882
1883    #[test]
1884    fn test_consolidator_promoted_epoch_exposes_flusher_takeover_claim_if_original_stops() {
1885        let mut c = GroupCommitConsolidator::new(GroupCommitConfig::default());
1886
1887        let batch1 = TransactionFrameBatch::new(vec![FrameSubmission {
1888            page_number: 1,
1889            page_data: sample_page(0x01),
1890            db_size_if_commit: 1,
1891        }]);
1892        assert_eq!(c.submit_batch(batch1).unwrap(), SubmitOutcome::Flusher);
1893
1894        let _flushing_batches = c.begin_flush().unwrap();
1895        assert_eq!(c.epoch(), 1);
1896        assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1897
1898        let pipelined_batch = TransactionFrameBatch::new(vec![FrameSubmission {
1899            page_number: 2,
1900            page_data: sample_page(0x02),
1901            db_size_if_commit: 2,
1902        }]);
1903        assert_eq!(
1904            c.submit_batch(pipelined_batch).unwrap(),
1905            SubmitOutcome::Waiter
1906        );
1907
1908        let promoted = c.complete_flush().unwrap();
1909        assert!(
1910            promoted,
1911            "pipelined epoch must be promoted back to FILLING for the next flush"
1912        );
1913        assert_eq!(c.phase(), ConsolidationPhase::Filling);
1914        assert_eq!(c.pending_batch_count(), 1);
1915        assert_eq!(c.pending_frame_count(), 1);
1916        assert_eq!(c.epoch(), 1);
1917        assert_eq!(c.completed_epoch(), 1);
1918        assert!(c.has_flusher_vacancy());
1919
1920        let takeover_batch = TransactionFrameBatch::new(vec![FrameSubmission {
1921            page_number: 3,
1922            page_data: sample_page(0x03),
1923            db_size_if_commit: 3,
1924        }]);
1925        assert_eq!(
1926            c.submit_batch(takeover_batch).unwrap(),
1927            SubmitOutcome::Waiter,
1928            "promoted work stays queued until someone explicitly claims the flusher vacancy"
1929        );
1930        assert!(c.claim_flusher_vacancy());
1931        assert!(!c.has_flusher_vacancy());
1932        assert!(!c.claim_flusher_vacancy());
1933
1934        let takeover_batches = c.begin_flush().unwrap();
1935        assert_eq!(c.phase(), ConsolidationPhase::Flushing);
1936        assert_eq!(c.epoch(), 2);
1937        assert_eq!(takeover_batches.len(), 2);
1938    }
1939
1940    // ── Consolidated write tests ──
1941
1942    #[test]
1943    fn test_consolidated_write_single_batch() {
1944        let cx = test_cx();
1945        let vfs = MemoryVfs::new();
1946        let file = open_wal_file(&vfs, &cx);
1947        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1948
1949        let batches = vec![TransactionFrameBatch::new(vec![
1950            FrameSubmission {
1951                page_number: 1,
1952                page_data: sample_page(0x01),
1953                db_size_if_commit: 0,
1954            },
1955            FrameSubmission {
1956                page_number: 2,
1957                page_data: sample_page(0x02),
1958                db_size_if_commit: 0,
1959            },
1960            FrameSubmission {
1961                page_number: 3,
1962                page_data: sample_page(0x03),
1963                db_size_if_commit: 3,
1964            },
1965        ])];
1966
1967        let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
1968        assert_eq!(written, 3);
1969        assert_eq!(wal.frame_count(), 3);
1970
1971        // Verify frame contents.
1972        for i in 0..3u32 {
1973            let (header, data) = wal
1974                .read_frame(&cx, usize::try_from(i).unwrap())
1975                .expect("read frame");
1976            assert_eq!(header.page_number, i + 1);
1977            let seed = u8::try_from(i + 1).expect("fits");
1978            assert_eq!(data, sample_page(seed));
1979        }
1980
1981        // Last frame should be commit.
1982        let last_header = wal.read_frame_header(&cx, 2).expect("read header");
1983        assert!(last_header.is_commit());
1984        assert_eq!(last_header.db_size, 3);
1985
1986        wal.close(&cx).expect("close WAL");
1987    }
1988
1989    #[test]
1990    fn test_consolidated_write_multiple_batches() {
1991        let cx = test_cx();
1992        let vfs = MemoryVfs::new();
1993        let file = open_wal_file(&vfs, &cx);
1994        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1995
1996        // Two transactions, each with 2 frames.
1997        let batches = vec![
1998            TransactionFrameBatch::new(vec![
1999                FrameSubmission {
2000                    page_number: 10,
2001                    page_data: sample_page(0x10),
2002                    db_size_if_commit: 0,
2003                },
2004                FrameSubmission {
2005                    page_number: 11,
2006                    page_data: sample_page(0x11),
2007                    db_size_if_commit: 11,
2008                },
2009            ]),
2010            TransactionFrameBatch::new(vec![
2011                FrameSubmission {
2012                    page_number: 20,
2013                    page_data: sample_page(0x20),
2014                    db_size_if_commit: 0,
2015                },
2016                FrameSubmission {
2017                    page_number: 21,
2018                    page_data: sample_page(0x21),
2019                    db_size_if_commit: 21,
2020                },
2021            ]),
2022        ];
2023
2024        let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2025        assert_eq!(written, 4);
2026        assert_eq!(wal.frame_count(), 4);
2027
2028        // Verify page numbers.
2029        let expected_pages = [10, 11, 20, 21];
2030        for (i, &expected_page) in expected_pages.iter().enumerate() {
2031            let header = wal.read_frame_header(&cx, i).expect("read header");
2032            assert_eq!(header.page_number, expected_page);
2033        }
2034
2035        wal.close(&cx).expect("close WAL");
2036    }
2037
2038    #[test]
2039    fn test_consolidated_write_preserves_checksum_chain() {
2040        let cx = test_cx();
2041        let vfs = MemoryVfs::new();
2042        let file = open_wal_file(&vfs, &cx);
2043        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2044
2045        // Write some frames the normal way first.
2046        wal.append_frame(&cx, 1, &sample_page(0x01), 0)
2047            .expect("append");
2048        wal.append_frame(&cx, 2, &sample_page(0x02), 2)
2049            .expect("append commit");
2050        assert_eq!(wal.frame_count(), 2);
2051        let _checksum_after_2 = wal.running_checksum();
2052
2053        // Now write a consolidated batch.
2054        let batches = vec![TransactionFrameBatch::new(vec![
2055            FrameSubmission {
2056                page_number: 3,
2057                page_data: sample_page(0x03),
2058                db_size_if_commit: 0,
2059            },
2060            FrameSubmission {
2061                page_number: 4,
2062                page_data: sample_page(0x04),
2063                db_size_if_commit: 4,
2064            },
2065        ])];
2066
2067        let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2068        assert_eq!(written, 2);
2069        assert_eq!(wal.frame_count(), 4);
2070
2071        // Verify checksum chain is intact by reopening.
2072        wal.close(&cx).expect("close WAL");
2073        let file2 = open_wal_file(&vfs, &cx);
2074        let wal2 = WalFile::open(&cx, file2).expect("reopen WAL");
2075        assert_eq!(
2076            wal2.frame_count(),
2077            4,
2078            "all 4 frames should be valid on reopen (checksum chain intact)"
2079        );
2080
2081        wal2.close(&cx).expect("close WAL");
2082    }
2083
2084    #[test]
2085    fn test_consolidated_write_empty_batch() {
2086        let cx = test_cx();
2087        let vfs = MemoryVfs::new();
2088        let file = open_wal_file(&vfs, &cx);
2089        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2090
2091        let written = write_consolidated_frames(&cx, &mut wal, &[]).expect("write empty");
2092        assert_eq!(written, 0);
2093        assert_eq!(wal.frame_count(), 0);
2094
2095        wal.close(&cx).expect("close WAL");
2096    }
2097
2098    #[test]
2099    fn test_consolidated_write_page_size_mismatch_rejected() {
2100        let cx = test_cx();
2101        let vfs = MemoryVfs::new();
2102        let file = open_wal_file(&vfs, &cx);
2103        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2104
2105        let batches = vec![TransactionFrameBatch::new(vec![FrameSubmission {
2106            page_number: 1,
2107            page_data: vec![0u8; 100], // wrong size
2108            db_size_if_commit: 0,
2109        }])];
2110
2111        assert!(
2112            write_consolidated_frames(&cx, &mut wal, &batches).is_err(),
2113            "wrong page size should be rejected"
2114        );
2115
2116        wal.close(&cx).expect("close WAL");
2117    }
2118
2119    // ── Metrics tests ──
2120
2121    #[test]
2122    fn test_consolidation_metrics_basic() {
2123        let m = ConsolidationMetrics::new();
2124        m.record_flush(10, 3, 500);
2125        m.record_flush(20, 5, 1000);
2126        m.record_wait(100);
2127        m.record_busy_retry();
2128        m.record_busy_retry();
2129
2130        let snap = m.snapshot();
2131        assert_eq!(snap.groups_flushed, 2);
2132        assert_eq!(snap.frames_consolidated, 30);
2133        assert_eq!(snap.transactions_batched, 8);
2134        assert_eq!(snap.fsyncs_total, 2);
2135        assert_eq!(snap.flush_duration_us_total, 1500);
2136        assert_eq!(snap.wait_duration_us_total, 100);
2137        assert_eq!(snap.max_group_size_observed, 20);
2138        assert_eq!(snap.busy_retries, 2);
2139        assert_eq!(snap.avg_group_size(), 15);
2140        assert_eq!(snap.avg_transactions_per_group(), 4);
2141        assert_eq!(snap.avg_flush_duration_us(), 750);
2142        assert_eq!(snap.fsync_reduction_ratio(), 4);
2143    }
2144
2145    #[test]
2146    fn test_consolidation_metrics_reset() {
2147        let m = ConsolidationMetrics::new();
2148        m.record_flush(10, 3, 500);
2149        m.record_busy_retry();
2150        m.reset();
2151        let snap = m.snapshot();
2152        assert_eq!(snap.groups_flushed, 0);
2153        assert_eq!(snap.frames_consolidated, 0);
2154        assert_eq!(snap.busy_retries, 0);
2155    }
2156
2157    #[test]
2158    fn test_consolidation_metrics_display() {
2159        let m = ConsolidationMetrics::new();
2160        m.record_flush(10, 5, 500);
2161        m.record_busy_retry();
2162        let s = m.snapshot().to_string();
2163        assert!(s.contains("groups=1"));
2164        assert!(s.contains("frames=10"));
2165        assert!(s.contains("txns=5"));
2166        assert!(s.contains("busy_retries=1"));
2167        assert!(s.contains("reduction=5x"));
2168    }
2169
2170    #[test]
2171    fn test_consolidation_metrics_snapshot_serializes_phase_distributions() {
2172        let m = ConsolidationMetrics::new();
2173        m.record_phase_timing(10, 5, 2, true, 20, 3, 8, 50, 30, 0);
2174        m.record_phase_timing(11, 6, 2, false, 0, 0, 0, 0, 0, 100);
2175        m.wake_reasons.notify.fetch_add(1, Ordering::Relaxed);
2176        m.wake_reasons.timeout.fetch_add(2, Ordering::Relaxed);
2177
2178        let encoded =
2179            serde_json::to_value(m.snapshot()).expect("consolidation snapshot should serialize");
2180
2181        assert_eq!(
2182            encoded["hist_wal_append"]["count"].as_u64(),
2183            Some(1),
2184            "flusher histogram should serialize sample counts"
2185        );
2186        assert_eq!(
2187            encoded["hist_waiter_epoch_wait"]["count"].as_u64(),
2188            Some(1),
2189            "waiter histogram should serialize sample counts"
2190        );
2191        assert_eq!(
2192            encoded["wake_reasons"]["notify"].as_u64(),
2193            Some(1),
2194            "wake reasons should serialize nested counters"
2195        );
2196        assert_eq!(
2197            encoded["wake_reasons"]["timeout"].as_u64(),
2198            Some(2),
2199            "wake reasons should preserve all fields"
2200        );
2201    }
2202
2203    #[test]
2204    fn test_phase_histogram_recent_tail_decays_without_snapshot() {
2205        let h = PhaseHistogram::new();
2206        h.record(1_600);
2207        assert_eq!(h.recent_tail_us(), 1_600);
2208
2209        h.record(0);
2210        assert_eq!(
2211            h.recent_tail_us(),
2212            1_500,
2213            "recent tail should decay by one sixteenth per sample"
2214        );
2215
2216        h.record(2_000);
2217        assert_eq!(
2218            h.recent_tail_us(),
2219            2_000,
2220            "new spikes should replace the decayed tail immediately"
2221        );
2222
2223        h.reset();
2224        assert_eq!(h.recent_tail_us(), 0);
2225    }
2226
2227    /// Deterministic proof that consolidation achieves fsync reduction.
2228    ///
2229    /// Without consolidation: N transactions × 1 fsync each = N fsyncs.
2230    /// With consolidation: N transactions in 1 group = 1 fsync.
2231    /// Reduction: N/1 = N (for N=10, reduction = 10x).
2232    #[test]
2233    fn test_fsync_reduction_deterministic_proof() {
2234        with_global_consolidation_metrics(|| {
2235            let n = 10_u64;
2236            GLOBAL_CONSOLIDATION_METRICS.record_flush(n * 2, n, 1000);
2237
2238            let snap = GLOBAL_CONSOLIDATION_METRICS.snapshot();
2239            assert_eq!(snap.fsyncs_total, 1);
2240            assert_eq!(snap.transactions_batched, n);
2241            assert_eq!(
2242                snap.fsync_reduction_ratio(),
2243                n,
2244                "10 transactions in 1 fsync = 10x reduction"
2245            );
2246        });
2247    }
2248
2249    // ── Config validation tests ──
2250
2251    #[test]
2252    fn test_config_validated_clamps_zero_group_size() {
2253        let config = GroupCommitConfig {
2254            max_group_size: 0,
2255            ..GroupCommitConfig::default()
2256        };
2257        let validated = config.validated();
2258        assert_eq!(validated.max_group_size, 1);
2259    }
2260
2261    #[test]
2262    fn test_config_validated_clamps_excessive_delay() {
2263        let config = GroupCommitConfig {
2264            max_group_delay: Duration::from_millis(100),
2265            max_group_delay_ceiling: Duration::from_millis(10),
2266            ..GroupCommitConfig::default()
2267        };
2268        let validated = config.validated();
2269        assert_eq!(validated.max_group_delay, Duration::from_millis(10));
2270    }
2271
2272    // ── TransactionFrameBatch tests ──
2273
2274    #[test]
2275    fn test_batch_has_commit_frame() {
2276        let batch_with_commit = TransactionFrameBatch::new(vec![
2277            FrameSubmission {
2278                page_number: 1,
2279                page_data: vec![],
2280                db_size_if_commit: 0,
2281            },
2282            FrameSubmission {
2283                page_number: 2,
2284                page_data: vec![],
2285                db_size_if_commit: 5,
2286            },
2287        ]);
2288        assert!(batch_with_commit.has_commit_frame());
2289
2290        let batch_without = TransactionFrameBatch::new(vec![FrameSubmission {
2291            page_number: 1,
2292            page_data: vec![],
2293            db_size_if_commit: 0,
2294        }]);
2295        assert!(!batch_without.has_commit_frame());
2296    }
2297
2298    // ── Full consolidation + write integration test ──
2299
2300    #[test]
2301    fn test_full_consolidation_cycle_with_wal_write() {
2302        let cx = test_cx();
2303        let vfs = MemoryVfs::new();
2304        let file = open_wal_file(&vfs, &cx);
2305        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2306
2307        let mut consolidator = GroupCommitConsolidator::new(GroupCommitConfig {
2308            max_group_size: 10,
2309            ..GroupCommitConfig::default()
2310        });
2311
2312        // Simulate 3 concurrent writers submitting batches.
2313        let batch1 = TransactionFrameBatch::new(vec![
2314            FrameSubmission {
2315                page_number: 1,
2316                page_data: sample_page(0x01),
2317                db_size_if_commit: 0,
2318            },
2319            FrameSubmission {
2320                page_number: 2,
2321                page_data: sample_page(0x02),
2322                db_size_if_commit: 2,
2323            },
2324        ]);
2325        let outcome1 = consolidator.submit_batch(batch1).unwrap();
2326        assert_eq!(outcome1, SubmitOutcome::Flusher);
2327
2328        let batch2 = TransactionFrameBatch::new(vec![FrameSubmission {
2329            page_number: 3,
2330            page_data: sample_page(0x03),
2331            db_size_if_commit: 3,
2332        }]);
2333        let outcome2 = consolidator.submit_batch(batch2).unwrap();
2334        assert_eq!(outcome2, SubmitOutcome::Waiter);
2335
2336        let batch3 = TransactionFrameBatch::new(vec![
2337            FrameSubmission {
2338                page_number: 4,
2339                page_data: sample_page(0x04),
2340                db_size_if_commit: 0,
2341            },
2342            FrameSubmission {
2343                page_number: 5,
2344                page_data: sample_page(0x05),
2345                db_size_if_commit: 5,
2346            },
2347        ]);
2348        let outcome3 = consolidator.submit_batch(batch3).unwrap();
2349        assert_eq!(outcome3, SubmitOutcome::Waiter);
2350
2351        // Flusher begins flush.
2352        let batches = consolidator.begin_flush().unwrap();
2353        assert_eq!(batches.len(), 3);
2354
2355        // Write all frames in one consolidated I/O.
2356        let written = write_consolidated_frames(&cx, &mut wal, &batches).expect("write");
2357        assert_eq!(written, 5);
2358
2359        // Mark flush complete.
2360        consolidator.complete_flush().unwrap();
2361        assert_eq!(consolidator.phase(), ConsolidationPhase::Complete);
2362
2363        // Verify WAL integrity.
2364        assert_eq!(wal.frame_count(), 5);
2365
2366        // Reopen to verify checksum chain.
2367        wal.close(&cx).expect("close WAL");
2368        let file2 = open_wal_file(&vfs, &cx);
2369        let wal2 = WalFile::open(&cx, file2).expect("reopen WAL");
2370        assert_eq!(wal2.frame_count(), 5, "all frames valid on reopen");
2371        wal2.close(&cx).expect("close WAL");
2372    }
2373
2374    // ── PhaseHistogram tests (bd-db300.3.8.1) ──────────────────────
2375
2376    #[test]
2377    fn phase_histogram_empty_returns_zeros() {
2378        let h = PhaseHistogram::new();
2379        let p = h.percentiles();
2380        assert_eq!(p.count, 0);
2381        assert_eq!(p.p50, 0);
2382        assert_eq!(p.p99, 0);
2383        assert_eq!(p.max, 0);
2384        assert_eq!(p.mean_us, 0);
2385    }
2386
2387    #[test]
2388    fn phase_histogram_single_sample() {
2389        let h = PhaseHistogram::new();
2390        h.record(42);
2391        let p = h.percentiles();
2392        assert_eq!(p.count, 1);
2393        assert_eq!(p.p50, 42);
2394        assert_eq!(p.p95, 42);
2395        assert_eq!(p.p99, 42);
2396        assert_eq!(p.max, 42);
2397        assert_eq!(p.mean_us, 42);
2398    }
2399
2400    #[test]
2401    fn phase_histogram_percentiles_ordered() {
2402        let h = PhaseHistogram::new();
2403        for i in 1..=100 {
2404            h.record(i);
2405        }
2406        let p = h.percentiles();
2407        assert_eq!(p.count, 100);
2408        assert!(p.p50 <= p.p95, "p50={} should be <= p95={}", p.p50, p.p95);
2409        assert!(p.p95 <= p.p99, "p95={} should be <= p99={}", p.p95, p.p99);
2410        assert!(p.p99 <= p.max, "p99={} should be <= max={}", p.p99, p.max);
2411        assert_eq!(p.max, 100);
2412        assert_eq!(p.mean_us, 50);
2413    }
2414
2415    #[test]
2416    fn phase_histogram_max_tracks_outlier() {
2417        let h = PhaseHistogram::new();
2418        for _ in 0..100 {
2419            h.record(10);
2420        }
2421        h.record(99999);
2422        let p = h.percentiles();
2423        assert_eq!(p.max, 99999);
2424        assert_eq!(p.p50, 10);
2425    }
2426
2427    #[test]
2428    fn phase_histogram_reset_clears_state() {
2429        let h = PhaseHistogram::new();
2430        for i in 0..50 {
2431            h.record(i * 10);
2432        }
2433        h.reset();
2434        let p = h.percentiles();
2435        assert_eq!(p.count, 0);
2436        assert_eq!(p.max, 0);
2437    }
2438
2439    #[test]
2440    fn phase_histogram_concurrent_writers_no_panic() {
2441        use std::sync::Arc;
2442        let h = Arc::new(PhaseHistogram::new());
2443        let barrier = Arc::new(std::sync::Barrier::new(4));
2444        let mut handles = Vec::new();
2445        for t in 0..4u64 {
2446            let h = Arc::clone(&h);
2447            let b = Arc::clone(&barrier);
2448            handles.push(std::thread::spawn(move || {
2449                b.wait();
2450                for i in 0..500 {
2451                    h.record(t * 1000 + i);
2452                }
2453            }));
2454        }
2455        for handle in handles {
2456            handle.join().unwrap();
2457        }
2458        let p = h.percentiles();
2459        assert_eq!(p.count, 2000);
2460        assert!(p.max >= 3499);
2461    }
2462
2463    // ── WakeReasonCounters tests (bd-db300.3.8.1) ──────────────────
2464
2465    #[test]
2466    fn wake_reason_counters_track_all_reasons() {
2467        let w = WakeReasonCounters::new();
2468        w.notify.fetch_add(10, Ordering::Relaxed);
2469        w.timeout.fetch_add(3, Ordering::Relaxed);
2470        w.flusher_takeover.fetch_add(1, Ordering::Relaxed);
2471        w.failed_epoch.fetch_add(2, Ordering::Relaxed);
2472        w.busy_retry.fetch_add(5, Ordering::Relaxed);
2473        let s = w.snapshot();
2474        assert_eq!(s.notify, 10);
2475        assert_eq!(s.timeout, 3);
2476        assert_eq!(s.flusher_takeover, 1);
2477        assert_eq!(s.failed_epoch, 2);
2478        assert_eq!(s.busy_retry, 5);
2479        assert_eq!(s.total(), 21);
2480    }
2481
2482    #[test]
2483    fn wake_reason_reset_clears() {
2484        let w = WakeReasonCounters::new();
2485        w.notify.fetch_add(99, Ordering::Relaxed);
2486        w.reset();
2487        let s = w.snapshot();
2488        assert_eq!(s.total(), 0);
2489    }
2490
2491    // ── Integration: histograms in ConsolidationMetrics ──────────────
2492
2493    #[test]
2494    fn consolidation_metrics_snapshot_includes_distributions() {
2495        with_global_consolidation_metrics(|| {
2496            for i in 0..10u64 {
2497                GLOBAL_CONSOLIDATION_METRICS.record_phase_timing(
2498                    10 + i,
2499                    5 + i,
2500                    2,
2501                    true,
2502                    20 + i,
2503                    3 + i,
2504                    8 + i,
2505                    50 + i,
2506                    30 + i,
2507                    0,
2508                );
2509            }
2510            for i in 0..5u64 {
2511                GLOBAL_CONSOLIDATION_METRICS.record_phase_timing(
2512                    10 + i,
2513                    5 + i,
2514                    2,
2515                    false,
2516                    0,
2517                    0,
2518                    0,
2519                    0,
2520                    0,
2521                    100 + i,
2522                );
2523            }
2524
2525            let snap = GLOBAL_CONSOLIDATION_METRICS.snapshot();
2526            assert_eq!(snap.hist_consolidator_lock_wait.count, 15);
2527            assert_eq!(snap.hist_arrival_wait.count, 10);
2528            assert_eq!(snap.hist_wal_append.count, 10);
2529            assert_eq!(snap.hist_waiter_epoch_wait.count, 5);
2530            assert_eq!(snap.hist_phase_b.count, 15);
2531            assert!(snap.hist_wal_append.p50 > 0);
2532            assert!(snap.hist_wal_append.max >= 59);
2533        });
2534    }
2535
2536    #[test]
2537    fn transaction_conflict_snapshot_debug_clone_copy_eq() {
2538        let generation = WalGenerationIdentity {
2539            checkpoint_seq: 0,
2540            salts: WalSalts { salt1: 0, salt2: 0 },
2541        };
2542        let a = TransactionConflictSnapshot {
2543            generation,
2544            last_commit_frame: Some(42),
2545            commit_count: 7,
2546        };
2547        let copied = a;
2548        assert_eq!(copied, a);
2549        let b = TransactionConflictSnapshot {
2550            generation,
2551            last_commit_frame: None,
2552            commit_count: 7,
2553        };
2554        assert_ne!(a, b);
2555        let dbg = format!("{a:?}");
2556        assert!(dbg.contains("TransactionConflictSnapshot"));
2557    }
2558
2559    #[test]
2560    fn transaction_frame_batch_context_default_and_eq() {
2561        let def = TransactionFrameBatchContext::default();
2562        assert_eq!(def.batch_id, 0);
2563        assert_eq!(def.lane_id, 0);
2564        assert_eq!(def.staged_frame_count, 0);
2565        assert_eq!(def.staging_elapsed_ns, 0);
2566        let other = TransactionFrameBatchContext {
2567            batch_id: 1,
2568            lane_id: 3,
2569            staged_frame_count: 10,
2570            staging_elapsed_ns: 500,
2571        };
2572        assert_ne!(def, other);
2573        let copied = other;
2574        assert_eq!(copied, other);
2575        let dbg = format!("{def:?}");
2576        assert!(dbg.contains("TransactionFrameBatchContext"));
2577    }
2578
2579    #[test]
2580    fn consolidation_phase_and_submit_outcome_all_variants() {
2581        let phases = [
2582            ConsolidationPhase::Filling,
2583            ConsolidationPhase::Flushing,
2584            ConsolidationPhase::Complete,
2585        ];
2586        for (i, p) in phases.iter().enumerate() {
2587            let copied = *p;
2588            assert_eq!(copied, *p);
2589            for (j, q) in phases.iter().enumerate() {
2590                assert_eq!(i == j, p == q);
2591            }
2592        }
2593        assert_ne!(SubmitOutcome::Flusher, SubmitOutcome::Waiter);
2594        let copied = SubmitOutcome::Flusher;
2595        assert_eq!(copied, SubmitOutcome::Flusher);
2596        let dbg = format!("{:?}", SubmitOutcome::Waiter);
2597        assert!(dbg.contains("Waiter"));
2598    }
2599
2600    #[test]
2601    fn transaction_frame_batch_builders() {
2602        let frame = FrameSubmission {
2603            page_number: 5,
2604            page_data: vec![0u8; 16],
2605            db_size_if_commit: 0,
2606        };
2607        let batch = TransactionFrameBatch::new(vec![frame.clone()])
2608            .with_conflict_snapshot(vec![5, 10], None)
2609            .with_context(TransactionFrameBatchContext {
2610                batch_id: 99,
2611                lane_id: 2,
2612                staged_frame_count: 1,
2613                staging_elapsed_ns: 100,
2614            });
2615        assert_eq!(batch.frame_count(), 1);
2616        assert!(!batch.has_commit_frame());
2617        assert_eq!(batch.conflict_pages, vec![5, 10]);
2618        assert!(batch.conflict_snapshot.is_none());
2619        assert_eq!(batch.context.batch_id, 99);
2620        assert_eq!(batch.context.lane_id, 2);
2621    }
2622
2623    #[test]
2624    fn group_commit_config_default_copy_debug() {
2625        let cfg = GroupCommitConfig::default();
2626        let copied = cfg;
2627        assert_eq!(copied.max_group_size, 64);
2628        assert_eq!(copied.max_group_delay, Duration::from_millis(1));
2629        assert_eq!(copied.max_group_delay_ceiling, Duration::from_millis(10));
2630        let dbg = format!("{cfg:?}");
2631        assert!(dbg.contains("GroupCommitConfig"));
2632    }
2633
2634    #[test]
2635    fn frame_submission_debug_clone() {
2636        let fs = FrameSubmission {
2637            page_number: 42,
2638            page_data: vec![0xAB; 8],
2639            db_size_if_commit: 0,
2640        };
2641        let cloned = fs.clone();
2642        assert_eq!(cloned.page_number, 42);
2643        assert_eq!(cloned.page_data.len(), 8);
2644        assert_eq!(cloned.db_size_if_commit, 0);
2645        let dbg = format!("{fs:?}");
2646        assert!(dbg.contains("FrameSubmission"));
2647    }
2648
2649    #[test]
2650    fn phase_percentiles_default_copy_eq() {
2651        let pp = PhasePercentiles::default();
2652        let copied = pp;
2653        assert_eq!(copied, pp);
2654        assert_eq!(pp.p50, 0);
2655        assert_eq!(pp.p99, 0);
2656        assert_eq!(pp.max, 0);
2657        assert_eq!(pp.count, 0);
2658    }
2659
2660    #[test]
2661    fn wake_reason_snapshot_default_total_zero() {
2662        let ws = WakeReasonSnapshot::default();
2663        assert_eq!(ws.total(), 0);
2664        let copied = ws;
2665        assert_eq!(copied, ws);
2666        let dbg = format!("{ws:?}");
2667        assert!(dbg.contains("WakeReasonSnapshot"));
2668    }
2669}