wal_db/wal.rs
1//! The log itself: [`Wal`], its recovery iterator [`WalIter`], and the
2//! [`Record`] iteration yields.
3
4use std::{fmt, io, sync::atomic::Ordering};
5
6#[cfg(not(loom))]
7use std::{cell::RefCell, path::Path};
8
9use crate::{
10 commit::Commit,
11 config::{RecoveryPolicy, WalConfig},
12 error::{Result, WalError},
13 lsn::Lsn,
14 record::{self, HEADER_LEN},
15 store::{FileStore, WalStore},
16 sync::AtomicU64,
17};
18
/// Wrapper that forces its contents onto a 64-byte alignment boundary.
///
/// The log wraps its reservation counter in this so the counter — written by
/// every appender — does not share a cache line with the fields next to it,
/// which would otherwise make readers of those fields pay for the appenders'
/// writes (false sharing).
#[derive(Debug)]
#[repr(align(64))]
struct CacheAligned<T>(T);
25
26/// A durable, append-only log.
27///
28/// `Wal` is the entry point. The four calls that cover almost every use are
29/// [`open`](Wal::open), [`append`](Wal::append), [`sync`](Wal::sync), and
30/// [`iter`](Wal::iter). The type parameter `S` is the storage backend and
31/// defaults to [`FileStore`], so the plain name `Wal` is the file-backed log;
32/// custom backends are supplied through [`with_store`](Wal::with_store).
33///
34/// A `Wal` is [`Send`] and [`Sync`], and the append path is built for it: many
35/// threads can call [`append`](Wal::append) at once with no global lock. Share
36/// one behind an [`Arc`](std::sync::Arc) and write from every thread.
37///
38/// # Concurrency and durability
39///
40/// Appends are lock-free. Each one reserves its byte range with a single atomic
41/// step — the range's start offset *is* the record's [`Lsn`] — frames the record
42/// into a reused thread-local buffer, and writes it, all without blocking other
43/// appenders. [`sync`](Wal::sync) is the durability barrier; when several
44/// threads sync at once they coalesce into a single fsync (group commit), so the
45/// cost of making data durable is amortised across everyone committing together.
46///
47/// `append` returns once the record is in the OS page cache; `sync` returns once
48/// it is on stable storage. See the [crate docs](crate) for the full contract.
49///
50/// # Examples
51///
52/// ```
53/// use wal_db::Wal;
54///
55/// # fn main() -> Result<(), wal_db::WalError> {
56/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
57/// # let path = dir.path().join("log.wal");
58/// let wal = Wal::open(&path)?;
59/// let first = wal.append(b"first")?;
60/// let second = wal.append(b"second")?;
61/// wal.sync()?;
62///
63/// // LSNs are byte offsets: the first record starts at 0, the second after it.
64/// assert_eq!(first.get(), 0);
65/// assert!(second.get() > first.get());
66///
67/// let read_back: Vec<Vec<u8>> = wal
68/// .iter()?
69/// .map(|entry| entry.map(|record| record.into_data()))
70/// .collect::<Result<_, _>>()?;
71/// assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
72/// # Ok(())
73/// # }
74/// ```
75pub struct Wal<S = FileStore> {
76 /// Next byte offset to reserve. Hammered by every appender, so kept on its
77 /// own cache line.
78 tail: CacheAligned<AtomicU64>,
79 store: S,
80 max_record_size: u32,
81 recovery_policy: RecoveryPolicy,
82 /// Lowest readable offset — `0` until a prefix is dropped by
83 /// [`truncate_before`](Wal::truncate_before).
84 head: AtomicU64,
85 commit: Commit,
86}
87
88#[cfg(not(loom))]
89impl Wal<FileStore> {
90 /// Open the log at `path`, creating it if it does not exist.
91 ///
92 /// On open the log scans its contents, stops at the first record that is
93 /// incomplete or fails its checksum, and truncates that torn tail so the
94 /// next append lands on a clean boundary. The common cause of a torn tail is
95 /// a crash partway through an earlier append; that record was never
96 /// acknowledged durable, so discarding it loses nothing the caller was
97 /// promised.
98 ///
99 /// # Errors
100 ///
101 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// use wal_db::Wal;
107 /// # fn main() -> Result<(), wal_db::WalError> {
108 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
109 /// # let path = dir.path().join("log.wal");
110 /// let wal = Wal::open(&path)?;
111 /// wal.append(b"hello")?;
112 /// wal.sync()?;
113 /// # Ok(())
114 /// # }
115 /// ```
116 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
117 Self::open_with(path, WalConfig::new())
118 }
119
120 /// Open the log at `path` with an explicit [`WalConfig`].
121 ///
122 /// # Errors
123 ///
124 /// Returns [`WalError::Io`] if the file cannot be opened or scanned.
125 ///
126 /// # Examples
127 ///
128 /// ```
129 /// use wal_db::{Wal, WalConfig};
130 /// # fn main() -> Result<(), wal_db::WalError> {
131 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
132 /// # let path = dir.path().join("log.wal");
133 /// let config = WalConfig::new().with_max_record_size(1024);
134 /// let wal = Wal::open_with(&path, config)?;
135 /// # let _ = wal;
136 /// # Ok(())
137 /// # }
138 /// ```
139 pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
140 let store = FileStore::open(path)?;
141 Self::with_store_and_config(store, config)
142 }
143}
144
145#[cfg(not(loom))]
146impl Wal<crate::segment::SegmentedStore> {
147 /// Open a segmented log in directory `dir`, with segments of `segment_size`
148 /// bytes, creating the directory if needed.
149 ///
150 /// The log is one continuous byte stream striped across fixed-size files, so
151 /// it behaves exactly like a single-file log — records span segment
152 /// boundaries freely — while keeping each file bounded for recovery and
153 /// archival. Records larger than a segment simply occupy several.
154 ///
155 /// # Errors
156 ///
157 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
158 /// be opened or scanned.
159 ///
160 /// # Examples
161 ///
162 /// ```
163 /// use wal_db::Wal;
164 /// # fn main() -> Result<(), wal_db::WalError> {
165 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
166 /// let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
167 /// wal.append(b"record")?;
168 /// wal.sync()?;
169 /// # Ok(())
170 /// # }
171 /// ```
172 pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
173 Self::open_segmented_with(dir, segment_size, WalConfig::new())
174 }
175
176 /// Open a segmented log with an explicit [`WalConfig`].
177 ///
178 /// Like [`open_segmented`](Wal::open_segmented), but applies `config` (for
179 /// example a tighter [`max_record_size`](WalConfig::max_record_size)).
180 ///
181 /// # Errors
182 ///
183 /// Returns [`WalError::Io`] if `segment_size` is zero or the directory cannot
184 /// be opened or scanned.
185 pub fn open_segmented_with(
186 dir: impl AsRef<Path>,
187 segment_size: u64,
188 config: WalConfig,
189 ) -> Result<Self> {
190 let store = crate::segment::SegmentedStore::open(dir, segment_size)?;
191 Self::with_store_and_config(store, config)
192 }
193}
194
195impl<S: WalStore> Wal<S> {
196 /// Build a log over a custom [`WalStore`], using the default configuration.
197 ///
198 /// # Errors
199 ///
200 /// Returns an error if scanning the existing contents of the store fails.
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// use wal_db::{MemStore, Wal};
206 /// # fn main() -> Result<(), wal_db::WalError> {
207 /// let wal = Wal::with_store(MemStore::new())?;
208 /// wal.append(b"record")?;
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub fn with_store(store: S) -> Result<Self> {
213 Self::with_store_and_config(store, WalConfig::new())
214 }
215
216 /// Build a log over a custom [`WalStore`] with an explicit [`WalConfig`].
217 ///
218 /// # Errors
219 ///
220 /// Returns an error if scanning the existing contents of the store fails.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use wal_db::{MemStore, Wal, WalConfig};
226 /// # fn main() -> Result<(), wal_db::WalError> {
227 /// let config = WalConfig::new().with_max_record_size(64);
228 /// let wal = Wal::with_store_and_config(MemStore::new(), config)?;
229 /// # let _ = wal;
230 /// # Ok(())
231 /// # }
232 /// ```
233 pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self> {
234 let head = store.head()?;
235 let recovered = recover(&store, config.max_record_size(), head)?;
236 Ok(Wal {
237 tail: CacheAligned(AtomicU64::new(recovered)),
238 store,
239 max_record_size: config.max_record_size(),
240 recovery_policy: config.recovery_policy(),
241 head: AtomicU64::new(head),
242 commit: Commit::new(recovered),
243 })
244 }
245
246 /// Append `record` to the log and return the [`Lsn`] it was assigned — the
247 /// byte offset where the record begins.
248 ///
249 /// Lock-free: the byte range is reserved with one atomic step and the record
250 /// is written without blocking other appenders. Returns once the bytes are
251 /// in the operating system's page cache. It does **not** flush the disk —
252 /// call [`sync`](Wal::sync) for that. A crash between `append` and `sync` may
253 /// lose the record.
254 ///
255 /// # Errors
256 ///
257 /// - [`WalError::RecordTooLarge`] if `record` is larger than the configured
258 /// [`max_record_size`](WalConfig::max_record_size). The log is unchanged.
259 /// - [`WalError::Io`] if the write fails. The reserved range becomes a
260 /// permanent gap: the log is durable only up to that point, recovery stops
261 /// there, and later syncs covering it report the truncation.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use wal_db::{MemStore, Wal};
267 /// # fn main() -> Result<(), wal_db::WalError> {
268 /// let wal = Wal::with_store(MemStore::new())?;
269 /// let lsn = wal.append(b"some bytes")?;
270 /// assert_eq!(lsn.get(), 0);
271 /// # Ok(())
272 /// # }
273 /// ```
274 pub fn append(&self, record: &[u8]) -> Result<Lsn> {
275 let payload_len = record.len();
276 if payload_len > self.max_record_size as usize {
277 return Err(WalError::RecordTooLarge {
278 len: payload_len,
279 max: self.max_record_size,
280 });
281 }
282 let frame_len = record::framed_len(payload_len) as u64;
283
284 // Reserve the byte range. The returned start offset is the LSN, and
285 // because it comes from a single atomic it is unique and ordered.
286 let start = self.tail.0.fetch_add(frame_len, Ordering::Relaxed);
287 let end = match start.checked_add(frame_len) {
288 Some(end) => end,
289 None => {
290 self.commit.mark_failed(start);
291 return Err(WalError::io(
292 "reserving a record offset",
293 io::Error::other("log size exceeds u64"),
294 ));
295 }
296 };
297
298 match self.frame_and_write(start, record) {
299 Ok(()) => {
300 self.commit.mark_written(start, end);
301 Ok(Lsn::new(start))
302 }
303 Err(error) => {
304 self.commit.mark_failed(start);
305 Err(error)
306 }
307 }
308 }
309
310 /// Make every record appended before this call durable.
311 ///
312 /// Returns once the data is on stable storage, using the platform's true
313 /// durability barrier. Concurrent calls coalesce into a single fsync, so the
314 /// flush cost is shared by everyone committing at the same time.
315 ///
316 /// # Errors
317 ///
318 /// Returns [`WalError::Io`] if the flush fails, or [`WalError::Corruption`]
319 /// if an earlier append's write failed and left a gap that cannot be made
320 /// durable. A failed sync means the records are not durable; treat it as
321 /// fatal, not as something to retry blindly.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// use wal_db::Wal;
327 /// # fn main() -> Result<(), wal_db::WalError> {
328 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
329 /// # let path = dir.path().join("log.wal");
330 /// let wal = Wal::open(&path)?;
331 /// wal.append(b"durable me")?;
332 /// wal.sync()?; // now on stable storage
333 /// # Ok(())
334 /// # }
335 /// ```
336 pub fn sync(&self) -> Result<()> {
337 let target = self.tail.0.load(Ordering::Acquire);
338 if target == 0 {
339 return Ok(());
340 }
341 self.commit.sync_to(&self.store, target)
342 }
343
344 /// Append `record` and make it durable in one call, returning its [`Lsn`].
345 ///
346 /// Equivalent to [`append`](Wal::append) followed by a [`sync`](Wal::sync)
347 /// scoped to this record, but with the sync coalesced into the group commit
348 /// of any other threads syncing at the same moment. Use it when every record
349 /// must be durable before you proceed and you want the group-commit
350 /// throughput without managing the two calls yourself.
351 ///
352 /// # Errors
353 ///
354 /// The union of [`append`](Wal::append)'s and [`sync`](Wal::sync)'s errors.
355 ///
356 /// # Examples
357 ///
358 /// ```
359 /// use wal_db::Wal;
360 /// # fn main() -> Result<(), wal_db::WalError> {
361 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
362 /// # let path = dir.path().join("log.wal");
363 /// let wal = Wal::open(&path)?;
364 /// let lsn = wal.append_and_sync(b"committed immediately")?;
365 /// # let _ = lsn;
366 /// # Ok(())
367 /// # }
368 /// ```
369 pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn> {
370 let lsn = self.append(record)?;
371 let end = lsn.get() + record::framed_len(record.len()) as u64;
372 self.commit.sync_to(&self.store, end)?;
373 Ok(lsn)
374 }
375
376 /// Serialise `value` with `pack-io` and append it, returning its [`Lsn`].
377 ///
378 /// The typed counterpart to [`append`](Wal::append): the value is encoded to
379 /// bytes and appended as one record, which [`Record::decode`] reads back.
380 /// Available with the `pack-io` feature. Like `append`, it does not sync.
381 ///
382 /// # Errors
383 ///
384 /// - [`WalError::Encoding`] if the value fails to serialise.
385 /// - Otherwise the errors of [`append`](Wal::append) ([`WalError::RecordTooLarge`],
386 /// [`WalError::Io`]).
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use wal_db::{MemStore, Wal};
392 /// use wal_db::pack_io::{Deserialize, Serialize};
393 ///
394 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
395 /// struct Entry {
396 /// key: String,
397 /// value: u64,
398 /// }
399 ///
400 /// # fn main() -> Result<(), wal_db::WalError> {
401 /// let wal = Wal::with_store(MemStore::new())?;
402 /// wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
403 ///
404 /// let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
405 /// assert_eq!(entry.value, 100);
406 /// # Ok(())
407 /// # }
408 /// ```
409 #[cfg(feature = "pack-io")]
410 pub fn append_typed<T: pack_io::Serialize + ?Sized>(&self, value: &T) -> Result<Lsn> {
411 let bytes = pack_io::encode(value).map_err(WalError::encoding)?;
412 self.append(&bytes)
413 }
414
415 /// Iterate the log from the beginning, yielding each record in append order.
416 ///
417 /// The iterator walks the records that are fully written at the moment it is
418 /// created — it does not see records still being written by other threads, or
419 /// appended afterwards. Each item is a [`Result`]: a damaged record yields a
420 /// single [`WalError::Corruption`] and then the iterator stops. In a log
421 /// opened normally the torn tail has already been truncated, so iteration
422 /// runs cleanly to the end.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// use wal_db::{MemStore, Wal};
428 /// # fn main() -> Result<(), wal_db::WalError> {
429 /// let wal = Wal::with_store(MemStore::new())?;
430 /// wal.append(b"one")?;
431 /// wal.append(b"two")?;
432 ///
433 /// let mut seen = Vec::new();
434 /// for entry in wal.iter()? {
435 /// seen.push(entry?.into_data());
436 /// }
437 /// assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
438 /// # Ok(())
439 /// # }
440 /// ```
441 pub fn iter(&self) -> Result<WalIter<'_, S>> {
442 let end = self.commit.committed();
443 Ok(WalIter {
444 wal: self,
445 offset: self.head.load(Ordering::Acquire),
446 end,
447 done: false,
448 policy: self.recovery_policy,
449 })
450 }
451
452 /// Iterate from `from` (a record's [`Lsn`]) to the end, skipping the records
453 /// before it.
454 ///
455 /// Because an LSN is a byte offset, seeking is O(1): iteration simply starts
456 /// at `from` instead of 0. Pass an [`Lsn`] that a previous
457 /// [`append`](Wal::append) or [`iter`](Wal::iter) produced — a real record
458 /// boundary. An `Lsn` that does not land on a record boundary will be read as
459 /// a malformed record and surface as [`WalError::Corruption`]; an `Lsn` past
460 /// the end yields an empty iterator.
461 ///
462 /// # Examples
463 ///
464 /// ```
465 /// use wal_db::{MemStore, Wal};
466 /// # fn main() -> Result<(), wal_db::WalError> {
467 /// let wal = Wal::with_store(MemStore::new())?;
468 /// wal.append(b"one")?;
469 /// let second = wal.append(b"two")?;
470 /// wal.append(b"three")?;
471 ///
472 /// let from_second: Vec<Vec<u8>> = wal
473 /// .iter_from(second)?
474 /// .map(|entry| entry.map(|r| r.into_data()))
475 /// .collect::<Result<_, _>>()?;
476 /// assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
477 /// # Ok(())
478 /// # }
479 /// ```
480 pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>> {
481 let end = self.commit.committed();
482 // Never read below the head: a dropped prefix is gone.
483 let start = from.get().max(self.head.load(Ordering::Acquire)).min(end);
484 Ok(WalIter {
485 wal: self,
486 offset: start,
487 end,
488 done: false,
489 policy: self.recovery_policy,
490 })
491 }
492
493 /// Drop every record after the one at `lsn`, keeping the log up to and
494 /// including it. For compaction.
495 ///
496 /// The record at `lsn` becomes the new last record; the next append lands
497 /// right after it. The truncation is made durable before returning. `lsn`
498 /// must be a real record boundary from a previous [`append`](Wal::append) or
499 /// [`iter`](Wal::iter), and the record there must be intact.
500 ///
501 /// # Exclusive access
502 ///
503 /// This mutates the log's end, so it must **not** run concurrently with
504 /// [`append`](Wal::append), [`sync`](Wal::sync), or another `truncate_after`.
505 /// The caller is responsible for quiescing writers first — the usual case for
506 /// compaction, where the engine pauses the log, truncates, and resumes.
507 ///
508 /// # Errors
509 ///
510 /// - [`WalError::Corruption`] if `lsn` does not point at an intact record.
511 /// - [`WalError::Io`] if the truncation or its sync fails.
512 ///
513 /// # Examples
514 ///
515 /// ```
516 /// use wal_db::{MemStore, Wal};
517 /// # fn main() -> Result<(), wal_db::WalError> {
518 /// let wal = Wal::with_store(MemStore::new())?;
519 /// wal.append(b"keep me")?;
520 /// let last_kept = wal.append(b"and me")?;
521 /// wal.append(b"drop me")?;
522 ///
523 /// wal.truncate_after(last_kept)?;
524 ///
525 /// let remaining: Vec<Vec<u8>> = wal
526 /// .iter()?
527 /// .map(|entry| entry.map(|r| r.into_data()))
528 /// .collect::<Result<_, _>>()?;
529 /// assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
530 /// # Ok(())
531 /// # }
532 /// ```
533 pub fn truncate_after(&self, lsn: Lsn) -> Result<()> {
534 let start = lsn.get();
535
536 // Confirm an intact record really lives at `lsn` before keeping it.
537 let mut header = [0u8; HEADER_LEN];
538 if self.store.read_at(start, &mut header)? < HEADER_LEN {
539 return Err(WalError::corruption(start, "no record at this LSN"));
540 }
541 let parsed = record::parse_header(&header);
542 if parsed.len > self.max_record_size {
543 return Err(WalError::corruption(start, "no valid record at this LSN"));
544 }
545 let payload_start = start
546 .checked_add(HEADER_LEN as u64)
547 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
548 let mut payload = vec![0u8; parsed.len as usize];
549 if self.store.read_at(payload_start, &mut payload)? < payload.len() {
550 return Err(WalError::corruption(start, "incomplete record at this LSN"));
551 }
552 if !record::verify(&header, &payload, parsed.crc) {
553 return Err(WalError::corruption(start, "no valid record at this LSN"));
554 }
555 let new_end = payload_start
556 .checked_add(u64::from(parsed.len))
557 .ok_or_else(|| WalError::corruption(start, "record offset overflow"))?;
558
559 self.store.truncate(new_end)?;
560 self.store.sync()?;
561 self.tail.0.store(new_end, Ordering::Release);
562 self.commit.reset(new_end);
563 Ok(())
564 }
565
566 /// Drop the records before the one at `lsn`, keeping the log from there on,
567 /// and return the new head [`Lsn`] — the lowest record still present.
568 ///
569 /// This is prefix compaction: once a consumer has durably applied (and
570 /// flushed elsewhere) everything up to a checkpoint, the old records can be
571 /// reclaimed. Offsets are preserved — surviving records keep their LSNs — so
572 /// [`iter`](Wal::iter) and [`iter_from`](Wal::iter_from) continue to work.
573 ///
574 /// Reading resumes at exactly `lsn` — the returned head is `lsn` itself
575 /// (clamped so it never moves backward or past the end). *Reclamation*,
576 /// though, is at the backend's granularity: a segmented log
577 /// ([`Wal::open_segmented`](Wal::open_segmented)) deletes whole leading segment
578 /// files below the one that holds `lsn`, so a little space just before `lsn` —
579 /// back to its segment boundary — is kept rather than reclaimed, and the
580 /// segment with the most recent records is never dropped. A single-file log
581 /// cannot reclaim a prefix without moving the surviving bytes (which would
582 /// change their LSNs), so it is left unchanged and the returned head is
583 /// `Lsn(0)`.
584 ///
585 /// # Exclusive access
586 ///
587 /// Like [`truncate_after`](Wal::truncate_after), this must **not** run
588 /// concurrently with [`append`](Wal::append), [`sync`](Wal::sync),
589 /// [`iter`](Wal::iter), or another truncation: it removes files a reader could
590 /// be holding open. Quiesce other users first.
591 ///
592 /// # Errors
593 ///
594 /// Returns [`WalError::Io`] if the removal fails.
595 ///
596 /// # Examples
597 ///
598 /// ```
599 /// use wal_db::Wal;
600 /// # fn main() -> Result<(), wal_db::WalError> {
601 /// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
602 /// // 32-byte segments so a handful of records spans several files.
603 /// let wal = Wal::open_segmented(dir.path(), 32)?;
604 /// for i in 0..10 {
605 /// let _ = wal.append(format!("record {i}").as_bytes())?;
606 /// }
607 /// let checkpoint = wal.append(b"checkpoint")?;
608 /// wal.sync()?;
609 ///
610 /// // Reclaim everything before the checkpoint's segment.
611 /// let head = wal.truncate_before(checkpoint)?;
612 /// assert!(head <= checkpoint);
613 /// // Iteration now starts at (or before) the checkpoint, never at 0.
614 /// assert!(wal.iter()?.next().unwrap()?.lsn() >= head);
615 /// # Ok(())
616 /// # }
617 /// ```
618 pub fn truncate_before(&self, lsn: Lsn) -> Result<Lsn> {
619 let new_head = self.store.truncate_before(lsn.get())?;
620 self.head.store(new_head, Ordering::Release);
621 Ok(Lsn::new(new_head))
622 }
623
624 /// The logical size of the log in bytes, including record framing.
625 ///
626 /// This is the offset at which the next append will land. It counts bytes
627 /// that have been reserved, which under heavy concurrency may include a
628 /// record another thread is still writing.
629 #[must_use]
630 pub fn len(&self) -> u64 {
631 self.tail.0.load(Ordering::Acquire)
632 }
633
634 /// Whether the log holds no records.
635 #[must_use]
636 pub fn is_empty(&self) -> bool {
637 self.len() == 0
638 }
639
640 /// Frame `record` into a reused buffer and write it at `start`.
641 fn frame_and_write(&self, start: u64, record: &[u8]) -> Result<()> {
642 with_frame_buffer(|buf| {
643 record::encode(buf, record);
644 self.store.write_at(start, buf)
645 })
646 }
647
648 /// Crate-internal access to the backing store, for tests that need to read,
649 /// corrupt, or extend the on-disk image directly.
650 #[cfg(test)]
651 pub(crate) fn store(&self) -> &S {
652 &self.store
653 }
654}
655
/// Run `f` with a scratch buffer for framing a record.
///
/// In normal builds the buffer is a thread-local `Vec` that is handed out again
/// on every call from the same thread, so its allocation is reused from one
/// append to the next. The buffer is passed to `f` as it was left by the
/// previous call; it is not cleared here.
#[cfg(not(loom))]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    thread_local! {
        static FRAME: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    }
    FRAME.with(|slot| {
        let mut scratch = slot.borrow_mut();
        f(&mut scratch)
    })
}

/// Loom variant: run `f` with a brand-new, empty buffer on every call instead
/// of going through a thread-local.
#[cfg(loom)]
fn with_frame_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    f(&mut Vec::new())
}
672
673/// Scan a store from the start, returning the end offset of the last intact
674/// record and truncating any torn tail beyond it.
675fn recover<S: WalStore>(store: &S, max_record_size: u32, head: u64) -> Result<u64> {
676 let physical = store.len()?;
677 // Scan from the head: a log whose prefix was dropped no longer starts at 0.
678 let mut offset: u64 = head;
679 let mut header = [0u8; HEADER_LEN];
680 // One reused payload buffer for the whole scan: only the checksum needs the
681 // bytes, so the buffer is refilled per record rather than reallocated.
682 let mut payload = Vec::new();
683
684 while offset < physical {
685 if store.read_at(offset, &mut header)? < HEADER_LEN {
686 break; // incomplete header: torn tail
687 }
688 let parsed = record::parse_header(&header);
689 if parsed.len > max_record_size {
690 break; // implausible length: treat the rest as a torn tail
691 }
692
693 let payload_start = match offset.checked_add(HEADER_LEN as u64) {
694 Some(start) => start,
695 None => break,
696 };
697 let len = parsed.len as usize;
698 payload.clear();
699 payload.resize(len, 0);
700 if store.read_at(payload_start, &mut payload)? < len {
701 break; // incomplete payload: torn tail
702 }
703 if !record::verify(&header, &payload, parsed.crc) {
704 break; // checksum mismatch: stop here
705 }
706
707 offset = match payload_start.checked_add(u64::from(parsed.len)) {
708 Some(end) => end,
709 None => break,
710 };
711 }
712
713 if offset < physical {
714 store.truncate(offset)?;
715 }
716 Ok(offset)
717}
718
719impl<S: WalStore> fmt::Debug for Wal<S> {
720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721 f.debug_struct("Wal")
722 .field("len", &self.tail.0.load(Ordering::Relaxed))
723 .finish_non_exhaustive()
724 }
725}
726
727/// One record read back during iteration: its [`Lsn`] and its payload bytes.
728///
729/// Yielded by [`Wal::iter`]. The payload is owned (a fresh `Vec` per record);
730/// take it without copying via [`into_data`](Record::into_data), or borrow it
731/// via [`data`](Record::data).
732#[derive(Debug, Clone, PartialEq, Eq)]
733pub struct Record {
734 lsn: Lsn,
735 data: Vec<u8>,
736}
737
738impl Record {
739 /// The sequence number this record was assigned — its byte offset in the log.
740 pub fn lsn(&self) -> Lsn {
741 self.lsn
742 }
743
744 /// The record's payload bytes.
745 #[must_use]
746 pub fn data(&self) -> &[u8] {
747 &self.data
748 }
749
750 /// The payload length in bytes.
751 #[must_use]
752 pub fn len(&self) -> usize {
753 self.data.len()
754 }
755
756 /// Whether the record's payload is empty.
757 #[must_use]
758 pub fn is_empty(&self) -> bool {
759 self.data.is_empty()
760 }
761
762 /// Consume the record and take ownership of its payload without copying.
763 #[must_use]
764 pub fn into_data(self) -> Vec<u8> {
765 self.data
766 }
767
768 /// Decode the record's payload into a typed value via `pack-io`.
769 ///
770 /// The mirror of [`Wal::append_typed`]. Available with the `pack-io` feature.
771 ///
772 /// # Errors
773 ///
774 /// Returns [`WalError::Encoding`] if the bytes do not deserialise into `T` —
775 /// for example reading a record written as a different type.
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use wal_db::{MemStore, Wal};
781 /// use wal_db::pack_io::{Deserialize, Serialize};
782 ///
783 /// #[derive(Serialize, Deserialize, PartialEq, Debug)]
784 /// struct Event {
785 /// id: u64,
786 /// name: String,
787 /// }
788 ///
789 /// # fn main() -> Result<(), wal_db::WalError> {
790 /// let wal = Wal::with_store(MemStore::new())?;
791 /// wal.append_typed(&Event { id: 7, name: "boot".into() })?;
792 ///
793 /// let record = wal.iter()?.next().unwrap()?;
794 /// let event: Event = record.decode()?;
795 /// assert_eq!(event, Event { id: 7, name: "boot".into() });
796 /// # Ok(())
797 /// # }
798 /// ```
799 #[cfg(feature = "pack-io")]
800 pub fn decode<T: pack_io::Deserialize>(&self) -> Result<T> {
801 pack_io::decode(&self.data).map_err(WalError::encoding)
802 }
803}
804
805/// The outcome of reading one record-sized chunk at the iterator's cursor.
806enum Step {
807 /// A valid record, plus the offset just past it.
808 Record(Record, u64),
809 /// A damaged record. `skip_to` is the offset of the next record if its
810 /// extent is known (length and payload present, only the checksum failed),
811 /// or `None` if the damage makes the next record's position unknowable.
812 Damaged(WalError, Option<u64>),
813 /// A clean end: a short read, meaning the log stops here (a torn tail).
814 End,
815}
816
817/// The iterator returned by [`Wal::iter`].
818///
819/// Walks the records fully written when it was created, yielding
820/// `Result<`[`Record`]`>`. Behaviour at a damaged record follows the configured
821/// [`RecoveryPolicy`]: by default the iterator yields the damage once and stops;
822/// under [`RecoveryPolicy::SkipBadRecords`] it yields the damage and continues
823/// past it when the next record's position is still recoverable.
824pub struct WalIter<'a, S: WalStore = FileStore> {
825 wal: &'a Wal<S>,
826 offset: u64,
827 end: u64,
828 done: bool,
829 policy: RecoveryPolicy,
830}
831
832impl<S: WalStore> WalIter<'_, S> {
833 /// Read and classify the record at the current offset, without advancing.
834 fn step(&self) -> Result<Step> {
835 let mut header = [0u8; HEADER_LEN];
836 if self.wal.store.read_at(self.offset, &mut header)? < HEADER_LEN {
837 return Ok(Step::End);
838 }
839 let parsed = record::parse_header(&header);
840 if parsed.len > self.wal.max_record_size {
841 // The length is implausible, so the next record's position is
842 // unknowable — there is nothing to skip to.
843 return Ok(Step::Damaged(
844 WalError::corruption(self.offset, "record length exceeds the maximum"),
845 None,
846 ));
847 }
848
849 let payload_start = self
850 .offset
851 .checked_add(HEADER_LEN as u64)
852 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
853 let mut payload = vec![0u8; parsed.len as usize];
854 if self.wal.store.read_at(payload_start, &mut payload)? < payload.len() {
855 return Ok(Step::End);
856 }
857 let next = payload_start
858 .checked_add(u64::from(parsed.len))
859 .ok_or_else(|| WalError::corruption(self.offset, "record offset overflow"))?;
860
861 if !record::verify(&header, &payload, parsed.crc) {
862 // The length and payload are present, so we know where the next
863 // record starts even though this one is corrupt.
864 return Ok(Step::Damaged(
865 WalError::corruption(self.offset, "checksum mismatch"),
866 Some(next),
867 ));
868 }
869
870 Ok(Step::Record(
871 Record {
872 lsn: Lsn::new(self.offset),
873 data: payload,
874 },
875 next,
876 ))
877 }
878}
879
880impl<S: WalStore> Iterator for WalIter<'_, S> {
881 type Item = Result<Record>;
882
883 fn next(&mut self) -> Option<Self::Item> {
884 if self.done || self.offset >= self.end {
885 return None;
886 }
887 match self.step() {
888 Ok(Step::Record(record, next)) => {
889 self.offset = next;
890 Some(Ok(record))
891 }
892 // Skip-bad-records, and the next record is locatable: surface the
893 // damage but continue from past it on the next call.
894 Ok(Step::Damaged(error, Some(next)))
895 if self.policy == RecoveryPolicy::SkipBadRecords =>
896 {
897 self.offset = next;
898 Some(Err(error))
899 }
900 // Stop-at-first-error, or damage that makes the next position
901 // unknowable: surface the damage and end.
902 Ok(Step::Damaged(error, _)) => {
903 self.done = true;
904 Some(Err(error))
905 }
906 Ok(Step::End) => {
907 self.done = true;
908 None
909 }
910 Err(error) => {
911 self.done = true;
912 Some(Err(error))
913 }
914 }
915 }
916}
917
918impl<S: WalStore> fmt::Debug for WalIter<'_, S> {
919 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920 f.debug_struct("WalIter")
921 .field("offset", &self.offset)
922 .field("end", &self.end)
923 .field("done", &self.done)
924 .finish()
925 }
926}
927
928#[cfg(all(test, not(loom)))]
929#[allow(
930 clippy::unwrap_used,
931 clippy::expect_used,
932 unused_must_use,
933 unused_results
934)]
935mod tests {
936 use std::sync::Arc;
937 use std::thread;
938
939 use super::*;
940 use crate::store::MemStore;
941
942 fn drain(wal: &Wal<MemStore>) -> Vec<Vec<u8>> {
943 wal.iter()
944 .unwrap()
945 .map(|r| r.unwrap().into_data())
946 .collect()
947 }
948
949 fn corrupt_byte(store: &MemStore, offset: u64) {
950 let mut byte = [0u8; 1];
951 store.read_at(offset, &mut byte).unwrap();
952 byte[0] ^= 0xFF;
953 store.write_at(offset, &byte).unwrap();
954 }
955
956 #[test]
957 fn test_stop_at_first_error_stops_at_corruption() {
958 let wal = Wal::with_store(MemStore::new()).unwrap(); // default policy
959 wal.append(b"first").unwrap();
960 let second = wal.append(b"second").unwrap();
961 wal.append(b"third").unwrap();
962 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
963
964 let items: Vec<_> = wal.iter().unwrap().collect();
965 assert_eq!(items.len(), 2); // first ok, second damaged, then stop
966 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
967 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
968 }
969
970 #[test]
971 fn test_skip_bad_records_continues_past_corruption() {
972 let config = WalConfig::new().with_recovery_policy(RecoveryPolicy::SkipBadRecords);
973 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
974 wal.append(b"first").unwrap();
975 let second = wal.append(b"second").unwrap();
976 wal.append(b"third").unwrap();
977 // Corrupt the payload only; the length prefix stays intact, so the
978 // record is skippable.
979 corrupt_byte(wal.store(), second.get() + HEADER_LEN as u64);
980
981 let items: Vec<_> = wal.iter().unwrap().collect();
982 assert_eq!(items.len(), 3);
983 assert_eq!(items[0].as_ref().unwrap().data(), b"first");
984 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
985 assert_eq!(items[2].as_ref().unwrap().data(), b"third");
986 }
987
988 #[test]
989 fn test_skip_bad_records_still_stops_on_unreadable_length() {
990 let config = WalConfig::new()
991 .with_max_record_size(16)
992 .with_recovery_policy(RecoveryPolicy::SkipBadRecords);
993 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
994 wal.append(b"ok").unwrap();
995 let second = wal.append(b"victim").unwrap();
996 // Corrupt the length field to an implausible value: the next record's
997 // position becomes unknowable, so even skip-mode must stop.
998 corrupt_byte(wal.store(), second.get() + 4); // LEN_OFFSET within the header
999
1000 let items: Vec<_> = wal.iter().unwrap().collect();
1001 assert_eq!(items.len(), 2); // first ok, then a damaged stop
1002 assert_eq!(items[0].as_ref().unwrap().data(), b"ok");
1003 assert!(matches!(items[1], Err(WalError::Corruption { .. })));
1004 }
1005
1006 #[cfg(feature = "pack-io")]
1007 #[test]
1008 fn test_typed_record_roundtrip() {
1009 use pack_io::{Deserialize, Serialize};
1010
1011 #[derive(Serialize, Deserialize, PartialEq, Debug)]
1012 struct Entry {
1013 id: u64,
1014 label: String,
1015 }
1016
1017 let wal = Wal::with_store(MemStore::new()).unwrap();
1018 wal.append_typed(&Entry {
1019 id: 9,
1020 label: "nine".into(),
1021 })
1022 .unwrap();
1023 wal.append_typed(&Entry {
1024 id: 10,
1025 label: "ten".into(),
1026 })
1027 .unwrap();
1028
1029 let decoded: Vec<Entry> = wal
1030 .iter()
1031 .unwrap()
1032 .map(|r| r.unwrap().decode().unwrap())
1033 .collect();
1034 assert_eq!(
1035 decoded[0],
1036 Entry {
1037 id: 9,
1038 label: "nine".into()
1039 }
1040 );
1041 assert_eq!(
1042 decoded[1],
1043 Entry {
1044 id: 10,
1045 label: "ten".into()
1046 }
1047 );
1048 }
1049
1050 #[cfg(feature = "pack-io")]
1051 #[test]
1052 fn test_typed_decode_wrong_type_errors() {
1053 use pack_io::{Deserialize, Serialize};
1054
1055 #[derive(Serialize)]
1056 struct Big {
1057 a: u64,
1058 b: u64,
1059 c: u64,
1060 }
1061 #[derive(Deserialize)]
1062 struct Small {
1063 _a: u8,
1064 }
1065
1066 let wal = Wal::with_store(MemStore::new()).unwrap();
1067 wal.append_typed(&Big { a: 1, b: 2, c: 3 }).unwrap();
1068 let record = wal.iter().unwrap().next().unwrap().unwrap();
1069 // Decoding 24 bytes as a 1-byte type leaves trailing bytes -> error.
1070 let result: Result<Small> = record.decode();
1071 assert!(matches!(result, Err(WalError::Encoding { .. })));
1072 }
1073
1074 #[test]
1075 fn test_append_assigns_byte_offset_lsns() {
1076 let wal = Wal::with_store(MemStore::new()).unwrap();
1077 let a = wal.append(b"abc").unwrap(); // 8 header + 3 = 11 bytes
1078 let b = wal.append(b"de").unwrap();
1079 assert_eq!(a.get(), 0);
1080 assert_eq!(b.get(), 11);
1081 }
1082
1083 #[test]
1084 fn test_iter_reads_back_all_records_in_order() {
1085 let wal = Wal::with_store(MemStore::new()).unwrap();
1086 wal.append(b"one").unwrap();
1087 wal.append(b"two").unwrap();
1088 wal.append(b"three").unwrap();
1089 assert_eq!(
1090 drain(&wal),
1091 vec![b"one".to_vec(), b"two".to_vec(), b"three".to_vec()]
1092 );
1093 }
1094
1095 #[test]
1096 fn test_empty_log_iterates_to_nothing() {
1097 let wal = Wal::with_store(MemStore::new()).unwrap();
1098 assert!(wal.is_empty());
1099 assert_eq!(drain(&wal).len(), 0);
1100 }
1101
1102 #[test]
1103 fn test_empty_record_roundtrips() {
1104 let wal = Wal::with_store(MemStore::new()).unwrap();
1105 wal.append(b"").unwrap();
1106 assert_eq!(drain(&wal), vec![Vec::<u8>::new()]);
1107 }
1108
1109 #[test]
1110 fn test_record_too_large_is_rejected() {
1111 let config = WalConfig::new().with_max_record_size(4);
1112 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1113 wal.append(b"ok").unwrap();
1114 let err = wal.append(b"too long").unwrap_err();
1115 assert!(matches!(err, WalError::RecordTooLarge { len: 8, max: 4 }));
1116 // The rejected append did not advance the log.
1117 assert_eq!(drain(&wal), vec![b"ok".to_vec()]);
1118 }
1119
1120 #[test]
1121 fn test_reopen_recovers_records() {
1122 let wal = Wal::with_store(MemStore::new()).unwrap();
1123 wal.append(b"first").unwrap();
1124 wal.append(b"second").unwrap();
1125 wal.sync().unwrap();
1126 let image = wal.store().snapshot();
1127
1128 let reopened = Wal::with_store(MemStore::from_bytes(image)).unwrap();
1129 assert_eq!(
1130 drain(&reopened),
1131 vec![b"first".to_vec(), b"second".to_vec()]
1132 );
1133 // The next append continues at the recovered end: two records of
1134 // (8 + 5) and (8 + 6) bytes leave the tail at 27.
1135 assert_eq!(reopened.append(b"third").unwrap().get(), 27);
1136 }
1137
1138 #[test]
1139 fn test_recovery_truncates_torn_tail() {
1140 let wal = Wal::with_store(MemStore::new()).unwrap();
1141 wal.append(b"good record").unwrap();
1142 let clean_len = wal.len();
1143 // Append raw garbage directly to the store: a torn tail.
1144 wal.store().write_at(clean_len, &[0xAB; 5]).unwrap();
1145
1146 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1147 assert_eq!(drain(&reopened), vec![b"good record".to_vec()]);
1148 assert_eq!(reopened.len(), clean_len);
1149 }
1150
1151 #[test]
1152 fn test_corrupt_record_surfaces_error_then_stops() {
1153 let wal = Wal::with_store(MemStore::new()).unwrap();
1154 wal.append(b"intact").unwrap();
1155 let second = wal.append(b"victim").unwrap();
1156 // Flip a byte inside the second record's payload (offset + header).
1157 let payload_offset = second.get() + HEADER_LEN as u64;
1158 let mut byte = [0u8; 1];
1159 wal.store().read_at(payload_offset, &mut byte).unwrap();
1160 byte[0] ^= 0xFF;
1161 wal.store().write_at(payload_offset, &byte).unwrap();
1162
1163 let mut iter = wal.iter().unwrap();
1164 assert_eq!(iter.next().unwrap().unwrap().data(), b"intact");
1165 assert!(matches!(
1166 iter.next().unwrap(),
1167 Err(WalError::Corruption { .. })
1168 ));
1169 assert!(iter.next().is_none());
1170 }
1171
1172 #[test]
1173 fn test_append_and_sync_is_durable() {
1174 let wal = Wal::with_store(MemStore::new()).unwrap();
1175 wal.append_and_sync(b"committed").unwrap();
1176 assert_eq!(drain(&wal), vec![b"committed".to_vec()]);
1177 }
1178
1179 #[test]
1180 fn test_iter_from_seeks_to_lsn() {
1181 let wal = Wal::with_store(MemStore::new()).unwrap();
1182 wal.append(b"a").unwrap();
1183 let b = wal.append(b"b").unwrap();
1184 wal.append(b"c").unwrap();
1185
1186 let got: Vec<Vec<u8>> = wal
1187 .iter_from(b)
1188 .unwrap()
1189 .map(|r| r.unwrap().into_data())
1190 .collect();
1191 assert_eq!(got, vec![b"b".to_vec(), b"c".to_vec()]);
1192 }
1193
1194 #[test]
1195 fn test_iter_from_past_end_is_empty() {
1196 let wal = Wal::with_store(MemStore::new()).unwrap();
1197 wal.append(b"a").unwrap();
1198 assert_eq!(wal.iter_from(Lsn::new(9_999)).unwrap().count(), 0);
1199 }
1200
1201 #[test]
1202 fn test_truncate_after_drops_later_records() {
1203 let wal = Wal::with_store(MemStore::new()).unwrap();
1204 wal.append(b"first").unwrap(); // [0, 13)
1205 let keep = wal.append(b"second").unwrap(); // [13, 27)
1206 wal.append(b"third").unwrap();
1207 wal.append(b"fourth").unwrap();
1208
1209 wal.truncate_after(keep).unwrap();
1210 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1211 assert_eq!(wal.len(), 27);
1212
1213 // Appends resume immediately after the kept record.
1214 assert_eq!(wal.append(b"new").unwrap().get(), 27);
1215 assert_eq!(
1216 drain(&wal),
1217 vec![b"first".to_vec(), b"second".to_vec(), b"new".to_vec()]
1218 );
1219 }
1220
1221 #[test]
1222 fn test_truncate_after_keeping_last_record_is_a_no_op() {
1223 let wal = Wal::with_store(MemStore::new()).unwrap();
1224 wal.append(b"first").unwrap();
1225 let last = wal.append(b"second").unwrap();
1226 let before = wal.len();
1227
1228 wal.truncate_after(last).unwrap();
1229 assert_eq!(wal.len(), before);
1230 assert_eq!(drain(&wal), vec![b"first".to_vec(), b"second".to_vec()]);
1231 }
1232
1233 #[test]
1234 fn test_truncate_after_invalid_lsn_errors() {
1235 let config = WalConfig::new().with_max_record_size(64);
1236 let wal = Wal::with_store_and_config(MemStore::new(), config).unwrap();
1237 wal.append(b"only record").unwrap();
1238 // An LSN that does not land on a record boundary is rejected.
1239 let err = wal.truncate_after(Lsn::new(3)).unwrap_err();
1240 assert!(matches!(err, WalError::Corruption { .. }));
1241 }
1242
1243 #[test]
1244 fn test_concurrent_appends_no_overlap_all_recovered() {
1245 const THREADS: usize = 8;
1246 const PER_THREAD: usize = 200;
1247
1248 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1249 let mut handles = Vec::new();
1250 for t in 0..THREADS {
1251 let wal = Arc::clone(&wal);
1252 handles.push(thread::spawn(move || {
1253 let mut lsns = Vec::with_capacity(PER_THREAD);
1254 for i in 0..PER_THREAD {
1255 let payload = format!("t{t}-r{i}").into_bytes();
1256 lsns.push(wal.append(&payload).unwrap().get());
1257 }
1258 lsns
1259 }));
1260 }
1261 let mut all_lsns = Vec::new();
1262 for h in handles {
1263 all_lsns.extend(h.join().unwrap());
1264 }
1265 wal.sync().unwrap();
1266
1267 // Every LSN is distinct (no two records shared a byte range).
1268 let mut sorted = all_lsns.clone();
1269 sorted.sort_unstable();
1270 sorted.dedup();
1271 assert_eq!(sorted.len(), THREADS * PER_THREAD);
1272
1273 // Recovery reads back exactly the records that were appended, in offset
1274 // order, with no gaps or corruption.
1275 let records = drain(&wal);
1276 assert_eq!(records.len(), THREADS * PER_THREAD);
1277
1278 // Reopening from the raw image recovers the same set.
1279 let reopened = Wal::with_store(MemStore::from_bytes(wal.store().snapshot())).unwrap();
1280 assert_eq!(reopened.iter().unwrap().count(), THREADS * PER_THREAD);
1281 }
1282
1283 #[test]
1284 fn test_concurrent_append_and_sync_all_durable() {
1285 const THREADS: usize = 8;
1286
1287 let wal = Arc::new(Wal::with_store(MemStore::new()).unwrap());
1288 let mut handles = Vec::new();
1289 for t in 0..THREADS {
1290 let wal = Arc::clone(&wal);
1291 handles.push(thread::spawn(move || {
1292 for i in 0..50 {
1293 wal.append_and_sync(format!("{t}:{i}").as_bytes()).unwrap();
1294 }
1295 }));
1296 }
1297 for h in handles {
1298 h.join().unwrap();
1299 }
1300 assert_eq!(drain(&wal).len(), THREADS * 50);
1301 }
1302}