Skip to main content

wal_db/
wal.rs

1//! The log itself: [`Wal`], its recovery iterator [`WalIter`], and the
2//! [`Record`] iteration yields.
3
4use std::{fmt, io, sync::atomic::Ordering};
5
6#[cfg(not(loom))]
7use std::{cell::RefCell, path::Path};
8
9use crate::{
10    commit::Commit,
11    config::{RecoveryPolicy, WalConfig},
12    error::{Result, WalError},
13    lsn::Lsn,
14    record::{self, HEADER_LEN},
15    store::{FileStore, WalStore},
16    sync::AtomicU64,
17};
18
/// Pads and aligns `T` to a 64-byte boundary.
///
/// The log wraps its reservation counter in this so the counter — written by
/// every appender — never shares a cache line with the log's other fields.
/// Without the padding, each append would evict those fields from other cores'
/// caches (false sharing).
#[derive(Debug)]
#[repr(align(64))]
struct CacheAligned<T>(T);
25
26/// A durable, append-only log.
27///
28/// `Wal` is the entry point. The four calls that cover almost every use are
29/// [`open`](Wal::open), [`append`](Wal::append), [`sync`](Wal::sync), and
30/// [`iter`](Wal::iter). The type parameter `S` is the storage backend and
31/// defaults to [`FileStore`], so the plain name `Wal` is the file-backed log;
32/// custom backends are supplied through [`with_store`](Wal::with_store).
33///
34/// A `Wal` is [`Send`] and [`Sync`], and the append path is built for it: many
35/// threads can call [`append`](Wal::append) at once with no global lock. Share
36/// one behind an [`Arc`](std::sync::Arc) and write from every thread.
37///
38/// # Concurrency and durability
39///
40/// Appends are lock-free. Each one reserves its byte range with a single atomic
41/// step — the range's start offset *is* the record's [`Lsn`] — frames the record
42/// into a reused thread-local buffer, and writes it, all without blocking other
43/// appenders. [`sync`](Wal::sync) is the durability barrier; when several
44/// threads sync at once they coalesce into a single fsync (group commit), so the
45/// cost of making data durable is amortised across everyone committing together.
46///
47/// `append` returns once the record is in the OS page cache; `sync` returns once
48/// it is on stable storage. See the [crate docs](crate) for the full contract.
49///
50/// # Examples
51///
52/// ```
53/// use wal_db::Wal;
54///
55/// # fn main() -> Result<(), wal_db::WalError> {
56/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
57/// # let path = dir.path().join("log.wal");
58/// let wal = Wal::open(&path)?;
59/// let first = wal.append(b"first")?;
60/// let second = wal.append(b"second")?;
61/// wal.sync()?;
62///
63/// // LSNs are byte offsets: the first record starts at 0, the second after it.
64/// assert_eq!(first.get(), 0);
65/// assert!(second.get() > first.get());
66///
67/// let read_back: Vec<Vec<u8>> = wal
68///     .iter()?
69///     .map(|entry| entry.map(|record| record.into_data()))
70///     .collect::<Result<_, _>>()?;
71/// assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
72/// # Ok(())
73/// # }
74/// ```
75pub struct Wal<S = FileStore> {
76    /// Next byte offset to reserve. Hammered by every appender, so kept on its
77    /// own cache line.
78    tail: CacheAligned<AtomicU64>,
79    store: S,
80    max_record_size: u32,
81    recovery_policy: RecoveryPolicy,
82    commit: Commit,
83}
84
85#[cfg(not(loom))]
86impl Wal<FileStore> {
87    /// Open the log at `path`, creating it if it does not exist.
88    ///
89    /// On open the log scans its contents, stops at the first record that is
90    /// incomplete or fails its checksum, and truncates that torn tail so the
91    /// next append lands on a clean boundary. The common cause of a torn tail is
92    /// a crash partway through an earlier append; that record was never
93    /// acknowledged durable, so discarding it loses nothing the caller was
94    /// promised.
95    ///
96    /// # Errors
97    ///
98    /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
99    ///
100    /// # Examples
101    ///
102    /// ```
103    /// use wal_db::Wal;
104    /// # fn main() -> Result<(), wal_db::WalError> {
105    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
106    /// # let path = dir.path().join("log.wal");
107    /// let wal = Wal::open(&path)?;
108    /// wal.append(b"hello")?;
109    /// wal.sync()?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
114        Self::open_with(path, WalConfig::new())
115    }
116
117    /// Open the log at `path` with an explicit [`WalConfig`].
118    ///
119    /// # Errors
120    ///
121    /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use wal_db::{Wal, WalConfig};
127    /// # fn main() -> Result<(), wal_db::WalError> {
128    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
129    /// # let path = dir.path().join("log.wal");
130    /// let config = WalConfig::new().with_max_record_size(1024);
131    /// let wal = Wal::open_with(&path, config)?;
132    /// # let _ = wal;
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
137        let store = FileStore::open(path)?;
138        Self::with_store_and_config(store, config)
139    }
140}
141
142#[cfg(not(loom))]
143impl Wal<crate::segment::SegmentedStore> {
144    /// Open a segmented log in directory `dir`, with segments of `segment_size`
145    /// bytes, creating the directory if needed.
146    ///
147    /// The log is one continuous byte stream striped across fixed-size files, so
148    /// it behaves exactly like a single-file log — records span segment
149    /// boundaries freely — while keeping each file bounded for recovery and
150    /// archival. Records larger than a segment simply occupy several.
151    ///
152    /// # Errors
153    ///
154    /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
155    /// be opened or scanned.
156    ///
157    /// # Examples
158    ///
159    /// ```
160    /// use wal_db::Wal;
161    /// # fn main() -> Result<(), wal_db::WalError> {
162    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
163    /// let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
164    /// wal.append(b"record")?;
165    /// wal.sync()?;
166    /// # Ok(())
167    /// # }
168    /// ```
169    pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
170        Self::open_segmented_with(dir, segment_size, WalConfig::new())
171    }
172
173    /// Open a segmented log with an explicit [`WalConfig`].
174    ///
175    /// Like [`open_segmented`](Wal::open_segmented), but applies `config` (for
176    /// example a tighter [`max_record_size`](WalConfig::max_record_size)).
177    ///
178    /// # Errors
179    ///
180    /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
181    /// be opened or scanned.
182    pub fn open_segmented_with(
183        dir: impl AsRef<Path>,
184        segment_size: u64,
185        config: WalConfig,
186    ) -> Result<Self> {
187        let store = crate::segment::SegmentedStore::open(dir, segment_size)?;
188        Self::with_store_and_config(store, config)
189    }
190}
191
192impl<S: WalStore> Wal<S> {
193    /// Build a log over a custom [`WalStore`], using the default configuration.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if scanning the existing contents of the store fails.
198    ///
199    /// # Examples
200    ///
201    /// ```
202    /// use wal_db::{MemStore, Wal};
203    /// # fn main() -> Result<(), wal_db::WalError> {
204    /// let wal = Wal::with_store(MemStore::new())?;
205    /// wal.append(b"record")?;
206    /// # Ok(())
207    /// # }
208    /// ```
209    pub fn with_store(store: S) -> Result<Self> {
210        Self::with_store_and_config(store, WalConfig::new())
211    }
212
213    /// Build a log over a custom [`WalStore`] with an explicit [`WalConfig`].
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if scanning the existing contents of the store fails.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// use wal_db::{MemStore, Wal, WalConfig};
223    /// # fn main() -> Result<(), wal_db::WalError> {
224    /// let config = WalConfig::new().with_max_record_size(64);
225    /// let wal = Wal::with_store_and_config(MemStore::new(), config)?;
226    /// # let _ = wal;
227    /// # Ok(())
228    /// # }
229    /// ```
230    pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self> {
231        let recovered = recover(&store, config.max_record_size())?;
232        Ok(Wal {
233            tail: CacheAligned(AtomicU64::new(recovered)),
234            store,
235            max_record_size: config.max_record_size(),
236            recovery_policy: config.recovery_policy(),
237            commit: Commit::new(recovered),
238        })
239    }
240
241    /// Append `record` to the log and return the [`Lsn`] it was assigned — the
242    /// byte offset where the record begins.
243    ///
244    /// Lock-free: the byte range is reserved with one atomic step and the record
245    /// is written without blocking other appenders. Returns once the bytes are
246    /// in the operating system's page cache. It does **not** flush the disk —
247    /// call [`sync`](Wal::sync) for that. A crash between `append` and `sync` may
248    /// lose the record.
249    ///
250    /// # Errors
251    ///
252    /// - [`WalError::RecordTooLarge`] if `record` is larger than the configured
253    ///   [`max_record_size`](WalConfig::max_record_size). The log is unchanged.
254    /// - [`WalError::Io`] if the write fails. The reserved range becomes a
255    ///   permanent gap: the log is durable only up to that point, recovery stops
256    ///   there, and later syncs covering it report the truncation.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use wal_db::{MemStore, Wal};
262    /// # fn main() -> Result<(), wal_db::WalError> {
263    /// let wal = Wal::with_store(MemStore::new())?;
264    /// let lsn = wal.append(b"some bytes")?;
265    /// assert_eq!(lsn.get(), 0);
266    /// # Ok(())
267    /// # }
268    /// ```
269    pub fn append(&self, record: &[u8]) -> Result<Lsn> {
270        let payload_len = record.len();
271        if payload_len > self.max_record_size as usize {
272            return Err(WalError::RecordTooLarge {
273                len: payload_len,
274                max: self.max_record_size,
275            });
276        }
277        let frame_len = record::framed_len(payload_len) as u64;
278
279        // Reserve the byte range. The returned start offset is the LSN, and
280        // because it comes from a single atomic it is unique and ordered.
281        let start = self.tail.0.fetch_add(frame_len, Ordering::Relaxed);
282        let end = match start.checked_add(frame_len) {
283            Some(end) => end,
284            None => {
285                self.commit.mark_failed(start);
286                return Err(WalError::io(
287                    "reserving a record offset",
288                    io::Error::other("log size exceeds u64"),
289                ));
290            }
291        };
292
293        match self.frame_and_write(start, record) {
294            Ok(()) => {
295                self.commit.mark_written(start, end);
296                Ok(Lsn::new(start))
297            }
298            Err(error) => {
299                self.commit.mark_failed(start);
300                Err(error)
301            }
302        }
303    }
304
305    /// Make every record appended before this call durable.
306    ///
307    /// Returns once the data is on stable storage, using the platform's true
308    /// durability barrier. Concurrent calls coalesce into a single fsync, so the
309    /// flush cost is shared by everyone committing at the same time.
310    ///
311    /// # Errors
312    ///
313    /// Returns [`WalError::Io`] if the flush fails, or [`WalError::Corruption`]
314    /// if an earlier append's write failed and left a gap that cannot be made
315    /// durable. A failed sync means the records are not durable; treat it as
316    /// fatal, not as something to retry blindly.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// use wal_db::Wal;
322    /// # fn main() -> Result<(), wal_db::WalError> {
323    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
324    /// # let path = dir.path().join("log.wal");
325    /// let wal = Wal::open(&path)?;
326    /// wal.append(b"durable me")?;
327    /// wal.sync()?; // now on stable storage
328    /// # Ok(())
329    /// # }
330    /// ```
331    pub fn sync(&self) -> Result<()> {
332        let target = self.tail.0.load(Ordering::Acquire);
333        if target == 0 {
334            return Ok(());
335        }
336        self.commit.sync_to(&self.store, target)
337    }
338
339    /// Append `record` and make it durable in one call, returning its [`Lsn`].
340    ///
341    /// Equivalent to [`append`](Wal::append) followed by a [`sync`](Wal::sync)
342    /// scoped to this record, but with the sync coalesced into the group commit
343    /// of any other threads syncing at the same moment. Use it when every record
344    /// must be durable before you proceed and you want the group-commit
345    /// throughput without managing the two calls yourself.
346    ///
347    /// # Errors
348    ///
349    /// The union of [`append`](Wal::append)'s and [`sync`](Wal::sync)'s errors.
350    ///
351    /// # Examples
352    ///
353    /// ```
354    /// use wal_db::Wal;
355    /// # fn main() -> Result<(), wal_db::WalError> {
356    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
357    /// # let path = dir.path().join("log.wal");
358    /// let wal = Wal::open(&path)?;
359    /// let lsn = wal.append_and_sync(b"committed immediately")?;
360    /// # let _ = lsn;
361    /// # Ok(())
362    /// # }
363    /// ```
364    pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn> {
365        let lsn = self.append(record)?;
366        let end = lsn.get() + record::framed_len(record.len()) as u64;
367        self.commit.sync_to(&self.store, end)?;
368        Ok(lsn)
369    }
370
371    /// Serialise `value` with `pack-io` and append it, returning its [`Lsn`].
372    ///
373    /// The typed counterpart to [`append`](Wal::append): the value is encoded to
374    /// bytes and appended as one record, which [`Record::decode`] reads back.
375    /// Available with the `pack-io` feature. Like `append`, it does not sync.
376    ///
377    /// # Errors
378    ///
379    /// - [`WalError::Encoding`] if the value fails to serialise.
380    /// - Otherwise the errors of [`append`](Wal::append) ([`WalError::RecordTooLarge`],
381    ///   [`WalError::Io`]).
382    ///
383    /// # Examples
384    ///
385    /// ```
386    /// use wal_db::{MemStore, Wal};
387    /// use wal_db::pack_io::{Deserialize, Serialize};
388    ///
389    /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
390    /// struct Entry {
391    ///     key: String,
392    ///     value: u64,
393    /// }
394    ///
395    /// # fn main() -> Result<(), wal_db::WalError> {
396    /// let wal = Wal::with_store(MemStore::new())?;
397    /// wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
398    ///
399    /// let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
400    /// assert_eq!(entry.value, 100);
401    /// # Ok(())
402    /// # }
403    /// ```
404    #[cfg(feature = "pack-io")]
405    pub fn append_typed<T: pack_io::Serialize + ?Sized>(&self, value: &T) -> Result<Lsn> {
406        let bytes = pack_io::encode(value).map_err(WalError::encoding)?;
407        self.append(&bytes)
408    }
409
410    /// Iterate the log from the beginning, yielding each record in append order.
411    ///
412    /// The iterator walks the records that are fully written at the moment it is
413    /// created — it does not see records still being written by other threads, or
414    /// appended afterwards. Each item is a [`Result`]: a damaged record yields a
415    /// single [`WalError::Corruption`] and then the iterator stops. In a log
416    /// opened normally the torn tail has already been truncated, so iteration
417    /// runs cleanly to the end.
418    ///
419    /// # Examples
420    ///
421    /// ```
422    /// use wal_db::{MemStore, Wal};
423    /// # fn main() -> Result<(), wal_db::WalError> {
424    /// let wal = Wal::with_store(MemStore::new())?;
425    /// wal.append(b"one")?;
426    /// wal.append(b"two")?;
427    ///
428    /// let mut seen = Vec::new();
429    /// for entry in wal.iter()? {
430    ///     seen.push(entry?.into_data());
431    /// }
432    /// assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
433    /// # Ok(())
434    /// # }
435    /// ```
436    pub fn iter(&self) -> Result<WalIter<'_, S>> {
437        let end = self.commit.committed();
438        Ok(WalIter {
439            wal: self,
440            offset: 0,
441            end,
442            done: false,
443            policy: self.recovery_policy,
444        })
445    }
446
447    /// Iterate from `from` (a record's [`Lsn`]) to the end, skipping the records
448    /// before it.
449    ///
450    /// Because an LSN is a byte offset, seeking is O(1): iteration simply starts
451    /// at `from` instead of 0. Pass an [`Lsn`] that a previous
452    /// [`append`](Wal::append) or [`iter`](Wal::iter) produced — a real record
453    /// boundary. An `Lsn` that does not land on a record boundary will be read as
454    /// a malformed record and surface as [`WalError::Corruption`]; an `Lsn` past
455    /// the end yields an empty iterator.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// use wal_db::{MemStore, Wal};
461    /// # fn main() -> Result<(), wal_db::WalError> {
462    /// let wal = Wal::with_store(MemStore::new())?;
463    /// wal.append(b"one")?;
464    /// let second = wal.append(b"two")?;
465    /// wal.append(b"three")?;
466    ///
467    /// let from_second: Vec<Vec<u8>> = wal
468    ///     .iter_from(second)?
469    ///     .map(|entry| entry.map(|r| r.into_data()))
470    ///     .collect::<Result<_, _>>()?;
471    /// assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
472    /// # Ok(())
473    /// # }
474    /// ```
475    pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>> {
476        let end = self.commit.committed();
477        Ok(WalIter {
478            wal: self,
479            offset: from.get().min(end),
480            end,
481            done: false,
482            policy: self.recovery_policy,
483        })
484    }
485
486    /// Drop every record after the one at `lsn`, keeping the log up to and
487    /// including it. For compaction.
488    ///
489    /// The record at `lsn` becomes the new last record; the next append lands
490    /// right after it. The truncation is made durable before returning. `lsn`
491    /// must be a real record boundary from a previous [`append`](Wal::append) or
492    /// [`iter`](Wal::iter), and the record there must be intact.
493    ///
494    /// # Exclusive access
495    ///
496    /// This mutates the log's end, so it must **not** run concurrently with
497    /// [`append`](Wal::append), [`sync`](Wal::sync), or another `truncate_after`.
498    /// The caller is responsible for quiescing writers first — the usual case for
499    /// compaction, where the engine pauses the log, truncates, and resumes.
500    ///
501    /// # Errors
502    ///
503    /// - [`WalError::Corruption`] if `lsn` does not point at an intact record.
504    /// - [`WalError::Io`] if the truncation or its sync fails.
505    ///
506    /// # Examples
507    ///
508    /// ```
509    /// use wal_db::{MemStore, Wal};
510    /// # fn main() -> Result<(), wal_db::WalError> {
511    /// let wal = Wal::with_store(MemStore::new())?;
512    /// wal.append(b"keep me")?;
513    /// let last_kept = wal.append(b"and me")?;
514    /// wal.append(b"drop me")?;
515    ///
516    /// wal.truncate_after(last_kept)?;
517    ///
518    /// let remaining: Vec<Vec<u8>> = wal
519    ///     .iter()?
520    ///     .map(|entry| entry.map(|r| r.into_data()))
521    ///     .collect::<Result<_, _>>()?;
522    /// assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
523    /// # Ok(())
524    /// # }
525    /// ```
526    pub fn truncate_after(&self, lsn: Lsn) -> Result<()> {
527        let start = lsn.get();
528
529        // Confirm an intact record really lives at `lsn` before keeping it.
530        let mut header = [0u8; HEADER_LEN];
531        if self.store.read_at(start, &mut header)? < HEADER_LEN {
532            return Err(WalError::corruption(start, "no record at this LSN"));
533        }
534        let parsed = record::parse_header(&header);
535        if parsed.len > self.max_record_size {
536            return Err(WalError::corruption(start, "no valid record at this LSN"));
537        }
538        let payload_start = start
539            .checked_add(HEADER_LEN as u64)
540            .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
541        let mut payload = vec![0u8; parsed.len as usize];
542        if self.store.read_at(payload_start, &mut payload)? < payload.len() {
543            return Err(WalError::corruption(start, "incomplete record at this LSN"));
544        }
545        if !record::verify(&header, &payload, parsed.crc) {
546            return Err(WalError::corruption(start, "no valid record at this LSN"));
547        }
548        let new_end = payload_start
549            .checked_add(u64::from(parsed.len))
550            .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
551
552        self.store.truncate(new_end)?;
553        self.store.sync()?;
554        self.tail.0.store(new_end, Ordering::Release);
555        self.commit.reset(new_end);
556        Ok(())
557    }
558
559    /// The logical size of the log in bytes, including record framing.
560    ///
561    /// This is the offset at which the next append will land. It counts bytes
562    /// that have been reserved, which under heavy concurrency may include a
563    /// record another thread is still writing.
564    #[must_use]
565    pub fn len(&self) -> u64 {
566        self.tail.0.load(Ordering::Acquire)
567    }
568
569    /// Whether the log holds no records.
570    #[must_use]
571    pub fn is_empty(&self) -> bool {
572        self.len() == 0
573    }
574
575    /// Frame `record` into a reused buffer and write it at `start`.
576    fn frame_and_write(&self, start: u64, record: &[u8]) -> Result<()> {
577        with_frame_buffer(|buf| {
578            record::encode(buf, record);
579            self.store.write_at(start, buf)
580        })
581    }
582
583    /// Crate-internal access to the backing store, for tests that need to read,
584    /// corrupt, or extend the on-disk image directly.
585    #[cfg(test)]
586    pub(crate) fn store(&self) -> &S {
587        &self.store
588    }
589}
590
/// Run `f` against this thread's scratch framing buffer.
///
/// The buffer lives in a thread-local and survives between calls, so once it
/// has grown to fit typical records, appends stop allocating. The buffer is
/// handed over as-is; callers are responsible for clearing or overwriting its
/// previous contents. The loom build swaps in a variant that uses a fresh
/// buffer, because the model checker neither needs nor instruments the
/// thread-local.
#[cfg(not(loom))]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    thread_local! {
        static SCRATCH: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    }
    SCRATCH.with(|slot| {
        let mut scratch = slot.borrow_mut();
        f(&mut scratch)
    })
}
601
/// Loom build: hand `f` a brand-new empty buffer on every call.
#[cfg(loom)]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    f(&mut Vec::new())
}
607
608/// Scan a store from the start, returning the end offset of the last intact
609/// record and truncating any torn tail beyond it.
610fn recover<S: WalStore>(store: &S, max_record_size: u32) -> Result<u64> {
611    let physical = store.len()?;
612    let mut offset: u64 = 0;
613    let mut header = [0u8; HEADER_LEN];
614
615    while offset < physical {
616        if store.read_at(offset, &mut header)? < HEADER_LEN {
617            break; // incomplete header: torn tail
618        }
619        let parsed = record::parse_header(&header);
620        if parsed.len > max_record_size {
621            break; // implausible length: treat the rest as a torn tail
622        }
623
624        let payload_start = match offset.checked_add(HEADER_LEN as u64) {
625            Some(start) => start,
626            None => break,
627        };
628        let mut payload = vec![0u8; parsed.len as usize];
629        if store.read_at(payload_start, &mut payload)? < payload.len() {
630            break; // incomplete payload: torn tail
631        }
632        if !record::verify(&header, &payload, parsed.crc) {
633            break; // checksum mismatch: stop here
634        }
635
636        offset = match payload_start.checked_add(u64::from(parsed.len)) {
637            Some(end) => end,
638            None => break,
639        };
640    }
641
642    if offset < physical {
643        store.truncate(offset)?;
644    }
645    Ok(offset)
646}
647
648impl<S: WalStore> fmt::Debug for Wal<S> {
649    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650        f.debug_struct("Wal")
651            .field("len", &self.tail.0.load(Ordering::Relaxed))
652            .finish_non_exhaustive()
653    }
654}
655
656/// One record read back during iteration: its [`Lsn`] and its payload bytes.
657///
658/// Yielded by [`Wal::iter`]. The payload is owned (a fresh `Vec` per record);
659/// take it without copying via [`into_data`](Record::into_data), or borrow it
660/// via [`data`](Record::data).
661#[derive(Debug, Clone, PartialEq, Eq)]
662pub struct Record {
663    lsn: Lsn,
664    data: Vec<u8>,
665}
666
667impl Record {
668    /// The sequence number this record was assigned — its byte offset in the log.
669    pub fn lsn(&self) -> Lsn {
670        self.lsn
671    }
672
673    /// The record's payload bytes.
674    #[must_use]
675    pub fn data(&self) -> &[u8] {
676        &self.data
677    }
678
679    /// The payload length in bytes.
680    #[must_use]
681    pub fn len(&self) -> usize {
682        self.data.len()
683    }
684
685    /// Whether the record's payload is empty.
686    #[must_use]
687    pub fn is_empty(&self) -> bool {
688        self.data.is_empty()
689    }
690
691    /// Consume the record and take ownership of its payload without copying.
692    #[must_use]
693    pub fn into_data(self) -> Vec<u8> {
694        self.data
695    }
696
697    /// Decode the record's payload into a typed value via `pack-io`.
698    ///
699    /// The mirror of [`Wal::append_typed`]. Available with the `pack-io` feature.
700    ///
701    /// # Errors
702    ///
703    /// Returns [`WalError::Encoding`] if the bytes do not deserialise into `T` —
704    /// for example reading a record written as a different type.
705    ///
706    /// # Examples
707    ///
708    /// ```
709    /// use wal_db::{MemStore, Wal};
710    /// use wal_db::pack_io::{Deserialize, Serialize};
711    ///
712    /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
713    /// struct Event {
714    ///     id: u64,
715    ///     name: String,
716    /// }
717    ///
718    /// # fn main() -> Result<(), wal_db::WalError> {
719    /// let wal = Wal::with_store(MemStore::new())?;
720    /// wal.append_typed(&Event { id: 7, name: "boot".into() })?;
721    ///
722    /// let record = wal.iter()?.next().unwrap()?;
723    /// let event: Event = record.decode()?;
724    /// assert_eq!(event, Event { id: 7, name: "boot".into() });
725    /// # Ok(())
726    /// # }
727    /// ```
728    #[cfg(feature = "pack-io")]
729    pub fn decode<T: pack_io::Deserialize>(&self) -> Result<T> {
730        pack_io::decode(&self.data).map_err(WalError::encoding)
731    }
732}
733
734/// The outcome of reading one record-sized chunk at the iterator's cursor.
735enum Step {
736    /// A valid record, plus the offset just past it.
737    Record(Record, u64),
738    /// A damaged record. `skip_to` is the offset of the next record if its
739    /// extent is known (length and payload present, only the checksum failed),
740    /// or `None` if the damage makes the next record's position unknowable.
741    Damaged(WalError, Option<u64>),
742    /// A clean end: a short read, meaning the log stops here (a torn tail).
743    End,
744}
745
746/// The iterator returned by [`Wal::iter`].
747///
748/// Walks the records fully written when it was created, yielding
749/// `Result<`[`Record`]`>`. Behaviour at a damaged record follows the configured
750/// [`RecoveryPolicy`]: by default the iterator yields the damage once and stops;
751/// under [`RecoveryPolicy::SkipBadRecords`] it yields the damage and continues
752/// past it when the next record's position is still recoverable.
753pub struct WalIter<'a, S: WalStore = FileStore> {
754    wal: &'a Wal<S>,
755    offset: u64,
756    end: u64,
757    done: bool,
758    policy: RecoveryPolicy,
759}
760
761impl<S: WalStore> WalIter<'_, S> {
762    /// Read and classify the record at the current offset, without advancing.
763    fn step(&self) -> Result<Step> {
764        let mut header = [0u8; HEADER_LEN];
765        if self.wal.store.read_at(self.offset, &mut header)? < HEADER_LEN {
766            return Ok(Step::End);
767        }
768        let parsed = record::parse_header(&header);
769        if parsed.len > self.wal.max_record_size {
770            // The length is implausible, so the next record's position is
771            // unknowable — there is nothing to skip to.
772            return Ok(Step::Damaged(
773                WalError::corruption(self.offset, "record length exceeds the maximum"),
774                None,
775            ));
776        }
777
778        let payload_start = self
779            .offset
780            .checked_add(HEADER_LEN as u64)
781            .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
782        let mut payload = vec![0u8; parsed.len as usize];
783        if self.wal.store.read_at(payload_start, &mut payload)? < payload.len() {
784            return Ok(Step::End);
785        }
786        let next = payload_start
787            .checked_add(u64::from(parsed.len))
788            .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
789
790        if !record::verify(&header, &payload, parsed.crc) {
791            // The length and payload are present, so we know where the next
792            // record starts even though this one is corrupt.
793            return Ok(Step::Damaged(
794                WalError::corruption(self.offset, "checksum mismatch"),
795                Some(next),
796            ));
797        }
798
799        Ok(Step::Record(
800            Record {
801                lsn: Lsn::new(self.offset),
802                data: payload,
803            },
804            next,
805        ))
806    }
807}
808
809impl<S: WalStore> Iterator for WalIter<'_, S> {
810    type Item = Result<Record>;
811
812    fn next(&mut self) -> Option<Self::Item> {
813        if self.done || self.offset >= self.end {
814            return None;
815        }
816        match self.step() {
817            Ok(Step::Record(record, next)) => {
818                self.offset = next;
819                Some(Ok(record))
820            }
821            // Skip-bad-records, and the next record is locatable: surface the
822            // damage but continue from past it on the next call.
823            Ok(Step::Damaged(error, Some(next)))
824                if self.policy == RecoveryPolicy::SkipBadRecords =>
825            {
826                self.offset = next;
827                Some(Err(error))
828            }
829            // Stop-at-first-error, or damage that makes the next position
830            // unknowable: surface the damage and end.
831            Ok(Step::Damaged(error, _)) => {
832                self.done = true;
833                Some(Err(error))
834            }
835            Ok(Step::End) => {
836                self.done = true;
837                None
838            }
839            Err(error) => {
840                self.done = true;
841                Some(Err(error))
842            }
843        }
844    }
845}
846
847impl<S: WalStore> fmt::Debug for WalIter<'_, S> {
848    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
849        f.debug_struct("WalIter")
850            .field("offset", &self.offset)
851            .field("end", &self.end)
852            .field("done", &self.done)
853            .finish()
854    }
855}
856
#[cfg(all(test, not(loom)))]
// Tests unwrap freely (a failure *should* panic the test) and routinely
// discard the LSNs / `Result`s returned by setup appends they do not inspect.
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    unused_must_use,
    unused_results
)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;
    use crate::store::MemStore;

    /// Reads every record of `wal` from the start, panicking on any error.
    fn drain(wal: &Wal<MemStore>) -> Vec<Vec<u8>> {
        wal.iter()
            .unwrap()
            .map(|r| r.unwrap().into_data())
            .collect()
    }

    /// Flips every bit of the byte at `offset` in `store`, simulating
    /// single-byte on-disk corruption.
    fn corrupt_byte(store: &MemStore, offset: u64) {
        let mut byte = [0u8; 1];
        store.read_at(offset, &mut byte).unwrap();
        byte[0] ^= 0xFF;
        store.write_at(offset, &byte).unwrap();
    }

    #[test]
    fn test_stop_at_first_error_stops_at_corruption() {
        let wal = Wal::with_store(MemStore::new()).unwrap(); // default policy
        wal.append(b"first").unwrap();
        let second = wal.append(b"second").unwrap();
        wal.append(b"third").unwrap();
        corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);

        let items: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(items.len(), 2); // first ok, second damaged, then stop
        assert_eq!(items[0].as_ref().unwrap().data(), b"first");
        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
    }

    #[test]
    fn test_skip_bad_records_continues_past_corruption() {
        let config = WalConfig::new().with_recovery_policy(RecoveryPolicy::SkipBadRecords);
        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
        wal.append(b"first").unwrap();
        let second = wal.append(b"second").unwrap();
        wal.append(b"third").unwrap();
        // Corrupt the payload only; the length prefix stays intact, so the
        // record is skippable.
        corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);

        let items: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(items.len(), 3);
        assert_eq!(items[0].as_ref().unwrap().data(), b"first");
        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
        assert_eq!(items[2].as_ref().unwrap().data(), b"third");
    }

    #[test]
    fn test_skip_bad_records_still_stops_on_unreadable_length() {
        let config = WalConfig::new()
            .with_max_record_size(16)
            .with_recovery_policy(RecoveryPolicy::SkipBadRecords);
        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
        wal.append(b"ok").unwrap();
        let second = wal.append(b"victim").unwrap();
        // Corrupt the length field to an implausible value: the next record's
        // position becomes unknowable, so even skip-mode must stop.
        corrupt_byte(wal.store(), second.get() + 4); // LEN_OFFSET within the header

        let items: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(items.len(), 2); // first ok, then a damaged stop
        assert_eq!(items[0].as_ref().unwrap().data(), b"ok");
        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
    }

    #[cfg(feature = "pack-io")]
    #[test]
    fn test_typed_record_roundtrip() {
        use pack_io::{Deserialize, Serialize};

        #[derive(Serialize, Deserialize, PartialEq, Debug)]
        struct Entry {
            id: u64,
            label: String,
        }

        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append_typed(&Entry {
            id: 9,
            label: "nine".into(),
        })
        .unwrap();
        wal.append_typed(&Entry {
            id: 10,
            label: "ten".into(),
        })
        .unwrap();

        let decoded: Vec<Entry> = wal
            .iter()
            .unwrap()
            .map(|r| r.unwrap().decode().unwrap())
            .collect();
        assert_eq!(
            decoded[0],
            Entry {
                id: 9,
                label: "nine".into()
            }
        );
        assert_eq!(
            decoded[1],
            Entry {
                id: 10,
                label: "ten".into()
            }
        );
    }

    #[cfg(feature = "pack-io")]
    #[test]
    fn test_typed_decode_wrong_type_errors() {
        use pack_io::{Deserialize, Serialize};

        #[derive(Serialize)]
        struct Big {
            a: u64,
            b: u64,
            c: u64,
        }
        #[derive(Deserialize)]
        struct Small {
            _a: u8,
        }

        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append_typed(&Big { a: 1, b: 2, c: 3 }).unwrap();
        let record = wal.iter().unwrap().next().unwrap().unwrap();
        // Decoding 24 bytes as a 1-byte type leaves trailing bytes -> error.
        let result: Result<Small> = record.decode();
        assert!(matches!(result, Err(WalError::Encoding { .. })));
    }

    #[test]
    fn test_append_assigns_byte_offset_lsns() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        let a = wal.append(b"abc").unwrap(); // 8 header + 3 = 11 bytes
        let b = wal.append(b"de").unwrap();
        assert_eq!(a.get(), 0);
        assert_eq!(b.get(), 11);
    }

    #[test]
    fn test_iter_reads_back_all_records_in_order() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"one").unwrap();
        wal.append(b"two").unwrap();
        wal.append(b"three").unwrap();
        assert_eq!(
            drain(&wal),
            vec![b"one".to_vec(), b"two".to_vec(), b"three".to_vec()]
        );
    }

    #[test]
    fn test_empty_log_iterates_to_nothing() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        assert!(wal.is_empty());
        assert_eq!(drain(&wal).len(), 0);
    }

    #[test]
    fn test_empty_record_roundtrips() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"").unwrap();
        assert_eq!(drain(&wal), vec![Vec::<u8>::new()]);
    }

    #[test]
    fn test_record_too_large_is_rejected() {
        let config = WalConfig::new().with_max_record_size(4);
        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
        wal.append(b"ok").unwrap();
        let err = wal.append(b"too long").unwrap_err();
        assert!(matches!(err, WalError::RecordTooLarge { len: 8, max: 4 }));
        // The rejected append did not advance the log.
        assert_eq!(drain(&wal), vec![b"ok".to_vec()]);
    }

    #[test]
    fn test_reopen_recovers_records() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"first").unwrap();
        wal.append(b"second").unwrap();
        wal.sync().unwrap();
        let image = wal.store().snapshot();

        let reopened = Wal::with_store(MemStore::from_bytes(image)).unwrap();
        assert_eq!(
            drain(&reopened),
            vec![b"first".to_vec(), b"second".to_vec()]
        );
        // The next append continues at the recovered end: two records of
        // (8 + 5) and (8 + 6) bytes leave the tail at 27.
        assert_eq!(reopened.append(b"third").unwrap().get(), 27);
    }

    #[test]
    fn test_recovery_truncates_torn_tail() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"good record").unwrap();
        let clean_len = wal.len();
        // Append raw garbage directly to the store: a torn tail.
        wal.store().write_at(clean_len, &[0xAB; 5]).unwrap();

        let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
        assert_eq!(drain(&reopened), vec![b"good record".to_vec()]);
        assert_eq!(reopened.len(), clean_len);
    }

    #[test]
    fn test_corrupt_record_surfaces_error_then_stops() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"intact").unwrap();
        let second = wal.append(b"victim").unwrap();
        // Flip a byte inside the second record's payload (offset + header).
        corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);

        let mut iter = wal.iter().unwrap();
        assert_eq!(iter.next().unwrap().unwrap().data(), b"intact");
        assert!(matches!(
            iter.next().unwrap(),
            Err(WalError::Corruption { .. })
        ));
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_append_and_sync_is_durable() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append_and_sync(b"committed").unwrap();
        assert_eq!(drain(&wal), vec![b"committed".to_vec()]);
    }

    #[test]
    fn test_iter_from_seeks_to_lsn() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"a").unwrap();
        let b = wal.append(b"b").unwrap();
        wal.append(b"c").unwrap();

        let got: Vec<Vec<u8>> = wal
            .iter_from(b)
            .unwrap()
            .map(|r| r.unwrap().into_data())
            .collect();
        assert_eq!(got, vec![b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_iter_from_past_end_is_empty() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"a").unwrap();
        assert_eq!(wal.iter_from(Lsn::new(9_999)).unwrap().count(), 0);
    }

    #[test]
    fn test_truncate_after_drops_later_records() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"first").unwrap(); // [0, 13)
        let keep = wal.append(b"second").unwrap(); // [13, 27)
        wal.append(b"third").unwrap();
        wal.append(b"fourth").unwrap();

        wal.truncate_after(keep).unwrap();
        assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
        assert_eq!(wal.len(), 27);

        // Appends resume immediately after the kept record.
        assert_eq!(wal.append(b"new").unwrap().get(), 27);
        assert_eq!(
            drain(&wal),
            vec![b"first".to_vec(), b"second".to_vec(), b"new".to_vec()]
        );
    }

    #[test]
    fn test_truncate_after_keeping_last_record_is_a_no_op() {
        let wal = Wal::with_store(MemStore::new()).unwrap();
        wal.append(b"first").unwrap();
        let last = wal.append(b"second").unwrap();
        let before = wal.len();

        wal.truncate_after(last).unwrap();
        assert_eq!(wal.len(), before);
        assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
    }

    #[test]
    fn test_truncate_after_invalid_lsn_errors() {
        let config = WalConfig::new().with_max_record_size(64);
        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
        wal.append(b"only record").unwrap();
        // An LSN that does not land on a record boundary is rejected.
        let err = wal.truncate_after(Lsn::new(3)).unwrap_err();
        assert!(matches!(err, WalError::Corruption { .. }));
    }

    #[test]
    fn test_concurrent_appends_no_overlap_all_recovered() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 200;

        let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
        let mut handles = Vec::new();
        for t in 0..THREADS {
            let wal = Arc::clone(&wal);
            handles.push(thread::spawn(move || {
                // Remember which payload landed at which LSN.
                let mut appended = Vec::with_capacity(PER_THREAD);
                for i in 0..PER_THREAD {
                    let payload = format!("t{t}-r{i}").into_bytes();
                    let lsn = wal.append(&payload).unwrap().get();
                    appended.push((lsn, payload));
                }
                appended
            }));
        }
        let mut appended = Vec::with_capacity(THREADS * PER_THREAD);
        for h in handles {
            appended.extend(h.join().unwrap());
        }
        wal.sync().unwrap();

        // Every LSN is distinct (no two records shared a byte range).
        appended.sort_unstable_by_key(|&(lsn, _)| lsn);
        let mut lsns: Vec<u64> = appended.iter().map(|&(lsn, _)| lsn).collect();
        lsns.dedup();
        assert_eq!(lsns.len(), THREADS * PER_THREAD);

        // Recovery reads back exactly the records that were appended, in
        // offset (LSN) order, with no gaps or corruption.
        let expected: Vec<Vec<u8>> = appended.into_iter().map(|(_, payload)| payload).collect();
        assert_eq!(drain(&wal), expected);

        // Reopening from the raw image recovers the same records, in order.
        let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
        assert_eq!(drain(&reopened), expected);
    }

    #[test]
    fn test_concurrent_append_and_sync_all_durable() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 50;

        let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
        let mut handles = Vec::new();
        for t in 0..THREADS {
            let wal = Arc::clone(&wal);
            handles.push(thread::spawn(move || {
                for i in 0..PER_THREAD {
                    wal.append_and_sync(format!("{t}:{i}").as_bytes()).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        // Every synced record is recovered exactly once. Interleaving across
        // threads is arbitrary, so compare as sorted collections.
        let mut recovered = drain(&wal);
        recovered.sort_unstable();
        let mut expected: Vec<Vec<u8>> = (0..THREADS)
            .flat_map(|t| (0..PER_THREAD).map(move |i| format!("{t}:{i}").into_bytes()))
            .collect();
        expected.sort_unstable();
        assert_eq!(recovered, expected);
    }
}