lora_wal/wal/wal.rs
1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//! 0000000001.wal sealed segment
8//! 0000000002.wal sealed segment
9//! 0000000003.wal active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
19//! Lifecycle is `[`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29use std::path::Path;
30use std::sync::{Arc, Mutex};
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Duration;
33
34use lora_store::MutationEvent;
35
36#[cfg(not(target_arch = "wasm32"))]
37use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
38use crate::config::SyncMode;
39use crate::dir::{SegmentDir, SegmentId};
40use crate::errors::WalError;
41use crate::lock::DirLock;
42use crate::lsn::Lsn;
43use crate::record::WalRecord;
44use crate::recorder::WroteCommit;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
48/// State guarded by the inner `Mutex`. Nothing in this struct is
49/// `Send`-unsafe; the lock is purely for `&self`-safe interior
50/// mutation.
51struct WalState {
52 next_lsn: Lsn,
53 durable_lsn: Lsn,
54 active_segment_id: SegmentId,
55 active_writer: SegmentWriter,
56 /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
57 oldest_segment_id: SegmentId,
58}
59
60/// Reserved latch for durability failures that occur outside the immediate
61/// caller path. Wrapped in a `Mutex` instead of an
62/// `AtomicCell<Option<String>>` because failures are rare and we want the
63/// message preserved verbatim for operator-facing reporting
64/// (`/admin/wal/status` `bgFailure`). Once `Some`, every subsequent
65/// commit/flush returns [`WalError::Poisoned`] and the operator is expected to
66/// restart from the last consistent snapshot + WAL.
67type BgFailure = Mutex<Option<String>>;
68
69/// Selects the durability work that [`Wal::flush_inner`] actually does.
70/// Centralising the normal write-only path and the forced fsync path keeps
71/// the call sites from duplicating durable-LSN rules.
72#[derive(Debug, Clone, Copy)]
73pub(super) enum FlushKind {
74 /// Write pending WAL bytes to the OS without forcing storage durability.
75 /// This is what commits do under [`SyncMode::GroupSync`].
76 PerConfiguredMode,
77 /// Write pending WAL bytes, fsync, and advance `durable_lsn`. Used by
78 /// checkpoints, explicit sync, the background flusher, and clean drop.
79 ForceFsync,
80}
81
82/// Live, append-side WAL handle.
83///
84/// Construct via [`Wal::open`]. The returned tuple includes the list of
85/// committed mutation events that need to be re-applied to the
86/// in-memory store before any new traffic is accepted.
87///
88/// `Wal::open` returns `Arc<Self>` because the optional GroupSync
89/// background flusher needs a `Weak<Wal>` to call back into without
90/// taking a strong reference (which would prevent shutdown).
91pub struct Wal {
92 segments: SegmentDir,
93 sync_mode: SyncMode,
94 segment_target_bytes: u64,
95 state: Mutex<WalState>,
96 /// Latched durability failure; surfaced via [`Wal::bg_failure`] and
97 /// propagated to commit/flush/force_fsync as [`WalError::Poisoned`].
98 bg_failure: Arc<BgFailure>,
99 /// Background flusher for `SyncMode::GroupSync`. `Drop` joins the
100 /// thread, so a `Wal` going out of scope is a clean shutdown
101 /// signal. Absent on `wasm32`, where GroupSync falls back to the
102 /// drop-time flush.
103 #[cfg(not(target_arch = "wasm32"))]
104 flusher: Mutex<Option<GroupFlusherHandle>>,
105 /// Held for the lifetime of the WAL so a second handle cannot append
106 /// to the same active segment concurrently.
107 _dir_lock: DirLock,
108}
109
110impl Wal {
111 /// Open or create the WAL directory at `dir`.
112 ///
113 /// `checkpoint_lsn` is the LSN stamped into the most recent
114 /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
115 /// there is no snapshot). Replay skips records at or below this
116 /// fence — they are already represented in the loaded state.
117 ///
118 /// Returns `(wal, committed_events)`. The caller is expected to
119 /// apply every event in `committed_events` to its in-memory store
120 /// in order before issuing any new `begin` / `append` calls.
121 pub fn open(
122 dir: impl Into<std::path::PathBuf>,
123 sync_mode: SyncMode,
124 segment_target_bytes: u64,
125 checkpoint_lsn: Lsn,
126 ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
127 let segments = SegmentDir::new(dir);
128 fs::create_dir_all(segments.root())?;
129 let dir_lock = DirLock::acquire(segments.root())?;
130
131 let entries = segments.list()?;
132 let (active_id, active_writer, replay) = if entries.is_empty() {
133 Self::open_fresh(&segments)?
134 } else {
135 Self::open_existing(&segments, &entries, checkpoint_lsn)?
136 };
137
138 let next_lsn = if replay.max_lsn.is_zero() {
139 Lsn::new(1)
140 } else {
141 replay.max_lsn.next()
142 };
143 // Treat everything readable at open time as the recovered
144 // durability fence. This does not prove the bytes were
145 // fsync-confirmed before the previous process died; it means
146 // they survived to this open and future appends must start
147 // after them.
148 let durable_lsn = replay.max_lsn;
149
150 let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
151
152 let state = WalState {
153 next_lsn,
154 durable_lsn,
155 active_segment_id: active_id,
156 active_writer,
157 oldest_segment_id,
158 };
159
160 let wal = Arc::new(Self {
161 segments,
162 sync_mode,
163 segment_target_bytes,
164 state: Mutex::new(state),
165 bg_failure: Arc::new(Mutex::new(None)),
166 #[cfg(not(target_arch = "wasm32"))]
167 flusher: Mutex::new(None),
168 _dir_lock: dir_lock,
169 });
170
171 // Spawn the GroupSync flusher *after* the Arc exists so it can hold a
172 // `Weak<Wal>` that drops when the last strong ref does. The flusher's
173 // own Drop joins the thread, so removing the field on `Wal::drop` is
174 // a clean shutdown signal. Wasm has no real fsync boundary and no
175 // thread support, so GroupSync there relies on the drop-time flush.
176 #[cfg(not(target_arch = "wasm32"))]
177 {
178 let SyncMode::GroupSync { interval_ms } = sync_mode;
179 let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
180 let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
181 *wal.flusher.lock().unwrap() = Some(handle);
182 }
183
184 Ok((wal, replay.committed_events))
185 }
186
187 /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
188 /// so LSN 0 stays reserved for "empty / never written".
189 fn open_fresh(
190 segments: &SegmentDir,
191 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
192 let id = SegmentId::FIRST;
193 let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
194 segments.sync_dir()?;
195 let replay = ReplayOutcome {
196 committed_events: Vec::new(),
197 max_lsn: Lsn::ZERO,
198 torn_tail: None,
199 checkpoint_lsn_observed: None,
200 last_good_offset: crate::segment::SEGMENT_HEADER_LEN as u64,
201 };
202 Ok((id, writer, replay))
203 }
204
205 /// Existing directory. Replay every segment to surface committed
206 /// events + detect a torn tail; reopen the highest-id segment
207 /// for append; truncate it if the torn tail is in *that* segment.
208 fn open_existing(
209 segments: &SegmentDir,
210 entries: &[crate::dir::SegmentEntry],
211 checkpoint_lsn: Lsn,
212 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
213 let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
214 let replay = replay_segments(&paths, checkpoint_lsn)?;
215
216 // The active segment is whichever file has the highest
217 // numeric id — segment file names are self-describing, so
218 // there is no separate CURRENT pointer.
219 let active = entries.last().expect("entries non-empty in open_existing");
220 let mut writer = SegmentWriter::open_for_append_at(
221 segments.path_for(active.id),
222 replay.last_good_offset,
223 )?;
224
225 // A torn tail in a *sealed* segment is impossible (sealed
226 // segments are never appended to), so we only need to handle
227 // the active one.
228 if let Some(t) = &replay.torn_tail {
229 if t.segment_path == active.path {
230 writer.truncate_to(t.last_good_offset)?;
231 } else {
232 return Err(WalError::Malformed(format!(
233 "torn tail found in sealed segment {}",
234 t.segment_path.display()
235 )));
236 }
237 }
238
239 Ok((active.id, writer, replay))
240 }
241
242 pub fn dir(&self) -> &Path {
243 self.segments.root()
244 }
245
246 pub fn sync_mode(&self) -> SyncMode {
247 self.sync_mode
248 }
249
250 pub fn durable_lsn(&self) -> Lsn {
251 self.state.lock().unwrap().durable_lsn
252 }
253
254 /// Latched durability failure, if any. `None` means the WAL is healthy.
255 /// Once set, every commit / flush / force_fsync starts returning
256 /// [`WalError::Poisoned`] and the WAL stops accepting new
257 /// transactions until the operator restarts from the last
258 /// consistent snapshot + WAL.
259 pub fn bg_failure(&self) -> Option<String> {
260 self.bg_failure.lock().unwrap().clone()
261 }
262
263 /// Direct handle to the latched-failure mutex. Used by the bg
264 /// flusher to record an fsync failure exactly once. Hidden from
265 /// outside the module so the latch stays single-writer.
266 #[cfg(not(target_arch = "wasm32"))]
267 pub(super) fn bg_failure_slot(&self) -> &BgFailure {
268 &self.bg_failure
269 }
270
271 fn check_healthy(&self) -> Result<(), WalError> {
272 if self.bg_failure.lock().unwrap().is_some() {
273 return Err(WalError::Poisoned);
274 }
275 Ok(())
276 }
277
278 /// LSN that the *next* `begin` / `append` call will allocate.
279 /// Exposed for tests and for sanity checks at boot; not part of
280 /// any durability contract.
281 pub fn next_lsn(&self) -> Lsn {
282 self.state.lock().unwrap().next_lsn
283 }
284
285 pub fn oldest_segment_id(&self) -> u64 {
286 self.state.lock().unwrap().oldest_segment_id.raw()
287 }
288
289 pub fn active_segment_id(&self) -> u64 {
290 self.state.lock().unwrap().active_segment_id.raw()
291 }
292
293 // -------------------------------------------------------------
294 // Low-level record primitives.
295 //
296 // Production code does **not** use these directly — every commit
297 // goes through [`Self::commit_tx`], which writes the begin/batch/
298 // commit triple atomically and routes durability through the
299 // configured single-thread flush policy. The methods below remain
300 // `pub` for the crate's own integration tests and for the rare
301 // admin path (`checkpoint_marker`) that needs to insert a single record.
302 // Mixing them with `commit_tx` against the same WAL is supported
303 // but unnecessary; if you find yourself calling `begin` /
304 // `append` / `commit` from a new caller, prefer `commit_tx`
305 // unless you specifically need the partial-write shape.
306 // -------------------------------------------------------------
307
308 /// Allocate a `TxBegin` record and return its LSN. *Test/admin
309 /// primitive.* Production commits use [`Self::commit_tx`].
310 ///
311 /// Rotation happens here so a transaction is always wholly within
312 /// one segment.
313 pub fn begin(&self) -> Result<Lsn, WalError> {
314 self.check_healthy()?;
315 let mut state = self.state.lock().unwrap();
316 self.maybe_rotate(&mut state)?;
317 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
318 }
319
320 /// Append a single mutation to the active segment's pending
321 /// buffer. *Test/admin primitive.* Not durable until `flush()`
322 /// runs; production commits use [`Self::commit_tx`].
323 pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
324 self.check_healthy()?;
325 let mut state = self.state.lock().unwrap();
326 Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
327 lsn,
328 tx_begin_lsn,
329 event: event.clone(),
330 })
331 }
332
333 /// Append many mutations as one framed record. *Test/admin
334 /// primitive.* Production commits use [`Self::commit_tx`], which
335 /// writes the begin/batch/commit triple in a single critical
336 /// section.
337 pub fn append_batch(
338 &self,
339 tx_begin_lsn: Lsn,
340 events: Vec<MutationEvent>,
341 ) -> Result<Lsn, WalError> {
342 self.check_healthy()?;
343 if events.is_empty() {
344 return Err(WalError::Encode(
345 "mutation batch must contain at least one event".into(),
346 ));
347 }
348 let mut state = self.state.lock().unwrap();
349 Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
350 lsn,
351 tx_begin_lsn,
352 events,
353 })
354 }
355
356 /// Append a standalone `TxCommit` marker. *Test/admin primitive.*
357 /// Production commits use [`Self::commit_tx`].
358 pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
359 self.check_healthy()?;
360 let mut state = self.state.lock().unwrap();
361 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
362 }
363
364 /// Append a `TxAbort` marker. *Test/admin primitive.* Production
365 /// code never writes `TxAbort`: [`Self::commit_tx`] writes the
366 /// begin/batch/commit triple atomically, so an aborted query has
367 /// nothing on disk to mark as aborted.
368 pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
369 self.check_healthy()?;
370 let mut state = self.state.lock().unwrap();
371 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
372 }
373
374 /// One-shot transaction commit.
375 ///
376 /// Encodes `TxBegin` + `MutationBatch` + `TxCommit` as a single
377 /// contiguous run inside one short critical section, then applies the
378 /// configured flush policy. Compared to the legacy
379 /// `begin → append_batch → commit → flush` sequence this collapses
380 /// four separate state-lock acquisitions into one while preserving the
381 /// release's single-writer execution model. Future concurrent commit
382 /// plumbing can build around this one-shot boundary without changing the
383 /// recorder contract.
384 ///
385 /// Returns [`WroteCommit::No`] for an empty event list (no records
386 /// are written, no fsync is issued).
387 pub fn commit_tx(&self, events: Vec<MutationEvent>) -> Result<WroteCommit, WalError> {
388 self.check_healthy()?;
389 if events.is_empty() {
390 return Ok(WroteCommit::No);
391 }
392
393 // Phase 1: allocate the LSN window and encode all three
394 // records into the active segment's pending buffer in one
395 // critical section. Collapsing what was four separate state
396 // lock acquisitions (begin / append_batch / commit / flush)
397 // into one is the lock-side win that pairs with the
398 // lock-free emit short-circuit on the recorder side.
399 {
400 let mut state = self.state.lock().unwrap();
401 self.maybe_rotate(&mut state)?;
402 let begin_lsn = state.next_lsn;
403 let batch_lsn = begin_lsn.next();
404 let commit_lsn = batch_lsn.next();
405 state.next_lsn = commit_lsn.next();
406 state
407 .active_writer
408 .append(&WalRecord::TxBegin { lsn: begin_lsn })?;
409 state.active_writer.append(&WalRecord::MutationBatch {
410 lsn: batch_lsn,
411 tx_begin_lsn: begin_lsn,
412 events,
413 })?;
414 state.active_writer.append(&WalRecord::TxCommit {
415 lsn: commit_lsn,
416 tx_begin_lsn: begin_lsn,
417 })?;
418 }
419
420 // Phase 2: make commit bytes visible to the OS page cache. Storage
421 // durability is provided by the GroupSync flusher or an explicit
422 // force_fsync/checkpoint/sync/drop boundary.
423 self.flush_inner(FlushKind::PerConfiguredMode)?;
424
425 Ok(WroteCommit::Yes)
426 }
427
428 /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
429 /// LSN written into the snapshot file's header — replay uses
430 /// it to defend against the snapshot-rename-but-no-marker race.
431 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
432 self.check_healthy()?;
433 let mut state = self.state.lock().unwrap();
434 Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
435 lsn,
436 snapshot_lsn,
437 })
438 }
439
440 /// Single-source-of-truth for "allocate the next LSN, build the
441 /// record, push it onto the active segment's pending buffer".
442 /// The five public append paths (`begin / append / commit / abort
443 /// / checkpoint_marker`) all funnel through here so the LSN
444 /// allocation never gets out of sync with the encoded record.
445 #[inline]
446 fn alloc_and_append(
447 state: &mut WalState,
448 build: impl FnOnce(Lsn) -> WalRecord,
449 ) -> Result<Lsn, WalError> {
450 let lsn = state.next_lsn;
451 state.next_lsn = lsn.next();
452 state.active_writer.append(&build(lsn))?;
453 Ok(lsn)
454 }
455
456 /// Flush the active segment's pending buffer.
457 ///
458 /// Under [`SyncMode::GroupSync`], a normal flush writes bytes to the OS
459 /// but leaves `durable_lsn` unchanged until an explicit `force_fsync`,
460 /// checkpoint, sync, the background flusher, or clean drop.
461 pub fn flush(&self) -> Result<(), WalError> {
462 self.check_healthy()?;
463 self.flush_inner(FlushKind::PerConfiguredMode)
464 }
465
466 /// Unconditionally write the buffer to the OS, `fsync`, and
467 /// advance `durable_lsn`. Used by callers that need a durability
468 /// point right now regardless of the configured cadence (e.g.
469 /// checkpoint). Returns [`WalError::Poisoned`] if the WAL has already
470 /// latched a durability failure.
471 pub fn force_fsync(&self) -> Result<(), WalError> {
472 self.check_healthy()?;
473 self.flush_inner(FlushKind::ForceFsync)
474 }
475
476 /// Single source of truth for the flush state machine. Skips the
477 /// `check_healthy` gate so clean shutdown can force a final GroupSync
478 /// sync even if callers are otherwise done with the handle.
479 pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
480 let mut state = self.state.lock().unwrap();
481 let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
482
483 if matches!(kind, FlushKind::ForceFsync) {
484 state.active_writer.flush_and_sync()?;
485 state.durable_lsn = written_lsn;
486 } else {
487 state.active_writer.flush_buffer()?;
488 }
489 Ok(())
490 }
491
492 /// Drop sealed segments whose entire LSN range is at or below
493 /// `fence_lsn`. Idempotent and safe to call repeatedly.
494 ///
495 /// The active segment is never deleted — even if every record in
496 /// it predates the fence, it is still the rotation target for
497 /// new appends. The segment immediately before the active one
498 /// is also kept as a tombstone so a subsequent crash before the
499 /// next checkpoint still finds a self-describing log start.
500 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
501 let mut state = self.state.lock().unwrap();
502 let active_id = state.active_segment_id;
503 let entries = self.segments.list()?;
504
505 let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
506 for (i, entry) in entries.iter().enumerate() {
507 // Active segment and the one immediately preceding it
508 // are kept by policy.
509 if entry.id >= active_id.saturating_prev() {
510 break;
511 }
512 // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
513 // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
514 let next = match entries.get(i + 1) {
515 Some(n) => n,
516 None => break,
517 };
518 let next_base = SegmentDir::base_lsn(&next.path)?;
519 if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
520 to_drop.push(entry.clone());
521 }
522 }
523
524 for entry in to_drop {
525 fs::remove_file(&entry.path)?;
526 if entry.id >= state.oldest_segment_id {
527 state.oldest_segment_id = entry.id.next();
528 }
529 }
530 if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
531 self.segments.sync_dir()?;
532 }
533 Ok(())
534 }
535
536 /// Rotate the active segment when it has grown past
537 /// `segment_target_bytes`. Called from `begin()` so rotation only
538 /// ever lands at a transaction boundary.
539 fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
540 if state.active_writer.bytes_written() < self.segment_target_bytes {
541 return Ok(());
542 }
543 // Seal the current segment (forces a flush + fsync) and open
544 // a fresh one with `base_lsn = next_lsn` so the segment file
545 // names line up with the record LSNs they contain.
546 state.active_writer.seal()?;
547
548 let next_id = state.active_segment_id.next();
549 let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
550 self.segments.sync_dir()?;
551 state.active_writer = writer;
552 state.active_segment_id = next_id;
553 Ok(())
554 }
555}
556
557impl Drop for Wal {
558 fn drop(&mut self) {
559 let _ = self.flush_inner(FlushKind::ForceFsync);
560 // Join the group flusher, if any, before the directory lock is
561 // released. That keeps the "one live append owner" boundary intact
562 // through shutdown.
563 #[cfg(not(target_arch = "wasm32"))]
564 if let Ok(slot) = self.flusher.get_mut() {
565 let _ = slot.take();
566 }
567 }
568}