pub struct Wal<S = FileStore> { /* private fields */ }
A durable, append-only log.
Wal is the entry point. The four calls that cover almost every use are
open, append, sync, and
iter. The type parameter S is the storage backend and
defaults to FileStore, so the plain name Wal is the file-backed log;
custom backends are supplied through with_store.
A Wal is Send and Sync, and the append path is built for it: many
threads can call append at once with no global lock. Share
one behind an Arc and write from every thread.
§Concurrency and durability
Appends are lock-free. Each one reserves its byte range with a single atomic
step — the range’s start offset is the record’s Lsn — frames the record
into a reused thread-local buffer, and writes it, all without blocking other
appenders. sync is the durability barrier; when several
threads sync at once they coalesce into a single fsync (group commit), so the
cost of making data durable is amortised across everyone committing together.
append returns once the record is in the OS page cache; sync returns once
it is on stable storage. See the crate docs for the full contract.
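The reservation step described above can be illustrated with a single atomic counter. This is a minimal sketch, not wal_db's implementation: `Tail`, `reserve`, and `reserve_concurrently` are hypothetical names, and framing and the actual write are left out. It shows only that one `fetch_add` hands each appender a distinct, contiguous byte range without a lock.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the log's tail pointer (not a wal_db type).
struct Tail {
    next: AtomicU64,
}

impl Tail {
    fn new() -> Self {
        Tail { next: AtomicU64::new(0) }
    }

    /// Reserve `len` bytes with one atomic step and return the start offset.
    /// The returned offset plays the role of the record's LSN.
    fn reserve(&self, len: u64) -> u64 {
        self.next.fetch_add(len, Ordering::Relaxed)
    }
}

/// Run `threads` writers that each reserve `per_thread` ranges of `len`
/// bytes, and return every start offset handed out, sorted ascending.
fn reserve_concurrently(threads: usize, per_thread: usize, len: u64) -> Vec<u64> {
    let tail = Arc::new(Tail::new());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let tail = Arc::clone(&tail);
            thread::spawn(move || {
                (0..per_thread).map(|_| tail.reserve(len)).collect::<Vec<u64>>()
            })
        })
        .collect();

    let mut offsets = Vec::new();
    for handle in handles {
        offsets.extend(handle.join().unwrap());
    }
    offsets.sort_unstable();
    offsets
}
```

Because every reservation is a single read-modify-write on the same counter, no two callers can receive overlapping ranges, whatever the thread interleaving.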
§Examples
use wal_db::Wal;
let wal = Wal::open(&path)?;
let first = wal.append(b"first")?;
let second = wal.append(b"second")?;
wal.sync()?;
// LSNs are byte offsets: the first record starts at 0, the second after it.
assert_eq!(first.get(), 0);
assert!(second.get() > first.get());
let read_back: Vec<Vec<u8>> = wal
.iter()?
.map(|entry| entry.map(|record| record.into_data()))
.collect::<Result<_, _>>()?;
assert_eq!(read_back, vec![b"first".to_vec(), b"second".to_vec()]);
Implementations
impl Wal<FileStore>
pub fn open(path: impl AsRef<Path>) -> Result<Self>
Open the log at path, creating it if it does not exist.
On open the log scans its contents, stops at the first record that is incomplete or fails its checksum, and truncates that torn tail so the next append lands on a clean boundary. The common cause of a torn tail is a crash partway through an earlier append; that record was never acknowledged durable, so discarding it loses nothing the caller was promised.
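The recovery scan can be sketched over an in-memory byte buffer. The on-disk format is not specified here, so the frame layout below (a little-endian `u32` length, a `u32` checksum, then the payload) and the FNV-1a checksum are assumptions chosen for illustration; `frame`, `valid_prefix_len`, and `recover` are hypothetical helpers, not wal_db functions.

```rust
const HEADER: usize = 8; // assumed frame header: u32 length + u32 checksum

/// FNV-1a, used here only as a stand-in for the log's real checksum.
fn checksum(data: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5;
    for &byte in data {
        hash ^= byte as u32;
        hash = hash.wrapping_mul(0x0100_0193);
    }
    hash
}

/// Encode one record in the assumed frame layout.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(&checksum(payload).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

fn read_u32(buf: &[u8], at: usize) -> u32 {
    u32::from_le_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]])
}

/// Length of the longest prefix that consists only of intact records.
/// The scan stops at the first incomplete or checksum-failing record.
fn valid_prefix_len(log: &[u8]) -> usize {
    let mut pos = 0;
    while log.len() - pos >= HEADER {
        let len = read_u32(log, pos) as usize;
        let sum = read_u32(log, pos + 4);
        let start = pos + HEADER;
        if len > log.len() - start {
            break; // payload runs past the end: incomplete record
        }
        if checksum(&log[start..start + len]) != sum {
            break; // payload does not match its checksum
        }
        pos = start + len;
    }
    pos
}

/// Truncate the torn tail so the next append lands on a clean boundary.
fn recover(log: &mut Vec<u8>) -> usize {
    let keep = valid_prefix_len(log);
    log.truncate(keep);
    keep
}
```

Everything before the returned length is a sequence of whole records; everything after it is discarded, which mirrors the rule that an unacknowledged torn record is dropped on open.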
§Errors
Returns WalError::Io if the file cannot be opened or scanned.
§Examples
use wal_db::Wal;
let wal = Wal::open(&path)?;
wal.append(b"hello")?;
wal.sync()?;
pub fn open_with(path: impl AsRef<Path>, config: WalConfig) -> Result<Self>
Open the log at path with an explicit WalConfig.
§Errors
Returns WalError::Io if the file cannot be opened or scanned.
§Examples
use wal_db::{Wal, WalConfig};
let config = WalConfig::new().with_max_record_size(1024);
let wal = Wal::open_with(&path, config)?;
impl Wal<SegmentedStore>
pub fn open_segmented(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self>
Open a segmented log in directory dir, with segments of segment_size
bytes, creating the directory if needed.
The log is one continuous byte stream striped across fixed-size files, so it behaves exactly like a single-file log — records span segment boundaries freely — while keeping each file bounded for recovery and archival. Records larger than a segment simply occupy several.
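The striping arithmetic can be sketched as follows. This illustrates the idea only: `locate` and `split_range` are hypothetical helpers, and the crate's actual segment naming and I/O are not shown. A global byte offset maps to a segment index and an offset inside that segment, and a record that crosses a boundary is split into one piece per segment it touches.

```rust
/// Map a global byte offset to (segment index, offset inside that segment).
fn locate(offset: u64, segment_size: u64) -> (u64, u64) {
    (offset / segment_size, offset % segment_size)
}

/// Split the byte range [start, start + len) into per-segment pieces.
/// Each piece is (segment index, offset inside the segment, piece length).
fn split_range(start: u64, len: u64, segment_size: u64) -> Vec<(u64, u64, u64)> {
    assert!(segment_size > 0, "segment_size must be non-zero");
    let mut pieces = Vec::new();
    let mut offset = start;
    let end = start + len;
    while offset < end {
        let (segment, within) = locate(offset, segment_size);
        // Write up to the end of this segment or the end of the range,
        // whichever comes first.
        let chunk = (segment_size - within).min(end - offset);
        pieces.push((segment, within, chunk));
        offset += chunk;
    }
    pieces
}
```

With 32-byte segments, a 10-byte record starting at offset 28 yields the pieces `(0, 28, 4)` and `(1, 0, 6)`: four bytes finish segment 0 and the remaining six start segment 1.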
§Errors
Returns WalError::Io if segment_size is zero or the directory cannot
be opened or scanned.
§Examples
use wal_db::Wal;
let wal = Wal::open_segmented(dir.path(), 16 * 1024 * 1024)?; // 16 MiB segments
wal.append(b"record")?;
wal.sync()?;
pub fn open_segmented_with(dir: impl AsRef<Path>, segment_size: u64, config: WalConfig) -> Result<Self>
Open a segmented log with an explicit WalConfig.
Like open_segmented, but applies config (for
example a tighter max_record_size).
§Errors
Returns WalError::Io if segment_size is zero or the directory cannot
be opened or scanned.
impl<S: WalStore> Wal<S>
pub fn with_store(store: S) -> Result<Self>
pub fn with_store_and_config(store: S, config: WalConfig) -> Result<Self>
Build a log over a custom WalStore with an explicit WalConfig.
§Errors
Returns an error if scanning the existing contents of the store fails.
§Examples
use wal_db::{MemStore, Wal, WalConfig};
let config = WalConfig::new().with_max_record_size(64);
let wal = Wal::with_store_and_config(MemStore::new(), config)?;
pub fn append(&self, record: &[u8]) -> Result<Lsn>
Append record to the log and return the Lsn it was assigned — the
byte offset where the record begins.
Lock-free: the byte range is reserved with one atomic step and the record
is written without blocking other appenders. Returns once the bytes are
in the operating system’s page cache. It does not flush the disk —
call sync for that. A crash between append and sync may
lose the record.
§Errors
- WalError::RecordTooLarge if record is larger than the configured max_record_size. The log is unchanged.
- WalError::Io if the write fails. The reserved range becomes a permanent gap: the log is durable only up to that point, recovery stops there, and later syncs covering it report the truncation.
§Examples
use wal_db::{MemStore, Wal};
let wal = Wal::with_store(MemStore::new())?;
let lsn = wal.append(b"some bytes")?;
assert_eq!(lsn.get(), 0);
pub fn sync(&self) -> Result<()>
Make every record appended before this call durable.
Returns once the data is on stable storage, using the platform’s true durability barrier. Concurrent calls coalesce into a single fsync, so the flush cost is shared by everyone committing at the same time.
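One common way to coalesce concurrent syncs is a leader and follower scheme built on a mutex and a condition variable. The sketch below assumes that design; the crate's actual mechanism is not documented here, and `GroupCommit`, `SyncState`, and `run_writers` are hypothetical. The flush itself is replaced by a counter so the effect is observable, and `written` models only fully written bytes.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// State guarded by the mutex.
struct SyncState {
    durable: u64,   // every byte below this offset has been flushed
    flushing: bool, // true while a leader is inside the flush
}

/// Hypothetical model of a log's sync path (not wal_db's real type).
struct GroupCommit {
    written: AtomicU64, // end offset of fully written data
    flushes: AtomicU64, // number of flushes performed, for illustration
    state: Mutex<SyncState>,
    flushed: Condvar,
}

impl GroupCommit {
    fn new() -> Self {
        GroupCommit {
            written: AtomicU64::new(0),
            flushes: AtomicU64::new(0),
            state: Mutex::new(SyncState { durable: 0, flushing: false }),
            flushed: Condvar::new(),
        }
    }

    /// Pretend to append `len` bytes; returns the record's start offset.
    fn append(&self, len: u64) -> u64 {
        self.written.fetch_add(len, Ordering::AcqRel)
    }

    /// Block until everything written before this call counts as durable.
    fn sync(&self) {
        let target = self.written.load(Ordering::Acquire);
        let mut state = self.state.lock().unwrap();
        loop {
            if state.durable >= target {
                return; // a flush has already covered this caller
            }
            if state.flushing {
                // Follower: wait for the leader's flush, then re-check.
                state = self.flushed.wait(state).unwrap();
                continue;
            }
            // Leader: flush once on behalf of everyone waiting.
            state.flushing = true;
            drop(state);
            let covered = self.written.load(Ordering::Acquire);
            self.flushes.fetch_add(1, Ordering::Relaxed); // a real log would fsync here
            state = self.state.lock().unwrap();
            state.durable = state.durable.max(covered);
            state.flushing = false;
            self.flushed.notify_all();
        }
    }
}

/// Each thread appends 64 bytes and syncs.
/// Returns (flush count, durable offset, written offset).
fn run_writers(threads: usize) -> (u64, u64, u64) {
    let log = Arc::new(GroupCommit::new());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                log.append(64);
                log.sync();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let durable = log.state.lock().unwrap().durable;
    (log.flushes.load(Ordering::Relaxed), durable, log.written.load(Ordering::Acquire))
}
```

In this model a call to `sync` performs at most one flush, and a caller that arrives while a flush is in progress may be covered by it without flushing at all, so the flush count never exceeds the number of callers and is often lower under contention.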
§Errors
Returns WalError::Io if the flush fails, or WalError::Corruption
if an earlier append’s write failed and left a gap that cannot be made
durable. A failed sync means the records are not durable; treat it as
fatal, not as something to retry blindly.
§Examples
use wal_db::Wal;
let wal = Wal::open(&path)?;
wal.append(b"durable me")?;
wal.sync()?; // now on stable storage
pub fn append_and_sync(&self, record: &[u8]) -> Result<Lsn>
Append record and make it durable in one call, returning its Lsn.
Equivalent to append followed by a sync
scoped to this record, but with the sync coalesced into the group commit
of any other threads syncing at the same moment. Use it when every record
must be durable before you proceed and you want the group-commit
throughput without managing the two calls yourself.
§Errors
The union of append’s and sync’s errors.
§Examples
use wal_db::Wal;
let wal = Wal::open(&path)?;
let lsn = wal.append_and_sync(b"committed immediately")?;
pub fn append_typed<T: Serialize + ?Sized>(&self, value: &T) -> Result<Lsn>
Serialise value with pack-io and append it, returning its Lsn.
The typed counterpart to append: the value is encoded to
bytes and appended as one record, which Record::decode reads back.
Available with the pack-io feature. Like append, it does not sync.
§Errors
- WalError::Encoding if the value fails to serialise.
- Otherwise the errors of append (WalError::RecordTooLarge, WalError::Io).
§Examples
use wal_db::{MemStore, Wal};
use wal_db::pack_io::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Entry {
key: String,
value: u64,
}
let wal = Wal::with_store(MemStore::new())?;
wal.append_typed(&Entry { key: "balance".into(), value: 100 })?;
let entry: Entry = wal.iter()?.next().unwrap()?.decode()?;
assert_eq!(entry.value, 100);
pub fn iter(&self) -> Result<WalIter<'_, S>>
Iterate the log from the beginning, yielding each record in append order.
The iterator walks the records that are fully written at the moment it is
created — it does not see records still being written by other threads, or
appended afterwards. Each item is a Result: a damaged record yields a
single WalError::Corruption and then the iterator stops. In a log
opened normally the torn tail has already been truncated, so iteration
runs cleanly to the end.
§Examples
use wal_db::{MemStore, Wal};
let wal = Wal::with_store(MemStore::new())?;
wal.append(b"one")?;
wal.append(b"two")?;
let mut seen = Vec::new();
for entry in wal.iter()? {
seen.push(entry?.into_data());
}
assert_eq!(seen, vec![b"one".to_vec(), b"two".to_vec()]);
pub fn iter_from(&self, from: Lsn) -> Result<WalIter<'_, S>>
Iterate from from (a record’s Lsn) to the end, skipping the records
before it.
Because an LSN is a byte offset, seeking is O(1): iteration simply starts
at from instead of 0. Pass an Lsn that a previous
append or iter produced — a real record
boundary. An Lsn that does not land on a record boundary will be read as
a malformed record and surface as WalError::Corruption; an Lsn past
the end yields an empty iterator.
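The three cases above (a real boundary, a misaligned offset, an offset past the end) can be modelled on an in-memory buffer. The frame layout (little-endian `u32` length, `u32` checksum, payload) and the FNV-1a checksum are assumptions for illustration, and `read_from` is a hypothetical helper that reports errors as plain strings rather than WalError.

```rust
const HEADER: usize = 8; // assumed frame header: u32 length + u32 checksum

/// FNV-1a, a stand-in for whatever checksum the real log uses.
fn checksum(data: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5;
    for &byte in data {
        hash ^= byte as u32;
        hash = hash.wrapping_mul(0x0100_0193);
    }
    hash
}

/// Encode one record in the assumed frame layout.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(&checksum(payload).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

fn read_u32(buf: &[u8], at: usize) -> u32 {
    u32::from_le_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]])
}

/// Read every record from byte offset `from` to the end of `log`,
/// returning (start offset, payload) pairs. Seeking is just `pos = from`.
fn read_from(log: &[u8], from: usize) -> Result<Vec<(usize, Vec<u8>)>, String> {
    let mut records = Vec::new();
    let mut pos = from;
    while pos < log.len() {
        if log.len() - pos < HEADER {
            return Err(format!("corrupt record at offset {}", pos));
        }
        let len = read_u32(log, pos) as usize;
        let sum = read_u32(log, pos + 4);
        let start = pos + HEADER;
        if len > log.len() - start {
            return Err(format!("corrupt record at offset {}", pos));
        }
        let payload = &log[start..start + len];
        if checksum(payload) != sum {
            return Err(format!("corrupt record at offset {}", pos));
        }
        records.push((pos, payload.to_vec()));
        pos = start + len;
    }
    Ok(records)
}
```

Starting at a record boundary decodes cleanly to the end. Starting one byte into a record makes the reader interpret payload or header bytes as a length and checksum, which fails validation, and starting at or beyond the end returns an empty list.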
§Examples
use wal_db::{MemStore, Wal};
let wal = Wal::with_store(MemStore::new())?;
wal.append(b"one")?;
let second = wal.append(b"two")?;
wal.append(b"three")?;
let from_second: Vec<Vec<u8>> = wal
.iter_from(second)?
.map(|entry| entry.map(|r| r.into_data()))
.collect::<Result<_, _>>()?;
assert_eq!(from_second, vec![b"two".to_vec(), b"three".to_vec()]);
pub fn truncate_after(&self, lsn: Lsn) -> Result<()>
Drop every record after the one at lsn, keeping the log up to and
including it. For compaction.
The record at lsn becomes the new last record; the next append lands
right after it. The truncation is made durable before returning. lsn
must be a real record boundary from a previous append or
iter, and the record there must be intact.
§Exclusive access
This mutates the log’s end, so it must not run concurrently with
append, sync, or another truncate_after.
The caller is responsible for quiescing writers first — the usual case for
compaction, where the engine pauses the log, truncates, and resumes.
§Errors
- WalError::Corruption if lsn does not point at an intact record.
- WalError::Io if the truncation or its sync fails.
§Examples
use wal_db::{MemStore, Wal};
let wal = Wal::with_store(MemStore::new())?;
wal.append(b"keep me")?;
let last_kept = wal.append(b"and me")?;
wal.append(b"drop me")?;
wal.truncate_after(last_kept)?;
let remaining: Vec<Vec<u8>> = wal
.iter()?
.map(|entry| entry.map(|r| r.into_data()))
.collect::<Result<_, _>>()?;
assert_eq!(remaining, vec![b"keep me".to_vec(), b"and me".to_vec()]);
pub fn truncate_before(&self, lsn: Lsn) -> Result<Lsn>
Drop the records before the one at lsn, keeping the log from there on,
and return the new head Lsn — the lowest record still present.
This is prefix compaction: once a consumer has durably applied (and
flushed elsewhere) everything up to a checkpoint, the old records can be
reclaimed. Offsets are preserved — surviving records keep their LSNs — so
iter and iter_from continue to work.
Reading resumes at exactly lsn — the returned head is lsn itself
(clamped so it never moves backward or past the end). Reclamation,
though, is at the backend’s granularity: a segmented log
(Wal::open_segmented) deletes whole leading segment
files below the one that holds lsn, so a little space just before lsn —
back to its segment boundary — is kept rather than reclaimed, and the
segment with the most recent records is never dropped. A single-file log
cannot reclaim a prefix without moving the surviving bytes (which would
change their LSNs), so it is left unchanged and the returned head is
Lsn(0).
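The difference between where reading resumes and what is actually reclaimed can be worked through with plain arithmetic. This is a sketch of the rules as stated above for a segmented log, not the crate's code: `plan_truncate_before` is a hypothetical helper, offsets are raw `u64` values rather than Lsn, and segment `i` is assumed to hold bytes `i * segment_size` up to `(i + 1) * segment_size`.

```rust
use std::ops::Range;

/// Plan a prefix truncation for a segmented log.
///
/// Returns the new head offset and the range of segment indices that can be
/// deleted. `old_head` is the current head and `end` is the end of the log.
fn plan_truncate_before(
    lsn: u64,
    old_head: u64,
    end: u64,
    segment_size: u64,
) -> (u64, Range<u64>) {
    assert!(segment_size > 0, "segment_size must be non-zero");
    // Clamp: the head never moves backward and never passes the end.
    let head = lsn.max(old_head).min(end);
    // The segment holding the most recent byte is never dropped.
    let last_segment = if end == 0 { 0 } else { (end - 1) / segment_size };
    // Only whole segments strictly below the one containing `head` can go.
    let first_kept = (head / segment_size).min(last_segment);
    // Segments below the old head's segment are assumed already deleted.
    let first_existing = (old_head / segment_size).min(first_kept);
    (head, first_existing..first_kept)
}
```

With 32-byte segments and a 200-byte log, truncating before offset 100 moves the head to 100 and frees segments 0 through 2. Segment 3 starts at offset 96, so the four bytes from 96 to 100 stay on disk even though reading no longer visits them.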
§Exclusive access
Like truncate_after, this must not run
concurrently with append, sync,
iter, or another truncation: it removes files a reader could
be holding open. Quiesce other users first.
§Errors
Returns WalError::Io if the removal fails.
§Examples
use wal_db::Wal;
// 32-byte segments so a handful of records spans several files.
let wal = Wal::open_segmented(dir.path(), 32)?;
for i in 0..10 {
let _ = wal.append(format!("record {i}").as_bytes())?;
}
let checkpoint = wal.append(b"checkpoint")?;
wal.sync()?;
// Reclaim everything before the checkpoint's segment.
let head = wal.truncate_before(checkpoint)?;
assert!(head <= checkpoint);
// Iteration now starts at the new head, not at 0.
assert!(wal.iter()?.next().unwrap()?.lsn() >= head);