lora_wal/wal/wal.rs
1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//! 0000000001.wal sealed segment
8//! 0000000002.wal sealed segment
9//! 0000000003.wal active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29use std::path::Path;
30use std::sync::{Arc, Mutex};
31use std::time::Duration;
32
33use lora_store::MutationEvent;
34
35use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
36use crate::config::SyncMode;
37use crate::dir::{SegmentDir, SegmentId};
38use crate::errors::WalError;
39use crate::lock::DirLock;
40use crate::lsn::Lsn;
41use crate::record::WalRecord;
42use crate::replay::{replay_segments, ReplayOutcome};
43use crate::segment::SegmentWriter;
44
45/// State guarded by the inner `Mutex`. Nothing in this struct is
46/// `Send`-unsafe; the lock is purely for `&self`-safe interior
47/// mutation.
48struct WalState {
49 next_lsn: Lsn,
50 durable_lsn: Lsn,
51 active_segment_id: SegmentId,
52 active_writer: SegmentWriter,
53 /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
54 oldest_segment_id: SegmentId,
55}
56
/// Slot that latches a failure reported by the background flusher.
///
/// A plain `Mutex<Option<String>>` is used rather than an
/// `AtomicCell<Option<String>>`: failures are rare, and the message is
/// kept verbatim for operator-facing reporting
/// (`/admin/wal/status` `bgFailure`). Once the slot holds `Some`, the
/// health check rejects further traffic with [`WalError::Poisoned`] and
/// the operator is expected to restart from the last consistent
/// snapshot + WAL.
type BgFailure = Mutex<Option<String>>;
65
66/// Selects the durability work that [`Wal::flush_inner`] actually does.
67/// Centralising the three modes here means `flush` and `force_fsync`
68/// share one code path and the call sites don't have to remember which
69/// mode advances `durable_lsn` and which does not.
70#[derive(Debug, Clone, Copy)]
71pub(super) enum FlushKind {
72 /// Honour the configured [`SyncMode`]. This is what the recorder's
73 /// `flush()` calls into.
74 PerConfiguredMode,
75 /// Always write the buffer + fsync + advance `durable_lsn`,
76 /// regardless of mode. Used by checkpoints and the bg flusher.
77 ForceFsync,
78}
79
80/// Live, append-side WAL handle.
81///
82/// Construct via [`Wal::open`]. The returned tuple includes the list of
83/// committed mutation events that need to be re-applied to the
84/// in-memory store before any new traffic is accepted.
85///
86/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
87/// background flusher needs a `Weak<Wal>` to call back into without
88/// taking a strong reference (which would prevent shutdown).
89pub struct Wal {
90 segments: SegmentDir,
91 sync_mode: SyncMode,
92 segment_target_bytes: u64,
93 state: Mutex<WalState>,
94 /// Latched bg-flusher failure; surfaced via [`Wal::bg_failure`] and
95 /// propagated to commit/flush/force_fsync as
96 /// [`WalError::Poisoned`].
97 bg_failure: Arc<BgFailure>,
98 /// Background flusher for `SyncMode::Group`. `Drop` joins the
99 /// thread, so a `Wal` going out of scope is a clean shutdown
100 /// signal.
101 flusher: Mutex<Option<GroupFlusherHandle>>,
102 /// Held for the lifetime of the WAL so a second handle cannot append
103 /// to the same active segment concurrently.
104 _dir_lock: DirLock,
105}
106
107impl Wal {
108 /// Open or create the WAL directory at `dir`.
109 ///
110 /// `checkpoint_lsn` is the LSN stamped into the most recent
111 /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
112 /// there is no snapshot). Replay skips records at or below this
113 /// fence — they are already represented in the loaded state.
114 ///
115 /// Returns `(wal, committed_events)`. The caller is expected to
116 /// apply every event in `committed_events` to its in-memory store
117 /// in order before issuing any new `begin` / `append` calls.
118 pub fn open(
119 dir: impl Into<std::path::PathBuf>,
120 sync_mode: SyncMode,
121 segment_target_bytes: u64,
122 checkpoint_lsn: Lsn,
123 ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
124 let segments = SegmentDir::new(dir);
125 fs::create_dir_all(segments.root())?;
126 let dir_lock = DirLock::acquire(segments.root())?;
127
128 let entries = segments.list()?;
129 let (active_id, active_writer, replay) = if entries.is_empty() {
130 Self::open_fresh(&segments)?
131 } else {
132 Self::open_existing(&segments, &entries, checkpoint_lsn)?
133 };
134
135 let next_lsn = if replay.max_lsn.is_zero() {
136 Lsn::new(1)
137 } else {
138 replay.max_lsn.next()
139 };
140 // Treat everything readable at open time as the recovered
141 // durability fence. This does not prove the bytes were
142 // fsync-confirmed before the previous process died; it means
143 // they survived to this open and future appends must start
144 // after them.
145 let durable_lsn = replay.max_lsn;
146
147 let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
148
149 let state = WalState {
150 next_lsn,
151 durable_lsn,
152 active_segment_id: active_id,
153 active_writer,
154 oldest_segment_id,
155 };
156
157 let wal = Arc::new(Self {
158 segments,
159 sync_mode,
160 segment_target_bytes,
161 state: Mutex::new(state),
162 bg_failure: Arc::new(Mutex::new(None)),
163 flusher: Mutex::new(None),
164 _dir_lock: dir_lock,
165 });
166
167 // Spawn the Group flusher *after* the Arc exists so it can
168 // hold a `Weak<Wal>` that drops when the last strong ref
169 // does. The flusher's own Drop joins the thread, so removing
170 // the field (e.g. on Wal::drop) is a clean shutdown signal.
171 if let SyncMode::Group { interval_ms } = sync_mode {
172 let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
173 let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
174 *wal.flusher.lock().unwrap() = Some(handle);
175 }
176
177 Ok((wal, replay.committed_events))
178 }
179
180 /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
181 /// so LSN 0 stays reserved for "empty / never written".
182 fn open_fresh(
183 segments: &SegmentDir,
184 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
185 let id = SegmentId::FIRST;
186 let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
187 segments.sync_dir()?;
188 let replay = ReplayOutcome {
189 committed_events: Vec::new(),
190 max_lsn: Lsn::ZERO,
191 torn_tail: None,
192 checkpoint_lsn_observed: None,
193 };
194 Ok((id, writer, replay))
195 }
196
197 /// Existing directory. Replay every segment to surface committed
198 /// events + detect a torn tail; reopen the highest-id segment
199 /// for append; truncate it if the torn tail is in *that* segment.
200 fn open_existing(
201 segments: &SegmentDir,
202 entries: &[crate::dir::SegmentEntry],
203 checkpoint_lsn: Lsn,
204 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
205 let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
206 let replay = replay_segments(&paths, checkpoint_lsn)?;
207
208 // The active segment is whichever file has the highest
209 // numeric id — segment file names are self-describing, so
210 // there is no separate CURRENT pointer.
211 let active = entries.last().expect("entries non-empty in open_existing");
212 let (mut writer, _torn_from_writer) =
213 SegmentWriter::open_for_append(segments.path_for(active.id))?;
214
215 // A torn tail in a *sealed* segment is impossible (sealed
216 // segments are never appended to), so we only need to handle
217 // the active one.
218 if let Some(t) = &replay.torn_tail {
219 if t.segment_path == active.path {
220 writer.truncate_to(t.last_good_offset)?;
221 } else {
222 return Err(WalError::Malformed(format!(
223 "torn tail found in sealed segment {}",
224 t.segment_path.display()
225 )));
226 }
227 }
228
229 Ok((active.id, writer, replay))
230 }
231
232 pub fn dir(&self) -> &Path {
233 self.segments.root()
234 }
235
236 pub fn sync_mode(&self) -> SyncMode {
237 self.sync_mode
238 }
239
240 pub fn durable_lsn(&self) -> Lsn {
241 self.state.lock().unwrap().durable_lsn
242 }
243
244 /// Latched message from the background flusher, if it has ever
245 /// failed an `fsync`. `None` means the WAL is healthy. Once set,
246 /// every commit / flush / force_fsync starts returning
247 /// [`WalError::Poisoned`] and the WAL stops accepting new
248 /// transactions until the operator restarts from the last
249 /// consistent snapshot + WAL.
250 pub fn bg_failure(&self) -> Option<String> {
251 self.bg_failure.lock().unwrap().clone()
252 }
253
254 /// Direct handle to the latched-failure mutex. Used by the bg
255 /// flusher to record an fsync failure exactly once. Hidden from
256 /// outside the module so the latch stays single-writer.
257 pub(super) fn bg_failure_slot(&self) -> &BgFailure {
258 &self.bg_failure
259 }
260
261 fn check_healthy(&self) -> Result<(), WalError> {
262 if self.bg_failure.lock().unwrap().is_some() {
263 return Err(WalError::Poisoned);
264 }
265 Ok(())
266 }
267
268 /// LSN that the *next* `begin` / `append` call will allocate.
269 /// Exposed for tests and for sanity checks at boot; not part of
270 /// any durability contract.
271 pub fn next_lsn(&self) -> Lsn {
272 self.state.lock().unwrap().next_lsn
273 }
274
275 pub fn oldest_segment_id(&self) -> u64 {
276 self.state.lock().unwrap().oldest_segment_id.raw()
277 }
278
279 pub fn active_segment_id(&self) -> u64 {
280 self.state.lock().unwrap().active_segment_id.raw()
281 }
282
283 /// Begin a new transaction. Allocates a `TxBegin` record and
284 /// returns its LSN, which the caller must thread back through
285 /// `append` / `commit` / `abort` so replay can group the events.
286 ///
287 /// If the active segment has crossed `segment_target_bytes`,
288 /// rotation happens here — `TxBegin` is the only record kind
289 /// guaranteed to be a transaction boundary, so rotating just
290 /// before its append keeps every transaction wholly in one
291 /// segment.
292 pub fn begin(&self) -> Result<Lsn, WalError> {
293 self.check_healthy()?;
294 let mut state = self.state.lock().unwrap();
295 self.maybe_rotate(&mut state)?;
296 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
297 }
298
299 /// Append a single mutation to the in-memory pending buffer of
300 /// the active segment. Not durable until `flush()` runs.
301 pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
302 self.check_healthy()?;
303 let mut state = self.state.lock().unwrap();
304 Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
305 lsn,
306 tx_begin_lsn,
307 event: event.clone(),
308 })
309 }
310
311 /// Append many mutations as one framed record. This keeps the replay
312 /// contract identical to repeated `append` calls while avoiding per-event
313 /// length/CRC/framing overhead for write-heavy statements.
314 pub fn append_batch(
315 &self,
316 tx_begin_lsn: Lsn,
317 events: Vec<MutationEvent>,
318 ) -> Result<Lsn, WalError> {
319 self.check_healthy()?;
320 if events.is_empty() {
321 return Err(WalError::Encode(
322 "mutation batch must contain at least one event".into(),
323 ));
324 }
325 let mut state = self.state.lock().unwrap();
326 Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
327 lsn,
328 tx_begin_lsn,
329 events,
330 })
331 }
332
333 /// Append a `TxCommit` marker. Caller is expected to subsequently
334 /// call `flush()` (under `SyncMode::PerCommit`) to make the
335 /// commit durable before returning to its caller.
336 pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
337 self.check_healthy()?;
338 let mut state = self.state.lock().unwrap();
339 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
340 }
341
342 /// Append a `TxAbort` marker. Replay drops the events keyed by
343 /// `tx_begin_lsn` without re-applying them.
344 pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
345 self.check_healthy()?;
346 let mut state = self.state.lock().unwrap();
347 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
348 }
349
350 /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
351 /// LSN written into the snapshot file's header — replay uses
352 /// it to defend against the snapshot-rename-but-no-marker race.
353 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
354 self.check_healthy()?;
355 let mut state = self.state.lock().unwrap();
356 Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
357 lsn,
358 snapshot_lsn,
359 })
360 }
361
362 /// Single-source-of-truth for "allocate the next LSN, build the
363 /// record, push it onto the active segment's pending buffer".
364 /// The five public append paths (`begin / append / commit / abort
365 /// / checkpoint_marker`) all funnel through here so the LSN
366 /// allocation never gets out of sync with the encoded record.
367 #[inline]
368 fn alloc_and_append(
369 state: &mut WalState,
370 build: impl FnOnce(Lsn) -> WalRecord,
371 ) -> Result<Lsn, WalError> {
372 let lsn = state.next_lsn;
373 state.next_lsn = lsn.next();
374 state.active_writer.append(&build(lsn))?;
375 Ok(lsn)
376 }
377
378 /// Flush the active segment's pending buffer.
379 ///
380 /// What "flush" means depends on [`SyncMode`]:
381 ///
382 /// - `PerCommit` — write the buffer to the OS, `fsync`, and
383 /// advance `durable_lsn`. The strongest contract: every
384 /// record up to `next_lsn - 1` is on disk.
385 /// - `Group` — write the buffer to the OS, but let the background
386 /// flusher fsync and advance `durable_lsn` on its cadence.
387 /// - `None` — write the buffer to the OS only, but advance
388 /// `durable_lsn` anyway. The mode opts out of crash
389 /// durability, so the checkpoint fence reports
390 /// "what's been written" instead of "what's actually safe".
391 pub fn flush(&self) -> Result<(), WalError> {
392 self.check_healthy()?;
393 self.flush_inner(FlushKind::PerConfiguredMode)
394 }
395
396 /// Unconditionally write the buffer to the OS, `fsync`, and
397 /// advance `durable_lsn`. Used by callers that need a durability
398 /// point right now regardless of the configured cadence (e.g.
399 /// checkpoint). Returns [`WalError::Poisoned`] if the bg flusher
400 /// has already failed.
401 pub fn force_fsync(&self) -> Result<(), WalError> {
402 self.check_healthy()?;
403 self.flush_inner(FlushKind::ForceFsync)
404 }
405
406 /// Single source of truth for the flush state machine. Skips the
407 /// `check_healthy` gate so the bg flusher can call into it
408 /// without recursing through its own latch.
409 pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
410 let mut state = self.state.lock().unwrap();
411 let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
412
413 // Decide whether this call is allowed to advance
414 // `durable_lsn`. The bg flusher's job in Group mode is to advance
415 // that fence after fsync; PerCommit and None do it inline; Group's
416 // user-driven `flush()` only pushes bytes to the OS.
417 let do_fsync = matches!(
418 (kind, self.sync_mode),
419 (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
420 );
421 let advance_durable = matches!(
422 (kind, self.sync_mode),
423 (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
424 );
425
426 if do_fsync {
427 state.active_writer.flush_and_sync()?;
428 } else {
429 state.active_writer.flush_buffer()?;
430 }
431 if advance_durable {
432 state.durable_lsn = written_lsn;
433 }
434 Ok(())
435 }
436
437 /// Drop sealed segments whose entire LSN range is at or below
438 /// `fence_lsn`. Idempotent and safe to call repeatedly.
439 ///
440 /// The active segment is never deleted — even if every record in
441 /// it predates the fence, it is still the rotation target for
442 /// new appends. The segment immediately before the active one
443 /// is also kept as a tombstone so a subsequent crash before the
444 /// next checkpoint still finds a self-describing log start.
445 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
446 let mut state = self.state.lock().unwrap();
447 let active_id = state.active_segment_id;
448 let entries = self.segments.list()?;
449
450 let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
451 for (i, entry) in entries.iter().enumerate() {
452 // Active segment and the one immediately preceding it
453 // are kept by policy.
454 if entry.id >= active_id.saturating_prev() {
455 break;
456 }
457 // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
458 // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
459 let next = match entries.get(i + 1) {
460 Some(n) => n,
461 None => break,
462 };
463 let next_base = SegmentDir::base_lsn(&next.path)?;
464 if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
465 to_drop.push(entry.clone());
466 }
467 }
468
469 for entry in to_drop {
470 fs::remove_file(&entry.path)?;
471 if entry.id >= state.oldest_segment_id {
472 state.oldest_segment_id = entry.id.next();
473 }
474 }
475 if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
476 self.segments.sync_dir()?;
477 }
478 Ok(())
479 }
480
481 /// Rotate the active segment when it has grown past
482 /// `segment_target_bytes`. Called from `begin()` so rotation only
483 /// ever lands at a transaction boundary.
484 fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
485 if state.active_writer.bytes_written() < self.segment_target_bytes {
486 return Ok(());
487 }
488 // Seal the current segment (forces a flush + fsync) and open
489 // a fresh one with `base_lsn = next_lsn` so the segment file
490 // names line up with the record LSNs they contain.
491 state.active_writer.flush_and_sync()?;
492 state.active_writer.seal()?;
493
494 let next_id = state.active_segment_id.next();
495 let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
496 self.segments.sync_dir()?;
497 state.active_writer = writer;
498 state.active_segment_id = next_id;
499 Ok(())
500 }
501}
502
503impl Drop for Wal {
504 fn drop(&mut self) {
505 if matches!(self.sync_mode, SyncMode::Group { .. }) {
506 let _ = self.flush_inner(FlushKind::ForceFsync);
507 }
508 // Join the group flusher, if any, before the directory lock is
509 // released. That keeps the "one live append owner" boundary intact
510 // through shutdown.
511 if let Ok(slot) = self.flusher.get_mut() {
512 let _ = slot.take();
513 }
514 }
515}