Skip to main content

wal_db/
store.rs

//! Storage backends.
//!
//! A [`Wal`](crate::Wal) frames records, hands out sequence numbers, and
//! coordinates durability; the bytes themselves live behind the [`WalStore`]
//! trait. Every method takes `&self`, because the multi-writer append path
//! writes from several threads at once — the store must accept concurrent,
//! positioned writes without the caller serialising them (a file does so
//! natively; an in-memory [`MemStore`] takes a short internal lock). The
//! default [`FileStore`] writes to a file; a custom implementation could put
//! the log on any byte-addressable, appendable medium.
11
12use std::{
13    fs::{File, OpenOptions},
14    io,
15    path::{Path, PathBuf},
16    sync::Mutex,
17};
18
19use crate::error::{Result, WalError};
20
21/// A byte-addressable, append-only store with an explicit durability barrier.
22///
23/// The log treats a store as a growing array of bytes. It writes framed records
24/// at reserved offsets — possibly from several threads concurrently and out of
25/// order — reads from arbitrary offsets during recovery, and occasionally
26/// truncates a torn tail. The one guarantee the log cannot provide itself — that
27/// written bytes have reached stable storage — is delegated to
28/// [`sync`](WalStore::sync).
29///
30/// # Implementing a backend
31///
32/// The contract an implementation must honour:
33///
34/// - [`write_at`](WalStore::write_at) writes `bytes` at `offset`, growing the
35///   store if `offset` is past the current end and zero-filling any gap (so a
36///   later offset written before an earlier one leaves detectable zero bytes in
37///   between, exactly as a sparse file does). Concurrent calls to disjoint
38///   ranges must not corrupt each other.
39/// - [`read_at`](WalStore::read_at) fills `buf` starting at `offset`, returning
40///   the number of bytes read. It returns fewer than `buf.len()` only when the
41///   store ends first — that short read is how recovery detects a torn tail.
42/// - [`sync`](WalStore::sync) returns only once every prior write is durable.
43/// - [`truncate`](WalStore::truncate) discards everything at or after `len`.
44///
45/// `Send + Sync` is required so the log can be shared across threads.
46///
47/// # Examples
48///
49/// ```
50/// use wal_db::{MemStore, Wal};
51///
52/// # fn main() -> Result<(), wal_db::WalError> {
53/// let wal = Wal::with_store(MemStore::new())?;
54/// wal.append(b"record")?;
55/// wal.sync()?;
56/// # Ok(())
57/// # }
58/// ```
59pub trait WalStore: Send + Sync {
60    /// Write `bytes` at byte `offset`, growing the store and zero-filling any
61    /// gap if `offset` is beyond the current end.
62    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()>;
63
64    /// Read into `buf` starting at byte `offset`, returning the number of bytes
65    /// read.
66    ///
67    /// A return value smaller than `buf.len()` means the store ended before
68    /// `buf` could be filled.
69    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize>;
70
71    /// Discard every byte at or after `len`, shrinking the store to exactly
72    /// `len` bytes.
73    fn truncate(&self, len: u64) -> Result<()>;
74
75    /// Flush every preceding [`write_at`](WalStore::write_at) to stable storage.
76    ///
77    /// Returns only once the data will survive a power loss. This is the
78    /// durability barrier the whole log rests on.
79    fn sync(&self) -> Result<()>;
80
81    /// The current size of the store in bytes.
82    fn len(&self) -> Result<u64>;
83
84    /// Whether the store holds no bytes.
85    ///
86    /// The default defers to [`len`](WalStore::len); override it only if a
87    /// backend can answer more cheaply.
88    fn is_empty(&self) -> Result<bool> {
89        Ok(self.len()? == 0)
90    }
91
92    /// The lowest offset that still holds data.
93    ///
94    /// Normally `0`. A backend that can drop a prefix (see
95    /// [`truncate_before`](WalStore::truncate_before)) reports the offset of its
96    /// first surviving byte here, so recovery knows where to begin scanning. The
97    /// default backends that cannot drop a prefix leave this at `0`.
98    fn head(&self) -> Result<u64> {
99        Ok(0)
100    }
101
102    /// Discard storage entirely below `offset`, if the backend can, returning the
103    /// new [`head`](WalStore::head).
104    ///
105    /// Offsets are preserved: dropping a prefix never renumbers what remains, so
106    /// a record keeps its byte position (its LSN) for life. A backend that cannot
107    /// remove a prefix — a single file, where the surviving bytes would have to
108    /// move — leaves the store unchanged and returns its current head. One that
109    /// can (a segmented store, by deleting whole leading segment files) removes
110    /// what it can at its own granularity and returns the resulting head, which
111    /// may be below `offset`.
112    fn truncate_before(&self, _offset: u64) -> Result<u64> {
113        self.head()
114    }
115}
116
/// The default storage used by [`Wal::open`](crate::Wal::open): a
/// [`WalStore`] that keeps the log in a single file.
///
/// Every read and write is positioned (`pread`/`pwrite` on Unix,
/// `seek_read`/`seek_write` on Windows). Appenders writing concurrently to
/// disjoint offsets therefore never fight over a shared file cursor, and a
/// recovery read cannot disturb an append. [`sync`](WalStore::sync) issues
/// the platform's real durability barrier — `fdatasync` on Linux,
/// `FlushFileBuffers` on Windows, and `fcntl(F_FULLFSYNC)` on macOS, the
/// latter because `fsync` on macOS does not flush the drive's write cache.
#[derive(Debug)]
pub struct FileStore {
    // Open handle through which all positioned reads and writes go.
    file: File,
    // The location the handle was opened from, kept for `FileStore::path`.
    path: PathBuf,
}
131
132impl FileStore {
133    /// Open the file at `path`, creating it if it does not exist.
134    ///
135    /// The store does not interpret the file's contents — it does not look for a
136    /// torn tail or validate records. That is [`Wal::open`](crate::Wal::open)'s
137    /// job, which scans on open and truncates any incomplete trailing record.
138    ///
139    /// # Errors
140    ///
141    /// Returns [`WalError::Io`] if the file cannot be opened (for example a
142    /// missing parent directory or insufficient permissions).
143    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
144        let path = path.as_ref().to_path_buf();
145        let file = OpenOptions::new()
146            .read(true)
147            .write(true)
148            .create(true)
149            .truncate(false)
150            .open(&path)
151            .map_err(|e| WalError::io("opening the log file", e))?;
152        Ok(FileStore { file, path })
153    }
154
155    /// The path this store was opened from.
156    #[must_use]
157    pub fn path(&self) -> &Path {
158        &self.path
159    }
160}
161
162impl WalStore for FileStore {
163    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
164        pwrite_all(&self.file, offset, bytes).map_err(|e| WalError::io("writing a record", e))
165    }
166
167    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
168        pread_fill(&self.file, offset, buf).map_err(|e| WalError::io("reading from the log", e))
169    }
170
171    fn truncate(&self, len: u64) -> Result<()> {
172        self.file
173            .set_len(len)
174            .map_err(|e| WalError::io("truncating the log", e))
175    }
176
177    fn sync(&self) -> Result<()> {
178        durable_sync(&self.file).map_err(|e| WalError::io("flushing to stable storage", e))
179    }
180
181    fn len(&self) -> Result<u64> {
182        Ok(self
183            .file
184            .metadata()
185            .map_err(|e| WalError::io("reading log file metadata", e))?
186            .len())
187    }
188}
189
/// A [`WalStore`] that lives entirely in memory: a `Vec<u8>` guarded by a
/// briefly held lock.
///
/// It mirrors a [`FileStore`] without ever touching the filesystem, right
/// down to the sparse-file behaviour of zero-filling the gap when a higher
/// offset gets written first. Memory has no separate durable tier, so
/// [`sync`](WalStore::sync) does nothing; use a `MemStore` for tests,
/// examples, and benchmarking the framing path in isolation — never for
/// durability.
///
/// # Examples
///
/// ```
/// use wal_db::{MemStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// let wal = Wal::with_store(MemStore::new())?;
/// let lsn = wal.append(b"in memory")?;
/// assert_eq!(lsn.get(), 0);
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default)]
pub struct MemStore {
    // The whole log image; the mutex is what lets `&self` methods mutate it.
    data: Mutex<Vec<u8>>,
}
214
215impl MemStore {
216    /// Create an empty in-memory store.
217    #[must_use]
218    pub fn new() -> Self {
219        MemStore {
220            data: Mutex::new(Vec::new()),
221        }
222    }
223
224    /// Create an empty store that has pre-allocated room for `capacity` bytes,
225    /// to avoid reallocations during a known-size workload.
226    #[must_use]
227    pub fn with_capacity(capacity: usize) -> Self {
228        MemStore {
229            data: Mutex::new(Vec::with_capacity(capacity)),
230        }
231    }
232
233    /// Create a store preloaded with `bytes` — for example a log image captured
234    /// elsewhere, so [`Wal::with_store`](crate::Wal::with_store) can recover it.
235    #[must_use]
236    pub fn from_bytes(bytes: Vec<u8>) -> Self {
237        MemStore {
238            data: Mutex::new(bytes),
239        }
240    }
241
242    fn lock(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
243        self.data
244            .lock()
245            .unwrap_or_else(std::sync::PoisonError::into_inner)
246    }
247
248    /// A copy of the current bytes. Crate-internal, for tests that inspect or
249    /// snapshot the on-disk image.
250    #[cfg(test)]
251    pub(crate) fn snapshot(&self) -> Vec<u8> {
252        self.lock().clone()
253    }
254}
255
256impl Clone for MemStore {
257    fn clone(&self) -> Self {
258        MemStore {
259            data: Mutex::new(self.lock().clone()),
260        }
261    }
262}
263
264impl WalStore for MemStore {
265    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
266        let start = usize::try_from(offset).map_err(|_| {
267            WalError::io(
268                "writing to memory",
269                io::Error::other("offset exceeds usize"),
270            )
271        })?;
272        let end = start.checked_add(bytes.len()).ok_or_else(|| {
273            WalError::io(
274                "writing to memory",
275                io::Error::other("write overflows usize"),
276            )
277        })?;
278
279        let mut data = self.lock();
280        if data.len() < end {
281            data.resize(end, 0); // zero-fill any gap, like a sparse file
282        }
283        data[start..end].copy_from_slice(bytes);
284        Ok(())
285    }
286
287    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
288        let data = self.lock();
289        let start = match usize::try_from(offset) {
290            Ok(start) if start < data.len() => start,
291            _ => return Ok(0),
292        };
293        let available = &data[start..];
294        let n = available.len().min(buf.len());
295        buf[..n].copy_from_slice(&available[..n]);
296        Ok(n)
297    }
298
299    fn truncate(&self, len: u64) -> Result<()> {
300        let len = usize::try_from(len).unwrap_or(usize::MAX);
301        self.lock().truncate(len);
302        Ok(())
303    }
304
305    fn sync(&self) -> Result<()> {
306        Ok(())
307    }
308
309    fn len(&self) -> Result<u64> {
310        Ok(self.lock().len() as u64)
311    }
312}
313
314// ---------------------------------------------------------------------------
315// Platform-correct durability.
316// ---------------------------------------------------------------------------
317
/// Flush every buffered write for `file` to stable storage.
///
/// On macOS this issues `fcntl(F_FULLFSYNC)`. Plain `fsync(2)` there flushes
/// the page cache to the device but leaves the data in the device's own write
/// cache, where a power loss can still take it; `F_FULLFSYNC` is the
/// documented way to force a full flush.
///
/// NOTE(review): whether the standard library's `sync_all`/`sync_data`
/// already issue `F_FULLFSYNC` on Apple targets depends on the toolchain —
/// confirm before replacing this with a std call. Issuing it here keeps the
/// barrier independent of that detail.
///
/// # Errors
///
/// Returns the OS error reported by `fcntl`. A call interrupted by a signal
/// (`EINTR`) is retried rather than reported, matching the positioned
/// read/write helpers in this file, so signal delivery alone never makes the
/// durability barrier fail.
#[cfg(target_os = "macos")]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    let fd = file.as_raw_fd();
    loop {
        // SAFETY: `fd` is a valid, open file descriptor for as long as `file`
        // is borrowed, so it cannot be closed from under us. `F_FULLFSYNC`
        // takes no argument pointer and neither reads nor writes any user
        // buffer. `fcntl` reports failure by returning -1 and setting
        // `errno`, which is read immediately below before any other call can
        // overwrite it.
        let ret = unsafe { libc::fcntl(fd, libc::F_FULLFSYNC) };
        if ret != -1 {
            return Ok(());
        }

        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
        // Interrupted by a signal before completing: issue the barrier again.
    }
}
340
/// Flush every buffered write for `file` to stable storage.
///
/// `sync_data` maps to `fdatasync` on Linux and to `FlushFileBuffers` on
/// Windows. Each is a genuine durability barrier, which makes the standard
/// library call the right one everywhere other than macOS.
#[cfg(not(target_os = "macos"))]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}
348
349// ---------------------------------------------------------------------------
350// Positioned I/O. Reads and writes carry their own offset so they never move a
351// shared file cursor, which is what lets disjoint concurrent writes proceed
352// without a lock.
353// ---------------------------------------------------------------------------
354
/// Write the whole of `buf` to `file` starting at `offset`, without touching
/// the file cursor.
///
/// Partial writes are continued from where they stopped and an interrupted
/// call is retried. A write that accepts zero bytes while data remains is
/// reported as [`io::ErrorKind::WriteZero`]; any other error is returned
/// as-is.
#[cfg(unix)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }

        let written = match file.write_at(buf, offset) {
            Ok(count) => count,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }

        // Step past what was accepted and go round again for the rest.
        buf = &buf[written..];
        offset += written as u64;
    }
}
377
/// Write the whole of `buf` to `file` starting at `offset`, using
/// `seek_write` so each call carries its own position.
///
/// Partial writes are continued from where they stopped and an interrupted
/// call is retried. A write that accepts zero bytes while data remains is
/// reported as [`io::ErrorKind::WriteZero`]; any other error is returned
/// as-is.
#[cfg(windows)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::windows::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }

        let written = match file.seek_write(buf, offset) {
            Ok(count) => count,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }

        // Step past what was accepted and go round again for the rest.
        buf = &buf[written..];
        offset += written as u64;
    }
}
400
/// Read from `file` at `offset` until `buf` is full or the file ends,
/// returning how many bytes were placed in `buf`.
///
/// Short reads are continued and an interrupted call is retried, so a result
/// below `buf.len()` means end of file was reached. Any other error is
/// returned as-is.
#[cfg(unix)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::unix::fs::FileExt;

    let mut filled = 0;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }

        match file.read_at(remaining, offset) {
            // Zero bytes with room left in `buf`: the file has ended.
            Ok(0) => break,
            Ok(got) => {
                filled += got;
                offset += got as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
419
/// Read from `file` at `offset` until `buf` is full or the file ends,
/// returning how many bytes were placed in `buf`. Uses `seek_read` so each
/// call carries its own position.
///
/// Short reads are continued and an interrupted call is retried, so a result
/// below `buf.len()` means end of file was reached. Any other error is
/// returned as-is.
#[cfg(windows)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::windows::fs::FileExt;

    let mut filled = 0;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }

        match file.seek_read(remaining, offset) {
            // Zero bytes with room left in `buf`: the file has ended.
            Ok(0) => break,
            Ok(got) => {
                filled += got;
                offset += got as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
438
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Back-to-back writes extend the store by exactly what was written.
    #[test]
    fn test_memstore_write_at_advances_len() {
        let store = MemStore::new();
        assert_eq!(store.len().unwrap(), 0);

        let steps: [(u64, &[u8], u64); 2] = [(0, b"abc", 3), (3, b"de", 5)];
        for (offset, chunk, expected_len) in steps {
            store.write_at(offset, chunk).unwrap();
            assert_eq!(store.len().unwrap(), expected_len);
        }
    }

    /// Writing beyond the end leaves zeros in the skipped range.
    #[test]
    fn test_memstore_write_past_end_zero_fills_gap() {
        let store = MemStore::new();
        // The store is empty, so a write at offset 4 makes [0,4) a zero gap.
        store.write_at(4, b"XY").unwrap();
        assert_eq!(store.len().unwrap(), 6);

        let mut image = [0xFFu8; 6];
        let read = store.read_at(0, &mut image).unwrap();
        assert_eq!(read, 6);
        assert_eq!(image, [0, 0, 0, 0, b'X', b'Y']);
    }

    /// A read that runs off the end reports only the bytes that exist.
    #[test]
    fn test_memstore_read_past_end_is_short() {
        let store = MemStore::new();
        store.write_at(0, b"abc").unwrap();

        let mut buf = [0u8; 8];
        let read = store.read_at(1, &mut buf).unwrap();
        assert_eq!(read, 2);
        assert_eq!(&buf[..2], b"bc");
    }

    /// Truncating drops the tail and shortens the reported length.
    #[test]
    fn test_memstore_truncate_shrinks() {
        let store = MemStore::new();
        store.write_at(0, b"0123456789").unwrap();
        store.truncate(4).unwrap();
        assert_eq!(store.len().unwrap(), 4);
    }

    /// Bytes written and synced through one handle are visible to a second
    /// handle opened on the same path.
    #[test]
    fn test_filestore_roundtrip_through_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.bin");

        {
            let writer = FileStore::open(&path).unwrap();
            writer.write_at(0, b"hello world").unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len().unwrap(), 11);
        }

        let reader = FileStore::open(&path).unwrap();
        assert_eq!(reader.len().unwrap(), 11);

        let mut buf = [0u8; 5];
        let read = reader.read_at(6, &mut buf).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf, b"world");
    }

    /// Eight threads each write their own 4-byte range; none clobbers another.
    #[test]
    fn test_filestore_concurrent_disjoint_writes() {
        use std::sync::Arc;
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("concurrent.bin");
        let store = Arc::new(FileStore::open(&path).unwrap());

        let handles: Vec<_> = (0..8u8)
            .map(|i| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    let byte = b'A' + i;
                    store.write_at(u64::from(i) * 4, &[byte; 4]).unwrap();
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        store.sync().unwrap();

        let mut buf = [0u8; 32];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 32);
        for (i, chunk) in buf.chunks_exact(4).enumerate() {
            let expected = b'A' + i as u8;
            assert_eq!(chunk, &[expected; 4]);
        }
    }

    /// Synced data is still there after the store is dropped and reopened.
    #[test]
    fn test_filestore_sync_durable_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durable.bin");

        {
            let writer = FileStore::open(&path).unwrap();
            writer.write_at(0, b"persisted").unwrap();
            writer.sync().unwrap();
        }

        let reader = FileStore::open(&path).unwrap();
        let mut buf = [0u8; 9];
        let read = reader.read_at(0, &mut buf).unwrap();
        assert_eq!(read, 9);
        assert_eq!(&buf, b"persisted");
    }
}