lora_wal/wal.rs

1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//!   0000000001.wal      sealed segment
8//!   0000000002.wal      sealed segment
9//!   0000000003.wal      active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
19//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
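//!
//! A minimal sketch of that lifecycle (error handling, the caller's store,
//! and crate paths are abbreviated; the `MutationEvent` shape matches the
//! tests at the bottom of this file):
//!
//! ```ignore
//! let (wal, committed) =
//!     Wal::open("data/wal", SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO)?;
//! for event in committed {
//!     store.apply(event); // caller-side: re-apply replayed events before new traffic
//! }
//!
//! let tx = wal.begin()?;           // TxBegin record; returns the grouping LSN
//! wal.append(tx, &event)?;         // buffered in the active segment, not yet durable
//! wal.commit(tx)?;                 // TxCommit marker
//! wal.flush()?;                    // PerCommit: write + fsync + advance durable_lsn
//! ```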
27
28use std::fs;
29#[cfg(test)]
30use std::fs::OpenOptions;
31use std::path::Path;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex, Weak};
34use std::thread::{self, JoinHandle};
35use std::time::Duration;
36
37use lora_store::MutationEvent;
38
39use crate::config::SyncMode;
40use crate::dir::{SegmentDir, SegmentId};
41use crate::error::WalError;
42use crate::lock::DirLock;
43use crate::lsn::Lsn;
44use crate::record::WalRecord;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
48/// State guarded by the inner `Mutex`. Nothing in this struct is
49/// `Send`-unsafe; the lock is purely for `&self`-safe interior
50/// mutation.
51struct WalState {
52    next_lsn: Lsn,
53    durable_lsn: Lsn,
54    active_segment_id: SegmentId,
55    active_writer: SegmentWriter,
56    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
57    oldest_segment_id: SegmentId,
58}
59
60/// Latched failure from the background flusher. Wrapped in a `Mutex`
61/// instead of an `AtomicCell<Option<String>>` because failures are
62/// rare and we want the message preserved verbatim for operator-facing
63/// reporting (`/admin/wal/status` `bgFailure`). Once `Some`, every
64/// subsequent commit/flush returns [`WalError::Poisoned`] and the
65/// operator is expected to restart from the last consistent
66/// snapshot + WAL.
67type BgFailure = Mutex<Option<String>>;
68
69/// Selects the durability work that [`Wal::flush_inner`] actually does.
70/// Centralising the three modes here means `flush` and `force_fsync`
71/// share one code path and the call sites don't have to remember which
72/// mode advances `durable_lsn` and which does not.
73#[derive(Debug, Clone, Copy)]
74enum FlushKind {
75    /// Honour the configured [`SyncMode`]. This is what the recorder's
76    /// `flush()` calls into.
77    PerConfiguredMode,
78    /// Always write the buffer + fsync + advance `durable_lsn`,
79    /// regardless of mode. Used by checkpoints and the bg flusher.
80    ForceFsync,
81}
82
83/// Live, append-side WAL handle.
84///
85/// Construct via [`Wal::open`]. The returned tuple includes the list of
86/// committed mutation events that need to be re-applied to the
87/// in-memory store before any new traffic is accepted.
88///
89/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
90/// background flusher needs a `Weak<Wal>` to call back into without
91/// taking a strong reference (which would prevent shutdown).
92pub struct Wal {
93    segments: SegmentDir,
94    sync_mode: SyncMode,
95    segment_target_bytes: u64,
96    state: Mutex<WalState>,
97    /// Latched bg-flusher failure; surfaced via [`Wal::bg_failure`] and
98    /// propagated to commit/flush/force_fsync as
99    /// [`WalError::Poisoned`].
100    bg_failure: Arc<BgFailure>,
101    /// Background flusher for `SyncMode::Group`. `Drop` joins the
102    /// thread, so a `Wal` going out of scope is a clean shutdown
103    /// signal.
104    _flusher: Mutex<Option<GroupFlusherHandle>>,
105    /// Held for the lifetime of the WAL so a second handle cannot append
106    /// to the same active segment concurrently.
107    _dir_lock: DirLock,
108}
109
110impl Wal {
111    /// Open or create the WAL directory at `dir`.
112    ///
113    /// `checkpoint_lsn` is the LSN stamped into the most recent
114    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
115    /// there is no snapshot). Replay skips records at or below this
116    /// fence — they are already represented in the loaded state.
117    ///
118    /// Returns `(wal, committed_events)`. The caller is expected to
119    /// apply every event in `committed_events` to its in-memory store
120    /// in order before issuing any new `begin` / `append` calls.
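    ///
    /// A sketch of the snapshot-restore path (snapshot loading is the
    /// caller's concern; `load_snapshot` and `snapshot.lsn()` stand in for
    /// however the caller reads back the LSN stamped into the snapshot):
    ///
    /// ```ignore
    /// let snapshot = load_snapshot("data/snapshot")?; // caller-provided loader
    /// let (wal, committed) =
    ///     Wal::open("data/wal", sync_mode, target_bytes, snapshot.lsn())?;
    /// // Replay surfaces only committed events *after* the snapshot fence.
    /// for event in committed {
    ///     store.apply(event);
    /// }
    /// ```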
121    pub fn open(
122        dir: impl Into<std::path::PathBuf>,
123        sync_mode: SyncMode,
124        segment_target_bytes: u64,
125        checkpoint_lsn: Lsn,
126    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
127        let segments = SegmentDir::new(dir);
128        fs::create_dir_all(segments.root())?;
129        let dir_lock = DirLock::acquire(segments.root())?;
130
131        let entries = segments.list()?;
132        let (active_id, active_writer, replay) = if entries.is_empty() {
133            Self::open_fresh(&segments)?
134        } else {
135            Self::open_existing(&segments, &entries, checkpoint_lsn)?
136        };
137
138        let next_lsn = if replay.max_lsn.is_zero() {
139            Lsn::new(1)
140        } else {
141            replay.max_lsn.next()
142        };
143        // Treat everything readable at open time as the recovered
144        // durability fence. This does not prove the bytes were
145        // fsync-confirmed before the previous process died; it means
146        // they survived to this open and future appends must start
147        // after them.
148        let durable_lsn = replay.max_lsn;
149
150        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
151
152        let state = WalState {
153            next_lsn,
154            durable_lsn,
155            active_segment_id: active_id,
156            active_writer,
157            oldest_segment_id,
158        };
159
160        let wal = Arc::new(Self {
161            segments,
162            sync_mode,
163            segment_target_bytes,
164            state: Mutex::new(state),
165            bg_failure: Arc::new(Mutex::new(None)),
166            _flusher: Mutex::new(None),
167            _dir_lock: dir_lock,
168        });
169
170        // Spawn the Group flusher *after* the Arc exists so it can
171        // hold a `Weak<Wal>` that drops when the last strong ref
172        // does. The flusher's own Drop joins the thread, so removing
173        // the field (e.g. on Wal::drop) is a clean shutdown signal.
174        if let SyncMode::Group { interval_ms } = sync_mode {
175            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
176            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
177            *wal._flusher.lock().unwrap() = Some(handle);
178        }
179
180        Ok((wal, replay.committed_events))
181    }
182
183    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
184    /// so LSN 0 stays reserved for "empty / never written".
185    fn open_fresh(
186        segments: &SegmentDir,
187    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
188        let id = SegmentId::FIRST;
189        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
190        segments.sync_dir()?;
191        let replay = ReplayOutcome {
192            committed_events: Vec::new(),
193            max_lsn: Lsn::ZERO,
194            torn_tail: None,
195            checkpoint_lsn_observed: None,
196        };
197        Ok((id, writer, replay))
198    }
199
200    /// Existing directory. Replay every segment to surface committed
201    /// events + detect a torn tail; reopen the highest-id segment
202    /// for append; truncate it if the torn tail is in *that* segment.
203    fn open_existing(
204        segments: &SegmentDir,
205        entries: &[crate::dir::SegmentEntry],
206        checkpoint_lsn: Lsn,
207    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
208        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
209        let replay = replay_segments(&paths, checkpoint_lsn)?;
210
211        // The active segment is whichever file has the highest
212        // numeric id — segment file names are self-describing, so
213        // there is no separate CURRENT pointer.
214        let active = entries.last().expect("entries non-empty in open_existing");
215        let (mut writer, _torn_from_writer) =
216            SegmentWriter::open_for_append(segments.path_for(active.id))?;
217
218        // A torn tail in a *sealed* segment should be impossible (sealed
219        // segments are never appended to), so only the active segment is
220        // ever truncated; a torn tail anywhere else means the directory is corrupt.
221        if let Some(t) = &replay.torn_tail {
222            if t.segment_path == active.path {
223                writer.truncate_to(t.last_good_offset)?;
224            } else {
225                return Err(WalError::Malformed(format!(
226                    "torn tail found in sealed segment {}",
227                    t.segment_path.display()
228                )));
229            }
230        }
231
232        Ok((active.id, writer, replay))
233    }
234
235    pub fn dir(&self) -> &Path {
236        self.segments.root()
237    }
238
239    pub fn sync_mode(&self) -> SyncMode {
240        self.sync_mode
241    }
242
243    pub fn durable_lsn(&self) -> Lsn {
244        self.state.lock().unwrap().durable_lsn
245    }
246
247    /// Latched message from the background flusher, if it has ever
248    /// failed an `fsync`. `None` means the WAL is healthy. Once set,
249    /// every commit / flush / force_fsync starts returning
250    /// [`WalError::Poisoned`] and the WAL stops accepting new
251    /// transactions until the operator restarts from the last
252    /// consistent snapshot + WAL.
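    ///
    /// A sketch of how a host might gate writes on it (the error type is a
    /// hypothetical, host-side one):
    ///
    /// ```ignore
    /// if let Some(msg) = wal.bg_failure() {
    ///     // Surface as `bgFailure` in /admin/wal/status and stop accepting writes.
    ///     return Err(HostError::WalPoisoned(msg));
    /// }
    /// ```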
253    pub fn bg_failure(&self) -> Option<String> {
254        self.bg_failure.lock().unwrap().clone()
255    }
256
257    fn check_healthy(&self) -> Result<(), WalError> {
258        if self.bg_failure.lock().unwrap().is_some() {
259            return Err(WalError::Poisoned);
260        }
261        Ok(())
262    }
263
264    /// LSN that the *next* `begin` / `append` call will allocate.
265    /// Exposed for tests and for sanity checks at boot; not part of
266    /// any durability contract.
267    pub fn next_lsn(&self) -> Lsn {
268        self.state.lock().unwrap().next_lsn
269    }
270
271    pub fn oldest_segment_id(&self) -> u64 {
272        self.state.lock().unwrap().oldest_segment_id.raw()
273    }
274
275    pub fn active_segment_id(&self) -> u64 {
276        self.state.lock().unwrap().active_segment_id.raw()
277    }
278
279    /// Begin a new transaction. Allocates a `TxBegin` record and
280    /// returns its LSN, which the caller must thread back through
281    /// `append` / `commit` / `abort` so replay can group the events.
282    ///
283    /// If the active segment has crossed `segment_target_bytes`,
284    /// rotation happens here — `TxBegin` is the only record kind
285    /// guaranteed to be a transaction boundary, so rotating just
286    /// before its append keeps every transaction wholly in one
287    /// segment.
288    pub fn begin(&self) -> Result<Lsn, WalError> {
289        self.check_healthy()?;
290        let mut state = self.state.lock().unwrap();
291        self.maybe_rotate(&mut state)?;
292        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
293    }
294
295    /// Append a single mutation to the in-memory pending buffer of
296    /// the active segment. Not durable until `flush()` runs.
297    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
298        self.check_healthy()?;
299        let mut state = self.state.lock().unwrap();
300        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
301            lsn,
302            tx_begin_lsn,
303            event: event.clone(),
304        })
305    }
306
307    /// Append many mutations as one framed record. This keeps the replay
308    /// contract identical to repeated `append` calls while avoiding per-event
309    /// length/CRC/framing overhead for write-heavy statements.
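    ///
    /// A sketch of the intended call pattern (the events are assumed to be
    /// collected by the statement executor before the WAL call):
    ///
    /// ```ignore
    /// let tx = wal.begin()?;
    /// // One framed record instead of `events.len()` individually framed ones;
    /// // replay yields the same event sequence either way.
    /// wal.append_batch(tx, events)?;
    /// wal.commit(tx)?;
    /// wal.flush()?;
    /// ```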
310    pub fn append_batch(
311        &self,
312        tx_begin_lsn: Lsn,
313        events: Vec<MutationEvent>,
314    ) -> Result<Lsn, WalError> {
315        self.check_healthy()?;
316        if events.is_empty() {
317            return Err(WalError::Encode(
318                "mutation batch must contain at least one event".into(),
319            ));
320        }
321        let mut state = self.state.lock().unwrap();
322        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
323            lsn,
324            tx_begin_lsn,
325            events,
326        })
327    }
328
329    /// Append a `TxCommit` marker. Caller is expected to subsequently
330    /// call `flush()` (under `SyncMode::PerCommit`) to make the
331    /// commit durable before returning to its caller.
332    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
333        self.check_healthy()?;
334        let mut state = self.state.lock().unwrap();
335        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
336    }
337
338    /// Append a `TxAbort` marker. Replay drops the events keyed by
339    /// `tx_begin_lsn` without re-applying them.
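    ///
    /// Sketch: a statement that fails mid-transaction rolls back by aborting
    /// instead of committing (`validation_failed` is a stand-in for the
    /// host's own check):
    ///
    /// ```ignore
    /// let tx = wal.begin()?;
    /// wal.append(tx, &event)?;
    /// if validation_failed {
    ///     wal.abort(tx)?;   // replay discards the events keyed by `tx`
    /// } else {
    ///     wal.commit(tx)?;
    /// }
    /// ```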
340    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
341        self.check_healthy()?;
342        let mut state = self.state.lock().unwrap();
343        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
344    }
345
346    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
347    /// LSN written into the snapshot file's header — replay uses
348    /// it to defend against the snapshot-rename-but-no-marker race.
349    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
350        self.check_healthy()?;
351        let mut state = self.state.lock().unwrap();
352        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
353            lsn,
354            snapshot_lsn,
355        })
356    }
357
358    /// Single-source-of-truth for "allocate the next LSN, build the
359    /// record, push it onto the active segment's pending buffer".
360    /// The six public append paths (`begin / append / append_batch /
361    /// commit / abort / checkpoint_marker`) all funnel through here so the LSN
362    /// allocation never gets out of sync with the encoded record.
363    #[inline]
364    fn alloc_and_append(
365        state: &mut WalState,
366        build: impl FnOnce(Lsn) -> WalRecord,
367    ) -> Result<Lsn, WalError> {
368        let lsn = state.next_lsn;
369        state.next_lsn = lsn.next();
370        state.active_writer.append(&build(lsn))?;
371        Ok(lsn)
372    }
373
374    /// Flush the active segment's pending buffer.
375    ///
376    /// What "flush" means depends on [`SyncMode`]:
377    ///
378    /// - `PerCommit` — write the buffer to the OS, `fsync`, and
379    ///   advance `durable_lsn`. The strongest contract: every
380    ///   record up to `next_lsn - 1` is on disk.
381    /// - `Group` — leave the buffer in memory. The background flusher
382    ///   writes, fsyncs, and advances `durable_lsn` on its cadence.
383    /// - `None` — write the buffer to the OS only, but advance
384    ///   `durable_lsn` anyway. The mode opts out of crash
385    ///   durability, so the checkpoint fence reports
386    ///   "what's been written" instead of "what's actually safe".
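    ///
    /// Under `SyncMode::Group`, for example, `flush()` alone never moves the
    /// durability fence; `force_fsync` does (a sketch mirroring the tests
    /// below):
    ///
    /// ```ignore
    /// wal.commit(tx)?;
    /// wal.flush()?;                             // Group: buffer stays in memory
    /// assert_eq!(wal.durable_lsn(), Lsn::ZERO); // fence unchanged until the bg flusher fires
    /// wal.force_fsync()?;                       // explicit durability point
    /// assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    /// ```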
387    pub fn flush(&self) -> Result<(), WalError> {
388        self.check_healthy()?;
389        self.flush_inner(FlushKind::PerConfiguredMode)
390    }
391
392    /// Unconditionally write the buffer to the OS, `fsync`, and
393    /// advance `durable_lsn`. Used by callers that need a durability
394    /// point right now regardless of the configured cadence (e.g.
395    /// checkpoint). Returns [`WalError::Poisoned`] if the bg flusher
396    /// has already failed.
397    pub fn force_fsync(&self) -> Result<(), WalError> {
398        self.check_healthy()?;
399        self.flush_inner(FlushKind::ForceFsync)
400    }
401
402    /// Single source of truth for the flush state machine. Skips the
403    /// `check_healthy` gate so the bg flusher can call into it
404    /// without recursing through its own latch.
405    fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
406        let mut state = self.state.lock().unwrap();
407        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
408
409        // Decide whether this call is allowed to advance
410        // `durable_lsn`. The bg flusher's job in Group mode is
411        // exactly to do that "out of band"; PerCommit and None do it
412        // inline; Group's user-driven `flush()` does not.
413        let do_fsync = matches!(
414            (kind, self.sync_mode),
415            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
416        );
417        let advance_durable = matches!(
418            (kind, self.sync_mode),
419            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
420        );
421
422        if matches!(
423            (kind, self.sync_mode),
424            (FlushKind::PerConfiguredMode, SyncMode::Group { .. })
425        ) {
426            // Group mode batches both the write syscall and the fsync. This
427            // keeps the write-heavy hot path close to in-memory execution; the
428            // background flusher (or Drop) will force the buffer out.
429        } else if do_fsync {
430            state.active_writer.flush_and_sync()?;
431        } else {
432            state.active_writer.flush_buffer()?;
433        }
434        if advance_durable {
435            state.durable_lsn = written_lsn;
436        }
437        Ok(())
438    }
439
440    /// Drop sealed segments whose entire LSN range is at or below
441    /// `fence_lsn`. Idempotent and safe to call repeatedly.
442    ///
443    /// The active segment is never deleted — even if every record in
444    /// it predates the fence, it is still the rotation target for
445    /// new appends. The segment immediately before the active one
446    /// is also kept as a tombstone so a subsequent crash before the
447    /// next checkpoint still finds a self-describing log start.
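    ///
    /// A sketch of the checkpoint flow this is designed for (snapshot
    /// writing itself lives outside this module):
    ///
    /// ```ignore
    /// // `snapshot_lsn` is the LSN stamped into the freshly written snapshot.
    /// wal.checkpoint_marker(snapshot_lsn)?; // record the marker in the log
    /// wal.force_fsync()?;                   // durability point covering the marker
    /// wal.truncate_up_to(snapshot_lsn)?;    // drop sealed segments wholly below the fence
    /// ```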
448    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
449        let mut state = self.state.lock().unwrap();
450        let active_id = state.active_segment_id;
451        let entries = self.segments.list()?;
452
453        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
454        for (i, entry) in entries.iter().enumerate() {
455            // Active segment and the one immediately preceding it
456            // are kept by policy.
457            if entry.id >= active_id.saturating_prev() {
458                break;
459            }
460            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
461            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
462            let next = match entries.get(i + 1) {
463                Some(n) => n,
464                None => break,
465            };
466            let next_base = SegmentDir::base_lsn(&next.path)?;
467            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
468                to_drop.push(entry.clone());
469            }
470        }
471
472        for entry in to_drop {
473            fs::remove_file(&entry.path)?;
474            if entry.id >= state.oldest_segment_id {
475                state.oldest_segment_id = entry.id.next();
476            }
477        }
478        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
479            self.segments.sync_dir()?;
480        }
481        Ok(())
482    }
483
484    /// Rotate the active segment when it has grown past
485    /// `segment_target_bytes`. Called from `begin()` so rotation only
486    /// ever lands at a transaction boundary.
487    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
488        if state.active_writer.bytes_written() < self.segment_target_bytes {
489            return Ok(());
490        }
491        // Seal the current segment (forces a flush + fsync) and open
492        // a fresh one with `base_lsn = next_lsn` so the segment file
493        // names line up with the record LSNs they contain.
494        state.active_writer.flush_and_sync()?;
495        state.active_writer.seal()?;
496
497        let next_id = state.active_segment_id.next();
498        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
499        self.segments.sync_dir()?;
500        state.active_writer = writer;
501        state.active_segment_id = next_id;
502        Ok(())
503    }
504}
505
506impl Drop for Wal {
507    fn drop(&mut self) {
508        if matches!(self.sync_mode, SyncMode::Group { .. }) {
509            let _ = self.flush_inner(FlushKind::ForceFsync);
510        }
511        // Join the group flusher, if any, before the directory lock is
512        // released. That keeps the "one live append owner" boundary intact
513        // through shutdown.
514        if let Ok(slot) = self._flusher.get_mut() {
515            let _ = slot.take();
516        }
517    }
518}
519
520// ---------------------------------------------------------------------------
521// Group-mode background flusher
522// ---------------------------------------------------------------------------
523
524/// Owns the OS thread that periodically `fsync`s the WAL under
525/// `SyncMode::Group`. Held inside the `Wal` itself so dropping the
526/// last `Arc<Wal>` runs `Drop` here, signals shutdown, and joins
527/// before the underlying `WalState` is destroyed.
528struct GroupFlusherHandle {
529    shutdown: Arc<AtomicBool>,
530    handle: Option<JoinHandle<()>>,
531}
532
533impl Drop for GroupFlusherHandle {
534    fn drop(&mut self) {
535        self.shutdown.store(true, Ordering::Release);
536        if let Some(h) = self.handle.take() {
537            // `let _ = ...` because the thread can only fail by
538            // panicking; even then, the Wal itself is being dropped
539            // and there is nothing useful to do with the panic at
540            // teardown.
541            let _ = h.join();
542        }
543    }
544}
545
546fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
547    let shutdown = Arc::new(AtomicBool::new(false));
548    let shutdown_clone = Arc::clone(&shutdown);
549    let handle = thread::spawn(move || {
550        // Sleep first so a short-lived Wal that opens and closes
551        // immediately doesn't pay for an extra wakeup. We re-check
552        // the shutdown flag on every iteration, so a Drop signal
553        // racing with a sleep is observed at most one sleep slice late.
554        while !shutdown_clone.load(Ordering::Acquire) {
555            // Break the sleep into ~50 ms slices so shutdown can be
556            // observed without waiting up to a full `interval` at
557            // teardown. This matters for tests, which want fast
558            // join times.
559            let slice = Duration::from_millis(50).min(interval);
560            let mut elapsed = Duration::ZERO;
561            while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
562                thread::sleep(slice);
563                elapsed += slice;
564            }
565            if shutdown_clone.load(Ordering::Acquire) {
566                break;
567            }
568            match weak.upgrade() {
569                Some(wal) => {
570                    // Latch any fsync failure into `bg_failure` and
571                    // stop the flusher. Subsequent commits / flushes
572                    // see the latch via `check_healthy` and start
573                    // returning `WalError::Poisoned`, which
574                    // `WalRecorder` propagates to the host as a
575                    // durability error. Operators recover by
576                    // restarting from the last consistent
577                    // snapshot + WAL.
578                    if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
579                        let mut slot = wal.bg_failure.lock().unwrap();
580                        if slot.is_none() {
581                            *slot = Some(format!("bg fsync failed: {err}"));
582                        }
583                        break;
584                    }
585                }
586                None => break,
587            }
588        }
589    });
590    GroupFlusherHandle {
591        shutdown,
592        handle: Some(handle),
593    }
594}
595
596#[cfg(test)]
597mod tests {
598    use super::*;
599    use lora_store::{MutationEvent, Properties, PropertyValue};
600
601    use crate::testing::TmpDir;
602
603    fn ev(id: u64) -> MutationEvent {
604        let mut p = Properties::new();
605        p.insert("v".into(), PropertyValue::Int(id as i64));
606        MutationEvent::CreateNode {
607            id,
608            labels: vec!["N".into()],
609            properties: p,
610        }
611    }
612
613    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
614        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
615    }
616
617    #[test]
618    fn fresh_open_creates_first_segment() {
619        let dir = TmpDir::new("fresh");
620        let (wal, replay) = open_default(&dir.path);
621        assert!(replay.is_empty());
622        assert_eq!(wal.next_lsn(), Lsn::new(1));
623        assert_eq!(wal.active_segment_id(), 1);
624        // No CURRENT pointer file is written — the highest segment id
625        // is the source of truth for "active segment".
626        let entries: Vec<_> = std::fs::read_dir(&dir.path)
627            .unwrap()
628            .filter_map(|e| e.ok())
629            .map(|e| e.file_name().to_string_lossy().into_owned())
630            .collect();
631        assert!(
632            entries.iter().any(|n| n == ".lora-wal.lock"),
633            "WAL dir should contain the live directory lock, found: {entries:?}"
634        );
635        assert!(
636            entries
637                .iter()
638                .filter(|n| n.as_str() != ".lora-wal.lock")
639                .all(|n| n.ends_with(".wal")),
640            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
641        );
642    }
643
644    #[test]
645    fn opening_same_directory_twice_fails_until_first_handle_drops() {
646        let dir = TmpDir::new("exclusive");
647        let (wal, _) = open_default(&dir.path);
648
649        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
650            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
651                assert_eq!(locked_dir, dir.path);
652            }
653            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
654            Ok(_) => panic!("second WAL open on same directory should fail"),
655        }
656
657        drop(wal);
658        let (reopened, _) = open_default(&dir.path);
659        drop(reopened);
660    }
661
662    #[test]
663    fn begin_append_commit_round_trip_through_replay() {
664        let dir = TmpDir::new("commit");
665
666        // First boot: write three transactions and crash without
667        // running shutdown.
668        {
669            let (wal, _) = open_default(&dir.path);
670            let begin = wal.begin().unwrap();
671            wal.append(begin, &ev(1)).unwrap();
672            wal.append(begin, &ev(2)).unwrap();
673            wal.commit(begin).unwrap();
674            wal.flush().unwrap();
675
676            let begin = wal.begin().unwrap();
677            wal.append(begin, &ev(3)).unwrap();
678            wal.commit(begin).unwrap();
679            wal.flush().unwrap();
680            // drop without explicit close
681        }
682
683        // Second boot: replay should yield events 1, 2, 3 in order.
684        let (wal, replay) = open_default(&dir.path);
685        assert_eq!(replay.len(), 3);
686        assert_eq!(replay[0], ev(1));
687        assert_eq!(replay[1], ev(2));
688        assert_eq!(replay[2], ev(3));
689        // next_lsn should be past every record we wrote (2 begins +
690        // 3 mutations + 2 commits = 7 records → next_lsn = 8).
691        assert_eq!(wal.next_lsn(), Lsn::new(8));
692    }
693
694    #[test]
695    fn aborted_transaction_is_dropped_on_replay() {
696        let dir = TmpDir::new("abort");
697
698        {
699            let (wal, _) = open_default(&dir.path);
700            let b1 = wal.begin().unwrap();
701            wal.append(b1, &ev(1)).unwrap();
702            wal.commit(b1).unwrap();
703            wal.flush().unwrap();
704
705            let b2 = wal.begin().unwrap();
706            wal.append(b2, &ev(99)).unwrap();
707            wal.abort(b2).unwrap();
708            wal.flush().unwrap();
709        }
710
711        let (_, replay) = open_default(&dir.path);
712        assert_eq!(replay, vec![ev(1)]);
713    }
714
715    #[test]
716    fn uncommitted_transaction_at_end_of_log_is_discarded() {
717        let dir = TmpDir::new("uncommitted");
718
719        {
720            let (wal, _) = open_default(&dir.path);
721            let b1 = wal.begin().unwrap();
722            wal.append(b1, &ev(1)).unwrap();
723            wal.commit(b1).unwrap();
724            wal.flush().unwrap();
725
726            // Begin + append but never commit. Simulates a crash
727            // mid-query.
728            let b2 = wal.begin().unwrap();
729            wal.append(b2, &ev(99)).unwrap();
730            wal.flush().unwrap();
731        }
732
733        let (_, replay) = open_default(&dir.path);
734        assert_eq!(replay, vec![ev(1)]);
735    }
736
737    #[test]
738    fn segment_rotation_at_begin_boundary() {
739        let dir = TmpDir::new("rotate");
740
741        // Tiny segment target so we trip rotation on the second
742        // transaction.
743        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();
744
745        // First tx: a few events, takes us past 256 bytes.
746        let b1 = wal.begin().unwrap();
747        for i in 0..5 {
748            wal.append(b1, &ev(i)).unwrap();
749        }
750        wal.commit(b1).unwrap();
751        wal.flush().unwrap();
752        assert_eq!(wal.active_segment_id(), 1);
753
754        // Second `begin` triggers rotation.
755        let b2 = wal.begin().unwrap();
756        wal.append(b2, &ev(100)).unwrap();
757        wal.commit(b2).unwrap();
758        wal.flush().unwrap();
759        assert_eq!(
760            wal.active_segment_id(),
761            2,
762            "begin() should have rotated to segment 2"
763        );
764
765        let segments = SegmentDir::new(&dir.path).list().unwrap();
766        assert_eq!(segments.len(), 2);
767
768        drop(wal);
769        let (_, replay) = open_default(&dir.path);
770        assert_eq!(replay.len(), 6);
771    }
772
773    #[test]
774    fn checkpoint_lsn_skips_already_checkpointed_events() {
775        let dir = TmpDir::new("ckpt-skip");
776        let (wal, _) = open_default(&dir.path);
777
778        // Tx A: events 1,2 — ends at lsn 4.
779        let a = wal.begin().unwrap();
780        wal.append(a, &ev(1)).unwrap();
781        wal.append(a, &ev(2)).unwrap();
782        let commit_a = wal.commit(a).unwrap();
783        wal.flush().unwrap();
784
785        // Tx B: event 3 — past the fence.
786        let b = wal.begin().unwrap();
787        wal.append(b, &ev(3)).unwrap();
788        wal.commit(b).unwrap();
789        wal.flush().unwrap();
790        drop(wal);
791
792        // Re-open with checkpoint_lsn = commit_a so tx A is treated
793        // as already-applied.
794        let (_, replay) =
795            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
796        assert_eq!(replay, vec![ev(3)]);
797    }
798
799    #[test]
800    fn replay_rejects_commit_without_begin() {
801        let dir = TmpDir::new("commit-without-begin");
802
803        {
804            let (wal, _) = open_default(&dir.path);
805            wal.commit(Lsn::new(99)).unwrap();
806            wal.flush().unwrap();
807        }
808
809        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
810            Ok(_) => panic!("malformed WAL should not open"),
811            Err(err) => err,
812        };
813        assert!(
814            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
815            "expected malformed missing-begin error, got {err:?}"
816        );
817    }
818
819    #[test]
820    fn replay_rejects_mutation_without_begin() {
821        let dir = TmpDir::new("mutation-without-begin");
822
823        {
824            let (wal, _) = open_default(&dir.path);
825            wal.append(Lsn::new(99), &ev(1)).unwrap();
826            wal.flush().unwrap();
827        }
828
829        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
830            Ok(_) => panic!("malformed WAL should not open"),
831            Err(err) => err,
832        };
833        assert!(
834            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
835            "expected malformed missing-begin error, got {err:?}"
836        );
837    }
838
839    #[test]
840    fn torn_tail_is_truncated_on_open() {
841        let dir = TmpDir::new("torn");
842
843        {
844            let (wal, _) = open_default(&dir.path);
845            let b = wal.begin().unwrap();
846            wal.append(b, &ev(1)).unwrap();
847            wal.commit(b).unwrap();
848            wal.flush().unwrap();
849        }
850
851        // Append garbage to the active segment by hand.
852        let segments = SegmentDir::new(&dir.path).list().unwrap();
853        let active = &segments.last().unwrap().path;
854        {
855            use std::io::Write;
856            let mut f = OpenOptions::new().append(true).open(active).unwrap();
857            f.write_all(&[0xff; 32]).unwrap();
858            f.sync_all().unwrap();
859        }
860
861        // Re-open. Torn tail must be truncated; replay still yields
862        // ev(1); next_lsn picks up cleanly.
863        let (wal, replay) = open_default(&dir.path);
864        assert_eq!(replay, vec![ev(1)]);
865
866        // Subsequent appends don't trip a CRC failure.
867        let b = wal.begin().unwrap();
868        wal.append(b, &ev(2)).unwrap();
869        wal.commit(b).unwrap();
870        wal.flush().unwrap();
871        drop(wal);
872
873        let (_, replay) = open_default(&dir.path);
874        assert_eq!(replay, vec![ev(1), ev(2)]);
875    }
876
877    #[test]
878    fn checkpoint_marker_is_recorded_and_observed() {
879        let dir = TmpDir::new("ckpt-marker");
880
881        let snapshot_lsn = {
882            let (wal, _) = open_default(&dir.path);
883            let b = wal.begin().unwrap();
884            wal.append(b, &ev(1)).unwrap();
885            let commit = wal.commit(b).unwrap();
886            wal.flush().unwrap();
887            wal.checkpoint_marker(commit).unwrap();
888            wal.flush().unwrap();
889            commit
890        };
891
892        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
893        assert_eq!(
894            outcome.checkpoint_lsn_observed,
895            Some(snapshot_lsn),
896            "checkpoint marker should be surfaced by replay"
897        );
898    }
899
900    #[test]
901    fn group_mode_durable_lsn_advances_via_bg_flusher() {
902        let dir = TmpDir::new("group");
903        // 25 ms interval: the bg flusher should advance durable_lsn
904        // within a couple of sleep slices.
905        let (wal, _) = Wal::open(
906            &dir.path,
907            SyncMode::Group { interval_ms: 25 },
908            8 * 1024 * 1024,
909            Lsn::ZERO,
910        )
911        .unwrap();
912
913        let begin = wal.begin().unwrap();
914        wal.append(begin, &ev(1)).unwrap();
915        wal.commit(begin).unwrap();
916        wal.flush().unwrap(); // Group: buffer stays in memory; durable_lsn untouched.
917
918        // Immediately after a Group flush, durable_lsn should still
919        // be Lsn::ZERO — the bg flusher hasn't fired yet.
920        assert_eq!(
921            wal.durable_lsn(),
922            Lsn::ZERO,
923            "Group flush() must not advance durable_lsn"
924        );
925
926        // Wait up to ~500 ms for the bg flusher to advance the LSN.
927        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
928        loop {
929            if wal.durable_lsn() > Lsn::ZERO {
930                break;
931            }
932            if std::time::Instant::now() >= deadline {
933                panic!(
934                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
935                    wal.durable_lsn()
936                );
937            }
938            std::thread::sleep(std::time::Duration::from_millis(10));
939        }
940        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
941        // Wal drop should join the bg thread cleanly.
942        drop(wal);
943    }
944
945    #[test]
946    fn none_mode_advances_durable_lsn_on_flush() {
947        let dir = TmpDir::new("none");
948        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
949
950        let begin = wal.begin().unwrap();
951        wal.append(begin, &ev(1)).unwrap();
952        wal.commit(begin).unwrap();
953        wal.flush().unwrap();
954
955        // None mode: flush() advances durable_lsn even without
956        // fsync, because the mode opted out of crash durability.
957        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
958    }
959
960    #[test]
961    fn force_fsync_always_advances_durable_lsn() {
962        let dir = TmpDir::new("force-fsync");
963        let (wal, _) = Wal::open(
964            &dir.path,
965            SyncMode::Group {
966                interval_ms: 60_000,
967            },
968            8 * 1024 * 1024,
969            Lsn::ZERO,
970        )
971        .unwrap();
972
973        let begin = wal.begin().unwrap();
974        wal.append(begin, &ev(1)).unwrap();
975        wal.commit(begin).unwrap();
976        wal.flush().unwrap(); // Group flush: durable_lsn unchanged.
977        assert_eq!(wal.durable_lsn(), Lsn::ZERO);
978
979        // force_fsync bypasses the configured cadence — used by
980        // checkpoints to grab a fence on demand.
981        wal.force_fsync().unwrap();
982        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
983    }
984
985    #[test]
986    fn truncate_up_to_drops_old_sealed_segments() {
987        let dir = TmpDir::new("truncate");
988
989        // Tiny target so each tx forces a rotation on the next begin.
990        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();
991
992        let mut last_commit = Lsn::ZERO;
993        for i in 0..5 {
994            let b = wal.begin().unwrap();
995            wal.append(b, &ev(i)).unwrap();
996            last_commit = wal.commit(b).unwrap();
997            wal.flush().unwrap();
998        }
999        // Five transactions × tiny target: we should be on segment ≥ 4.
1000        assert!(
1001            wal.active_segment_id() >= 4,
1002            "expected several rotations, got {}",
1003            wal.active_segment_id()
1004        );
1005
1006        let segments = SegmentDir::new(&dir.path);
1007        let before = segments.list().unwrap().len();
1008        wal.truncate_up_to(last_commit).unwrap();
1009        let after = segments.list().unwrap().len();
1010
1011        assert!(
1012            after < before,
1013            "truncate_up_to should have dropped at least one segment ({} → {})",
1014            before,
1015            after
1016        );
1017        // Active + tombstone are always retained.
1018        assert!(
1019            after >= 2,
1020            "active and the segment preceding it must be kept"
1021        );
1022
1023        // Reopening with `last_commit` as the checkpoint fence must
1024        // still succeed: the dropped segments only contained
1025        // transactions already at or below `last_commit`, so nothing
1026        // replay needs is missing.
1027        drop(wal);
1028        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
1029        // Everything was at or below the fence, so replay is empty.
1030        assert!(replay.is_empty());
1031    }
1032}