lora_wal/dir.rs
1//! Segment-directory helpers.
2//!
3//! `Wal::open`, `replay_dir`, and `Wal::truncate_up_to` all need to:
4//!
5//! - turn a [`SegmentId`] into the canonical `<NNNNNNNNNN>.wal` path,
6//! - parse a path back into a [`SegmentId`],
7//! - list every well-formed segment file in a directory in ascending
8//! id order,
9//! - read just the `base_lsn` of a segment without paying for a full
10//! record walk.
11//!
12//! The same operations were inlined in two places before the refactor.
13//! Pulling them behind a single `SegmentDir` and a `SegmentId` newtype
14//! removes the duplication and makes the magic number "10 zero-padded
15//! digits" live in exactly one location.
16//!
17//! `SegmentDir` does not hold an open `DirHandle`. Every call hits the
18//! filesystem fresh — segment listings happen at open time and at
19//! truncate time, neither of which is in any hot path, so caching is
20//! not worth the invalidation work.
21
22use std::fmt;
23use std::fs;
24use std::path::{Path, PathBuf};
25
26use crate::errors::WalError;
27use crate::io::sync_dir;
28use crate::lsn::Lsn;
29use crate::segment::SegmentReader;
30
/// Minimum width of the zero-padded segment id in file names.
///
/// 10 digits covers ~10 billion segments, which at the default 8 MiB
/// target is ~75 PiB of log. Plenty. The width is a padding floor, not
/// a cap: an id with more than 10 digits is formatted at its natural
/// width.
const SEGMENT_ID_WIDTH: usize = 10;
35
/// Monotonic identifier for a WAL segment file.
///
/// Allocation policy: ids start at 1 (`SegmentId(0)` is reserved as a
/// "no segment" sentinel that callers should never encounter for a
/// live WAL), and rotation simply does `id + 1`. Ids are stable: a
/// truncated segment retains its id even after every preceding segment
/// has been deleted.
///
/// The derived `Ord` compares the wrapped `u64`, so sorting by
/// `SegmentId` yields ascending numeric order.
// `repr(transparent)` gives the newtype the same layout as the `u64` it wraps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct SegmentId(u64);
46
47impl SegmentId {
48 pub const FIRST: SegmentId = SegmentId(1);
49
50 pub const fn new(value: u64) -> Self {
51 Self(value)
52 }
53
54 pub const fn raw(self) -> u64 {
55 self.0
56 }
57
58 pub fn checked_next(self) -> Option<Self> {
59 self.0.checked_add(1).map(Self)
60 }
61
62 pub fn next(self) -> Self {
63 self.checked_next()
64 .expect("SegmentId overflowed; WAL segment id space is exhausted")
65 }
66
67 /// Predecessor id, saturating at zero. Used for the "active and
68 /// the segment immediately preceding it" tombstone-retention rule
69 /// in [`crate::wal::Wal::truncate_up_to`].
70 pub fn saturating_prev(self) -> Self {
71 Self(self.0.saturating_sub(1))
72 }
73}
74
75impl fmt::Display for SegmentId {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 write!(f, "{}", self.0)
78 }
79}
80
/// A `(SegmentId, PathBuf)` pair. Returned by [`SegmentDir::list`] so
/// callers can iterate once and not have to parse the id back out of
/// the path themselves.
#[derive(Debug, Clone)]
pub struct SegmentEntry {
    /// Id parsed from the entry's file name by [`SegmentDir::id_of`].
    pub id: SegmentId,
    /// Path of the segment file, as yielded by the directory listing.
    pub path: PathBuf,
}
89
/// Owns the canonical naming scheme for a WAL directory and the
/// operations that depend on it. Cheap to construct (`Clone` is a
/// `PathBuf` clone) — the type is just a typed wrapper over a
/// directory path.
#[derive(Debug, Clone)]
pub struct SegmentDir {
    /// Directory holding the segment files; segment paths are built by
    /// joining a file name onto it.
    root: PathBuf,
}
98
99impl SegmentDir {
100 pub fn new(root: impl Into<PathBuf>) -> Self {
101 Self { root: root.into() }
102 }
103
104 pub fn root(&self) -> &Path {
105 &self.root
106 }
107
108 /// Best-effort portability boundary for directory-entry durability. Native
109 /// Unix targets fsync directories directly; platforms without that concept
110 /// (including wasm) intentionally degrade to a no-op.
111 pub fn sync_dir(&self) -> Result<(), WalError> {
112 sync_dir(&self.root)?;
113 Ok(())
114 }
115
116 /// Canonical path for the segment with id `id`.
117 pub fn path_for(&self, id: SegmentId) -> PathBuf {
118 self.root
119 .join(format!("{:0width$}.wal", id.0, width = SEGMENT_ID_WIDTH))
120 }
121
122 /// Parse a `<NNNNNNNNNN>.wal` path back into a [`SegmentId`].
123 /// Returns `None` if the file name does not match the canonical
124 /// pattern (e.g. a leftover `.tmp` or a non-numeric stem).
125 pub fn id_of(path: &Path) -> Option<SegmentId> {
126 path.extension()
127 .and_then(|s| s.to_str())
128 .filter(|ext| *ext == "wal")?;
129 path.file_stem()
130 .and_then(|s| s.to_str())
131 .and_then(|s| s.parse::<u64>().ok())
132 .map(SegmentId)
133 }
134
135 /// List every `*.wal` file in the directory in ascending id order.
136 /// Files whose names do not match the canonical pattern are ignored
137 /// so a stray `.tmp` does not block boot. Directory entry I/O errors
138 /// still abort the listing rather than risking an incomplete replay.
139 pub fn list(&self) -> Result<Vec<SegmentEntry>, WalError> {
140 let mut out = Vec::new();
141 for entry in fs::read_dir(&self.root)? {
142 let path = entry?.path();
143 if let Some(id) = Self::id_of(&path) {
144 out.push(SegmentEntry { id, path });
145 }
146 }
147 out.sort_by_key(|e| e.id);
148 Ok(out)
149 }
150
151 /// `base_lsn` recorded in `segment`'s header. Used by
152 /// `truncate_up_to` to compute the LSN range each sealed segment
153 /// covers without re-walking its records.
154 pub fn base_lsn(segment: &Path) -> Result<Lsn, WalError> {
155 // `SegmentReader::open` already validates the magic, format,
156 // and header CRC — no point re-implementing the layout here
157 // just to skip a few bytes.
158 let reader = SegmentReader::open(segment)?;
159 Ok(reader.header().base_lsn)
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// `path_for` and `id_of` must be exact inverses for a typical id.
    #[test]
    fn segment_id_path_round_trip() {
        let dir = SegmentDir::new("/tmp");
        let id = SegmentId::new(42);
        let path = dir.path_for(id);
        // Compare as paths rather than strings: `Path::join` inserts the
        // platform separator, so a hard-coded "/tmp/..." literal only
        // matches on targets whose separator is `/`.
        assert_eq!(path, Path::new("/tmp").join("0000000042.wal"));
        assert_eq!(SegmentDir::id_of(&path), Some(id));
    }

    /// Ids wider than the padding width keep their natural width and
    /// still round-trip.
    #[test]
    fn wide_segment_id_round_trip() {
        let dir = SegmentDir::new("/tmp");
        let id = SegmentId::new(u64::MAX);
        let path = dir.path_for(id);
        assert_eq!(path, Path::new("/tmp").join("18446744073709551615.wal"));
        assert_eq!(SegmentDir::id_of(&path), Some(id));
    }

    /// Wrong extension, non-numeric stem, and extension-less names are
    /// all rejected.
    #[test]
    fn id_of_rejects_non_wal_files() {
        assert_eq!(SegmentDir::id_of(Path::new("/tmp/0000000001.txt")), None);
        assert_eq!(SegmentDir::id_of(Path::new("/tmp/notanumber.wal")), None);
        assert_eq!(SegmentDir::id_of(Path::new("/tmp/CURRENT")), None);
    }

    /// `checked_next` reports exhaustion instead of wrapping; `next`
    /// advances by exactly one.
    #[test]
    fn next_advances_and_checked_next_detects_overflow() {
        assert_eq!(SegmentId::FIRST.next(), SegmentId::new(2));
        assert_eq!(SegmentId::new(7).checked_next(), Some(SegmentId::new(8)));
        assert_eq!(SegmentId::new(u64::MAX).checked_next(), None);
    }

    /// The predecessor of the zero sentinel stays at zero.
    #[test]
    fn saturating_prev_does_not_underflow() {
        assert_eq!(SegmentId::new(0).saturating_prev(), SegmentId::new(0));
        assert_eq!(SegmentId::new(1).saturating_prev(), SegmentId::new(0));
        assert_eq!(SegmentId::new(7).saturating_prev(), SegmentId::new(6));
    }
}