// lora_wal/dir.rs
//! Segment-directory helpers.
//!
//! `Wal::open`, `replay_dir`, and `Wal::truncate_up_to` all need to:
//!
//! - turn a [`SegmentId`] into the canonical `<NNNNNNNNNN>.wal` path,
//! - parse a path back into a [`SegmentId`],
//! - list every well-formed segment file in a directory in ascending
//!   id order,
//! - read just the `base_lsn` of a segment without paying for a full
//!   record walk.
//!
//! The same operations were inlined in two places before the refactor.
//! Pulling them behind a single `SegmentDir` and a `SegmentId` newtype
//! removes the duplication and makes the magic number "10 zero-padded
//! digits" live in exactly one location.
//!
//! `SegmentDir` does not hold an open `DirHandle`. Every call hits the
//! filesystem fresh — segment listings happen at open time and at
//! truncate time, neither of which is in any hot path, so caching is
//! not worth the invalidation work.
21
22use std::fmt;
23use std::fs;
24#[cfg(unix)]
25use std::fs::File;
26use std::path::{Path, PathBuf};
27
28use crate::error::WalError;
29use crate::lsn::Lsn;
30use crate::segment::SegmentReader;
31
/// Width of the zero-padded segment id in file names — the single
/// source of truth for the `<NNNNNNNNNN>.wal` naming scheme used by
/// [`SegmentDir::path_for`]. 10 digits is enough for ~10 billion
/// segments, which at the default 8 MiB target is ~80 EiB of log.
/// Plenty.
const SEGMENT_ID_WIDTH: usize = 10;
36
/// Monotonic identifier for a WAL segment file.
///
/// Allocation policy: ids start at 1 — `SegmentId(0)` is reserved as a
/// "no segment" sentinel that callers should never see for a live WAL —
/// and rotation simply hands out `id + 1`. Ids are stable: a truncated
/// segment keeps its id even after every preceding segment has been
/// deleted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct SegmentId(u64);

impl SegmentId {
    /// First id a fresh WAL allocates.
    pub const FIRST: SegmentId = SegmentId(1);

    /// Wrap a raw id value.
    pub const fn new(value: u64) -> Self {
        SegmentId(value)
    }

    /// Unwrap back to the raw `u64`.
    pub const fn raw(self) -> u64 {
        self.0
    }

    /// Id that the next rotation will use.
    pub fn next(self) -> Self {
        SegmentId(self.0 + 1)
    }

    /// Predecessor id, saturating at zero. Used for the "active and
    /// the segment immediately preceding it" tombstone-retention rule
    /// in [`crate::wal::Wal::truncate_up_to`].
    pub fn saturating_prev(self) -> Self {
        match self.0 {
            0 => SegmentId(0),
            n => SegmentId(n - 1),
        }
    }
}

impl fmt::Display for SegmentId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Plain decimal, no zero padding — padding is a file-name
        // concern that lives in `SegmentDir::path_for`.
        write!(f, "{}", self.0)
    }
}
76
/// A `(SegmentId, PathBuf)` pair. Returned by [`SegmentDir::list`] so
/// callers can iterate once and not have to parse the id back out of
/// the path themselves.
#[derive(Debug, Clone)]
pub struct SegmentEntry {
    // Id parsed from the file name.
    pub id: SegmentId,
    // Full path of the segment file inside the WAL directory.
    pub path: PathBuf,
}
85
/// Owns the canonical naming scheme for a WAL directory and the
/// operations that depend on it. Cheap to construct and to `Clone`
/// (both cost one `PathBuf`) — the type is just a typed wrapper over a
/// directory path and holds no open handles.
#[derive(Debug, Clone)]
pub struct SegmentDir {
    // WAL directory that all segment paths are resolved against.
    root: PathBuf,
}
94
95impl SegmentDir {
96 pub fn new(root: impl Into<PathBuf>) -> Self {
97 Self { root: root.into() }
98 }
99
100 pub fn root(&self) -> &Path {
101 &self.root
102 }
103
104 /// Best-effort portability boundary for directory-entry durability.
105 /// Unix targets can fsync directories directly; other targets keep
106 /// the existing file-level guarantees until a platform-specific
107 /// directory sync implementation is added.
108 #[cfg(unix)]
109 pub fn sync_dir(&self) -> Result<(), WalError> {
110 File::open(&self.root)?.sync_all()?;
111 Ok(())
112 }
113
114 #[cfg(not(unix))]
115 pub fn sync_dir(&self) -> Result<(), WalError> {
116 Ok(())
117 }
118
119 /// Canonical path for the segment with id `id`.
120 pub fn path_for(&self, id: SegmentId) -> PathBuf {
121 self.root
122 .join(format!("{:0width$}.wal", id.0, width = SEGMENT_ID_WIDTH))
123 }
124
125 /// Parse a `<NNNNNNNNNN>.wal` path back into a [`SegmentId`].
126 /// Returns `None` if the file name does not match the canonical
127 /// pattern (e.g. a leftover `.tmp` or a non-numeric stem).
128 pub fn id_of(path: &Path) -> Option<SegmentId> {
129 path.extension()
130 .and_then(|s| s.to_str())
131 .filter(|ext| *ext == "wal")?;
132 path.file_stem()
133 .and_then(|s| s.to_str())
134 .and_then(|s| s.parse::<u64>().ok())
135 .map(SegmentId)
136 }
137
138 /// List every `*.wal` file in the directory in ascending id order.
139 /// Files whose names do not match the canonical pattern are
140 /// silently dropped — that mirrors the pre-refactor behaviour and
141 /// matches the operator's expectation that a stray `.tmp` is
142 /// ignored on the next boot.
143 pub fn list(&self) -> Result<Vec<SegmentEntry>, WalError> {
144 let mut out: Vec<SegmentEntry> = fs::read_dir(&self.root)?
145 .filter_map(|e| e.ok())
146 .filter_map(|e| {
147 let path = e.path();
148 let id = Self::id_of(&path)?;
149 Some(SegmentEntry { id, path })
150 })
151 .collect();
152 out.sort_by_key(|e| e.id);
153 Ok(out)
154 }
155
156 /// `base_lsn` recorded in `segment`'s header. Used by
157 /// `truncate_up_to` to compute the LSN range each sealed segment
158 /// covers without re-walking its records.
159 pub fn base_lsn(segment: &Path) -> Result<Lsn, WalError> {
160 // `SegmentReader::open` already validates the magic, format,
161 // and header CRC — no point re-implementing the layout here
162 // just to skip a few bytes.
163 let reader = SegmentReader::open(segment)?;
164 Ok(reader.header().base_lsn)
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn segment_id_path_round_trip() {
        let dir = SegmentDir::new("/tmp");
        let id = SegmentId::new(42);
        let path = dir.path_for(id);
        // 42 rendered as ten zero-padded digits plus the `.wal` suffix.
        assert_eq!(path.to_str(), Some("/tmp/0000000042.wal"));
        assert_eq!(SegmentDir::id_of(&path), Some(id));
    }

    #[test]
    fn id_of_rejects_non_wal_files() {
        let bad = [
            "/tmp/0000000001.txt", // wrong extension
            "/tmp/notanumber.wal", // non-numeric stem
            "/tmp/CURRENT",        // no extension at all
        ];
        for p in bad {
            assert_eq!(SegmentDir::id_of(Path::new(p)), None);
        }
    }

    #[test]
    fn saturating_prev_does_not_underflow() {
        let cases = [(0u64, 0u64), (1, 0), (7, 6)];
        for (input, expected) in cases {
            assert_eq!(
                SegmentId::new(input).saturating_prev(),
                SegmentId::new(expected)
            );
        }
    }
}
194}