lora_wal/wal.rs

//! `Wal` — the durable log handle.
//!
//! Owns a WAL directory of the shape:
//!
//! ```text
//! <dir>/
//!   0000000001.wal      sealed segment
//!   0000000002.wal      sealed segment
//!   0000000003.wal      active segment
//! ```
//!
//! The active segment is identified by the highest numeric file name —
//! we deliberately do **not** keep a separate `CURRENT` pointer file.
//! A pointer would be a second source of truth that crashes can
//! desynchronise from the directory listing without buying anything:
//! the file names already encode their ordering, and segment headers
//! are self-describing.
//!
//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
//! events into the store → resume normal `begin` / `append` / `commit`
//! traffic. The directory lock is held until the `Wal` drops; a second
//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
//!
//! All public methods take `&self` and serialise through an internal
//! [`Mutex`]. The engine mutex already serialises us in production
//! (one query per critical section), so the inner mutex is uncontested
//! and effectively free.
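//!
//! # Example
//!
//! A sketch of steady-state append traffic. The flat paths for `Wal`,
//! `SyncMode`, and `Lsn` are assumed crate-root re-exports, and `event`
//! stands in for a real `lora_store::MutationEvent`; neither is pinned
//! down by this file:
//!
//! ```ignore
//! let (wal, committed) =
//!     Wal::open("data/wal", SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO)?;
//! // Re-apply `committed` to the in-memory store first; see `Wal::open`.
//! let tx = wal.begin()?;
//! wal.append(tx, &event)?;
//! wal.commit(tx)?;
//! wal.flush()?; // PerCommit: write + fsync + advance durable_lsn
//! ```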
use std::fs;
#[cfg(test)]
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use lora_store::MutationEvent;

use crate::config::SyncMode;
use crate::dir::{SegmentDir, SegmentId};
use crate::error::WalError;
use crate::lock::DirLock;
use crate::lsn::Lsn;
use crate::record::WalRecord;
use crate::replay::{replay_segments, ReplayOutcome};
use crate::segment::SegmentWriter;

/// State guarded by the inner `Mutex`. Nothing in this struct is
/// `Send`-unsafe; the lock is purely for `&self`-safe interior
/// mutation.
struct WalState {
    next_lsn: Lsn,
    durable_lsn: Lsn,
    active_segment_id: SegmentId,
    active_writer: SegmentWriter,
    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
    oldest_segment_id: SegmentId,
}

/// Latched failure from the background flusher. Wrapped in a `Mutex`
/// instead of an `AtomicCell<Option<String>>` because failures are
/// rare and we want the message preserved verbatim for operator-facing
/// reporting (`/admin/wal/status` `bgFailure`). Once `Some`, every
/// subsequent commit/flush returns [`WalError::Poisoned`] and the
/// operator is expected to restart from the last consistent
/// snapshot + WAL.
type BgFailure = Mutex<Option<String>>;

/// Selects the durability work that [`Wal::flush_inner`] actually does.
/// Centralising the three modes here means `flush` and `force_fsync`
/// share one code path and the call sites don't have to remember which
/// mode advances `durable_lsn` and which does not.
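///
/// Decision matrix, as encoded by the `matches!` arms in
/// [`Wal::flush_inner`]:
///
/// | kind, mode                        | fsync | advances `durable_lsn` |
/// |-----------------------------------|-------|------------------------|
/// | `ForceFsync`, any mode            | yes   | yes                    |
/// | `PerConfiguredMode`, `PerCommit`  | yes   | yes                    |
/// | `PerConfiguredMode`, `Group`      | no    | no                     |
/// | `PerConfiguredMode`, `None`       | no    | yes                    |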
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Honour the configured [`SyncMode`]. This is what the recorder's
    /// `flush()` calls into.
    PerConfiguredMode,
    /// Always write the buffer + fsync + advance `durable_lsn`,
    /// regardless of mode. Used by checkpoints and the bg flusher.
    ForceFsync,
}

/// Live, append-side WAL handle.
///
/// Construct via [`Wal::open`]. The returned tuple includes the list of
/// committed mutation events that need to be re-applied to the
/// in-memory store before any new traffic is accepted.
///
/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
/// background flusher needs a `Weak<Wal>` to call back into without
/// taking a strong reference (which would prevent shutdown).
pub struct Wal {
    segments: SegmentDir,
    sync_mode: SyncMode,
    segment_target_bytes: u64,
    state: Mutex<WalState>,
    /// Latched bg-flusher failure; surfaced via [`Wal::bg_failure`] and
    /// propagated to commit/flush/force_fsync as
    /// [`WalError::Poisoned`].
    bg_failure: Arc<BgFailure>,
    /// Background flusher for `SyncMode::Group`. `Drop` joins the
    /// thread, so a `Wal` going out of scope is a clean shutdown
    /// signal.
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    /// Held for the lifetime of the WAL so a second handle cannot append
    /// to the same active segment concurrently.
    _dir_lock: DirLock,
}

impl Wal {
    /// Open or create the WAL directory at `dir`.
    ///
    /// `checkpoint_lsn` is the LSN stamped into the most recent
    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
    /// there is no snapshot). Replay skips records at or below this
    /// fence — they are already represented in the loaded state.
    ///
    /// Returns `(wal, committed_events)`. The caller is expected to
    /// apply every event in `committed_events` to its in-memory store
    /// in order before issuing any new `begin` / `append` calls.
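    ///
    /// A sketch of the expected boot sequence. `store` and its `apply`
    /// method stand in for the embedding engine's own API, which is
    /// not defined in this crate:
    ///
    /// ```ignore
    /// let (wal, committed) = Wal::open(dir, sync_mode, target_bytes, checkpoint_lsn)?;
    /// for event in committed {
    ///     store.apply(event); // in order, before any new begin/append traffic
    /// }
    /// ```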
    pub fn open(
        dir: impl Into<std::path::PathBuf>,
        sync_mode: SyncMode,
        segment_target_bytes: u64,
        checkpoint_lsn: Lsn,
    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
        let segments = SegmentDir::new(dir);
        fs::create_dir_all(segments.root())?;
        let dir_lock = DirLock::acquire(segments.root())?;

        let entries = segments.list()?;
        let (active_id, active_writer, replay) = if entries.is_empty() {
            Self::open_fresh(&segments)?
        } else {
            Self::open_existing(&segments, &entries, checkpoint_lsn)?
        };

        let next_lsn = if replay.max_lsn.is_zero() {
            Lsn::new(1)
        } else {
            replay.max_lsn.next()
        };
        // Treat everything readable at open time as the recovered
        // durability fence. This does not prove the bytes were
        // fsync-confirmed before the previous process died; it means
        // they survived to this open and future appends must start
        // after them.
        let durable_lsn = replay.max_lsn;

        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);

        let state = WalState {
            next_lsn,
            durable_lsn,
            active_segment_id: active_id,
            active_writer,
            oldest_segment_id,
        };

        let wal = Arc::new(Self {
            segments,
            sync_mode,
            segment_target_bytes,
            state: Mutex::new(state),
            bg_failure: Arc::new(Mutex::new(None)),
            _flusher: Mutex::new(None),
            _dir_lock: dir_lock,
        });

        // Spawn the Group flusher *after* the Arc exists so it can
        // hold a `Weak<Wal>` that drops when the last strong ref
        // does. The flusher's own Drop joins the thread, so removing
        // the field (e.g. on Wal::drop) is a clean shutdown signal.
        if let SyncMode::Group { interval_ms } = sync_mode {
            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
            *wal._flusher.lock().unwrap() = Some(handle);
        }

        Ok((wal, replay.committed_events))
    }

    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
    /// so LSN 0 stays reserved for "empty / never written".
    fn open_fresh(
        segments: &SegmentDir,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let id = SegmentId::FIRST;
        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
        segments.sync_dir()?;
        let replay = ReplayOutcome {
            committed_events: Vec::new(),
            max_lsn: Lsn::ZERO,
            torn_tail: None,
            checkpoint_lsn_observed: None,
        };
        Ok((id, writer, replay))
    }

    /// Existing directory. Replay every segment to surface committed
    /// events + detect a torn tail; reopen the highest-id segment
    /// for append; truncate it if the torn tail is in *that* segment.
    fn open_existing(
        segments: &SegmentDir,
        entries: &[crate::dir::SegmentEntry],
        checkpoint_lsn: Lsn,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
        let replay = replay_segments(&paths, checkpoint_lsn)?;

        // The active segment is whichever file has the highest
        // numeric id — segment file names are self-describing, so
        // there is no separate CURRENT pointer.
        let active = entries.last().expect("entries non-empty in open_existing");
        let (mut writer, _torn_from_writer) =
            SegmentWriter::open_for_append(segments.path_for(active.id))?;

        // A torn tail in a *sealed* segment is impossible (sealed
        // segments are never appended to), so we only need to handle
        // the active one.
        if let Some(t) = &replay.torn_tail {
            if t.segment_path == active.path {
                writer.truncate_to(t.last_good_offset)?;
            } else {
                return Err(WalError::Malformed(format!(
                    "torn tail found in sealed segment {}",
                    t.segment_path.display()
                )));
            }
        }

        Ok((active.id, writer, replay))
    }

    pub fn dir(&self) -> &Path {
        self.segments.root()
    }

    pub fn sync_mode(&self) -> SyncMode {
        self.sync_mode
    }

    pub fn durable_lsn(&self) -> Lsn {
        self.state.lock().unwrap().durable_lsn
    }

    /// Latched message from the background flusher, if it has ever
    /// failed an `fsync`. `None` means the WAL is healthy. Once set,
    /// every commit / flush / force_fsync starts returning
    /// [`WalError::Poisoned`] and the WAL stops accepting new
    /// transactions until the operator restarts from the last
    /// consistent snapshot + WAL.
    pub fn bg_failure(&self) -> Option<String> {
        self.bg_failure.lock().unwrap().clone()
    }

    fn check_healthy(&self) -> Result<(), WalError> {
        if self.bg_failure.lock().unwrap().is_some() {
            return Err(WalError::Poisoned);
        }
        Ok(())
    }

    /// LSN that the *next* `begin` / `append` call will allocate.
    /// Exposed for tests and for sanity checks at boot; not part of
    /// any durability contract.
    pub fn next_lsn(&self) -> Lsn {
        self.state.lock().unwrap().next_lsn
    }

    pub fn oldest_segment_id(&self) -> u64 {
        self.state.lock().unwrap().oldest_segment_id.raw()
    }

    pub fn active_segment_id(&self) -> u64 {
        self.state.lock().unwrap().active_segment_id.raw()
    }

    /// Begin a new transaction. Allocates a `TxBegin` record and
    /// returns its LSN, which the caller must thread back through
    /// `append` / `commit` / `abort` so replay can group the events.
    ///
    /// If the active segment has crossed `segment_target_bytes`,
    /// rotation happens here — `TxBegin` is the only record kind
    /// guaranteed to be a transaction boundary, so rotating just
    /// before its append keeps every transaction wholly in one
    /// segment.
    pub fn begin(&self) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        self.maybe_rotate(&mut state)?;
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
    }

    /// Append a single mutation to the in-memory pending buffer of
    /// the active segment. Not durable until `flush()` runs.
    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
            lsn,
            tx_begin_lsn,
            event: event.clone(),
        })
    }

    /// Append a `TxCommit` marker. Caller is expected to subsequently
    /// call `flush()` (under `SyncMode::PerCommit`) to make the
    /// commit durable before returning to its caller.
    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
    }

    /// Append a `TxAbort` marker. Replay drops the events keyed by
    /// `tx_begin_lsn` without re-applying them.
    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
    }

    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
    /// LSN written into the snapshot file's header — replay uses
    /// it to defend against the snapshot-rename-but-no-marker race.
    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
            lsn,
            snapshot_lsn,
        })
    }

    /// Single-source-of-truth for "allocate the next LSN, build the
    /// record, push it onto the active segment's pending buffer".
    /// The five public append paths (`begin / append / commit / abort
    /// / checkpoint_marker`) all funnel through here so the LSN
    /// allocation never gets out of sync with the encoded record.
    #[inline]
    fn alloc_and_append(
        state: &mut WalState,
        build: impl FnOnce(Lsn) -> WalRecord,
    ) -> Result<Lsn, WalError> {
        let lsn = state.next_lsn;
        state.next_lsn = lsn.next();
        state.active_writer.append(&build(lsn))?;
        Ok(lsn)
    }

    /// Flush the active segment's pending buffer.
    ///
    /// What "flush" means depends on [`SyncMode`]:
    ///
    /// - `PerCommit` — write the buffer to the OS, `fsync`, and
    ///   advance `durable_lsn`. The strongest contract: every
    ///   record up to `next_lsn - 1` is on disk.
    /// - `Group` — write the buffer to the OS only. `durable_lsn`
    ///   is NOT advanced; the background flusher does that on its
    ///   own cadence.
    /// - `None` — write the buffer to the OS only, but advance
    ///   `durable_lsn` anyway. The mode opts out of crash
    ///   durability, so the checkpoint fence reports
    ///   "what's been written" instead of "what's actually safe".
    pub fn flush(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::PerConfiguredMode)
    }

    /// Unconditionally write the buffer to the OS, `fsync`, and
    /// advance `durable_lsn`. Used by callers that need a durability
    /// point right now regardless of the configured cadence (e.g.
    /// checkpoint). Returns [`WalError::Poisoned`] if the bg flusher
    /// has already failed.
    pub fn force_fsync(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::ForceFsync)
    }

    /// Single source of truth for the flush state machine. Skips the
    /// `check_healthy` gate so the bg flusher can call into it
    /// without recursing through its own latch.
    fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));

        // Decide whether this call is allowed to advance
        // `durable_lsn`. The bg flusher's job in Group mode is
        // exactly to do that "out of band"; PerCommit and None do it
        // inline; Group's user-driven `flush()` does not.
        let do_fsync = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
        );
        let advance_durable = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
        );

        if do_fsync {
            state.active_writer.flush_and_sync()?;
        } else {
            state.active_writer.flush_buffer()?;
        }
        if advance_durable {
            state.durable_lsn = written_lsn;
        }
        Ok(())
    }

    /// Drop sealed segments whose entire LSN range is at or below
    /// `fence_lsn`. Idempotent and safe to call repeatedly.
    ///
    /// The active segment is never deleted — even if every record in
    /// it predates the fence, it is still the rotation target for
    /// new appends. The segment immediately before the active one
    /// is also kept as a tombstone so a subsequent crash before the
    /// next checkpoint still finds a self-describing log start.
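    ///
    /// Worked example (hypothetical ids): with segments `1..=4` on
    /// disk, segment 4 active, and `fence_lsn` covering every record
    /// in segments 1 and 2, those two are deleted, segment 3 survives
    /// as the tombstone, and segment 4 stays active.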
    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        let active_id = state.active_segment_id;
        let entries = self.segments.list()?;

        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
        for (i, entry) in entries.iter().enumerate() {
            // Active segment and the one immediately preceding it
            // are kept by policy.
            if entry.id >= active_id.saturating_prev() {
                break;
            }
            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
            let next = match entries.get(i + 1) {
                Some(n) => n,
                None => break,
            };
            let next_base = SegmentDir::base_lsn(&next.path)?;
            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
                to_drop.push(entry.clone());
            }
        }

        for entry in to_drop {
            fs::remove_file(&entry.path)?;
            if entry.id >= state.oldest_segment_id {
                state.oldest_segment_id = entry.id.next();
            }
        }
        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
            self.segments.sync_dir()?;
        }
        Ok(())
    }

    /// Rotate the active segment when it has grown past
    /// `segment_target_bytes`. Called from `begin()` so rotation only
    /// ever lands at a transaction boundary.
    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
        if state.active_writer.bytes_written() < self.segment_target_bytes {
            return Ok(());
        }
        // Seal the current segment (forces a flush + fsync) and open
        // a fresh one with `base_lsn = next_lsn` so the segment file
        // names line up with the record LSNs they contain.
        state.active_writer.flush_and_sync()?;
        state.active_writer.seal()?;

        let next_id = state.active_segment_id.next();
        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
        self.segments.sync_dir()?;
        state.active_writer = writer;
        state.active_segment_id = next_id;
        Ok(())
    }
}

impl Drop for Wal {
    fn drop(&mut self) {
        // Join the group flusher, if any, before the directory lock is
        // released. That keeps the "one live append owner" boundary intact
        // through shutdown.
        if let Ok(slot) = self._flusher.get_mut() {
            let _ = slot.take();
        }
    }
}

// ---------------------------------------------------------------------------
// Group-mode background flusher
// ---------------------------------------------------------------------------

/// Owns the OS thread that periodically `fsync`s the WAL under
/// `SyncMode::Group`. Held inside the `Wal` itself so dropping the
/// last `Arc<Wal>` runs `Drop` here, signals shutdown, and joins
/// before the underlying `WalState` is destroyed.
struct GroupFlusherHandle {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Drop for GroupFlusherHandle {
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        if let Some(h) = self.handle.take() {
            // `let _ = ...` because the thread can only fail by
            // panicking; even then, the Wal itself is being dropped
            // and there is nothing useful to do with the panic at
            // teardown.
            let _ = h.join();
        }
    }
}

fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = Arc::clone(&shutdown);
    let handle = thread::spawn(move || {
        // Sleep first so a short-lived Wal that opens and closes
        // immediately doesn't pay for an extra wakeup. We re-check
        // the shutdown flag on every slice, so a Drop signal racing
        // with a sleep is observed at most one slice (~50 ms) late.
        while !shutdown_clone.load(Ordering::Acquire) {
            // Break the sleep into ~50 ms slices so shutdown can be
            // observed without waiting up to a full `interval` at
            // teardown. This matters for tests, which want fast
            // join times.
            let slice = Duration::from_millis(50).min(interval);
            let mut elapsed = Duration::ZERO;
            while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
                thread::sleep(slice);
                elapsed += slice;
            }
            if shutdown_clone.load(Ordering::Acquire) {
                break;
            }
            match weak.upgrade() {
                Some(wal) => {
                    // Latch any fsync failure into `bg_failure` and
                    // stop the flusher. Subsequent commits / flushes
                    // see the latch via `check_healthy` and start
                    // returning `WalError::Poisoned`, which
                    // `WalRecorder` propagates to the host as a
                    // durability error. Operators recover by
                    // restarting from the last consistent
                    // snapshot + WAL.
                    if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
                        let mut slot = wal.bg_failure.lock().unwrap();
                        if slot.is_none() {
                            *slot = Some(format!("bg fsync failed: {err}"));
                        }
                        break;
                    }
                }
                None => break,
            }
        }
    });
    GroupFlusherHandle {
        shutdown,
        handle: Some(handle),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};

    use crate::testing::TmpDir;

    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }

    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        // No CURRENT pointer file is written — the highest segment id
        // is the source of truth for "active segment".
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);

        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }

        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }

    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");

        // First boot: write two transactions (three events) and
        // crash without running shutdown.
        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();

            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
            // drop without explicit close
        }

        // Second boot: replay should yield events 1, 2, 3 in order.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // next_lsn should be past every record we wrote (2 begins +
        // 3 mutations + 2 commits = 7 records → next_lsn = 8).
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Begin + append but never commit. Simulates a crash
            // mid-query.
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");

        // Tiny segment target so we trip rotation on the second
        // transaction.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();

        // First tx: a few events, takes us past 256 bytes.
        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        assert_eq!(wal.active_segment_id(), 1);

        // Second `begin` triggers rotation.
        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );

        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);

        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);

        // Tx A: events 1,2 — ends at lsn 4.
        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();

        // Tx B: event 3 — past the fence.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        // Re-open with checkpoint_lsn = commit_a so tx A is treated
        // as already-applied.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }

    #[test]
    fn replay_rejects_commit_without_begin() {
        let dir = TmpDir::new("commit-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            wal.commit(Lsn::new(99)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn replay_rejects_mutation_without_begin() {
        let dir = TmpDir::new("mutation-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            wal.append(Lsn::new(99), &ev(1)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = TmpDir::new("torn");

        {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            wal.commit(b).unwrap();
            wal.flush().unwrap();
        }

        // Append garbage to the active segment by hand.
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        let active = &segments.last().unwrap().path;
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(active).unwrap();
            f.write_all(&[0xff; 32]).unwrap();
            f.sync_all().unwrap();
        }

        // Re-open. Torn tail must be truncated; replay still yields
        // ev(1); next_lsn picks up cleanly.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);

        // Subsequent appends don't trip a CRC failure.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(2)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1), ev(2)]);
    }

    #[test]
    fn checkpoint_marker_is_recorded_and_observed() {
        let dir = TmpDir::new("ckpt-marker");

        let snapshot_lsn = {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            let commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
            wal.checkpoint_marker(commit).unwrap();
            wal.flush().unwrap();
            commit
        };

        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(
            outcome.checkpoint_lsn_observed,
            Some(snapshot_lsn),
            "checkpoint marker should be surfaced by replay"
        );
    }

    #[test]
    fn group_mode_durable_lsn_advances_via_bg_flusher() {
        let dir = TmpDir::new("group");
        // 25 ms interval: the bg flusher should land within one or
        // two sleep slices (each slice is min(50 ms, interval)).
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group { interval_ms: 25 },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap(); // Group: buffer write only; durable_lsn untouched.

        // Immediately after a Group flush, durable_lsn should still
        // be Lsn::ZERO — the bg flusher hasn't fired yet.
        assert_eq!(
            wal.durable_lsn(),
            Lsn::ZERO,
            "Group flush() must not advance durable_lsn"
        );

        // Wait up to ~500 ms for the bg flusher to advance the LSN.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        loop {
            if wal.durable_lsn() > Lsn::ZERO {
                break;
            }
            if std::time::Instant::now() >= deadline {
                panic!(
                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
                    wal.durable_lsn()
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
        // Wal drop should join the bg thread cleanly.
        drop(wal);
    }

    #[test]
    fn none_mode_advances_durable_lsn_on_flush() {
        let dir = TmpDir::new("none");
        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();

        // None mode: flush() advances durable_lsn even without
        // fsync, because the mode opted out of crash durability.
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn force_fsync_always_advances_durable_lsn() {
        let dir = TmpDir::new("force-fsync");
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group {
                interval_ms: 60_000,
            },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap(); // Group flush: durable_lsn unchanged.
        assert_eq!(wal.durable_lsn(), Lsn::ZERO);

        // force_fsync bypasses the configured cadence — used by
        // checkpoints to grab a fence on demand.
        wal.force_fsync().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn truncate_up_to_drops_old_sealed_segments() {
        let dir = TmpDir::new("truncate");

        // Tiny target so each tx forces a rotation on the next begin.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();

        let mut last_commit = Lsn::ZERO;
        for i in 0..5 {
            let b = wal.begin().unwrap();
            wal.append(b, &ev(i)).unwrap();
            last_commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        // Five transactions × tiny target: we should be on segment ≥ 4.
        assert!(
            wal.active_segment_id() >= 4,
            "expected several rotations, got {}",
            wal.active_segment_id()
        );

        let segments = SegmentDir::new(&dir.path);
        let before = segments.list().unwrap().len();
        wal.truncate_up_to(last_commit).unwrap();
        let after = segments.list().unwrap().len();

        assert!(
            after < before,
            "truncate_up_to should have dropped at least one segment ({} → {})",
            before,
            after
        );
        // Active + tombstone are always retained.
        assert!(
            after >= 2,
            "active and the segment preceding it must be kept"
        );

        // Reopening with `last_commit` fed back as the checkpoint
        // fence is still safe: the dropped segments only contained
        // transactions already at or below `last_commit`.
        drop(wal);
        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
        // Everything was at or below the fence, so replay is empty.
        assert!(replay.is_empty());
    }
}