wal-db 0.8.0

Write-ahead log primitive for Rust storage engines. Durable, recoverable, lock-free append path. The WAL substrate under lsm-db, txn-db, raft-io, and Hive DB.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
//! Storage backends.
//!
//! A [`Wal`](crate::Wal) frames records, hands out sequence numbers, and
//! coordinates durability; the bytes themselves live behind the [`WalStore`]
//! trait. Every method takes `&self` because the multi-writer append path
//! writes from several threads at once, so a store must accept concurrent,
//! positioned writes through a shared reference. A file does this natively;
//! the in-memory [`MemStore`] serializes access with a short internal lock.
//! The default [`FileStore`] writes to a file; a custom implementation could
//! put the log on any byte-addressable, appendable medium.

use std::{
    fs::{File, OpenOptions},
    io,
    path::{Path, PathBuf},
    sync::Mutex,
};

use crate::error::{Result, WalError};

/// Byte storage for the log: append-oriented, byte-addressable, and equipped
/// with an explicit durability barrier.
///
/// From the log's point of view a store is an array of bytes that only grows.
/// Framed records are written at offsets the log has reserved — sometimes from
/// several threads at once, and not necessarily in offset order. Recovery reads
/// from arbitrary offsets, and a torn tail is occasionally cut off with a
/// truncate. The single guarantee the log cannot supply on its own — that
/// written bytes have reached stable storage — is delegated to
/// [`sync`](WalStore::sync).
///
/// # Implementing a backend
///
/// An implementation must honour this contract:
///
/// - [`write_at`](WalStore::write_at) places `bytes` at `offset`. If `offset`
///   lies past the current end, the store grows and the gap is zero-filled,
///   just as with a sparse file — so a later offset written before an earlier
///   one leaves detectable zero bytes between them. Concurrent calls that touch
///   disjoint ranges must not corrupt one another.
/// - [`read_at`](WalStore::read_at) copies bytes starting at `offset` into
///   `buf` and returns how many it copied. A count below `buf.len()` is allowed
///   only when the store ends first; recovery relies on that short read to
///   detect a torn tail.
/// - [`sync`](WalStore::sync) does not return until every earlier write is
///   durable.
/// - [`truncate`](WalStore::truncate) drops every byte at or after `len`.
///
/// The trait requires `Send + Sync` so one store can be shared across threads.
///
/// # Examples
///
/// ```
/// use wal_db::{MemStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// let wal = Wal::with_store(MemStore::new())?;
/// wal.append(b"record")?;
/// wal.sync()?;
/// # Ok(())
/// # }
/// ```
pub trait WalStore: Send + Sync {
    /// Write `bytes` at byte `offset`. An `offset` beyond the current end grows
    /// the store, zero-filling the gap.
    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()>;

    /// Fill `buf` from byte `offset` onward and return the number of bytes
    /// read.
    ///
    /// Fewer than `buf.len()` bytes means the store ran out before `buf` was
    /// full.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize>;

    /// Drop every byte at or after `len`, leaving the store exactly `len`
    /// bytes long.
    fn truncate(&self, len: u64) -> Result<()>;

    /// Make every earlier [`write_at`](WalStore::write_at) durable.
    ///
    /// Must not return until the data would survive a power loss; the whole
    /// log's durability rests on this barrier.
    fn sync(&self) -> Result<()>;

    /// Size of the store, in bytes.
    fn len(&self) -> Result<u64>;

    /// Whether the store is zero bytes long.
    ///
    /// By default this asks [`len`](WalStore::len); override it only when a
    /// backend has a cheaper answer.
    fn is_empty(&self) -> Result<bool> {
        self.len().map(|bytes| bytes == 0)
    }
}

/// A file-backed [`WalStore`]: the default storage for [`Wal::open`](crate::Wal::open).
///
/// All reads and writes are positioned (`pread`/`pwrite` on Unix, `seek_read`/
/// `seek_write` on Windows), so concurrent appenders writing to disjoint offsets
/// never contend on a shared file cursor, and a recovery read never disturbs an
/// append. [`sync`](WalStore::sync) issues the platform's true durability
/// barrier: `fdatasync` on Linux, `FlushFileBuffers` on Windows, and
/// `fcntl(F_FULLFSYNC)` on macOS — the last because macOS's `fsync` does not
/// flush the drive's write cache.
#[derive(Debug)]
pub struct FileStore {
    file: File,
    path: PathBuf,
}

impl FileStore {
    /// Open the file at `path`, creating it if it does not exist.
    ///
    /// The store does not interpret the file's contents — it does not look for a
    /// torn tail or validate records. That is [`Wal::open`](crate::Wal::open)'s
    /// job, which scans on open and truncates any incomplete trailing record.
    ///
    /// # Errors
    ///
    /// Returns [`WalError::Io`] if the file cannot be opened (for example a
    /// missing parent directory or insufficient permissions).
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|e| WalError::io("opening the log file", e))?;
        Ok(FileStore { file, path })
    }

    /// The path this store was opened from.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl WalStore for FileStore {
    /// Positioned write of the whole of `bytes` at `offset`.
    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
        pwrite_all(&self.file, offset, bytes)
            .map_err(|source| WalError::io("writing a record", source))?;
        Ok(())
    }

    /// Positioned read that stops early only at end of file.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let filled = pread_fill(&self.file, offset, buf);
        filled.map_err(|source| WalError::io("reading from the log", source))
    }

    /// Set the file length to `len` via `File::set_len`.
    fn truncate(&self, len: u64) -> Result<()> {
        let outcome = self.file.set_len(len);
        outcome.map_err(|source| WalError::io("truncating the log", source))
    }

    /// Issue the platform durability barrier for the file.
    fn sync(&self) -> Result<()> {
        let outcome = durable_sync(&self.file);
        outcome.map_err(|source| WalError::io("flushing to stable storage", source))
    }

    /// Current file size, taken from the file's metadata.
    fn len(&self) -> Result<u64> {
        let metadata = self
            .file
            .metadata()
            .map_err(|source| WalError::io("reading log file metadata", source))?;
        Ok(metadata.len())
    }
}

/// An in-memory [`WalStore`]: a `Vec<u8>` guarded by a briefly held `Mutex`.
///
/// It mirrors [`FileStore`] without touching the filesystem, including the
/// sparse-file behaviour of zero-filling the gap when a higher offset is
/// written before a lower one. Memory has no separate durable tier, so
/// [`sync`](WalStore::sync) does nothing; a `MemStore` is meant for tests,
/// examples, and benchmarking the framing path in isolation, never for
/// durability.
///
/// # Examples
///
/// ```
/// use wal_db::{MemStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// let wal = Wal::with_store(MemStore::new())?;
/// let lsn = wal.append(b"in memory")?;
/// assert_eq!(lsn.get(), 0);
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default)]
pub struct MemStore {
    /// The store's bytes; every access goes through [`MemStore::lock`].
    data: Mutex<Vec<u8>>,
}

impl MemStore {
    /// Create an empty in-memory store.
    #[must_use]
    pub fn new() -> Self {
        Self::from_bytes(Vec::new())
    }

    /// Create an empty store with room already reserved for `capacity` bytes,
    /// avoiding reallocations during a workload of known size.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_bytes(Vec::with_capacity(capacity))
    }

    /// Create a store whose contents start as `bytes` — for example a log image
    /// captured elsewhere, so [`Wal::with_store`](crate::Wal::with_store) can
    /// recover it.
    #[must_use]
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        Self {
            data: Mutex::new(bytes),
        }
    }

    /// Lock the byte buffer.
    ///
    /// A poisoned lock is recovered rather than propagated, so a panic in
    /// another thread while it held the lock does not make the store unusable.
    fn lock(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
        match self.data.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Copy out the current bytes. Crate-internal, for tests that inspect or
    /// snapshot the on-disk image.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> Vec<u8> {
        self.lock().to_vec()
    }
}

impl Clone for MemStore {
    /// Copy the current bytes into a new store with its own lock.
    fn clone(&self) -> Self {
        let bytes = self.lock().to_vec();
        Self::from_bytes(bytes)
    }
}

impl WalStore for MemStore {
    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
        let start = usize::try_from(offset).map_err(|_| {
            WalError::io(
                "writing to memory",
                io::Error::other("offset exceeds usize"),
            )
        })?;
        let end = start.checked_add(bytes.len()).ok_or_else(|| {
            WalError::io(
                "writing to memory",
                io::Error::other("write overflows usize"),
            )
        })?;

        let mut data = self.lock();
        if data.len() < end {
            data.resize(end, 0); // zero-fill any gap, like a sparse file
        }
        data[start..end].copy_from_slice(bytes);
        Ok(())
    }

    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let data = self.lock();
        let start = match usize::try_from(offset) {
            Ok(start) if start < data.len() => start,
            _ => return Ok(0),
        };
        let available = &data[start..];
        let n = available.len().min(buf.len());
        buf[..n].copy_from_slice(&available[..n]);
        Ok(n)
    }

    fn truncate(&self, len: u64) -> Result<()> {
        let len = usize::try_from(len).unwrap_or(usize::MAX);
        self.lock().truncate(len);
        Ok(())
    }

    fn sync(&self) -> Result<()> {
        Ok(())
    }

    fn len(&self) -> Result<u64> {
        Ok(self.lock().len() as u64)
    }
}

// ---------------------------------------------------------------------------
// Platform-correct durability.
// ---------------------------------------------------------------------------

/// Flush every buffered write for `file` to stable storage.
///
/// On macOS, `fsync(2)` pushes data from the page cache to the device but can
/// leave it in the device's own write cache, where a power loss may still take
/// it; `fcntl(F_FULLFSYNC)` is the documented way to force a full flush. It is
/// issued directly here rather than through `File::sync_data`, so the
/// full-flush guarantee does not depend on how the standard library implements
/// that call.
///
/// A call interrupted by a signal (`EINTR`) is retried, matching how the
/// positioned-I/O helpers in this module treat `Interrupted`.
#[cfg(target_os = "macos")]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    let fd = file.as_raw_fd();
    loop {
        // SAFETY: `fd` is a valid, open file descriptor for as long as `file` is
        // borrowed, so it cannot be closed from under us. `F_FULLFSYNC` takes no
        // argument pointer and neither reads nor writes any user buffer. `fcntl`
        // reports failure by returning -1 and setting `errno`, which is checked
        // immediately below.
        let ret = unsafe { libc::fcntl(fd, libc::F_FULLFSYNC) };
        if ret != -1 {
            return Ok(());
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
        // Interrupted before the flush completed: issue it again.
    }
}

/// Flush `file`'s written data to stable storage through the standard library.
///
/// `File::sync_data` maps to `fdatasync` on Linux and to `FlushFileBuffers` on
/// Windows. Both are genuine durability barriers, so the standard-library call
/// is sufficient on every platform except macOS, which has its own variant.
#[cfg(not(target_os = "macos"))]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}

// ---------------------------------------------------------------------------
// Positioned I/O. Every read and write carries its own offset, so none of them
// relies on a shared file cursor; that is what lets disjoint concurrent writes
// proceed without a lock. (On Windows `seek_read`/`seek_write` do move the
// cursor as a side effect, but nothing in this module reads it.)
// ---------------------------------------------------------------------------

/// Write all of `buf` at `offset` using positioned writes (`pwrite`).
///
/// Short writes continue from where they stopped, and calls interrupted by a
/// signal are retried.
///
/// # Errors
///
/// Returns a `WriteZero` error if a write accepts no bytes before `buf` is
/// exhausted, and propagates any other I/O error unchanged.
#[cfg(unix)]
pub(crate) fn pwrite_all(file: &File, offset: u64, buf: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::FileExt;

    let mut done = 0;
    while done < buf.len() {
        // The file offset is derived from progress so far; no cursor is touched.
        match file.write_at(&buf[done..], offset + done as u64) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "the store accepted zero bytes mid-record",
                ));
            }
            Ok(written) => done += written,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(())
}

/// Write all of `buf` at `offset` using positioned writes (`seek_write`).
///
/// Short writes continue from where they stopped, and calls interrupted by a
/// signal are retried.
///
/// # Errors
///
/// Returns a `WriteZero` error if a write accepts no bytes before `buf` is
/// exhausted, and propagates any other I/O error unchanged.
#[cfg(windows)]
pub(crate) fn pwrite_all(file: &File, offset: u64, buf: &[u8]) -> io::Result<()> {
    use std::os::windows::fs::FileExt;

    let mut done = 0;
    while done < buf.len() {
        // Each call names its own offset, derived from progress so far.
        match file.seek_write(&buf[done..], offset + done as u64) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "the store accepted zero bytes mid-record",
                ));
            }
            Ok(written) => done += written,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(())
}

/// Read from `offset` into `buf` using positioned reads (`pread`) until `buf`
/// is full or the file ends, retrying calls interrupted by a signal.
///
/// Returns the number of bytes read; a value below `buf.len()` means end of
/// file came first.
#[cfg(unix)]
pub(crate) fn pread_fill(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::unix::fs::FileExt;

    let mut filled = 0;
    while filled != buf.len() {
        match file.read_at(&mut buf[filled..], offset + filled as u64) {
            // End of file: report the short read to the caller.
            Ok(0) => break,
            Ok(got) => filled += got,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}

/// Read from `offset` into `buf` using positioned reads (`seek_read`) until
/// `buf` is full or the file ends, retrying calls interrupted by a signal.
///
/// Returns the number of bytes read; a value below `buf.len()` means end of
/// file came first.
#[cfg(windows)]
pub(crate) fn pread_fill(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::windows::fs::FileExt;

    let mut filled = 0;
    while filled != buf.len() {
        match file.seek_read(&mut buf[filled..], offset + filled as u64) {
            // End of file: report the short read to the caller.
            Ok(0) => break,
            Ok(got) => filled += got,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}

#[cfg(test)]
// Tests report failure by panicking, so unwrap/expect are the intended style.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn test_memstore_write_at_advances_len() {
        let store = MemStore::new();
        assert_eq!(store.len().unwrap(), 0);
        store.write_at(0, b"abc").unwrap();
        assert_eq!(store.len().unwrap(), 3);
        store.write_at(3, b"de").unwrap();
        assert_eq!(store.len().unwrap(), 5);
    }

    #[test]
    fn test_memstore_write_past_end_zero_fills_gap() {
        let store = MemStore::new();
        // Write at offset 4 while the store is empty: the gap [0,4) is zeros.
        store.write_at(4, b"XY").unwrap();
        assert_eq!(store.len().unwrap(), 6);
        let mut buf = [0xFFu8; 6];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 6);
        assert_eq!(&buf, &[0, 0, 0, 0, b'X', b'Y']);
    }

    #[test]
    fn test_memstore_read_past_end_is_short() {
        let store = MemStore::new();
        store.write_at(0, b"abc").unwrap();
        let mut buf = [0u8; 8];
        assert_eq!(store.read_at(1, &mut buf).unwrap(), 2);
        assert_eq!(&buf[..2], b"bc");
    }

    #[test]
    fn test_memstore_read_at_or_past_end_returns_zero() {
        // Recovery relies on a zero-length read to detect the end of the log.
        let store = MemStore::new();
        store.write_at(0, b"abc").unwrap();
        let mut buf = [0u8; 4];
        assert_eq!(store.read_at(3, &mut buf).unwrap(), 0);
        assert_eq!(store.read_at(1_000, &mut buf).unwrap(), 0);
        assert_eq!(store.read_at(u64::MAX, &mut buf).unwrap(), 0);
    }

    #[test]
    fn test_memstore_truncate_shrinks() {
        let store = MemStore::new();
        store.write_at(0, b"0123456789").unwrap();
        store.truncate(4).unwrap();
        assert_eq!(store.len().unwrap(), 4);
    }

    #[test]
    fn test_memstore_is_empty_tracks_len() {
        let store = MemStore::new();
        assert!(store.is_empty().unwrap());
        store.write_at(0, b"x").unwrap();
        assert!(!store.is_empty().unwrap());
        store.truncate(0).unwrap();
        assert!(store.is_empty().unwrap());
    }

    #[test]
    fn test_memstore_clone_is_independent() {
        let original = MemStore::from_bytes(b"abc".to_vec());
        let copy = original.clone();
        copy.write_at(0, b"XYZ!").unwrap();
        assert_eq!(original.snapshot(), b"abc".to_vec());
        assert_eq!(copy.snapshot(), b"XYZ!".to_vec());
    }

    #[test]
    fn test_filestore_roundtrip_through_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.bin");

        {
            let store = FileStore::open(&path).unwrap();
            store.write_at(0, b"hello world").unwrap();
            store.sync().unwrap();
            assert_eq!(store.len().unwrap(), 11);
        }

        let store = FileStore::open(&path).unwrap();
        assert_eq!(store.len().unwrap(), 11);
        let mut buf = [0u8; 5];
        assert_eq!(store.read_at(6, &mut buf).unwrap(), 5);
        assert_eq!(&buf, b"world");
    }

    #[test]
    fn test_filestore_open_creates_missing_file_and_keeps_existing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("created.bin");
        assert!(!path.exists());

        {
            let store = FileStore::open(&path).unwrap();
            assert!(path.exists());
            assert!(store.is_empty().unwrap());
            assert_eq!(store.path(), path.as_path());
            store.write_at(0, b"keep").unwrap();
            store.sync().unwrap();
        }

        // Reopening must not truncate what is already there.
        let store = FileStore::open(&path).unwrap();
        assert_eq!(store.len().unwrap(), 4);
    }

    #[test]
    fn test_filestore_open_fails_without_parent_dir() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing").join("log.bin");
        assert!(FileStore::open(&path).is_err());
    }

    #[test]
    fn test_filestore_truncate_discards_tail() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileStore::open(dir.path().join("truncate.bin")).unwrap();
        store.write_at(0, b"0123456789").unwrap();
        store.truncate(4).unwrap();
        assert_eq!(store.len().unwrap(), 4);
        let mut buf = [0u8; 10];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 4);
        assert_eq!(&buf[..4], b"0123");
    }

    #[test]
    fn test_filestore_concurrent_disjoint_writes() {
        use std::sync::Arc;
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("concurrent.bin");
        let store = Arc::new(FileStore::open(&path).unwrap());

        let mut handles = Vec::new();
        for i in 0..8u64 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                let byte = b'A' + i as u8;
                store.write_at(i * 4, &[byte; 4]).unwrap();
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        store.sync().unwrap();

        let mut buf = [0u8; 32];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 32);
        for i in 0..8 {
            let expected = b'A' + i as u8;
            assert_eq!(&buf[i * 4..i * 4 + 4], &[expected; 4]);
        }
    }

    #[test]
    fn test_filestore_sync_durable_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durable.bin");
        {
            let store = FileStore::open(&path).unwrap();
            store.write_at(0, b"persisted").unwrap();
            store.sync().unwrap();
        }
        let store = FileStore::open(&path).unwrap();
        let mut buf = [0u8; 9];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 9);
        assert_eq!(&buf, b"persisted");
    }
}