Skip to main content

lora_wal/wal/
wal.rs

1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//!   0000000001.wal      sealed segment
8//!   0000000002.wal      sealed segment
9//!   0000000003.wal      active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29use std::path::Path;
30use std::sync::{Arc, Mutex};
31use std::time::Duration;
32
33use lora_store::MutationEvent;
34
35use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
36use crate::config::SyncMode;
37use crate::dir::{SegmentDir, SegmentId};
38use crate::errors::WalError;
39use crate::lock::DirLock;
40use crate::lsn::Lsn;
41use crate::record::WalRecord;
42use crate::replay::{replay_segments, ReplayOutcome};
43use crate::segment::SegmentWriter;
44
45/// State guarded by the inner `Mutex`. Nothing in this struct is
46/// `Send`-unsafe; the lock is purely for `&self`-safe interior
47/// mutation.
48struct WalState {
49    next_lsn: Lsn,
50    durable_lsn: Lsn,
51    active_segment_id: SegmentId,
52    active_writer: SegmentWriter,
53    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
54    oldest_segment_id: SegmentId,
55}
56
/// One-shot latch holding the background flusher's failure message.
///
/// This is a plain mutex around an optional `String` rather than a
/// lock-free cell. Failures are rare, and keeping the text verbatim lets
/// operator-facing reporting (`/admin/wal/status` `bgFailure`) show
/// exactly what went wrong. While the latch holds `Some`, the `Wal`'s
/// append-side calls, `flush`, and `force_fsync` fail with
/// [`WalError::Poisoned`]. Recovery means restarting from the last
/// consistent snapshot + WAL.
type BgFailure = std::sync::Mutex<Option<String>>;
65
66/// Selects the durability work that [`Wal::flush_inner`] actually does.
67/// Centralising the three modes here means `flush` and `force_fsync`
68/// share one code path and the call sites don't have to remember which
69/// mode advances `durable_lsn` and which does not.
70#[derive(Debug, Clone, Copy)]
71pub(super) enum FlushKind {
72    /// Honour the configured [`SyncMode`]. This is what the recorder's
73    /// `flush()` calls into.
74    PerConfiguredMode,
75    /// Always write the buffer + fsync + advance `durable_lsn`,
76    /// regardless of mode. Used by checkpoints and the bg flusher.
77    ForceFsync,
78}
79
80/// Live, append-side WAL handle.
81///
82/// Construct via [`Wal::open`]. The returned tuple includes the list of
83/// committed mutation events that need to be re-applied to the
84/// in-memory store before any new traffic is accepted.
85///
86/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
87/// background flusher needs a `Weak<Wal>` to call back into without
88/// taking a strong reference (which would prevent shutdown).
89pub struct Wal {
90    segments: SegmentDir,
91    sync_mode: SyncMode,
92    segment_target_bytes: u64,
93    state: Mutex<WalState>,
94    /// Latched bg-flusher failure; surfaced via [`Wal::bg_failure`] and
95    /// propagated to commit/flush/force_fsync as
96    /// [`WalError::Poisoned`].
97    bg_failure: Arc<BgFailure>,
98    /// Background flusher for `SyncMode::Group`. `Drop` joins the
99    /// thread, so a `Wal` going out of scope is a clean shutdown
100    /// signal.
101    flusher: Mutex<Option<GroupFlusherHandle>>,
102    /// Held for the lifetime of the WAL so a second handle cannot append
103    /// to the same active segment concurrently.
104    _dir_lock: DirLock,
105}
106
107impl Wal {
108    /// Open or create the WAL directory at `dir`.
109    ///
110    /// `checkpoint_lsn` is the LSN stamped into the most recent
111    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
112    /// there is no snapshot). Replay skips records at or below this
113    /// fence — they are already represented in the loaded state.
114    ///
115    /// Returns `(wal, committed_events)`. The caller is expected to
116    /// apply every event in `committed_events` to its in-memory store
117    /// in order before issuing any new `begin` / `append` calls.
118    pub fn open(
119        dir: impl Into<std::path::PathBuf>,
120        sync_mode: SyncMode,
121        segment_target_bytes: u64,
122        checkpoint_lsn: Lsn,
123    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
124        let segments = SegmentDir::new(dir);
125        fs::create_dir_all(segments.root())?;
126        let dir_lock = DirLock::acquire(segments.root())?;
127
128        let entries = segments.list()?;
129        let (active_id, active_writer, replay) = if entries.is_empty() {
130            Self::open_fresh(&segments)?
131        } else {
132            Self::open_existing(&segments, &entries, checkpoint_lsn)?
133        };
134
135        let next_lsn = if replay.max_lsn.is_zero() {
136            Lsn::new(1)
137        } else {
138            replay.max_lsn.next()
139        };
140        // Treat everything readable at open time as the recovered
141        // durability fence. This does not prove the bytes were
142        // fsync-confirmed before the previous process died; it means
143        // they survived to this open and future appends must start
144        // after them.
145        let durable_lsn = replay.max_lsn;
146
147        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
148
149        let state = WalState {
150            next_lsn,
151            durable_lsn,
152            active_segment_id: active_id,
153            active_writer,
154            oldest_segment_id,
155        };
156
157        let wal = Arc::new(Self {
158            segments,
159            sync_mode,
160            segment_target_bytes,
161            state: Mutex::new(state),
162            bg_failure: Arc::new(Mutex::new(None)),
163            flusher: Mutex::new(None),
164            _dir_lock: dir_lock,
165        });
166
167        // Spawn the Group flusher *after* the Arc exists so it can
168        // hold a `Weak<Wal>` that drops when the last strong ref
169        // does. The flusher's own Drop joins the thread, so removing
170        // the field (e.g. on Wal::drop) is a clean shutdown signal.
171        if let SyncMode::Group { interval_ms } = sync_mode {
172            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
173            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
174            *wal.flusher.lock().unwrap() = Some(handle);
175        }
176
177        Ok((wal, replay.committed_events))
178    }
179
180    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
181    /// so LSN 0 stays reserved for "empty / never written".
182    fn open_fresh(
183        segments: &SegmentDir,
184    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
185        let id = SegmentId::FIRST;
186        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
187        segments.sync_dir()?;
188        let replay = ReplayOutcome {
189            committed_events: Vec::new(),
190            max_lsn: Lsn::ZERO,
191            torn_tail: None,
192            checkpoint_lsn_observed: None,
193        };
194        Ok((id, writer, replay))
195    }
196
197    /// Existing directory. Replay every segment to surface committed
198    /// events + detect a torn tail; reopen the highest-id segment
199    /// for append; truncate it if the torn tail is in *that* segment.
200    fn open_existing(
201        segments: &SegmentDir,
202        entries: &[crate::dir::SegmentEntry],
203        checkpoint_lsn: Lsn,
204    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
205        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
206        let replay = replay_segments(&paths, checkpoint_lsn)?;
207
208        // The active segment is whichever file has the highest
209        // numeric id — segment file names are self-describing, so
210        // there is no separate CURRENT pointer.
211        let active = entries.last().expect("entries non-empty in open_existing");
212        let (mut writer, _torn_from_writer) =
213            SegmentWriter::open_for_append(segments.path_for(active.id))?;
214
215        // A torn tail in a *sealed* segment is impossible (sealed
216        // segments are never appended to), so we only need to handle
217        // the active one.
218        if let Some(t) = &replay.torn_tail {
219            if t.segment_path == active.path {
220                writer.truncate_to(t.last_good_offset)?;
221            } else {
222                return Err(WalError::Malformed(format!(
223                    "torn tail found in sealed segment {}",
224                    t.segment_path.display()
225                )));
226            }
227        }
228
229        Ok((active.id, writer, replay))
230    }
231
232    pub fn dir(&self) -> &Path {
233        self.segments.root()
234    }
235
236    pub fn sync_mode(&self) -> SyncMode {
237        self.sync_mode
238    }
239
240    pub fn durable_lsn(&self) -> Lsn {
241        self.state.lock().unwrap().durable_lsn
242    }
243
244    /// Latched message from the background flusher, if it has ever
245    /// failed an `fsync`. `None` means the WAL is healthy. Once set,
246    /// every commit / flush / force_fsync starts returning
247    /// [`WalError::Poisoned`] and the WAL stops accepting new
248    /// transactions until the operator restarts from the last
249    /// consistent snapshot + WAL.
250    pub fn bg_failure(&self) -> Option<String> {
251        self.bg_failure.lock().unwrap().clone()
252    }
253
254    /// Direct handle to the latched-failure mutex. Used by the bg
255    /// flusher to record an fsync failure exactly once. Hidden from
256    /// outside the module so the latch stays single-writer.
257    pub(super) fn bg_failure_slot(&self) -> &BgFailure {
258        &self.bg_failure
259    }
260
261    fn check_healthy(&self) -> Result<(), WalError> {
262        if self.bg_failure.lock().unwrap().is_some() {
263            return Err(WalError::Poisoned);
264        }
265        Ok(())
266    }
267
268    /// LSN that the *next* `begin` / `append` call will allocate.
269    /// Exposed for tests and for sanity checks at boot; not part of
270    /// any durability contract.
271    pub fn next_lsn(&self) -> Lsn {
272        self.state.lock().unwrap().next_lsn
273    }
274
275    pub fn oldest_segment_id(&self) -> u64 {
276        self.state.lock().unwrap().oldest_segment_id.raw()
277    }
278
279    pub fn active_segment_id(&self) -> u64 {
280        self.state.lock().unwrap().active_segment_id.raw()
281    }
282
283    /// Begin a new transaction. Allocates a `TxBegin` record and
284    /// returns its LSN, which the caller must thread back through
285    /// `append` / `commit` / `abort` so replay can group the events.
286    ///
287    /// If the active segment has crossed `segment_target_bytes`,
288    /// rotation happens here — `TxBegin` is the only record kind
289    /// guaranteed to be a transaction boundary, so rotating just
290    /// before its append keeps every transaction wholly in one
291    /// segment.
292    pub fn begin(&self) -> Result<Lsn, WalError> {
293        self.check_healthy()?;
294        let mut state = self.state.lock().unwrap();
295        self.maybe_rotate(&mut state)?;
296        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
297    }
298
299    /// Append a single mutation to the in-memory pending buffer of
300    /// the active segment. Not durable until `flush()` runs.
301    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
302        self.check_healthy()?;
303        let mut state = self.state.lock().unwrap();
304        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
305            lsn,
306            tx_begin_lsn,
307            event: event.clone(),
308        })
309    }
310
311    /// Append many mutations as one framed record. This keeps the replay
312    /// contract identical to repeated `append` calls while avoiding per-event
313    /// length/CRC/framing overhead for write-heavy statements.
314    pub fn append_batch(
315        &self,
316        tx_begin_lsn: Lsn,
317        events: Vec<MutationEvent>,
318    ) -> Result<Lsn, WalError> {
319        self.check_healthy()?;
320        if events.is_empty() {
321            return Err(WalError::Encode(
322                "mutation batch must contain at least one event".into(),
323            ));
324        }
325        let mut state = self.state.lock().unwrap();
326        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
327            lsn,
328            tx_begin_lsn,
329            events,
330        })
331    }
332
333    /// Append a `TxCommit` marker. Caller is expected to subsequently
334    /// call `flush()` (under `SyncMode::PerCommit`) to make the
335    /// commit durable before returning to its caller.
336    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
337        self.check_healthy()?;
338        let mut state = self.state.lock().unwrap();
339        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
340    }
341
342    /// Append a `TxAbort` marker. Replay drops the events keyed by
343    /// `tx_begin_lsn` without re-applying them.
344    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
345        self.check_healthy()?;
346        let mut state = self.state.lock().unwrap();
347        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
348    }
349
350    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
351    /// LSN written into the snapshot file's header — replay uses
352    /// it to defend against the snapshot-rename-but-no-marker race.
353    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
354        self.check_healthy()?;
355        let mut state = self.state.lock().unwrap();
356        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
357            lsn,
358            snapshot_lsn,
359        })
360    }
361
362    /// Single-source-of-truth for "allocate the next LSN, build the
363    /// record, push it onto the active segment's pending buffer".
364    /// The five public append paths (`begin / append / commit / abort
365    /// / checkpoint_marker`) all funnel through here so the LSN
366    /// allocation never gets out of sync with the encoded record.
367    #[inline]
368    fn alloc_and_append(
369        state: &mut WalState,
370        build: impl FnOnce(Lsn) -> WalRecord,
371    ) -> Result<Lsn, WalError> {
372        let lsn = state.next_lsn;
373        state.next_lsn = lsn.next();
374        state.active_writer.append(&build(lsn))?;
375        Ok(lsn)
376    }
377
378    /// Flush the active segment's pending buffer.
379    ///
380    /// What "flush" means depends on [`SyncMode`]:
381    ///
382    /// - `PerCommit` — write the buffer to the OS, `fsync`, and
383    ///   advance `durable_lsn`. The strongest contract: every
384    ///   record up to `next_lsn - 1` is on disk.
385    /// - `Group` — write the buffer to the OS, but let the background
386    ///   flusher fsync and advance `durable_lsn` on its cadence.
387    /// - `None` — write the buffer to the OS only, but advance
388    ///   `durable_lsn` anyway. The mode opts out of crash
389    ///   durability, so the checkpoint fence reports
390    ///   "what's been written" instead of "what's actually safe".
391    pub fn flush(&self) -> Result<(), WalError> {
392        self.check_healthy()?;
393        self.flush_inner(FlushKind::PerConfiguredMode)
394    }
395
396    /// Unconditionally write the buffer to the OS, `fsync`, and
397    /// advance `durable_lsn`. Used by callers that need a durability
398    /// point right now regardless of the configured cadence (e.g.
399    /// checkpoint). Returns [`WalError::Poisoned`] if the bg flusher
400    /// has already failed.
401    pub fn force_fsync(&self) -> Result<(), WalError> {
402        self.check_healthy()?;
403        self.flush_inner(FlushKind::ForceFsync)
404    }
405
406    /// Single source of truth for the flush state machine. Skips the
407    /// `check_healthy` gate so the bg flusher can call into it
408    /// without recursing through its own latch.
409    pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
410        let mut state = self.state.lock().unwrap();
411        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
412
413        // Decide whether this call is allowed to advance
414        // `durable_lsn`. The bg flusher's job in Group mode is to advance
415        // that fence after fsync; PerCommit and None do it inline; Group's
416        // user-driven `flush()` only pushes bytes to the OS.
417        let do_fsync = matches!(
418            (kind, self.sync_mode),
419            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
420        );
421        let advance_durable = matches!(
422            (kind, self.sync_mode),
423            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
424        );
425
426        if do_fsync {
427            state.active_writer.flush_and_sync()?;
428        } else {
429            state.active_writer.flush_buffer()?;
430        }
431        if advance_durable {
432            state.durable_lsn = written_lsn;
433        }
434        Ok(())
435    }
436
437    /// Drop sealed segments whose entire LSN range is at or below
438    /// `fence_lsn`. Idempotent and safe to call repeatedly.
439    ///
440    /// The active segment is never deleted — even if every record in
441    /// it predates the fence, it is still the rotation target for
442    /// new appends. The segment immediately before the active one
443    /// is also kept as a tombstone so a subsequent crash before the
444    /// next checkpoint still finds a self-describing log start.
445    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
446        let mut state = self.state.lock().unwrap();
447        let active_id = state.active_segment_id;
448        let entries = self.segments.list()?;
449
450        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
451        for (i, entry) in entries.iter().enumerate() {
452            // Active segment and the one immediately preceding it
453            // are kept by policy.
454            if entry.id >= active_id.saturating_prev() {
455                break;
456            }
457            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
458            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
459            let next = match entries.get(i + 1) {
460                Some(n) => n,
461                None => break,
462            };
463            let next_base = SegmentDir::base_lsn(&next.path)?;
464            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
465                to_drop.push(entry.clone());
466            }
467        }
468
469        for entry in to_drop {
470            fs::remove_file(&entry.path)?;
471            if entry.id >= state.oldest_segment_id {
472                state.oldest_segment_id = entry.id.next();
473            }
474        }
475        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
476            self.segments.sync_dir()?;
477        }
478        Ok(())
479    }
480
481    /// Rotate the active segment when it has grown past
482    /// `segment_target_bytes`. Called from `begin()` so rotation only
483    /// ever lands at a transaction boundary.
484    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
485        if state.active_writer.bytes_written() < self.segment_target_bytes {
486            return Ok(());
487        }
488        // Seal the current segment (forces a flush + fsync) and open
489        // a fresh one with `base_lsn = next_lsn` so the segment file
490        // names line up with the record LSNs they contain.
491        state.active_writer.flush_and_sync()?;
492        state.active_writer.seal()?;
493
494        let next_id = state.active_segment_id.next();
495        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
496        self.segments.sync_dir()?;
497        state.active_writer = writer;
498        state.active_segment_id = next_id;
499        Ok(())
500    }
501}
502
503impl Drop for Wal {
504    fn drop(&mut self) {
505        if matches!(self.sync_mode, SyncMode::Group { .. }) {
506            let _ = self.flush_inner(FlushKind::ForceFsync);
507        }
508        // Join the group flusher, if any, before the directory lock is
509        // released. That keeps the "one live append owner" boundary intact
510        // through shutdown.
511        if let Ok(slot) = self.flusher.get_mut() {
512            let _ = slot.take();
513        }
514    }
515}