wal_db/wal.rs
//! The log itself: [`Wal`], the iterator [`WalIter`] that reads it back, and
//! the [`Record`] items that iterator yields.
3
4use std::{fmt, io, sync::atomic::Ordering};
5
6#[cfg(not(loom))]
7use std::{cell::RefCell, path::Path};
8
9use crate::{
10 commit::Commit,
11 config::{RecoveryPolicy, WalConfig},
12 error::{Result, WalError},
13 lsn::Lsn,
14 record::{self, HEADER_LEN},
15 store::{FileStore, WalStore},
16 sync::AtomicU64,
17};
18
/// Aligns (and therefore pads) `T` to 64 bytes so it occupies a cache line of
/// its own.
///
/// The log's reservation counter is written by every appender. Isolating it
/// keeps that write traffic from invalidating the line that holds the log's
/// other, read-mostly fields (false sharing).
#[derive(Debug)]
#[repr(align(64))]
struct CacheAligned<T>(T);
25
26/// A durable, append-only log.
27///
28/// `Wal` is the entry point. The four calls that cover almost every use are
29/// [`open`](Wal::open), [`append`](Wal::append), [`sync`](Wal::sync), and
30/// [`iter`](Wal::iter). The type parameter `S` is the storage backend and
31/// defaults to [`FileStore`], so the plain name `Wal` is the file-backed log;
32/// custom backends are supplied through [`with_store`](Wal::with_store).
33///
34/// A `Wal` is [`Send`] and [`Sync`], and the append path is built for it: many
35/// threads can call [`append`](Wal::append) at once with no global lock. Share
36/// one behind an [`Arc`](std::sync::Arc) and write from every thread.
37///
38/// # Concurrency and durability
39///
40/// Appends are lock-free. Each one reserves its byte range with a single atomic
41/// step — the range's start offset *is* the record's [`Lsn`] — frames the record
42/// into a reused thread-local buffer, and writes it, all without blocking other
43/// appenders. [`sync`](Wal::sync) is the durability barrier; when several
44/// threads sync at once they coalesce into a single fsync (group commit), so the
45/// cost of making data durable is amortised across everyone committing together.
46///
47/// `append` returns once the record is in the OS page cache; `sync` returns once
48/// it is on stable storage. See the [crate docs](crate) for the full contract.
49///
50/// # Examples
51///
52/// ```
53/// use wal_db::Wal;
54///
55/// # fn main() -> Result<(), wal_db::WalError> {
56/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
57/// # let path = dir.path().join("log.wal");
58/// let wal = Wal::open(&path)?;
59/// let first = wal.append(b"first")?;
60/// let second = wal.append(b"second")?;
61/// wal.sync()?;
62///
63/// // LSNs are byte offsets: the first record starts at 0, the second after it.
64/// assert_eq!(first.get(), 0);
65/// assert!(second.get() > first.get());
66///
67/// let read_back: Vec<Vec<u8>> = wal
68/// .iter()?
69/// .map(|entry| entry.map(|record| record.into_data()))
70/// .collect::<Result<_, _>>()?;
71/// assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
72/// # Ok(())
73/// # }
74/// ```
75pub struct Wal<S = FileStore> {
76 /// Next byte offset to reserve. Hammered by every appender, so kept on its
77 /// own cache line.
78 tail: CacheAligned<AtomicU64>,
79 store: S,
80 max_record_size: u32,
81 recovery_policy: RecoveryPolicy,
82 commit: Commit,
83}
84
85#[cfg(not(loom))]
86impl Wal<FileStore> {
87 /// Open the log at `path`, creating it if it does not exist.
88 ///
89 /// On open the log scans its contents, stops at the first record that is
90 /// incomplete or fails its checksum, and truncates that torn tail so the
91 /// next append lands on a clean boundary. The common cause of a torn tail is
92 /// a crash partway through an earlier append; that record was never
93 /// acknowledged durable, so discarding it loses nothing the caller was
94 /// promised.
95 ///
96 /// # Errors
97 ///
98 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
99 ///
100 /// # Examples
101 ///
102 /// ```
103 /// use wal_db::Wal;
104 /// # fn main() -> Result<(), wal_db::WalError> {
105 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
106 /// # let path = dir.path().join("log.wal");
107 /// let wal = Wal::open(&path)?;
108 /// wal.append(b"hello")?;
109 /// wal.sync()?;
110 /// # Ok(())
111 /// # }
112 /// ```
113 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
114 Self::open_with(path, WalConfig::new())
115 }
116
117 /// Open the log at `path` with an explicit [`WalConfig`].
118 ///
119 /// # Errors
120 ///
121 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use wal_db::{Wal, WalConfig};
127 /// # fn main() -> Result<(), wal_db::WalError> {
128 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
129 /// # let path = dir.path().join("log.wal");
130 /// let config = WalConfig::new().with_max_record_size(1024);
131 /// let wal = Wal::open_with(&path, config)?;
132 /// # let _ = wal;
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
137 let store = FileStore::open(path)?;
138 Self::with_store_and_config(store, config)
139 }
140}
141
142#[cfg(not(loom))]
143impl Wal<crate::segment::SegmentedStore> {
144 /// Open a segmented log in directory `dir`, with segments of `segment_size`
145 /// bytes, creating the directory if needed.
146 ///
147 /// The log is one continuous byte stream striped across fixed-size files, so
148 /// it behaves exactly like a single-file log — records span segment
149 /// boundaries freely — while keeping each file bounded for recovery and
150 /// archival. Records larger than a segment simply occupy several.
151 ///
152 /// # Errors
153 ///
154 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
155 /// be opened or scanned.
156 ///
157 /// # Examples
158 ///
159 /// ```
160 /// use wal_db::Wal;
161 /// # fn main() -> Result<(), wal_db::WalError> {
162 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
163 /// let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
164 /// wal.append(b"record")?;
165 /// wal.sync()?;
166 /// # Ok(())
167 /// # }
168 /// ```
169 pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
170 Self::open_segmented_with(dir, segment_size, WalConfig::new())
171 }
172
173 /// Open a segmented log with an explicit [`WalConfig`].
174 ///
175 /// Like [`open_segmented`](Wal::open_segmented), but applies `config` (for
176 /// example a tighter [`max_record_size`](WalConfig::max_record_size)).
177 ///
178 /// # Errors
179 ///
180 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
181 /// be opened or scanned.
182 pub fn open_segmented_with(
183 dir: impl AsRef<Path>,
184 segment_size: u64,
185 config: WalConfig,
186 ) -> Result<Self> {
187 let store = crate::segment::SegmentedStore::open(dir, segment_size)?;
188 Self::with_store_and_config(store, config)
189 }
190}
191
192impl<S: WalStore> Wal<S> {
193 /// Build a log over a custom [`WalStore`], using the default configuration.
194 ///
195 /// # Errors
196 ///
197 /// Returns an error if scanning the existing contents of the store fails.
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// use wal_db::{MemStore, Wal};
203 /// # fn main() -> Result<(), wal_db::WalError> {
204 /// let wal = Wal::with_store(MemStore::new())?;
205 /// wal.append(b"record")?;
206 /// # Ok(())
207 /// # }
208 /// ```
209 pub fn with_store(store: S) -> Result<Self> {
210 Self::with_store_and_config(store, WalConfig::new())
211 }
212
213 /// Build a log over a custom [`WalStore`] with an explicit [`WalConfig`].
214 ///
215 /// # Errors
216 ///
217 /// Returns an error if scanning the existing contents of the store fails.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// use wal_db::{MemStore, Wal, WalConfig};
223 /// # fn main() -> Result<(), wal_db::WalError> {
224 /// let config = WalConfig::new().with_max_record_size(64);
225 /// let wal = Wal::with_store_and_config(MemStore::new(), config)?;
226 /// # let _ = wal;
227 /// # Ok(())
228 /// # }
229 /// ```
230 pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self> {
231 let recovered = recover(&store, config.max_record_size())?;
232 Ok(Wal {
233 tail: CacheAligned(AtomicU64::new(recovered)),
234 store,
235 max_record_size: config.max_record_size(),
236 recovery_policy: config.recovery_policy(),
237 commit: Commit::new(recovered),
238 })
239 }
240
241 /// Append `record` to the log and return the [`Lsn`] it was assigned — the
242 /// byte offset where the record begins.
243 ///
244 /// Lock-free: the byte range is reserved with one atomic step and the record
245 /// is written without blocking other appenders. Returns once the bytes are
246 /// in the operating system's page cache. It does **not** flush the disk —
247 /// call [`sync`](Wal::sync) for that. A crash between `append` and `sync` may
248 /// lose the record.
249 ///
250 /// # Errors
251 ///
252 /// - [`WalError::RecordTooLarge`] if `record` is larger than the configured
253 /// [`max_record_size`](WalConfig::max_record_size). The log is unchanged.
254 /// - [`WalError::Io`] if the write fails. The reserved range becomes a
255 /// permanent gap: the log is durable only up to that point, recovery stops
256 /// there, and later syncs covering it report the truncation.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use wal_db::{MemStore, Wal};
262 /// # fn main() -> Result<(), wal_db::WalError> {
263 /// let wal = Wal::with_store(MemStore::new())?;
264 /// let lsn = wal.append(b"some bytes")?;
265 /// assert_eq!(lsn.get(), 0);
266 /// # Ok(())
267 /// # }
268 /// ```
269 pub fn append(&self, record: &[u8]) -> Result<Lsn> {
270 let payload_len = record.len();
271 if payload_len > self.max_record_size as usize {
272 return Err(WalError::RecordTooLarge {
273 len: payload_len,
274 max: self.max_record_size,
275 });
276 }
277 let frame_len = record::framed_len(payload_len) as u64;
278
279 // Reserve the byte range. The returned start offset is the LSN, and
280 // because it comes from a single atomic it is unique and ordered.
281 let start = self.tail.0.fetch_add(frame_len, Ordering::Relaxed);
282 let end = match start.checked_add(frame_len) {
283 Some(end) => end,
284 None => {
285 self.commit.mark_failed(start);
286 return Err(WalError::io(
287 "reserving a record offset",
288 io::Error::other("log size exceeds u64"),
289 ));
290 }
291 };
292
293 match self.frame_and_write(start, record) {
294 Ok(()) => {
295 self.commit.mark_written(start, end);
296 Ok(Lsn::new(start))
297 }
298 Err(error) => {
299 self.commit.mark_failed(start);
300 Err(error)
301 }
302 }
303 }
304
305 /// Make every record appended before this call durable.
306 ///
307 /// Returns once the data is on stable storage, using the platform's true
308 /// durability barrier. Concurrent calls coalesce into a single fsync, so the
309 /// flush cost is shared by everyone committing at the same time.
310 ///
311 /// # Errors
312 ///
313 /// Returns [`WalError::Io`] if the flush fails, or [`WalError::Corruption`]
314 /// if an earlier append's write failed and left a gap that cannot be made
315 /// durable. A failed sync means the records are not durable; treat it as
316 /// fatal, not as something to retry blindly.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use wal_db::Wal;
322 /// # fn main() -> Result<(), wal_db::WalError> {
323 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
324 /// # let path = dir.path().join("log.wal");
325 /// let wal = Wal::open(&path)?;
326 /// wal.append(b"durable me")?;
327 /// wal.sync()?; // now on stable storage
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub fn sync(&self) -> Result<()> {
332 let target = self.tail.0.load(Ordering::Acquire);
333 if target == 0 {
334 return Ok(());
335 }
336 self.commit.sync_to(&self.store, target)
337 }
338
339 /// Append `record` and make it durable in one call, returning its [`Lsn`].
340 ///
341 /// Equivalent to [`append`](Wal::append) followed by a [`sync`](Wal::sync)
342 /// scoped to this record, but with the sync coalesced into the group commit
343 /// of any other threads syncing at the same moment. Use it when every record
344 /// must be durable before you proceed and you want the group-commit
345 /// throughput without managing the two calls yourself.
346 ///
347 /// # Errors
348 ///
349 /// The union of [`append`](Wal::append)'s and [`sync`](Wal::sync)'s errors.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use wal_db::Wal;
355 /// # fn main() -> Result<(), wal_db::WalError> {
356 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
357 /// # let path = dir.path().join("log.wal");
358 /// let wal = Wal::open(&path)?;
359 /// let lsn = wal.append_and_sync(b"committed immediately")?;
360 /// # let _ = lsn;
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn> {
365 let lsn = self.append(record)?;
366 let end = lsn.get() + record::framed_len(record.len()) as u64;
367 self.commit.sync_to(&self.store, end)?;
368 Ok(lsn)
369 }
370
371 /// Serialise `value` with `pack-io` and append it, returning its [`Lsn`].
372 ///
373 /// The typed counterpart to [`append`](Wal::append): the value is encoded to
374 /// bytes and appended as one record, which [`Record::decode`] reads back.
375 /// Available with the `pack-io` feature. Like `append`, it does not sync.
376 ///
377 /// # Errors
378 ///
379 /// - [`WalError::Encoding`] if the value fails to serialise.
380 /// - Otherwise the errors of [`append`](Wal::append) ([`WalError::RecordTooLarge`],
381 /// [`WalError::Io`]).
382 ///
383 /// # Examples
384 ///
385 /// ```
386 /// use wal_db::{MemStore, Wal};
387 /// use wal_db::pack_io::{Deserialize, Serialize};
388 ///
389 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
390 /// struct Entry {
391 /// key: String,
392 /// value: u64,
393 /// }
394 ///
395 /// # fn main() -> Result<(), wal_db::WalError> {
396 /// let wal = Wal::with_store(MemStore::new())?;
397 /// wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
398 ///
399 /// let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
400 /// assert_eq!(entry.value, 100);
401 /// # Ok(())
402 /// # }
403 /// ```
404 #[cfg(feature = "pack-io")]
405 pub fn append_typed<T: pack_io::Serialize + ?Sized>(&self, value: &T) -> Result<Lsn> {
406 let bytes = pack_io::encode(value).map_err(WalError::encoding)?;
407 self.append(&bytes)
408 }
409
410 /// Iterate the log from the beginning, yielding each record in append order.
411 ///
412 /// The iterator walks the records that are fully written at the moment it is
413 /// created — it does not see records still being written by other threads, or
414 /// appended afterwards. Each item is a [`Result`]: a damaged record yields a
415 /// single [`WalError::Corruption`] and then the iterator stops. In a log
416 /// opened normally the torn tail has already been truncated, so iteration
417 /// runs cleanly to the end.
418 ///
419 /// # Examples
420 ///
421 /// ```
422 /// use wal_db::{MemStore, Wal};
423 /// # fn main() -> Result<(), wal_db::WalError> {
424 /// let wal = Wal::with_store(MemStore::new())?;
425 /// wal.append(b"one")?;
426 /// wal.append(b"two")?;
427 ///
428 /// let mut seen = Vec::new();
429 /// for entry in wal.iter()? {
430 /// seen.push(entry?.into_data());
431 /// }
432 /// assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
433 /// # Ok(())
434 /// # }
435 /// ```
436 pub fn iter(&self) -> Result<WalIter<'_, S>> {
437 let end = self.commit.committed();
438 Ok(WalIter {
439 wal: self,
440 offset: 0,
441 end,
442 done: false,
443 policy: self.recovery_policy,
444 })
445 }
446
447 /// Iterate from `from` (a record's [`Lsn`]) to the end, skipping the records
448 /// before it.
449 ///
450 /// Because an LSN is a byte offset, seeking is O(1): iteration simply starts
451 /// at `from` instead of 0. Pass an [`Lsn`] that a previous
452 /// [`append`](Wal::append) or [`iter`](Wal::iter) produced — a real record
453 /// boundary. An `Lsn` that does not land on a record boundary will be read as
454 /// a malformed record and surface as [`WalError::Corruption`]; an `Lsn` past
455 /// the end yields an empty iterator.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use wal_db::{MemStore, Wal};
461 /// # fn main() -> Result<(), wal_db::WalError> {
462 /// let wal = Wal::with_store(MemStore::new())?;
463 /// wal.append(b"one")?;
464 /// let second = wal.append(b"two")?;
465 /// wal.append(b"three")?;
466 ///
467 /// let from_second: Vec<Vec<u8>> = wal
468 /// .iter_from(second)?
469 /// .map(|entry| entry.map(|r| r.into_data()))
470 /// .collect::<Result<_, _>>()?;
471 /// assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>> {
476 let end = self.commit.committed();
477 Ok(WalIter {
478 wal: self,
479 offset: from.get().min(end),
480 end,
481 done: false,
482 policy: self.recovery_policy,
483 })
484 }
485
486 /// Drop every record after the one at `lsn`, keeping the log up to and
487 /// including it. For compaction.
488 ///
489 /// The record at `lsn` becomes the new last record; the next append lands
490 /// right after it. The truncation is made durable before returning. `lsn`
491 /// must be a real record boundary from a previous [`append`](Wal::append) or
492 /// [`iter`](Wal::iter), and the record there must be intact.
493 ///
494 /// # Exclusive access
495 ///
496 /// This mutates the log's end, so it must **not** run concurrently with
497 /// [`append`](Wal::append), [`sync`](Wal::sync), or another `truncate_after`.
498 /// The caller is responsible for quiescing writers first — the usual case for
499 /// compaction, where the engine pauses the log, truncates, and resumes.
500 ///
501 /// # Errors
502 ///
503 /// - [`WalError::Corruption`] if `lsn` does not point at an intact record.
504 /// - [`WalError::Io`] if the truncation or its sync fails.
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// use wal_db::{MemStore, Wal};
510 /// # fn main() -> Result<(), wal_db::WalError> {
511 /// let wal = Wal::with_store(MemStore::new())?;
512 /// wal.append(b"keep me")?;
513 /// let last_kept = wal.append(b"and me")?;
514 /// wal.append(b"drop me")?;
515 ///
516 /// wal.truncate_after(last_kept)?;
517 ///
518 /// let remaining: Vec<Vec<u8>> = wal
519 /// .iter()?
520 /// .map(|entry| entry.map(|r| r.into_data()))
521 /// .collect::<Result<_, _>>()?;
522 /// assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
523 /// # Ok(())
524 /// # }
525 /// ```
526 pub fn truncate_after(&self, lsn: Lsn) -> Result<()> {
527 let start = lsn.get();
528
529 // Confirm an intact record really lives at `lsn` before keeping it.
530 let mut header = [0u8; HEADER_LEN];
531 if self.store.read_at(start, &mut header)? < HEADER_LEN {
532 return Err(WalError::corruption(start, "no record at this LSN"));
533 }
534 let parsed = record::parse_header(&header);
535 if parsed.len > self.max_record_size {
536 return Err(WalError::corruption(start, "no valid record at this LSN"));
537 }
538 let payload_start = start
539 .checked_add(HEADER_LEN as u64)
540 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
541 let mut payload = vec![0u8; parsed.len as usize];
542 if self.store.read_at(payload_start, &mut payload)? < payload.len() {
543 return Err(WalError::corruption(start, "incomplete record at this LSN"));
544 }
545 if !record::verify(&header, &payload, parsed.crc) {
546 return Err(WalError::corruption(start, "no valid record at this LSN"));
547 }
548 let new_end = payload_start
549 .checked_add(u64::from(parsed.len))
550 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
551
552 self.store.truncate(new_end)?;
553 self.store.sync()?;
554 self.tail.0.store(new_end, Ordering::Release);
555 self.commit.reset(new_end);
556 Ok(())
557 }
558
559 /// The logical size of the log in bytes, including record framing.
560 ///
561 /// This is the offset at which the next append will land. It counts bytes
562 /// that have been reserved, which under heavy concurrency may include a
563 /// record another thread is still writing.
564 #[must_use]
565 pub fn len(&self) -> u64 {
566 self.tail.0.load(Ordering::Acquire)
567 }
568
569 /// Whether the log holds no records.
570 #[must_use]
571 pub fn is_empty(&self) -> bool {
572 self.len() == 0
573 }
574
575 /// Frame `record` into a reused buffer and write it at `start`.
576 fn frame_and_write(&self, start: u64, record: &[u8]) -> Result<()> {
577 with_frame_buffer(|buf| {
578 record::encode(buf, record);
579 self.store.write_at(start, buf)
580 })
581 }
582
583 /// Crate-internal access to the backing store, for tests that need to read,
584 /// corrupt, or extend the on-disk image directly.
585 #[cfg(test)]
586 pub(crate) fn store(&self) -> &S {
587 &self.store
588 }
589}
590
/// Run `f` with a scratch buffer for framing a record.
///
/// Normally `f` gets a per-thread buffer that is reused across calls, so
/// steady-state appends do not allocate. It gets a fresh, throwaway buffer
/// instead when the per-thread one cannot be used without panicking:
///
/// - the buffer is already borrowed further up this thread's stack, e.g. a
///   custom [`WalStore`] whose `write_at` appends to another [`Wal`]; or
/// - the thread's locals are being torn down (an append from a thread-local
///   destructor).
///
/// `f` runs exactly once either way.
#[cfg(not(loom))]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    thread_local! {
        static FRAME: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    }

    // Kept in an `Option` so that, if the thread-local path cannot run `f`,
    // `f` is still here for the fallback.
    let mut pending = Some(f);
    let reused = FRAME
        .try_with(|cell| {
            let mut buf = cell.try_borrow_mut().ok()?;
            pending.take().map(|f| f(&mut *buf))
        })
        .ok()
        .flatten();

    match (reused, pending) {
        (Some(result), _) => result,
        (None, Some(f)) => f(&mut Vec::new()),
        // `pending` is only taken on the path that also produces a result.
        (None, None) => unreachable!("frame closure consumed without producing a result"),
    }
}
601
/// Loom build of the frame-buffer helper. Every call gets a brand-new buffer,
/// because the model checker has no use for (and does not instrument) the
/// thread-local reuse of the normal build.
#[cfg(loom)]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    f(&mut Vec::new())
}
607
608/// Scan a store from the start, returning the end offset of the last intact
609/// record and truncating any torn tail beyond it.
610fn recover<S: WalStore>(store: &S, max_record_size: u32) -> Result<u64> {
611 let physical = store.len()?;
612 let mut offset: u64 = 0;
613 let mut header = [0u8; HEADER_LEN];
614
615 while offset < physical {
616 if store.read_at(offset, &mut header)? < HEADER_LEN {
617 break; // incomplete header: torn tail
618 }
619 let parsed = record::parse_header(&header);
620 if parsed.len > max_record_size {
621 break; // implausible length: treat the rest as a torn tail
622 }
623
624 let payload_start = match offset.checked_add(HEADER_LEN as u64) {
625 Some(start) => start,
626 None => break,
627 };
628 let mut payload = vec![0u8; parsed.len as usize];
629 if store.read_at(payload_start, &mut payload)? < payload.len() {
630 break; // incomplete payload: torn tail
631 }
632 if !record::verify(&header, &payload, parsed.crc) {
633 break; // checksum mismatch: stop here
634 }
635
636 offset = match payload_start.checked_add(u64::from(parsed.len)) {
637 Some(end) => end,
638 None => break,
639 };
640 }
641
642 if offset < physical {
643 store.truncate(offset)?;
644 }
645 Ok(offset)
646}
647
648impl<S: WalStore> fmt::Debug for Wal<S> {
649 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650 f.debug_struct("Wal")
651 .field("len", &self.tail.0.load(Ordering::Relaxed))
652 .finish_non_exhaustive()
653 }
654}
655
656/// One record read back during iteration: its [`Lsn`] and its payload bytes.
657///
658/// Yielded by [`Wal::iter`]. The payload is owned (a fresh `Vec` per record);
659/// take it without copying via [`into_data`](Record::into_data), or borrow it
660/// via [`data`](Record::data).
661#[derive(Debug, Clone, PartialEq, Eq)]
662pub struct Record {
663 lsn: Lsn,
664 data: Vec<u8>,
665}
666
667impl Record {
668 /// The sequence number this record was assigned — its byte offset in the log.
669 pub fn lsn(&self) -> Lsn {
670 self.lsn
671 }
672
673 /// The record's payload bytes.
674 #[must_use]
675 pub fn data(&self) -> &[u8] {
676 &self.data
677 }
678
679 /// The payload length in bytes.
680 #[must_use]
681 pub fn len(&self) -> usize {
682 self.data.len()
683 }
684
685 /// Whether the record's payload is empty.
686 #[must_use]
687 pub fn is_empty(&self) -> bool {
688 self.data.is_empty()
689 }
690
691 /// Consume the record and take ownership of its payload without copying.
692 #[must_use]
693 pub fn into_data(self) -> Vec<u8> {
694 self.data
695 }
696
697 /// Decode the record's payload into a typed value via `pack-io`.
698 ///
699 /// The mirror of [`Wal::append_typed`]. Available with the `pack-io` feature.
700 ///
701 /// # Errors
702 ///
703 /// Returns [`WalError::Encoding`] if the bytes do not deserialise into `T` —
704 /// for example reading a record written as a different type.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// use wal_db::{MemStore, Wal};
710 /// use wal_db::pack_io::{Deserialize, Serialize};
711 ///
712 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
713 /// struct Event {
714 /// id: u64,
715 /// name: String,
716 /// }
717 ///
718 /// # fn main() -> Result<(), wal_db::WalError> {
719 /// let wal = Wal::with_store(MemStore::new())?;
720 /// wal.append_typed(&Event { id: 7, name: "boot".into() })?;
721 ///
722 /// let record = wal.iter()?.next().unwrap()?;
723 /// let event: Event = record.decode()?;
724 /// assert_eq!(event, Event { id: 7, name: "boot".into() });
725 /// # Ok(())
726 /// # }
727 /// ```
728 #[cfg(feature = "pack-io")]
729 pub fn decode<T: pack_io::Deserialize>(&self) -> Result<T> {
730 pack_io::decode(&self.data).map_err(WalError::encoding)
731 }
732}
733
734/// The outcome of reading one record-sized chunk at the iterator's cursor.
735enum Step {
736 /// A valid record, plus the offset just past it.
737 Record(Record, u64),
738 /// A damaged record. `skip_to` is the offset of the next record if its
739 /// extent is known (length and payload present, only the checksum failed),
740 /// or `None` if the damage makes the next record's position unknowable.
741 Damaged(WalError, Option<u64>),
742 /// A clean end: a short read, meaning the log stops here (a torn tail).
743 End,
744}
745
746/// The iterator returned by [`Wal::iter`].
747///
748/// Walks the records fully written when it was created, yielding
749/// `Result<`[`Record`]`>`. Behaviour at a damaged record follows the configured
750/// [`RecoveryPolicy`]: by default the iterator yields the damage once and stops;
751/// under [`RecoveryPolicy::SkipBadRecords`] it yields the damage and continues
752/// past it when the next record's position is still recoverable.
753pub struct WalIter<'a, S: WalStore = FileStore> {
754 wal: &'a Wal<S>,
755 offset: u64,
756 end: u64,
757 done: bool,
758 policy: RecoveryPolicy,
759}
760
761impl<S: WalStore> WalIter<'_, S> {
762 /// Read and classify the record at the current offset, without advancing.
763 fn step(&self) -> Result<Step> {
764 let mut header = [0u8; HEADER_LEN];
765 if self.wal.store.read_at(self.offset, &mut header)? < HEADER_LEN {
766 return Ok(Step::End);
767 }
768 let parsed = record::parse_header(&header);
769 if parsed.len > self.wal.max_record_size {
770 // The length is implausible, so the next record's position is
771 // unknowable — there is nothing to skip to.
772 return Ok(Step::Damaged(
773 WalError::corruption(self.offset, "record length exceeds the maximum"),
774 None,
775 ));
776 }
777
778 let payload_start = self
779 .offset
780 .checked_add(HEADER_LEN as u64)
781 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
782 let mut payload = vec![0u8; parsed.len as usize];
783 if self.wal.store.read_at(payload_start, &mut payload)? < payload.len() {
784 return Ok(Step::End);
785 }
786 let next = payload_start
787 .checked_add(u64::from(parsed.len))
788 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
789
790 if !record::verify(&header, &payload, parsed.crc) {
791 // The length and payload are present, so we know where the next
792 // record starts even though this one is corrupt.
793 return Ok(Step::Damaged(
794 WalError::corruption(self.offset, "checksum mismatch"),
795 Some(next),
796 ));
797 }
798
799 Ok(Step::Record(
800 Record {
801 lsn: Lsn::new(self.offset),
802 data: payload,
803 },
804 next,
805 ))
806 }
807}
808
809impl<S: WalStore> Iterator for WalIter<'_, S> {
810 type Item = Result<Record>;
811
812 fn next(&mut self) -> Option<Self::Item> {
813 if self.done || self.offset >= self.end {
814 return None;
815 }
816 match self.step() {
817 Ok(Step::Record(record, next)) => {
818 self.offset = next;
819 Some(Ok(record))
820 }
821 // Skip-bad-records, and the next record is locatable: surface the
822 // damage but continue from past it on the next call.
823 Ok(Step::Damaged(error, Some(next)))
824 if self.policy == RecoveryPolicy::SkipBadRecords =>
825 {
826 self.offset = next;
827 Some(Err(error))
828 }
829 // Stop-at-first-error, or damage that makes the next position
830 // unknowable: surface the damage and end.
831 Ok(Step::Damaged(error, _)) => {
832 self.done = true;
833 Some(Err(error))
834 }
835 Ok(Step::End) => {
836 self.done = true;
837 None
838 }
839 Err(error) => {
840 self.done = true;
841 Some(Err(error))
842 }
843 }
844 }
845}
846
847impl<S: WalStore> fmt::Debug for WalIter<'_, S> {
848 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
849 f.debug_struct("WalIter")
850 .field("offset", &self.offset)
851 .field("end", &self.end)
852 .field("done", &self.done)
853 .finish()
854 }
855}
856
857#[cfg(all(test, not(loom)))]
858#[allow(
859 clippy::unwrap_used,
860 clippy::expect_used,
861 unused_must_use,
862 unused_results
863)]
864mod tests {
865 use std::sync::Arc;
866 use std::thread;
867
868 use super::*;
869 use crate::store::MemStore;
870
871 fn drain(wal: &Wal<MemStore>) -> Vec<Vec<u8>> {
872 wal.iter()
873 .unwrap()
874 .map(|r| r.unwrap().into_data())
875 .collect()
876 }
877
878 fn corrupt_byte(store: &MemStore, offset: u64) {
879 let mut byte = [0u8; 1];
880 store.read_at(offset, &mut byte).unwrap();
881 byte[0] ^= 0xFF;
882 store.write_at(offset, &byte).unwrap();
883 }
884
885 #[test]
886 fn test_stop_at_first_error_stops_at_corruption() {
887 let wal = Wal::with_store(MemStore::new()).unwrap(); // default policy
888 wal.append(b"first").unwrap();
889 let second = wal.append(b"second").unwrap();
890 wal.append(b"third").unwrap();
891 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
892
893 let items: Vec<_> = wal.iter().unwrap().collect();
894 assert_eq!(items.len(), 2); // first ok, second damaged, then stop
895 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
896 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
897 }
898
899 #[test]
900 fn test_skip_bad_records_continues_past_corruption() {
901 let config = WalConfig::new().with_recovery_policy(RecoveryPolicy::SkipBadRecords);
902 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
903 wal.append(b"first").unwrap();
904 let second = wal.append(b"second").unwrap();
905 wal.append(b"third").unwrap();
906 // Corrupt the payload only; the length prefix stays intact, so the
907 // record is skippable.
908 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
909
910 let items: Vec<_> = wal.iter().unwrap().collect();
911 assert_eq!(items.len(), 3);
912 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
913 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
914 assert_eq!(items[2].as_ref().unwrap().data(), b"third");
915 }
916
917 #[test]
918 fn test_skip_bad_records_still_stops_on_unreadable_length() {
919 let config = WalConfig::new()
920 .with_max_record_size(16)
921 .with_recovery_policy(RecoveryPolicy::SkipBadRecords);
922 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
923 wal.append(b"ok").unwrap();
924 let second = wal.append(b"victim").unwrap();
925 // Corrupt the length field to an implausible value: the next record's
926 // position becomes unknowable, so even skip-mode must stop.
927 corrupt_byte(wal.store(), second.get() + 4); // LEN_OFFSET within the header
928
929 let items: Vec<_> = wal.iter().unwrap().collect();
930 assert_eq!(items.len(), 2); // first ok, then a damaged stop
931 assert_eq!(items[0].as_ref().unwrap().data(), b"ok");
932 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
933 }
934
935 #[cfg(feature = "pack-io")]
936 #[test]
937 fn test_typed_record_roundtrip() {
938 use pack_io::{Deserialize, Serialize};
939
940 #[derive(Serialize, Deserialize, PartialEq, Debug)]
941 struct Entry {
942 id: u64,
943 label: String,
944 }
945
946 let wal = Wal::with_store(MemStore::new()).unwrap();
947 wal.append_typed(&Entry {
948 id: 9,
949 label: "nine".into(),
950 })
951 .unwrap();
952 wal.append_typed(&Entry {
953 id: 10,
954 label: "ten".into(),
955 })
956 .unwrap();
957
958 let decoded: Vec<Entry> = wal
959 .iter()
960 .unwrap()
961 .map(|r| r.unwrap().decode().unwrap())
962 .collect();
963 assert_eq!(
964 decoded[0],
965 Entry {
966 id: 9,
967 label: "nine".into()
968 }
969 );
970 assert_eq!(
971 decoded[1],
972 Entry {
973 id: 10,
974 label: "ten".into()
975 }
976 );
977 }
978
979 #[cfg(feature = "pack-io")]
980 #[test]
981 fn test_typed_decode_wrong_type_errors() {
982 use pack_io::{Deserialize, Serialize};
983
984 #[derive(Serialize)]
985 struct Big {
986 a: u64,
987 b: u64,
988 c: u64,
989 }
990 #[derive(Deserialize)]
991 struct Small {
992 _a: u8,
993 }
994
995 let wal = Wal::with_store(MemStore::new()).unwrap();
996 wal.append_typed(&Big { a: 1, b: 2, c: 3 }).unwrap();
997 let record = wal.iter().unwrap().next().unwrap().unwrap();
998 // Decoding 24 bytes as a 1-byte type leaves trailing bytes -> error.
999 let result: Result<Small> = record.decode();
1000 assert!(matches!(result, Err(WalError::Encoding { .. })));
1001 }
1002
1003 #[test]
1004 fn test_append_assigns_byte_offset_lsns() {
1005 let wal = Wal::with_store(MemStore::new()).unwrap();
1006 let a = wal.append(b"abc").unwrap(); // 8 header + 3 = 11 bytes
1007 let b = wal.append(b"de").unwrap();
1008 assert_eq!(a.get(), 0);
1009 assert_eq!(b.get(), 11);
1010 }
1011
1012 #[test]
1013 fn test_iter_reads_back_all_records_in_order() {
1014 let wal = Wal::with_store(MemStore::new()).unwrap();
1015 wal.append(b"one").unwrap();
1016 wal.append(b"two").unwrap();
1017 wal.append(b"three").unwrap();
1018 assert_eq!(
1019 drain(&wal),
1020 vec![b"one".to_vec(), b"two".to_vec(), b"three".to_vec()]
1021 );
1022 }
1023
1024 #[test]
1025 fn test_empty_log_iterates_to_nothing() {
1026 let wal = Wal::with_store(MemStore::new()).unwrap();
1027 assert!(wal.is_empty());
1028 assert_eq!(drain(&wal).len(), 0);
1029 }
1030
1031 #[test]
1032 fn test_empty_record_roundtrips() {
1033 let wal = Wal::with_store(MemStore::new()).unwrap();
1034 wal.append(b"").unwrap();
1035 assert_eq!(drain(&wal), vec![Vec::<u8>::new()]);
1036 }
1037
1038 #[test]
1039 fn test_record_too_large_is_rejected() {
1040 let config = WalConfig::new().with_max_record_size(4);
1041 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1042 wal.append(b"ok").unwrap();
1043 let err = wal.append(b"too long").unwrap_err();
1044 assert!(matches!(err, WalError::RecordTooLarge { len: 8, max: 4 }));
1045 // The rejected append did not advance the log.
1046 assert_eq!(drain(&wal), vec![b"ok".to_vec()]);
1047 }
1048
1049 #[test]
1050 fn test_reopen_recovers_records() {
1051 let wal = Wal::with_store(MemStore::new()).unwrap();
1052 wal.append(b"first").unwrap();
1053 wal.append(b"second").unwrap();
1054 wal.sync().unwrap();
1055 let image = wal.store().snapshot();
1056
1057 let reopened = Wal::with_store(MemStore::from_bytes(image)).unwrap();
1058 assert_eq!(
1059 drain(&reopened),
1060 vec![b"first".to_vec(), b"second".to_vec()]
1061 );
1062 // The next append continues at the recovered end: two records of
1063 // (8 + 5) and (8 + 6) bytes leave the tail at 27.
1064 assert_eq!(reopened.append(b"third").unwrap().get(), 27);
1065 }
1066
1067 #[test]
1068 fn test_recovery_truncates_torn_tail() {
1069 let wal = Wal::with_store(MemStore::new()).unwrap();
1070 wal.append(b"good record").unwrap();
1071 let clean_len = wal.len();
1072 // Append raw garbage directly to the store: a torn tail.
1073 wal.store().write_at(clean_len, &[0xAB; 5]).unwrap();
1074
1075 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1076 assert_eq!(drain(&reopened), vec![b"good record".to_vec()]);
1077 assert_eq!(reopened.len(), clean_len);
1078 }
1079
1080 #[test]
1081 fn test_corrupt_record_surfaces_error_then_stops() {
1082 let wal = Wal::with_store(MemStore::new()).unwrap();
1083 wal.append(b"intact").unwrap();
1084 let second = wal.append(b"victim").unwrap();
1085 // Flip a byte inside the second record's payload (offset + header).
1086 let payload_offset = second.get() + HEADER_LEN as u64;
1087 let mut byte = [0u8; 1];
1088 wal.store().read_at(payload_offset, &mut byte).unwrap();
1089 byte[0] ^= 0xFF;
1090 wal.store().write_at(payload_offset, &byte).unwrap();
1091
1092 let mut iter = wal.iter().unwrap();
1093 assert_eq!(iter.next().unwrap().unwrap().data(), b"intact");
1094 assert!(matches!(
1095 iter.next().unwrap(),
1096 Err(WalError::Corruption { .. })
1097 ));
1098 assert!(iter.next().is_none());
1099 }
1100
1101 #[test]
1102 fn test_append_and_sync_is_durable() {
1103 let wal = Wal::with_store(MemStore::new()).unwrap();
1104 wal.append_and_sync(b"committed").unwrap();
1105 assert_eq!(drain(&wal), vec![b"committed".to_vec()]);
1106 }
1107
1108 #[test]
1109 fn test_iter_from_seeks_to_lsn() {
1110 let wal = Wal::with_store(MemStore::new()).unwrap();
1111 wal.append(b"a").unwrap();
1112 let b = wal.append(b"b").unwrap();
1113 wal.append(b"c").unwrap();
1114
1115 let got: Vec<Vec<u8>> = wal
1116 .iter_from(b)
1117 .unwrap()
1118 .map(|r| r.unwrap().into_data())
1119 .collect();
1120 assert_eq!(got, vec![b"b".to_vec(), b"c".to_vec()]);
1121 }
1122
1123 #[test]
1124 fn test_iter_from_past_end_is_empty() {
1125 let wal = Wal::with_store(MemStore::new()).unwrap();
1126 wal.append(b"a").unwrap();
1127 assert_eq!(wal.iter_from(Lsn::new(9_999)).unwrap().count(), 0);
1128 }
1129
1130 #[test]
1131 fn test_truncate_after_drops_later_records() {
1132 let wal = Wal::with_store(MemStore::new()).unwrap();
1133 wal.append(b"first").unwrap(); // [0, 13)
1134 let keep = wal.append(b"second").unwrap(); // [13, 27)
1135 wal.append(b"third").unwrap();
1136 wal.append(b"fourth").unwrap();
1137
1138 wal.truncate_after(keep).unwrap();
1139 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1140 assert_eq!(wal.len(), 27);
1141
1142 // Appends resume immediately after the kept record.
1143 assert_eq!(wal.append(b"new").unwrap().get(), 27);
1144 assert_eq!(
1145 drain(&wal),
1146 vec![b"first".to_vec(), b"second".to_vec(), b"new".to_vec()]
1147 );
1148 }
1149
1150 #[test]
1151 fn test_truncate_after_keeping_last_record_is_a_no_op() {
1152 let wal = Wal::with_store(MemStore::new()).unwrap();
1153 wal.append(b"first").unwrap();
1154 let last = wal.append(b"second").unwrap();
1155 let before = wal.len();
1156
1157 wal.truncate_after(last).unwrap();
1158 assert_eq!(wal.len(), before);
1159 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1160 }
1161
1162 #[test]
1163 fn test_truncate_after_invalid_lsn_errors() {
1164 let config = WalConfig::new().with_max_record_size(64);
1165 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1166 wal.append(b"only record").unwrap();
1167 // An LSN that does not land on a record boundary is rejected.
1168 let err = wal.truncate_after(Lsn::new(3)).unwrap_err();
1169 assert!(matches!(err, WalError::Corruption { .. }));
1170 }
1171
1172 #[test]
1173 fn test_concurrent_appends_no_overlap_all_recovered() {
1174 const THREADS: usize = 8;
1175 const PER_THREAD: usize = 200;
1176
1177 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1178 let mut handles = Vec::new();
1179 for t in 0..THREADS {
1180 let wal = Arc::clone(&wal);
1181 handles.push(thread::spawn(move || {
1182 let mut lsns = Vec::with_capacity(PER_THREAD);
1183 for i in 0..PER_THREAD {
1184 let payload = format!("t{t}-r{i}").into_bytes();
1185 lsns.push(wal.append(&payload).unwrap().get());
1186 }
1187 lsns
1188 }));
1189 }
1190 let mut all_lsns = Vec::new();
1191 for h in handles {
1192 all_lsns.extend(h.join().unwrap());
1193 }
1194 wal.sync().unwrap();
1195
1196 // Every LSN is distinct (no two records shared a byte range).
1197 let mut sorted = all_lsns.clone();
1198 sorted.sort_unstable();
1199 sorted.dedup();
1200 assert_eq!(sorted.len(), THREADS * PER_THREAD);
1201
1202 // Recovery reads back exactly the records that were appended, in offset
1203 // order, with no gaps or corruption.
1204 let records = drain(&wal);
1205 assert_eq!(records.len(), THREADS * PER_THREAD);
1206
1207 // Reopening from the raw image recovers the same set.
1208 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1209 assert_eq!(reopened.iter().unwrap().count(), THREADS * PER_THREAD);
1210 }
1211
1212 #[test]
1213 fn test_concurrent_append_and_sync_all_durable() {
1214 const THREADS: usize = 8;
1215
1216 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1217 let mut handles = Vec::new();
1218 for t in 0..THREADS {
1219 let wal = Arc::clone(&wal);
1220 handles.push(thread::spawn(move || {
1221 for i in 0..50 {
1222 wal.append_and_sync(format!("{t}:{i}").as_bytes()).unwrap();
1223 }
1224 }));
1225 }
1226 for h in handles {
1227 h.join().unwrap();
1228 }
1229 assert_eq!(drain(&wal).len(), THREADS * 50);
1230 }
1231}