Skip to main content

reddb_server/storage/unified/store/
commit.rs

1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Adaptive group-commit window applied when the configured
/// `GroupCommitOptions::window_ms` is 0 (the historical default).
///
/// Sub-millisecond so individual single-writer commits don't see a
/// visible latency penalty: a 200 µs floor is on the order of one to a
/// few typical NVMe fsyncs (50–150 µs each), while still giving a
/// pipelined writer a chance to drop a second statement into the
/// same drain cycle. A lone synchronous `insert_one` still pays one
/// fsync per row in the worst case, but two back-to-back inserts on
/// the same connection now coalesce into one drain. See P1 in
/// `docs/perf/insert_sequential-2026-05-05.md`.
///
/// Precedence (see `StoreCommitCoordinator::resolve_window`):
/// `REDDB_GROUP_COMMIT_WINDOW_US` (microseconds), when set to a valid
/// integer, overrides everything — `0` disables the floor entirely
/// (legacy behaviour). Otherwise a non-zero `window_ms` (the
/// `REDDB_GROUP_COMMIT_WINDOW_MS` knob) replaces this floor.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;
27
28/// Shorthand — `Arc<parking_lot::Mutex<WalWriter>>` avoids poisoning
29/// (one writer panicking mid-append used to taint every subsequent
30/// lock acquisition) and shaves a few syscalls off the fast path.
31/// The group-commit coordinator + writer threads all acquire this
32/// mutex in the hot insert path, so the unpoison/fast-park win
33/// compounds under 16-way concurrency.
34type WalMutex = parking_lot::Mutex<WalWriter>;
35
36/// Shorthand for the group-commit coordinator's state pair. Same
37/// non-poisoning + lighter-park motivation as `WalMutex`; writer
38/// threads `wait` on this condvar until the coordinator publishes a
39/// new `durable_lsn`, so the park cost shows up on every
40/// WalDurableGrouped transaction.
41type CommitStateMutex = parking_lot::Mutex<CommitState>;
42type CommitStateCondvar = parking_lot::Condvar;
43use std::time::{Duration, Instant};
44
/// Process-wide source of the `tx_id` stamped on each `TxCommitBatch`
/// record. Starts at 1 in every process; the counter itself is not
/// persisted.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);
46
/// One logical store mutation as recorded in the WAL.
///
/// Every variant maps 1:1 onto a `reddb_file::StoreWalActionFrame`
/// variant (see `StoreWalAction::to_frame` / `StoreWalAction::decode`).
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Create the collection called `name`.
    CreateCollection { name: String },
    /// Drop the collection called `name`.
    DropCollection { name: String },
    /// Upsert one serialized entity record into `collection`.
    UpsertEntityRecord { collection: String, record: Vec<u8> },
    /// Delete the entity `entity_id` from `collection`.
    DeleteEntityRecord { collection: String, entity_id: u64 },
    /// Batched upsert — one WAL action carrying N serialized entity
    /// records for the same collection. Saves the per-row Begin/
    /// PageWrite/Commit framing overhead on the bulk insert hot path.
    /// Replay applies every contained record in order.
    BulkUpsertEntityRecords { collection: String, records: Vec<Vec<u8>> },
    /// Atomic full-collection replace — issue #595 slice 9c.
    /// Replay drops the in-memory collection state and rebuilds it
    /// from the contained records. Used by REFRESH MATERIALIZED VIEW
    /// so a concurrent reader sees either the prior contents or the
    /// new contents, never a partial state, and a crash mid-refresh
    /// leaves the prior contents intact on recovery (the action is
    /// only durable once the WAL commit lands).
    RefreshCollection { collection: String, records: Vec<Vec<u8>> },
}
83
84#[derive(Debug, Default)]
85pub(crate) struct DeferredStoreWalActions {
86    actions: Vec<StoreWalAction>,
87}
88
89impl DeferredStoreWalActions {
90    pub(crate) fn is_empty(&self) -> bool {
91        self.actions.is_empty()
92    }
93
94    pub(crate) fn extend(&mut self, other: Self) {
95        self.actions.extend(other.actions);
96    }
97}
98
99thread_local! {
100    static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
101        const { RefCell::new(None) };
102}
103
104fn begin_deferred_store_wal_capture() {
105    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
106        let mut guard = cell.borrow_mut();
107        debug_assert!(guard.is_none());
108        *guard = Some(Vec::new());
109    });
110}
111
112fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
113    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
114        let mut guard = cell.borrow_mut();
115        if let Some(pending) = guard.as_mut() {
116            pending.extend(actions);
117            true
118        } else {
119            false
120        }
121    })
122}
123
124fn deferred_store_wal_capture_active() -> bool {
125    DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
126}
127
128fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
129    DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
130        actions: cell.borrow_mut().take().unwrap_or_default(),
131    })
132}
133
134impl StoreWalAction {
135    pub(crate) fn upsert_entity(
136        collection: &str,
137        entity: &UnifiedEntity,
138        metadata: Option<&Metadata>,
139        format_version: u32,
140    ) -> Self {
141        Self::UpsertEntityRecord {
142            collection: collection.to_string(),
143            record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
144        }
145    }
146
147    fn encode(&self) -> Vec<u8> {
148        reddb_file::encode_store_wal_action_frame(&self.to_frame())
149            .expect("encode store wal action frame")
150    }
151
152    fn to_frame(&self) -> reddb_file::StoreWalActionFrame {
153        match self {
154            Self::CreateCollection { name } => {
155                reddb_file::StoreWalActionFrame::CreateCollection { name: name.clone() }
156            }
157            Self::DropCollection { name } => {
158                reddb_file::StoreWalActionFrame::DropCollection { name: name.clone() }
159            }
160            Self::UpsertEntityRecord { collection, record } => {
161                reddb_file::StoreWalActionFrame::UpsertEntityRecord {
162                    collection: collection.clone(),
163                    record: record.clone(),
164                }
165            }
166            Self::DeleteEntityRecord {
167                collection,
168                entity_id,
169            } => reddb_file::StoreWalActionFrame::DeleteEntityRecord {
170                collection: collection.clone(),
171                entity_id: *entity_id,
172            },
173            Self::BulkUpsertEntityRecords {
174                collection,
175                records,
176            } => reddb_file::StoreWalActionFrame::BulkUpsertEntityRecords {
177                collection: collection.clone(),
178                records: records.clone(),
179            },
180            Self::RefreshCollection {
181                collection,
182                records,
183            } => reddb_file::StoreWalActionFrame::RefreshCollection {
184                collection: collection.clone(),
185                records: records.clone(),
186            },
187        }
188    }
189
190    fn decode(bytes: &[u8]) -> io::Result<Self> {
191        match reddb_file::decode_store_wal_action_frame(bytes)? {
192            reddb_file::StoreWalActionFrame::CreateCollection { name } => {
193                Ok(Self::CreateCollection { name })
194            }
195            reddb_file::StoreWalActionFrame::DropCollection { name } => {
196                Ok(Self::DropCollection { name })
197            }
198            reddb_file::StoreWalActionFrame::UpsertEntityRecord { collection, record } => {
199                Ok(Self::UpsertEntityRecord { collection, record })
200            }
201            reddb_file::StoreWalActionFrame::DeleteEntityRecord {
202                collection,
203                entity_id,
204            } => Ok(Self::DeleteEntityRecord {
205                collection,
206                entity_id,
207            }),
208            reddb_file::StoreWalActionFrame::BulkUpsertEntityRecords {
209                collection,
210                records,
211            } => Ok(Self::BulkUpsertEntityRecords {
212                collection,
213                records,
214            }),
215            reddb_file::StoreWalActionFrame::RefreshCollection {
216                collection,
217                records,
218            } => Ok(Self::RefreshCollection {
219                collection,
220                records,
221            }),
222        }
223    }
224}
225
/// Bookkeeping shared between committing threads and the group-commit
/// drain thread (guarded by `CommitStateMutex`).
#[derive(Debug)]
struct CommitState {
    /// Highest LSN the WAL reports as fsynced.
    durable_lsn: u64,
    /// Highest commit LSN any writer has announced.
    pending_target_lsn: u64,
    /// Commits announced since the counters were last reset.
    pending_statements: usize,
    /// Bytes attributed to those commits.
    pending_wal_bytes: u64,
    /// When the first of those commits was announced; anchors the
    /// coalescing window.
    first_pending_at: Option<Instant>,
    /// Set when the coordinator is dropped; tells the drain thread to exit.
    shutdown: bool,
    /// Message of the most recent drain failure, if any.
    last_error: Option<String>,
}

impl CommitState {
    /// Idle state whose durable and target LSNs both start at
    /// `initial_durable_lsn`.
    fn new(initial_durable_lsn: u64) -> Self {
        let lsn = initial_durable_lsn;
        Self {
            durable_lsn: lsn,
            pending_target_lsn: lsn,
            pending_statements: 0,
            pending_wal_bytes: 0,
            first_pending_at: None,
            shutdown: false,
            last_error: None,
        }
    }
}
250
251/// Lock-free append queue sitting in front of `WalWriter`.
252///
253/// Writers atomically reserve a byte range via `next_lsn.fetch_add`
254/// and push their encoded bytes into a parking_lot-guarded vector.
255/// The group-commit coordinator is the sole drainer: it sorts the
256/// pending entries by LSN (so the file bytes land at the offsets
257/// each writer reserved) and hands them to `WalWriter::append_bytes`.
258///
259/// This replaces the old `wal.lock() ... append ... append ... drop`
260/// hot path where 16 concurrent writers serialised on the WAL
261/// mutex for ~13µs each. Hold time on the queue's parking_lot
262/// mutex is ~200ns (just a Vec::push) — 65× shorter, so the mutex
263/// convoy on concurrent inserts disappears.
264pub(crate) struct WalAppendQueue {
265    /// Tuple of (monotonically-increasing LSN, pending-bytes vec)
266    /// protected by one mutex. Keeping the LSN reservation AND the
267    /// push under the same lock guarantees that every queue entry
268    /// is visible the moment its LSN is assigned — no gap between
269    /// `fetch_add` and `push` for the leader to spin on.
270    ///
271    /// Earlier versions reserved LSN with an `AtomicU64::fetch_add`
272    /// outside the lock; the leader drain observed reordered
273    /// `(lsn, bytes)` tuples whose pushes happened in a different
274    /// order than their fetch_add, creating "holes" in the LSN
275    /// sequence that the drain loop interpreted as "wait for the
276    /// missing enqueuer" — under tokio scheduling pressure the
277    /// missing enqueuer was preempted indefinitely and the
278    /// drain loop busy-waited forever (WAL stayed at 8 bytes).
279    pending: parking_lot::Mutex<WalQueueState>,
280}
281
282struct WalQueueState {
283    next_lsn: u64,
284    entries: Vec<(u64, Vec<u8>)>,
285}
286
287impl WalAppendQueue {
288    fn new(initial_lsn: u64) -> Self {
289        Self {
290            pending: parking_lot::Mutex::new(WalQueueState {
291                next_lsn: initial_lsn,
292                entries: Vec::with_capacity(64),
293            }),
294        }
295    }
296
297    /// Reserve an LSN range of `bytes.len()` bytes and push onto the
298    /// queue. Returns the commit LSN (end of reserved range), which
299    /// the caller passes to `wait_until_durable`. LSN assignment and
300    /// push happen under the same mutex — no gap for the drain loop
301    /// to busy-spin on.
302    fn enqueue(&self, bytes: Vec<u8>) -> u64 {
303        let len = bytes.len() as u64;
304        let mut state = self.pending.lock();
305        let start_lsn = state.next_lsn;
306        state.next_lsn = start_lsn + len;
307        state.entries.push((start_lsn, bytes));
308        start_lsn + len
309    }
310
311    /// Drain all queued entries in LSN order. Caller holds the WAL
312    /// file mutex while writing the drained bytes so the on-disk
313    /// layout matches the reserved LSN offsets.
314    fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
315        let mut state = self.pending.lock();
316        let mut v = std::mem::take(&mut state.entries);
317        drop(state);
318        v.sort_by_key(|(lsn, _)| *lsn);
319        v
320    }
321
322    /// Whether any entry is queued. Leader uses this to decide
323    /// whether to spin once more or go back to the condvar.
324    fn has_pending(&self) -> bool {
325        !self.pending.lock().entries.is_empty()
326    }
327
328    /// Reset the LSN cursor and discard any queued entries. Used
329    /// after `wal.truncate()` — the wal-side byte counter goes back
330    /// to the header size, so the queue (which tracks LSNs in the
331    /// same byte space) must follow or every subsequent enqueue
332    /// returns a target the drain loop can never reach.
333    fn reset(&self, next_lsn: u64) {
334        let mut state = self.pending.lock();
335        state.next_lsn = next_lsn;
336        state.entries.clear();
337    }
338}
339
340pub(crate) struct StoreCommitCoordinator {
341    mode: DurabilityMode,
342    config: crate::api::GroupCommitOptions,
343    wal_path: PathBuf,
344    wal: Arc<WalMutex>,
345    /// Lock-free front door for writers. Populated alongside
346    /// `WalDurableGrouped` / `Async` modes so concurrent inserts
347    /// never contend on `wal` for the append step. Strict mode
348    /// bypasses the queue and calls `WalWriter::append` directly
349    /// to preserve the one-fsync-per-commit semantic.
350    queue: Arc<WalAppendQueue>,
351    state: Arc<(CommitStateMutex, CommitStateCondvar)>,
352    /// Number of `wal.sync()` calls issued by the group-commit
353    /// drain loop. Used by tests to observe coalescing — a burst
354    /// of N concurrent commits should bump this by far less than N
355    /// when the adaptive window is doing its job. Strict-mode
356    /// commits go through `force_sync` which also bumps this.
357    fsync_count: Arc<AtomicU64>,
358}
359
360impl StoreCommitCoordinator {
361    pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
362        matches!(
363            mode,
364            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
365        ) || path.exists()
366    }
367
368    pub(crate) fn open(
369        wal_path: impl Into<PathBuf>,
370        mode: DurabilityMode,
371        config: crate::api::GroupCommitOptions,
372    ) -> io::Result<Self> {
373        let wal_path = wal_path.into();
374        let wal = WalWriter::open(&wal_path)?;
375        let initial_durable_lsn = wal.durable_lsn();
376        let initial_current_lsn = wal.current_lsn();
377        let wal = Arc::new(WalMutex::new(wal));
378        let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
379        let state = Arc::new((
380            CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
381            CommitStateCondvar::new(),
382        ));
383        let fsync_count = Arc::new(AtomicU64::new(0));
384
385        if matches!(
386            mode,
387            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
388        ) {
389            let wal_bg = Arc::clone(&wal);
390            let queue_bg = Arc::clone(&queue);
391            let state_bg = Arc::clone(&state);
392            let fsync_bg = Arc::clone(&fsync_count);
393            // P1: adaptive group-commit window. Historical default is
394            // `window_ms = 0` ("no wait"), which under single-writer
395            // OLTP means one fsync per autocommit row — the throughput
396            // floor. When window_ms is 0 we fall back to a small
397            // microsecond floor (`DEFAULT_ADAPTIVE_WINDOW_US`, override
398            // via `REDDB_GROUP_COMMIT_WINDOW_US`) so a pipelined writer
399            // can drop a second statement into the same drain cycle.
400            // Explicit non-zero `window_ms` config takes precedence.
401            //
402            // The loop already short-circuits on
403            // `pending_statements >= max_statements` /
404            // `pending_wal_bytes >= max_wal_bytes`, so the window only
405            // delays the leader when the queue is otherwise empty —
406            // exactly the case we want to coalesce.
407            let window = Self::resolve_window(&config);
408            let max_statements = config.max_statements.max(1);
409            let max_wal_bytes = config.max_wal_bytes.max(1);
410            std::thread::spawn(move || {
411                Self::run_group_commit_loop(
412                    wal_bg,
413                    queue_bg,
414                    state_bg,
415                    fsync_bg,
416                    window,
417                    max_statements,
418                    max_wal_bytes,
419                );
420            });
421        }
422
423        Ok(Self {
424            mode,
425            config,
426            wal_path,
427            wal,
428            queue,
429            state,
430            fsync_count,
431        })
432    }
433
434    /// Resolve the effective group-commit window from the configured
435    /// options + the `REDDB_GROUP_COMMIT_WINDOW_US` env override.
436    ///
437    /// Precedence (highest first):
438    ///   1. `REDDB_GROUP_COMMIT_WINDOW_US=N` — N µs (0 disables).
439    ///   2. `config.window_ms != 0` — explicit ms config wins.
440    ///   3. `DEFAULT_ADAPTIVE_WINDOW_US` — adaptive floor for the
441    ///      historical zero-default. Set the env var to 0 to opt out.
442    fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
443        if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
444            if let Ok(parsed) = raw.parse::<u64>() {
445                return Duration::from_micros(parsed);
446            }
447        }
448        if config.window_ms != 0 {
449            return Duration::from_millis(config.window_ms);
450        }
451        Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
452    }
453
454    /// Total `wal.sync()` calls issued since this coordinator opened.
455    /// Public for tests that want to observe fsync coalescing under
456    /// concurrent autocommits.
457    #[cfg(test)]
458    pub(crate) fn fsync_count(&self) -> u64 {
459        self.fsync_count.load(Ordering::Relaxed)
460    }
461
462    pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
463        if actions.is_empty() {
464            return Ok(());
465        }
466
467        let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
468
469        // Strict mode: bypass the queue, write + fsync inline. Strict
470        // commits are exactly one-fsync-per-call by contract, so the
471        // coalescing win of the queue doesn't apply and we'd pay an
472        // extra hop through the drain loop.
473        if matches!(self.mode, DurabilityMode::Strict) {
474            let commit_lsn = {
475                let mut wal = self.wal.lock();
476                wal.append(&WalRecord::TxCommitBatch {
477                    tx_id,
478                    actions: actions.iter().map(StoreWalAction::encode).collect(),
479                })?;
480                wal.current_lsn()
481            };
482            self.force_sync()?;
483            let _ = commit_lsn;
484            return Ok(());
485        }
486
487        // Grouped / Async path — lock-free enqueue. Encode every
488        // WalRecord into one contiguous byte blob OUTSIDE any lock,
489        // then hand it to the queue with a single fetch_add+push.
490        let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
491        let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
492            total.saturating_add(payload.len() as u64)
493        });
494        let blob = WalRecord::TxCommitBatch {
495            tx_id,
496            actions: encoded_actions,
497        }
498        .encode();
499
500        let commit_lsn = self.queue.enqueue(blob);
501        self.wait_until_durable(commit_lsn, wal_bytes)?;
502        Ok(())
503    }
504
505    /// Encode + enqueue a single, non-batched [`WalRecord`] (issue
506    /// #693). Honours the same Strict / Grouped / Async branching as
507    /// [`Self::append_actions`] so durability semantics are uniform
508    /// across record kinds.
509    pub(crate) fn append_single_record(&self, record: WalRecord) -> io::Result<()> {
510        let blob = record.encode();
511        let wal_bytes = blob.len() as u64;
512        if matches!(self.mode, DurabilityMode::Strict) {
513            {
514                let mut wal = self.wal.lock();
515                wal.append(&record)?;
516            }
517            self.force_sync()?;
518            return Ok(());
519        }
520        let commit_lsn = self.queue.enqueue(blob);
521        self.wait_until_durable(commit_lsn, wal_bytes)?;
522        Ok(())
523    }
524
525    pub(crate) fn force_sync(&self) -> io::Result<()> {
526        {
527            let mut wal = self.wal.lock();
528            wal.sync()?;
529            self.fsync_count.fetch_add(1, Ordering::Relaxed);
530            let durable = wal.durable_lsn();
531            drop(wal);
532            let (state_lock, cond) = &*self.state;
533            let mut state = state_lock.lock();
534            state.durable_lsn = durable;
535            state.pending_target_lsn = durable.max(state.pending_target_lsn);
536            state.pending_statements = 0;
537            state.pending_wal_bytes = 0;
538            state.first_pending_at = None;
539            state.last_error = None;
540            cond.notify_all();
541        }
542        Ok(())
543    }
544
545    pub(crate) fn truncate(&self) -> io::Result<()> {
546        let mut wal = self.wal.lock();
547        wal.truncate()?;
548        let durable = wal.durable_lsn();
549        let current = wal.current_lsn();
550        drop(wal);
551
552        // Queue's next_lsn tracks byte offsets in the same space as
553        // wal.current_lsn. After truncate both must be reset together
554        // — otherwise enqueue returns a target_lsn in the old range
555        // that drain can never reach, and wait_until_durable hangs.
556        self.queue.reset(current);
557
558        let (state_lock, cond) = &*self.state;
559        let mut state = state_lock.lock();
560        state.durable_lsn = durable;
561        state.pending_target_lsn = durable;
562        state.pending_statements = 0;
563        state.pending_wal_bytes = 0;
564        state.first_pending_at = None;
565        state.last_error = None;
566        cond.notify_all();
567        Ok(())
568    }
569
570    pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
571        if !self.wal_path.exists() {
572            return Ok(());
573        }
574
575        let reader = match WalReader::open(&self.wal_path) {
576            Ok(reader) => reader,
577            Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
578            Err(err) => return Err(err),
579        };
580
581        let mut tx_states = std::collections::HashMap::<u64, bool>::new();
582        let mut pending = Vec::<(u64, Vec<u8>)>::new();
583
584        for record in reader.iter() {
585            let (_lsn, record) = match record {
586                Ok(record) => record,
587                Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
588                Err(err) => return Err(err),
589            };
590            match record {
591                WalRecord::TxCommitBatch { actions, .. } => {
592                    for payload in actions {
593                        let action = StoreWalAction::decode(&payload)?;
594                        store.apply_replayed_action(&action).map_err(|err| {
595                            io::Error::other(format!("failed to replay store wal action: {err}"))
596                        })?;
597                    }
598                }
599                WalRecord::Begin { tx_id } => {
600                    tx_states.insert(tx_id, false);
601                }
602                WalRecord::Commit { tx_id } => {
603                    tx_states.insert(tx_id, true);
604                }
605                WalRecord::Rollback { tx_id } => {
606                    tx_states.remove(&tx_id);
607                }
608                WalRecord::PageWrite {
609                    tx_id,
610                    page_id: _,
611                    data,
612                } => pending.push((tx_id, data)),
613                WalRecord::Checkpoint { .. } => {}
614                WalRecord::VectorInsert {
615                    collection,
616                    entity_id,
617                    vector,
618                } => {
619                    // Issue #694 — capture `vector.turbo` WAL inserts so the
620                    // boot-time TurboQuant index rebuild can drain them in
621                    // WAL order (deterministic against the pre-restart
622                    // state under a fixed codec seed). The legacy `vector`
623                    // path doesn't read this map.
624                    let mut map = store.replayed_turbo_inserts.lock();
625                    map.entry(collection).or_default().push((entity_id, vector));
626                }
627                WalRecord::FullPageImage { .. } => {
628                    // Pager-level FPI (gh-478); store replay ignores.
629                }
630            }
631        }
632
633        for (tx_id, payload) in pending {
634            if !tx_states.get(&tx_id).copied().unwrap_or(false) {
635                continue;
636            }
637            let action = StoreWalAction::decode(&payload)?;
638            store.apply_replayed_action(&action).map_err(|err| {
639                io::Error::other(format!("failed to replay store wal action: {err}"))
640            })?;
641        }
642
643        Ok(())
644    }
645
646    fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
647        match self.mode {
648            DurabilityMode::Strict => self.force_sync(),
649            // Async: record the pending target so the background
650            // flusher eventually covers it, but don't block the
651            // caller. Matches PG `synchronous_commit=off` semantics —
652            // crash inside the flush window loses unflushed commits.
653            DurabilityMode::Async => {
654                let (state_lock, cond) = &*self.state;
655                let mut state = state_lock.lock();
656                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
657                state.pending_statements = state.pending_statements.saturating_add(1);
658                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
659                state.first_pending_at.get_or_insert_with(Instant::now);
660                cond.notify_all();
661                Ok(())
662            }
663            DurabilityMode::WalDurableGrouped => {
664                let (state_lock, cond) = &*self.state;
665                let mut state = state_lock.lock();
666                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
667                state.pending_statements = state.pending_statements.saturating_add(1);
668                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
669                state.first_pending_at.get_or_insert_with(Instant::now);
670                cond.notify_all();
671
672                loop {
673                    if let Some(err) = state.last_error.clone() {
674                        return Err(io::Error::other(err));
675                    }
676                    if state.durable_lsn >= target_lsn {
677                        return Ok(());
678                    }
679                    // parking_lot::Condvar mutates the guard in place —
680                    // no LockResult to unwrap, no poisoning to fold.
681                    cond.wait(&mut state);
682                }
683            }
684        }
685    }
686
687    fn run_group_commit_loop(
688        wal: Arc<WalMutex>,
689        queue: Arc<WalAppendQueue>,
690        state: Arc<(CommitStateMutex, CommitStateCondvar)>,
691        fsync_count: Arc<AtomicU64>,
692        window: Duration,
693        max_statements: usize,
694        max_wal_bytes: u64,
695    ) {
696        let (state_lock, cond) = &*state;
697        loop {
698            let target_lsn = {
699                let mut guard = state_lock.lock();
700
701                while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
702                    cond.wait(&mut guard);
703                }
704
705                if guard.shutdown {
706                    return;
707                }
708
709                let immediate = window.is_zero()
710                    || guard.pending_statements >= max_statements
711                    || guard.pending_wal_bytes >= max_wal_bytes;
712
713                if !immediate {
714                    let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
715                    let now = Instant::now();
716                    if now < deadline {
717                        let timeout = deadline.saturating_duration_since(now);
718                        let _ = cond.wait_for(&mut guard, timeout);
719                        if guard.shutdown {
720                            return;
721                        }
722                        if guard.pending_target_lsn <= guard.durable_lsn {
723                            continue;
724                        }
725                        let should_wait_again = guard.pending_statements < max_statements
726                            && guard.pending_wal_bytes < max_wal_bytes
727                            && guard
728                                .first_pending_at
729                                .map(|first| first.elapsed() < window)
730                                .unwrap_or(false);
731                        if should_wait_again {
732                            continue;
733                        }
734                    }
735                }
736
737                guard.pending_target_lsn
738            };
739
740            // Drain all queued entries. Since `WalAppendQueue::enqueue`
741            // assigns LSN and pushes under a single mutex, the drained
742            // tuples are guaranteed to form a contiguous byte range
743            // starting at `wal.current_lsn()` — no gaps, no leftover
744            // handling.
745            let batches = queue.drain_sorted();
746
747            let sync_result = {
748                let mut wal = wal.lock();
749                let mut write_err: Option<io::Error> = None;
750                for (_lsn, bytes) in batches {
751                    if let Err(e) = wal.append_bytes(&bytes) {
752                        write_err = Some(e);
753                        break;
754                    }
755                }
756                match write_err {
757                    Some(e) => Err(e),
758                    None => wal.sync().map(|_| {
759                        // Count the fsync exactly once per drain
760                        // cycle so tests can compare it against the
761                        // number of `append_actions` callers that
762                        // entered the queue.
763                        fsync_count.fetch_add(1, Ordering::Relaxed);
764                        wal.durable_lsn()
765                    }),
766                }
767            };
768
769            // Late enqueuers that arrived after our drain stay in the
770            // queue — don't clear the `pending_*` counters yet and
771            // don't claim we reached `target_lsn` unless the fsync
772            // actually covers it.
773            let more_pending = queue.has_pending();
774
775            let mut guard = state_lock.lock();
776            match sync_result {
777                Ok(durable_lsn) => {
778                    guard.durable_lsn = durable_lsn;
779                    if !more_pending {
780                        guard.pending_statements = 0;
781                        guard.pending_wal_bytes = 0;
782                        guard.first_pending_at = None;
783                    }
784                    guard.last_error = None;
785                    let _ = target_lsn;
786                }
787                Err(err) => {
788                    guard.last_error = Some(err.to_string());
789                }
790            }
791            cond.notify_all();
792        }
793    }
794}
795
796impl Drop for StoreCommitCoordinator {
797    fn drop(&mut self) {
798        let (state_lock, cond) = &*self.state;
799        // parking_lot::Mutex::lock is infallible (no poisoning).
800        let mut state = state_lock.lock();
801        state.shutdown = true;
802        cond.notify_all();
803    }
804}
805
806impl UnifiedStore {
807    pub(crate) fn begin_deferred_store_wal_capture() {
808        begin_deferred_store_wal_capture();
809    }
810
811    pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
812        take_deferred_store_wal_capture()
813    }
814
815    pub(crate) fn append_deferred_store_wal_actions(
816        &self,
817        actions: DeferredStoreWalActions,
818    ) -> Result<(), StoreError> {
819        if actions.actions.is_empty() {
820            return Ok(());
821        }
822        if self.config.embedded_wal_path.is_some() {
823            return self.append_embedded_store_wal_actions(&actions.actions);
824        }
825        match self.config.durability_mode {
826            DurabilityMode::Strict => self.flush_paged_state(),
827            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
828                if let Some(commit) = &self.commit {
829                    commit
830                        .append_actions(&actions.actions)
831                        .map_err(StoreError::Io)
832                } else {
833                    self.flush_paged_state()
834                }
835            }
836        }
837    }
838
839    /// Emit a [`WalRecord::VectorInsert`] for a `vector.turbo`
840    /// collection (issue #693). Returns `Ok(())` when no commit
841    /// coordinator is configured (in-memory mode) so callers don't
842    /// have to special-case the missing WAL — the in-memory index
843    /// update remains durable enough for in-memory runtimes by
844    /// construction.
845    pub(crate) fn append_vector_insert_record(
846        &self,
847        collection: &str,
848        entity_id: u64,
849        vector: &[f32],
850    ) -> std::io::Result<()> {
851        let Some(commit) = &self.commit else {
852            return Ok(());
853        };
854        let record = WalRecord::VectorInsert {
855            collection: collection.to_string(),
856            entity_id,
857            vector: vector.to_vec(),
858        };
859        commit.append_single_record(record)
860    }
861
862    pub(crate) fn finish_paged_write(
863        &self,
864        actions: impl IntoIterator<Item = StoreWalAction>,
865    ) -> Result<(), StoreError> {
866        let actions: Vec<StoreWalAction> = actions.into_iter().collect();
867        if deferred_store_wal_capture_active() {
868            let captured = capture_deferred_store_wal_actions(actions);
869            debug_assert!(captured);
870            return Ok(());
871        }
872        if self.config.embedded_wal_path.is_some() {
873            return self.append_embedded_store_wal_actions(&actions);
874        }
875        match self.config.durability_mode {
876            DurabilityMode::Strict => self.flush_paged_state(),
877            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
878                if let Some(commit) = &self.commit {
879                    commit.append_actions(&actions).map_err(StoreError::Io)?;
880                    Ok(())
881                } else {
882                    self.flush_paged_state()
883                }
884            }
885        }
886    }
887
888    pub(crate) fn apply_encoded_store_wal_action(&self, payload: &[u8]) -> Result<(), StoreError> {
889        let action = StoreWalAction::decode(payload).map_err(StoreError::Io)?;
890        self.apply_replayed_action(&action)
891    }
892
893    fn append_embedded_store_wal_actions(
894        &self,
895        actions: &[StoreWalAction],
896    ) -> Result<(), StoreError> {
897        let Some(path) = self.config.embedded_wal_path.as_deref() else {
898            return Ok(());
899        };
900        if actions.is_empty() {
901            return Ok(());
902        }
903        let payloads: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
904        let encoded_len = crate::storage::EmbeddedRdbArtifact::wal_payloads_encoded_len(&payloads)
905            .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))?;
906        match crate::storage::EmbeddedRdbArtifact::append_wal_payloads(path, &payloads) {
907            Ok(_) => Ok(()),
908            Err(crate::api::RedDBError::InvalidOperation(msg))
909                if msg.contains("embedded wal region full") =>
910            {
911                let snapshot = self.to_binary_dump_bytes();
912                crate::storage::EmbeddedRdbArtifact::write_snapshot_with_wal_capacity(
913                    path,
914                    &snapshot,
915                    encoded_len,
916                )
917                .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))?;
918                crate::storage::EmbeddedRdbArtifact::append_wal_payloads(path, &payloads)
919                    .map(|_| ())
920                    .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))
921            }
922            Err(err) => Err(StoreError::Io(std::io::Error::other(err.to_string()))),
923        }
924    }
925
926    pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
927        match action {
928            StoreWalAction::CreateCollection { name } => {
929                if self.get_collection(name).is_none() {
930                    let _ = self.create_collection_in_memory(name);
931                }
932                Ok(())
933            }
934            StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
935            StoreWalAction::UpsertEntityRecord { collection, record } => {
936                self.apply_replayed_upsert(collection, record)
937            }
938            StoreWalAction::DeleteEntityRecord {
939                collection,
940                entity_id,
941            } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
942            StoreWalAction::BulkUpsertEntityRecords {
943                collection,
944                records,
945            } => {
946                for record in records {
947                    self.apply_replayed_upsert(collection, record)?;
948                }
949                Ok(())
950            }
951            StoreWalAction::RefreshCollection {
952                collection,
953                records,
954            } => self.apply_replayed_refresh_collection(collection, records),
955        }
956    }
957
958    /// Atomic full-collection replace — issue #595 slice 9c.
959    ///
960    /// Builds a fresh `SegmentManager` populated with `entities`,
961    /// atomically swaps it into the collections map, rebuilds the
962    /// paged B-tree from the same records, and emits a single
963    /// `RefreshCollection` WAL action.
964    ///
965    /// Concurrent readers that resolved an `Arc<SegmentManager>` for
966    /// this collection before the swap continue reading the prior
967    /// contents; readers after the swap see the new contents. The
968    /// transition is single-pointer so partial state is unobservable.
969    ///
970    /// A crash between in-memory mutation and WAL fsync leaves the
971    /// previous WAL stream intact — on recovery the prior
972    /// `RefreshCollection` (or whatever last bulk-applied to the
973    /// collection) is replayed, so the prior contents are observed.
974    pub fn refresh_collection(
975        &self,
976        name: &str,
977        entities: Vec<UnifiedEntity>,
978    ) -> Result<Vec<Vec<u8>>, StoreError> {
979        let fv = self.format_version();
980
981        // Build a fresh manager. Done outside the collections lock so
982        // the critical section is just the pointer swap below.
983        let new_manager = Arc::new(SegmentManager::with_config(
984            name,
985            self.config.manager_config.clone(),
986        ));
987
988        let mut prepared = entities;
989        for entity in &mut prepared {
990            if entity.id.raw() == 0 {
991                entity.id = self.next_entity_id();
992            } else {
993                self.register_entity_id(entity.id);
994            }
995            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
996                if *row_id == 0 {
997                    *row_id = new_manager.next_row_id();
998                } else {
999                    new_manager.register_row_id(*row_id);
1000                }
1001            }
1002            entity.ensure_table_logical_id();
1003        }
1004
1005        let serialized: Vec<Vec<u8>> = prepared
1006            .iter()
1007            .map(|e| Self::serialize_entity_record(e, None, fv))
1008            .collect();
1009
1010        new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1011            StoreError::Io(std::io::Error::other(format!(
1012                "refresh_collection: bulk_insert into new manager failed: {e}"
1013            )))
1014        })?;
1015
1016        self.swap_collection_state(name, new_manager, &prepared, &serialized);
1017
1018        self.finish_paged_write([StoreWalAction::RefreshCollection {
1019            collection: name.to_string(),
1020            records: serialized.clone(),
1021        }])?;
1022
1023        Ok(serialized)
1024    }
1025
1026    /// Atomic swap of the in-memory collection state. Used by both
1027    /// the live `refresh_collection` path and WAL replay. The new
1028    /// manager is already populated; this fn replaces the live Arc,
1029    /// purges side-state pointing at the previous contents, and
1030    /// rebuilds the paged B-tree from the supplied records.
1031    fn swap_collection_state(
1032        &self,
1033        name: &str,
1034        new_manager: Arc<SegmentManager>,
1035        prepared: &[UnifiedEntity],
1036        serialized: &[Vec<u8>],
1037    ) {
1038        {
1039            let mut collections = self.collections.write();
1040            collections.insert(name.to_string(), new_manager);
1041        }
1042
1043        self.entity_cache
1044            .retain(|_, (collection, _)| collection != name);
1045        self.remove_from_graph_label_index_batch(
1046            name,
1047            &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
1048        );
1049
1050        if let Some(pager) = &self.pager {
1051            let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
1052            let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
1053                .iter()
1054                .zip(serialized.iter())
1055                .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
1056                .collect();
1057            sorted.sort_by(|a, b| a.0.cmp(&b.0));
1058            if !sorted.is_empty() {
1059                let _ = new_btree.bulk_insert_sorted(&sorted);
1060            }
1061            self.btree_indices
1062                .write()
1063                .insert(name.to_string(), new_btree);
1064            self.mark_paged_registry_dirty();
1065        }
1066    }
1067
1068    fn apply_replayed_refresh_collection(
1069        &self,
1070        collection: &str,
1071        records: &[Vec<u8>],
1072    ) -> Result<(), StoreError> {
1073        let new_manager = Arc::new(SegmentManager::with_config(
1074            collection,
1075            self.config.manager_config.clone(),
1076        ));
1077
1078        let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1079        for record in records {
1080            let (entity, _metadata) =
1081                Self::deserialize_entity_record(record, self.format_version())?;
1082            self.register_entity_id(entity.id);
1083            if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1084                new_manager.register_row_id(*row_id);
1085            }
1086            prepared.push(entity);
1087        }
1088
1089        if !prepared.is_empty() {
1090            new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1091                StoreError::Io(std::io::Error::other(format!(
1092                    "replay refresh_collection: bulk_insert failed: {e}"
1093                )))
1094            })?;
1095        }
1096
1097        self.swap_collection_state(collection, new_manager, &prepared, records);
1098        Ok(())
1099    }
1100
1101    /// Replica-side analogue of [`Self::refresh_collection`] — issue
1102    /// #596 slice 9d. Takes pre-serialized record bytes from the
1103    /// primary's CDC stream (the same bytes the primary wrote into the
1104    /// `RefreshCollection` WAL action), applies the atomic swap, and
1105    /// emits the same `RefreshCollection` WAL action against the
1106    /// replica's local store WAL so the post-swap state survives a
1107    /// replica restart through the normal recovery path.
1108    ///
1109    /// Idempotency: re-applying the same records bytes is a full
1110    /// rebuild from the same payload, so the resulting backing-
1111    /// collection contents are equal to the prior call's result.
1112    /// The primary-side `LogicalChangeApplier` short-circuits exact
1113    /// duplicate LSN+payload combinations before reaching here.
1114    pub fn refresh_collection_from_records(
1115        &self,
1116        name: &str,
1117        records: Vec<Vec<u8>>,
1118    ) -> Result<(), StoreError> {
1119        self.apply_replayed_refresh_collection(name, &records)?;
1120        self.finish_paged_write([StoreWalAction::RefreshCollection {
1121            collection: name.to_string(),
1122            records,
1123        }])?;
1124        Ok(())
1125    }
1126
1127    pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1128        let mut collections = self.collections.write();
1129        if collections.contains_key(name) {
1130            return Ok(());
1131        }
1132        let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1133        collections.insert(name.to_string(), Arc::new(manager));
1134        self.mark_paged_registry_dirty();
1135        Ok(())
1136    }
1137
1138    fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1139        let manager = {
1140            let mut collections = self.collections.write();
1141            match collections.remove(name) {
1142                Some(manager) => manager,
1143                None => return Ok(()),
1144            }
1145        };
1146
1147        let entities = manager.query_all(|_| true);
1148        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1149        for entity_id in &entity_ids {
1150            self.context_index.remove_entity(*entity_id);
1151            let _ = self.unindex_cross_refs(*entity_id);
1152        }
1153        self.btree_indices.write().remove(name);
1154        self.entity_cache.retain(|entity_id, (collection, _)| {
1155            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1156        });
1157        self.remove_from_graph_label_index_batch(name, &entity_ids);
1158        self.mark_paged_registry_dirty();
1159        Ok(())
1160    }
1161
1162    fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
1163        self.create_collection_in_memory(collection)?;
1164        let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
1165        let manager = self
1166            .get_collection(collection)
1167            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1168
1169        self.register_entity_id(entity.id);
1170        if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1171            manager.register_row_id(*row_id);
1172        }
1173
1174        self.context_index.remove_entity(entity.id);
1175        let _ = self.unindex_cross_refs(entity.id);
1176        self.remove_from_graph_label_index(collection, entity.id);
1177
1178        if manager.get(entity.id).is_some() {
1179            manager
1180                .update_with_metadata(entity.clone(), metadata.as_ref())
1181                .map_err(StoreError::from)?;
1182        } else {
1183            manager.insert(entity.clone())?;
1184            if let Some(metadata) = metadata.as_ref() {
1185                manager.set_metadata(entity.id, metadata.clone())?;
1186            }
1187        }
1188
1189        self.context_index.index_entity(collection, &entity);
1190        if let EntityKind::GraphNode(node) = &entity.kind {
1191            self.update_graph_label_index(collection, &node.label, entity.id);
1192        }
1193        self.index_cross_refs(&entity, collection)?;
1194
1195        if let Some(pager) = &self.pager {
1196            let mut btree_indices = self.btree_indices.write();
1197            let btree = btree_indices
1198                .entry(collection.to_string())
1199                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
1200            let root_before = btree.root_page_id();
1201            let key = entity.id.raw().to_be_bytes();
1202            match btree.insert(&key, record) {
1203                Ok(_) => {}
1204                Err(BTreeError::DuplicateKey) => {
1205                    let _ = btree.delete(&key);
1206                    let _ = btree.insert(&key, record);
1207                }
1208                Err(err) => {
1209                    return Err(StoreError::Io(io::Error::other(format!(
1210                        "replay upsert btree error: {err}"
1211                    ))));
1212                }
1213            }
1214            if root_before != btree.root_page_id() {
1215                self.mark_paged_registry_dirty();
1216            }
1217        }
1218
1219        Ok(())
1220    }
1221
1222    fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1223        self.entity_cache.remove(id.raw());
1224        if let Some(manager) = self.get_collection(collection) {
1225            let deleted = manager.delete(id)?;
1226            if !deleted {
1227                return Ok(());
1228            }
1229        } else {
1230            return Ok(());
1231        }
1232
1233        if let Some(_pager) = &self.pager {
1234            let btree_indices = self.btree_indices.read();
1235            if let Some(btree) = btree_indices.get(collection) {
1236                let root_before = btree.root_page_id();
1237                let key = id.raw().to_be_bytes();
1238                let _ = btree.delete(&key);
1239                if root_before != btree.root_page_id() {
1240                    self.mark_paged_registry_dirty();
1241                }
1242            }
1243        }
1244
1245        let _ = self.unindex_cross_refs(id);
1246        self.remove_from_graph_label_index(collection, id);
1247        self.context_index.remove_entity(id);
1248        Ok(())
1249    }
1250}
1251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::{DurabilityMode, GroupCommitOptions};
    use std::sync::{Barrier, Mutex as StdMutex, OnceLock};
    use std::time::SystemTime;

    /// Serialise tests that mutate `REDDB_GROUP_COMMIT_WINDOW_US`.
    /// The env table is process-global, so two parallel test threads
    /// flipping it would race the assertions in the test that
    /// happens to read it last.
    fn env_lock() -> &'static StdMutex<()> {
        static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| StdMutex::new(()))
    }

    /// Clears `REDDB_GROUP_COMMIT_WINDOW_US` when dropped.
    ///
    /// Bind one right after taking `env_lock()`. Locals drop in reverse
    /// declaration order, so the variable is cleared before the lock is
    /// released. This also happens when an assertion panics, which a
    /// trailing `remove_var` call would not cover.
    struct ClearWindowEnvOnDrop;

    impl Drop for ClearWindowEnvOnDrop {
        fn drop(&mut self) {
            std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
        }
    }

    fn temp_wal(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = reddb_file::layout::store_commit_coord_temp_wal_path(
            &std::env::temp_dir(),
            name,
            std::process::id(),
            nanos,
        );
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Concurrent autocommits MUST coalesce into far fewer fsyncs
    /// than the number of callers when the adaptive group-commit
    /// window is active. Without the 200µs floor, every caller
    /// would race the drain loop and pay an independent fsync.
    ///
    /// The test fires 32 concurrent `append_actions` calls and asserts
    /// the coordinator issued strictly fewer fsyncs than callers.
    /// (We don't pin to "one fsync" because timing on loaded CI hosts
    /// can split the burst across a couple of drain cycles — the win
    /// is the ratio, not the absolute count.)
    #[test]
    fn group_commit_coalesces_concurrent_autocommits() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _clear_env = ClearWindowEnvOnDrop;
        // Make sure no stale env var skews this run — the test
        // exercises the default 200µs floor.
        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");

        let path = temp_wal("coalesce");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 32;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                // Synchronise the start so every writer races the
                // drain loop together — this is the workload shape
                // the adaptive window is supposed to coalesce.
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("c{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
        assert!(
            fsyncs < WRITERS as u64,
            "expected fsyncs ({fsyncs}) to be strictly less than \
             concurrent writers ({WRITERS}); coalescing failed"
        );

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// Sanity check: with the env override forcing a zero window,
    /// the adaptive floor is disabled. The resolved window is asserted
    /// to be zero, which proves the env knob is wired through, and the
    /// coordinator is then driven by a concurrent burst to confirm it
    /// still commits correctly without the floor.
    #[test]
    fn zero_window_disables_coalescing_floor() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _clear_env = ClearWindowEnvOnDrop;
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");

        // Pin that the override is actually read: with the env knob at
        // 0 the resolved window must be zero for the default config.
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&GroupCommitOptions::default()),
            Duration::from_micros(0)
        );

        let path = temp_wal("zero_window");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 8;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("z{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        // With zero window, fsync count is bounded above by WRITERS
        // (every caller might trigger its own drain) and below by 1
        // (the queue may still naturally batch under contention).
        let fsyncs = coord.fsync_count();
        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// `resolve_window` precedence: env > config.window_ms > default.
    #[test]
    fn resolve_window_precedence() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _clear_env = ClearWindowEnvOnDrop;
        // Default: window_ms=0 → adaptive 200µs floor.
        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
        let cfg = GroupCommitOptions::default();
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
        );

        // Explicit ms config wins over the default floor.
        let cfg_ms = GroupCommitOptions {
            window_ms: 5,
            ..GroupCommitOptions::default()
        };
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_millis(5)
        );

        // Env override wins over both.
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "750");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(750)
        );
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_micros(750)
        );

        // Env=0 explicitly disables the floor.
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(0)
        );
    }
}