Skip to main content

wal_db/
segment.rs

1//! Segmented file storage.
2//!
3//! A [`SegmentedStore`] presents one continuous byte address space — the same
4//! flat space the log already uses, since an [`Lsn`](crate::Lsn) is a byte offset
5//! — striped across fixed-size segment files on disk. A write or read that
6//! crosses a segment boundary is split across the two files; a record may span
7//! boundaries freely, exactly as PostgreSQL's WAL records span its 16 MiB
8//! segments. Bounded segments keep recovery time bounded and let old, fully
9//! superseded segments be archived or pruned.
10//!
11//! Because the address space stays contiguous, the log's append, recovery, and
12//! iteration logic do not change at all: a [`Wal`](crate::Wal) over a
13//! `SegmentedStore` behaves identically to one over a single file, only with the
14//! bytes spread across `00000000000000000000.wal`, `00000000000000000001.wal`,
15//! and so on inside a directory.
16
17use std::{
18    collections::HashMap,
19    ffi::OsStr,
20    fs::{self, File, OpenOptions},
21    io,
22    path::{Path, PathBuf},
23    sync::{
24        Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
25        atomic::{AtomicU64, Ordering},
26    },
27};
28
29use crate::{
30    error::{Result, WalError},
31    store::{WalStore, durable_sync, pread_fill, pwrite_all},
32};
33
/// The fixed width of a segment file name, in zero-padded decimal digits — wide
/// enough for any `u64` index (`u64::MAX` has 20 digits) and lexically sortable.
const NAME_DIGITS: usize = 20;
/// The segment file extension, without the leading dot.
const NAME_EXT: &str = "wal";
/// Name of the file that records the log's head (its lowest surviving offset)
/// after a prefix has been dropped. Absent until the first `truncate_before`.
const HEAD_FILE: &str = "head";
/// Size of the head marker in bytes: an 8-byte little-endian offset followed by
/// the 4-byte little-endian CRC32C of those eight bytes.
const HEAD_FILE_LEN: usize = 12;
44
/// A [`WalStore`] that stripes one flat byte space across fixed-size segment
/// files in a directory.
///
/// Open one with [`SegmentedStore::open`] and hand it to
/// [`Wal::with_store`](crate::Wal::with_store), or use the
/// [`Wal::open_segmented`](crate::Wal::open_segmented) convenience constructor.
/// Segments are created lazily as the log grows, and [`sync`](WalStore::sync)
/// flushes only the segments with unwritten changes, not the whole history.
///
/// # Examples
///
/// ```
/// use wal_db::{SegmentedStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
/// // 1 MiB segments. Records larger than a segment simply span several.
/// let store = SegmentedStore::open(dir.path(), 1024 * 1024)?;
/// let wal = Wal::with_store(store)?;
/// wal.append(b"spans nothing yet")?;
/// wal.sync()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct SegmentedStore {
    /// Directory that holds the segment files and the head marker.
    dir: PathBuf,
    /// Size of every segment in bytes. `open` rejects zero, so dividing by it
    /// is always defined.
    segment_size: u64,
    /// Open segment handles keyed by segment index. Filled lazily the first
    /// time a segment is read or written; entries are removed when a segment
    /// is deleted.
    segments: RwLock<HashMap<u64, Arc<File>>>,
    /// The logical end of the log: raised by `write_at` to the highest byte
    /// offset written so far and reset by `truncate`. Reported by `len` and
    /// used by `sync` to find the last segment that needs flushing.
    max_written: AtomicU64,
    /// Index of the lowest segment that may still have unflushed writes. Every
    /// segment below it is full and durable, so `sync` skips it.
    synced_from: AtomicU64,
    /// Lowest offset still considered part of the log: 0 until a prefix is
    /// dropped, then the (clamped) record boundary passed to the most recent
    /// `truncate_before`. This is a byte offset, not necessarily the start of
    /// a segment. Recovery scans from here.
    head: AtomicU64,
}
84
85impl SegmentedStore {
86    /// Open the segmented log in `dir`, creating the directory if needed, with
87    /// segments of `segment_size` bytes.
88    ///
89    /// Existing segment files are picked up so the log can be recovered. The
90    /// store does not validate record contents — that is
91    /// [`Wal::open`](crate::Wal::open)'s recovery scan, which runs unchanged over
92    /// the flat space.
93    ///
94    /// # Errors
95    ///
96    /// Returns [`WalError::Io`] if `segment_size` is zero, or if the directory
97    /// cannot be created or read.
98    pub fn open(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
99        if segment_size == 0 {
100            return Err(WalError::io(
101                "opening the segmented log",
102                io::Error::other("segment size must be non-zero"),
103            ));
104        }
105        let dir = dir.as_ref().to_path_buf();
106        fs::create_dir_all(&dir).map_err(|e| WalError::io("creating the log directory", e))?;
107
108        // Find the highest existing segment to compute the current logical length.
109        let mut highest: Option<(u64, u64)> = None; // (index, file length)
110        for entry in fs::read_dir(&dir).map_err(|e| WalError::io("reading the log directory", e))? {
111            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
112            if let Some(index) = parse_segment_name(&entry.file_name()) {
113                let len = entry
114                    .metadata()
115                    .map_err(|e| WalError::io("reading segment metadata", e))?
116                    .len();
117                if highest.is_none_or(|(h, _)| index > h) {
118                    highest = Some((index, len));
119                }
120            }
121        }
122
123        let total_len = match highest {
124            Some((index, len)) => index.saturating_mul(segment_size).saturating_add(len),
125            None => 0,
126        };
127        let active = total_len / segment_size;
128        // The head is the exact record boundary recorded by the last prefix
129        // truncation, or 0 for a log that has never had one. It is durable, so
130        // recovery resumes from the same place after a crash.
131        let head = read_head_file(&dir)?.unwrap_or(0).min(total_len);
132
133        Ok(SegmentedStore {
134            dir,
135            segment_size,
136            segments: RwLock::new(HashMap::new()),
137            max_written: AtomicU64::new(total_len),
138            // Everything already on disk is treated as durable on open.
139            synced_from: AtomicU64::new(active),
140            head: AtomicU64::new(head),
141        })
142    }
143
144    /// The directory holding the segment files.
145    #[must_use]
146    pub fn dir(&self) -> &Path {
147        &self.dir
148    }
149
150    /// The configured segment size in bytes.
151    #[must_use]
152    pub fn segment_size(&self) -> u64 {
153        self.segment_size
154    }
155
156    fn read_map(&self) -> RwLockReadGuard<'_, HashMap<u64, Arc<File>>> {
157        self.segments.read().unwrap_or_else(PoisonError::into_inner)
158    }
159
160    fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<u64, Arc<File>>> {
161        self.segments
162            .write()
163            .unwrap_or_else(PoisonError::into_inner)
164    }
165
166    /// Get the handle for segment `index`, creating the file if it does not
167    /// exist yet.
168    fn segment_for_write(&self, index: u64) -> Result<Arc<File>> {
169        if let Some(file) = self.read_map().get(&index) {
170            return Ok(Arc::clone(file));
171        }
172        let mut map = self.write_map();
173        if let Some(file) = map.get(&index) {
174            return Ok(Arc::clone(file));
175        }
176        let path = self.dir.join(segment_name(index));
177        let file = OpenOptions::new()
178            .read(true)
179            .write(true)
180            .create(true)
181            .truncate(false)
182            .open(&path)
183            .map_err(|e| WalError::io("creating a log segment", e))?;
184        let file = Arc::new(file);
185        let _ = map.insert(index, Arc::clone(&file));
186        Ok(file)
187    }
188
189    /// Get the handle for segment `index` only if it already exists, without
190    /// creating it. `None` means the log ends before this segment.
191    fn segment_for_read(&self, index: u64) -> Result<Option<Arc<File>>> {
192        if let Some(file) = self.read_map().get(&index) {
193            return Ok(Some(Arc::clone(file)));
194        }
195        let path = self.dir.join(segment_name(index));
196        match OpenOptions::new().read(true).write(true).open(&path) {
197            Ok(file) => {
198                let file = Arc::new(file);
199                let mut map = self.write_map();
200                if let Some(existing) = map.get(&index) {
201                    return Ok(Some(Arc::clone(existing)));
202                }
203                let _ = map.insert(index, Arc::clone(&file));
204                Ok(Some(file))
205            }
206            Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
207            Err(error) => Err(WalError::io("opening a log segment", error)),
208        }
209    }
210
211    /// Look up an already-open segment without touching the filesystem.
212    fn open_segment(&self, index: u64) -> Option<Arc<File>> {
213        self.read_map().get(&index).map(Arc::clone)
214    }
215
216    /// Durably record `head` so a later open resumes from the same boundary.
217    ///
218    /// The marker is checksummed (`[head: u64][crc32c(head): u32]`) so a torn
219    /// write of the marker itself is detected on read rather than trusted — a
220    /// corrupt marker must never make recovery skip live records.
221    fn write_head_file(&self, head: u64) -> Result<()> {
222        let mut buf = [0u8; HEAD_FILE_LEN];
223        buf[..8].copy_from_slice(&head.to_le_bytes());
224        let crc = crc32c::crc32c(&buf[..8]);
225        buf[8..].copy_from_slice(&crc.to_le_bytes());
226
227        let path = self.dir.join(HEAD_FILE);
228        let file = OpenOptions::new()
229            .write(true)
230            .create(true)
231            .truncate(true)
232            .open(&path)
233            .map_err(|e| WalError::io("writing the head marker", e))?;
234        pwrite_all(&file, 0, &buf).map_err(|e| WalError::io("writing the head marker", e))?;
235        durable_sync(&file).map_err(|e| WalError::io("flushing the head marker", e))?;
236        Ok(())
237    }
238}
239
240/// Read the durably recorded head, or `None` if no prefix has been dropped.
241///
242/// A marker that is absent, too short, or whose checksum does not match is taken
243/// as `None`, so the head falls back to 0 and recovery reads the whole log. That
244/// is always safe: a dropped prefix that the marker can no longer vouch for is
245/// simply re-read, never skipped.
246fn read_head_file(dir: &Path) -> Result<Option<u64>> {
247    match fs::read(dir.join(HEAD_FILE)) {
248        Ok(bytes) if bytes.len() >= HEAD_FILE_LEN => {
249            let head = u64::from_le_bytes([
250                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
251            ]);
252            let stored = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
253            if crc32c::crc32c(&bytes[..8]) == stored {
254                Ok(Some(head))
255            } else {
256                Ok(None) // torn or corrupt marker: fall back to full recovery
257            }
258        }
259        Ok(_) => Ok(None),
260        Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
261        Err(error) => Err(WalError::io("reading the head marker", error)),
262    }
263}
264
265impl WalStore for SegmentedStore {
266    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
267        let mut pos = offset;
268        let mut remaining = bytes;
269        while !remaining.is_empty() {
270            let index = pos / self.segment_size;
271            let local = pos % self.segment_size;
272            let room = (self.segment_size - local) as usize;
273            let take = remaining.len().min(room);
274
275            let file = self.segment_for_write(index)?;
276            pwrite_all(&file, local, &remaining[..take])
277                .map_err(|e| WalError::io("writing a record", e))?;
278
279            pos += take as u64;
280            remaining = &remaining[take..];
281        }
282        let end = offset.saturating_add(bytes.len() as u64);
283        let _ = self.max_written.fetch_max(end, Ordering::Relaxed);
284        Ok(())
285    }
286
287    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
288        let mut pos = offset;
289        let mut filled = 0;
290        while filled < buf.len() {
291            let index = pos / self.segment_size;
292            let local = pos % self.segment_size;
293            let room = (self.segment_size - local) as usize;
294            let want = (buf.len() - filled).min(room);
295
296            let Some(file) = self.segment_for_read(index)? else {
297                break; // no such segment: the log ends here
298            };
299            let got = pread_fill(&file, local, &mut buf[filled..filled + want])
300                .map_err(|e| WalError::io("reading from the log", e))?;
301            filled += got;
302            pos += got as u64;
303            if got < want {
304                break; // short read within a segment: the log ends here
305            }
306        }
307        Ok(filled)
308    }
309
310    fn truncate(&self, len: u64) -> Result<()> {
311        let last_index = len / self.segment_size;
312        let last_local = len % self.segment_size;
313
314        // Walk the directory so segments not currently open are handled too.
315        let entries =
316            fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
317        for entry in entries {
318            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
319            let Some(index) = parse_segment_name(&entry.file_name()) else {
320                continue;
321            };
322            match index.cmp(&last_index) {
323                std::cmp::Ordering::Greater => {
324                    // Entirely past the new end: drop the segment.
325                    fs::remove_file(entry.path())
326                        .map_err(|e| WalError::io("removing a truncated segment", e))?;
327                    let _ = self.write_map().remove(&index);
328                }
329                std::cmp::Ordering::Equal => {
330                    // Straddles the new end: shrink it to the kept bytes.
331                    let file = self.segment_for_write(index)?;
332                    file.set_len(last_local)
333                        .map_err(|e| WalError::io("truncating a log segment", e))?;
334                }
335                std::cmp::Ordering::Less => {}
336            }
337        }
338
339        self.max_written.store(len, Ordering::Relaxed);
340        self.synced_from.store(last_index, Ordering::Relaxed);
341        Ok(())
342    }
343
344    fn sync(&self) -> Result<()> {
345        let written = self.max_written.load(Ordering::Acquire);
346        if written == 0 {
347            return Ok(());
348        }
349        // The segment holding the last written byte.
350        let active = (written - 1) / self.segment_size;
351        let from = self.synced_from.load(Ordering::Acquire);
352
353        for index in from..=active {
354            if let Some(file) = self.open_segment(index) {
355                durable_sync(&file).map_err(|e| WalError::io("flushing to stable storage", e))?;
356            }
357        }
358        // Every segment below `active` is now full and durable; the active one
359        // may still grow, so it stays in the window for the next sync.
360        self.synced_from.store(active, Ordering::Release);
361        Ok(())
362    }
363
364    fn len(&self) -> Result<u64> {
365        Ok(self.max_written.load(Ordering::Acquire))
366    }
367
368    fn head(&self) -> Result<u64> {
369        Ok(self.head.load(Ordering::Acquire))
370    }
371
372    fn truncate_before(&self, offset: u64) -> Result<u64> {
373        let written = self.max_written.load(Ordering::Acquire);
374        // The new head is the caller's record boundary, clamped so it never moves
375        // backwards and never past the end. It is a *record* boundary, not a
376        // segment boundary: because records span segments, the lowest surviving
377        // segment may start mid-record, so the head — and where recovery begins —
378        // must be the exact offset the caller gave.
379        let prev = self.head.load(Ordering::Acquire);
380        let new_head = offset.clamp(prev, written);
381
382        // Persist the head durably *before* deleting anything: a crash mid-delete
383        // then recovers from the right boundary, and the leftover segments below it
384        // are harmless dead space.
385        self.write_head_file(new_head)?;
386
387        // Delete every segment entirely below the one that holds the new head; that
388        // segment, and the one holding the most recent records, are always kept.
389        let last_segment = written.saturating_sub(1) / self.segment_size;
390        let keep_from = (new_head / self.segment_size).min(last_segment);
391        let entries =
392            fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
393        for entry in entries {
394            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
395            let Some(index) = parse_segment_name(&entry.file_name()) else {
396                continue;
397            };
398            if index < keep_from {
399                // Drop our handle before unlinking — Windows refuses to remove a
400                // file that still has an open handle.
401                let _ = self.write_map().remove(&index);
402                fs::remove_file(entry.path())
403                    .map_err(|e| WalError::io("removing a dropped segment", e))?;
404            }
405        }
406
407        self.head.store(new_head, Ordering::Release);
408        Ok(new_head)
409    }
410}
411
412/// The file name for segment `index`: zero-padded decimal, then `.wal`.
413fn segment_name(index: u64) -> String {
414    format!("{index:0NAME_DIGITS$}.{NAME_EXT}")
415}
416
417/// Parse a segment index out of a file name, or `None` if it is not a segment
418/// file. Only names of exactly the expected shape are accepted, so unrelated
419/// files in the directory are ignored.
420fn parse_segment_name(name: &OsStr) -> Option<u64> {
421    let name = name.to_str()?;
422    let stem = name.strip_suffix(&format!(".{NAME_EXT}"))?;
423    if stem.len() != NAME_DIGITS || !stem.bytes().all(|b| b.is_ascii_digit()) {
424        return None;
425    }
426    stem.parse().ok()
427}
428
// Unit tests for the segmented store. Each test works in its own temporary
// directory, with tiny segment sizes so boundary crossings are easy to force.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn test_segment_name_roundtrips() {
        assert_eq!(segment_name(0), "00000000000000000000.wal");
        assert_eq!(segment_name(42), "00000000000000000042.wal");
        assert_eq!(
            parse_segment_name(OsStr::new("00000000000000000042.wal")),
            Some(42)
        );
        assert_eq!(parse_segment_name(OsStr::new("README.md")), None);
        assert_eq!(parse_segment_name(OsStr::new("42.wal")), None);
        assert_eq!(
            parse_segment_name(OsStr::new("0000000000000000004x.wal")),
            None
        );
    }

    #[test]
    fn test_open_rejects_zero_segment_size() {
        let dir = tempfile::tempdir().unwrap();
        assert!(SegmentedStore::open(dir.path(), 0).is_err());
    }

    #[test]
    fn test_write_read_within_one_segment() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 64).unwrap();
        store.write_at(0, b"hello").unwrap();
        store.sync().unwrap();

        let mut buf = [0u8; 5];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 5);
        assert_eq!(&buf, b"hello");
        assert_eq!(store.len().unwrap(), 5);
    }

    #[test]
    fn test_write_spans_segment_boundary() {
        let dir = tempfile::tempdir().unwrap();
        // 8-byte segments force a 12-byte write to span two segments.
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"ABCDEFGHIJKL").unwrap(); // 12 bytes -> segments 0 and 1
        store.sync().unwrap();

        // Two segment files exist.
        assert!(dir.path().join("00000000000000000000.wal").exists());
        assert!(dir.path().join("00000000000000000001.wal").exists());

        let mut buf = [0u8; 12];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 12);
        assert_eq!(&buf, b"ABCDEFGHIJKL");
    }

    #[test]
    fn test_read_at_arbitrary_offset_across_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 4).unwrap();
        store.write_at(0, b"0123456789").unwrap();
        let mut buf = [0u8; 5];
        let n = store.read_at(3, &mut buf).unwrap(); // spans segments 0,1,2
        assert_eq!(n, 5);
        assert_eq!(&buf, b"34567");
    }

    #[test]
    fn test_reopen_reports_correct_length() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, b"ABCDEFGHIJKLM").unwrap(); // 13 bytes -> segs 0(full),1(partial)
            store.sync().unwrap();
            assert_eq!(store.len().unwrap(), 13);
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.len().unwrap(), 13);
        let mut buf = [0u8; 13];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 13);
        assert_eq!(&buf, b"ABCDEFGHIJKLM");
    }

    #[test]
    fn test_truncate_removes_later_segments() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, &[0xAB; 30]).unwrap(); // segments 0..=3
        store.sync().unwrap();
        assert!(dir.path().join("00000000000000000003.wal").exists());

        store.truncate(10).unwrap(); // keep segment 0 (full) + 2 bytes of segment 1
        assert_eq!(store.len().unwrap(), 10);
        assert!(dir.path().join("00000000000000000001.wal").exists());
        assert!(!dir.path().join("00000000000000000002.wal").exists());
        assert!(!dir.path().join("00000000000000000003.wal").exists());

        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 10);
    }

    #[test]
    fn test_read_past_end_is_short() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"abc").unwrap();
        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 3);
        assert_eq!(store.read_at(100, &mut buf).unwrap(), 0);
    }

    #[test]
    fn test_truncate_before_drops_whole_segments_and_persists_head() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, &[0xCD; 30]).unwrap(); // segments 0..=3
            store.sync().unwrap();
            assert_eq!(store.head().unwrap(), 0);

            // Offset 19 lives in segment 2, so only segments 0 and 1 go away.
            assert_eq!(store.truncate_before(19).unwrap(), 19);
            assert_eq!(store.head().unwrap(), 19);
            assert_eq!(store.len().unwrap(), 30);
            assert!(!dir.path().join("00000000000000000000.wal").exists());
            assert!(!dir.path().join("00000000000000000001.wal").exists());
            assert!(dir.path().join("00000000000000000002.wal").exists());
            assert!(dir.path().join("00000000000000000003.wal").exists());

            // The surviving tail is still readable from the new head.
            let mut buf = [0u8; 11];
            assert_eq!(store.read_at(19, &mut buf).unwrap(), 11);
            assert_eq!(buf, [0xCD; 11]);
        }

        // The head survives a reopen, as does the logical length.
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.head().unwrap(), 19);
        assert_eq!(store.len().unwrap(), 30);
    }

    #[test]
    fn test_truncate_before_never_moves_head_backwards() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, &[0xEF; 30]).unwrap(); // segments 0..=3
        store.sync().unwrap();

        assert_eq!(store.truncate_before(19).unwrap(), 19);
        // A lower offset is clamped up to the current head.
        assert_eq!(store.truncate_before(5).unwrap(), 19);
        assert_eq!(store.head().unwrap(), 19);

        // An offset past the end is clamped down to the end; the segment
        // holding the most recent bytes is always kept.
        assert_eq!(store.truncate_before(1_000).unwrap(), 30);
        assert_eq!(store.head().unwrap(), 30);
        assert!(dir.path().join("00000000000000000003.wal").exists());
    }
}