wal_db/store.rs
1//! Storage backends.
2//!
3//! A [`Wal`](crate::Wal) frames records, hands out sequence numbers, and
4//! coordinates durability; the bytes themselves live behind the [`WalStore`]
//! trait. Every method takes `&self`, because the multi-writer append path
//! writes from several threads at once: a store has to accept concurrent,
//! positioned writes by itself, without the log serialising them behind a lock.
//! A file does so natively; the in-memory [`MemStore`] guards its buffer with a
//! short internal lock. The default [`FileStore`] writes to a file; a custom
//! implementation could put the log on any byte-addressable, appendable medium.
11
12use std::{
13 fs::{File, OpenOptions},
14 io,
15 path::{Path, PathBuf},
16 sync::Mutex,
17};
18
19use crate::error::{Result, WalError};
20
21/// A byte-addressable, append-only store with an explicit durability barrier.
22///
23/// The log treats a store as a growing array of bytes. It writes framed records
24/// at reserved offsets — possibly from several threads concurrently and out of
25/// order — reads from arbitrary offsets during recovery, and occasionally
26/// truncates a torn tail. The one guarantee the log cannot provide itself — that
27/// written bytes have reached stable storage — is delegated to
28/// [`sync`](WalStore::sync).
29///
30/// # Implementing a backend
31///
32/// The contract an implementation must honour:
33///
34/// - [`write_at`](WalStore::write_at) writes `bytes` at `offset`, growing the
35/// store if `offset` is past the current end and zero-filling any gap (so a
36/// later offset written before an earlier one leaves detectable zero bytes in
37/// between, exactly as a sparse file does). Concurrent calls to disjoint
38/// ranges must not corrupt each other.
39/// - [`read_at`](WalStore::read_at) fills `buf` starting at `offset`, returning
40/// the number of bytes read. It returns fewer than `buf.len()` only when the
41/// store ends first — that short read is how recovery detects a torn tail.
42/// - [`sync`](WalStore::sync) returns only once every prior write is durable.
43/// - [`truncate`](WalStore::truncate) discards everything at or after `len`.
44///
45/// `Send + Sync` is required so the log can be shared across threads.
46///
47/// # Examples
48///
49/// ```
50/// use wal_db::{MemStore, Wal};
51///
52/// # fn main() -> Result<(), wal_db::WalError> {
53/// let wal = Wal::with_store(MemStore::new())?;
54/// wal.append(b"record")?;
55/// wal.sync()?;
56/// # Ok(())
57/// # }
58/// ```
59pub trait WalStore: Send + Sync {
60 /// Write `bytes` at byte `offset`, growing the store and zero-filling any
61 /// gap if `offset` is beyond the current end.
62 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()>;
63
64 /// Read into `buf` starting at byte `offset`, returning the number of bytes
65 /// read.
66 ///
67 /// A return value smaller than `buf.len()` means the store ended before
68 /// `buf` could be filled.
69 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize>;
70
71 /// Discard every byte at or after `len`, shrinking the store to exactly
72 /// `len` bytes.
73 fn truncate(&self, len: u64) -> Result<()>;
74
75 /// Flush every preceding [`write_at`](WalStore::write_at) to stable storage.
76 ///
77 /// Returns only once the data will survive a power loss. This is the
78 /// durability barrier the whole log rests on.
79 fn sync(&self) -> Result<()>;
80
81 /// The current size of the store in bytes.
82 fn len(&self) -> Result<u64>;
83
84 /// Whether the store holds no bytes.
85 ///
86 /// The default defers to [`len`](WalStore::len); override it only if a
87 /// backend can answer more cheaply.
88 fn is_empty(&self) -> Result<bool> {
89 Ok(self.len()? == 0)
90 }
91
92 /// The lowest offset that still holds data.
93 ///
94 /// Normally `0`. A backend that can drop a prefix (see
95 /// [`truncate_before`](WalStore::truncate_before)) reports the offset of its
96 /// first surviving byte here, so recovery knows where to begin scanning. The
97 /// default backends that cannot drop a prefix leave this at `0`.
98 fn head(&self) -> Result<u64> {
99 Ok(0)
100 }
101
102 /// Discard storage entirely below `offset`, if the backend can, returning the
103 /// new [`head`](WalStore::head).
104 ///
105 /// Offsets are preserved: dropping a prefix never renumbers what remains, so
106 /// a record keeps its byte position (its LSN) for life. A backend that cannot
107 /// remove a prefix — a single file, where the surviving bytes would have to
108 /// move — leaves the store unchanged and returns its current head. One that
109 /// can (a segmented store, by deleting whole leading segment files) removes
110 /// what it can at its own granularity and returns the resulting head, which
111 /// may be below `offset`.
112 fn truncate_before(&self, _offset: u64) -> Result<u64> {
113 self.head()
114 }
115}
116
117/// A file-backed [`WalStore`]: the default storage for [`Wal::open`](crate::Wal::open).
118///
119/// All reads and writes are positioned (`pread`/`pwrite` on Unix, `seek_read`/
120/// `seek_write` on Windows), so concurrent appenders writing to disjoint offsets
121/// never contend on a shared file cursor, and a recovery read never disturbs an
122/// append. [`sync`](WalStore::sync) issues the platform's true durability
123/// barrier: `fdatasync` on Linux, `FlushFileBuffers` on Windows, and
124/// `fcntl(F_FULLFSYNC)` on macOS — the last because macOS's `fsync` does not
125/// flush the drive's write cache.
126#[derive(Debug)]
127pub struct FileStore {
128 file: File,
129 path: PathBuf,
130}
131
132impl FileStore {
133 /// Open the file at `path`, creating it if it does not exist.
134 ///
135 /// The store does not interpret the file's contents — it does not look for a
136 /// torn tail or validate records. That is [`Wal::open`](crate::Wal::open)'s
137 /// job, which scans on open and truncates any incomplete trailing record.
138 ///
139 /// # Errors
140 ///
141 /// Returns [`WalError::Io`] if the file cannot be opened (for example a
142 /// missing parent directory or insufficient permissions).
143 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
144 let path = path.as_ref().to_path_buf();
145 let file = OpenOptions::new()
146 .read(true)
147 .write(true)
148 .create(true)
149 .truncate(false)
150 .open(&path)
151 .map_err(|e| WalError::io("opening the log file", e))?;
152 Ok(FileStore { file, path })
153 }
154
155 /// The path this store was opened from.
156 #[must_use]
157 pub fn path(&self) -> &Path {
158 &self.path
159 }
160}
161
162impl WalStore for FileStore {
163 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
164 pwrite_all(&self.file, offset, bytes).map_err(|e| WalError::io("writing a record", e))
165 }
166
167 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
168 pread_fill(&self.file, offset, buf).map_err(|e| WalError::io("reading from the log", e))
169 }
170
171 fn truncate(&self, len: u64) -> Result<()> {
172 self.file
173 .set_len(len)
174 .map_err(|e| WalError::io("truncating the log", e))
175 }
176
177 fn sync(&self) -> Result<()> {
178 durable_sync(&self.file).map_err(|e| WalError::io("flushing to stable storage", e))
179 }
180
181 fn len(&self) -> Result<u64> {
182 Ok(self
183 .file
184 .metadata()
185 .map_err(|e| WalError::io("reading log file metadata", e))?
186 .len())
187 }
188}
189
/// A [`WalStore`] that keeps the log in a `Vec<u8>` guarded by a short lock.
///
/// It mirrors what a [`FileStore`] does without touching the filesystem,
/// including the sparse-file behaviour of zero-filling a gap when a higher
/// offset is written first. [`sync`](WalStore::sync) is a no-op — memory has no
/// separate durable tier — so a `MemStore` is meant for tests, examples, and
/// benchmarking the framing path in isolation, not for durability.
///
/// # Examples
///
/// ```
/// use wal_db::{MemStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// let wal = Wal::with_store(MemStore::new())?;
/// let lsn = wal.append(b"in memory")?;
/// assert_eq!(lsn.get(), 0);
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default)]
pub struct MemStore {
    /// The whole log image; every access goes through [`MemStore::lock`].
    data: Mutex<Vec<u8>>,
}

impl MemStore {
    /// Build a store that holds no bytes.
    #[must_use]
    pub fn new() -> Self {
        Self::from_bytes(Vec::new())
    }

    /// Build an empty store whose buffer already has room for `capacity`
    /// bytes, so a workload of known size does not trigger reallocations.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_bytes(Vec::with_capacity(capacity))
    }

    /// Build a store that starts out containing `bytes` — for example a log
    /// image captured elsewhere, so that
    /// [`Wal::with_store`](crate::Wal::with_store) can recover it.
    #[must_use]
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        Self {
            data: Mutex::new(bytes),
        }
    }

    /// Acquire the buffer lock.
    ///
    /// A poisoned mutex (another thread panicked while holding the guard) is
    /// not treated as an error: the guard is taken out of the poison error and
    /// used as-is.
    fn lock(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
        match self.data.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Copy out the current bytes. Crate-internal and test-only, for tests
    /// that inspect or snapshot the stored image.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> Vec<u8> {
        self.lock().to_vec()
    }
}

/// Cloning copies the bytes into a brand-new, independent store; the clone
/// does not share the original's buffer or lock.
impl Clone for MemStore {
    fn clone(&self) -> Self {
        // The guard is a temporary of this statement, so the lock is released
        // before the new store is built.
        let image = self.lock().clone();
        Self::from_bytes(image)
    }
}
263
264impl WalStore for MemStore {
265 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
266 let start = usize::try_from(offset).map_err(|_| {
267 WalError::io(
268 "writing to memory",
269 io::Error::other("offset exceeds usize"),
270 )
271 })?;
272 let end = start.checked_add(bytes.len()).ok_or_else(|| {
273 WalError::io(
274 "writing to memory",
275 io::Error::other("write overflows usize"),
276 )
277 })?;
278
279 let mut data = self.lock();
280 if data.len() < end {
281 data.resize(end, 0); // zero-fill any gap, like a sparse file
282 }
283 data[start..end].copy_from_slice(bytes);
284 Ok(())
285 }
286
287 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
288 let data = self.lock();
289 let start = match usize::try_from(offset) {
290 Ok(start) if start < data.len() => start,
291 _ => return Ok(0),
292 };
293 let available = &data[start..];
294 let n = available.len().min(buf.len());
295 buf[..n].copy_from_slice(&available[..n]);
296 Ok(n)
297 }
298
299 fn truncate(&self, len: u64) -> Result<()> {
300 let len = usize::try_from(len).unwrap_or(usize::MAX);
301 self.lock().truncate(len);
302 Ok(())
303 }
304
305 fn sync(&self) -> Result<()> {
306 Ok(())
307 }
308
309 fn len(&self) -> Result<u64> {
310 Ok(self.lock().len() as u64)
311 }
312}
313
314// ---------------------------------------------------------------------------
315// Platform-correct durability.
316// ---------------------------------------------------------------------------
317
/// Force every buffered write for `file` onto stable storage (macOS).
///
/// Issues `fcntl(F_FULLFSYNC)`. Plain `fsync(2)` on macOS flushes the page
/// cache to the device but can leave the data sitting in the device's own
/// write cache, where a power loss can still take it; `F_FULLFSYNC` is the
/// documented way to force a full flush.
///
/// NOTE(review): recent standard-library versions appear to issue
/// `F_FULLFSYNC` themselves on Apple targets from `sync_all`/`sync_data` —
/// confirm against the toolchain in use; if so this could become a plain
/// `file.sync_all()` and drop the `unsafe` block.
///
/// # Errors
///
/// Returns the OS error (`errno`) when `fcntl` reports failure.
#[cfg(target_os = "macos")]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    // SAFETY: the descriptor comes from `file`, which stays borrowed — and
    // therefore open — for the whole call, so it cannot be closed from under
    // us. `F_FULLFSYNC` takes no argument pointer and neither reads nor writes
    // any user buffer. `fcntl` signals failure by returning -1 with `errno`
    // set, which is inspected immediately below.
    let status = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
    match status {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
340
/// Force every buffered write for `file` onto stable storage (every target
/// other than macOS).
///
/// Delegates to `File::sync_data`, which is `fdatasync` on Linux and
/// `FlushFileBuffers` on Windows — both true durability barriers, so no
/// platform-specific call is needed here the way it is on macOS.
///
/// # Errors
///
/// Returns whatever I/O error `File::sync_data` reports.
#[cfg(not(target_os = "macos"))]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}
348
349// ---------------------------------------------------------------------------
350// Positioned I/O. Reads and writes carry their own offset so they never move a
351// shared file cursor, which is what lets disjoint concurrent writes proceed
352// without a lock.
353// ---------------------------------------------------------------------------
354
/// Write the whole of `buf` to `file` starting at byte `offset` (Unix).
///
/// Uses positioned writes, so the file's cursor plays no part. A short write
/// is continued from where it stopped, and a write interrupted by a signal
/// (`ErrorKind::Interrupted`) is retried. An empty `buf` performs no write at
/// all.
///
/// # Errors
///
/// - `ErrorKind::WriteZero` if the file accepts zero bytes while data is still
///   pending.
/// - Any other I/O error reported by the underlying write, unchanged.
#[cfg(unix)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }

        let written = match file.write_at(buf, offset) {
            Ok(count) => count,
            // Interrupted before anything was written: try the same range again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };

        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }

        // Advance past what was accepted and carry on with the remainder.
        buf = &buf[written..];
        offset += written as u64;
    }
}
377
/// Write the whole of `buf` to `file` starting at byte `offset` (Windows).
///
/// Uses `seek_write`, which takes its own offset. A short write is continued
/// from where it stopped, and an `ErrorKind::Interrupted` failure is retried.
/// An empty `buf` performs no write at all.
///
/// NOTE(review): the standard library documents that `seek_write` does update
/// the file cursor on Windows. That should be harmless while every access to
/// this file is positioned — confirm nothing relies on the cursor.
///
/// NOTE(review): the same documentation describes bytes between the old end of
/// file and a write past it as "left uninitialized", whereas the store
/// contract expects a zero-filled gap — confirm the behaviour on the target
/// filesystems.
///
/// # Errors
///
/// - `ErrorKind::WriteZero` if the file accepts zero bytes while data is still
///   pending.
/// - Any other I/O error reported by the underlying write, unchanged.
#[cfg(windows)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::windows::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }

        let written = match file.seek_write(buf, offset) {
            Ok(count) => count,
            // Interrupted before anything was written: try the same range again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };

        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }

        // Advance past what was accepted and carry on with the remainder.
        buf = &buf[written..];
        offset += written as u64;
    }
}
400
/// Read from `file` at byte `offset` until `buf` is full or the file ends
/// (Unix), returning the number of bytes placed in `buf`.
///
/// Uses positioned reads, so the file's cursor plays no part. Short reads are
/// continued, and a read interrupted by a signal (`ErrorKind::Interrupted`) is
/// retried. A result smaller than `buf.len()` means end of file was reached
/// first.
///
/// # Errors
///
/// Any I/O error other than `Interrupted` is returned unchanged; bytes already
/// copied into `buf` before the failure are not reported.
#[cfg(unix)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::unix::fs::FileExt;

    let mut filled = 0;
    loop {
        let rest = &mut buf[filled..];
        if rest.is_empty() {
            return Ok(filled);
        }

        match file.read_at(rest, offset) {
            // End of file: report however much was gathered so far.
            Ok(0) => return Ok(filled),
            Ok(count) => {
                filled += count;
                offset += count as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
}
419
/// Read from `file` at byte `offset` until `buf` is full or the file ends
/// (Windows), returning the number of bytes placed in `buf`.
///
/// Uses `seek_read`, which takes its own offset. Short reads are continued,
/// and an `ErrorKind::Interrupted` failure is retried. A result smaller than
/// `buf.len()` means end of file was reached first.
///
/// NOTE(review): the standard library documents that `seek_read` does update
/// the file cursor on Windows. That should be harmless while every access to
/// this file is positioned — confirm nothing relies on the cursor.
///
/// # Errors
///
/// Any I/O error other than `Interrupted` is returned unchanged; bytes already
/// copied into `buf` before the failure are not reported.
#[cfg(windows)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::windows::fs::FileExt;

    let mut filled = 0;
    loop {
        let rest = &mut buf[filled..];
        if rest.is_empty() {
            return Ok(filled);
        }

        match file.seek_read(rest, offset) {
            // End of file: report however much was gathered so far.
            Ok(0) => return Ok(filled),
            Ok(count) => {
                filled += count;
                offset += count as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
}
438
// Unit tests for both backends. `unwrap`/`expect` are allowed here because a
// failed setup step should fail the test immediately.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Appending back-to-back writes grows the reported length by exactly the
    /// number of bytes written.
    #[test]
    fn test_memstore_write_at_advances_len() {
        let mem = MemStore::new();
        assert_eq!(mem.len().unwrap(), 0);

        mem.write_at(0, b"abc").unwrap();
        assert_eq!(mem.len().unwrap(), 3);

        mem.write_at(3, b"de").unwrap();
        assert_eq!(mem.len().unwrap(), 5);
    }

    /// Writing beyond the end leaves zeros in the skipped range.
    #[test]
    fn test_memstore_write_past_end_zero_fills_gap() {
        let mem = MemStore::new();
        // The store is empty, so writing at offset 4 makes [0, 4) a gap.
        mem.write_at(4, b"XY").unwrap();
        assert_eq!(mem.len().unwrap(), 6);

        // Pre-fill with 0xFF so zeros can only come from the store.
        let mut image = [0xFFu8; 6];
        let read = mem.read_at(0, &mut image).unwrap();
        assert_eq!(read, 6);
        assert_eq!(&image, &[0, 0, 0, 0, b'X', b'Y']);
    }

    /// A read that runs off the end returns only the bytes that exist.
    #[test]
    fn test_memstore_read_past_end_is_short() {
        let mem = MemStore::new();
        mem.write_at(0, b"abc").unwrap();

        let mut out = [0u8; 8];
        let read = mem.read_at(1, &mut out).unwrap();
        assert_eq!(read, 2);
        assert_eq!(&out[..2], b"bc");
    }

    /// Truncating to a shorter length shrinks the store to that length.
    #[test]
    fn test_memstore_truncate_shrinks() {
        let mem = MemStore::new();
        mem.write_at(0, b"0123456789").unwrap();

        mem.truncate(4).unwrap();
        assert_eq!(mem.len().unwrap(), 4);
    }

    /// Bytes written and synced through one handle are visible through a
    /// freshly opened one.
    #[test]
    fn test_filestore_roundtrip_through_disk() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("store.bin");

        {
            let writer = FileStore::open(&log_path).unwrap();
            writer.write_at(0, b"hello world").unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len().unwrap(), 11);
        }

        let reader = FileStore::open(&log_path).unwrap();
        assert_eq!(reader.len().unwrap(), 11);

        let mut word = [0u8; 5];
        let read = reader.read_at(6, &mut word).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&word, b"world");
    }

    /// Eight threads each write their own 4-byte range; every range must come
    /// back intact.
    #[test]
    fn test_filestore_concurrent_disjoint_writes() {
        use std::sync::Arc;
        use std::thread;

        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("concurrent.bin");
        let shared = Arc::new(FileStore::open(&log_path).unwrap());

        // Thread `slot` writes four copies of 'A' + slot at offset slot * 4.
        let writers: Vec<_> = (0..8u64)
            .map(|slot| {
                let store = Arc::clone(&shared);
                thread::spawn(move || {
                    let fill = b'A' + slot as u8;
                    store.write_at(slot * 4, &[fill; 4]).unwrap();
                })
            })
            .collect();
        for writer in writers {
            writer.join().unwrap();
        }
        shared.sync().unwrap();

        let mut image = [0u8; 32];
        let read = shared.read_at(0, &mut image).unwrap();
        assert_eq!(read, 32);
        for (expected, chunk) in (b'A'..).zip(image.chunks_exact(4)) {
            assert_eq!(chunk, &[expected; 4]);
        }
    }

    /// Synced data can be read back after the original handle is dropped and
    /// the file is opened again.
    #[test]
    fn test_filestore_sync_durable_across_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("durable.bin");

        {
            let writer = FileStore::open(&log_path).unwrap();
            writer.write_at(0, b"persisted").unwrap();
            writer.sync().unwrap();
        }

        let reader = FileStore::open(&log_path).unwrap();
        let mut out = [0u8; 9];
        let read = reader.read_at(0, &mut out).unwrap();
        assert_eq!(read, 9);
        assert_eq!(&out, b"persisted");
    }
}
545}