wal_db/wal.rs
//! The log itself: [`Wal`], its recovery iterator [`WalIter`], and the
//! [`Record`] type that iteration yields.
3
4use std::{fmt, io, sync::atomic::Ordering};
5
6#[cfg(not(loom))]
7use std::{cell::RefCell, path::Path};
8
9use crate::{
10 commit::Commit,
11 config::{RecoveryPolicy, WalConfig},
12 error::{Result, WalError},
13 lsn::Lsn,
14 record::{self, HEADER_LEN},
15 store::{FileStore, WalStore},
16 sync::AtomicU64,
17};
18
/// A cache-line-aligned wrapper, used to keep the heavily-written reservation
/// counter off the cache line(s) holding the rest of the log's fields, so
/// appenders hammering it do not invalidate readers' caches (false sharing).
///
/// The alignment is 128 bytes on `x86_64` and `aarch64` and 64 bytes
/// elsewhere. Modern x86 spatial prefetchers pull cache lines in adjacent
/// 64-byte pairs, and several aarch64 cores use 128-byte lines, so 64-byte
/// alignment alone can still leave the counter sharing a prefetch/coherence
/// unit with its neighbours. This mirrors `crossbeam_utils::CachePadded`.
#[cfg_attr(
    any(target_arch = "x86_64", target_arch = "aarch64"),
    repr(align(128))
)]
#[cfg_attr(
    not(any(target_arch = "x86_64", target_arch = "aarch64")),
    repr(align(64))
)]
#[derive(Debug)]
struct CacheAligned<T>(T);
25
26/// A durable, append-only log.
27///
28/// `Wal` is the entry point. The four calls that cover almost every use are
29/// [`open`](Wal::open), [`append`](Wal::append), [`sync`](Wal::sync), and
30/// [`iter`](Wal::iter). The type parameter `S` is the storage backend and
31/// defaults to [`FileStore`], so the plain name `Wal` is the file-backed log;
32/// custom backends are supplied through [`with_store`](Wal::with_store).
33///
34/// A `Wal` is [`Send`] and [`Sync`], and the append path is built for it: many
35/// threads can call [`append`](Wal::append) at once with no global lock. Share
36/// one behind an [`Arc`](std::sync::Arc) and write from every thread.
37///
38/// # Concurrency and durability
39///
40/// Appends are lock-free. Each one reserves its byte range with a single atomic
41/// step — the range's start offset *is* the record's [`Lsn`] — frames the record
42/// into a reused thread-local buffer, and writes it, all without blocking other
43/// appenders. [`sync`](Wal::sync) is the durability barrier; when several
44/// threads sync at once they coalesce into a single fsync (group commit), so the
45/// cost of making data durable is amortised across everyone committing together.
46///
47/// `append` returns once the record is in the OS page cache; `sync` returns once
48/// it is on stable storage. See the [crate docs](crate) for the full contract.
49///
50/// # Examples
51///
52/// ```
53/// use wal_db::Wal;
54///
55/// # fn main() -> Result<(), wal_db::WalError> {
56/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
57/// # let path = dir.path().join("log.wal");
58/// let wal = Wal::open(&path)?;
59/// let first = wal.append(b"first")?;
60/// let second = wal.append(b"second")?;
61/// wal.sync()?;
62///
63/// // LSNs are byte offsets: the first record starts at 0, the second after it.
64/// assert_eq!(first.get(), 0);
65/// assert!(second.get() > first.get());
66///
67/// let read_back: Vec<Vec<u8>> = wal
68/// .iter()?
69/// .map(|entry| entry.map(|record| record.into_data()))
70/// .collect::<Result<_, _>>()?;
71/// assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
72/// # Ok(())
73/// # }
74/// ```
75pub struct Wal<S = FileStore> {
76 /// Next byte offset to reserve. Hammered by every appender, so kept on its
77 /// own cache line.
78 tail: CacheAligned<AtomicU64>,
79 store: S,
80 max_record_size: u32,
81 recovery_policy: RecoveryPolicy,
82 commit: Commit,
83}
84
85#[cfg(not(loom))]
86impl Wal<FileStore> {
87 /// Open the log at `path`, creating it if it does not exist.
88 ///
89 /// On open the log scans its contents, stops at the first record that is
90 /// incomplete or fails its checksum, and truncates that torn tail so the
91 /// next append lands on a clean boundary. The common cause of a torn tail is
92 /// a crash partway through an earlier append; that record was never
93 /// acknowledged durable, so discarding it loses nothing the caller was
94 /// promised.
95 ///
96 /// # Errors
97 ///
98 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
99 ///
100 /// # Examples
101 ///
102 /// ```
103 /// use wal_db::Wal;
104 /// # fn main() -> Result<(), wal_db::WalError> {
105 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
106 /// # let path = dir.path().join("log.wal");
107 /// let wal = Wal::open(&path)?;
108 /// wal.append(b"hello")?;
109 /// wal.sync()?;
110 /// # Ok(())
111 /// # }
112 /// ```
113 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
114 Self::open_with(path, WalConfig::new())
115 }
116
117 /// Open the log at `path` with an explicit [`WalConfig`].
118 ///
119 /// # Errors
120 ///
121 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use wal_db::{Wal, WalConfig};
127 /// # fn main() -> Result<(), wal_db::WalError> {
128 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
129 /// # let path = dir.path().join("log.wal");
130 /// let config = WalConfig::new().with_max_record_size(1024);
131 /// let wal = Wal::open_with(&path, config)?;
132 /// # let _ = wal;
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
137 let store = FileStore::open(path)?;
138 Self::with_store_and_config(store, config)
139 }
140}
141
142#[cfg(not(loom))]
143impl Wal<crate::segment::SegmentedStore> {
144 /// Open a segmented log in directory `dir`, with segments of `segment_size`
145 /// bytes, creating the directory if needed.
146 ///
147 /// The log is one continuous byte stream striped across fixed-size files, so
148 /// it behaves exactly like a single-file log — records span segment
149 /// boundaries freely — while keeping each file bounded for recovery and
150 /// archival. Records larger than a segment simply occupy several.
151 ///
152 /// # Errors
153 ///
154 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
155 /// be opened or scanned.
156 ///
157 /// # Examples
158 ///
159 /// ```
160 /// use wal_db::Wal;
161 /// # fn main() -> Result<(), wal_db::WalError> {
162 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
163 /// let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
164 /// wal.append(b"record")?;
165 /// wal.sync()?;
166 /// # Ok(())
167 /// # }
168 /// ```
169 pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
170 Self::open_segmented_with(dir, segment_size, WalConfig::new())
171 }
172
173 /// Open a segmented log with an explicit [`WalConfig`].
174 ///
175 /// Like [`open_segmented`](Wal::open_segmented), but applies `config` (for
176 /// example a tighter [`max_record_size`](WalConfig::max_record_size)).
177 ///
178 /// # Errors
179 ///
180 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
181 /// be opened or scanned.
182 pub fn open_segmented_with(
183 dir: impl AsRef<Path>,
184 segment_size: u64,
185 config: WalConfig,
186 ) -> Result<Self> {
187 let store = crate::segment::SegmentedStore::open(dir, segment_size)?;
188 Self::with_store_and_config(store, config)
189 }
190}
191
192impl<S: WalStore> Wal<S> {
193 /// Build a log over a custom [`WalStore`], using the default configuration.
194 ///
195 /// # Errors
196 ///
197 /// Returns an error if scanning the existing contents of the store fails.
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// use wal_db::{MemStore, Wal};
203 /// # fn main() -> Result<(), wal_db::WalError> {
204 /// let wal = Wal::with_store(MemStore::new())?;
205 /// wal.append(b"record")?;
206 /// # Ok(())
207 /// # }
208 /// ```
209 pub fn with_store(store: S) -> Result<Self> {
210 Self::with_store_and_config(store, WalConfig::new())
211 }
212
213 /// Build a log over a custom [`WalStore`] with an explicit [`WalConfig`].
214 ///
215 /// # Errors
216 ///
217 /// Returns an error if scanning the existing contents of the store fails.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// use wal_db::{MemStore, Wal, WalConfig};
223 /// # fn main() -> Result<(), wal_db::WalError> {
224 /// let config = WalConfig::new().with_max_record_size(64);
225 /// let wal = Wal::with_store_and_config(MemStore::new(), config)?;
226 /// # let _ = wal;
227 /// # Ok(())
228 /// # }
229 /// ```
230 pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self> {
231 let recovered = recover(&store, config.max_record_size())?;
232 Ok(Wal {
233 tail: CacheAligned(AtomicU64::new(recovered)),
234 store,
235 max_record_size: config.max_record_size(),
236 recovery_policy: config.recovery_policy(),
237 commit: Commit::new(recovered),
238 })
239 }
240
241 /// Append `record` to the log and return the [`Lsn`] it was assigned — the
242 /// byte offset where the record begins.
243 ///
244 /// Lock-free: the byte range is reserved with one atomic step and the record
245 /// is written without blocking other appenders. Returns once the bytes are
246 /// in the operating system's page cache. It does **not** flush the disk —
247 /// call [`sync`](Wal::sync) for that. A crash between `append` and `sync` may
248 /// lose the record.
249 ///
250 /// # Errors
251 ///
252 /// - [`WalError::RecordTooLarge`] if `record` is larger than the configured
253 /// [`max_record_size`](WalConfig::max_record_size). The log is unchanged.
254 /// - [`WalError::Io`] if the write fails. The reserved range becomes a
255 /// permanent gap: the log is durable only up to that point, recovery stops
256 /// there, and later syncs covering it report the truncation.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use wal_db::{MemStore, Wal};
262 /// # fn main() -> Result<(), wal_db::WalError> {
263 /// let wal = Wal::with_store(MemStore::new())?;
264 /// let lsn = wal.append(b"some bytes")?;
265 /// assert_eq!(lsn.get(), 0);
266 /// # Ok(())
267 /// # }
268 /// ```
269 pub fn append(&self, record: &[u8]) -> Result<Lsn> {
270 let payload_len = record.len();
271 if payload_len > self.max_record_size as usize {
272 return Err(WalError::RecordTooLarge {
273 len: payload_len,
274 max: self.max_record_size,
275 });
276 }
277 let frame_len = record::framed_len(payload_len) as u64;
278
279 // Reserve the byte range. The returned start offset is the LSN, and
280 // because it comes from a single atomic it is unique and ordered.
281 let start = self.tail.0.fetch_add(frame_len, Ordering::Relaxed);
282 let end = match start.checked_add(frame_len) {
283 Some(end) => end,
284 None => {
285 self.commit.mark_failed(start);
286 return Err(WalError::io(
287 "reserving a record offset",
288 io::Error::other("log size exceeds u64"),
289 ));
290 }
291 };
292
293 match self.frame_and_write(start, record) {
294 Ok(()) => {
295 self.commit.mark_written(start, end);
296 Ok(Lsn::new(start))
297 }
298 Err(error) => {
299 self.commit.mark_failed(start);
300 Err(error)
301 }
302 }
303 }
304
305 /// Make every record appended before this call durable.
306 ///
307 /// Returns once the data is on stable storage, using the platform's true
308 /// durability barrier. Concurrent calls coalesce into a single fsync, so the
309 /// flush cost is shared by everyone committing at the same time.
310 ///
311 /// # Errors
312 ///
313 /// Returns [`WalError::Io`] if the flush fails, or [`WalError::Corruption`]
314 /// if an earlier append's write failed and left a gap that cannot be made
315 /// durable. A failed sync means the records are not durable; treat it as
316 /// fatal, not as something to retry blindly.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use wal_db::Wal;
322 /// # fn main() -> Result<(), wal_db::WalError> {
323 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
324 /// # let path = dir.path().join("log.wal");
325 /// let wal = Wal::open(&path)?;
326 /// wal.append(b"durable me")?;
327 /// wal.sync()?; // now on stable storage
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub fn sync(&self) -> Result<()> {
332 let target = self.tail.0.load(Ordering::Acquire);
333 if target == 0 {
334 return Ok(());
335 }
336 self.commit.sync_to(&self.store, target)
337 }
338
339 /// Append `record` and make it durable in one call, returning its [`Lsn`].
340 ///
341 /// Equivalent to [`append`](Wal::append) followed by a [`sync`](Wal::sync)
342 /// scoped to this record, but with the sync coalesced into the group commit
343 /// of any other threads syncing at the same moment. Use it when every record
344 /// must be durable before you proceed and you want the group-commit
345 /// throughput without managing the two calls yourself.
346 ///
347 /// # Errors
348 ///
349 /// The union of [`append`](Wal::append)'s and [`sync`](Wal::sync)'s errors.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use wal_db::Wal;
355 /// # fn main() -> Result<(), wal_db::WalError> {
356 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
357 /// # let path = dir.path().join("log.wal");
358 /// let wal = Wal::open(&path)?;
359 /// let lsn = wal.append_and_sync(b"committed immediately")?;
360 /// # let _ = lsn;
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn> {
365 let lsn = self.append(record)?;
366 let end = lsn.get() + record::framed_len(record.len()) as u64;
367 self.commit.sync_to(&self.store, end)?;
368 Ok(lsn)
369 }
370
371 /// Serialise `value` with `pack-io` and append it, returning its [`Lsn`].
372 ///
373 /// The typed counterpart to [`append`](Wal::append): the value is encoded to
374 /// bytes and appended as one record, which [`Record::decode`] reads back.
375 /// Available with the `pack-io` feature. Like `append`, it does not sync.
376 ///
377 /// # Errors
378 ///
379 /// - [`WalError::Encoding`] if the value fails to serialise.
380 /// - Otherwise the errors of [`append`](Wal::append) ([`WalError::RecordTooLarge`],
381 /// [`WalError::Io`]).
382 ///
383 /// # Examples
384 ///
385 /// ```
386 /// use wal_db::{MemStore, Wal};
387 /// use wal_db::pack_io::{Deserialize, Serialize};
388 ///
389 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
390 /// struct Entry {
391 /// key: String,
392 /// value: u64,
393 /// }
394 ///
395 /// # fn main() -> Result<(), wal_db::WalError> {
396 /// let wal = Wal::with_store(MemStore::new())?;
397 /// wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
398 ///
399 /// let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
400 /// assert_eq!(entry.value, 100);
401 /// # Ok(())
402 /// # }
403 /// ```
404 #[cfg(feature = "pack-io")]
405 pub fn append_typed<T: pack_io::Serialize + ?Sized>(&self, value: &T) -> Result<Lsn> {
406 let bytes = pack_io::encode(value).map_err(WalError::encoding)?;
407 self.append(&bytes)
408 }
409
410 /// Iterate the log from the beginning, yielding each record in append order.
411 ///
412 /// The iterator walks the records that are fully written at the moment it is
413 /// created — it does not see records still being written by other threads, or
414 /// appended afterwards. Each item is a [`Result`]: a damaged record yields a
415 /// single [`WalError::Corruption`] and then the iterator stops. In a log
416 /// opened normally the torn tail has already been truncated, so iteration
417 /// runs cleanly to the end.
418 ///
419 /// # Examples
420 ///
421 /// ```
422 /// use wal_db::{MemStore, Wal};
423 /// # fn main() -> Result<(), wal_db::WalError> {
424 /// let wal = Wal::with_store(MemStore::new())?;
425 /// wal.append(b"one")?;
426 /// wal.append(b"two")?;
427 ///
428 /// let mut seen = Vec::new();
429 /// for entry in wal.iter()? {
430 /// seen.push(entry?.into_data());
431 /// }
432 /// assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
433 /// # Ok(())
434 /// # }
435 /// ```
436 pub fn iter(&self) -> Result<WalIter<'_, S>> {
437 let end = self.commit.committed();
438 Ok(WalIter {
439 wal: self,
440 offset: 0,
441 end,
442 done: false,
443 policy: self.recovery_policy,
444 })
445 }
446
447 /// Iterate from `from` (a record's [`Lsn`]) to the end, skipping the records
448 /// before it.
449 ///
450 /// Because an LSN is a byte offset, seeking is O(1): iteration simply starts
451 /// at `from` instead of 0. Pass an [`Lsn`] that a previous
452 /// [`append`](Wal::append) or [`iter`](Wal::iter) produced — a real record
453 /// boundary. An `Lsn` that does not land on a record boundary will be read as
454 /// a malformed record and surface as [`WalError::Corruption`]; an `Lsn` past
455 /// the end yields an empty iterator.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use wal_db::{MemStore, Wal};
461 /// # fn main() -> Result<(), wal_db::WalError> {
462 /// let wal = Wal::with_store(MemStore::new())?;
463 /// wal.append(b"one")?;
464 /// let second = wal.append(b"two")?;
465 /// wal.append(b"three")?;
466 ///
467 /// let from_second: Vec<Vec<u8>> = wal
468 /// .iter_from(second)?
469 /// .map(|entry| entry.map(|r| r.into_data()))
470 /// .collect::<Result<_, _>>()?;
471 /// assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>> {
476 let end = self.commit.committed();
477 Ok(WalIter {
478 wal: self,
479 offset: from.get().min(end),
480 end,
481 done: false,
482 policy: self.recovery_policy,
483 })
484 }
485
486 /// Drop every record after the one at `lsn`, keeping the log up to and
487 /// including it. For compaction.
488 ///
489 /// The record at `lsn` becomes the new last record; the next append lands
490 /// right after it. The truncation is made durable before returning. `lsn`
491 /// must be a real record boundary from a previous [`append`](Wal::append) or
492 /// [`iter`](Wal::iter), and the record there must be intact.
493 ///
494 /// # Exclusive access
495 ///
496 /// This mutates the log's end, so it must **not** run concurrently with
497 /// [`append`](Wal::append), [`sync`](Wal::sync), or another `truncate_after`.
498 /// The caller is responsible for quiescing writers first — the usual case for
499 /// compaction, where the engine pauses the log, truncates, and resumes.
500 ///
501 /// # Errors
502 ///
503 /// - [`WalError::Corruption`] if `lsn` does not point at an intact record.
504 /// - [`WalError::Io`] if the truncation or its sync fails.
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// use wal_db::{MemStore, Wal};
510 /// # fn main() -> Result<(), wal_db::WalError> {
511 /// let wal = Wal::with_store(MemStore::new())?;
512 /// wal.append(b"keep me")?;
513 /// let last_kept = wal.append(b"and me")?;
514 /// wal.append(b"drop me")?;
515 ///
516 /// wal.truncate_after(last_kept)?;
517 ///
518 /// let remaining: Vec<Vec<u8>> = wal
519 /// .iter()?
520 /// .map(|entry| entry.map(|r| r.into_data()))
521 /// .collect::<Result<_, _>>()?;
522 /// assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
523 /// # Ok(())
524 /// # }
525 /// ```
526 pub fn truncate_after(&self, lsn: Lsn) -> Result<()> {
527 let start = lsn.get();
528
529 // Confirm an intact record really lives at `lsn` before keeping it.
530 let mut header = [0u8; HEADER_LEN];
531 if self.store.read_at(start, &mut header)? < HEADER_LEN {
532 return Err(WalError::corruption(start, "no record at this LSN"));
533 }
534 let parsed = record::parse_header(&header);
535 if parsed.len > self.max_record_size {
536 return Err(WalError::corruption(start, "no valid record at this LSN"));
537 }
538 let payload_start = start
539 .checked_add(HEADER_LEN as u64)
540 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
541 let mut payload = vec![0u8; parsed.len as usize];
542 if self.store.read_at(payload_start, &mut payload)? < payload.len() {
543 return Err(WalError::corruption(start, "incomplete record at this LSN"));
544 }
545 if !record::verify(&header, &payload, parsed.crc) {
546 return Err(WalError::corruption(start, "no valid record at this LSN"));
547 }
548 let new_end = payload_start
549 .checked_add(u64::from(parsed.len))
550 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
551
552 self.store.truncate(new_end)?;
553 self.store.sync()?;
554 self.tail.0.store(new_end, Ordering::Release);
555 self.commit.reset(new_end);
556 Ok(())
557 }
558
559 /// The logical size of the log in bytes, including record framing.
560 ///
561 /// This is the offset at which the next append will land. It counts bytes
562 /// that have been reserved, which under heavy concurrency may include a
563 /// record another thread is still writing.
564 #[must_use]
565 pub fn len(&self) -> u64 {
566 self.tail.0.load(Ordering::Acquire)
567 }
568
569 /// Whether the log holds no records.
570 #[must_use]
571 pub fn is_empty(&self) -> bool {
572 self.len() == 0
573 }
574
575 /// Frame `record` into a reused buffer and write it at `start`.
576 fn frame_and_write(&self, start: u64, record: &[u8]) -> Result<()> {
577 with_frame_buffer(|buf| {
578 record::encode(buf, record);
579 self.store.write_at(start, buf)
580 })
581 }
582
583 /// Crate-internal access to the backing store, for tests that need to read,
584 /// corrupt, or extend the on-disk image directly.
585 #[cfg(test)]
586 pub(crate) fn store(&self) -> &S {
587 &self.store
588 }
589}
590
/// Run `f` against this thread's reusable frame buffer.
///
/// Every call on a given thread receives the same thread-local `Vec`. Once it
/// has grown to fit the records that thread writes, steady-state framing stops
/// allocating, assuming `record::encode` refills the buffer in place rather
/// than replacing it. Nothing here shrinks the buffer, so one unusually large
/// record keeps that capacity for the rest of the thread's life.
///
/// # Panics
///
/// Panics if `f` re-enters this function on the same thread, because the
/// buffer is already mutably borrowed.
#[cfg(not(loom))]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    thread_local! {
        static FRAME: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    }
    FRAME.with_borrow_mut(f)
}
601
/// Model-checking variant of the frame-buffer helper: each call gets a brand
/// new, empty buffer, since the thread-local reuse is not needed under loom.
#[cfg(loom)]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    f(&mut Vec::new())
}
607
608/// Scan a store from the start, returning the end offset of the last intact
609/// record and truncating any torn tail beyond it.
610fn recover<S: WalStore>(store: &S, max_record_size: u32) -> Result<u64> {
611 let physical = store.len()?;
612 let mut offset: u64 = 0;
613 let mut header = [0u8; HEADER_LEN];
614 // One reused payload buffer for the whole scan: only the checksum needs the
615 // bytes, so the buffer is refilled per record rather than reallocated.
616 let mut payload = Vec::new();
617
618 while offset < physical {
619 if store.read_at(offset, &mut header)? < HEADER_LEN {
620 break; // incomplete header: torn tail
621 }
622 let parsed = record::parse_header(&header);
623 if parsed.len > max_record_size {
624 break; // implausible length: treat the rest as a torn tail
625 }
626
627 let payload_start = match offset.checked_add(HEADER_LEN as u64) {
628 Some(start) => start,
629 None => break,
630 };
631 let len = parsed.len as usize;
632 payload.clear();
633 payload.resize(len, 0);
634 if store.read_at(payload_start, &mut payload)? < len {
635 break; // incomplete payload: torn tail
636 }
637 if !record::verify(&header, &payload, parsed.crc) {
638 break; // checksum mismatch: stop here
639 }
640
641 offset = match payload_start.checked_add(u64::from(parsed.len)) {
642 Some(end) => end,
643 None => break,
644 };
645 }
646
647 if offset < physical {
648 store.truncate(offset)?;
649 }
650 Ok(offset)
651}
652
653impl<S: WalStore> fmt::Debug for Wal<S> {
654 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
655 f.debug_struct("Wal")
656 .field("len", &self.tail.0.load(Ordering::Relaxed))
657 .finish_non_exhaustive()
658 }
659}
660
661/// One record read back during iteration: its [`Lsn`] and its payload bytes.
662///
663/// Yielded by [`Wal::iter`]. The payload is owned (a fresh `Vec` per record);
664/// take it without copying via [`into_data`](Record::into_data), or borrow it
665/// via [`data`](Record::data).
666#[derive(Debug, Clone, PartialEq, Eq)]
667pub struct Record {
668 lsn: Lsn,
669 data: Vec<u8>,
670}
671
672impl Record {
673 /// The sequence number this record was assigned — its byte offset in the log.
674 pub fn lsn(&self) -> Lsn {
675 self.lsn
676 }
677
678 /// The record's payload bytes.
679 #[must_use]
680 pub fn data(&self) -> &[u8] {
681 &self.data
682 }
683
684 /// The payload length in bytes.
685 #[must_use]
686 pub fn len(&self) -> usize {
687 self.data.len()
688 }
689
690 /// Whether the record's payload is empty.
691 #[must_use]
692 pub fn is_empty(&self) -> bool {
693 self.data.is_empty()
694 }
695
696 /// Consume the record and take ownership of its payload without copying.
697 #[must_use]
698 pub fn into_data(self) -> Vec<u8> {
699 self.data
700 }
701
702 /// Decode the record's payload into a typed value via `pack-io`.
703 ///
704 /// The mirror of [`Wal::append_typed`]. Available with the `pack-io` feature.
705 ///
706 /// # Errors
707 ///
708 /// Returns [`WalError::Encoding`] if the bytes do not deserialise into `T` —
709 /// for example reading a record written as a different type.
710 ///
711 /// # Examples
712 ///
713 /// ```
714 /// use wal_db::{MemStore, Wal};
715 /// use wal_db::pack_io::{Deserialize, Serialize};
716 ///
717 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
718 /// struct Event {
719 /// id: u64,
720 /// name: String,
721 /// }
722 ///
723 /// # fn main() -> Result<(), wal_db::WalError> {
724 /// let wal = Wal::with_store(MemStore::new())?;
725 /// wal.append_typed(&Event { id: 7, name: "boot".into() })?;
726 ///
727 /// let record = wal.iter()?.next().unwrap()?;
728 /// let event: Event = record.decode()?;
729 /// assert_eq!(event, Event { id: 7, name: "boot".into() });
730 /// # Ok(())
731 /// # }
732 /// ```
733 #[cfg(feature = "pack-io")]
734 pub fn decode<T: pack_io::Deserialize>(&self) -> Result<T> {
735 pack_io::decode(&self.data).map_err(WalError::encoding)
736 }
737}
738
739/// The outcome of reading one record-sized chunk at the iterator's cursor.
740enum Step {
741 /// A valid record, plus the offset just past it.
742 Record(Record, u64),
743 /// A damaged record. `skip_to` is the offset of the next record if its
744 /// extent is known (length and payload present, only the checksum failed),
745 /// or `None` if the damage makes the next record's position unknowable.
746 Damaged(WalError, Option<u64>),
747 /// A clean end: a short read, meaning the log stops here (a torn tail).
748 End,
749}
750
751/// The iterator returned by [`Wal::iter`].
752///
753/// Walks the records fully written when it was created, yielding
754/// `Result<`[`Record`]`>`. Behaviour at a damaged record follows the configured
755/// [`RecoveryPolicy`]: by default the iterator yields the damage once and stops;
756/// under [`RecoveryPolicy::SkipBadRecords`] it yields the damage and continues
757/// past it when the next record's position is still recoverable.
758pub struct WalIter<'a, S: WalStore = FileStore> {
759 wal: &'a Wal<S>,
760 offset: u64,
761 end: u64,
762 done: bool,
763 policy: RecoveryPolicy,
764}
765
766impl<S: WalStore> WalIter<'_, S> {
767 /// Read and classify the record at the current offset, without advancing.
768 fn step(&self) -> Result<Step> {
769 let mut header = [0u8; HEADER_LEN];
770 if self.wal.store.read_at(self.offset, &mut header)? < HEADER_LEN {
771 return Ok(Step::End);
772 }
773 let parsed = record::parse_header(&header);
774 if parsed.len > self.wal.max_record_size {
775 // The length is implausible, so the next record's position is
776 // unknowable — there is nothing to skip to.
777 return Ok(Step::Damaged(
778 WalError::corruption(self.offset, "record length exceeds the maximum"),
779 None,
780 ));
781 }
782
783 let payload_start = self
784 .offset
785 .checked_add(HEADER_LEN as u64)
786 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
787 let mut payload = vec![0u8; parsed.len as usize];
788 if self.wal.store.read_at(payload_start, &mut payload)? < payload.len() {
789 return Ok(Step::End);
790 }
791 let next = payload_start
792 .checked_add(u64::from(parsed.len))
793 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
794
795 if !record::verify(&header, &payload, parsed.crc) {
796 // The length and payload are present, so we know where the next
797 // record starts even though this one is corrupt.
798 return Ok(Step::Damaged(
799 WalError::corruption(self.offset, "checksum mismatch"),
800 Some(next),
801 ));
802 }
803
804 Ok(Step::Record(
805 Record {
806 lsn: Lsn::new(self.offset),
807 data: payload,
808 },
809 next,
810 ))
811 }
812}
813
814impl<S: WalStore> Iterator for WalIter<'_, S> {
815 type Item = Result<Record>;
816
817 fn next(&mut self) -> Option<Self::Item> {
818 if self.done || self.offset >= self.end {
819 return None;
820 }
821 match self.step() {
822 Ok(Step::Record(record, next)) => {
823 self.offset = next;
824 Some(Ok(record))
825 }
826 // Skip-bad-records, and the next record is locatable: surface the
827 // damage but continue from past it on the next call.
828 Ok(Step::Damaged(error, Some(next)))
829 if self.policy == RecoveryPolicy::SkipBadRecords =>
830 {
831 self.offset = next;
832 Some(Err(error))
833 }
834 // Stop-at-first-error, or damage that makes the next position
835 // unknowable: surface the damage and end.
836 Ok(Step::Damaged(error, _)) => {
837 self.done = true;
838 Some(Err(error))
839 }
840 Ok(Step::End) => {
841 self.done = true;
842 None
843 }
844 Err(error) => {
845 self.done = true;
846 Some(Err(error))
847 }
848 }
849 }
850}
851
852impl<S: WalStore> fmt::Debug for WalIter<'_, S> {
853 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
854 f.debug_struct("WalIter")
855 .field("offset", &self.offset)
856 .field("end", &self.end)
857 .field("done", &self.done)
858 .finish()
859 }
860}
861
862#[cfg(all(test, not(loom)))]
863#[allow(
864 clippy::unwrap_used,
865 clippy::expect_used,
866 unused_must_use,
867 unused_results
868)]
869mod tests {
870 use std::sync::Arc;
871 use std::thread;
872
873 use super::*;
874 use crate::store::MemStore;
875
876 fn drain(wal: &Wal<MemStore>) -> Vec<Vec<u8>> {
877 wal.iter()
878 .unwrap()
879 .map(|r| r.unwrap().into_data())
880 .collect()
881 }
882
883 fn corrupt_byte(store: &MemStore, offset: u64) {
884 let mut byte = [0u8; 1];
885 store.read_at(offset, &mut byte).unwrap();
886 byte[0] ^= 0xFF;
887 store.write_at(offset, &byte).unwrap();
888 }
889
890 #[test]
891 fn test_stop_at_first_error_stops_at_corruption() {
892 let wal = Wal::with_store(MemStore::new()).unwrap(); // default policy
893 wal.append(b"first").unwrap();
894 let second = wal.append(b"second").unwrap();
895 wal.append(b"third").unwrap();
896 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
897
898 let items: Vec<_> = wal.iter().unwrap().collect();
899 assert_eq!(items.len(), 2); // first ok, second damaged, then stop
900 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
901 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
902 }
903
904 #[test]
905 fn test_skip_bad_records_continues_past_corruption() {
906 let config = WalConfig::new().with_recovery_policy(RecoveryPolicy::SkipBadRecords);
907 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
908 wal.append(b"first").unwrap();
909 let second = wal.append(b"second").unwrap();
910 wal.append(b"third").unwrap();
911 // Corrupt the payload only; the length prefix stays intact, so the
912 // record is skippable.
913 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
914
915 let items: Vec<_> = wal.iter().unwrap().collect();
916 assert_eq!(items.len(), 3);
917 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
918 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
919 assert_eq!(items[2].as_ref().unwrap().data(), b"third");
920 }
921
922 #[test]
923 fn test_skip_bad_records_still_stops_on_unreadable_length() {
924 let config = WalConfig::new()
925 .with_max_record_size(16)
926 .with_recovery_policy(RecoveryPolicy::SkipBadRecords);
927 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
928 wal.append(b"ok").unwrap();
929 let second = wal.append(b"victim").unwrap();
930 // Corrupt the length field to an implausible value: the next record's
931 // position becomes unknowable, so even skip-mode must stop.
932 corrupt_byte(wal.store(), second.get() + 4); // LEN_OFFSET within the header
933
934 let items: Vec<_> = wal.iter().unwrap().collect();
935 assert_eq!(items.len(), 2); // first ok, then a damaged stop
936 assert_eq!(items[0].as_ref().unwrap().data(), b"ok");
937 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
938 }
939
940 #[cfg(feature = "pack-io")]
941 #[test]
942 fn test_typed_record_roundtrip() {
943 use pack_io::{Deserialize, Serialize};
944
945 #[derive(Serialize, Deserialize, PartialEq, Debug)]
946 struct Entry {
947 id: u64,
948 label: String,
949 }
950
951 let wal = Wal::with_store(MemStore::new()).unwrap();
952 wal.append_typed(&Entry {
953 id: 9,
954 label: "nine".into(),
955 })
956 .unwrap();
957 wal.append_typed(&Entry {
958 id: 10,
959 label: "ten".into(),
960 })
961 .unwrap();
962
963 let decoded: Vec<Entry> = wal
964 .iter()
965 .unwrap()
966 .map(|r| r.unwrap().decode().unwrap())
967 .collect();
968 assert_eq!(
969 decoded[0],
970 Entry {
971 id: 9,
972 label: "nine".into()
973 }
974 );
975 assert_eq!(
976 decoded[1],
977 Entry {
978 id: 10,
979 label: "ten".into()
980 }
981 );
982 }
983
984 #[cfg(feature = "pack-io")]
985 #[test]
986 fn test_typed_decode_wrong_type_errors() {
987 use pack_io::{Deserialize, Serialize};
988
989 #[derive(Serialize)]
990 struct Big {
991 a: u64,
992 b: u64,
993 c: u64,
994 }
995 #[derive(Deserialize)]
996 struct Small {
997 _a: u8,
998 }
999
1000 let wal = Wal::with_store(MemStore::new()).unwrap();
1001 wal.append_typed(&Big { a: 1, b: 2, c: 3 }).unwrap();
1002 let record = wal.iter().unwrap().next().unwrap().unwrap();
1003 // Decoding 24 bytes as a 1-byte type leaves trailing bytes -> error.
1004 let result: Result<Small> = record.decode();
1005 assert!(matches!(result, Err(WalError::Encoding { .. })));
1006 }
1007
1008 #[test]
1009 fn test_append_assigns_byte_offset_lsns() {
1010 let wal = Wal::with_store(MemStore::new()).unwrap();
1011 let a = wal.append(b"abc").unwrap(); // 8 header + 3 = 11 bytes
1012 let b = wal.append(b"de").unwrap();
1013 assert_eq!(a.get(), 0);
1014 assert_eq!(b.get(), 11);
1015 }
1016
1017 #[test]
1018 fn test_iter_reads_back_all_records_in_order() {
1019 let wal = Wal::with_store(MemStore::new()).unwrap();
1020 wal.append(b"one").unwrap();
1021 wal.append(b"two").unwrap();
1022 wal.append(b"three").unwrap();
1023 assert_eq!(
1024 drain(&wal),
1025 vec![b"one".to_vec(), b"two".to_vec(), b"three".to_vec()]
1026 );
1027 }
1028
1029 #[test]
1030 fn test_empty_log_iterates_to_nothing() {
1031 let wal = Wal::with_store(MemStore::new()).unwrap();
1032 assert!(wal.is_empty());
1033 assert_eq!(drain(&wal).len(), 0);
1034 }
1035
1036 #[test]
1037 fn test_empty_record_roundtrips() {
1038 let wal = Wal::with_store(MemStore::new()).unwrap();
1039 wal.append(b"").unwrap();
1040 assert_eq!(drain(&wal), vec![Vec::<u8>::new()]);
1041 }
1042
1043 #[test]
1044 fn test_record_too_large_is_rejected() {
1045 let config = WalConfig::new().with_max_record_size(4);
1046 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1047 wal.append(b"ok").unwrap();
1048 let err = wal.append(b"too long").unwrap_err();
1049 assert!(matches!(err, WalError::RecordTooLarge { len: 8, max: 4 }));
1050 // The rejected append did not advance the log.
1051 assert_eq!(drain(&wal), vec![b"ok".to_vec()]);
1052 }
1053
1054 #[test]
1055 fn test_reopen_recovers_records() {
1056 let wal = Wal::with_store(MemStore::new()).unwrap();
1057 wal.append(b"first").unwrap();
1058 wal.append(b"second").unwrap();
1059 wal.sync().unwrap();
1060 let image = wal.store().snapshot();
1061
1062 let reopened = Wal::with_store(MemStore::from_bytes(image)).unwrap();
1063 assert_eq!(
1064 drain(&reopened),
1065 vec![b"first".to_vec(), b"second".to_vec()]
1066 );
1067 // The next append continues at the recovered end: two records of
1068 // (8 + 5) and (8 + 6) bytes leave the tail at 27.
1069 assert_eq!(reopened.append(b"third").unwrap().get(), 27);
1070 }
1071
1072 #[test]
1073 fn test_recovery_truncates_torn_tail() {
1074 let wal = Wal::with_store(MemStore::new()).unwrap();
1075 wal.append(b"good record").unwrap();
1076 let clean_len = wal.len();
1077 // Append raw garbage directly to the store: a torn tail.
1078 wal.store().write_at(clean_len, &[0xAB; 5]).unwrap();
1079
1080 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1081 assert_eq!(drain(&reopened), vec![b"good record".to_vec()]);
1082 assert_eq!(reopened.len(), clean_len);
1083 }
1084
1085 #[test]
1086 fn test_corrupt_record_surfaces_error_then_stops() {
1087 let wal = Wal::with_store(MemStore::new()).unwrap();
1088 wal.append(b"intact").unwrap();
1089 let second = wal.append(b"victim").unwrap();
1090 // Flip a byte inside the second record's payload (offset + header).
1091 let payload_offset = second.get() + HEADER_LEN as u64;
1092 let mut byte = [0u8; 1];
1093 wal.store().read_at(payload_offset, &mut byte).unwrap();
1094 byte[0] ^= 0xFF;
1095 wal.store().write_at(payload_offset, &byte).unwrap();
1096
1097 let mut iter = wal.iter().unwrap();
1098 assert_eq!(iter.next().unwrap().unwrap().data(), b"intact");
1099 assert!(matches!(
1100 iter.next().unwrap(),
1101 Err(WalError::Corruption { .. })
1102 ));
1103 assert!(iter.next().is_none());
1104 }
1105
1106 #[test]
1107 fn test_append_and_sync_is_durable() {
1108 let wal = Wal::with_store(MemStore::new()).unwrap();
1109 wal.append_and_sync(b"committed").unwrap();
1110 assert_eq!(drain(&wal), vec![b"committed".to_vec()]);
1111 }
1112
1113 #[test]
1114 fn test_iter_from_seeks_to_lsn() {
1115 let wal = Wal::with_store(MemStore::new()).unwrap();
1116 wal.append(b"a").unwrap();
1117 let b = wal.append(b"b").unwrap();
1118 wal.append(b"c").unwrap();
1119
1120 let got: Vec<Vec<u8>> = wal
1121 .iter_from(b)
1122 .unwrap()
1123 .map(|r| r.unwrap().into_data())
1124 .collect();
1125 assert_eq!(got, vec![b"b".to_vec(), b"c".to_vec()]);
1126 }
1127
1128 #[test]
1129 fn test_iter_from_past_end_is_empty() {
1130 let wal = Wal::with_store(MemStore::new()).unwrap();
1131 wal.append(b"a").unwrap();
1132 assert_eq!(wal.iter_from(Lsn::new(9_999)).unwrap().count(), 0);
1133 }
1134
1135 #[test]
1136 fn test_truncate_after_drops_later_records() {
1137 let wal = Wal::with_store(MemStore::new()).unwrap();
1138 wal.append(b"first").unwrap(); // [0, 13)
1139 let keep = wal.append(b"second").unwrap(); // [13, 27)
1140 wal.append(b"third").unwrap();
1141 wal.append(b"fourth").unwrap();
1142
1143 wal.truncate_after(keep).unwrap();
1144 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1145 assert_eq!(wal.len(), 27);
1146
1147 // Appends resume immediately after the kept record.
1148 assert_eq!(wal.append(b"new").unwrap().get(), 27);
1149 assert_eq!(
1150 drain(&wal),
1151 vec![b"first".to_vec(), b"second".to_vec(), b"new".to_vec()]
1152 );
1153 }
1154
1155 #[test]
1156 fn test_truncate_after_keeping_last_record_is_a_no_op() {
1157 let wal = Wal::with_store(MemStore::new()).unwrap();
1158 wal.append(b"first").unwrap();
1159 let last = wal.append(b"second").unwrap();
1160 let before = wal.len();
1161
1162 wal.truncate_after(last).unwrap();
1163 assert_eq!(wal.len(), before);
1164 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1165 }
1166
1167 #[test]
1168 fn test_truncate_after_invalid_lsn_errors() {
1169 let config = WalConfig::new().with_max_record_size(64);
1170 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1171 wal.append(b"only record").unwrap();
1172 // An LSN that does not land on a record boundary is rejected.
1173 let err = wal.truncate_after(Lsn::new(3)).unwrap_err();
1174 assert!(matches!(err, WalError::Corruption { .. }));
1175 }
1176
1177 #[test]
1178 fn test_concurrent_appends_no_overlap_all_recovered() {
1179 const THREADS: usize = 8;
1180 const PER_THREAD: usize = 200;
1181
1182 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1183 let mut handles = Vec::new();
1184 for t in 0..THREADS {
1185 let wal = Arc::clone(&wal);
1186 handles.push(thread::spawn(move || {
1187 let mut lsns = Vec::with_capacity(PER_THREAD);
1188 for i in 0..PER_THREAD {
1189 let payload = format!("t{t}-r{i}").into_bytes();
1190 lsns.push(wal.append(&payload).unwrap().get());
1191 }
1192 lsns
1193 }));
1194 }
1195 let mut all_lsns = Vec::new();
1196 for h in handles {
1197 all_lsns.extend(h.join().unwrap());
1198 }
1199 wal.sync().unwrap();
1200
1201 // Every LSN is distinct (no two records shared a byte range).
1202 let mut sorted = all_lsns.clone();
1203 sorted.sort_unstable();
1204 sorted.dedup();
1205 assert_eq!(sorted.len(), THREADS * PER_THREAD);
1206
1207 // Recovery reads back exactly the records that were appended, in offset
1208 // order, with no gaps or corruption.
1209 let records = drain(&wal);
1210 assert_eq!(records.len(), THREADS * PER_THREAD);
1211
1212 // Reopening from the raw image recovers the same set.
1213 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1214 assert_eq!(reopened.iter().unwrap().count(), THREADS * PER_THREAD);
1215 }
1216
1217 #[test]
1218 fn test_concurrent_append_and_sync_all_durable() {
1219 const THREADS: usize = 8;
1220
1221 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1222 let mut handles = Vec::new();
1223 for t in 0..THREADS {
1224 let wal = Arc::clone(&wal);
1225 handles.push(thread::spawn(move || {
1226 for i in 0..50 {
1227 wal.append_and_sync(format!("{t}:{i}").as_bytes()).unwrap();
1228 }
1229 }));
1230 }
1231 for h in handles {
1232 h.join().unwrap();
1233 }
1234 assert_eq!(drain(&wal).len(), THREADS * 50);
1235 }
1236}