Skip to main content

wal_db/
segment.rs

//! Segmented file storage.
//!
//! A [`SegmentedStore`] presents one continuous byte address space — the same
//! flat space the log already uses, since an [`Lsn`](crate::Lsn) is a byte offset
//! — striped across fixed-size segment files on disk. A write or read that
//! crosses segment boundaries is split across the segment files it touches; a
//! record may span boundaries freely, exactly as PostgreSQL's WAL records span
//! its 16 MiB segments. Bounded segments keep recovery time bounded and let old,
//! fully superseded segments be archived or pruned.
//!
//! Because the address space stays contiguous, the log's append, recovery, and
//! iteration logic do not change at all: a [`Wal`](crate::Wal) over a
//! `SegmentedStore` behaves identically to one over a single file, only with the
//! bytes spread across `00000000000000000000.wal`, `00000000000000000001.wal`,
//! and so on inside a directory.

17use std::{
18    collections::HashMap,
19    ffi::OsStr,
20    fs::{self, File, OpenOptions},
21    io,
22    path::{Path, PathBuf},
23    sync::{
24        Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
25        atomic::{AtomicU64, Ordering},
26    },
27};
28
29use crate::{
30    error::{Result, WalError},
31    store::{WalStore, durable_sync, pread_fill, pwrite_all},
32};
33
/// Number of zero-padded decimal digits in every segment file name. Twenty
/// digits can spell any `u64` index, and the fixed width makes a plain lexical
/// sort of the names match numeric index order.
const NAME_DIGITS: usize = 20;
/// Extension (without the leading dot) shared by all segment files.
const NAME_EXT: &str = "wal";

/// Stripes a single flat byte space over fixed-size segment files kept in one
/// directory; an implementation of [`WalStore`].
///
/// Build one with [`SegmentedStore::open`] and pass it to
/// [`Wal::with_store`](crate::Wal::with_store), or let
/// [`Wal::open_segmented`](crate::Wal::open_segmented) do both steps. Segment
/// files are created on demand as the log grows, and [`sync`](WalStore::sync)
/// flushes only the segments that may still hold unflushed writes rather than
/// the whole history.
///
/// # Examples
///
/// ```
/// use wal_db::{SegmentedStore, Wal};
///
/// # fn main() -> Result<(), wal_db::WalError> {
/// # let dir = tempfile::tempdir().map_err(wal_db::WalError::from)?;
/// // 1 MiB segments. Records larger than a segment simply span several.
/// let store = SegmentedStore::open(dir.path(), 1024 * 1024)?;
/// let wal = Wal::with_store(store)?;
/// wal.append(b"spans nothing yet")?;
/// wal.sync()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct SegmentedStore {
    /// Directory that holds every segment file.
    dir: PathBuf,
    /// Capacity of each segment file in bytes (`open` rejects zero).
    segment_size: u64,
    /// Cache of open segment file handles, keyed by segment index.
    segments: RwLock<HashMap<u64, Arc<File>>>,
    /// One past the highest logical byte offset written so far, i.e. the
    /// logical length; `sync` derives the segment being appended to from it.
    max_written: AtomicU64,
    /// Lowest segment index that might still hold unflushed writes. Segments
    /// below it are full and already durable, so `sync` does not revisit them.
    synced_from: AtomicU64,
}

77impl SegmentedStore {
78    /// Open the segmented log in `dir`, creating the directory if needed, with
79    /// segments of `segment_size` bytes.
80    ///
81    /// Existing segment files are picked up so the log can be recovered. The
82    /// store does not validate record contents — that is
83    /// [`Wal::open`](crate::Wal::open)'s recovery scan, which runs unchanged over
84    /// the flat space.
85    ///
86    /// # Errors
87    ///
88    /// Returns [`WalError::Io`] if `segment_size` is zero, or if the directory
89    /// cannot be created or read.
90    pub fn open(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
91        if segment_size == 0 {
92            return Err(WalError::io(
93                "opening the segmented log",
94                io::Error::other("segment size must be non-zero"),
95            ));
96        }
97        let dir = dir.as_ref().to_path_buf();
98        fs::create_dir_all(&dir).map_err(|e| WalError::io("creating the log directory", e))?;
99
100        // Find the highest existing segment to compute the current logical length.
101        let mut highest: Option<(u64, u64)> = None; // (index, file length)
102        for entry in fs::read_dir(&dir).map_err(|e| WalError::io("reading the log directory", e))? {
103            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
104            if let Some(index) = parse_segment_name(&entry.file_name()) {
105                let len = entry
106                    .metadata()
107                    .map_err(|e| WalError::io("reading segment metadata", e))?
108                    .len();
109                if highest.is_none_or(|(h, _)| index > h) {
110                    highest = Some((index, len));
111                }
112            }
113        }
114
115        let total_len = match highest {
116            Some((index, len)) => index.saturating_mul(segment_size).saturating_add(len),
117            None => 0,
118        };
119        let active = total_len / segment_size;
120
121        Ok(SegmentedStore {
122            dir,
123            segment_size,
124            segments: RwLock::new(HashMap::new()),
125            max_written: AtomicU64::new(total_len),
126            // Everything already on disk is treated as durable on open.
127            synced_from: AtomicU64::new(active),
128        })
129    }
130
131    /// The directory holding the segment files.
132    #[must_use]
133    pub fn dir(&self) -> &Path {
134        &self.dir
135    }
136
137    /// The configured segment size in bytes.
138    #[must_use]
139    pub fn segment_size(&self) -> u64 {
140        self.segment_size
141    }
142
143    fn read_map(&self) -> RwLockReadGuard<'_, HashMap<u64, Arc<File>>> {
144        self.segments.read().unwrap_or_else(PoisonError::into_inner)
145    }
146
147    fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<u64, Arc<File>>> {
148        self.segments
149            .write()
150            .unwrap_or_else(PoisonError::into_inner)
151    }
152
153    /// Get the handle for segment `index`, creating the file if it does not
154    /// exist yet.
155    fn segment_for_write(&self, index: u64) -> Result<Arc<File>> {
156        if let Some(file) = self.read_map().get(&index) {
157            return Ok(Arc::clone(file));
158        }
159        let mut map = self.write_map();
160        if let Some(file) = map.get(&index) {
161            return Ok(Arc::clone(file));
162        }
163        let path = self.dir.join(segment_name(index));
164        let file = OpenOptions::new()
165            .read(true)
166            .write(true)
167            .create(true)
168            .truncate(false)
169            .open(&path)
170            .map_err(|e| WalError::io("creating a log segment", e))?;
171        let file = Arc::new(file);
172        let _ = map.insert(index, Arc::clone(&file));
173        Ok(file)
174    }
175
176    /// Get the handle for segment `index` only if it already exists, without
177    /// creating it. `None` means the log ends before this segment.
178    fn segment_for_read(&self, index: u64) -> Result<Option<Arc<File>>> {
179        if let Some(file) = self.read_map().get(&index) {
180            return Ok(Some(Arc::clone(file)));
181        }
182        let path = self.dir.join(segment_name(index));
183        match OpenOptions::new().read(true).write(true).open(&path) {
184            Ok(file) => {
185                let file = Arc::new(file);
186                let mut map = self.write_map();
187                if let Some(existing) = map.get(&index) {
188                    return Ok(Some(Arc::clone(existing)));
189                }
190                let _ = map.insert(index, Arc::clone(&file));
191                Ok(Some(file))
192            }
193            Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
194            Err(error) => Err(WalError::io("opening a log segment", error)),
195        }
196    }
197
198    /// Look up an already-open segment without touching the filesystem.
199    fn open_segment(&self, index: u64) -> Option<Arc<File>> {
200        self.read_map().get(&index).map(Arc::clone)
201    }
202}
203
204impl WalStore for SegmentedStore {
205    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
206        let mut pos = offset;
207        let mut remaining = bytes;
208        while !remaining.is_empty() {
209            let index = pos / self.segment_size;
210            let local = pos % self.segment_size;
211            let room = (self.segment_size - local) as usize;
212            let take = remaining.len().min(room);
213
214            let file = self.segment_for_write(index)?;
215            pwrite_all(&file, local, &remaining[..take])
216                .map_err(|e| WalError::io("writing a record", e))?;
217
218            pos += take as u64;
219            remaining = &remaining[take..];
220        }
221        let end = offset.saturating_add(bytes.len() as u64);
222        let _ = self.max_written.fetch_max(end, Ordering::Relaxed);
223        Ok(())
224    }
225
226    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
227        let mut pos = offset;
228        let mut filled = 0;
229        while filled < buf.len() {
230            let index = pos / self.segment_size;
231            let local = pos % self.segment_size;
232            let room = (self.segment_size - local) as usize;
233            let want = (buf.len() - filled).min(room);
234
235            let Some(file) = self.segment_for_read(index)? else {
236                break; // no such segment: the log ends here
237            };
238            let got = pread_fill(&file, local, &mut buf[filled..filled + want])
239                .map_err(|e| WalError::io("reading from the log", e))?;
240            filled += got;
241            pos += got as u64;
242            if got < want {
243                break; // short read within a segment: the log ends here
244            }
245        }
246        Ok(filled)
247    }
248
249    fn truncate(&self, len: u64) -> Result<()> {
250        let last_index = len / self.segment_size;
251        let last_local = len % self.segment_size;
252
253        // Walk the directory so segments not currently open are handled too.
254        let entries =
255            fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
256        for entry in entries {
257            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
258            let Some(index) = parse_segment_name(&entry.file_name()) else {
259                continue;
260            };
261            match index.cmp(&last_index) {
262                std::cmp::Ordering::Greater => {
263                    // Entirely past the new end: drop the segment.
264                    fs::remove_file(entry.path())
265                        .map_err(|e| WalError::io("removing a truncated segment", e))?;
266                    let _ = self.write_map().remove(&index);
267                }
268                std::cmp::Ordering::Equal => {
269                    // Straddles the new end: shrink it to the kept bytes.
270                    let file = self.segment_for_write(index)?;
271                    file.set_len(last_local)
272                        .map_err(|e| WalError::io("truncating a log segment", e))?;
273                }
274                std::cmp::Ordering::Less => {}
275            }
276        }
277
278        self.max_written.store(len, Ordering::Relaxed);
279        self.synced_from.store(last_index, Ordering::Relaxed);
280        Ok(())
281    }
282
283    fn sync(&self) -> Result<()> {
284        let written = self.max_written.load(Ordering::Acquire);
285        if written == 0 {
286            return Ok(());
287        }
288        // The segment holding the last written byte.
289        let active = (written - 1) / self.segment_size;
290        let from = self.synced_from.load(Ordering::Acquire);
291
292        for index in from..=active {
293            if let Some(file) = self.open_segment(index) {
294                durable_sync(&file).map_err(|e| WalError::io("flushing to stable storage", e))?;
295            }
296        }
297        // Every segment below `active` is now full and durable; the active one
298        // may still grow, so it stays in the window for the next sync.
299        self.synced_from.store(active, Ordering::Release);
300        Ok(())
301    }
302
303    fn len(&self) -> Result<u64> {
304        Ok(self.max_written.load(Ordering::Acquire))
305    }
306}
307
308/// The file name for segment `index`: zero-padded decimal, then `.wal`.
309fn segment_name(index: u64) -> String {
310    format!("{index:0NAME_DIGITS$}.{NAME_EXT}")
311}
312
313/// Parse a segment index out of a file name, or `None` if it is not a segment
314/// file. Only names of exactly the expected shape are accepted, so unrelated
315/// files in the directory are ignored.
316fn parse_segment_name(name: &OsStr) -> Option<u64> {
317    let name = name.to_str()?;
318    let stem = name.strip_suffix(&format!(".{NAME_EXT}"))?;
319    if stem.len() != NAME_DIGITS || !stem.bytes().all(|b| b.is_ascii_digit()) {
320        return None;
321    }
322    stem.parse().ok()
323}
324
#[cfg(test)]
// Test failures are meant to panic; unwrap/expect are the clearest way to do so.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Open a store with `segment_size`-byte segments in a fresh temporary
    /// directory. The directory guard is returned so it outlives the store.
    fn fresh_store(segment_size: u64) -> (tempfile::TempDir, SegmentedStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), segment_size).unwrap();
        (dir, store)
    }

    /// Whether a file called `name` exists inside `dir`.
    fn file_exists(dir: &tempfile::TempDir, name: &str) -> bool {
        dir.path().join(name).exists()
    }

    #[test]
    fn test_segment_name_roundtrips() {
        assert_eq!(segment_name(0), "00000000000000000000.wal");
        assert_eq!(segment_name(42), "00000000000000000042.wal");
        assert_eq!(
            parse_segment_name(OsStr::new("00000000000000000042.wal")),
            Some(42)
        );
        // Anything not of the exact `<20 digits>.wal` shape is ignored.
        for name in ["README.md", "42.wal", "0000000000000000004x.wal"] {
            assert_eq!(parse_segment_name(OsStr::new(name)), None, "{name}");
        }
    }

    #[test]
    fn test_write_read_within_one_segment() {
        let (_dir, store) = fresh_store(64);
        store.write_at(0, b"hello").unwrap();
        store.sync().unwrap();

        let mut buf = [0u8; 5];
        let read = store.read_at(0, &mut buf).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf, b"hello");
        assert_eq!(store.len().unwrap(), 5);
    }

    #[test]
    fn test_write_spans_segment_boundary() {
        // 8-byte segments: a 12-byte write must land in segments 0 and 1.
        let (dir, store) = fresh_store(8);
        store.write_at(0, b"ABCDEFGHIJKL").unwrap();
        store.sync().unwrap();

        assert!(file_exists(&dir, "00000000000000000000.wal"));
        assert!(file_exists(&dir, "00000000000000000001.wal"));

        let mut buf = [0u8; 12];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 12);
        assert_eq!(&buf, b"ABCDEFGHIJKL");
    }

    #[test]
    fn test_read_at_arbitrary_offset_across_boundary() {
        let (_dir, store) = fresh_store(4);
        store.write_at(0, b"0123456789").unwrap();

        // Bytes 3..8 touch segments 0, 1 and 2.
        let mut buf = [0u8; 5];
        assert_eq!(store.read_at(3, &mut buf).unwrap(), 5);
        assert_eq!(&buf, b"34567");
    }

    #[test]
    fn test_reopen_reports_correct_length() {
        let dir = tempfile::tempdir().unwrap();
        {
            // 13 bytes: segment 0 full, segment 1 partially filled.
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, b"ABCDEFGHIJKLM").unwrap();
            store.sync().unwrap();
            assert_eq!(store.len().unwrap(), 13);
        }

        let reopened = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(reopened.len().unwrap(), 13);
        let mut buf = [0u8; 13];
        assert_eq!(reopened.read_at(0, &mut buf).unwrap(), 13);
        assert_eq!(&buf, b"ABCDEFGHIJKLM");
    }

    #[test]
    fn test_truncate_removes_later_segments() {
        let (dir, store) = fresh_store(8);
        store.write_at(0, &[0xAB; 30]).unwrap(); // fills segments 0..=3
        store.sync().unwrap();
        assert!(file_exists(&dir, "00000000000000000003.wal"));

        // Keep all of segment 0 plus the first 2 bytes of segment 1.
        store.truncate(10).unwrap();
        assert_eq!(store.len().unwrap(), 10);
        assert!(file_exists(&dir, "00000000000000000001.wal"));
        assert!(!file_exists(&dir, "00000000000000000002.wal"));
        assert!(!file_exists(&dir, "00000000000000000003.wal"));

        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 10);
    }

    #[test]
    fn test_read_past_end_is_short() {
        let (_dir, store) = fresh_store(8);
        store.write_at(0, b"abc").unwrap();

        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 3);
        assert_eq!(store.read_at(100, &mut buf).unwrap(), 0);
    }
}