selene_persist/writer.rs
1//! Append-only WAL writer.
2
3use std::fs::{File, OpenOptions};
4use std::io::{BufReader, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use selene_core::{Change, HlcTimestamp, Origin, metrics};
9
10use crate::entry_header::{
11 encode_entry_header, ensure_payload_len, read_entry_header, validate_principal,
12};
13use crate::file_header::{WAL_FILE_HEADER_LEN, WalFileHeader};
14use crate::manifest::Manifest;
15use crate::payload::{WalCompression, encode_changes_with_compression, verify_checksum};
16use crate::retention::{PruneOutcome, RetentionPolicy};
17use crate::snapshot_writer::SnapshotBuilder;
18use crate::writer_rotation::{RotationInputs, WalRotationOutcome, rotate_with_manifest};
19use crate::{PersistError, PersistResult, WalEntryHeader};
20
/// Conventional v1.0 single-file WAL name used by embedders.
///
/// This is only a naming convention: [`WalWriter::open`] takes an explicit
/// path and does not consult this constant.
pub const DEFAULT_WAL_FILE_NAME: &str = "wal.log";
23
/// Controls when the WAL writer issues an fsync.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncPolicy {
    /// Flush and fsync once every `N` appended entries, on explicit flush,
    /// and when the writer is dropped.
    ///
    /// `EveryN(1)` is durability-by-default. Values greater than `1` opt into
    /// group commit. `EveryN(0)` is normalized to `EveryN(1)` on open.
    EveryN(u32),
    /// Never fsync during append or drop; only explicit [`WalWriter::flush`]
    /// fsyncs.
    ///
    /// This is an explicit opt-in for benchmark parity and offline paths where
    /// durability is provided elsewhere. It is not the production default.
    ///
    /// # selene-graph forces this for the committer WAL (v1.2 BRIEF 2)
    ///
    /// When a [`WalWriter`] is owned by selene-graph's single committer thread
    /// (via `SharedGraphBuilder::with_wal` / `SharedGraph::from_graph_with_wal` /
    /// recovery), the committer is the **sole fsync caller**: it appends a
    /// contiguous run of commits with fsync deferred, then issues exactly one
    /// [`WalWriter::flush`] per run as the fsync-before-publish barrier. To make
    /// that the only fsync path, selene-graph **overrides `WalConfig::sync_policy`
    /// to `OnFlushOnly`** before opening such a WAL, discarding any caller policy
    /// (the fsync cadence is instead set by `selene_graph::CommitBatching`). A
    /// `WalWriter` opened directly (outside selene-graph) still honors whatever
    /// policy the caller passes — the override lives in selene-graph, not here.
    OnFlushOnly,
}

impl SyncPolicy {
    /// Return the `EveryN` threshold, or `None` for a policy that never syncs
    /// on append.
    #[must_use]
    pub const fn as_every_n(self) -> Option<u32> {
        if let Self::EveryN(threshold) = self {
            Some(threshold)
        } else {
            None
        }
    }

    /// Map the degenerate `EveryN(0)` to `EveryN(1)`; every other policy is
    /// returned unchanged.
    const fn normalized(self) -> Self {
        if matches!(self, Self::EveryN(0)) {
            Self::EveryN(1)
        } else {
            self
        }
    }

    /// Whether dropping the writer should issue a final fsync.
    const fn syncs_on_drop(self) -> bool {
        match self {
            Self::EveryN(_) => true,
            Self::OnFlushOnly => false,
        }
    }
}
75
76/// WAL writer configuration.
77#[derive(Clone, Copy, Debug, Eq, PartialEq)]
78pub struct WalConfig {
79 /// Flush and fsync schedule.
80 pub sync_policy: SyncPolicy,
81 /// Highest WAL sequence covered by the snapshot this file extends.
82 ///
83 /// Written into the file header on a fresh file, and used to seed
84 /// `last_sequence` so the first appended entry receives sequence
85 /// `snapshot_seq + 1`. On reopen, the on-disk header is the source of
86 /// truth and the config value is ignored — recovery never moves a
87 /// snapshot watermark backward.
88 pub snapshot_seq: u64,
89}
90
91impl Default for WalConfig {
92 fn default() -> Self {
93 Self {
94 sync_policy: SyncPolicy::EveryN(1),
95 snapshot_seq: 0,
96 }
97 }
98}
99
100impl WalConfig {
101 /// Construct a WAL config with the legacy group-commit threshold.
102 #[must_use]
103 pub const fn with_fsync_every_n(fsync_every_n: u32) -> Self {
104 Self {
105 sync_policy: SyncPolicy::EveryN(fsync_every_n),
106 snapshot_seq: 0,
107 }
108 }
109}
110
/// Single-threaded append-only WAL writer.
///
/// Holds an exclusive OS-level file lock on the WAL file for the writer's
/// lifetime, so a second `WalWriter::open` call on the same path
/// (in-process or cross-process) fails fast with
/// [`PersistError::WriterLockHeld`] rather than corrupting the log.
pub struct WalWriter {
    /// Open WAL file handle; the exclusive lock taken on open is held through
    /// this handle and released when it is dropped.
    file: File,
    /// Path the WAL was opened with; its parent directory is where rotation
    /// and pruning operate.
    path: PathBuf,
    /// Sequence of the most recently appended entry. Seeded on open from the
    /// larger of the header's snapshot watermark and the last scanned entry.
    last_sequence: u64,
    /// Snapshot watermark taken from the WAL file header.
    snapshot_seq: u64,
    /// Sync policy from the config, after `EveryN(0)` normalization.
    sync_policy: SyncPolicy,
    /// Compression policy handed to the payload encoder on every append.
    compression: WalCompression,
    /// Entries appended since the last fsync; reset by flush and rotation.
    entries_since_fsync: u32,
    /// File offset of the last fully-committed entry's end. On any
    /// append-time error, the file is truncated and re-seeked to this
    /// offset so the writer's in-memory state and the on-disk state stay
    /// consistent.
    committed_offset: u64,
}
131
132impl WalWriter {
133 /// Open a WAL file for append, creating the v2 header for a new file.
134 ///
135 /// Existing files are scanned once to find the last valid entry. A partial
136 /// or checksum-invalid tail is truncated to the last valid offset.
137 ///
138 /// Acquires an exclusive OS-level file lock; a second writer on the
139 /// same path fails immediately with
140 /// [`PersistError::WriterLockHeld`] instead of clobbering the log.
141 ///
142 /// # Errors
143 ///
144 /// Returns I/O, header, sequence, lock, or checksum errors encountered
145 /// while opening and validating the WAL.
146 pub fn open(path: &Path, config: WalConfig) -> PersistResult<Self> {
147 Self::open_with_compression(path, config, WalCompression::default())
148 }
149
150 /// Open a WAL file with an explicit payload compression policy.
151 ///
152 /// This keeps the same file format as [`Self::open`]; only the append-time
153 /// decision to compress each serialized payload changes. Existing readers
154 /// continue to use the per-entry compression flag stored in the header.
155 ///
156 /// # Errors
157 ///
158 /// Returns I/O, header, sequence, lock, or checksum errors encountered
159 /// while opening and validating the WAL.
160 pub fn open_with_compression(
161 path: &Path,
162 config: WalConfig,
163 compression: WalCompression,
164 ) -> PersistResult<Self> {
165 let sync_policy = config.sync_policy.normalized();
166 let mut file = OpenOptions::new()
167 .create(true)
168 .read(true)
169 .write(true)
170 .truncate(false)
171 .open(path)?;
172 // Acquire an exclusive lock before doing anything else. A second
173 // writer on the same path observes WriterLockHeld and returns
174 // without touching the file.
175 match file.try_lock() {
176 Ok(()) => {}
177 Err(std::fs::TryLockError::WouldBlock) => {
178 return Err(PersistError::WriterLockHeld);
179 }
180 Err(std::fs::TryLockError::Error(error)) => return Err(error.into()),
181 }
182 let len = file.metadata()?.len();
183 let header_snapshot_seq = if len == 0 {
184 WalFileHeader::new(config.snapshot_seq).write_to(&mut file)?;
185 file.sync_data()?;
186 config.snapshot_seq
187 } else {
188 file.seek(SeekFrom::Start(0))?;
189 WalFileHeader::read_from(&mut file)?.snapshot_seq
190 };
191
192 let scan = scan_existing(&mut file)?;
193 if scan.truncate_to < file.metadata()?.len() {
194 tracing::warn!(
195 offset = scan.truncate_to,
196 "truncating WAL tail to last valid entry"
197 );
198 file.set_len(scan.truncate_to)?;
199 }
200 file.seek(SeekFrom::Start(scan.truncate_to))?;
201
202 // Seed last_sequence from the larger of (header watermark, last
203 // scanned entry). On a fresh file, scan returns 0 and the
204 // watermark wins. On reopen with entries that already extend past
205 // the snapshot, the entry sequence wins.
206 let last_sequence = scan.last_sequence.max(header_snapshot_seq);
207 Ok(Self {
208 file,
209 path: path.to_path_buf(),
210 last_sequence,
211 snapshot_seq: header_snapshot_seq,
212 sync_policy,
213 compression,
214 entries_since_fsync: 0,
215 committed_offset: scan.truncate_to,
216 })
217 }
218
219 /// Append one WAL entry and return its assigned sequence.
220 ///
221 /// # Errors
222 ///
223 /// Returns codec, cap, compression, or I/O errors. On any error, the
224 /// in-memory sequence counter is **not** advanced and the file is
225 /// truncated back to the last fully-committed entry, so the next
226 /// append (or a reopen + retry) observes a consistent state.
227 #[tracing::instrument(
228 name = "selene.persist.wal.append",
229 skip(self, principal, changes),
230 fields(sequence = self.last_sequence + 1, change_count = changes.len(), has_principal = principal.is_some())
231 )]
232 pub fn append(
233 &mut self,
234 hlc: HlcTimestamp,
235 origin: Origin,
236 principal: Option<Arc<[u8]>>,
237 changes: &[Change],
238 ) -> PersistResult<u64> {
239 validate_principal(principal.as_deref())?;
240 let payload = encode_changes_with_compression(changes, self.compression)?;
241 let sequence = self.last_sequence + 1;
242 let header = WalEntryHeader::new(
243 payload.bytes.len(),
244 payload.checksum_lo,
245 sequence,
246 hlc,
247 origin,
248 payload.flags,
249 principal,
250 )?;
251 let header_bytes = encode_entry_header(&header)?;
252 let pending_count = self.entries_since_fsync.saturating_add(1);
253 let needs_fsync = match self.sync_policy {
254 SyncPolicy::EveryN(threshold) => pending_count >= threshold,
255 SyncPolicy::OnFlushOnly => false,
256 };
257
258 // Single contiguous record. Write it in one syscall via a Vec
259 // assembly so partial writes are easier to reason about.
260 let mut record = Vec::with_capacity(header_bytes.len() + payload.bytes.len());
261 record.extend_from_slice(&header_bytes);
262 record.extend_from_slice(&payload.bytes);
263
264 let result = (|| -> PersistResult<()> {
265 self.file.write_all(&record)?;
266 if needs_fsync {
267 self.file.sync_data()?;
268 }
269 Ok(())
270 })();
271
272 match result {
273 Ok(()) => {
274 let new_offset = self.committed_offset.saturating_add(record.len() as u64);
275 self.committed_offset = new_offset;
276 self.last_sequence = sequence;
277 self.entries_since_fsync = if needs_fsync { 0 } else { pending_count };
278 metrics::counter_inc(metrics::WAL_APPENDS_TOTAL);
279 Ok(sequence)
280 }
281 Err(error) => {
282 self.rollback_to_committed_offset();
283 Err(error)
284 }
285 }
286 }
287
288 /// Flush + fsync without appending. Useful before snapshot publication.
289 ///
290 /// # Errors
291 ///
292 /// Returns I/O errors from fsync.
293 #[tracing::instrument(name = "selene.persist.wal.fsync", skip(self))]
294 pub fn flush(&mut self) -> PersistResult<()> {
295 self.file.sync_data()?;
296 self.entries_since_fsync = 0;
297 Ok(())
298 }
299
/// Return the last sequence assigned by this writer.
///
/// Before the first append this is the value seeded on open: the larger of
/// the header's snapshot watermark and the last valid entry found on disk.
#[must_use]
pub const fn last_sequence(&self) -> u64 {
    self.last_sequence
}
305
/// Return the file offset just past the last fully committed WAL entry.
///
/// The offset advances on every successful append, including appends that
/// did not fsync (group commit or [`SyncPolicy::OnFlushOnly`]). Bytes up to
/// this offset are therefore only guaranteed durable after a successful
/// [`Self::flush`] or a policy-triggered fsync.
#[must_use]
pub const fn committed_offset(&self) -> u64 {
    self.committed_offset
}
311
/// Return the snapshot sequence stored in this WAL file's header.
///
/// For a reopened file this is the on-disk header value, not the value
/// passed in the config.
#[must_use]
pub const fn snapshot_seq(&self) -> u64 {
    self.snapshot_seq
}
317
/// Return the number of entries appended since the last fsync.
///
/// Reset to zero by [`Self::flush`], by an append that fsyncs, and by a
/// successful rotation.
#[must_use]
pub const fn entries_since_fsync(&self) -> u32 {
    self.entries_since_fsync
}
323
324 /// Crash-safe rotate: finalize `builder`, commit a MANIFEST, then reset.
325 ///
326 /// This is the v1.x replacement for the embedder's two-call
327 /// finalize-then-`rotate` sequence. It runs the 4-phase rotation
328 /// whose MANIFEST write is the single linearization / commit point, so a
329 /// crash at any point either leaves the previous epoch fully recoverable or
330 /// the new epoch fully committed — never the [`PersistError::WalSnapshotMismatch`]
331 /// (Seam F) hard-fail the split calls could produce.
332 ///
333 /// `builder` must target the same sequence as this writer's current
334 /// high-water mark (`builder.sequence() == self.last_sequence()`); the
335 /// builder is finalized as Phase 1, so the caller adds every section before
336 /// calling. The MANIFEST's `archived_wal_seqs` extends the set already named
337 /// by any live MANIFEST in this writer's directory, so retention (Item-5)
338 /// has the full archive history.
339 ///
340 /// A second mutable borrow cannot overlap the rotation:
341 ///
342 /// ```compile_fail
343 /// # use selene_persist::{SnapshotBuilder, SnapshotConfig, WalWriter};
344 /// fn cannot_overlap(writer: &mut WalWriter, builder: SnapshotBuilder) {
345 /// let active = writer;
346 /// let _ = writer.rotate_with_manifest(builder);
347 /// let _ = active.last_sequence();
348 /// }
349 /// ```
350 ///
351 /// # Errors
352 ///
353 /// Returns [`PersistError::WalRotationSequenceMismatch`] when the builder
354 /// sequence does not match the writer high-water mark, I/O / format errors
355 /// from snapshot finalize, archive, MANIFEST commit, or WAL reset, or
356 /// [`PersistError::WalRotationIncomplete`] if the MANIFEST committed but the
357 /// active WAL could not be reset (recovery still converges on the new
358 /// epoch). On error before the MANIFEST commit the previous epoch is intact.
359 pub fn rotate_with_manifest(
360 &mut self,
361 builder: SnapshotBuilder,
362 ) -> PersistResult<WalRotationOutcome> {
363 if builder.sequence() != self.last_sequence {
364 return Err(PersistError::WalRotationSequenceMismatch {
365 snapshot_seq: builder.sequence(),
366 last_sequence: self.last_sequence,
367 });
368 }
369 self.flush()?;
370 let dir = self
371 .path
372 .parent()
373 .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
374 let prior_archived_seqs = Manifest::read(&dir)?
375 .map(|manifest| manifest.archived_wal_seqs)
376 .unwrap_or_default();
377 let inputs = RotationInputs {
378 file: &mut self.file,
379 wal_path: &self.path,
380 committed_offset: self.committed_offset,
381 last_sequence: self.last_sequence,
382 prior_archived_seqs,
383 };
384 let (outcome, state) = rotate_with_manifest(inputs, builder, &dir)?;
385 self.last_sequence = state.last_sequence;
386 self.snapshot_seq = state.snapshot_seq;
387 self.committed_offset = state.committed_offset;
388 self.entries_since_fsync = 0;
389 Ok(outcome)
390 }
391
392 /// Prune superseded snapshots + WAL archives in this writer's directory per
393 /// `policy`, committing through the MANIFEST.
394 ///
395 /// Thin ergonomic wrapper over [`crate::retention::prune`] bound to this
396 /// writer's directory. Pending appends are flushed first so the on-disk
397 /// state the prune reasons about is current, and the `&mut self` receiver
398 /// serializes the prune against [`Self::rotate_with_manifest`] — the two
399 /// must never interleave their MANIFEST rewrites. The prune never touches
400 /// the active WAL this writer owns; it only reclaims snapshot/archive files
401 /// the live epoch no longer needs.
402 ///
403 /// # Errors
404 ///
405 /// Returns flush errors, or any error from [`crate::retention::prune`]
406 /// (directory scan, MANIFEST decode/commit). Post-commit file deletion is
407 /// best-effort and never fails the prune.
408 pub fn prune(&mut self, policy: &RetentionPolicy) -> PersistResult<PruneOutcome> {
409 self.flush()?;
410 let dir = self
411 .path
412 .parent()
413 .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
414 crate::retention::prune(&dir, policy)
415 }
416
417 /// Best-effort rollback to the last committed offset on append failure.
418 /// On rollback failure, the writer is left in a half-consistent state;
419 /// the caller should reopen the WAL (which scan-truncates on open) to
420 /// recover.
421 fn rollback_to_committed_offset(&mut self) {
422 if let Err(error) = self.file.set_len(self.committed_offset) {
423 tracing::error!(%error, "failed to truncate WAL after append error");
424 return;
425 }
426 if let Err(error) = self.file.seek(SeekFrom::Start(self.committed_offset)) {
427 tracing::error!(%error, "failed to seek WAL after append error");
428 }
429 }
430}
431
432impl Drop for WalWriter {
433 fn drop(&mut self) {
434 if self.sync_policy.syncs_on_drop()
435 && let Err(error) = self.file.sync_data()
436 {
437 tracing::error!(%error, "failed to fsync WAL writer on drop");
438 }
439 // The exclusive file lock is released when `file` is dropped.
440 }
441}
442
/// Result of scanning an existing WAL file for its last valid entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct Scan {
    /// Sequence of the last entry that validated, or `0` when none did.
    last_sequence: u64,
    /// Offset just past the last valid entry (the end of the file header when
    /// no entry validated); anything beyond it is an invalid tail.
    truncate_to: u64,
}
448
449fn scan_existing(file: &mut File) -> PersistResult<Scan> {
450 let file_len = file.metadata()?.len();
451 let mut offset = WAL_FILE_HEADER_LEN as u64;
452 let mut previous = 0_u64;
453 let mut last_valid_offset = offset;
454 let mut payload = Vec::new();
455 file.seek(SeekFrom::Start(offset))?;
456 let mut reader = BufReader::with_capacity(64 * 1024, file);
457
458 loop {
459 if offset == file_len {
460 return Ok(Scan {
461 last_sequence: previous,
462 truncate_to: last_valid_offset,
463 });
464 }
465 if offset > file_len {
466 return Ok(Scan {
467 last_sequence: previous,
468 truncate_to: last_valid_offset,
469 });
470 }
471
472 let (header, bytes_consumed) = match read_entry_header(&mut reader, offset) {
473 Ok(header) => header,
474 // Treat oversized decoded headers as torn-tail too: with the
475 // fixed-layout v2 format, garbage bytes can decode as a valid
476 // u32 payload_len > MAX_WAL_ENTRY_BYTES or u16 principal_len > cap.
477 // Pre-v2 (postcard varint), oversized lengths surfaced as
478 // HeaderCodec; v2 surfaces them as typed cap errors that must be
479 // routed through the same recovery path.
480 Err(PersistError::TruncatedEntry { .. })
481 | Err(PersistError::HeaderCodec(_))
482 | Err(PersistError::PayloadTooLarge { .. })
483 | Err(PersistError::PrincipalTooLarge { .. }) => {
484 return Ok(Scan {
485 last_sequence: previous,
486 truncate_to: last_valid_offset,
487 });
488 }
489 Err(error) => return Err(error),
490 };
491
492 if header.sequence <= previous {
493 return Err(PersistError::NonMonotonicSequence {
494 previous,
495 current: header.sequence,
496 });
497 }
498 let payload_len = header.payload_len as usize;
499 if ensure_payload_len(payload_len).is_err() {
500 return Ok(Scan {
501 last_sequence: previous,
502 truncate_to: last_valid_offset,
503 });
504 }
505 let payload_start = offset.saturating_add(bytes_consumed as u64);
506 let payload_end = payload_start.saturating_add(u64::from(header.payload_len));
507 if payload_end > file_len {
508 return Ok(Scan {
509 last_sequence: previous,
510 truncate_to: last_valid_offset,
511 });
512 }
513 payload.resize(payload_len, 0);
514 reader.read_exact(&mut payload)?;
515 if verify_checksum(&header, &payload).is_err() {
516 return Ok(Scan {
517 last_sequence: previous,
518 truncate_to: last_valid_offset,
519 });
520 }
521 previous = header.sequence;
522 offset = payload_end;
523 last_valid_offset = payload_end;
524 }
525}
526
527#[cfg(test)]
528mod tests;