Skip to main content

wal_db/
store.rs

1//! Storage backends.
2//!
3//! A [`Wal`](crate::Wal) frames records, hands out sequence numbers, and
4//! coordinates durability; the bytes themselves live behind the [`WalStore`]
//! trait. Every method takes `&self`, because the multi-writer append path
//! writes from several threads at once — the store must accept concurrent,
//! positioned writes without the log serializing them behind a lock (a file
//! handles this natively; the in-memory [`MemStore`] takes a short internal
//! lock of its own). The default [`FileStore`] writes to
9//! a file; a custom implementation could put the log on any byte-addressable,
10//! appendable medium.
11
12use std::{
13    fs::{File, OpenOptions},
14    io,
15    path::{Path, PathBuf},
16    sync::Mutex,
17};
18
19use crate::error::{Result, WalError};
20
21/// A byte-addressable, append-only store with an explicit durability barrier.
22///
23/// The log treats a store as a growing array of bytes. It writes framed records
24/// at reserved offsets — possibly from several threads concurrently and out of
25/// order — reads from arbitrary offsets during recovery, and occasionally
26/// truncates a torn tail. The one guarantee the log cannot provide itself — that
27/// written bytes have reached stable storage — is delegated to
28/// [`sync`](WalStore::sync).
29///
30/// # Implementing a backend
31///
32/// The contract an implementation must honour:
33///
34/// - [`write_at`](WalStore::write_at) writes `bytes` at `offset`, growing the
35///   store if `offset` is past the current end and zero-filling any gap (so a
36///   later offset written before an earlier one leaves detectable zero bytes in
37///   between, exactly as a sparse file does). Concurrent calls to disjoint
38///   ranges must not corrupt each other.
39/// - [`read_at`](WalStore::read_at) fills `buf` starting at `offset`, returning
40///   the number of bytes read. It returns fewer than `buf.len()` only when the
41///   store ends first — that short read is how recovery detects a torn tail.
42/// - [`sync`](WalStore::sync) returns only once every prior write is durable.
43/// - [`truncate`](WalStore::truncate) discards everything at or after `len`.
44///
45/// `Send + Sync` is required so the log can be shared across threads.
46///
47/// # Examples
48///
49/// ```
50/// use wal_db::{MemStore, Wal};
51///
52/// # fn main() -> Result<(), wal_db::WalError> {
53/// let wal = Wal::with_store(MemStore::new())?;
54/// wal.append(b"record")?;
55/// wal.sync()?;
56/// # Ok(())
57/// # }
58/// ```
59pub trait WalStore: Send + Sync {
60    /// Write `bytes` at byte `offset`, growing the store and zero-filling any
61    /// gap if `offset` is beyond the current end.
62    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()>;
63
64    /// Read into `buf` starting at byte `offset`, returning the number of bytes
65    /// read.
66    ///
67    /// A return value smaller than `buf.len()` means the store ended before
68    /// `buf` could be filled.
69    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize>;
70
71    /// Discard every byte at or after `len`, shrinking the store to exactly
72    /// `len` bytes.
73    fn truncate(&self, len: u64) -> Result<()>;
74
75    /// Flush every preceding [`write_at`](WalStore::write_at) to stable storage.
76    ///
77    /// Returns only once the data will survive a power loss. This is the
78    /// durability barrier the whole log rests on.
79    fn sync(&self) -> Result<()>;
80
81    /// The current size of the store in bytes.
82    fn len(&self) -> Result<u64>;
83
84    /// Whether the store holds no bytes.
85    ///
86    /// The default defers to [`len`](WalStore::len); override it only if a
87    /// backend can answer more cheaply.
88    fn is_empty(&self) -> Result<bool> {
89        Ok(self.len()? == 0)
90    }
91}
92
93/// A file-backed [`WalStore`]: the default storage for [`Wal::open`](crate::Wal::open).
94///
95/// All reads and writes are positioned (`pread`/`pwrite` on Unix, `seek_read`/
96/// `seek_write` on Windows), so concurrent appenders writing to disjoint offsets
97/// never contend on a shared file cursor, and a recovery read never disturbs an
98/// append. [`sync`](WalStore::sync) issues the platform's true durability
99/// barrier: `fdatasync` on Linux, `FlushFileBuffers` on Windows, and
100/// `fcntl(F_FULLFSYNC)` on macOS — the last because macOS's `fsync` does not
101/// flush the drive's write cache.
102#[derive(Debug)]
103pub struct FileStore {
104    file: File,
105    path: PathBuf,
106}
107
108impl FileStore {
109    /// Open the file at `path`, creating it if it does not exist.
110    ///
111    /// The store does not interpret the file's contents — it does not look for a
112    /// torn tail or validate records. That is [`Wal::open`](crate::Wal::open)'s
113    /// job, which scans on open and truncates any incomplete trailing record.
114    ///
115    /// # Errors
116    ///
117    /// Returns [`WalError::Io`] if the file cannot be opened (for example a
118    /// missing parent directory or insufficient permissions).
119    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
120        let path = path.as_ref().to_path_buf();
121        let file = OpenOptions::new()
122            .read(true)
123            .write(true)
124            .create(true)
125            .truncate(false)
126            .open(&path)
127            .map_err(|e| WalError::io("opening the log file", e))?;
128        Ok(FileStore { file, path })
129    }
130
131    /// The path this store was opened from.
132    #[must_use]
133    pub fn path(&self) -> &Path {
134        &self.path
135    }
136}
137
138impl WalStore for FileStore {
139    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
140        pwrite_all(&self.file, offset, bytes).map_err(|e| WalError::io("writing a record", e))
141    }
142
143    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
144        pread_fill(&self.file, offset, buf).map_err(|e| WalError::io("reading from the log", e))
145    }
146
147    fn truncate(&self, len: u64) -> Result<()> {
148        self.file
149            .set_len(len)
150            .map_err(|e| WalError::io("truncating the log", e))
151    }
152
153    fn sync(&self) -> Result<()> {
154        durable_sync(&self.file).map_err(|e| WalError::io("flushing to stable storage", e))
155    }
156
157    fn len(&self) -> Result<u64> {
158        Ok(self
159            .file
160            .metadata()
161            .map_err(|e| WalError::io("reading log file metadata", e))?
162            .len())
163    }
164}
165
/// An in-memory [`WalStore`]: a `Vec<u8>` guarded by a briefly held mutex.
///
/// It mirrors [`FileStore`] without touching the filesystem. That includes the
/// sparse-file behaviour where writing past the end zero-fills the gap.
///
/// [`sync`](WalStore::sync) does nothing, because memory has no separate durable
/// tier to flush to. Use a `MemStore` for tests, examples, and benchmarking the
/// framing path in isolation — never for data that must survive the process.
///
/// # Examples
///
/// ```
/// use wal_db::{MemStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// let wal = Wal::with_store(MemStore::new())?;
/// let lsn = wal.append(b"in memory")?;
/// assert_eq!(lsn.get(), 0);
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default)]
pub struct MemStore {
    /// The log image: index `i` holds the byte stored at offset `i`.
    data: Mutex<Vec<u8>>,
}

impl MemStore {
    /// Create an empty in-memory store.
    #[must_use]
    pub fn new() -> Self {
        Self::from_bytes(Vec::new())
    }

    /// Create an empty store with room reserved for `capacity` bytes, so a
    /// workload of known size avoids reallocating as it grows.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_bytes(Vec::with_capacity(capacity))
    }

    /// Create a store whose contents are exactly `bytes`.
    ///
    /// Useful for replaying a log image captured elsewhere through
    /// [`Wal::with_store`](crate::Wal::with_store).
    #[must_use]
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        Self {
            data: Mutex::new(bytes),
        }
    }

    /// Lock the buffer. A poisoned lock is recovered via `into_inner` rather
    /// than propagating the earlier panic.
    fn lock(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
        match self.data.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Copy out the current bytes. Crate-internal; lets tests inspect or
    /// snapshot the log image.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> Vec<u8> {
        self.lock().to_vec()
    }
}

impl Clone for MemStore {
    /// Produce an independent store holding a copy of the current bytes.
    fn clone(&self) -> Self {
        let bytes = self.lock().to_vec();
        Self::from_bytes(bytes)
    }
}
239
240impl WalStore for MemStore {
241    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
242        let start = usize::try_from(offset).map_err(|_| {
243            WalError::io(
244                "writing to memory",
245                io::Error::other("offset exceeds usize"),
246            )
247        })?;
248        let end = start.checked_add(bytes.len()).ok_or_else(|| {
249            WalError::io(
250                "writing to memory",
251                io::Error::other("write overflows usize"),
252            )
253        })?;
254
255        let mut data = self.lock();
256        if data.len() < end {
257            data.resize(end, 0); // zero-fill any gap, like a sparse file
258        }
259        data[start..end].copy_from_slice(bytes);
260        Ok(())
261    }
262
263    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
264        let data = self.lock();
265        let start = match usize::try_from(offset) {
266            Ok(start) if start < data.len() => start,
267            _ => return Ok(0),
268        };
269        let available = &data[start..];
270        let n = available.len().min(buf.len());
271        buf[..n].copy_from_slice(&available[..n]);
272        Ok(n)
273    }
274
275    fn truncate(&self, len: u64) -> Result<()> {
276        let len = usize::try_from(len).unwrap_or(usize::MAX);
277        self.lock().truncate(len);
278        Ok(())
279    }
280
281    fn sync(&self) -> Result<()> {
282        Ok(())
283    }
284
285    fn len(&self) -> Result<u64> {
286        Ok(self.lock().len() as u64)
287    }
288}
289
290// ---------------------------------------------------------------------------
291// Platform-correct durability.
292// ---------------------------------------------------------------------------
293
/// Flush every buffered write for `file` to stable storage.
///
/// On macOS this is `fcntl(F_FULLFSYNC)`. Plain `fsync(2)` there pushes data
/// from the page cache to the device, but may leave it in the device's own
/// write cache, where a power loss can still take it. `F_FULLFSYNC` is the
/// documented way to force a full flush. Calling it directly keeps that
/// guarantee explicit instead of depending on how the standard library
/// implements `sync_data` on this platform.
///
/// The call is retried when a signal interrupts it (`EINTR`). Any other
/// failure — such as a filesystem that does not implement `F_FULLFSYNC` — is
/// returned to the caller rather than silently downgraded to a weaker `fsync`.
#[cfg(target_os = "macos")]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    let fd = file.as_raw_fd();
    loop {
        // SAFETY: `fd` is a valid, open file descriptor for as long as `file` is
        // borrowed, so it cannot be closed from under us. `F_FULLFSYNC` takes no
        // argument pointer and neither reads nor writes any user buffer. `fcntl`
        // reports failure by returning -1 and setting `errno`, which is read
        // immediately below before any other call can clobber it.
        let ret = unsafe { libc::fcntl(fd, libc::F_FULLFSYNC) };
        if ret != -1 {
            return Ok(());
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
        // EINTR: a signal arrived before the flush finished. Retry rather than
        // report a spurious durability failure to the log.
    }
}
316
/// Flush every buffered write for `file` to stable storage.
///
/// Off macOS, `File::sync_data` already reaches the real durability barrier.
/// It is `fdatasync` on Linux and `FlushFileBuffers` on Windows, so no
/// platform-specific call is needed here.
#[cfg(not(target_os = "macos"))]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}
324
325// ---------------------------------------------------------------------------
326// Positioned I/O. Reads and writes carry their own offset so they never move a
327// shared file cursor, which is what lets disjoint concurrent writes proceed
328// without a lock.
329// ---------------------------------------------------------------------------
330
/// Write all of `buf` at `offset` with positioned writes (`pwrite`), never
/// touching the shared file cursor.
///
/// Short writes are continued at the advanced offset, and `EINTR` is retried.
/// A write that accepts zero bytes while data remains becomes a `WriteZero`
/// error, so the loop cannot spin forever.
#[cfg(unix)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let written = match file.write_at(buf, offset) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "the store accepted zero bytes mid-record",
                ));
            }
            Ok(count) => count,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        let (_, rest) = buf.split_at(written);
        buf = rest;
        offset += written as u64;
    }
}
353
/// Write all of `buf` at `offset` with `seek_write`, which carries its own
/// offset for every call.
///
/// Short writes are continued at the advanced offset, and `Interrupted` is
/// retried. A write that accepts zero bytes while data remains becomes a
/// `WriteZero` error, so the loop cannot spin forever.
#[cfg(windows)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::windows::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let written = match file.seek_write(buf, offset) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "the store accepted zero bytes mid-record",
                ));
            }
            Ok(count) => count,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        let (_, rest) = buf.split_at(written);
        buf = rest;
        offset += written as u64;
    }
}
376
/// Fill `buf` from `offset` with positioned reads (`pread`), returning how
/// many bytes were read.
///
/// Short reads are continued at the advanced offset, and `EINTR` is retried.
/// The loop stops early only when a read returns 0 (end of file), so a result
/// below `buf.len()` always means the file ended first.
#[cfg(unix)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::unix::fs::FileExt;

    let mut filled = 0_usize;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }
        match file.read_at(remaining, offset) {
            Ok(0) => break,
            Ok(got) => {
                filled += got;
                offset += got as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
395
/// Fill `buf` from `offset` with `seek_read`, returning how many bytes were read.
///
/// Short reads are continued at the advanced offset, and `Interrupted` is
/// retried. The loop stops early only when a read returns 0 (end of file), so a
/// result below `buf.len()` always means the file ended first.
#[cfg(windows)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::windows::fs::FileExt;

    let mut filled = 0_usize;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }
        match file.seek_read(remaining, offset) {
            Ok(0) => break,
            Ok(got) => {
                filled += got;
                offset += got as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
414
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use std::{sync::Arc, thread};

    #[test]
    fn test_memstore_write_at_advances_len() {
        let store = MemStore::new();
        assert_eq!(store.len().unwrap(), 0);

        // Each step appends at the current end and checks the new length.
        let steps: [(u64, &[u8], u64); 2] = [(0, b"abc", 3), (3, b"de", 5)];
        for (offset, chunk, expected_len) in steps {
            store.write_at(offset, chunk).unwrap();
            assert_eq!(store.len().unwrap(), expected_len);
        }
    }

    #[test]
    fn test_memstore_write_past_end_zero_fills_gap() {
        let store = MemStore::new();
        // Writing at offset 4 into an empty store must leave [0, 4) as zeros.
        store.write_at(4, b"XY").unwrap();
        assert_eq!(store.len().unwrap(), 6);

        let mut image = [0xFF_u8; 6];
        let read = store.read_at(0, &mut image).unwrap();
        assert_eq!(read, 6);
        assert_eq!(image, [0, 0, 0, 0, b'X', b'Y']);
    }

    #[test]
    fn test_memstore_read_past_end_is_short() {
        let store = MemStore::new();
        store.write_at(0, b"abc").unwrap();

        let mut window = [0_u8; 8];
        let got = store.read_at(1, &mut window).unwrap();
        assert_eq!(got, 2);
        assert_eq!(&window[..got], b"bc");
    }

    #[test]
    fn test_memstore_truncate_shrinks() {
        let store = MemStore::new();
        store.write_at(0, b"0123456789").unwrap();
        store.truncate(4).unwrap();
        assert_eq!(store.len().unwrap(), 4);
    }

    #[test]
    fn test_filestore_roundtrip_through_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.bin");

        let writer = FileStore::open(&path).unwrap();
        writer.write_at(0, b"hello world").unwrap();
        writer.sync().unwrap();
        assert_eq!(writer.len().unwrap(), 11);
        drop(writer);

        let reader = FileStore::open(&path).unwrap();
        assert_eq!(reader.len().unwrap(), 11);
        let mut word = [0_u8; 5];
        assert_eq!(reader.read_at(6, &mut word).unwrap(), 5);
        assert_eq!(&word, b"world");
    }

    #[test]
    fn test_filestore_concurrent_disjoint_writes() {
        let dir = tempfile::tempdir().unwrap();
        let store = Arc::new(FileStore::open(dir.path().join("concurrent.bin")).unwrap());

        // Eight writers, each filling its own 4-byte slot with a distinct letter.
        let workers: Vec<_> = (0..8u64)
            .map(|slot| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    let fill = b'A' + slot as u8;
                    store.write_at(slot * 4, &[fill; 4]).unwrap();
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        store.sync().unwrap();

        let mut image = [0_u8; 32];
        assert_eq!(store.read_at(0, &mut image).unwrap(), 32);
        for (slot, chunk) in image.chunks_exact(4).enumerate() {
            let expected = b'A' + slot as u8;
            assert_eq!(chunk, &[expected; 4]);
        }
    }

    #[test]
    fn test_filestore_sync_durable_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durable.bin");

        let writer = FileStore::open(&path).unwrap();
        writer.write_at(0, b"persisted").unwrap();
        writer.sync().unwrap();
        drop(writer);

        let reader = FileStore::open(&path).unwrap();
        let mut contents = [0_u8; 9];
        assert_eq!(reader.read_at(0, &mut contents).unwrap(), 9);
        assert_eq!(&contents, b"persisted");
    }
}