Skip to main content

selene_persist/
writer.rs

1//! Append-only WAL writer.
2
3use std::fs::{File, OpenOptions};
4use std::io::{BufReader, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use selene_core::{Change, HlcTimestamp, Origin, metrics};
9
10use crate::entry_header::{
11    encode_entry_header, ensure_payload_len, read_entry_header, validate_principal,
12};
13use crate::file_header::{WAL_FILE_HEADER_LEN, WalFileHeader};
14use crate::manifest::Manifest;
15use crate::payload::{WalCompression, encode_changes_with_compression, verify_checksum};
16use crate::retention::{PruneOutcome, RetentionPolicy};
17use crate::snapshot_writer::SnapshotBuilder;
18use crate::writer_rotation::{RotationInputs, WalRotationOutcome, rotate_with_manifest};
19use crate::{PersistError, PersistResult, WalEntryHeader};
20
/// Conventional v1.0 single-file WAL name used by embedders.
///
/// Only a naming convention: [`WalWriter::open`] accepts an arbitrary path.
pub const DEFAULT_WAL_FILE_NAME: &str = "wal.log";
23
/// When the WAL writer issues `fsync`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncPolicy {
    /// Group commit: fsync once every `N` appended entries, plus on every
    /// explicit flush and when the writer is dropped.
    ///
    /// `EveryN(1)` fsyncs each append and is the durable default; larger
    /// values batch several appends per fsync. Opening a writer rewrites
    /// `EveryN(0)` to `EveryN(1)`.
    EveryN(u32),
    /// Only an explicit [`WalWriter::flush`] fsyncs; appends and drop never do.
    ///
    /// Opt-in for benchmark parity and offline paths whose durability is
    /// guaranteed by something else. It is not the production default.
    ///
    /// # selene-graph forces this for the committer WAL (v1.2 BRIEF 2)
    ///
    /// A [`WalWriter`] owned by selene-graph's single committer thread (via
    /// `SharedGraphBuilder::with_wal` / `SharedGraph::from_graph_with_wal` /
    /// recovery) has exactly one fsync caller: the committer. It appends a
    /// contiguous run of commits without fsync, then calls
    /// [`WalWriter::flush`] once per run as the fsync-before-publish barrier.
    /// To keep that the only fsync path, selene-graph **replaces
    /// `WalConfig::sync_policy` with `OnFlushOnly`** before opening such a WAL,
    /// ignoring any policy the caller supplied; the fsync cadence then comes
    /// from `selene_graph::CommitBatching`. The override lives in selene-graph,
    /// not here: a `WalWriter` opened directly still honors the caller's policy.
    OnFlushOnly,
}

impl SyncPolicy {
    /// The append-count threshold of [`SyncPolicy::EveryN`], or `None` for
    /// [`SyncPolicy::OnFlushOnly`]. The value is returned as stored, so an
    /// un-normalized `EveryN(0)` yields `Some(0)`.
    #[must_use]
    pub const fn as_every_n(self) -> Option<u32> {
        if let Self::EveryN(threshold) = self {
            Some(threshold)
        } else {
            None
        }
    }

    /// Replace the degenerate `EveryN(0)` with `EveryN(1)`; any other policy
    /// is returned unchanged.
    const fn normalized(self) -> Self {
        if let Self::EveryN(0) = self {
            Self::EveryN(1)
        } else {
            self
        }
    }

    /// Whether a writer using this policy fsyncs when it is dropped
    /// (true for every `EveryN` variant).
    const fn syncs_on_drop(self) -> bool {
        self.as_every_n().is_some()
    }
}
75
76/// WAL writer configuration.
77#[derive(Clone, Copy, Debug, Eq, PartialEq)]
78pub struct WalConfig {
79    /// Flush and fsync schedule.
80    pub sync_policy: SyncPolicy,
81    /// Highest WAL sequence covered by the snapshot this file extends.
82    ///
83    /// Written into the file header on a fresh file, and used to seed
84    /// `last_sequence` so the first appended entry receives sequence
85    /// `snapshot_seq + 1`. On reopen, the on-disk header is the source of
86    /// truth and the config value is ignored — recovery never moves a
87    /// snapshot watermark backward.
88    pub snapshot_seq: u64,
89}
90
91impl Default for WalConfig {
92    fn default() -> Self {
93        Self {
94            sync_policy: SyncPolicy::EveryN(1),
95            snapshot_seq: 0,
96        }
97    }
98}
99
100impl WalConfig {
101    /// Construct a WAL config with the legacy group-commit threshold.
102    #[must_use]
103    pub const fn with_fsync_every_n(fsync_every_n: u32) -> Self {
104        Self {
105            sync_policy: SyncPolicy::EveryN(fsync_every_n),
106            snapshot_seq: 0,
107        }
108    }
109}
110
111/// Single-threaded append-only WAL writer.
112///
113/// Holds an exclusive OS-level file lock on the WAL file for the writer's
114/// lifetime, so a second `WalWriter::open` call on the same path
115/// (in-process or cross-process) fails fast with
116/// [`PersistError::WriterLockHeld`] rather than corrupting the log.
117pub struct WalWriter {
118    file: File,
119    path: PathBuf,
120    last_sequence: u64,
121    snapshot_seq: u64,
122    sync_policy: SyncPolicy,
123    compression: WalCompression,
124    entries_since_fsync: u32,
125    /// File offset of the last fully-committed entry's end. On any
126    /// append-time error, the file is truncated and re-seeked to this
127    /// offset so the writer's in-memory state and the on-disk state stay
128    /// consistent.
129    committed_offset: u64,
130}
131
132impl WalWriter {
133    /// Open a WAL file for append, creating the v2 header for a new file.
134    ///
135    /// Existing files are scanned once to find the last valid entry. A partial
136    /// or checksum-invalid tail is truncated to the last valid offset.
137    ///
138    /// Acquires an exclusive OS-level file lock; a second writer on the
139    /// same path fails immediately with
140    /// [`PersistError::WriterLockHeld`] instead of clobbering the log.
141    ///
142    /// # Errors
143    ///
144    /// Returns I/O, header, sequence, lock, or checksum errors encountered
145    /// while opening and validating the WAL.
146    pub fn open(path: &Path, config: WalConfig) -> PersistResult<Self> {
147        Self::open_with_compression(path, config, WalCompression::default())
148    }
149
150    /// Open a WAL file with an explicit payload compression policy.
151    ///
152    /// This keeps the same file format as [`Self::open`]; only the append-time
153    /// decision to compress each serialized payload changes. Existing readers
154    /// continue to use the per-entry compression flag stored in the header.
155    ///
156    /// # Errors
157    ///
158    /// Returns I/O, header, sequence, lock, or checksum errors encountered
159    /// while opening and validating the WAL.
160    pub fn open_with_compression(
161        path: &Path,
162        config: WalConfig,
163        compression: WalCompression,
164    ) -> PersistResult<Self> {
165        let sync_policy = config.sync_policy.normalized();
166        let mut file = OpenOptions::new()
167            .create(true)
168            .read(true)
169            .write(true)
170            .truncate(false)
171            .open(path)?;
172        // Acquire an exclusive lock before doing anything else. A second
173        // writer on the same path observes WriterLockHeld and returns
174        // without touching the file.
175        match file.try_lock() {
176            Ok(()) => {}
177            Err(std::fs::TryLockError::WouldBlock) => {
178                return Err(PersistError::WriterLockHeld);
179            }
180            Err(std::fs::TryLockError::Error(error)) => return Err(error.into()),
181        }
182        let len = file.metadata()?.len();
183        let header_snapshot_seq = if len == 0 {
184            WalFileHeader::new(config.snapshot_seq).write_to(&mut file)?;
185            file.sync_data()?;
186            config.snapshot_seq
187        } else {
188            file.seek(SeekFrom::Start(0))?;
189            WalFileHeader::read_from(&mut file)?.snapshot_seq
190        };
191
192        let scan = scan_existing(&mut file)?;
193        if scan.truncate_to < file.metadata()?.len() {
194            tracing::warn!(
195                offset = scan.truncate_to,
196                "truncating WAL tail to last valid entry"
197            );
198            file.set_len(scan.truncate_to)?;
199        }
200        file.seek(SeekFrom::Start(scan.truncate_to))?;
201
202        // Seed last_sequence from the larger of (header watermark, last
203        // scanned entry). On a fresh file, scan returns 0 and the
204        // watermark wins. On reopen with entries that already extend past
205        // the snapshot, the entry sequence wins.
206        let last_sequence = scan.last_sequence.max(header_snapshot_seq);
207        Ok(Self {
208            file,
209            path: path.to_path_buf(),
210            last_sequence,
211            snapshot_seq: header_snapshot_seq,
212            sync_policy,
213            compression,
214            entries_since_fsync: 0,
215            committed_offset: scan.truncate_to,
216        })
217    }
218
219    /// Append one WAL entry and return its assigned sequence.
220    ///
221    /// # Errors
222    ///
223    /// Returns codec, cap, compression, or I/O errors. On any error, the
224    /// in-memory sequence counter is **not** advanced and the file is
225    /// truncated back to the last fully-committed entry, so the next
226    /// append (or a reopen + retry) observes a consistent state.
227    #[tracing::instrument(
228        name = "selene.persist.wal.append",
229        skip(self, principal, changes),
230        fields(sequence = self.last_sequence + 1, change_count = changes.len(), has_principal = principal.is_some())
231    )]
232    pub fn append(
233        &mut self,
234        hlc: HlcTimestamp,
235        origin: Origin,
236        principal: Option<Arc<[u8]>>,
237        changes: &[Change],
238    ) -> PersistResult<u64> {
239        validate_principal(principal.as_deref())?;
240        let payload = encode_changes_with_compression(changes, self.compression)?;
241        let sequence = self.last_sequence + 1;
242        let header = WalEntryHeader::new(
243            payload.bytes.len(),
244            payload.checksum_lo,
245            sequence,
246            hlc,
247            origin,
248            payload.flags,
249            principal,
250        )?;
251        let header_bytes = encode_entry_header(&header)?;
252        let pending_count = self.entries_since_fsync.saturating_add(1);
253        let needs_fsync = match self.sync_policy {
254            SyncPolicy::EveryN(threshold) => pending_count >= threshold,
255            SyncPolicy::OnFlushOnly => false,
256        };
257
258        // Single contiguous record. Write it in one syscall via a Vec
259        // assembly so partial writes are easier to reason about.
260        let mut record = Vec::with_capacity(header_bytes.len() + payload.bytes.len());
261        record.extend_from_slice(&header_bytes);
262        record.extend_from_slice(&payload.bytes);
263
264        let result = (|| -> PersistResult<()> {
265            self.file.write_all(&record)?;
266            if needs_fsync {
267                self.file.sync_data()?;
268            }
269            Ok(())
270        })();
271
272        match result {
273            Ok(()) => {
274                let new_offset = self.committed_offset.saturating_add(record.len() as u64);
275                self.committed_offset = new_offset;
276                self.last_sequence = sequence;
277                self.entries_since_fsync = if needs_fsync { 0 } else { pending_count };
278                metrics::counter_inc(metrics::WAL_APPENDS_TOTAL);
279                Ok(sequence)
280            }
281            Err(error) => {
282                self.rollback_to_committed_offset();
283                Err(error)
284            }
285        }
286    }
287
288    /// Flush + fsync without appending. Useful before snapshot publication.
289    ///
290    /// # Errors
291    ///
292    /// Returns I/O errors from fsync.
293    #[tracing::instrument(name = "selene.persist.wal.fsync", skip(self))]
294    pub fn flush(&mut self) -> PersistResult<()> {
295        self.file.sync_data()?;
296        self.entries_since_fsync = 0;
297        Ok(())
298    }
299
300    /// Return the last sequence assigned by this writer.
301    #[must_use]
302    pub const fn last_sequence(&self) -> u64 {
303        self.last_sequence
304    }
305
306    /// Return the durable file offset of the last fully committed WAL entry.
307    #[must_use]
308    pub const fn committed_offset(&self) -> u64 {
309        self.committed_offset
310    }
311
312    /// Return the snapshot sequence stored in this WAL file's header.
313    #[must_use]
314    pub const fn snapshot_seq(&self) -> u64 {
315        self.snapshot_seq
316    }
317
318    /// Return the number of entries appended since the last fsync.
319    #[must_use]
320    pub const fn entries_since_fsync(&self) -> u32 {
321        self.entries_since_fsync
322    }
323
324    /// Crash-safe rotate: finalize `builder`, commit a MANIFEST, then reset.
325    ///
326    /// This is the v1.x replacement for the embedder's two-call
327    /// finalize-then-`rotate` sequence. It runs the 4-phase rotation
328    /// whose MANIFEST write is the single linearization / commit point, so a
329    /// crash at any point either leaves the previous epoch fully recoverable or
330    /// the new epoch fully committed — never the [`PersistError::WalSnapshotMismatch`]
331    /// (Seam F) hard-fail the split calls could produce.
332    ///
333    /// `builder` must target the same sequence as this writer's current
334    /// high-water mark (`builder.sequence() == self.last_sequence()`); the
335    /// builder is finalized as Phase 1, so the caller adds every section before
336    /// calling. The MANIFEST's `archived_wal_seqs` extends the set already named
337    /// by any live MANIFEST in this writer's directory, so retention (Item-5)
338    /// has the full archive history.
339    ///
340    /// A second mutable borrow cannot overlap the rotation:
341    ///
342    /// ```compile_fail
343    /// # use selene_persist::{SnapshotBuilder, SnapshotConfig, WalWriter};
344    /// fn cannot_overlap(writer: &mut WalWriter, builder: SnapshotBuilder) {
345    ///     let active = writer;
346    ///     let _ = writer.rotate_with_manifest(builder);
347    ///     let _ = active.last_sequence();
348    /// }
349    /// ```
350    ///
351    /// # Errors
352    ///
353    /// Returns [`PersistError::WalRotationSequenceMismatch`] when the builder
354    /// sequence does not match the writer high-water mark, I/O / format errors
355    /// from snapshot finalize, archive, MANIFEST commit, or WAL reset, or
356    /// [`PersistError::WalRotationIncomplete`] if the MANIFEST committed but the
357    /// active WAL could not be reset (recovery still converges on the new
358    /// epoch). On error before the MANIFEST commit the previous epoch is intact.
359    pub fn rotate_with_manifest(
360        &mut self,
361        builder: SnapshotBuilder,
362    ) -> PersistResult<WalRotationOutcome> {
363        if builder.sequence() != self.last_sequence {
364            return Err(PersistError::WalRotationSequenceMismatch {
365                snapshot_seq: builder.sequence(),
366                last_sequence: self.last_sequence,
367            });
368        }
369        self.flush()?;
370        let dir = self
371            .path
372            .parent()
373            .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
374        let prior_archived_seqs = Manifest::read(&dir)?
375            .map(|manifest| manifest.archived_wal_seqs)
376            .unwrap_or_default();
377        let inputs = RotationInputs {
378            file: &mut self.file,
379            wal_path: &self.path,
380            committed_offset: self.committed_offset,
381            last_sequence: self.last_sequence,
382            prior_archived_seqs,
383        };
384        let (outcome, state) = rotate_with_manifest(inputs, builder, &dir)?;
385        self.last_sequence = state.last_sequence;
386        self.snapshot_seq = state.snapshot_seq;
387        self.committed_offset = state.committed_offset;
388        self.entries_since_fsync = 0;
389        Ok(outcome)
390    }
391
392    /// Prune superseded snapshots + WAL archives in this writer's directory per
393    /// `policy`, committing through the MANIFEST.
394    ///
395    /// Thin ergonomic wrapper over [`crate::retention::prune`] bound to this
396    /// writer's directory. Pending appends are flushed first so the on-disk
397    /// state the prune reasons about is current, and the `&mut self` receiver
398    /// serializes the prune against [`Self::rotate_with_manifest`] — the two
399    /// must never interleave their MANIFEST rewrites. The prune never touches
400    /// the active WAL this writer owns; it only reclaims snapshot/archive files
401    /// the live epoch no longer needs.
402    ///
403    /// # Errors
404    ///
405    /// Returns flush errors, or any error from [`crate::retention::prune`]
406    /// (directory scan, MANIFEST decode/commit). Post-commit file deletion is
407    /// best-effort and never fails the prune.
408    pub fn prune(&mut self, policy: &RetentionPolicy) -> PersistResult<PruneOutcome> {
409        self.flush()?;
410        let dir = self
411            .path
412            .parent()
413            .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
414        crate::retention::prune(&dir, policy)
415    }
416
417    /// Best-effort rollback to the last committed offset on append failure.
418    /// On rollback failure, the writer is left in a half-consistent state;
419    /// the caller should reopen the WAL (which scan-truncates on open) to
420    /// recover.
421    fn rollback_to_committed_offset(&mut self) {
422        if let Err(error) = self.file.set_len(self.committed_offset) {
423            tracing::error!(%error, "failed to truncate WAL after append error");
424            return;
425        }
426        if let Err(error) = self.file.seek(SeekFrom::Start(self.committed_offset)) {
427            tracing::error!(%error, "failed to seek WAL after append error");
428        }
429    }
430}
431
432impl Drop for WalWriter {
433    fn drop(&mut self) {
434        if self.sync_policy.syncs_on_drop()
435            && let Err(error) = self.file.sync_data()
436        {
437            tracing::error!(%error, "failed to fsync WAL writer on drop");
438        }
439        // The exclusive file lock is released when `file` is dropped.
440    }
441}
442
/// Outcome of scanning an existing WAL: the extent of its valid prefix.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Scan {
    /// Sequence of the last entry that passed validation (0 when none did).
    last_sequence: u64,
    /// Byte offset just past that entry; the file is cut back to this length.
    truncate_to: u64,
}
448
449fn scan_existing(file: &mut File) -> PersistResult<Scan> {
450    let file_len = file.metadata()?.len();
451    let mut offset = WAL_FILE_HEADER_LEN as u64;
452    let mut previous = 0_u64;
453    let mut last_valid_offset = offset;
454    let mut payload = Vec::new();
455    file.seek(SeekFrom::Start(offset))?;
456    let mut reader = BufReader::with_capacity(64 * 1024, file);
457
458    loop {
459        if offset == file_len {
460            return Ok(Scan {
461                last_sequence: previous,
462                truncate_to: last_valid_offset,
463            });
464        }
465        if offset > file_len {
466            return Ok(Scan {
467                last_sequence: previous,
468                truncate_to: last_valid_offset,
469            });
470        }
471
472        let (header, bytes_consumed) = match read_entry_header(&mut reader, offset) {
473            Ok(header) => header,
474            // Treat oversized decoded headers as torn-tail too: with the
475            // fixed-layout v2 format, garbage bytes can decode as a valid
476            // u32 payload_len > MAX_WAL_ENTRY_BYTES or u16 principal_len > cap.
477            // Pre-v2 (postcard varint), oversized lengths surfaced as
478            // HeaderCodec; v2 surfaces them as typed cap errors that must be
479            // routed through the same recovery path.
480            Err(PersistError::TruncatedEntry { .. })
481            | Err(PersistError::HeaderCodec(_))
482            | Err(PersistError::PayloadTooLarge { .. })
483            | Err(PersistError::PrincipalTooLarge { .. }) => {
484                return Ok(Scan {
485                    last_sequence: previous,
486                    truncate_to: last_valid_offset,
487                });
488            }
489            Err(error) => return Err(error),
490        };
491
492        if header.sequence <= previous {
493            return Err(PersistError::NonMonotonicSequence {
494                previous,
495                current: header.sequence,
496            });
497        }
498        let payload_len = header.payload_len as usize;
499        if ensure_payload_len(payload_len).is_err() {
500            return Ok(Scan {
501                last_sequence: previous,
502                truncate_to: last_valid_offset,
503            });
504        }
505        let payload_start = offset.saturating_add(bytes_consumed as u64);
506        let payload_end = payload_start.saturating_add(u64::from(header.payload_len));
507        if payload_end > file_len {
508            return Ok(Scan {
509                last_sequence: previous,
510                truncate_to: last_valid_offset,
511            });
512        }
513        payload.resize(payload_len, 0);
514        reader.read_exact(&mut payload)?;
515        if verify_checksum(&header, &payload).is_err() {
516            return Ok(Scan {
517                last_sequence: previous,
518                truncate_to: last_valid_offset,
519            });
520        }
521        previous = header.sequence;
522        offset = payload_end;
523        last_valid_offset = payload_end;
524    }
525}
526
// Unit tests are declared as a separate file-backed submodule and are
// compiled only for test builds.
#[cfg(test)]
mod tests;