Skip to main content

reddb_server/storage/unified/store/
commit.rs

1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Adaptive group-commit window applied when the configured
/// `GroupCommitOptions::window_ms` is 0 (the historical default).
///
/// Sub-millisecond so individual single-writer commits don't see a
/// visible latency penalty. A 200 µs floor is only slightly above the
/// typical NVMe fsync latency of 50–150 µs. It still gives a pipelined
/// writer a chance to drop a second statement into the same drain cycle.
/// A lone synchronous `insert_one` still pays one fsync per row in the
/// worst case. Two back-to-back inserts on the same connection can now
/// coalesce into one drain. See P1 in
/// `docs/perf/insert_sequential-2026-05-05.md`.
///
/// Precedence, as implemented by `StoreCommitCoordinator::resolve_window`:
/// 1. `REDDB_GROUP_COMMIT_WINDOW_US` (microseconds) overrides everything.
///    Setting it to `0` disables the window entirely (legacy behaviour).
/// 2. Otherwise a non-zero `GroupCommitOptions::window_ms` replaces this
///    floor.
/// 3. Otherwise this floor applies.
///
/// NOTE(review): `REDDB_GROUP_COMMIT_WINDOW_MS` presumably feeds
/// `window_ms`. Verify where the config is populated.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;
27
28/// Shorthand — `Arc<parking_lot::Mutex<WalWriter>>` avoids poisoning
29/// (one writer panicking mid-append used to taint every subsequent
30/// lock acquisition) and shaves a few syscalls off the fast path.
31/// The group-commit coordinator + writer threads all acquire this
32/// mutex in the hot insert path, so the unpoison/fast-park win
33/// compounds under 16-way concurrency.
34type WalMutex = parking_lot::Mutex<WalWriter>;
35
36/// Shorthand for the group-commit coordinator's state pair. Same
37/// non-poisoning + lighter-park motivation as `WalMutex`; writer
38/// threads `wait` on this condvar until the coordinator publishes a
39/// new `durable_lsn`, so the park cost shows up on every
40/// WalDurableGrouped transaction.
41type CommitStateMutex = parking_lot::Mutex<CommitState>;
42type CommitStateCondvar = parking_lot::Condvar;
43use std::time::{Duration, Instant};
44
/// Source of store transaction ids, handed out with `fetch_add` by
/// `StoreCommitCoordinator::append_actions`.
///
/// The counter restarts at 1 on every process start. Ids are therefore
/// only unique within one process lifetime. A WAL that survives a restart
/// without being truncated can contain the same id twice.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);

/// Format version written as the first byte of every encoded
/// `StoreWalAction`. `decode` rejects any other value.
const STORE_WAL_VERSION: u8 = 1;
48
/// One store-level mutation as recorded in the store WAL.
///
/// `StoreWalAction::encode` turns each variant into a version byte, a
/// one-byte tag and a variant-specific payload. `StoreWalAction::decode`
/// reverses that during replay.
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Create the collection `name`.
    CreateCollection { name: String },
    /// Drop the collection `name`.
    DropCollection { name: String },
    /// Insert or overwrite one serialized entity `record` in `collection`.
    UpsertEntityRecord { collection: String, record: Vec<u8> },
    /// Remove the entity `entity_id` from `collection`.
    DeleteEntityRecord { collection: String, entity_id: u64 },
    /// Many serialized entity records for one collection in a single
    /// action. This avoids paying per-row Begin/PageWrite/Commit framing
    /// on the bulk insert hot path. Replay applies the records in order.
    BulkUpsertEntityRecords { collection: String, records: Vec<Vec<u8>> },
    /// Atomic full-collection replace (issue #595 slice 9c).
    ///
    /// Replay discards the in-memory collection state and rebuilds it
    /// from `records`. REFRESH MATERIALIZED VIEW uses this so a concurrent
    /// reader sees either the old or the new contents, never a mix. A
    /// crash mid-refresh leaves the old contents in place, because the
    /// action only becomes durable once its WAL commit lands.
    RefreshCollection { collection: String, records: Vec<Vec<u8>> },
}
85
86#[derive(Debug, Default)]
87pub(crate) struct DeferredStoreWalActions {
88    actions: Vec<StoreWalAction>,
89}
90
91impl DeferredStoreWalActions {
92    pub(crate) fn is_empty(&self) -> bool {
93        self.actions.is_empty()
94    }
95
96    pub(crate) fn extend(&mut self, other: Self) {
97        self.actions.extend(other.actions);
98    }
99}
100
101thread_local! {
102    static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
103        const { RefCell::new(None) };
104}
105
106fn begin_deferred_store_wal_capture() {
107    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
108        let mut guard = cell.borrow_mut();
109        debug_assert!(guard.is_none());
110        *guard = Some(Vec::new());
111    });
112}
113
114fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
115    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
116        let mut guard = cell.borrow_mut();
117        if let Some(pending) = guard.as_mut() {
118            pending.extend(actions);
119            true
120        } else {
121            false
122        }
123    })
124}
125
126fn deferred_store_wal_capture_active() -> bool {
127    DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
128}
129
130fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
131    DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
132        actions: cell.borrow_mut().take().unwrap_or_default(),
133    })
134}
135
136impl StoreWalAction {
137    pub(crate) fn upsert_entity(
138        collection: &str,
139        entity: &UnifiedEntity,
140        metadata: Option<&Metadata>,
141        format_version: u32,
142    ) -> Self {
143        Self::UpsertEntityRecord {
144            collection: collection.to_string(),
145            record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
146        }
147    }
148
149    fn encode(&self) -> Vec<u8> {
150        let mut out = Vec::new();
151        out.push(STORE_WAL_VERSION);
152        match self {
153            Self::CreateCollection { name } => {
154                out.push(1);
155                write_string(&mut out, name);
156            }
157            Self::DropCollection { name } => {
158                out.push(2);
159                write_string(&mut out, name);
160            }
161            Self::UpsertEntityRecord { collection, record } => {
162                out.push(3);
163                write_string(&mut out, collection);
164                write_bytes(&mut out, record);
165            }
166            Self::DeleteEntityRecord {
167                collection,
168                entity_id,
169            } => {
170                out.push(4);
171                write_string(&mut out, collection);
172                out.extend_from_slice(&entity_id.to_le_bytes());
173            }
174            Self::BulkUpsertEntityRecords {
175                collection,
176                records,
177            } => {
178                out.push(5);
179                write_string(&mut out, collection);
180                out.extend_from_slice(&(records.len() as u32).to_le_bytes());
181                for record in records {
182                    write_bytes(&mut out, record);
183                }
184            }
185            Self::RefreshCollection {
186                collection,
187                records,
188            } => {
189                out.push(6);
190                write_string(&mut out, collection);
191                out.extend_from_slice(&(records.len() as u32).to_le_bytes());
192                for record in records {
193                    write_bytes(&mut out, record);
194                }
195            }
196        }
197        out
198    }
199
200    fn decode(bytes: &[u8]) -> io::Result<Self> {
201        if bytes.len() < 2 {
202            return Err(io::Error::new(
203                io::ErrorKind::InvalidData,
204                "store wal action too short",
205            ));
206        }
207        if bytes[0] != STORE_WAL_VERSION {
208            return Err(io::Error::new(
209                io::ErrorKind::InvalidData,
210                format!("unsupported store wal version: {}", bytes[0]),
211            ));
212        }
213
214        let mut pos = 2usize;
215        match bytes[1] {
216            1 => Ok(Self::CreateCollection {
217                name: read_string(bytes, &mut pos)?,
218            }),
219            2 => Ok(Self::DropCollection {
220                name: read_string(bytes, &mut pos)?,
221            }),
222            3 => Ok(Self::UpsertEntityRecord {
223                collection: read_string(bytes, &mut pos)?,
224                record: read_bytes(bytes, &mut pos)?,
225            }),
226            4 => {
227                let collection = read_string(bytes, &mut pos)?;
228                let entity_id = read_u64(bytes, &mut pos)?;
229                Ok(Self::DeleteEntityRecord {
230                    collection,
231                    entity_id,
232                })
233            }
234            5 => {
235                let collection = read_string(bytes, &mut pos)?;
236                if pos + 4 > bytes.len() {
237                    return Err(io::Error::new(
238                        io::ErrorKind::InvalidData,
239                        "bulk upsert wal action: missing record count",
240                    ));
241                }
242                let count = u32::from_le_bytes([
243                    bytes[pos],
244                    bytes[pos + 1],
245                    bytes[pos + 2],
246                    bytes[pos + 3],
247                ]) as usize;
248                pos += 4;
249                let mut records = Vec::with_capacity(count);
250                for _ in 0..count {
251                    records.push(read_bytes(bytes, &mut pos)?);
252                }
253                Ok(Self::BulkUpsertEntityRecords {
254                    collection,
255                    records,
256                })
257            }
258            6 => {
259                let collection = read_string(bytes, &mut pos)?;
260                if pos + 4 > bytes.len() {
261                    return Err(io::Error::new(
262                        io::ErrorKind::InvalidData,
263                        "refresh collection wal action: missing record count",
264                    ));
265                }
266                let count = u32::from_le_bytes([
267                    bytes[pos],
268                    bytes[pos + 1],
269                    bytes[pos + 2],
270                    bytes[pos + 3],
271                ]) as usize;
272                pos += 4;
273                let mut records = Vec::with_capacity(count);
274                for _ in 0..count {
275                    records.push(read_bytes(bytes, &mut pos)?);
276                }
277                Ok(Self::RefreshCollection {
278                    collection,
279                    records,
280                })
281            }
282            other => Err(io::Error::new(
283                io::ErrorKind::InvalidData,
284                format!("unsupported store wal action tag: {other}"),
285            )),
286        }
287    }
288}
289
/// Bookkeeping shared by committing threads and the group-commit loop.
/// It lives behind `CommitStateMutex`.
#[derive(Debug)]
struct CommitState {
    /// Highest LSN the coordinator has published as fsynced.
    durable_lsn: u64,
    /// Highest commit LSN any caller has asked to make durable.
    pending_target_lsn: u64,
    /// Commits recorded since the pending counters were last reset.
    pending_statements: usize,
    /// Encoded action payload bytes recorded since the last reset.
    pending_wal_bytes: u64,
    /// When the oldest unflushed commit was recorded. This anchors the
    /// group-commit window.
    first_pending_at: Option<Instant>,
    /// Set when the coordinator is dropped so the background loop exits.
    shutdown: bool,
    /// Message from the most recent failed flush, surfaced to grouped
    /// waiters.
    last_error: Option<String>,
}

impl CommitState {
    /// Fresh state in which everything up to `initial_durable_lsn` is
    /// durable and nothing is pending.
    fn new(initial_durable_lsn: u64) -> Self {
        let lsn = initial_durable_lsn;
        Self {
            shutdown: false,
            last_error: None,
            first_pending_at: None,
            pending_statements: 0,
            pending_wal_bytes: 0,
            durable_lsn: lsn,
            pending_target_lsn: lsn,
        }
    }
}
314
315/// Lock-free append queue sitting in front of `WalWriter`.
316///
317/// Writers atomically reserve a byte range via `next_lsn.fetch_add`
318/// and push their encoded bytes into a parking_lot-guarded vector.
319/// The group-commit coordinator is the sole drainer: it sorts the
320/// pending entries by LSN (so the file bytes land at the offsets
321/// each writer reserved) and hands them to `WalWriter::append_bytes`.
322///
323/// This replaces the old `wal.lock() ... append ... append ... drop`
324/// hot path where 16 concurrent writers serialised on the WAL
325/// mutex for ~13µs each. Hold time on the queue's parking_lot
326/// mutex is ~200ns (just a Vec::push) — 65× shorter, so the mutex
327/// convoy on concurrent inserts disappears.
328pub(crate) struct WalAppendQueue {
329    /// Tuple of (monotonically-increasing LSN, pending-bytes vec)
330    /// protected by one mutex. Keeping the LSN reservation AND the
331    /// push under the same lock guarantees that every queue entry
332    /// is visible the moment its LSN is assigned — no gap between
333    /// `fetch_add` and `push` for the leader to spin on.
334    ///
335    /// Earlier versions reserved LSN with an `AtomicU64::fetch_add`
336    /// outside the lock; the leader drain observed reordered
337    /// `(lsn, bytes)` tuples whose pushes happened in a different
338    /// order than their fetch_add, creating "holes" in the LSN
339    /// sequence that the drain loop interpreted as "wait for the
340    /// missing enqueuer" — under tokio scheduling pressure the
341    /// missing enqueuer was preempted indefinitely and the
342    /// drain loop busy-waited forever (WAL stayed at 8 bytes).
343    pending: parking_lot::Mutex<WalQueueState>,
344}
345
/// Mutex-protected interior of `WalAppendQueue`.
struct WalQueueState {
    /// LSN (WAL byte offset) that the next enqueued record will start at.
    next_lsn: u64,
    /// Queued `(start_lsn, encoded_bytes)` pairs not yet written to the WAL.
    entries: Vec<(u64, Vec<u8>)>,
}
350
351impl WalAppendQueue {
352    fn new(initial_lsn: u64) -> Self {
353        Self {
354            pending: parking_lot::Mutex::new(WalQueueState {
355                next_lsn: initial_lsn,
356                entries: Vec::with_capacity(64),
357            }),
358        }
359    }
360
361    /// Reserve an LSN range of `bytes.len()` bytes and push onto the
362    /// queue. Returns the commit LSN (end of reserved range), which
363    /// the caller passes to `wait_until_durable`. LSN assignment and
364    /// push happen under the same mutex — no gap for the drain loop
365    /// to busy-spin on.
366    fn enqueue(&self, bytes: Vec<u8>) -> u64 {
367        let len = bytes.len() as u64;
368        let mut state = self.pending.lock();
369        let start_lsn = state.next_lsn;
370        state.next_lsn = start_lsn + len;
371        state.entries.push((start_lsn, bytes));
372        start_lsn + len
373    }
374
375    /// Drain all queued entries in LSN order. Caller holds the WAL
376    /// file mutex while writing the drained bytes so the on-disk
377    /// layout matches the reserved LSN offsets.
378    fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
379        let mut state = self.pending.lock();
380        let mut v = std::mem::take(&mut state.entries);
381        drop(state);
382        v.sort_by_key(|(lsn, _)| *lsn);
383        v
384    }
385
386    /// Whether any entry is queued. Leader uses this to decide
387    /// whether to spin once more or go back to the condvar.
388    fn has_pending(&self) -> bool {
389        !self.pending.lock().entries.is_empty()
390    }
391
392    /// Reset the LSN cursor and discard any queued entries. Used
393    /// after `wal.truncate()` — the wal-side byte counter goes back
394    /// to the header size, so the queue (which tracks LSNs in the
395    /// same byte space) must follow or every subsequent enqueue
396    /// returns a target the drain loop can never reach.
397    fn reset(&self, next_lsn: u64) {
398        let mut state = self.pending.lock();
399        state.next_lsn = next_lsn;
400        state.entries.clear();
401    }
402}
403
404pub(crate) struct StoreCommitCoordinator {
405    mode: DurabilityMode,
406    config: crate::api::GroupCommitOptions,
407    wal_path: PathBuf,
408    wal: Arc<WalMutex>,
409    /// Lock-free front door for writers. Populated alongside
410    /// `WalDurableGrouped` / `Async` modes so concurrent inserts
411    /// never contend on `wal` for the append step. Strict mode
412    /// bypasses the queue and calls `WalWriter::append` directly
413    /// to preserve the one-fsync-per-commit semantic.
414    queue: Arc<WalAppendQueue>,
415    state: Arc<(CommitStateMutex, CommitStateCondvar)>,
416    /// Number of `wal.sync()` calls issued by the group-commit
417    /// drain loop. Used by tests to observe coalescing — a burst
418    /// of N concurrent commits should bump this by far less than N
419    /// when the adaptive window is doing its job. Strict-mode
420    /// commits go through `force_sync` which also bumps this.
421    fsync_count: Arc<AtomicU64>,
422}
423
424impl StoreCommitCoordinator {
425    pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
426        matches!(
427            mode,
428            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
429        ) || path.exists()
430    }
431
432    pub(crate) fn open(
433        wal_path: impl Into<PathBuf>,
434        mode: DurabilityMode,
435        config: crate::api::GroupCommitOptions,
436    ) -> io::Result<Self> {
437        let wal_path = wal_path.into();
438        let wal = WalWriter::open(&wal_path)?;
439        let initial_durable_lsn = wal.durable_lsn();
440        let initial_current_lsn = wal.current_lsn();
441        let wal = Arc::new(WalMutex::new(wal));
442        let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
443        let state = Arc::new((
444            CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
445            CommitStateCondvar::new(),
446        ));
447        let fsync_count = Arc::new(AtomicU64::new(0));
448
449        if matches!(
450            mode,
451            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
452        ) {
453            let wal_bg = Arc::clone(&wal);
454            let queue_bg = Arc::clone(&queue);
455            let state_bg = Arc::clone(&state);
456            let fsync_bg = Arc::clone(&fsync_count);
457            // P1: adaptive group-commit window. Historical default is
458            // `window_ms = 0` ("no wait"), which under single-writer
459            // OLTP means one fsync per autocommit row — the throughput
460            // floor. When window_ms is 0 we fall back to a small
461            // microsecond floor (`DEFAULT_ADAPTIVE_WINDOW_US`, override
462            // via `REDDB_GROUP_COMMIT_WINDOW_US`) so a pipelined writer
463            // can drop a second statement into the same drain cycle.
464            // Explicit non-zero `window_ms` config takes precedence.
465            //
466            // The loop already short-circuits on
467            // `pending_statements >= max_statements` /
468            // `pending_wal_bytes >= max_wal_bytes`, so the window only
469            // delays the leader when the queue is otherwise empty —
470            // exactly the case we want to coalesce.
471            let window = Self::resolve_window(&config);
472            let max_statements = config.max_statements.max(1);
473            let max_wal_bytes = config.max_wal_bytes.max(1);
474            std::thread::spawn(move || {
475                Self::run_group_commit_loop(
476                    wal_bg,
477                    queue_bg,
478                    state_bg,
479                    fsync_bg,
480                    window,
481                    max_statements,
482                    max_wal_bytes,
483                );
484            });
485        }
486
487        Ok(Self {
488            mode,
489            config,
490            wal_path,
491            wal,
492            queue,
493            state,
494            fsync_count,
495        })
496    }
497
498    /// Resolve the effective group-commit window from the configured
499    /// options + the `REDDB_GROUP_COMMIT_WINDOW_US` env override.
500    ///
501    /// Precedence (highest first):
502    ///   1. `REDDB_GROUP_COMMIT_WINDOW_US=N` — N µs (0 disables).
503    ///   2. `config.window_ms != 0` — explicit ms config wins.
504    ///   3. `DEFAULT_ADAPTIVE_WINDOW_US` — adaptive floor for the
505    ///      historical zero-default. Set the env var to 0 to opt out.
506    fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
507        if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
508            if let Ok(parsed) = raw.parse::<u64>() {
509                return Duration::from_micros(parsed);
510            }
511        }
512        if config.window_ms != 0 {
513            return Duration::from_millis(config.window_ms);
514        }
515        Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
516    }
517
518    /// Total `wal.sync()` calls issued since this coordinator opened.
519    /// Public for tests that want to observe fsync coalescing under
520    /// concurrent autocommits.
521    #[cfg(test)]
522    pub(crate) fn fsync_count(&self) -> u64 {
523        self.fsync_count.load(Ordering::Relaxed)
524    }
525
526    pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
527        if actions.is_empty() {
528            return Ok(());
529        }
530
531        let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
532
533        // Strict mode: bypass the queue, write + fsync inline. Strict
534        // commits are exactly one-fsync-per-call by contract, so the
535        // coalescing win of the queue doesn't apply and we'd pay an
536        // extra hop through the drain loop.
537        if matches!(self.mode, DurabilityMode::Strict) {
538            let commit_lsn = {
539                let mut wal = self.wal.lock();
540                wal.append(&WalRecord::TxCommitBatch {
541                    tx_id,
542                    actions: actions.iter().map(StoreWalAction::encode).collect(),
543                })?;
544                wal.current_lsn()
545            };
546            self.force_sync()?;
547            let _ = commit_lsn;
548            return Ok(());
549        }
550
551        // Grouped / Async path — lock-free enqueue. Encode every
552        // WalRecord into one contiguous byte blob OUTSIDE any lock,
553        // then hand it to the queue with a single fetch_add+push.
554        let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
555        let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
556            total.saturating_add(payload.len() as u64)
557        });
558        let blob = WalRecord::TxCommitBatch {
559            tx_id,
560            actions: encoded_actions,
561        }
562        .encode();
563
564        let commit_lsn = self.queue.enqueue(blob);
565        self.wait_until_durable(commit_lsn, wal_bytes)?;
566        Ok(())
567    }
568
569    pub(crate) fn force_sync(&self) -> io::Result<()> {
570        {
571            let mut wal = self.wal.lock();
572            wal.sync()?;
573            self.fsync_count.fetch_add(1, Ordering::Relaxed);
574            let durable = wal.durable_lsn();
575            drop(wal);
576            let (state_lock, cond) = &*self.state;
577            let mut state = state_lock.lock();
578            state.durable_lsn = durable;
579            state.pending_target_lsn = durable.max(state.pending_target_lsn);
580            state.pending_statements = 0;
581            state.pending_wal_bytes = 0;
582            state.first_pending_at = None;
583            state.last_error = None;
584            cond.notify_all();
585        }
586        Ok(())
587    }
588
589    pub(crate) fn truncate(&self) -> io::Result<()> {
590        let mut wal = self.wal.lock();
591        wal.truncate()?;
592        let durable = wal.durable_lsn();
593        let current = wal.current_lsn();
594        drop(wal);
595
596        // Queue's next_lsn tracks byte offsets in the same space as
597        // wal.current_lsn. After truncate both must be reset together
598        // — otherwise enqueue returns a target_lsn in the old range
599        // that drain can never reach, and wait_until_durable hangs.
600        self.queue.reset(current);
601
602        let (state_lock, cond) = &*self.state;
603        let mut state = state_lock.lock();
604        state.durable_lsn = durable;
605        state.pending_target_lsn = durable;
606        state.pending_statements = 0;
607        state.pending_wal_bytes = 0;
608        state.first_pending_at = None;
609        state.last_error = None;
610        cond.notify_all();
611        Ok(())
612    }
613
614    pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
615        if !self.wal_path.exists() {
616            return Ok(());
617        }
618
619        let reader = match WalReader::open(&self.wal_path) {
620            Ok(reader) => reader,
621            Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
622            Err(err) => return Err(err),
623        };
624
625        let mut tx_states = std::collections::HashMap::<u64, bool>::new();
626        let mut pending = Vec::<(u64, Vec<u8>)>::new();
627
628        for record in reader.iter() {
629            let (_lsn, record) = match record {
630                Ok(record) => record,
631                Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
632                Err(err) => return Err(err),
633            };
634            match record {
635                WalRecord::TxCommitBatch { actions, .. } => {
636                    for payload in actions {
637                        let action = StoreWalAction::decode(&payload)?;
638                        store.apply_replayed_action(&action).map_err(|err| {
639                            io::Error::other(format!("failed to replay store wal action: {err}"))
640                        })?;
641                    }
642                }
643                WalRecord::Begin { tx_id } => {
644                    tx_states.insert(tx_id, false);
645                }
646                WalRecord::Commit { tx_id } => {
647                    tx_states.insert(tx_id, true);
648                }
649                WalRecord::Rollback { tx_id } => {
650                    tx_states.remove(&tx_id);
651                }
652                WalRecord::PageWrite {
653                    tx_id,
654                    page_id: _,
655                    data,
656                } => pending.push((tx_id, data)),
657                WalRecord::Checkpoint { .. } => {}
658                WalRecord::FullPageImage { .. } => {
659                    // Pager-level FPI (gh-478); store replay ignores.
660                }
661            }
662        }
663
664        for (tx_id, payload) in pending {
665            if !tx_states.get(&tx_id).copied().unwrap_or(false) {
666                continue;
667            }
668            let action = StoreWalAction::decode(&payload)?;
669            store.apply_replayed_action(&action).map_err(|err| {
670                io::Error::other(format!("failed to replay store wal action: {err}"))
671            })?;
672        }
673
674        Ok(())
675    }
676
677    fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
678        match self.mode {
679            DurabilityMode::Strict => self.force_sync(),
680            // Async: record the pending target so the background
681            // flusher eventually covers it, but don't block the
682            // caller. Matches PG `synchronous_commit=off` semantics —
683            // crash inside the flush window loses unflushed commits.
684            DurabilityMode::Async => {
685                let (state_lock, cond) = &*self.state;
686                let mut state = state_lock.lock();
687                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
688                state.pending_statements = state.pending_statements.saturating_add(1);
689                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
690                state.first_pending_at.get_or_insert_with(Instant::now);
691                cond.notify_all();
692                Ok(())
693            }
694            DurabilityMode::WalDurableGrouped => {
695                let (state_lock, cond) = &*self.state;
696                let mut state = state_lock.lock();
697                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
698                state.pending_statements = state.pending_statements.saturating_add(1);
699                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
700                state.first_pending_at.get_or_insert_with(Instant::now);
701                cond.notify_all();
702
703                loop {
704                    if let Some(err) = state.last_error.clone() {
705                        return Err(io::Error::other(err));
706                    }
707                    if state.durable_lsn >= target_lsn {
708                        return Ok(());
709                    }
710                    // parking_lot::Condvar mutates the guard in place —
711                    // no LockResult to unwrap, no poisoning to fold.
712                    cond.wait(&mut state);
713                }
714            }
715        }
716    }
717
718    fn run_group_commit_loop(
719        wal: Arc<WalMutex>,
720        queue: Arc<WalAppendQueue>,
721        state: Arc<(CommitStateMutex, CommitStateCondvar)>,
722        fsync_count: Arc<AtomicU64>,
723        window: Duration,
724        max_statements: usize,
725        max_wal_bytes: u64,
726    ) {
727        let (state_lock, cond) = &*state;
728        loop {
729            let target_lsn = {
730                let mut guard = state_lock.lock();
731
732                while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
733                    cond.wait(&mut guard);
734                }
735
736                if guard.shutdown {
737                    return;
738                }
739
740                let immediate = window.is_zero()
741                    || guard.pending_statements >= max_statements
742                    || guard.pending_wal_bytes >= max_wal_bytes;
743
744                if !immediate {
745                    let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
746                    let now = Instant::now();
747                    if now < deadline {
748                        let timeout = deadline.saturating_duration_since(now);
749                        let _ = cond.wait_for(&mut guard, timeout);
750                        if guard.shutdown {
751                            return;
752                        }
753                        if guard.pending_target_lsn <= guard.durable_lsn {
754                            continue;
755                        }
756                        let should_wait_again = guard.pending_statements < max_statements
757                            && guard.pending_wal_bytes < max_wal_bytes
758                            && guard
759                                .first_pending_at
760                                .map(|first| first.elapsed() < window)
761                                .unwrap_or(false);
762                        if should_wait_again {
763                            continue;
764                        }
765                    }
766                }
767
768                guard.pending_target_lsn
769            };
770
771            // Drain all queued entries. Since `WalAppendQueue::enqueue`
772            // assigns LSN and pushes under a single mutex, the drained
773            // tuples are guaranteed to form a contiguous byte range
774            // starting at `wal.current_lsn()` — no gaps, no leftover
775            // handling.
776            let batches = queue.drain_sorted();
777
778            let sync_result = {
779                let mut wal = wal.lock();
780                let mut write_err: Option<io::Error> = None;
781                for (_lsn, bytes) in batches {
782                    if let Err(e) = wal.append_bytes(&bytes) {
783                        write_err = Some(e);
784                        break;
785                    }
786                }
787                match write_err {
788                    Some(e) => Err(e),
789                    None => wal.sync().map(|_| {
790                        // Count the fsync exactly once per drain
791                        // cycle so tests can compare it against the
792                        // number of `append_actions` callers that
793                        // entered the queue.
794                        fsync_count.fetch_add(1, Ordering::Relaxed);
795                        wal.durable_lsn()
796                    }),
797                }
798            };
799
800            // Late enqueuers that arrived after our drain stay in the
801            // queue — don't clear the `pending_*` counters yet and
802            // don't claim we reached `target_lsn` unless the fsync
803            // actually covers it.
804            let more_pending = queue.has_pending();
805
806            let mut guard = state_lock.lock();
807            match sync_result {
808                Ok(durable_lsn) => {
809                    guard.durable_lsn = durable_lsn;
810                    if !more_pending {
811                        guard.pending_statements = 0;
812                        guard.pending_wal_bytes = 0;
813                        guard.first_pending_at = None;
814                    }
815                    guard.last_error = None;
816                    let _ = target_lsn;
817                }
818                Err(err) => {
819                    guard.last_error = Some(err.to_string());
820                }
821            }
822            cond.notify_all();
823        }
824    }
825}
826
827impl Drop for StoreCommitCoordinator {
828    fn drop(&mut self) {
829        let (state_lock, cond) = &*self.state;
830        // parking_lot::Mutex::lock is infallible (no poisoning).
831        let mut state = state_lock.lock();
832        state.shutdown = true;
833        cond.notify_all();
834    }
835}
836
837impl UnifiedStore {
838    pub(crate) fn begin_deferred_store_wal_capture() {
839        begin_deferred_store_wal_capture();
840    }
841
842    pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
843        take_deferred_store_wal_capture()
844    }
845
846    pub(crate) fn append_deferred_store_wal_actions(
847        &self,
848        actions: DeferredStoreWalActions,
849    ) -> Result<(), StoreError> {
850        if actions.actions.is_empty() {
851            return Ok(());
852        }
853        match self.config.durability_mode {
854            DurabilityMode::Strict => self.flush_paged_state(),
855            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
856                if let Some(commit) = &self.commit {
857                    commit
858                        .append_actions(&actions.actions)
859                        .map_err(StoreError::Io)
860                } else {
861                    self.flush_paged_state()
862                }
863            }
864        }
865    }
866
867    pub(crate) fn wal_path_for_db(path: &Path) -> PathBuf {
868        path.with_extension("rdb-uwal")
869    }
870
871    pub(crate) fn finish_paged_write(
872        &self,
873        actions: impl IntoIterator<Item = StoreWalAction>,
874    ) -> Result<(), StoreError> {
875        let actions: Vec<StoreWalAction> = actions.into_iter().collect();
876        if deferred_store_wal_capture_active() {
877            let captured = capture_deferred_store_wal_actions(actions);
878            debug_assert!(captured);
879            return Ok(());
880        }
881        match self.config.durability_mode {
882            DurabilityMode::Strict => self.flush_paged_state(),
883            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
884                if let Some(commit) = &self.commit {
885                    commit.append_actions(&actions).map_err(StoreError::Io)?;
886                    Ok(())
887                } else {
888                    self.flush_paged_state()
889                }
890            }
891        }
892    }
893
894    pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
895        match action {
896            StoreWalAction::CreateCollection { name } => {
897                if self.get_collection(name).is_none() {
898                    let _ = self.create_collection_in_memory(name);
899                }
900                Ok(())
901            }
902            StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
903            StoreWalAction::UpsertEntityRecord { collection, record } => {
904                self.apply_replayed_upsert(collection, record)
905            }
906            StoreWalAction::DeleteEntityRecord {
907                collection,
908                entity_id,
909            } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
910            StoreWalAction::BulkUpsertEntityRecords {
911                collection,
912                records,
913            } => {
914                for record in records {
915                    self.apply_replayed_upsert(collection, record)?;
916                }
917                Ok(())
918            }
919            StoreWalAction::RefreshCollection {
920                collection,
921                records,
922            } => self.apply_replayed_refresh_collection(collection, records),
923        }
924    }
925
926    /// Atomic full-collection replace — issue #595 slice 9c.
927    ///
928    /// Builds a fresh `SegmentManager` populated with `entities`,
929    /// atomically swaps it into the collections map, rebuilds the
930    /// paged B-tree from the same records, and emits a single
931    /// `RefreshCollection` WAL action.
932    ///
933    /// Concurrent readers that resolved an `Arc<SegmentManager>` for
934    /// this collection before the swap continue reading the prior
935    /// contents; readers after the swap see the new contents. The
936    /// transition is single-pointer so partial state is unobservable.
937    ///
938    /// A crash between in-memory mutation and WAL fsync leaves the
939    /// previous WAL stream intact — on recovery the prior
940    /// `RefreshCollection` (or whatever last bulk-applied to the
941    /// collection) is replayed, so the prior contents are observed.
942    pub fn refresh_collection(
943        &self,
944        name: &str,
945        entities: Vec<UnifiedEntity>,
946    ) -> Result<Vec<Vec<u8>>, StoreError> {
947        let fv = self.format_version();
948
949        // Build a fresh manager. Done outside the collections lock so
950        // the critical section is just the pointer swap below.
951        let new_manager = Arc::new(SegmentManager::with_config(
952            name,
953            self.config.manager_config.clone(),
954        ));
955
956        let mut prepared = entities;
957        for entity in &mut prepared {
958            if entity.id.raw() == 0 {
959                entity.id = self.next_entity_id();
960            } else {
961                self.register_entity_id(entity.id);
962            }
963            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
964                if *row_id == 0 {
965                    *row_id = new_manager.next_row_id();
966                } else {
967                    new_manager.register_row_id(*row_id);
968                }
969            }
970            entity.ensure_table_logical_id();
971        }
972
973        let serialized: Vec<Vec<u8>> = prepared
974            .iter()
975            .map(|e| Self::serialize_entity_record(e, None, fv))
976            .collect();
977
978        new_manager.bulk_insert(prepared.clone()).map_err(|e| {
979            StoreError::Io(std::io::Error::other(format!(
980                "refresh_collection: bulk_insert into new manager failed: {e}"
981            )))
982        })?;
983
984        self.swap_collection_state(name, new_manager, &prepared, &serialized);
985
986        self.finish_paged_write([StoreWalAction::RefreshCollection {
987            collection: name.to_string(),
988            records: serialized.clone(),
989        }])?;
990
991        Ok(serialized)
992    }
993
994    /// Atomic swap of the in-memory collection state. Used by both
995    /// the live `refresh_collection` path and WAL replay. The new
996    /// manager is already populated; this fn replaces the live Arc,
997    /// purges side-state pointing at the previous contents, and
998    /// rebuilds the paged B-tree from the supplied records.
999    fn swap_collection_state(
1000        &self,
1001        name: &str,
1002        new_manager: Arc<SegmentManager>,
1003        prepared: &[UnifiedEntity],
1004        serialized: &[Vec<u8>],
1005    ) {
1006        {
1007            let mut collections = self.collections.write();
1008            collections.insert(name.to_string(), new_manager);
1009        }
1010
1011        self.entity_cache
1012            .retain(|_, (collection, _)| collection != name);
1013        self.remove_from_graph_label_index_batch(
1014            name,
1015            &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
1016        );
1017
1018        if let Some(pager) = &self.pager {
1019            let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
1020            let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
1021                .iter()
1022                .zip(serialized.iter())
1023                .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
1024                .collect();
1025            sorted.sort_by(|a, b| a.0.cmp(&b.0));
1026            if !sorted.is_empty() {
1027                let _ = new_btree.bulk_insert_sorted(&sorted);
1028            }
1029            self.btree_indices
1030                .write()
1031                .insert(name.to_string(), new_btree);
1032            self.mark_paged_registry_dirty();
1033        }
1034    }
1035
1036    fn apply_replayed_refresh_collection(
1037        &self,
1038        collection: &str,
1039        records: &[Vec<u8>],
1040    ) -> Result<(), StoreError> {
1041        let new_manager = Arc::new(SegmentManager::with_config(
1042            collection,
1043            self.config.manager_config.clone(),
1044        ));
1045
1046        let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1047        for record in records {
1048            let (entity, _metadata) =
1049                Self::deserialize_entity_record(record, self.format_version())?;
1050            self.register_entity_id(entity.id);
1051            if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1052                new_manager.register_row_id(*row_id);
1053            }
1054            prepared.push(entity);
1055        }
1056
1057        if !prepared.is_empty() {
1058            new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1059                StoreError::Io(std::io::Error::other(format!(
1060                    "replay refresh_collection: bulk_insert failed: {e}"
1061                )))
1062            })?;
1063        }
1064
1065        self.swap_collection_state(collection, new_manager, &prepared, records);
1066        Ok(())
1067    }
1068
1069    /// Replica-side analogue of [`Self::refresh_collection`] — issue
1070    /// #596 slice 9d. Takes pre-serialized record bytes from the
1071    /// primary's CDC stream (the same bytes the primary wrote into the
1072    /// `RefreshCollection` WAL action), applies the atomic swap, and
1073    /// emits the same `RefreshCollection` WAL action against the
1074    /// replica's local store WAL so the post-swap state survives a
1075    /// replica restart through the normal recovery path.
1076    ///
1077    /// Idempotency: re-applying the same records bytes is a full
1078    /// rebuild from the same payload, so the resulting backing-
1079    /// collection contents are equal to the prior call's result.
1080    /// The primary-side `LogicalChangeApplier` short-circuits exact
1081    /// duplicate LSN+payload combinations before reaching here.
1082    pub fn refresh_collection_from_records(
1083        &self,
1084        name: &str,
1085        records: Vec<Vec<u8>>,
1086    ) -> Result<(), StoreError> {
1087        self.apply_replayed_refresh_collection(name, &records)?;
1088        self.finish_paged_write([StoreWalAction::RefreshCollection {
1089            collection: name.to_string(),
1090            records,
1091        }])?;
1092        Ok(())
1093    }
1094
1095    pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1096        let mut collections = self.collections.write();
1097        if collections.contains_key(name) {
1098            return Ok(());
1099        }
1100        let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1101        collections.insert(name.to_string(), Arc::new(manager));
1102        self.mark_paged_registry_dirty();
1103        Ok(())
1104    }
1105
1106    fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1107        let manager = {
1108            let mut collections = self.collections.write();
1109            match collections.remove(name) {
1110                Some(manager) => manager,
1111                None => return Ok(()),
1112            }
1113        };
1114
1115        let entities = manager.query_all(|_| true);
1116        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1117        for entity_id in &entity_ids {
1118            self.context_index.remove_entity(*entity_id);
1119            let _ = self.unindex_cross_refs(*entity_id);
1120        }
1121        self.btree_indices.write().remove(name);
1122        self.entity_cache.retain(|entity_id, (collection, _)| {
1123            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1124        });
1125        self.remove_from_graph_label_index_batch(name, &entity_ids);
1126        self.mark_paged_registry_dirty();
1127        Ok(())
1128    }
1129
1130    fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
1131        self.create_collection_in_memory(collection)?;
1132        let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
1133        let manager = self
1134            .get_collection(collection)
1135            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1136
1137        self.register_entity_id(entity.id);
1138        if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1139            manager.register_row_id(*row_id);
1140        }
1141
1142        self.context_index.remove_entity(entity.id);
1143        let _ = self.unindex_cross_refs(entity.id);
1144        self.remove_from_graph_label_index(collection, entity.id);
1145
1146        if manager.get(entity.id).is_some() {
1147            manager
1148                .update_with_metadata(entity.clone(), metadata.as_ref())
1149                .map_err(StoreError::from)?;
1150        } else {
1151            manager.insert(entity.clone())?;
1152            if let Some(metadata) = metadata.as_ref() {
1153                manager.set_metadata(entity.id, metadata.clone())?;
1154            }
1155        }
1156
1157        self.context_index.index_entity(collection, &entity);
1158        if let EntityKind::GraphNode(node) = &entity.kind {
1159            self.update_graph_label_index(collection, &node.label, entity.id);
1160        }
1161        self.index_cross_refs(&entity, collection)?;
1162
1163        if let Some(pager) = &self.pager {
1164            let mut btree_indices = self.btree_indices.write();
1165            let btree = btree_indices
1166                .entry(collection.to_string())
1167                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
1168            let root_before = btree.root_page_id();
1169            let key = entity.id.raw().to_be_bytes();
1170            match btree.insert(&key, record) {
1171                Ok(_) => {}
1172                Err(BTreeError::DuplicateKey) => {
1173                    let _ = btree.delete(&key);
1174                    let _ = btree.insert(&key, record);
1175                }
1176                Err(err) => {
1177                    return Err(StoreError::Io(io::Error::other(format!(
1178                        "replay upsert btree error: {err}"
1179                    ))));
1180                }
1181            }
1182            if root_before != btree.root_page_id() {
1183                self.mark_paged_registry_dirty();
1184            }
1185        }
1186
1187        Ok(())
1188    }
1189
1190    fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1191        self.entity_cache.remove(id.raw());
1192        if let Some(manager) = self.get_collection(collection) {
1193            let deleted = manager.delete(id)?;
1194            if !deleted {
1195                return Ok(());
1196            }
1197        } else {
1198            return Ok(());
1199        }
1200
1201        if let Some(_pager) = &self.pager {
1202            let btree_indices = self.btree_indices.read();
1203            if let Some(btree) = btree_indices.get(collection) {
1204                let root_before = btree.root_page_id();
1205                let key = id.raw().to_be_bytes();
1206                let _ = btree.delete(&key);
1207                if root_before != btree.root_page_id() {
1208                    self.mark_paged_registry_dirty();
1209                }
1210            }
1211        }
1212
1213        let _ = self.unindex_cross_refs(id);
1214        self.remove_from_graph_label_index(collection, id);
1215        self.context_index.remove_entity(id);
1216        Ok(())
1217    }
1218}
1219
/// Appends `value` to `out` as a little-endian `u32` byte-length prefix
/// followed by its UTF-8 bytes — the layout `read_string` decodes.
///
/// # Panics
///
/// Panics if `value` is longer than `u32::MAX` bytes. The prefix is only
/// 32 bits wide; a bare `as u32` cast would silently truncate it, writing
/// a length that disagrees with the payload and desynchronising every
/// field decoded after it.
fn write_string(out: &mut Vec<u8>, value: &str) {
    let len = u32::try_from(value.len())
        .expect("write_string: value exceeds u32::MAX bytes and cannot be length-prefixed");
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(value.as_bytes());
}
1224
/// Appends `value` to `out` as a little-endian `u32` byte-length prefix
/// followed by the raw bytes — the layout `read_bytes` decodes.
///
/// # Panics
///
/// Panics if `value` is longer than `u32::MAX` bytes. A truncated prefix
/// (what a bare `as u32` cast would produce) would not match the payload
/// and would corrupt decoding of everything after it.
fn write_bytes(out: &mut Vec<u8>, value: &[u8]) {
    let len = u32::try_from(value.len())
        .expect("write_bytes: value exceeds u32::MAX bytes and cannot be length-prefixed");
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(value);
}
1229
/// Decodes a little-endian `u32` starting at `*pos` and advances `*pos`
/// by four.
///
/// Returns `UnexpectedEof` and leaves `*pos` untouched when fewer than
/// four bytes remain (including when `*pos` is already past the end).
fn read_u32(data: &[u8], pos: &mut usize) -> io::Result<u32> {
    let start = *pos;
    let chunk = match start.checked_add(4).and_then(|end| data.get(start..end)) {
        Some(chunk) => chunk,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u32",
            ))
        }
    };
    let mut raw = [0u8; 4];
    raw.copy_from_slice(chunk);
    *pos = start + 4;
    Ok(u32::from_le_bytes(raw))
}
1241
/// Decodes a little-endian `u64` starting at `*pos` and advances `*pos`
/// by eight.
///
/// Returns `UnexpectedEof` and leaves `*pos` untouched when fewer than
/// eight bytes remain (including when `*pos` is already past the end).
fn read_u64(data: &[u8], pos: &mut usize) -> io::Result<u64> {
    let start = *pos;
    let chunk = match start.checked_add(8).and_then(|end| data.get(start..end)) {
        Some(chunk) => chunk,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u64",
            ))
        }
    };
    let mut raw = [0u8; 8];
    raw.copy_from_slice(chunk);
    *pos = start + 8;
    Ok(u64::from_le_bytes(raw))
}
1262
1263fn read_string(data: &[u8], pos: &mut usize) -> io::Result<String> {
1264    let len = read_u32(data, pos)? as usize;
1265    if data.len().saturating_sub(*pos) < len {
1266        return Err(io::Error::new(
1267            io::ErrorKind::UnexpectedEof,
1268            "unexpected eof while reading string",
1269        ));
1270    }
1271    let value = std::str::from_utf8(&data[*pos..*pos + len])
1272        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?
1273        .to_string();
1274    *pos += len;
1275    Ok(value)
1276}
1277
1278fn read_bytes(data: &[u8], pos: &mut usize) -> io::Result<Vec<u8>> {
1279    let len = read_u32(data, pos)? as usize;
1280    if data.len().saturating_sub(*pos) < len {
1281        return Err(io::Error::new(
1282            io::ErrorKind::UnexpectedEof,
1283            "unexpected eof while reading bytes",
1284        ));
1285    }
1286    let value = data[*pos..*pos + len].to_vec();
1287    *pos += len;
1288    Ok(value)
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::{DurabilityMode, GroupCommitOptions};
    use std::sync::{Barrier, Mutex as StdMutex, MutexGuard as StdMutexGuard, OnceLock};
    use std::time::SystemTime;

    /// Env knob that overrides the adaptive group-commit window
    /// (microseconds).
    const WINDOW_ENV: &str = "REDDB_GROUP_COMMIT_WINDOW_US";

    /// Serialise tests that mutate `REDDB_GROUP_COMMIT_WINDOW_US`.
    /// The env table is process-global, so two parallel test threads
    /// flipping it would race the assertions in the test that
    /// happens to read it last.
    fn env_lock() -> &'static StdMutex<()> {
        static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| StdMutex::new(()))
    }

    /// Scoped ownership of `REDDB_GROUP_COMMIT_WINDOW_US`.
    ///
    /// Holds `env_lock()` for the guard's lifetime and clears the
    /// variable both on entry and on drop. The drop-time reset also
    /// runs when an assertion panics, so a failing test cannot leak an
    /// override such as `"0"` or `"750"` into later tests.
    struct WindowEnv {
        _lock: StdMutexGuard<'static, ()>,
    }

    impl WindowEnv {
        /// Takes the env lock (tolerating poison from an earlier failed
        /// test) and starts from an unset variable.
        fn acquire() -> Self {
            let lock = env_lock().lock().unwrap_or_else(|p| p.into_inner());
            std::env::remove_var(WINDOW_ENV);
            Self { _lock: lock }
        }

        /// Sets the override while the lock is held.
        fn set(&self, value: &str) {
            std::env::set_var(WINDOW_ENV, value);
        }
    }

    impl Drop for WindowEnv {
        fn drop(&mut self) {
            // `Drop::drop` runs before the `_lock` field is released, so
            // this reset is still serialised against other env tests.
            std::env::remove_var(WINDOW_ENV);
        }
    }

    fn temp_wal(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "rb_commit_coord_{}_{}_{}.wal",
            name,
            std::process::id(),
            nanos
        ));
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Concurrent autocommits MUST coalesce into far fewer fsyncs
    /// than the number of callers when the adaptive group-commit
    /// window is active. Without the 200µs floor, every caller
    /// would race the drain loop and pay an independent fsync.
    ///
    /// The test fires 32 concurrent `append_actions` calls and asserts
    /// the coordinator issued strictly fewer fsyncs than callers.
    /// (We don't pin to "one fsync" because timing on loaded CI hosts
    /// can split the burst across a couple of drain cycles — the win
    /// is the ratio, not the absolute count.)
    #[test]
    fn group_commit_coalesces_concurrent_autocommits() {
        // Clears any stale override, so this run exercises the
        // default 200µs floor.
        let _env = WindowEnv::acquire();

        let path = temp_wal("coalesce");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 32;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                // Synchronise the start so every writer races the
                // drain loop together — this is the workload shape
                // the adaptive window is supposed to coalesce.
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("c{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
        assert!(
            fsyncs < WRITERS as u64,
            "expected fsyncs ({fsyncs}) to be strictly less than \
             concurrent writers ({WRITERS}); coalescing failed"
        );

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// With the env override forcing a zero window, the coalescing
    /// floor is gone: callers race the drain loop and fsyncs approach
    /// one per caller.
    ///
    /// The override's wiring is verified directly through
    /// `resolve_window`; the fsync count on its own cannot prove it,
    /// because `>= 1` also holds with the default floor.
    #[test]
    fn zero_window_disables_coalescing_floor() {
        let env = WindowEnv::acquire();
        env.set("0");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&GroupCommitOptions::default()),
            Duration::from_micros(0),
            "{WINDOW_ENV}=0 must resolve to a zero group-commit window"
        );

        let path = temp_wal("zero_window");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 8;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("z{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        // With zero window, every caller might trigger its own drain,
        // but the queue may still naturally batch under contention, so
        // only the lower bound is asserted.
        let fsyncs = coord.fsync_count();
        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// `resolve_window` precedence: env > config.window_ms > default.
    #[test]
    fn resolve_window_precedence() {
        let env = WindowEnv::acquire();
        // Default: window_ms=0 → adaptive 200µs floor.
        let cfg = GroupCommitOptions::default();
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
        );

        // Explicit ms config wins over the default floor.
        let cfg_ms = GroupCommitOptions {
            window_ms: 5,
            ..GroupCommitOptions::default()
        };
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_millis(5)
        );

        // Env override wins over both.
        env.set("750");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(750)
        );
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_micros(750)
        );

        // Env=0 explicitly disables the floor.
        env.set("0");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(0)
        );
    }
}