Skip to main content

wal_db/
wal.rs

1//! The log itself: [`Wal`], its recovery iterator [`WalIter`], and the
2//! [`Record`] iteration yields.
3
4use std::{fmt, io, sync::atomic::Ordering};
5
6#[cfg(not(loom))]
7use std::{cell::RefCell, path::Path};
8
9use crate::{
10    commit::Commit,
11    config::{RecoveryPolicy, WalConfig},
12    error::{Result, WalError},
13    lsn::Lsn,
14    record::{self, HEADER_LEN},
15    store::{FileStore, WalStore},
16    sync::AtomicU64,
17};
18
/// Wrapper that forces its contents onto a 64-byte alignment boundary.
///
/// The log stores its reservation counter in one of these so that the counter,
/// which every appender writes, does not share a cache line with the fields
/// readers touch (avoiding false sharing).
#[derive(Debug)]
#[repr(align(64))]
struct CacheAligned<T>(T);
25
26/// A durable, append-only log.
27///
28/// `Wal` is the entry point. The four calls that cover almost every use are
29/// [`open`](Wal::open), [`append`](Wal::append), [`sync`](Wal::sync), and
30/// [`iter`](Wal::iter). The type parameter `S` is the storage backend and
31/// defaults to [`FileStore`], so the plain name `Wal` is the file-backed log;
32/// custom backends are supplied through [`with_store`](Wal::with_store).
33///
34/// A `Wal` is [`Send`] and [`Sync`], and the append path is built for it: many
35/// threads can call [`append`](Wal::append) at once with no global lock. Share
36/// one behind an [`Arc`](std::sync::Arc) and write from every thread.
37///
38/// # Concurrency and durability
39///
40/// Appends are lock-free. Each one reserves its byte range with a single atomic
41/// step — the range's start offset *is* the record's [`Lsn`] — frames the record
42/// into a reused thread-local buffer, and writes it, all without blocking other
43/// appenders. [`sync`](Wal::sync) is the durability barrier; when several
44/// threads sync at once they coalesce into a single fsync (group commit), so the
45/// cost of making data durable is amortised across everyone committing together.
46///
47/// `append` returns once the record is in the OS page cache; `sync` returns once
48/// it is on stable storage. See the [crate docs](crate) for the full contract.
49///
50/// # Examples
51///
52/// ```
53/// use wal_db::Wal;
54///
55/// # fn main() -> Result<(), wal_db::WalError> {
56/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
57/// # let path = dir.path().join("log.wal");
58/// let wal = Wal::open(&path)?;
59/// let first = wal.append(b"first")?;
60/// let second = wal.append(b"second")?;
61/// wal.sync()?;
62///
63/// // LSNs are byte offsets: the first record starts at 0, the second after it.
64/// assert_eq!(first.get(), 0);
65/// assert!(second.get() > first.get());
66///
67/// let read_back: Vec<Vec<u8>> = wal
68///     .iter()?
69///     .map(|entry| entry.map(|record| record.into_data()))
70///     .collect::<Result<_, _>>()?;
71/// assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
72/// # Ok(())
73/// # }
74/// ```
75pub struct Wal<S = FileStore> {
76    /// Next byte offset to reserve. Hammered by every appender, so kept on its
77    /// own cache line.
78    tail: CacheAligned<AtomicU64>,
79    store: S,
80    max_record_size: u32,
81    recovery_policy: RecoveryPolicy,
82    /// Lowest readable offset — `0` until a prefix is dropped by
83    /// [`truncate_before`](Wal::truncate_before).
84    head: AtomicU64,
85    commit: Commit,
86}
87
88#[cfg(not(loom))]
89impl Wal<FileStore> {
90    /// Open the log at `path`, creating it if it does not exist.
91    ///
92    /// On open the log scans its contents, stops at the first record that is
93    /// incomplete or fails its checksum, and truncates that torn tail so the
94    /// next append lands on a clean boundary. The common cause of a torn tail is
95    /// a crash partway through an earlier append; that record was never
96    /// acknowledged durable, so discarding it loses nothing the caller was
97    /// promised.
98    ///
99    /// # Errors
100    ///
101    /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use wal_db::Wal;
107    /// # fn main() -> Result<(), wal_db::WalError> {
108    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
109    /// # let path = dir.path().join("log.wal");
110    /// let wal = Wal::open(&path)?;
111    /// wal.append(b"hello")?;
112    /// wal.sync()?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
117        Self::open_with(path, WalConfig::new())
118    }
119
120    /// Open the log at `path` with an explicit [`WalConfig`].
121    ///
122    /// # Errors
123    ///
124    /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
125    ///
126    /// # Examples
127    ///
128    /// ```
129    /// use wal_db::{Wal, WalConfig};
130    /// # fn main() -> Result<(), wal_db::WalError> {
131    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
132    /// # let path = dir.path().join("log.wal");
133    /// let config = WalConfig::new().with_max_record_size(1024);
134    /// let wal = Wal::open_with(&path, config)?;
135    /// # let _ = wal;
136    /// # Ok(())
137    /// # }
138    /// ```
139    pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
140        let store = FileStore::open(path)?;
141        Self::with_store_and_config(store, config)
142    }
143}
144
145#[cfg(not(loom))]
146impl Wal<crate::segment::SegmentedStore> {
147    /// Open a segmented log in directory `dir`, with segments of `segment_size`
148    /// bytes, creating the directory if needed.
149    ///
150    /// The log is one continuous byte stream striped across fixed-size files, so
151    /// it behaves exactly like a single-file log — records span segment
152    /// boundaries freely — while keeping each file bounded for recovery and
153    /// archival. Records larger than a segment simply occupy several.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
158    /// be opened or scanned.
159    ///
160    /// # Examples
161    ///
162    /// ```
163    /// use wal_db::Wal;
164    /// # fn main() -> Result<(), wal_db::WalError> {
165    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
166    /// let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
167    /// wal.append(b"record")?;
168    /// wal.sync()?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
173        Self::open_segmented_with(dir, segment_size, WalConfig::new())
174    }
175
176    /// Open a segmented log with an explicit [`WalConfig`].
177    ///
178    /// Like [`open_segmented`](Wal::open_segmented), but applies `config` (for
179    /// example a tighter [`max_record_size`](WalConfig::max_record_size)).
180    ///
181    /// # Errors
182    ///
183    /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
184    /// be opened or scanned.
185    pub fn open_segmented_with(
186        dir: impl AsRef<Path>,
187        segment_size: u64,
188        config: WalConfig,
189    ) -> Result<Self> {
190        let store = crate::segment::SegmentedStore::open(dir, segment_size)?;
191        Self::with_store_and_config(store, config)
192    }
193}
194
195impl<S: WalStore> Wal<S> {
196    /// Build a log over a custom [`WalStore`], using the default configuration.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if scanning the existing contents of the store fails.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// use wal_db::{MemStore, Wal};
206    /// # fn main() -> Result<(), wal_db::WalError> {
207    /// let wal = Wal::with_store(MemStore::new())?;
208    /// wal.append(b"record")?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub fn with_store(store: S) -> Result<Self> {
213        Self::with_store_and_config(store, WalConfig::new())
214    }
215
216    /// Build a log over a custom [`WalStore`] with an explicit [`WalConfig`].
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if scanning the existing contents of the store fails.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use wal_db::{MemStore, Wal, WalConfig};
226    /// # fn main() -> Result<(), wal_db::WalError> {
227    /// let config = WalConfig::new().with_max_record_size(64);
228    /// let wal = Wal::with_store_and_config(MemStore::new(), config)?;
229    /// # let _ = wal;
230    /// # Ok(())
231    /// # }
232    /// ```
233    pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self> {
234        let head = store.head()?;
235        let recovered = recover(&store, config.max_record_size(), head)?;
236        Ok(Wal {
237            tail: CacheAligned(AtomicU64::new(recovered)),
238            store,
239            max_record_size: config.max_record_size(),
240            recovery_policy: config.recovery_policy(),
241            head: AtomicU64::new(head),
242            commit: Commit::new(recovered),
243        })
244    }
245
246    /// Append `record` to the log and return the [`Lsn`] it was assigned — the
247    /// byte offset where the record begins.
248    ///
249    /// Lock-free: the byte range is reserved with one atomic step and the record
250    /// is written without blocking other appenders. Returns once the bytes are
251    /// in the operating system's page cache. It does **not** flush the disk —
252    /// call [`sync`](Wal::sync) for that. A crash between `append` and `sync` may
253    /// lose the record.
254    ///
255    /// # Errors
256    ///
257    /// - [`WalError::RecordTooLarge`] if `record` is larger than the configured
258    ///   [`max_record_size`](WalConfig::max_record_size). The log is unchanged.
259    /// - [`WalError::Io`] if the write fails. The reserved range becomes a
260    ///   permanent gap: the log is durable only up to that point, recovery stops
261    ///   there, and later syncs covering it report the truncation.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use wal_db::{MemStore, Wal};
267    /// # fn main() -> Result<(), wal_db::WalError> {
268    /// let wal = Wal::with_store(MemStore::new())?;
269    /// let lsn = wal.append(b"some bytes")?;
270    /// assert_eq!(lsn.get(), 0);
271    /// # Ok(())
272    /// # }
273    /// ```
274    pub fn append(&self, record: &[u8]) -> Result<Lsn> {
275        let payload_len = record.len();
276        if payload_len > self.max_record_size as usize {
277            return Err(WalError::RecordTooLarge {
278                len: payload_len,
279                max: self.max_record_size,
280            });
281        }
282        let frame_len = record::framed_len(payload_len) as u64;
283
284        // Reserve the byte range. The returned start offset is the LSN, and
285        // because it comes from a single atomic it is unique and ordered.
286        let start = self.tail.0.fetch_add(frame_len, Ordering::Relaxed);
287        let end = match start.checked_add(frame_len) {
288            Some(end) => end,
289            None => {
290                self.commit.mark_failed(start);
291                return Err(WalError::io(
292                    "reserving a record offset",
293                    io::Error::other("log size exceeds u64"),
294                ));
295            }
296        };
297
298        match self.frame_and_write(start, record) {
299            Ok(()) => {
300                self.commit.mark_written(start, end);
301                Ok(Lsn::new(start))
302            }
303            Err(error) => {
304                self.commit.mark_failed(start);
305                Err(error)
306            }
307        }
308    }
309
310    /// Make every record appended before this call durable.
311    ///
312    /// Returns once the data is on stable storage, using the platform's true
313    /// durability barrier. Concurrent calls coalesce into a single fsync, so the
314    /// flush cost is shared by everyone committing at the same time.
315    ///
316    /// # Errors
317    ///
318    /// Returns [`WalError::Io`] if the flush fails, or [`WalError::Corruption`]
319    /// if an earlier append's write failed and left a gap that cannot be made
320    /// durable. A failed sync means the records are not durable; treat it as
321    /// fatal, not as something to retry blindly.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// use wal_db::Wal;
327    /// # fn main() -> Result<(), wal_db::WalError> {
328    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
329    /// # let path = dir.path().join("log.wal");
330    /// let wal = Wal::open(&path)?;
331    /// wal.append(b"durable me")?;
332    /// wal.sync()?; // now on stable storage
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn sync(&self) -> Result<()> {
337        let target = self.tail.0.load(Ordering::Acquire);
338        if target == 0 {
339            return Ok(());
340        }
341        self.commit.sync_to(&self.store, target)
342    }
343
344    /// Append `record` and make it durable in one call, returning its [`Lsn`].
345    ///
346    /// Equivalent to [`append`](Wal::append) followed by a [`sync`](Wal::sync)
347    /// scoped to this record, but with the sync coalesced into the group commit
348    /// of any other threads syncing at the same moment. Use it when every record
349    /// must be durable before you proceed and you want the group-commit
350    /// throughput without managing the two calls yourself.
351    ///
352    /// # Errors
353    ///
354    /// The union of [`append`](Wal::append)'s and [`sync`](Wal::sync)'s errors.
355    ///
356    /// # Examples
357    ///
358    /// ```
359    /// use wal_db::Wal;
360    /// # fn main() -> Result<(), wal_db::WalError> {
361    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
362    /// # let path = dir.path().join("log.wal");
363    /// let wal = Wal::open(&path)?;
364    /// let lsn = wal.append_and_sync(b"committed immediately")?;
365    /// # let _ = lsn;
366    /// # Ok(())
367    /// # }
368    /// ```
369    pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn> {
370        let lsn = self.append(record)?;
371        let end = lsn.get() + record::framed_len(record.len()) as u64;
372        self.commit.sync_to(&self.store, end)?;
373        Ok(lsn)
374    }
375
376    /// Serialise `value` with `pack-io` and append it, returning its [`Lsn`].
377    ///
378    /// The typed counterpart to [`append`](Wal::append): the value is encoded to
379    /// bytes and appended as one record, which [`Record::decode`] reads back.
380    /// Available with the `pack-io` feature. Like `append`, it does not sync.
381    ///
382    /// # Errors
383    ///
384    /// - [`WalError::Encoding`] if the value fails to serialise.
385    /// - Otherwise the errors of [`append`](Wal::append) ([`WalError::RecordTooLarge`],
386    ///   [`WalError::Io`]).
387    ///
388    /// # Examples
389    ///
390    /// ```
391    /// use wal_db::{MemStore, Wal};
392    /// use wal_db::pack_io::{Deserialize, Serialize};
393    ///
394    /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
395    /// struct Entry {
396    ///     key: String,
397    ///     value: u64,
398    /// }
399    ///
400    /// # fn main() -> Result<(), wal_db::WalError> {
401    /// let wal = Wal::with_store(MemStore::new())?;
402    /// wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
403    ///
404    /// let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
405    /// assert_eq!(entry.value, 100);
406    /// # Ok(())
407    /// # }
408    /// ```
409    #[cfg(feature = "pack-io")]
410    pub fn append_typed<T: pack_io::Serialize + ?Sized>(&self, value: &T) -> Result<Lsn> {
411        let bytes = pack_io::encode(value).map_err(WalError::encoding)?;
412        self.append(&bytes)
413    }
414
415    /// Iterate the log from the beginning, yielding each record in append order.
416    ///
417    /// The iterator walks the records that are fully written at the moment it is
418    /// created — it does not see records still being written by other threads, or
419    /// appended afterwards. Each item is a [`Result`]: a damaged record yields a
420    /// single [`WalError::Corruption`] and then the iterator stops. In a log
421    /// opened normally the torn tail has already been truncated, so iteration
422    /// runs cleanly to the end.
423    ///
424    /// # Examples
425    ///
426    /// ```
427    /// use wal_db::{MemStore, Wal};
428    /// # fn main() -> Result<(), wal_db::WalError> {
429    /// let wal = Wal::with_store(MemStore::new())?;
430    /// wal.append(b"one")?;
431    /// wal.append(b"two")?;
432    ///
433    /// let mut seen = Vec::new();
434    /// for entry in wal.iter()? {
435    ///     seen.push(entry?.into_data());
436    /// }
437    /// assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
438    /// # Ok(())
439    /// # }
440    /// ```
441    pub fn iter(&self) -> Result<WalIter<'_, S>> {
442        let end = self.commit.committed();
443        Ok(WalIter {
444            wal: self,
445            offset: self.head.load(Ordering::Acquire),
446            end,
447            done: false,
448            policy: self.recovery_policy,
449        })
450    }
451
452    /// Iterate from `from` (a record's [`Lsn`]) to the end, skipping the records
453    /// before it.
454    ///
455    /// Because an LSN is a byte offset, seeking is O(1): iteration simply starts
456    /// at `from` instead of 0. Pass an [`Lsn`] that a previous
457    /// [`append`](Wal::append) or [`iter`](Wal::iter) produced — a real record
458    /// boundary. An `Lsn` that does not land on a record boundary will be read as
459    /// a malformed record and surface as [`WalError::Corruption`]; an `Lsn` past
460    /// the end yields an empty iterator.
461    ///
462    /// # Examples
463    ///
464    /// ```
465    /// use wal_db::{MemStore, Wal};
466    /// # fn main() -> Result<(), wal_db::WalError> {
467    /// let wal = Wal::with_store(MemStore::new())?;
468    /// wal.append(b"one")?;
469    /// let second = wal.append(b"two")?;
470    /// wal.append(b"three")?;
471    ///
472    /// let from_second: Vec<Vec<u8>> = wal
473    ///     .iter_from(second)?
474    ///     .map(|entry| entry.map(|r| r.into_data()))
475    ///     .collect::<Result<_, _>>()?;
476    /// assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
477    /// # Ok(())
478    /// # }
479    /// ```
480    pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>> {
481        let end = self.commit.committed();
482        // Never read below the head: a dropped prefix is gone.
483        let start = from.get().max(self.head.load(Ordering::Acquire)).min(end);
484        Ok(WalIter {
485            wal: self,
486            offset: start,
487            end,
488            done: false,
489            policy: self.recovery_policy,
490        })
491    }
492
493    /// Drop every record after the one at `lsn`, keeping the log up to and
494    /// including it. For compaction.
495    ///
496    /// The record at `lsn` becomes the new last record; the next append lands
497    /// right after it. The truncation is made durable before returning. `lsn`
498    /// must be a real record boundary from a previous [`append`](Wal::append) or
499    /// [`iter`](Wal::iter), and the record there must be intact.
500    ///
501    /// # Exclusive access
502    ///
503    /// This mutates the log's end, so it must **not** run concurrently with
504    /// [`append`](Wal::append), [`sync`](Wal::sync), or another `truncate_after`.
505    /// The caller is responsible for quiescing writers first — the usual case for
506    /// compaction, where the engine pauses the log, truncates, and resumes.
507    ///
508    /// # Errors
509    ///
510    /// - [`WalError::Corruption`] if `lsn` does not point at an intact record.
511    /// - [`WalError::Io`] if the truncation or its sync fails.
512    ///
513    /// # Examples
514    ///
515    /// ```
516    /// use wal_db::{MemStore, Wal};
517    /// # fn main() -> Result<(), wal_db::WalError> {
518    /// let wal = Wal::with_store(MemStore::new())?;
519    /// wal.append(b"keep me")?;
520    /// let last_kept = wal.append(b"and me")?;
521    /// wal.append(b"drop me")?;
522    ///
523    /// wal.truncate_after(last_kept)?;
524    ///
525    /// let remaining: Vec<Vec<u8>> = wal
526    ///     .iter()?
527    ///     .map(|entry| entry.map(|r| r.into_data()))
528    ///     .collect::<Result<_, _>>()?;
529    /// assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
530    /// # Ok(())
531    /// # }
532    /// ```
533    pub fn truncate_after(&self, lsn: Lsn) -> Result<()> {
534        let start = lsn.get();
535
536        // Confirm an intact record really lives at `lsn` before keeping it.
537        let mut header = [0u8; HEADER_LEN];
538        if self.store.read_at(start, &mut header)? < HEADER_LEN {
539            return Err(WalError::corruption(start, "no record at this LSN"));
540        }
541        let parsed = record::parse_header(&header);
542        if parsed.len > self.max_record_size {
543            return Err(WalError::corruption(start, "no valid record at this LSN"));
544        }
545        let payload_start = start
546            .checked_add(HEADER_LEN as u64)
547            .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
548        let mut payload = vec![0u8; parsed.len as usize];
549        if self.store.read_at(payload_start, &mut payload)? < payload.len() {
550            return Err(WalError::corruption(start, "incomplete record at this LSN"));
551        }
552        if !record::verify(&header, &payload, parsed.crc) {
553            return Err(WalError::corruption(start, "no valid record at this LSN"));
554        }
555        let new_end = payload_start
556            .checked_add(u64::from(parsed.len))
557            .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
558
559        self.store.truncate(new_end)?;
560        self.store.sync()?;
561        self.tail.0.store(new_end, Ordering::Release);
562        self.commit.reset(new_end);
563        Ok(())
564    }
565
566    /// Drop the records before the one at `lsn`, keeping the log from there on,
567    /// and return the new head [`Lsn`] — the lowest record still present.
568    ///
569    /// This is prefix compaction: once a consumer has durably applied (and
570    /// flushed elsewhere) everything up to a checkpoint, the old records can be
571    /// reclaimed. Offsets are preserved — surviving records keep their LSNs — so
572    /// [`iter`](Wal::iter) and [`iter_from`](Wal::iter_from) continue to work.
573    ///
574    /// Reading resumes at exactly `lsn` — the returned head is `lsn` itself
575    /// (clamped so it never moves backward or past the end). *Reclamation*,
576    /// though, is at the backend's granularity: a segmented log
577    /// ([`Wal::open_segmented`](Wal::open_segmented)) deletes whole leading segment
578    /// files below the one that holds `lsn`, so a little space just before `lsn` —
579    /// back to its segment boundary — is kept rather than reclaimed, and the
580    /// segment with the most recent records is never dropped. A single-file log
581    /// cannot reclaim a prefix without moving the surviving bytes (which would
582    /// change their LSNs), so it is left unchanged and the returned head is
583    /// `Lsn(0)`.
584    ///
585    /// # Exclusive access
586    ///
587    /// Like [`truncate_after`](Wal::truncate_after), this must **not** run
588    /// concurrently with [`append`](Wal::append), [`sync`](Wal::sync),
589    /// [`iter`](Wal::iter), or another truncation: it removes files a reader could
590    /// be holding open. Quiesce other users first.
591    ///
592    /// # Errors
593    ///
594    /// Returns [`WalError::Io`] if the removal fails.
595    ///
596    /// # Examples
597    ///
598    /// ```
599    /// use wal_db::Wal;
600    /// # fn main() -> Result<(), wal_db::WalError> {
601    /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
602    /// // 32-byte segments so a handful of records spans several files.
603    /// let wal = Wal::open_segmented(dir.path(), 32)?;
604    /// for i in 0..10 {
605    ///     let _ = wal.append(format!("record {i}").as_bytes())?;
606    /// }
607    /// let checkpoint = wal.append(b"checkpoint")?;
608    /// wal.sync()?;
609    ///
610    /// // Reclaim everything before the checkpoint's segment.
611    /// let head = wal.truncate_before(checkpoint)?;
612    /// assert!(head <= checkpoint);
613    /// // Iteration now starts at (or before) the checkpoint, never at 0.
614    /// assert!(wal.iter()?.next().unwrap()?.lsn() >= head);
615    /// # Ok(())
616    /// # }
617    /// ```
618    pub fn truncate_before(&self, lsn: Lsn) -> Result<Lsn> {
619        let new_head = self.store.truncate_before(lsn.get())?;
620        self.head.store(new_head, Ordering::Release);
621        Ok(Lsn::new(new_head))
622    }
623
624    /// The logical size of the log in bytes, including record framing.
625    ///
626    /// This is the offset at which the next append will land. It counts bytes
627    /// that have been reserved, which under heavy concurrency may include a
628    /// record another thread is still writing.
629    #[must_use]
630    pub fn len(&self) -> u64 {
631        self.tail.0.load(Ordering::Acquire)
632    }
633
634    /// Whether the log holds no records.
635    #[must_use]
636    pub fn is_empty(&self) -> bool {
637        self.len() == 0
638    }
639
640    /// Frame `record` into a reused buffer and write it at `start`.
641    fn frame_and_write(&self, start: u64, record: &[u8]) -> Result<()> {
642        with_frame_buffer(|buf| {
643            record::encode(buf, record);
644            self.store.write_at(start, buf)
645        })
646    }
647
648    /// Crate-internal access to the backing store, for tests that need to read,
649    /// corrupt, or extend the on-disk image directly.
650    #[cfg(test)]
651    pub(crate) fn store(&self) -> &S {
652        &self.store
653    }
654}
655
/// Run `f` with a scratch buffer for framing a record.
///
/// Outside loom the buffer is a thread-local `Vec<u8>` that persists between
/// calls on the same thread, so its allocation is reused from one append to
/// the next. The buffer is handed over as-is: whatever the previous call left
/// in it is still there.
#[cfg(not(loom))]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    thread_local! {
        static FRAME: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    }
    FRAME.with(|slot| {
        let mut frame = slot.borrow_mut();
        f(&mut frame)
    })
}

/// Loom variant of the scratch-buffer helper: every call gets a brand-new
/// empty `Vec`, so no thread-local is involved under the model checker.
#[cfg(loom)]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    f(&mut Vec::new())
}
672
673/// Scan a store from the start, returning the end offset of the last intact
674/// record and truncating any torn tail beyond it.
675fn recover<S: WalStore>(store: &S, max_record_size: u32, head: u64) -> Result<u64> {
676    let physical = store.len()?;
677    // Scan from the head: a log whose prefix was dropped no longer starts at 0.
678    let mut offset: u64 = head;
679    let mut header = [0u8; HEADER_LEN];
680    // One reused payload buffer for the whole scan: only the checksum needs the
681    // bytes, so the buffer is refilled per record rather than reallocated.
682    let mut payload = Vec::new();
683
684    while offset < physical {
685        if store.read_at(offset, &mut header)? < HEADER_LEN {
686            break; // incomplete header: torn tail
687        }
688        let parsed = record::parse_header(&header);
689        if parsed.len > max_record_size {
690            break; // implausible length: treat the rest as a torn tail
691        }
692
693        let payload_start = match offset.checked_add(HEADER_LEN as u64) {
694            Some(start) => start,
695            None => break,
696        };
697        let len = parsed.len as usize;
698        payload.clear();
699        payload.resize(len, 0);
700        if store.read_at(payload_start, &mut payload)? < len {
701            break; // incomplete payload: torn tail
702        }
703        if !record::verify(&header, &payload, parsed.crc) {
704            break; // checksum mismatch: stop here
705        }
706
707        offset = match payload_start.checked_add(u64::from(parsed.len)) {
708            Some(end) => end,
709            None => break,
710        };
711    }
712
713    if offset < physical {
714        store.truncate(offset)?;
715    }
716    Ok(offset)
717}
718
719impl<S: WalStore> fmt::Debug for Wal<S> {
720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721        f.debug_struct("Wal")
722            .field("len", &self.tail.0.load(Ordering::Relaxed))
723            .finish_non_exhaustive()
724    }
725}
726
727/// One record read back during iteration: its [`Lsn`] and its payload bytes.
728///
729/// Yielded by [`Wal::iter`]. The payload is owned (a fresh `Vec` per record);
730/// take it without copying via [`into_data`](Record::into_data), or borrow it
731/// via [`data`](Record::data).
732#[derive(Debug, Clone, PartialEq, Eq)]
733pub struct Record {
734    lsn: Lsn,
735    data: Vec<u8>,
736}
737
738impl Record {
739    /// The sequence number this record was assigned — its byte offset in the log.
740    pub fn lsn(&self) -> Lsn {
741        self.lsn
742    }
743
744    /// The record's payload bytes.
745    #[must_use]
746    pub fn data(&self) -> &[u8] {
747        &self.data
748    }
749
750    /// The payload length in bytes.
751    #[must_use]
752    pub fn len(&self) -> usize {
753        self.data.len()
754    }
755
756    /// Whether the record's payload is empty.
757    #[must_use]
758    pub fn is_empty(&self) -> bool {
759        self.data.is_empty()
760    }
761
762    /// Consume the record and take ownership of its payload without copying.
763    #[must_use]
764    pub fn into_data(self) -> Vec<u8> {
765        self.data
766    }
767
768    /// Decode the record's payload into a typed value via `pack-io`.
769    ///
770    /// The mirror of [`Wal::append_typed`]. Available with the `pack-io` feature.
771    ///
772    /// # Errors
773    ///
774    /// Returns [`WalError::Encoding`] if the bytes do not deserialise into `T` —
775    /// for example reading a record written as a different type.
776    ///
777    /// # Examples
778    ///
779    /// ```
780    /// use wal_db::{MemStore, Wal};
781    /// use wal_db::pack_io::{Deserialize, Serialize};
782    ///
783    /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
784    /// struct Event {
785    ///     id: u64,
786    ///     name: String,
787    /// }
788    ///
789    /// # fn main() -> Result<(), wal_db::WalError> {
790    /// let wal = Wal::with_store(MemStore::new())?;
791    /// wal.append_typed(&Event { id: 7, name: "boot".into() })?;
792    ///
793    /// let record = wal.iter()?.next().unwrap()?;
794    /// let event: Event = record.decode()?;
795    /// assert_eq!(event, Event { id: 7, name: "boot".into() });
796    /// # Ok(())
797    /// # }
798    /// ```
799    #[cfg(feature = "pack-io")]
800    pub fn decode<T: pack_io::Deserialize>(&self) -> Result<T> {
801        pack_io::decode(&self.data).map_err(WalError::encoding)
802    }
803}
804
805/// The outcome of reading one record-sized chunk at the iterator's cursor.
806enum Step {
807    /// A valid record, plus the offset just past it.
808    Record(Record, u64),
809    /// A damaged record. `skip_to` is the offset of the next record if its
810    /// extent is known (length and payload present, only the checksum failed),
811    /// or `None` if the damage makes the next record's position unknowable.
812    Damaged(WalError, Option<u64>),
813    /// A clean end: a short read, meaning the log stops here (a torn tail).
814    End,
815}
816
817/// The iterator returned by [`Wal::iter`].
818///
819/// Walks the records fully written when it was created, yielding
820/// `Result<`[`Record`]`>`. Behaviour at a damaged record follows the configured
821/// [`RecoveryPolicy`]: by default the iterator yields the damage once and stops;
822/// under [`RecoveryPolicy::SkipBadRecords`] it yields the damage and continues
823/// past it when the next record's position is still recoverable.
824pub struct WalIter<'a, S: WalStore = FileStore> {
825    wal: &'a Wal<S>,
826    offset: u64,
827    end: u64,
828    done: bool,
829    policy: RecoveryPolicy,
830}
831
832impl<S: WalStore> WalIter<'_, S> {
833    /// Read and classify the record at the current offset, without advancing.
834    fn step(&self) -> Result<Step> {
835        let mut header = [0u8; HEADER_LEN];
836        if self.wal.store.read_at(self.offset, &mut header)? < HEADER_LEN {
837            return Ok(Step::End);
838        }
839        let parsed = record::parse_header(&header);
840        if parsed.len > self.wal.max_record_size {
841            // The length is implausible, so the next record's position is
842            // unknowable — there is nothing to skip to.
843            return Ok(Step::Damaged(
844                WalError::corruption(self.offset, "record length exceeds the maximum"),
845                None,
846            ));
847        }
848
849        let payload_start = self
850            .offset
851            .checked_add(HEADER_LEN as u64)
852            .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
853        let mut payload = vec![0u8; parsed.len as usize];
854        if self.wal.store.read_at(payload_start, &mut payload)? < payload.len() {
855            return Ok(Step::End);
856        }
857        let next = payload_start
858            .checked_add(u64::from(parsed.len))
859            .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
860
861        if !record::verify(&header, &payload, parsed.crc) {
862            // The length and payload are present, so we know where the next
863            // record starts even though this one is corrupt.
864            return Ok(Step::Damaged(
865                WalError::corruption(self.offset, "checksum mismatch"),
866                Some(next),
867            ));
868        }
869
870        Ok(Step::Record(
871            Record {
872                lsn: Lsn::new(self.offset),
873                data: payload,
874            },
875            next,
876        ))
877    }
878}
879
880impl<S: WalStore> Iterator for WalIter<'_, S> {
881    type Item = Result<Record>;
882
883    fn next(&mut self) -> Option<Self::Item> {
884        if self.done || self.offset >= self.end {
885            return None;
886        }
887        match self.step() {
888            Ok(Step::Record(record, next)) => {
889                self.offset = next;
890                Some(Ok(record))
891            }
892            // Skip-bad-records, and the next record is locatable: surface the
893            // damage but continue from past it on the next call.
894            Ok(Step::Damaged(error, Some(next)))
895                if self.policy == RecoveryPolicy::SkipBadRecords =>
896            {
897                self.offset = next;
898                Some(Err(error))
899            }
900            // Stop-at-first-error, or damage that makes the next position
901            // unknowable: surface the damage and end.
902            Ok(Step::Damaged(error, _)) => {
903                self.done = true;
904                Some(Err(error))
905            }
906            Ok(Step::End) => {
907                self.done = true;
908                None
909            }
910            Err(error) => {
911                self.done = true;
912                Some(Err(error))
913            }
914        }
915    }
916}
917
918impl<S: WalStore> fmt::Debug for WalIter<'_, S> {
919    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920        f.debug_struct("WalIter")
921            .field("offset", &self.offset)
922            .field("end", &self.end)
923            .field("done", &self.done)
924            .finish()
925    }
926}
927
928#[cfg(all(test, not(loom)))]
929#[allow(
930    clippy::unwrap_used,
931    clippy::expect_used,
932    unused_must_use,
933    unused_results
934)]
935mod tests {
936    use std::sync::Arc;
937    use std::thread;
938
939    use super::*;
940    use crate::store::MemStore;
941
942    fn drain(wal: &Wal<MemStore>) -> Vec<Vec<u8>> {
943        wal.iter()
944            .unwrap()
945            .map(|r| r.unwrap().into_data())
946            .collect()
947    }
948
949    fn corrupt_byte(store: &MemStore, offset: u64) {
950        let mut byte = [0u8; 1];
951        store.read_at(offset, &mut byte).unwrap();
952        byte[0] ^= 0xFF;
953        store.write_at(offset, &byte).unwrap();
954    }
955
956    #[test]
957    fn test_stop_at_first_error_stops_at_corruption() {
958        let wal = Wal::with_store(MemStore::new()).unwrap(); // default policy
959        wal.append(b"first").unwrap();
960        let second = wal.append(b"second").unwrap();
961        wal.append(b"third").unwrap();
962        corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
963
964        let items: Vec<_> = wal.iter().unwrap().collect();
965        assert_eq!(items.len(), 2); // first ok, second damaged, then stop
966        assert_eq!(items[0].as_ref().unwrap().data(), b"first");
967        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
968    }
969
970    #[test]
971    fn test_skip_bad_records_continues_past_corruption() {
972        let config = WalConfig::new().with_recovery_policy(RecoveryPolicy::SkipBadRecords);
973        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
974        wal.append(b"first").unwrap();
975        let second = wal.append(b"second").unwrap();
976        wal.append(b"third").unwrap();
977        // Corrupt the payload only; the length prefix stays intact, so the
978        // record is skippable.
979        corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
980
981        let items: Vec<_> = wal.iter().unwrap().collect();
982        assert_eq!(items.len(), 3);
983        assert_eq!(items[0].as_ref().unwrap().data(), b"first");
984        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
985        assert_eq!(items[2].as_ref().unwrap().data(), b"third");
986    }
987
988    #[test]
989    fn test_skip_bad_records_still_stops_on_unreadable_length() {
990        let config = WalConfig::new()
991            .with_max_record_size(16)
992            .with_recovery_policy(RecoveryPolicy::SkipBadRecords);
993        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
994        wal.append(b"ok").unwrap();
995        let second = wal.append(b"victim").unwrap();
996        // Corrupt the length field to an implausible value: the next record's
997        // position becomes unknowable, so even skip-mode must stop.
998        corrupt_byte(wal.store(), second.get() + 4); // LEN_OFFSET within the header
999
1000        let items: Vec<_> = wal.iter().unwrap().collect();
1001        assert_eq!(items.len(), 2); // first ok, then a damaged stop
1002        assert_eq!(items[0].as_ref().unwrap().data(), b"ok");
1003        assert!(matches!(items[1], Err(WalError::Corruption { .. })));
1004    }
1005
1006    #[cfg(feature = "pack-io")]
1007    #[test]
1008    fn test_typed_record_roundtrip() {
1009        use pack_io::{Deserialize, Serialize};
1010
1011        #[derive(Serialize, Deserialize, PartialEq, Debug)]
1012        struct Entry {
1013            id: u64,
1014            label: String,
1015        }
1016
1017        let wal = Wal::with_store(MemStore::new()).unwrap();
1018        wal.append_typed(&Entry {
1019            id: 9,
1020            label: "nine".into(),
1021        })
1022        .unwrap();
1023        wal.append_typed(&Entry {
1024            id: 10,
1025            label: "ten".into(),
1026        })
1027        .unwrap();
1028
1029        let decoded: Vec<Entry> = wal
1030            .iter()
1031            .unwrap()
1032            .map(|r| r.unwrap().decode().unwrap())
1033            .collect();
1034        assert_eq!(
1035            decoded[0],
1036            Entry {
1037                id: 9,
1038                label: "nine".into()
1039            }
1040        );
1041        assert_eq!(
1042            decoded[1],
1043            Entry {
1044                id: 10,
1045                label: "ten".into()
1046            }
1047        );
1048    }
1049
1050    #[cfg(feature = "pack-io")]
1051    #[test]
1052    fn test_typed_decode_wrong_type_errors() {
1053        use pack_io::{Deserialize, Serialize};
1054
1055        #[derive(Serialize)]
1056        struct Big {
1057            a: u64,
1058            b: u64,
1059            c: u64,
1060        }
1061        #[derive(Deserialize)]
1062        struct Small {
1063            _a: u8,
1064        }
1065
1066        let wal = Wal::with_store(MemStore::new()).unwrap();
1067        wal.append_typed(&Big { a: 1, b: 2, c: 3 }).unwrap();
1068        let record = wal.iter().unwrap().next().unwrap().unwrap();
1069        // Decoding 24 bytes as a 1-byte type leaves trailing bytes -> error.
1070        let result: Result<Small> = record.decode();
1071        assert!(matches!(result, Err(WalError::Encoding { .. })));
1072    }
1073
1074    #[test]
1075    fn test_append_assigns_byte_offset_lsns() {
1076        let wal = Wal::with_store(MemStore::new()).unwrap();
1077        let a = wal.append(b"abc").unwrap(); // 8 header + 3 = 11 bytes
1078        let b = wal.append(b"de").unwrap();
1079        assert_eq!(a.get(), 0);
1080        assert_eq!(b.get(), 11);
1081    }
1082
1083    #[test]
1084    fn test_iter_reads_back_all_records_in_order() {
1085        let wal = Wal::with_store(MemStore::new()).unwrap();
1086        wal.append(b"one").unwrap();
1087        wal.append(b"two").unwrap();
1088        wal.append(b"three").unwrap();
1089        assert_eq!(
1090            drain(&wal),
1091            vec![b"one".to_vec(), b"two".to_vec(), b"three".to_vec()]
1092        );
1093    }
1094
1095    #[test]
1096    fn test_empty_log_iterates_to_nothing() {
1097        let wal = Wal::with_store(MemStore::new()).unwrap();
1098        assert!(wal.is_empty());
1099        assert_eq!(drain(&wal).len(), 0);
1100    }
1101
1102    #[test]
1103    fn test_empty_record_roundtrips() {
1104        let wal = Wal::with_store(MemStore::new()).unwrap();
1105        wal.append(b"").unwrap();
1106        assert_eq!(drain(&wal), vec![Vec::<u8>::new()]);
1107    }
1108
1109    #[test]
1110    fn test_record_too_large_is_rejected() {
1111        let config = WalConfig::new().with_max_record_size(4);
1112        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1113        wal.append(b"ok").unwrap();
1114        let err = wal.append(b"too long").unwrap_err();
1115        assert!(matches!(err, WalError::RecordTooLarge { len: 8, max: 4 }));
1116        // The rejected append did not advance the log.
1117        assert_eq!(drain(&wal), vec![b"ok".to_vec()]);
1118    }
1119
1120    #[test]
1121    fn test_reopen_recovers_records() {
1122        let wal = Wal::with_store(MemStore::new()).unwrap();
1123        wal.append(b"first").unwrap();
1124        wal.append(b"second").unwrap();
1125        wal.sync().unwrap();
1126        let image = wal.store().snapshot();
1127
1128        let reopened = Wal::with_store(MemStore::from_bytes(image)).unwrap();
1129        assert_eq!(
1130            drain(&reopened),
1131            vec![b"first".to_vec(), b"second".to_vec()]
1132        );
1133        // The next append continues at the recovered end: two records of
1134        // (8 + 5) and (8 + 6) bytes leave the tail at 27.
1135        assert_eq!(reopened.append(b"third").unwrap().get(), 27);
1136    }
1137
1138    #[test]
1139    fn test_recovery_truncates_torn_tail() {
1140        let wal = Wal::with_store(MemStore::new()).unwrap();
1141        wal.append(b"good record").unwrap();
1142        let clean_len = wal.len();
1143        // Append raw garbage directly to the store: a torn tail.
1144        wal.store().write_at(clean_len, &[0xAB; 5]).unwrap();
1145
1146        let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1147        assert_eq!(drain(&reopened), vec![b"good record".to_vec()]);
1148        assert_eq!(reopened.len(), clean_len);
1149    }
1150
1151    #[test]
1152    fn test_corrupt_record_surfaces_error_then_stops() {
1153        let wal = Wal::with_store(MemStore::new()).unwrap();
1154        wal.append(b"intact").unwrap();
1155        let second = wal.append(b"victim").unwrap();
1156        // Flip a byte inside the second record's payload (offset + header).
1157        let payload_offset = second.get() + HEADER_LEN as u64;
1158        let mut byte = [0u8; 1];
1159        wal.store().read_at(payload_offset, &mut byte).unwrap();
1160        byte[0] ^= 0xFF;
1161        wal.store().write_at(payload_offset, &byte).unwrap();
1162
1163        let mut iter = wal.iter().unwrap();
1164        assert_eq!(iter.next().unwrap().unwrap().data(), b"intact");
1165        assert!(matches!(
1166            iter.next().unwrap(),
1167            Err(WalError::Corruption { .. })
1168        ));
1169        assert!(iter.next().is_none());
1170    }
1171
1172    #[test]
1173    fn test_append_and_sync_is_durable() {
1174        let wal = Wal::with_store(MemStore::new()).unwrap();
1175        wal.append_and_sync(b"committed").unwrap();
1176        assert_eq!(drain(&wal), vec![b"committed".to_vec()]);
1177    }
1178
1179    #[test]
1180    fn test_iter_from_seeks_to_lsn() {
1181        let wal = Wal::with_store(MemStore::new()).unwrap();
1182        wal.append(b"a").unwrap();
1183        let b = wal.append(b"b").unwrap();
1184        wal.append(b"c").unwrap();
1185
1186        let got: Vec<Vec<u8>> = wal
1187            .iter_from(b)
1188            .unwrap()
1189            .map(|r| r.unwrap().into_data())
1190            .collect();
1191        assert_eq!(got, vec![b"b".to_vec(), b"c".to_vec()]);
1192    }
1193
1194    #[test]
1195    fn test_iter_from_past_end_is_empty() {
1196        let wal = Wal::with_store(MemStore::new()).unwrap();
1197        wal.append(b"a").unwrap();
1198        assert_eq!(wal.iter_from(Lsn::new(9_999)).unwrap().count(), 0);
1199    }
1200
1201    #[test]
1202    fn test_truncate_after_drops_later_records() {
1203        let wal = Wal::with_store(MemStore::new()).unwrap();
1204        wal.append(b"first").unwrap(); // [0, 13)
1205        let keep = wal.append(b"second").unwrap(); // [13, 27)
1206        wal.append(b"third").unwrap();
1207        wal.append(b"fourth").unwrap();
1208
1209        wal.truncate_after(keep).unwrap();
1210        assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1211        assert_eq!(wal.len(), 27);
1212
1213        // Appends resume immediately after the kept record.
1214        assert_eq!(wal.append(b"new").unwrap().get(), 27);
1215        assert_eq!(
1216            drain(&wal),
1217            vec![b"first".to_vec(), b"second".to_vec(), b"new".to_vec()]
1218        );
1219    }
1220
1221    #[test]
1222    fn test_truncate_after_keeping_last_record_is_a_no_op() {
1223        let wal = Wal::with_store(MemStore::new()).unwrap();
1224        wal.append(b"first").unwrap();
1225        let last = wal.append(b"second").unwrap();
1226        let before = wal.len();
1227
1228        wal.truncate_after(last).unwrap();
1229        assert_eq!(wal.len(), before);
1230        assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1231    }
1232
1233    #[test]
1234    fn test_truncate_after_invalid_lsn_errors() {
1235        let config = WalConfig::new().with_max_record_size(64);
1236        let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1237        wal.append(b"only record").unwrap();
1238        // An LSN that does not land on a record boundary is rejected.
1239        let err = wal.truncate_after(Lsn::new(3)).unwrap_err();
1240        assert!(matches!(err, WalError::Corruption { .. }));
1241    }
1242
1243    #[test]
1244    fn test_concurrent_appends_no_overlap_all_recovered() {
1245        const THREADS: usize = 8;
1246        const PER_THREAD: usize = 200;
1247
1248        let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1249        let mut handles = Vec::new();
1250        for t in 0..THREADS {
1251            let wal = Arc::clone(&wal);
1252            handles.push(thread::spawn(move || {
1253                let mut lsns = Vec::with_capacity(PER_THREAD);
1254                for i in 0..PER_THREAD {
1255                    let payload = format!("t{t}-r{i}").into_bytes();
1256                    lsns.push(wal.append(&payload).unwrap().get());
1257                }
1258                lsns
1259            }));
1260        }
1261        let mut all_lsns = Vec::new();
1262        for h in handles {
1263            all_lsns.extend(h.join().unwrap());
1264        }
1265        wal.sync().unwrap();
1266
1267        // Every LSN is distinct (no two records shared a byte range).
1268        let mut sorted = all_lsns.clone();
1269        sorted.sort_unstable();
1270        sorted.dedup();
1271        assert_eq!(sorted.len(), THREADS * PER_THREAD);
1272
1273        // Recovery reads back exactly the records that were appended, in offset
1274        // order, with no gaps or corruption.
1275        let records = drain(&wal);
1276        assert_eq!(records.len(), THREADS * PER_THREAD);
1277
1278        // Reopening from the raw image recovers the same set.
1279        let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1280        assert_eq!(reopened.iter().unwrap().count(), THREADS * PER_THREAD);
1281    }
1282
1283    #[test]
1284    fn test_concurrent_append_and_sync_all_durable() {
1285        const THREADS: usize = 8;
1286
1287        let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1288        let mut handles = Vec::new();
1289        for t in 0..THREADS {
1290            let wal = Arc::clone(&wal);
1291            handles.push(thread::spawn(move || {
1292                for i in 0..50 {
1293                    wal.append_and_sync(format!("{t}:{i}").as_bytes()).unwrap();
1294                }
1295            }));
1296        }
1297        for h in handles {
1298            h.join().unwrap();
1299        }
1300        assert_eq!(drain(&wal).len(), THREADS * 50);
1301    }
1302}