Skip to main content

reddb_server/storage/unified/store/
commit.rs

1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Adaptive group-commit window applied when the configured
/// `GroupCommitOptions::window_ms` is 0 (the historical default).
///
/// Sub-millisecond so individual single-writer commits don't see a
/// visible latency penalty (the 200 µs floor sits above the typical
/// NVMe fsync latency of 50–150 µs by roughly one fsync), while still
/// giving a pipelined writer a chance to drop a second statement into
/// the same drain cycle. A lone synchronous `insert_one` still pays
/// one fsync per row in the worst case, but two back-to-back inserts
/// on the same connection now coalesce into one drain. See P1 in
/// `docs/perf/insert_sequential-2026-05-05.md`.
///
/// Override via `REDDB_GROUP_COMMIT_WINDOW_US` (microseconds). Set
/// to `0` to disable the floor entirely (legacy behaviour). When that
/// variable is unset or unparsable, a non-zero `window_ms` in the
/// group-commit options replaces this floor (see `resolve_window`).
/// NOTE(review): the original text names a
/// `REDDB_GROUP_COMMIT_WINDOW_MS` variable as the way to set the ms
/// window; it is not read in this file — confirm where it is mapped
/// onto `window_ms`.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;
27
/// Shorthand for the lock guarding the `WalWriter`; the coordinator
/// shares it as `Arc<WalMutex>`. `parking_lot::Mutex` avoids
/// poisoning (one writer panicking mid-append used to taint every
/// subsequent lock acquisition) and shaves a few syscalls off the
/// fast path. The group-commit coordinator + writer threads all
/// acquire this mutex in the hot insert path, so the
/// unpoison/fast-park win compounds under 16-way concurrency.
type WalMutex = parking_lot::Mutex<WalWriter>;

/// Shorthand for the group-commit coordinator's state pair. Same
/// non-poisoning + lighter-park motivation as `WalMutex`; writer
/// threads `wait` on this condvar until the coordinator publishes a
/// new `durable_lsn`, so the park cost shows up on every
/// WalDurableGrouped transaction.
type CommitStateMutex = parking_lot::Mutex<CommitState>;
/// Condition variable paired with `CommitStateMutex`.
type CommitStateCondvar = parking_lot::Condvar;
43use std::time::{Duration, Instant};
44
/// Source of transaction ids for `TxCommitBatch` records. Starts at 1
/// and is bumped once per non-empty `append_actions` call; being a
/// `static`, it is shared by every coordinator in the process.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);

/// First byte of every encoded `StoreWalAction`; `decode` rejects any
/// other value as an unsupported version.
const STORE_WAL_VERSION: u8 = 1;
48
/// One logical store mutation as recorded in the store WAL.
///
/// `StoreWalAction::encode` serializes each variant as a version
/// byte, a one-byte tag and a variant-specific payload;
/// `StoreWalAction::decode` parses the same layout back.
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Creation of the collection `name` (wire tag 1).
    CreateCollection {
        name: String,
    },
    /// Removal of the collection `name` (wire tag 2).
    DropCollection {
        name: String,
    },
    /// Upsert of one serialized entity record into `collection`
    /// (wire tag 3). `record` holds the bytes produced by
    /// `UnifiedStore::serialize_entity_record`.
    UpsertEntityRecord {
        collection: String,
        record: Vec<u8>,
    },
    /// Deletion of entity `entity_id` from `collection` (wire tag 4).
    DeleteEntityRecord {
        collection: String,
        entity_id: u64,
    },
    /// Batched upsert — one WAL action carrying N serialized entity
    /// records for the same collection (wire tag 5). Saves the
    /// per-row Begin/PageWrite/Commit framing overhead on the bulk
    /// insert hot path. Replay applies every contained record in
    /// order.
    BulkUpsertEntityRecords {
        collection: String,
        records: Vec<Vec<u8>>,
    },
    /// Atomic full-collection replace — issue #595 slice 9c (wire
    /// tag 6). Replay drops the in-memory collection state and
    /// rebuilds it from the contained records. Used by REFRESH
    /// MATERIALIZED VIEW so a concurrent reader sees either the prior
    /// contents or the new contents, never a partial state, and a
    /// crash mid-refresh leaves the prior contents intact on recovery
    /// (the action is only durable once the WAL commit lands).
    RefreshCollection {
        collection: String,
        records: Vec<Vec<u8>>,
    },
}
85
86#[derive(Debug, Default)]
87pub(crate) struct DeferredStoreWalActions {
88    actions: Vec<StoreWalAction>,
89}
90
91impl DeferredStoreWalActions {
92    pub(crate) fn is_empty(&self) -> bool {
93        self.actions.is_empty()
94    }
95
96    pub(crate) fn extend(&mut self, other: Self) {
97        self.actions.extend(other.actions);
98    }
99}
100
thread_local! {
    // Per-thread capture buffer for store WAL actions. `None` means no
    // capture is active on this thread; `Some(actions)` accumulates
    // whatever `capture_deferred_store_wal_actions` receives until
    // `take_deferred_store_wal_capture` removes the buffer.
    static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
        const { RefCell::new(None) };
}
105
106fn begin_deferred_store_wal_capture() {
107    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
108        let mut guard = cell.borrow_mut();
109        debug_assert!(guard.is_none());
110        *guard = Some(Vec::new());
111    });
112}
113
114fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
115    DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
116        let mut guard = cell.borrow_mut();
117        if let Some(pending) = guard.as_mut() {
118            pending.extend(actions);
119            true
120        } else {
121            false
122        }
123    })
124}
125
126fn deferred_store_wal_capture_active() -> bool {
127    DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
128}
129
130fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
131    DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
132        actions: cell.borrow_mut().take().unwrap_or_default(),
133    })
134}
135
136impl StoreWalAction {
137    pub(crate) fn upsert_entity(
138        collection: &str,
139        entity: &UnifiedEntity,
140        metadata: Option<&Metadata>,
141        format_version: u32,
142    ) -> Self {
143        Self::UpsertEntityRecord {
144            collection: collection.to_string(),
145            record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
146        }
147    }
148
149    fn encode(&self) -> Vec<u8> {
150        let mut out = Vec::new();
151        out.push(STORE_WAL_VERSION);
152        match self {
153            Self::CreateCollection { name } => {
154                out.push(1);
155                write_string(&mut out, name);
156            }
157            Self::DropCollection { name } => {
158                out.push(2);
159                write_string(&mut out, name);
160            }
161            Self::UpsertEntityRecord { collection, record } => {
162                out.push(3);
163                write_string(&mut out, collection);
164                write_bytes(&mut out, record);
165            }
166            Self::DeleteEntityRecord {
167                collection,
168                entity_id,
169            } => {
170                out.push(4);
171                write_string(&mut out, collection);
172                out.extend_from_slice(&entity_id.to_le_bytes());
173            }
174            Self::BulkUpsertEntityRecords {
175                collection,
176                records,
177            } => {
178                out.push(5);
179                write_string(&mut out, collection);
180                out.extend_from_slice(&(records.len() as u32).to_le_bytes());
181                for record in records {
182                    write_bytes(&mut out, record);
183                }
184            }
185            Self::RefreshCollection {
186                collection,
187                records,
188            } => {
189                out.push(6);
190                write_string(&mut out, collection);
191                out.extend_from_slice(&(records.len() as u32).to_le_bytes());
192                for record in records {
193                    write_bytes(&mut out, record);
194                }
195            }
196        }
197        out
198    }
199
200    fn decode(bytes: &[u8]) -> io::Result<Self> {
201        if bytes.len() < 2 {
202            return Err(io::Error::new(
203                io::ErrorKind::InvalidData,
204                "store wal action too short",
205            ));
206        }
207        if bytes[0] != STORE_WAL_VERSION {
208            return Err(io::Error::new(
209                io::ErrorKind::InvalidData,
210                format!("unsupported store wal version: {}", bytes[0]),
211            ));
212        }
213
214        let mut pos = 2usize;
215        match bytes[1] {
216            1 => Ok(Self::CreateCollection {
217                name: read_string(bytes, &mut pos)?,
218            }),
219            2 => Ok(Self::DropCollection {
220                name: read_string(bytes, &mut pos)?,
221            }),
222            3 => Ok(Self::UpsertEntityRecord {
223                collection: read_string(bytes, &mut pos)?,
224                record: read_bytes(bytes, &mut pos)?,
225            }),
226            4 => {
227                let collection = read_string(bytes, &mut pos)?;
228                let entity_id = read_u64(bytes, &mut pos)?;
229                Ok(Self::DeleteEntityRecord {
230                    collection,
231                    entity_id,
232                })
233            }
234            5 => {
235                let collection = read_string(bytes, &mut pos)?;
236                if pos + 4 > bytes.len() {
237                    return Err(io::Error::new(
238                        io::ErrorKind::InvalidData,
239                        "bulk upsert wal action: missing record count",
240                    ));
241                }
242                let count = u32::from_le_bytes([
243                    bytes[pos],
244                    bytes[pos + 1],
245                    bytes[pos + 2],
246                    bytes[pos + 3],
247                ]) as usize;
248                pos += 4;
249                let mut records = Vec::with_capacity(count);
250                for _ in 0..count {
251                    records.push(read_bytes(bytes, &mut pos)?);
252                }
253                Ok(Self::BulkUpsertEntityRecords {
254                    collection,
255                    records,
256                })
257            }
258            6 => {
259                let collection = read_string(bytes, &mut pos)?;
260                if pos + 4 > bytes.len() {
261                    return Err(io::Error::new(
262                        io::ErrorKind::InvalidData,
263                        "refresh collection wal action: missing record count",
264                    ));
265                }
266                let count = u32::from_le_bytes([
267                    bytes[pos],
268                    bytes[pos + 1],
269                    bytes[pos + 2],
270                    bytes[pos + 3],
271                ]) as usize;
272                pos += 4;
273                let mut records = Vec::with_capacity(count);
274                for _ in 0..count {
275                    records.push(read_bytes(bytes, &mut pos)?);
276                }
277                Ok(Self::RefreshCollection {
278                    collection,
279                    records,
280                })
281            }
282            other => Err(io::Error::new(
283                io::ErrorKind::InvalidData,
284                format!("unsupported store wal action tag: {other}"),
285            )),
286        }
287    }
288}
289
/// Bookkeeping shared between committing writers and the group-commit
/// flusher, guarded by `CommitStateMutex`.
#[derive(Debug)]
struct CommitState {
    /// Highest LSN known to be synced to disk.
    durable_lsn: u64,
    /// Highest commit LSN any writer has asked to be made durable.
    pending_target_lsn: u64,
    /// Commits registered since the counters were last reset.
    pending_statements: usize,
    /// WAL payload bytes registered since the counters were last reset.
    pending_wal_bytes: u64,
    /// When the oldest still-pending commit was registered, if any.
    first_pending_at: Option<Instant>,
    /// Set to make the background flusher exit.
    shutdown: bool,
    /// Message of the most recent failure; blocked writers surface it
    /// as an `io::Error`.
    last_error: Option<String>,
}

impl CommitState {
    /// Creates an idle state: nothing pending, no error, not shut
    /// down, with both LSN cursors at `initial_durable_lsn`.
    fn new(initial_durable_lsn: u64) -> Self {
        let lsn = initial_durable_lsn;
        Self {
            shutdown: false,
            last_error: None,
            first_pending_at: None,
            pending_wal_bytes: 0,
            pending_statements: 0,
            pending_target_lsn: lsn,
            durable_lsn: lsn,
        }
    }
}
314
/// Low-contention append queue sitting in front of `WalWriter`.
///
/// Writers reserve a byte range by advancing `next_lsn` and push
/// their encoded bytes into a parking_lot-guarded vector, both inside
/// one short critical section (see `enqueue`). The group-commit
/// coordinator is the sole drainer: it sorts the pending entries by
/// LSN (so the file bytes land at the offsets each writer reserved)
/// and hands them to `WalWriter::append_bytes`.
///
/// This replaces the old `wal.lock() ... append ... append ... drop`
/// hot path where 16 concurrent writers serialised on the WAL
/// mutex for ~13µs each. Hold time on the queue's parking_lot
/// mutex is ~200ns (just a Vec::push) — 65× shorter, so the mutex
/// convoy on concurrent inserts disappears.
pub(crate) struct WalAppendQueue {
    /// Tuple of (monotonically-increasing LSN, pending-bytes vec)
    /// protected by one mutex. Keeping the LSN reservation AND the
    /// push under the same lock guarantees that every queue entry
    /// is visible the moment its LSN is assigned — no gap between
    /// reservation and `push` for the leader to spin on.
    ///
    /// Earlier versions reserved LSN with an `AtomicU64::fetch_add`
    /// outside the lock; the leader drain observed reordered
    /// `(lsn, bytes)` tuples whose pushes happened in a different
    /// order than their fetch_add, creating "holes" in the LSN
    /// sequence that the drain loop interpreted as "wait for the
    /// missing enqueuer" — under tokio scheduling pressure the
    /// missing enqueuer was preempted indefinitely and the
    /// drain loop busy-waited forever (WAL stayed at 8 bytes).
    pending: parking_lot::Mutex<WalQueueState>,
}
345
/// Mutex-protected contents of a `WalAppendQueue`.
struct WalQueueState {
    /// LSN the next enqueued blob will start at; advanced by the
    /// blob's byte length on every `enqueue`.
    next_lsn: u64,
    /// Queued `(start_lsn, encoded bytes)` pairs awaiting a drain.
    entries: Vec<(u64, Vec<u8>)>,
}
350
351impl WalAppendQueue {
352    fn new(initial_lsn: u64) -> Self {
353        Self {
354            pending: parking_lot::Mutex::new(WalQueueState {
355                next_lsn: initial_lsn,
356                entries: Vec::with_capacity(64),
357            }),
358        }
359    }
360
361    /// Reserve an LSN range of `bytes.len()` bytes and push onto the
362    /// queue. Returns the commit LSN (end of reserved range), which
363    /// the caller passes to `wait_until_durable`. LSN assignment and
364    /// push happen under the same mutex — no gap for the drain loop
365    /// to busy-spin on.
366    fn enqueue(&self, bytes: Vec<u8>) -> u64 {
367        let len = bytes.len() as u64;
368        let mut state = self.pending.lock();
369        let start_lsn = state.next_lsn;
370        state.next_lsn = start_lsn + len;
371        state.entries.push((start_lsn, bytes));
372        start_lsn + len
373    }
374
375    /// Drain all queued entries in LSN order. Caller holds the WAL
376    /// file mutex while writing the drained bytes so the on-disk
377    /// layout matches the reserved LSN offsets.
378    fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
379        let mut state = self.pending.lock();
380        let mut v = std::mem::take(&mut state.entries);
381        drop(state);
382        v.sort_by_key(|(lsn, _)| *lsn);
383        v
384    }
385
386    /// Whether any entry is queued. Leader uses this to decide
387    /// whether to spin once more or go back to the condvar.
388    fn has_pending(&self) -> bool {
389        !self.pending.lock().entries.is_empty()
390    }
391
392    /// Reset the LSN cursor and discard any queued entries. Used
393    /// after `wal.truncate()` — the wal-side byte counter goes back
394    /// to the header size, so the queue (which tracks LSNs in the
395    /// same byte space) must follow or every subsequent enqueue
396    /// returns a target the drain loop can never reach.
397    fn reset(&self, next_lsn: u64) {
398        let mut state = self.pending.lock();
399        state.next_lsn = next_lsn;
400        state.entries.clear();
401    }
402}
403
/// Owns the store WAL and decides how commits become durable for the
/// configured `DurabilityMode` (inline fsync, grouped fsync, or
/// asynchronous flush).
pub(crate) struct StoreCommitCoordinator {
    /// Durability policy every append follows.
    mode: DurabilityMode,
    /// Group-commit tuning supplied at `open`.
    config: crate::api::GroupCommitOptions,
    /// Location of the WAL file; re-opened by `replay_into`.
    wal_path: PathBuf,
    /// The WAL writer, shared with the background flusher thread.
    wal: Arc<WalMutex>,
    /// Low-contention front door for writers. Used by the
    /// `WalDurableGrouped` / `Async` modes so concurrent inserts
    /// never contend on `wal` for the append step. Strict mode
    /// bypasses the queue and calls `WalWriter::append` directly
    /// to preserve the one-fsync-per-commit semantic.
    queue: Arc<WalAppendQueue>,
    /// Commit bookkeeping plus the condvar writers and the flusher
    /// use to signal each other.
    state: Arc<(CommitStateMutex, CommitStateCondvar)>,
    /// Number of `wal.sync()` calls issued by the group-commit
    /// drain loop. Used by tests to observe coalescing — a burst
    /// of N concurrent commits should bump this by far less than N
    /// when the adaptive window is doing its job. Strict-mode
    /// commits go through `force_sync` which also bumps this.
    fsync_count: Arc<AtomicU64>,
}
423
424impl StoreCommitCoordinator {
425    pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
426        matches!(
427            mode,
428            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
429        ) || path.exists()
430    }
431
432    pub(crate) fn open(
433        wal_path: impl Into<PathBuf>,
434        mode: DurabilityMode,
435        config: crate::api::GroupCommitOptions,
436    ) -> io::Result<Self> {
437        let wal_path = wal_path.into();
438        let wal = WalWriter::open(&wal_path)?;
439        let initial_durable_lsn = wal.durable_lsn();
440        let initial_current_lsn = wal.current_lsn();
441        let wal = Arc::new(WalMutex::new(wal));
442        let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
443        let state = Arc::new((
444            CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
445            CommitStateCondvar::new(),
446        ));
447        let fsync_count = Arc::new(AtomicU64::new(0));
448
449        if matches!(
450            mode,
451            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
452        ) {
453            let wal_bg = Arc::clone(&wal);
454            let queue_bg = Arc::clone(&queue);
455            let state_bg = Arc::clone(&state);
456            let fsync_bg = Arc::clone(&fsync_count);
457            // P1: adaptive group-commit window. Historical default is
458            // `window_ms = 0` ("no wait"), which under single-writer
459            // OLTP means one fsync per autocommit row — the throughput
460            // floor. When window_ms is 0 we fall back to a small
461            // microsecond floor (`DEFAULT_ADAPTIVE_WINDOW_US`, override
462            // via `REDDB_GROUP_COMMIT_WINDOW_US`) so a pipelined writer
463            // can drop a second statement into the same drain cycle.
464            // Explicit non-zero `window_ms` config takes precedence.
465            //
466            // The loop already short-circuits on
467            // `pending_statements >= max_statements` /
468            // `pending_wal_bytes >= max_wal_bytes`, so the window only
469            // delays the leader when the queue is otherwise empty —
470            // exactly the case we want to coalesce.
471            let window = Self::resolve_window(&config);
472            let max_statements = config.max_statements.max(1);
473            let max_wal_bytes = config.max_wal_bytes.max(1);
474            std::thread::spawn(move || {
475                Self::run_group_commit_loop(
476                    wal_bg,
477                    queue_bg,
478                    state_bg,
479                    fsync_bg,
480                    window,
481                    max_statements,
482                    max_wal_bytes,
483                );
484            });
485        }
486
487        Ok(Self {
488            mode,
489            config,
490            wal_path,
491            wal,
492            queue,
493            state,
494            fsync_count,
495        })
496    }
497
498    /// Resolve the effective group-commit window from the configured
499    /// options + the `REDDB_GROUP_COMMIT_WINDOW_US` env override.
500    ///
501    /// Precedence (highest first):
502    ///   1. `REDDB_GROUP_COMMIT_WINDOW_US=N` — N µs (0 disables).
503    ///   2. `config.window_ms != 0` — explicit ms config wins.
504    ///   3. `DEFAULT_ADAPTIVE_WINDOW_US` — adaptive floor for the
505    ///      historical zero-default. Set the env var to 0 to opt out.
506    fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
507        if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
508            if let Ok(parsed) = raw.parse::<u64>() {
509                return Duration::from_micros(parsed);
510            }
511        }
512        if config.window_ms != 0 {
513            return Duration::from_millis(config.window_ms);
514        }
515        Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
516    }
517
518    /// Total `wal.sync()` calls issued since this coordinator opened.
519    /// Public for tests that want to observe fsync coalescing under
520    /// concurrent autocommits.
521    #[cfg(test)]
522    pub(crate) fn fsync_count(&self) -> u64 {
523        self.fsync_count.load(Ordering::Relaxed)
524    }
525
526    pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
527        if actions.is_empty() {
528            return Ok(());
529        }
530
531        let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
532
533        // Strict mode: bypass the queue, write + fsync inline. Strict
534        // commits are exactly one-fsync-per-call by contract, so the
535        // coalescing win of the queue doesn't apply and we'd pay an
536        // extra hop through the drain loop.
537        if matches!(self.mode, DurabilityMode::Strict) {
538            let commit_lsn = {
539                let mut wal = self.wal.lock();
540                wal.append(&WalRecord::TxCommitBatch {
541                    tx_id,
542                    actions: actions.iter().map(StoreWalAction::encode).collect(),
543                })?;
544                wal.current_lsn()
545            };
546            self.force_sync()?;
547            let _ = commit_lsn;
548            return Ok(());
549        }
550
551        // Grouped / Async path — lock-free enqueue. Encode every
552        // WalRecord into one contiguous byte blob OUTSIDE any lock,
553        // then hand it to the queue with a single fetch_add+push.
554        let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
555        let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
556            total.saturating_add(payload.len() as u64)
557        });
558        let blob = WalRecord::TxCommitBatch {
559            tx_id,
560            actions: encoded_actions,
561        }
562        .encode();
563
564        let commit_lsn = self.queue.enqueue(blob);
565        self.wait_until_durable(commit_lsn, wal_bytes)?;
566        Ok(())
567    }
568
569    /// Encode + enqueue a single, non-batched [`WalRecord`] (issue
570    /// #693). Honours the same Strict / Grouped / Async branching as
571    /// [`Self::append_actions`] so durability semantics are uniform
572    /// across record kinds.
573    pub(crate) fn append_single_record(&self, record: WalRecord) -> io::Result<()> {
574        let blob = record.encode();
575        let wal_bytes = blob.len() as u64;
576        if matches!(self.mode, DurabilityMode::Strict) {
577            {
578                let mut wal = self.wal.lock();
579                wal.append(&record)?;
580            }
581            self.force_sync()?;
582            return Ok(());
583        }
584        let commit_lsn = self.queue.enqueue(blob);
585        self.wait_until_durable(commit_lsn, wal_bytes)?;
586        Ok(())
587    }
588
589    pub(crate) fn force_sync(&self) -> io::Result<()> {
590        {
591            let mut wal = self.wal.lock();
592            wal.sync()?;
593            self.fsync_count.fetch_add(1, Ordering::Relaxed);
594            let durable = wal.durable_lsn();
595            drop(wal);
596            let (state_lock, cond) = &*self.state;
597            let mut state = state_lock.lock();
598            state.durable_lsn = durable;
599            state.pending_target_lsn = durable.max(state.pending_target_lsn);
600            state.pending_statements = 0;
601            state.pending_wal_bytes = 0;
602            state.first_pending_at = None;
603            state.last_error = None;
604            cond.notify_all();
605        }
606        Ok(())
607    }
608
609    pub(crate) fn truncate(&self) -> io::Result<()> {
610        let mut wal = self.wal.lock();
611        wal.truncate()?;
612        let durable = wal.durable_lsn();
613        let current = wal.current_lsn();
614        drop(wal);
615
616        // Queue's next_lsn tracks byte offsets in the same space as
617        // wal.current_lsn. After truncate both must be reset together
618        // — otherwise enqueue returns a target_lsn in the old range
619        // that drain can never reach, and wait_until_durable hangs.
620        self.queue.reset(current);
621
622        let (state_lock, cond) = &*self.state;
623        let mut state = state_lock.lock();
624        state.durable_lsn = durable;
625        state.pending_target_lsn = durable;
626        state.pending_statements = 0;
627        state.pending_wal_bytes = 0;
628        state.first_pending_at = None;
629        state.last_error = None;
630        cond.notify_all();
631        Ok(())
632    }
633
634    pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
635        if !self.wal_path.exists() {
636            return Ok(());
637        }
638
639        let reader = match WalReader::open(&self.wal_path) {
640            Ok(reader) => reader,
641            Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
642            Err(err) => return Err(err),
643        };
644
645        let mut tx_states = std::collections::HashMap::<u64, bool>::new();
646        let mut pending = Vec::<(u64, Vec<u8>)>::new();
647
648        for record in reader.iter() {
649            let (_lsn, record) = match record {
650                Ok(record) => record,
651                Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
652                Err(err) => return Err(err),
653            };
654            match record {
655                WalRecord::TxCommitBatch { actions, .. } => {
656                    for payload in actions {
657                        let action = StoreWalAction::decode(&payload)?;
658                        store.apply_replayed_action(&action).map_err(|err| {
659                            io::Error::other(format!("failed to replay store wal action: {err}"))
660                        })?;
661                    }
662                }
663                WalRecord::Begin { tx_id } => {
664                    tx_states.insert(tx_id, false);
665                }
666                WalRecord::Commit { tx_id } => {
667                    tx_states.insert(tx_id, true);
668                }
669                WalRecord::Rollback { tx_id } => {
670                    tx_states.remove(&tx_id);
671                }
672                WalRecord::PageWrite {
673                    tx_id,
674                    page_id: _,
675                    data,
676                } => pending.push((tx_id, data)),
677                WalRecord::Checkpoint { .. } => {}
678                WalRecord::VectorInsert {
679                    collection,
680                    entity_id,
681                    vector,
682                } => {
683                    // Issue #694 — capture `vector.turbo` WAL inserts so the
684                    // boot-time TurboQuant index rebuild can drain them in
685                    // WAL order (deterministic against the pre-restart
686                    // state under a fixed codec seed). The legacy `vector`
687                    // path doesn't read this map.
688                    let mut map = store.replayed_turbo_inserts.lock();
689                    map.entry(collection).or_default().push((entity_id, vector));
690                }
691                WalRecord::FullPageImage { .. } => {
692                    // Pager-level FPI (gh-478); store replay ignores.
693                }
694            }
695        }
696
697        for (tx_id, payload) in pending {
698            if !tx_states.get(&tx_id).copied().unwrap_or(false) {
699                continue;
700            }
701            let action = StoreWalAction::decode(&payload)?;
702            store.apply_replayed_action(&action).map_err(|err| {
703                io::Error::other(format!("failed to replay store wal action: {err}"))
704            })?;
705        }
706
707        Ok(())
708    }
709
710    fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
711        match self.mode {
712            DurabilityMode::Strict => self.force_sync(),
713            // Async: record the pending target so the background
714            // flusher eventually covers it, but don't block the
715            // caller. Matches PG `synchronous_commit=off` semantics —
716            // crash inside the flush window loses unflushed commits.
717            DurabilityMode::Async => {
718                let (state_lock, cond) = &*self.state;
719                let mut state = state_lock.lock();
720                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
721                state.pending_statements = state.pending_statements.saturating_add(1);
722                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
723                state.first_pending_at.get_or_insert_with(Instant::now);
724                cond.notify_all();
725                Ok(())
726            }
727            DurabilityMode::WalDurableGrouped => {
728                let (state_lock, cond) = &*self.state;
729                let mut state = state_lock.lock();
730                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
731                state.pending_statements = state.pending_statements.saturating_add(1);
732                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
733                state.first_pending_at.get_or_insert_with(Instant::now);
734                cond.notify_all();
735
736                loop {
737                    if let Some(err) = state.last_error.clone() {
738                        return Err(io::Error::other(err));
739                    }
740                    if state.durable_lsn >= target_lsn {
741                        return Ok(());
742                    }
743                    // parking_lot::Condvar mutates the guard in place —
744                    // no LockResult to unwrap, no poisoning to fold.
745                    cond.wait(&mut state);
746                }
747            }
748        }
749    }
750
    /// Body of the group-commit worker: waits for pending WAL work,
    /// optionally lingers for the batching window, then drains the append
    /// queue into the WAL, syncs once, and publishes the outcome to waiters.
    ///
    /// * `wal` — writer that receives the drained bytes and is synced.
    /// * `queue` — source of `(lsn, bytes)` batches via `drain_sorted`.
    /// * `state` — mutex/condvar pair holding `pending_target_lsn`,
    ///   `durable_lsn`, the `pending_*` counters, `first_pending_at`,
    ///   `last_error` and the `shutdown` flag.
    /// * `fsync_count` — bumped once for every successful `wal.sync()`.
    /// * `window` — linger time measured from `first_pending_at`; a zero
    ///   window drains immediately.
    /// * `max_statements` / `max_wal_bytes` — thresholds that end the
    ///   linger early.
    ///
    /// Returns only after `shutdown` is observed under the state lock.
    fn run_group_commit_loop(
        wal: Arc<WalMutex>,
        queue: Arc<WalAppendQueue>,
        state: Arc<(CommitStateMutex, CommitStateCondvar)>,
        fsync_count: Arc<AtomicU64>,
        window: Duration,
        max_statements: usize,
        max_wal_bytes: u64,
    ) {
        let (state_lock, cond) = &*state;
        loop {
            // Phase 1 (state lock held): decide when to drain and snapshot
            // the LSN the writers are currently waiting for.
            let target_lsn = {
                let mut guard = state_lock.lock();

                // Sleep until there is un-synced work or shutdown is requested.
                while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
                    cond.wait(&mut guard);
                }

                if guard.shutdown {
                    return;
                }

                // Skip the linger when batching is disabled or a size
                // threshold has already been reached.
                let immediate = window.is_zero()
                    || guard.pending_statements >= max_statements
                    || guard.pending_wal_bytes >= max_wal_bytes;

                if !immediate {
                    // The window is anchored at the first pending statement;
                    // if no timestamp was recorded it starts now.
                    let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
                    let now = Instant::now();
                    if now < deadline {
                        let timeout = deadline.saturating_duration_since(now);
                        // The wait result is ignored: every condition is
                        // re-evaluated below regardless of why we woke up.
                        let _ = cond.wait_for(&mut guard, timeout);
                        if guard.shutdown {
                            return;
                        }
                        // Nothing left to sync — restart the outer loop.
                        if guard.pending_target_lsn <= guard.durable_lsn {
                            continue;
                        }
                        // Woken early (e.g. by another enqueue) with the
                        // window still open and thresholds not hit: restart
                        // the outer loop so the remaining window is waited out.
                        let should_wait_again = guard.pending_statements < max_statements
                            && guard.pending_wal_bytes < max_wal_bytes
                            && guard
                                .first_pending_at
                                .map(|first| first.elapsed() < window)
                                .unwrap_or(false);
                        if should_wait_again {
                            continue;
                        }
                    }
                }

                guard.pending_target_lsn
            };

            // Drain all queued entries. Since `WalAppendQueue::enqueue`
            // assigns LSN and pushes under a single mutex, the drained
            // tuples are guaranteed to form a contiguous byte range
            // starting at `wal.current_lsn()` — no gaps, no leftover
            // handling.
            let batches = queue.drain_sorted();

            // Phase 2 (WAL lock held, state lock released): append every
            // drained batch, then sync once for the whole group.
            let sync_result = {
                let mut wal = wal.lock();
                let mut write_err: Option<io::Error> = None;
                for (_lsn, bytes) in batches {
                    if let Err(e) = wal.append_bytes(&bytes) {
                        // NOTE(review): the remaining drained batches are
                        // dropped here without being re-queued — confirm
                        // that callers treat the reported error as fatal.
                        write_err = Some(e);
                        break;
                    }
                }
                match write_err {
                    Some(e) => Err(e),
                    None => wal.sync().map(|_| {
                        // Count the fsync exactly once per drain
                        // cycle so tests can compare it against the
                        // number of `append_actions` callers that
                        // entered the queue.
                        fsync_count.fetch_add(1, Ordering::Relaxed);
                        wal.durable_lsn()
                    }),
                }
            };

            // Late enqueuers that arrived after our drain stay in the
            // queue — don't clear the `pending_*` counters yet and
            // don't claim we reached `target_lsn` unless the fsync
            // actually covers it.
            let more_pending = queue.has_pending();

            // Phase 3 (state lock held): publish the result and wake waiters.
            let mut guard = state_lock.lock();
            match sync_result {
                Ok(durable_lsn) => {
                    guard.durable_lsn = durable_lsn;
                    if !more_pending {
                        guard.pending_statements = 0;
                        guard.pending_wal_bytes = 0;
                        guard.first_pending_at = None;
                    }
                    guard.last_error = None;
                    // `target_lsn` is deliberately unused: the LSN reported
                    // by the WAL is what gets published.
                    let _ = target_lsn;
                }
                Err(err) => {
                    // NOTE(review): only `last_error` is recorded; the
                    // `pending_*` fields and `pending_target_lsn` stay as
                    // they were, so the next iteration will not block in
                    // the idle wait above — verify this is intended.
                    guard.last_error = Some(err.to_string());
                }
            }
            cond.notify_all();
        }
    }
858}
859
860impl Drop for StoreCommitCoordinator {
861    fn drop(&mut self) {
862        let (state_lock, cond) = &*self.state;
863        // parking_lot::Mutex::lock is infallible (no poisoning).
864        let mut state = state_lock.lock();
865        state.shutdown = true;
866        cond.notify_all();
867    }
868}
869
870impl UnifiedStore {
    /// Associated-function entry point that forwards to the module-level
    /// `begin_deferred_store_wal_capture` helper (the unqualified call below
    /// resolves to that free function, not to this method).
    pub(crate) fn begin_deferred_store_wal_capture() {
        begin_deferred_store_wal_capture();
    }
874
    /// Associated-function entry point that forwards to the module-level
    /// `take_deferred_store_wal_capture` helper and returns the
    /// `DeferredStoreWalActions` it produces.
    pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
        take_deferred_store_wal_capture()
    }
878
879    pub(crate) fn append_deferred_store_wal_actions(
880        &self,
881        actions: DeferredStoreWalActions,
882    ) -> Result<(), StoreError> {
883        if actions.actions.is_empty() {
884            return Ok(());
885        }
886        match self.config.durability_mode {
887            DurabilityMode::Strict => self.flush_paged_state(),
888            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
889                if let Some(commit) = &self.commit {
890                    commit
891                        .append_actions(&actions.actions)
892                        .map_err(StoreError::Io)
893                } else {
894                    self.flush_paged_state()
895                }
896            }
897        }
898    }
899
900    pub(crate) fn wal_path_for_db(path: &Path) -> PathBuf {
901        path.with_extension("rdb-uwal")
902    }
903
904    /// Emit a [`WalRecord::VectorInsert`] for a `vector.turbo`
905    /// collection (issue #693). Returns `Ok(())` when no commit
906    /// coordinator is configured (in-memory mode) so callers don't
907    /// have to special-case the missing WAL — the in-memory index
908    /// update remains durable enough for in-memory runtimes by
909    /// construction.
910    pub(crate) fn append_vector_insert_record(
911        &self,
912        collection: &str,
913        entity_id: u64,
914        vector: &[f32],
915    ) -> std::io::Result<()> {
916        let Some(commit) = &self.commit else {
917            return Ok(());
918        };
919        let record = WalRecord::VectorInsert {
920            collection: collection.to_string(),
921            entity_id,
922            vector: vector.to_vec(),
923        };
924        commit.append_single_record(record)
925    }
926
927    pub(crate) fn finish_paged_write(
928        &self,
929        actions: impl IntoIterator<Item = StoreWalAction>,
930    ) -> Result<(), StoreError> {
931        let actions: Vec<StoreWalAction> = actions.into_iter().collect();
932        if deferred_store_wal_capture_active() {
933            let captured = capture_deferred_store_wal_actions(actions);
934            debug_assert!(captured);
935            return Ok(());
936        }
937        match self.config.durability_mode {
938            DurabilityMode::Strict => self.flush_paged_state(),
939            DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
940                if let Some(commit) = &self.commit {
941                    commit.append_actions(&actions).map_err(StoreError::Io)?;
942                    Ok(())
943                } else {
944                    self.flush_paged_state()
945                }
946            }
947        }
948    }
949
950    pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
951        match action {
952            StoreWalAction::CreateCollection { name } => {
953                if self.get_collection(name).is_none() {
954                    let _ = self.create_collection_in_memory(name);
955                }
956                Ok(())
957            }
958            StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
959            StoreWalAction::UpsertEntityRecord { collection, record } => {
960                self.apply_replayed_upsert(collection, record)
961            }
962            StoreWalAction::DeleteEntityRecord {
963                collection,
964                entity_id,
965            } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
966            StoreWalAction::BulkUpsertEntityRecords {
967                collection,
968                records,
969            } => {
970                for record in records {
971                    self.apply_replayed_upsert(collection, record)?;
972                }
973                Ok(())
974            }
975            StoreWalAction::RefreshCollection {
976                collection,
977                records,
978            } => self.apply_replayed_refresh_collection(collection, records),
979        }
980    }
981
982    /// Atomic full-collection replace — issue #595 slice 9c.
983    ///
984    /// Builds a fresh `SegmentManager` populated with `entities`,
985    /// atomically swaps it into the collections map, rebuilds the
986    /// paged B-tree from the same records, and emits a single
987    /// `RefreshCollection` WAL action.
988    ///
989    /// Concurrent readers that resolved an `Arc<SegmentManager>` for
990    /// this collection before the swap continue reading the prior
991    /// contents; readers after the swap see the new contents. The
992    /// transition is single-pointer so partial state is unobservable.
993    ///
994    /// A crash between in-memory mutation and WAL fsync leaves the
995    /// previous WAL stream intact — on recovery the prior
996    /// `RefreshCollection` (or whatever last bulk-applied to the
997    /// collection) is replayed, so the prior contents are observed.
998    pub fn refresh_collection(
999        &self,
1000        name: &str,
1001        entities: Vec<UnifiedEntity>,
1002    ) -> Result<Vec<Vec<u8>>, StoreError> {
1003        let fv = self.format_version();
1004
1005        // Build a fresh manager. Done outside the collections lock so
1006        // the critical section is just the pointer swap below.
1007        let new_manager = Arc::new(SegmentManager::with_config(
1008            name,
1009            self.config.manager_config.clone(),
1010        ));
1011
1012        let mut prepared = entities;
1013        for entity in &mut prepared {
1014            if entity.id.raw() == 0 {
1015                entity.id = self.next_entity_id();
1016            } else {
1017                self.register_entity_id(entity.id);
1018            }
1019            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
1020                if *row_id == 0 {
1021                    *row_id = new_manager.next_row_id();
1022                } else {
1023                    new_manager.register_row_id(*row_id);
1024                }
1025            }
1026            entity.ensure_table_logical_id();
1027        }
1028
1029        let serialized: Vec<Vec<u8>> = prepared
1030            .iter()
1031            .map(|e| Self::serialize_entity_record(e, None, fv))
1032            .collect();
1033
1034        new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1035            StoreError::Io(std::io::Error::other(format!(
1036                "refresh_collection: bulk_insert into new manager failed: {e}"
1037            )))
1038        })?;
1039
1040        self.swap_collection_state(name, new_manager, &prepared, &serialized);
1041
1042        self.finish_paged_write([StoreWalAction::RefreshCollection {
1043            collection: name.to_string(),
1044            records: serialized.clone(),
1045        }])?;
1046
1047        Ok(serialized)
1048    }
1049
1050    /// Atomic swap of the in-memory collection state. Used by both
1051    /// the live `refresh_collection` path and WAL replay. The new
1052    /// manager is already populated; this fn replaces the live Arc,
1053    /// purges side-state pointing at the previous contents, and
1054    /// rebuilds the paged B-tree from the supplied records.
1055    fn swap_collection_state(
1056        &self,
1057        name: &str,
1058        new_manager: Arc<SegmentManager>,
1059        prepared: &[UnifiedEntity],
1060        serialized: &[Vec<u8>],
1061    ) {
1062        {
1063            let mut collections = self.collections.write();
1064            collections.insert(name.to_string(), new_manager);
1065        }
1066
1067        self.entity_cache
1068            .retain(|_, (collection, _)| collection != name);
1069        self.remove_from_graph_label_index_batch(
1070            name,
1071            &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
1072        );
1073
1074        if let Some(pager) = &self.pager {
1075            let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
1076            let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
1077                .iter()
1078                .zip(serialized.iter())
1079                .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
1080                .collect();
1081            sorted.sort_by(|a, b| a.0.cmp(&b.0));
1082            if !sorted.is_empty() {
1083                let _ = new_btree.bulk_insert_sorted(&sorted);
1084            }
1085            self.btree_indices
1086                .write()
1087                .insert(name.to_string(), new_btree);
1088            self.mark_paged_registry_dirty();
1089        }
1090    }
1091
1092    fn apply_replayed_refresh_collection(
1093        &self,
1094        collection: &str,
1095        records: &[Vec<u8>],
1096    ) -> Result<(), StoreError> {
1097        let new_manager = Arc::new(SegmentManager::with_config(
1098            collection,
1099            self.config.manager_config.clone(),
1100        ));
1101
1102        let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1103        for record in records {
1104            let (entity, _metadata) =
1105                Self::deserialize_entity_record(record, self.format_version())?;
1106            self.register_entity_id(entity.id);
1107            if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1108                new_manager.register_row_id(*row_id);
1109            }
1110            prepared.push(entity);
1111        }
1112
1113        if !prepared.is_empty() {
1114            new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1115                StoreError::Io(std::io::Error::other(format!(
1116                    "replay refresh_collection: bulk_insert failed: {e}"
1117                )))
1118            })?;
1119        }
1120
1121        self.swap_collection_state(collection, new_manager, &prepared, records);
1122        Ok(())
1123    }
1124
1125    /// Replica-side analogue of [`Self::refresh_collection`] — issue
1126    /// #596 slice 9d. Takes pre-serialized record bytes from the
1127    /// primary's CDC stream (the same bytes the primary wrote into the
1128    /// `RefreshCollection` WAL action), applies the atomic swap, and
1129    /// emits the same `RefreshCollection` WAL action against the
1130    /// replica's local store WAL so the post-swap state survives a
1131    /// replica restart through the normal recovery path.
1132    ///
1133    /// Idempotency: re-applying the same records bytes is a full
1134    /// rebuild from the same payload, so the resulting backing-
1135    /// collection contents are equal to the prior call's result.
1136    /// The primary-side `LogicalChangeApplier` short-circuits exact
1137    /// duplicate LSN+payload combinations before reaching here.
1138    pub fn refresh_collection_from_records(
1139        &self,
1140        name: &str,
1141        records: Vec<Vec<u8>>,
1142    ) -> Result<(), StoreError> {
1143        self.apply_replayed_refresh_collection(name, &records)?;
1144        self.finish_paged_write([StoreWalAction::RefreshCollection {
1145            collection: name.to_string(),
1146            records,
1147        }])?;
1148        Ok(())
1149    }
1150
1151    pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1152        let mut collections = self.collections.write();
1153        if collections.contains_key(name) {
1154            return Ok(());
1155        }
1156        let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1157        collections.insert(name.to_string(), Arc::new(manager));
1158        self.mark_paged_registry_dirty();
1159        Ok(())
1160    }
1161
1162    fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1163        let manager = {
1164            let mut collections = self.collections.write();
1165            match collections.remove(name) {
1166                Some(manager) => manager,
1167                None => return Ok(()),
1168            }
1169        };
1170
1171        let entities = manager.query_all(|_| true);
1172        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1173        for entity_id in &entity_ids {
1174            self.context_index.remove_entity(*entity_id);
1175            let _ = self.unindex_cross_refs(*entity_id);
1176        }
1177        self.btree_indices.write().remove(name);
1178        self.entity_cache.retain(|entity_id, (collection, _)| {
1179            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1180        });
1181        self.remove_from_graph_label_index_batch(name, &entity_ids);
1182        self.mark_paged_registry_dirty();
1183        Ok(())
1184    }
1185
    /// Replays one serialized entity record into `collection`, inserting the
    /// entity or updating it when the id already exists.
    ///
    /// The collection is created on demand. Existing index entries for the
    /// entity are removed before the manager is mutated and rebuilt
    /// afterwards, and the paged B-tree (when a pager is configured) is
    /// updated with the raw record bytes.
    ///
    /// # Errors
    /// Propagates decode, manager and cross-reference indexing failures;
    /// B-tree insert errors other than `DuplicateKey` are reported as
    /// `StoreError::Io`.
    fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
        self.create_collection_in_memory(collection)?;
        let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        // Register the replayed ids with the store / manager.
        self.register_entity_id(entity.id);
        if let EntityKind::TableRow { row_id, .. } = &entity.kind {
            manager.register_row_id(*row_id);
        }

        // Clear index entries for this id before writing the new version;
        // the cross-reference removal result is intentionally ignored.
        self.context_index.remove_entity(entity.id);
        let _ = self.unindex_cross_refs(entity.id);
        self.remove_from_graph_label_index(collection, entity.id);

        // Update in place when the id is already present, otherwise insert
        // and attach metadata if the record carried any.
        if manager.get(entity.id).is_some() {
            manager
                .update_with_metadata(entity.clone(), metadata.as_ref())
                .map_err(StoreError::from)?;
        } else {
            manager.insert(entity.clone())?;
            if let Some(metadata) = metadata.as_ref() {
                manager.set_metadata(entity.id, metadata.clone())?;
            }
        }

        // Rebuild the index entries from the new version.
        self.context_index.index_entity(collection, &entity);
        if let EntityKind::GraphNode(node) = &entity.kind {
            self.update_graph_label_index(collection, &node.label, entity.id);
        }
        self.index_cross_refs(&entity, collection)?;

        if let Some(pager) = &self.pager {
            // Write lock held for the whole B-tree update; the tree is
            // created lazily for collections seen for the first time.
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();
            // Key is the big-endian encoding of the raw entity id.
            let key = entity.id.raw().to_be_bytes();
            match btree.insert(&key, record) {
                Ok(_) => {}
                Err(BTreeError::DuplicateKey) => {
                    // Existing key: replace by delete + insert.
                    // NOTE(review): both results are discarded, so a failed
                    // re-insert leaves the key absent without an error —
                    // confirm this best-effort behaviour is intended.
                    let _ = btree.delete(&key);
                    let _ = btree.insert(&key, record);
                }
                Err(err) => {
                    return Err(StoreError::Io(io::Error::other(format!(
                        "replay upsert btree error: {err}"
                    ))));
                }
            }
            // Only flag the registry when the root page id changed.
            if root_before != btree.root_page_id() {
                self.mark_paged_registry_dirty();
            }
        }

        Ok(())
    }
1245
1246    fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1247        self.entity_cache.remove(id.raw());
1248        if let Some(manager) = self.get_collection(collection) {
1249            let deleted = manager.delete(id)?;
1250            if !deleted {
1251                return Ok(());
1252            }
1253        } else {
1254            return Ok(());
1255        }
1256
1257        if let Some(_pager) = &self.pager {
1258            let btree_indices = self.btree_indices.read();
1259            if let Some(btree) = btree_indices.get(collection) {
1260                let root_before = btree.root_page_id();
1261                let key = id.raw().to_be_bytes();
1262                let _ = btree.delete(&key);
1263                if root_before != btree.root_page_id() {
1264                    self.mark_paged_registry_dirty();
1265                }
1266            }
1267        }
1268
1269        let _ = self.unindex_cross_refs(id);
1270        self.remove_from_graph_label_index(collection, id);
1271        self.context_index.remove_entity(id);
1272        Ok(())
1273    }
1274}
1275
/// Appends `value` to `out` as a little-endian `u32` byte length followed
/// by the UTF-8 bytes.
///
/// The length is narrowed with `as u32`, so payloads longer than
/// `u32::MAX` bytes would get a wrapped prefix.
fn write_string(out: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let len_prefix = (payload.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(payload);
}
1280
/// Appends `value` to `out` as a little-endian `u32` length followed by the
/// raw bytes.
///
/// The length is narrowed with `as u32`, so payloads longer than
/// `u32::MAX` bytes would get a wrapped prefix.
fn write_bytes(out: &mut Vec<u8>, value: &[u8]) {
    let len_prefix = (value.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(value);
}
1285
/// Decodes a little-endian `u32` at `*pos` and advances `*pos` by four.
///
/// # Errors
/// `UnexpectedEof` when fewer than four bytes remain (including when
/// `*pos` is already past the end); `*pos` is left unchanged in that case.
fn read_u32(data: &[u8], pos: &mut usize) -> io::Result<u32> {
    let start = *pos;
    let remaining = data.len().saturating_sub(start);
    if remaining < 4 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "unexpected eof while reading u32",
        ));
    }

    let mut raw = [0u8; 4];
    raw.copy_from_slice(&data[start..start + 4]);
    *pos = start + 4;
    Ok(u32::from_le_bytes(raw))
}
1297
/// Decodes a little-endian `u64` at `*pos` and advances `*pos` by eight.
///
/// # Errors
/// `UnexpectedEof` when fewer than eight bytes remain (including when
/// `*pos` is already past the end); `*pos` is left unchanged in that case.
fn read_u64(data: &[u8], pos: &mut usize) -> io::Result<u64> {
    let start = *pos;
    let remaining = data.len().saturating_sub(start);
    if remaining < 8 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "unexpected eof while reading u64",
        ));
    }

    let mut raw = [0u8; 8];
    raw.copy_from_slice(&data[start..start + 8]);
    *pos = start + 8;
    Ok(u64::from_le_bytes(raw))
}
1318
1319fn read_string(data: &[u8], pos: &mut usize) -> io::Result<String> {
1320    let len = read_u32(data, pos)? as usize;
1321    if data.len().saturating_sub(*pos) < len {
1322        return Err(io::Error::new(
1323            io::ErrorKind::UnexpectedEof,
1324            "unexpected eof while reading string",
1325        ));
1326    }
1327    let value = std::str::from_utf8(&data[*pos..*pos + len])
1328        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?
1329        .to_string();
1330    *pos += len;
1331    Ok(value)
1332}
1333
1334fn read_bytes(data: &[u8], pos: &mut usize) -> io::Result<Vec<u8>> {
1335    let len = read_u32(data, pos)? as usize;
1336    if data.len().saturating_sub(*pos) < len {
1337        return Err(io::Error::new(
1338            io::ErrorKind::UnexpectedEof,
1339            "unexpected eof while reading bytes",
1340        ));
1341    }
1342    let value = data[*pos..*pos + len].to_vec();
1343    *pos += len;
1344    Ok(value)
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use super::*;
1350    use crate::api::{DurabilityMode, GroupCommitOptions};
1351    use std::sync::{Barrier, Mutex as StdMutex, OnceLock};
1352    use std::time::SystemTime;
1353
1354    /// Serialise tests that mutate `REDDB_GROUP_COMMIT_WINDOW_US`.
1355    /// The env table is process-global, so two parallel test threads
1356    /// flipping it would race the assertions in the test that
1357    /// happens to read it last.
1358    fn env_lock() -> &'static StdMutex<()> {
1359        static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
1360        LOCK.get_or_init(|| StdMutex::new(()))
1361    }
1362
1363    fn temp_wal(name: &str) -> PathBuf {
1364        let nanos = SystemTime::now()
1365            .duration_since(std::time::UNIX_EPOCH)
1366            .unwrap()
1367            .as_nanos();
1368        let path = std::env::temp_dir().join(format!(
1369            "rb_commit_coord_{}_{}_{}.wal",
1370            name,
1371            std::process::id(),
1372            nanos
1373        ));
1374        let _ = std::fs::remove_file(&path);
1375        path
1376    }
1377
1378    /// Concurrent autocommits MUST coalesce into far fewer fsyncs
1379    /// than the number of callers when the adaptive group-commit
1380    /// window is active. Without the 200µs floor, every caller
1381    /// would race the drain loop and pay an independent fsync.
1382    ///
1383    /// The test fires 32 concurrent `append_actions` calls and asserts
1384    /// the coordinator issued strictly fewer fsyncs than callers.
1385    /// (We don't pin to "one fsync" because timing on loaded CI hosts
1386    /// can split the burst across a couple of drain cycles — the win
1387    /// is the ratio, not the absolute count.)
1388    #[test]
1389    fn group_commit_coalesces_concurrent_autocommits() {
1390        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
1391        // Make sure no stale env var skews this run — the test
1392        // exercises the default 200µs floor.
1393        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
1394
1395        let path = temp_wal("coalesce");
1396        let coord = Arc::new(
1397            StoreCommitCoordinator::open(
1398                path.clone(),
1399                DurabilityMode::WalDurableGrouped,
1400                GroupCommitOptions::default(),
1401            )
1402            .expect("open commit coordinator"),
1403        );
1404
1405        const WRITERS: usize = 32;
1406        let barrier = Arc::new(Barrier::new(WRITERS));
1407        let mut handles = Vec::with_capacity(WRITERS);
1408        for tx in 0..WRITERS {
1409            let coord_c = Arc::clone(&coord);
1410            let barrier_c = Arc::clone(&barrier);
1411            handles.push(std::thread::spawn(move || {
1412                // Synchronise the start so every writer races the
1413                // drain loop together — this is the workload shape
1414                // the adaptive window is supposed to coalesce.
1415                barrier_c.wait();
1416                let action = StoreWalAction::CreateCollection {
1417                    name: format!("c{tx}"),
1418                };
1419                coord_c
1420                    .append_actions(std::slice::from_ref(&action))
1421                    .expect("append_actions");
1422            }));
1423        }
1424        for h in handles {
1425            h.join().expect("writer thread");
1426        }
1427
1428        let fsyncs = coord.fsync_count();
1429        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
1430        assert!(
1431            fsyncs < WRITERS as u64,
1432            "expected fsyncs ({fsyncs}) to be strictly less than \
1433             concurrent writers ({WRITERS}); coalescing failed"
1434        );
1435
1436        drop(coord);
1437        let _ = std::fs::remove_file(&path);
1438    }
1439
1440    /// Sanity check: with the env override forcing a zero window,
1441    /// fsync coalescing degrades — callers race and we approach
1442    /// one fsync per caller. This proves the window is the actual
1443    /// knob doing the work above (i.e. the test isn't passing for
1444    /// some unrelated reason like buffered-IO coalescing).
1445    #[test]
1446    fn zero_window_disables_coalescing_floor() {
1447        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
1448        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");
1449
1450        let path = temp_wal("zero_window");
1451        let coord = Arc::new(
1452            StoreCommitCoordinator::open(
1453                path.clone(),
1454                DurabilityMode::WalDurableGrouped,
1455                GroupCommitOptions::default(),
1456            )
1457            .expect("open commit coordinator"),
1458        );
1459
1460        const WRITERS: usize = 8;
1461        let barrier = Arc::new(Barrier::new(WRITERS));
1462        let mut handles = Vec::with_capacity(WRITERS);
1463        for tx in 0..WRITERS {
1464            let coord_c = Arc::clone(&coord);
1465            let barrier_c = Arc::clone(&barrier);
1466            handles.push(std::thread::spawn(move || {
1467                barrier_c.wait();
1468                let action = StoreWalAction::CreateCollection {
1469                    name: format!("z{tx}"),
1470                };
1471                coord_c
1472                    .append_actions(std::slice::from_ref(&action))
1473                    .expect("append_actions");
1474            }));
1475        }
1476        for h in handles {
1477            h.join().expect("writer thread");
1478        }
1479
1480        // With zero window, fsync count is bounded above by WRITERS
1481        // (every caller might trigger its own drain) and below by 1
1482        // (the queue may still naturally batch under contention).
1483        // The point of the assertion is to confirm the env override
1484        // is wired through — the open() above used the env knob.
1485        let fsyncs = coord.fsync_count();
1486        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");
1487
1488        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
1489        drop(coord);
1490        let _ = std::fs::remove_file(&path);
1491    }
1492
1493    /// `resolve_window` precedence: env > config.window_ms > default.
1494    #[test]
1495    fn resolve_window_precedence() {
1496        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
1497        // Default: window_ms=0 → adaptive 200µs floor.
1498        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
1499        let cfg = GroupCommitOptions::default();
1500        assert_eq!(
1501            StoreCommitCoordinator::resolve_window(&cfg),
1502            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
1503        );
1504
1505        // Explicit ms config wins over the default floor.
1506        let cfg_ms = GroupCommitOptions {
1507            window_ms: 5,
1508            ..GroupCommitOptions::default()
1509        };
1510        assert_eq!(
1511            StoreCommitCoordinator::resolve_window(&cfg_ms),
1512            Duration::from_millis(5)
1513        );
1514
1515        // Env override wins over both.
1516        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "750");
1517        assert_eq!(
1518            StoreCommitCoordinator::resolve_window(&cfg),
1519            Duration::from_micros(750)
1520        );
1521        assert_eq!(
1522            StoreCommitCoordinator::resolve_window(&cfg_ms),
1523            Duration::from_micros(750)
1524        );
1525
1526        // Env=0 explicitly disables the floor.
1527        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");
1528        assert_eq!(
1529            StoreCommitCoordinator::resolve_window(&cfg),
1530            Duration::from_micros(0)
1531        );
1532
1533        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
1534    }
1535}