// lora_wal — wal.rs

1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//!   0000000001.wal      sealed segment
8//!   0000000002.wal      sealed segment
9//!   0000000003.wal      active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29#[cfg(test)]
30use std::fs::OpenOptions;
31use std::path::Path;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex, Weak};
34use std::thread::{self, JoinHandle};
35use std::time::Duration;
36
37use lora_store::MutationEvent;
38
39use crate::config::SyncMode;
40use crate::dir::{SegmentDir, SegmentId};
41use crate::error::WalError;
42use crate::lock::DirLock;
43use crate::lsn::Lsn;
44use crate::record::WalRecord;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
/// State guarded by the inner `Mutex`. Nothing in this struct is
/// `Send`-unsafe; the lock is purely for `&self`-safe interior
/// mutation.
struct WalState {
    /// LSN the next `begin` / `append` / `commit` / `abort` /
    /// `checkpoint_marker` call will allocate.
    next_lsn: Lsn,
    /// Highest LSN considered durable; what "durable" means per
    /// `SyncMode` is decided in `flush_inner`.
    durable_lsn: Lsn,
    /// Id of the segment currently accepting appends.
    active_segment_id: SegmentId,
    /// Writer over the active segment's file.
    active_writer: SegmentWriter,
    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
    oldest_segment_id: SegmentId,
}
59
/// Latched failure from the background flusher. Wrapped in a `Mutex`
/// instead of an `AtomicCell<Option<String>>` because failures are
/// rare and we want the message preserved verbatim for operator-facing
/// reporting (`/admin/wal/status` `bgFailure`). Once `Some`, every
/// subsequent commit/flush returns [`WalError::Poisoned`] and the
/// operator is expected to restart from the last consistent
/// snapshot + WAL.
///
/// Shared as `Arc<BgFailure>` so the flusher thread and the `Wal`
/// handle can both reach the same latch.
type BgFailure = Mutex<Option<String>>;

/// Selects the durability work that [`Wal::flush_inner`] actually does.
/// Centralising the three modes here means `flush` and `force_fsync`
/// share one code path and the call sites don't have to remember which
/// mode advances `durable_lsn` and which does not.
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Honour the configured [`SyncMode`]. This is what the recorder's
    /// `flush()` calls into.
    PerConfiguredMode,
    /// Always write the buffer + fsync + advance `durable_lsn`,
    /// regardless of mode. Used by checkpoints and the bg flusher.
    ForceFsync,
}
82
/// Live, append-side WAL handle.
///
/// Construct via [`Wal::open`]. The returned tuple includes the list of
/// committed mutation events that need to be re-applied to the
/// in-memory store before any new traffic is accepted.
///
/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
/// background flusher needs a `Weak<Wal>` to call back into without
/// taking a strong reference (which would prevent shutdown).
///
/// Field order is load-bearing for `Drop`: Rust drops fields in
/// declaration order, so `_flusher` (which joins the bg thread) is
/// torn down before `_dir_lock` releases the directory lock.
pub struct Wal {
    segments: SegmentDir,
    sync_mode: SyncMode,
    /// Rotation threshold; checked by `maybe_rotate` on every `begin`.
    segment_target_bytes: u64,
    state: Mutex<WalState>,
    /// Latched bg-flusher failure; surfaced via [`Wal::bg_failure`] and
    /// propagated to commit/flush/force_fsync as
    /// [`WalError::Poisoned`].
    bg_failure: Arc<BgFailure>,
    /// Background flusher for `SyncMode::Group`. `Drop` joins the
    /// thread, so a `Wal` going out of scope is a clean shutdown
    /// signal.
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    /// Held for the lifetime of the WAL so a second handle cannot append
    /// to the same active segment concurrently.
    _dir_lock: DirLock,
}
109
110impl Wal {
    /// Open or create the WAL directory at `dir`.
    ///
    /// `checkpoint_lsn` is the LSN stamped into the most recent
    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
    /// there is no snapshot). Replay skips records at or below this
    /// fence — they are already represented in the loaded state.
    ///
    /// Returns `(wal, committed_events)`. The caller is expected to
    /// apply every event in `committed_events` to its in-memory store
    /// in order before issuing any new `begin` / `append` calls.
    ///
    /// # Errors
    ///
    /// [`WalError::AlreadyOpen`] when another live handle holds the
    /// directory lock; otherwise I/O, lock, and replay errors.
    pub fn open(
        dir: impl Into<std::path::PathBuf>,
        sync_mode: SyncMode,
        segment_target_bytes: u64,
        checkpoint_lsn: Lsn,
    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
        let segments = SegmentDir::new(dir);
        fs::create_dir_all(segments.root())?;
        // Take the exclusive directory lock *before* listing so the
        // segment set cannot change underneath the scan.
        let dir_lock = DirLock::acquire(segments.root())?;

        let entries = segments.list()?;
        let (active_id, active_writer, replay) = if entries.is_empty() {
            Self::open_fresh(&segments)?
        } else {
            Self::open_existing(&segments, &entries, checkpoint_lsn)?
        };

        // LSN 0 is reserved for "empty / never written", so a virgin
        // log starts allocating at 1.
        let next_lsn = if replay.max_lsn.is_zero() {
            Lsn::new(1)
        } else {
            replay.max_lsn.next()
        };
        // Treat everything readable at open time as the recovered
        // durability fence. This does not prove the bytes were
        // fsync-confirmed before the previous process died; it means
        // they survived to this open and future appends must start
        // after them.
        let durable_lsn = replay.max_lsn;

        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);

        let state = WalState {
            next_lsn,
            durable_lsn,
            active_segment_id: active_id,
            active_writer,
            oldest_segment_id,
        };

        let wal = Arc::new(Self {
            segments,
            sync_mode,
            segment_target_bytes,
            state: Mutex::new(state),
            bg_failure: Arc::new(Mutex::new(None)),
            _flusher: Mutex::new(None),
            _dir_lock: dir_lock,
        });

        // Spawn the Group flusher *after* the Arc exists so it can
        // hold a `Weak<Wal>` that drops when the last strong ref
        // does. The flusher's own Drop joins the thread, so removing
        // the field (e.g. on Wal::drop) is a clean shutdown signal.
        if let SyncMode::Group { interval_ms } = sync_mode {
            // Clamp to at least 1 ms so a zero interval cannot spin.
            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
            *wal._flusher.lock().unwrap() = Some(handle);
        }

        Ok((wal, replay.committed_events))
    }
182
183    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
184    /// so LSN 0 stays reserved for "empty / never written".
185    fn open_fresh(
186        segments: &SegmentDir,
187    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
188        let id = SegmentId::FIRST;
189        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
190        segments.sync_dir()?;
191        let replay = ReplayOutcome {
192            committed_events: Vec::new(),
193            max_lsn: Lsn::ZERO,
194            torn_tail: None,
195            checkpoint_lsn_observed: None,
196        };
197        Ok((id, writer, replay))
198    }
199
200    /// Existing directory. Replay every segment to surface committed
201    /// events + detect a torn tail; reopen the highest-id segment
202    /// for append; truncate it if the torn tail is in *that* segment.
203    fn open_existing(
204        segments: &SegmentDir,
205        entries: &[crate::dir::SegmentEntry],
206        checkpoint_lsn: Lsn,
207    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
208        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
209        let replay = replay_segments(&paths, checkpoint_lsn)?;
210
211        // The active segment is whichever file has the highest
212        // numeric id — segment file names are self-describing, so
213        // there is no separate CURRENT pointer.
214        let active = entries.last().expect("entries non-empty in open_existing");
215        let (mut writer, _torn_from_writer) =
216            SegmentWriter::open_for_append(segments.path_for(active.id))?;
217
218        // A torn tail in a *sealed* segment is impossible (sealed
219        // segments are never appended to), so we only need to handle
220        // the active one.
221        if let Some(t) = &replay.torn_tail {
222            if t.segment_path == active.path {
223                writer.truncate_to(t.last_good_offset)?;
224            } else {
225                return Err(WalError::Malformed(format!(
226                    "torn tail found in sealed segment {}",
227                    t.segment_path.display()
228                )));
229            }
230        }
231
232        Ok((active.id, writer, replay))
233    }
234
    /// Root directory this WAL owns.
    pub fn dir(&self) -> &Path {
        self.segments.root()
    }

    /// The durability mode this handle was opened with.
    pub fn sync_mode(&self) -> SyncMode {
        self.sync_mode
    }

    /// Highest LSN currently considered durable (see `flush_inner`
    /// for what each `SyncMode` counts as "durable").
    pub fn durable_lsn(&self) -> Lsn {
        self.state.lock().unwrap().durable_lsn
    }

    /// Latched message from the background flusher, if it has ever
    /// failed an `fsync`. `None` means the WAL is healthy. Once set,
    /// every commit / flush / force_fsync starts returning
    /// [`WalError::Poisoned`] and the WAL stops accepting new
    /// transactions until the operator restarts from the last
    /// consistent snapshot + WAL.
    pub fn bg_failure(&self) -> Option<String> {
        self.bg_failure.lock().unwrap().clone()
    }

    /// Gate used by every append/flush path: refuse work once the
    /// bg-flusher failure latch is set.
    fn check_healthy(&self) -> Result<(), WalError> {
        if self.bg_failure.lock().unwrap().is_some() {
            return Err(WalError::Poisoned);
        }
        Ok(())
    }

    /// LSN that the *next* `begin` / `append` call will allocate.
    /// Exposed for tests and for sanity checks at boot; not part of
    /// any durability contract.
    pub fn next_lsn(&self) -> Lsn {
        self.state.lock().unwrap().next_lsn
    }

    /// Raw id of the lowest segment still on disk (moves forward as
    /// `truncate_up_to` deletes sealed segments).
    pub fn oldest_segment_id(&self) -> u64 {
        self.state.lock().unwrap().oldest_segment_id.raw()
    }

    /// Raw id of the segment currently accepting appends.
    pub fn active_segment_id(&self) -> u64 {
        self.state.lock().unwrap().active_segment_id.raw()
    }
278
279    /// Begin a new transaction. Allocates a `TxBegin` record and
280    /// returns its LSN, which the caller must thread back through
281    /// `append` / `commit` / `abort` so replay can group the events.
282    ///
283    /// If the active segment has crossed `segment_target_bytes`,
284    /// rotation happens here — `TxBegin` is the only record kind
285    /// guaranteed to be a transaction boundary, so rotating just
286    /// before its append keeps every transaction wholly in one
287    /// segment.
288    pub fn begin(&self) -> Result<Lsn, WalError> {
289        self.check_healthy()?;
290        let mut state = self.state.lock().unwrap();
291        self.maybe_rotate(&mut state)?;
292        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
293    }
294
295    /// Append a single mutation to the in-memory pending buffer of
296    /// the active segment. Not durable until `flush()` runs.
297    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
298        self.check_healthy()?;
299        let mut state = self.state.lock().unwrap();
300        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
301            lsn,
302            tx_begin_lsn,
303            event: event.clone(),
304        })
305    }
306
307    /// Append many mutations as one framed record. This keeps the replay
308    /// contract identical to repeated `append` calls while avoiding per-event
309    /// length/CRC/framing overhead for write-heavy statements.
310    pub fn append_batch(
311        &self,
312        tx_begin_lsn: Lsn,
313        events: Vec<MutationEvent>,
314    ) -> Result<Lsn, WalError> {
315        self.check_healthy()?;
316        if events.is_empty() {
317            return Err(WalError::Encode(
318                "mutation batch must contain at least one event".into(),
319            ));
320        }
321        let mut state = self.state.lock().unwrap();
322        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
323            lsn,
324            tx_begin_lsn,
325            events,
326        })
327    }
328
329    /// Append a `TxCommit` marker. Caller is expected to subsequently
330    /// call `flush()` (under `SyncMode::PerCommit`) to make the
331    /// commit durable before returning to its caller.
332    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
333        self.check_healthy()?;
334        let mut state = self.state.lock().unwrap();
335        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
336    }
337
338    /// Append a `TxAbort` marker. Replay drops the events keyed by
339    /// `tx_begin_lsn` without re-applying them.
340    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
341        self.check_healthy()?;
342        let mut state = self.state.lock().unwrap();
343        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
344    }
345
346    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
347    /// LSN written into the snapshot file's header — replay uses
348    /// it to defend against the snapshot-rename-but-no-marker race.
349    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
350        self.check_healthy()?;
351        let mut state = self.state.lock().unwrap();
352        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
353            lsn,
354            snapshot_lsn,
355        })
356    }
357
358    /// Single-source-of-truth for "allocate the next LSN, build the
359    /// record, push it onto the active segment's pending buffer".
360    /// The five public append paths (`begin / append / commit / abort
361    /// / checkpoint_marker`) all funnel through here so the LSN
362    /// allocation never gets out of sync with the encoded record.
363    #[inline]
364    fn alloc_and_append(
365        state: &mut WalState,
366        build: impl FnOnce(Lsn) -> WalRecord,
367    ) -> Result<Lsn, WalError> {
368        let lsn = state.next_lsn;
369        state.next_lsn = lsn.next();
370        state.active_writer.append(&build(lsn))?;
371        Ok(lsn)
372    }
373
374    /// Flush the active segment's pending buffer.
375    ///
376    /// What "flush" means depends on [`SyncMode`]:
377    ///
378    /// - `PerCommit` — write the buffer to the OS, `fsync`, and
379    ///   advance `durable_lsn`. The strongest contract: every
380    ///   record up to `next_lsn - 1` is on disk.
381    /// - `Group` — write the buffer to the OS, but let the background
382    ///   flusher fsync and advance `durable_lsn` on its cadence.
383    /// - `None` — write the buffer to the OS only, but advance
384    ///   `durable_lsn` anyway. The mode opts out of crash
385    ///   durability, so the checkpoint fence reports
386    ///   "what's been written" instead of "what's actually safe".
387    pub fn flush(&self) -> Result<(), WalError> {
388        self.check_healthy()?;
389        self.flush_inner(FlushKind::PerConfiguredMode)
390    }
391
392    /// Unconditionally write the buffer to the OS, `fsync`, and
393    /// advance `durable_lsn`. Used by callers that need a durability
394    /// point right now regardless of the configured cadence (e.g.
395    /// checkpoint). Returns [`WalError::Poisoned`] if the bg flusher
396    /// has already failed.
397    pub fn force_fsync(&self) -> Result<(), WalError> {
398        self.check_healthy()?;
399        self.flush_inner(FlushKind::ForceFsync)
400    }
401
402    /// Single source of truth for the flush state machine. Skips the
403    /// `check_healthy` gate so the bg flusher can call into it
404    /// without recursing through its own latch.
405    fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
406        let mut state = self.state.lock().unwrap();
407        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
408
409        // Decide whether this call is allowed to advance
410        // `durable_lsn`. The bg flusher's job in Group mode is to advance
411        // that fence after fsync; PerCommit and None do it inline; Group's
412        // user-driven `flush()` only pushes bytes to the OS.
413        let do_fsync = matches!(
414            (kind, self.sync_mode),
415            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
416        );
417        let advance_durable = matches!(
418            (kind, self.sync_mode),
419            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
420        );
421
422        if do_fsync {
423            state.active_writer.flush_and_sync()?;
424        } else {
425            state.active_writer.flush_buffer()?;
426        }
427        if advance_durable {
428            state.durable_lsn = written_lsn;
429        }
430        Ok(())
431    }
432
    /// Drop sealed segments whose entire LSN range is at or below
    /// `fence_lsn`. Idempotent and safe to call repeatedly.
    ///
    /// The active segment is never deleted — even if every record in
    /// it predates the fence, it is still the rotation target for
    /// new appends. The segment immediately before the active one
    /// is also kept as a tombstone so a subsequent crash before the
    /// next checkpoint still finds a self-describing log start.
    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
        // Hold the state lock across list + delete so rotation cannot
        // move the active segment mid-scan.
        let mut state = self.state.lock().unwrap();
        let active_id = state.active_segment_id;
        let entries = self.segments.list()?;

        // NOTE(review): this scan relies on `list()` returning
        // entries sorted by ascending id — confirm in `SegmentDir`.
        // With sorted input, droppable segments form a prefix because
        // base LSNs grow with segment id.
        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
        for (i, entry) in entries.iter().enumerate() {
            // Active segment and the one immediately preceding it
            // are kept by policy.
            if entry.id >= active_id.saturating_prev() {
                break;
            }
            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
            let next = match entries.get(i + 1) {
                Some(n) => n,
                None => break,
            };
            let next_base = SegmentDir::base_lsn(&next.path)?;
            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
                to_drop.push(entry.clone());
            }
        }

        for entry in to_drop {
            fs::remove_file(&entry.path)?;
            // Track the new log start so `oldest_segment_id` stays
            // accurate without re-listing the directory.
            if entry.id >= state.oldest_segment_id {
                state.oldest_segment_id = entry.id.next();
            }
        }
        // Only pay for a directory fsync when this call actually
        // removed something (the oldest id moved past the pre-scan
        // listing's first entry).
        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
            self.segments.sync_dir()?;
        }
        Ok(())
    }
476
    /// Rotate the active segment when it has grown past
    /// `segment_target_bytes`. Called from `begin()` so rotation only
    /// ever lands at a transaction boundary.
    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
        if state.active_writer.bytes_written() < self.segment_target_bytes {
            return Ok(());
        }
        // Seal the current segment (forces a flush + fsync) and open
        // a fresh one with `base_lsn = next_lsn` so the segment file
        // names line up with the record LSNs they contain.
        state.active_writer.flush_and_sync()?;
        state.active_writer.seal()?;

        let next_id = state.active_segment_id.next();
        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
        // Make the new file name durable before any record lands in it.
        self.segments.sync_dir()?;
        // Swap state only after every fallible step succeeded, so a
        // failure above leaves the old segment active and consistent.
        state.active_writer = writer;
        state.active_segment_id = next_id;
        Ok(())
    }
497}
498
499impl Drop for Wal {
500    fn drop(&mut self) {
501        if matches!(self.sync_mode, SyncMode::Group { .. }) {
502            let _ = self.flush_inner(FlushKind::ForceFsync);
503        }
504        // Join the group flusher, if any, before the directory lock is
505        // released. That keeps the "one live append owner" boundary intact
506        // through shutdown.
507        if let Ok(slot) = self._flusher.get_mut() {
508            let _ = slot.take();
509        }
510    }
511}
512
513// ---------------------------------------------------------------------------
514// Group-mode background flusher
515// ---------------------------------------------------------------------------
516
/// Owns the OS thread that periodically `fsync`s the WAL under
/// `SyncMode::Group`. Held inside the `Wal` itself so dropping the
/// last `Arc<Wal>` runs `Drop` here, signals shutdown, and joins
/// before the underlying `WalState` is destroyed.
struct GroupFlusherHandle {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Drop for GroupFlusherHandle {
    fn drop(&mut self) {
        // Raise the flag before joining so the worker's next check
        // of `shutdown` observes it and its loop exits.
        self.shutdown.store(true, Ordering::Release);
        match self.handle.take() {
            // Join result intentionally ignored: the thread can only
            // "fail" by panicking, and at teardown there is nothing
            // useful to do with that panic.
            Some(worker) => {
                let _ = worker.join();
            }
            None => {}
        }
    }
}
538
/// Spawn the Group-mode background fsync thread.
///
/// The worker holds only a `Weak<Wal>`: once the last strong
/// `Arc<Wal>` is gone, `upgrade()` fails and the loop exits on its
/// own, independently of the explicit `shutdown` flag raised by
/// [`GroupFlusherHandle`]'s `Drop`.
fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = Arc::clone(&shutdown);
    let handle = thread::spawn(move || {
        // Sleep first so a shortlived Wal that opens-and-closes
        // immediately doesn't pay for an extra wakeup. We re-check
        // the shutdown flag at every iteration so a Drop signal
        // racing with a sleep wakes up at most one interval late.
        while !shutdown_clone.load(Ordering::Acquire) {
            // Break the sleep into ~50 ms slices so shutdown can be
            // observed without waiting up to a full `interval` at
            // teardown. This matters for tests, which want fast
            // join times.
            let slice = Duration::from_millis(50).min(interval);
            let mut elapsed = Duration::ZERO;
            while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
                thread::sleep(slice);
                elapsed += slice;
            }
            if shutdown_clone.load(Ordering::Acquire) {
                break;
            }
            match weak.upgrade() {
                Some(wal) => {
                    // Latch any fsync failure into `bg_failure` and
                    // stop the flusher. Subsequent commits / flushes
                    // see the latch via `check_healthy` and start
                    // returning `WalError::Poisoned`, which
                    // `WalRecorder` propagates to the host as a
                    // durability error. Operators recover by
                    // restarting from the last consistent
                    // snapshot + WAL.
                    if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
                        let mut slot = wal.bg_failure.lock().unwrap();
                        if slot.is_none() {
                            *slot = Some(format!("bg fsync failed: {err}"));
                        }
                        break;
                    }
                }
                None => break,
            }
        }
    });
    GroupFlusherHandle {
        shutdown,
        handle: Some(handle),
    }
}
588
589#[cfg(test)]
590mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};

    use crate::testing::TmpDir;

    /// Build a deterministic `CreateNode` event whose node id and
    /// single `v` property both derive from `id`, so replay output
    /// can be compared by value.
    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    /// Open a WAL with the settings most tests want: per-commit
    /// fsync, 8 MiB segment target, no snapshot fence.
    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }
609
    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        // A brand-new directory has nothing to replay and starts
        // allocating at LSN 1 in segment 1.
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        // No CURRENT pointer file is written — the highest segment id
        // is the source of truth for "active segment".
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);

        // While the first handle is alive, the directory lock must
        // refuse a second opener.
        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }

        // Dropping the handle releases the lock; reopening succeeds.
        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }
654
    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");

        // First boot: write two committed transactions and crash
        // without running shutdown.
        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();

            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
            // drop without explicit close
        }

        // Second boot: replay should yield events 1, 2, 3 in order.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // next_lsn should be past every record we wrote (2 begins +
        // 3 mutations + 2 commits = 7 records → next_lsn = 8).
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");

        // One committed tx, one explicitly aborted tx.
        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }

        // Only the committed event survives replay.
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Begin + append but never commit. Simulates a crash
            // mid-query.
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }

        // The dangling transaction must not be re-applied.
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }
729
    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");

        // Tiny segment target so we trip rotation on the second
        // transaction.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();

        // First tx: a few events, takes us past 256 bytes.
        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        // Rotation has not happened yet — it only runs inside begin().
        assert_eq!(wal.active_segment_id(), 1);

        // Second `begin` triggers rotation.
        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );

        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);

        // Replay must read across the segment boundary: 5 events
        // from segment 1 plus 1 from segment 2.
        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);

        // Tx A: events 1,2 — ends at lsn 4.
        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();

        // Tx B: event 3 — past the fence.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        // Re-open with checkpoint_lsn = commit_a so tx A is treated
        // as already-applied.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }
791
792    #[test]
793    fn replay_rejects_commit_without_begin() {
794        let dir = TmpDir::new("commit-without-begin");
795
796        {
797            let (wal, _) = open_default(&dir.path);
798            wal.commit(Lsn::new(99)).unwrap();
799            wal.flush().unwrap();
800        }
801
802        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
803            Ok(_) => panic!("malformed WAL should not open"),
804            Err(err) => err,
805        };
806        assert!(
807            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
808            "expected malformed missing-begin error, got {err:?}"
809        );
810    }
811
812    #[test]
813    fn replay_rejects_mutation_without_begin() {
814        let dir = TmpDir::new("mutation-without-begin");
815
816        {
817            let (wal, _) = open_default(&dir.path);
818            wal.append(Lsn::new(99), &ev(1)).unwrap();
819            wal.flush().unwrap();
820        }
821
822        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
823            Ok(_) => panic!("malformed WAL should not open"),
824            Err(err) => err,
825        };
826        assert!(
827            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
828            "expected malformed missing-begin error, got {err:?}"
829        );
830    }
831
832    #[test]
833    fn torn_tail_is_truncated_on_open() {
834        let dir = TmpDir::new("torn");
835
836        {
837            let (wal, _) = open_default(&dir.path);
838            let b = wal.begin().unwrap();
839            wal.append(b, &ev(1)).unwrap();
840            wal.commit(b).unwrap();
841            wal.flush().unwrap();
842        }
843
844        // Append garbage to the active segment by hand.
845        let segments = SegmentDir::new(&dir.path).list().unwrap();
846        let active = &segments.last().unwrap().path;
847        {
848            use std::io::Write;
849            let mut f = OpenOptions::new().append(true).open(active).unwrap();
850            f.write_all(&[0xff; 32]).unwrap();
851            f.sync_all().unwrap();
852        }
853
854        // Re-open. Torn tail must be truncated; replay still yields
855        // ev(1); next_lsn picks up cleanly.
856        let (wal, replay) = open_default(&dir.path);
857        assert_eq!(replay, vec![ev(1)]);
858
859        // Subsequent appends don't trip a CRC failure.
860        let b = wal.begin().unwrap();
861        wal.append(b, &ev(2)).unwrap();
862        wal.commit(b).unwrap();
863        wal.flush().unwrap();
864        drop(wal);
865
866        let (_, replay) = open_default(&dir.path);
867        assert_eq!(replay, vec![ev(1), ev(2)]);
868    }
869
870    #[test]
871    fn checkpoint_marker_is_recorded_and_observed() {
872        let dir = TmpDir::new("ckpt-marker");
873
874        let snapshot_lsn = {
875            let (wal, _) = open_default(&dir.path);
876            let b = wal.begin().unwrap();
877            wal.append(b, &ev(1)).unwrap();
878            let commit = wal.commit(b).unwrap();
879            wal.flush().unwrap();
880            wal.checkpoint_marker(commit).unwrap();
881            wal.flush().unwrap();
882            commit
883        };
884
885        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
886        assert_eq!(
887            outcome.checkpoint_lsn_observed,
888            Some(snapshot_lsn),
889            "checkpoint marker should be surfaced by replay"
890        );
891    }
892
893    #[test]
894    fn group_mode_durable_lsn_advances_via_bg_flusher() {
895        let dir = TmpDir::new("group");
896        // 25 ms interval = bg flusher should land within one or two
897        // 50 ms slices.
898        let (wal, _) = Wal::open(
899            &dir.path,
900            SyncMode::Group { interval_ms: 25 },
901            8 * 1024 * 1024,
902            Lsn::ZERO,
903        )
904        .unwrap();
905
906        let begin = wal.begin().unwrap();
907        wal.append(begin, &ev(1)).unwrap();
908        wal.commit(begin).unwrap();
909        wal.flush().unwrap(); // Group: write_buffer only; durable_lsn untouched.
910
911        // Immediately after a Group flush, durable_lsn should still
912        // be Lsn::ZERO — the bg flusher hasn't fired yet.
913        assert_eq!(
914            wal.durable_lsn(),
915            Lsn::ZERO,
916            "Group flush() must not advance durable_lsn"
917        );
918
919        // Wait up to ~500 ms for the bg flusher to advance the LSN.
920        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
921        loop {
922            if wal.durable_lsn() > Lsn::ZERO {
923                break;
924            }
925            if std::time::Instant::now() >= deadline {
926                panic!(
927                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
928                    wal.durable_lsn()
929                );
930            }
931            std::thread::sleep(std::time::Duration::from_millis(10));
932        }
933        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
934        // Wal drop should join the bg thread cleanly.
935        drop(wal);
936    }
937
938    #[test]
939    fn none_mode_advances_durable_lsn_on_flush() {
940        let dir = TmpDir::new("none");
941        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
942
943        let begin = wal.begin().unwrap();
944        wal.append(begin, &ev(1)).unwrap();
945        wal.commit(begin).unwrap();
946        wal.flush().unwrap();
947
948        // None mode: flush() advances durable_lsn even without
949        // fsync, because the mode opted out of crash durability.
950        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
951    }
952
953    #[test]
954    fn force_fsync_always_advances_durable_lsn() {
955        let dir = TmpDir::new("force-fsync");
956        let (wal, _) = Wal::open(
957            &dir.path,
958            SyncMode::Group {
959                interval_ms: 60_000,
960            },
961            8 * 1024 * 1024,
962            Lsn::ZERO,
963        )
964        .unwrap();
965
966        let begin = wal.begin().unwrap();
967        wal.append(begin, &ev(1)).unwrap();
968        wal.commit(begin).unwrap();
969        wal.flush().unwrap(); // Group flush: durable_lsn unchanged.
970        assert_eq!(wal.durable_lsn(), Lsn::ZERO);
971
972        // force_fsync bypasses the configured cadence — used by
973        // checkpoints to grab a fence on demand.
974        wal.force_fsync().unwrap();
975        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
976    }
977
978    #[test]
979    fn truncate_up_to_drops_old_sealed_segments() {
980        let dir = TmpDir::new("truncate");
981
982        // Tiny target so each tx forces a rotation on the next begin.
983        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();
984
985        let mut last_commit = Lsn::ZERO;
986        for i in 0..5 {
987            let b = wal.begin().unwrap();
988            wal.append(b, &ev(i)).unwrap();
989            last_commit = wal.commit(b).unwrap();
990            wal.flush().unwrap();
991        }
992        // Five transactions × tiny target: we should be on segment ≥ 4.
993        assert!(
994            wal.active_segment_id() >= 4,
995            "expected several rotations, got {}",
996            wal.active_segment_id()
997        );
998
999        let segments = SegmentDir::new(&dir.path);
1000        let before = segments.list().unwrap().len();
1001        wal.truncate_up_to(last_commit).unwrap();
1002        let after = segments.list().unwrap().len();
1003
1004        assert!(
1005            after < before,
1006            "truncate_up_to should have dropped at least one segment ({} → {})",
1007            before,
1008            after
1009        );
1010        // Active + tombstone are always retained.
1011        assert!(
1012            after >= 2,
1013            "active and the segment preceding it must be kept"
1014        );
1015
1016        // Subsequent appends + reopen still produce all five events
1017        // because the dropped segments only contained transactions
1018        // already at or below `last_commit`, which we feed back as
1019        // the checkpoint fence on reopen.
1020        drop(wal);
1021        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
1022        // Everything was at or below the fence, so replay is empty.
1023        assert!(replay.is_empty());
1024    }
1025}