Skip to main content

nodedb_wal/
segment.rs

1//! WAL segment management.
2//!
3//! The WAL is split into fixed-size segment files for efficient truncation.
4//! Each segment is a standalone WAL file containing records within an LSN range.
5//!
6//! ## Naming convention
7//!
//! Segments are named `wal-{first_lsn:020}.seg`. The LSN is zero-padded so that
//! lexicographic filename order (e.g. sorted `ls` output) matches LSN order.
//! `readdir` itself returns entries in unspecified order, so discovery sorts explicitly.
10//!
11//! ## Lifecycle
12//!
13//! 1. Writer creates a new segment when the current segment exceeds `target_size`.
14//! 2. The active segment is the one being appended to.
15//! 3. `truncate_before(lsn)` deletes all sealed segments whose `max_lsn < lsn`.
16//! 4. The active segment is NEVER deleted — only sealed (closed) segments are eligible.
17//!
18//! ## Recovery
19//!
20//! On startup, all segment files in the WAL directory are discovered via `readdir`,
21//! sorted by first_lsn, and replayed in order. The last segment is the active one.
22
23use std::fs;
24use std::path::{Path, PathBuf};
25
26use crate::error::{Result, WalError};
27
28/// Fsync a directory to ensure file creation/deletion metadata is durable.
29///
30/// On ext4/XFS, creating or deleting a file writes the file data to disk
31/// but the directory entry may only be in the page cache. A power loss
32/// before the directory entry is persisted causes the file to "disappear"
33/// on reboot. Calling fsync on the directory fd ensures the metadata
34/// (filename, inode pointer) is on stable storage.
35pub fn fsync_directory(dir: &Path) -> Result<()> {
36    let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
37    dir_file.sync_all().map_err(WalError::Io)?;
38    Ok(())
39}
40
/// Default target size of a segment: 64 MiB.
///
/// The limit is soft: the writer completes the record it is currently
/// appending before rolling over, so a segment on disk can end up slightly
/// larger than this value.
pub const DEFAULT_SEGMENT_TARGET_SIZE: u64 = 64 << 20;

/// File extension (without the leading dot) of a segment file.
const SEGMENT_EXTENSION: &str = "seg";

/// Filename prefix that precedes the zero-padded LSN of a segment file.
const SEGMENT_PREFIX: &str = "wal-";
52
/// Metadata about a WAL segment file on disk.
///
/// Values order primarily by [`first_lsn`](Self::first_lsn), so sorting a
/// collection of segments yields LSN order. `path` and `file_size` only act
/// as tie-breakers, which keeps the ordering consistent with the derived
/// equality (two values compare `Equal` exactly when they are `==`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Path to the segment file on disk.
    pub path: PathBuf,

    /// First LSN stored in this segment (from the filename).
    pub first_lsn: u64,

    /// File size in bytes.
    pub file_size: u64,
}

impl SegmentMeta {
    /// Path to the companion double-write buffer file.
    ///
    /// This is the segment path with its extension replaced by `dwb`.
    pub fn dwb_path(&self) -> PathBuf {
        self.path.with_extension("dwb")
    }
}

impl Ord for SegmentMeta {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `first_lsn` decides the order. The remaining fields are compared
        // only on a tie so that `cmp` never reports `Equal` for two values
        // that the derived `PartialEq` considers different.
        self.first_lsn
            .cmp(&other.first_lsn)
            .then_with(|| self.path.cmp(&other.path))
            .then_with(|| self.file_size.cmp(&other.file_size))
    }
}

impl PartialOrd for SegmentMeta {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
84
85/// Build a segment filename from a starting LSN.
86pub fn segment_filename(first_lsn: u64) -> String {
87    format!("{SEGMENT_PREFIX}{first_lsn:020}.{SEGMENT_EXTENSION}")
88}
89
90/// Build a full segment path within a WAL directory.
91pub fn segment_path(wal_dir: &Path, first_lsn: u64) -> PathBuf {
92    wal_dir.join(segment_filename(first_lsn))
93}
94
95/// Parse the first_lsn from a segment filename.
96///
97/// Returns `None` if the filename doesn't match the expected pattern.
98fn parse_segment_filename(filename: &str) -> Option<u64> {
99    let stem = filename.strip_prefix(SEGMENT_PREFIX)?;
100    let lsn_str = stem.strip_suffix(&format!(".{SEGMENT_EXTENSION}"))?;
101    lsn_str.parse::<u64>().ok()
102}
103
104/// Discover all WAL segments in a directory, sorted by first_lsn.
105///
106/// Ignores non-segment files (DWB files, other metadata).
107pub fn discover_segments(wal_dir: &Path) -> Result<Vec<SegmentMeta>> {
108    if !wal_dir.exists() {
109        return Ok(Vec::new());
110    }
111
112    let entries = fs::read_dir(wal_dir).map_err(WalError::Io)?;
113    let mut segments = Vec::new();
114
115    for entry in entries {
116        let entry = entry.map_err(WalError::Io)?;
117        let file_name = entry.file_name();
118        let name = file_name.to_string_lossy();
119
120        if let Some(first_lsn) = parse_segment_filename(&name) {
121            let metadata = entry.metadata().map_err(WalError::Io)?;
122            segments.push(SegmentMeta {
123                path: entry.path(),
124                first_lsn,
125                file_size: metadata.len(),
126            });
127        }
128    }
129
130    segments.sort();
131    Ok(segments)
132}
133
134/// Migrate a legacy single-file WAL to the segmented format.
135///
136/// If a file named `wal_path` (e.g., `data/wal`) exists and is not a directory,
137/// it is renamed to `{wal_dir}/wal-00000000000000000001.seg`. The WAL directory
138/// is created if needed.
139///
140/// This is a one-time migration that runs transparently on startup. After
141/// migration, the old path no longer exists as a file.
142///
143/// Returns `true` if migration occurred, `false` if no legacy file found.
144pub fn migrate_legacy_wal(legacy_path: &Path, wal_dir: &Path) -> Result<bool> {
145    // Only migrate if legacy_path is a file (not a directory or missing).
146    if !legacy_path.is_file() {
147        return Ok(false);
148    }
149
150    // Don't migrate if it's zero-length (empty WAL).
151    let metadata = fs::metadata(legacy_path).map_err(WalError::Io)?;
152    if metadata.len() == 0 {
153        // Remove the empty file and let the new system create a fresh directory.
154        let _ = fs::remove_file(legacy_path);
155        return Ok(false);
156    }
157
158    // Scan the legacy WAL to find its first LSN.
159    let info = crate::recovery::recover(legacy_path)?;
160    let first_lsn = if info.record_count == 0 {
161        1 // Empty but valid — start at 1.
162    } else {
163        // Scan to find the actual first LSN (recovery only gives us last_lsn).
164        // Read the first record to get its LSN.
165        let mut reader = crate::reader::WalReader::open(legacy_path)?;
166        match reader.next_record()? {
167            Some(record) => record.header.lsn,
168            None => 1,
169        }
170    };
171
172    // Create the WAL directory.
173    fs::create_dir_all(wal_dir).map_err(WalError::Io)?;
174
175    // Move the legacy file to the segmented location.
176    let new_path = segment_path(wal_dir, first_lsn);
177    fs::rename(legacy_path, &new_path).map_err(WalError::Io)?;
178
179    // Move the companion DWB file if it exists.
180    let legacy_dwb = legacy_path.with_extension("dwb");
181    if legacy_dwb.exists() {
182        let new_dwb = new_path.with_extension("dwb");
183        fs::rename(&legacy_dwb, &new_dwb).map_err(WalError::Io)?;
184    }
185
186    tracing::info!(
187        legacy = %legacy_path.display(),
188        segment = %new_path.display(),
189        first_lsn,
190        "migrated legacy WAL to segmented format"
191    );
192
193    Ok(true)
194}
195
196/// Delete all sealed segments whose maximum LSN is strictly less than `checkpoint_lsn`.
197///
198/// The `active_segment_path` is the segment currently being written to — it is
199/// NEVER deleted, even if all its records are below the checkpoint LSN.
200///
201/// Returns the number of segments deleted and total bytes reclaimed.
202pub fn truncate_segments(
203    wal_dir: &Path,
204    checkpoint_lsn: u64,
205    active_segment_first_lsn: u64,
206) -> Result<TruncateResult> {
207    let segments = discover_segments(wal_dir)?;
208    let mut deleted_count = 0u64;
209    let mut bytes_reclaimed = 0u64;
210
211    for seg in &segments {
212        // Never delete the active segment.
213        if seg.first_lsn == active_segment_first_lsn {
214            continue;
215        }
216
217        // A sealed segment's records are all below checkpoint_lsn if the NEXT
218        // segment's first_lsn <= checkpoint_lsn. We find the next segment by
219        // looking at segments with higher first_lsn.
220        //
221        // However, we need to know the max_lsn of each segment. Since segments
222        // are sequential, a segment's max_lsn < next_segment.first_lsn.
223        // Therefore: if next_segment.first_lsn <= checkpoint_lsn, then this
224        // segment's max_lsn < checkpoint_lsn, so it's safe to delete.
225        //
226        // For the last sealed segment (before active), we use the active
227        // segment's first_lsn as the upper bound.
228        let next_first_lsn = segments
229            .iter()
230            .find(|s| s.first_lsn > seg.first_lsn)
231            .map(|s| s.first_lsn)
232            .unwrap_or(u64::MAX);
233
234        // This segment's max_lsn < next_first_lsn.
235        // Safe to delete if next_first_lsn <= checkpoint_lsn.
236        if next_first_lsn <= checkpoint_lsn {
237            bytes_reclaimed += seg.file_size;
238
239            // Delete the segment file.
240            fs::remove_file(&seg.path).map_err(WalError::Io)?;
241
242            // Delete the companion DWB file if it exists.
243            let dwb_path = seg.dwb_path();
244            if dwb_path.exists() {
245                let _ = fs::remove_file(&dwb_path);
246            }
247
248            tracing::info!(
249                segment = %seg.path.display(),
250                first_lsn = seg.first_lsn,
251                "deleted WAL segment (checkpoint_lsn={})",
252                checkpoint_lsn
253            );
254
255            deleted_count += 1;
256        }
257    }
258
259    // Fsync the directory to ensure deletions are durable.
260    if deleted_count > 0 {
261        let _ = fsync_directory(wal_dir);
262    }
263
264    Ok(TruncateResult {
265        segments_deleted: deleted_count,
266        bytes_reclaimed,
267    })
268}
269
/// Summary of what a WAL truncation pass removed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TruncateResult {
    /// How many segment files were deleted.
    pub segments_deleted: u64,

    /// Combined size, in bytes, of the deleted segment files.
    pub bytes_reclaimed: u64,
}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `contents` to a file called `name` inside `dir`.
    fn put_file(dir: &Path, name: &str, contents: &[u8]) {
        fs::write(dir.join(name), contents).unwrap();
    }

    #[test]
    fn segment_filename_format() {
        let cases = [
            (1, "wal-00000000000000000001.seg"),
            (999, "wal-00000000000000000999.seg"),
            (u64::MAX, "wal-18446744073709551615.seg"),
        ];
        for (lsn, expected) in cases.iter() {
            assert_eq!(segment_filename(*lsn), *expected);
        }
    }

    #[test]
    fn parse_segment_filename_valid() {
        let one = parse_segment_filename("wal-00000000000000000001.seg");
        assert_eq!(one, Some(1));

        let many = parse_segment_filename("wal-00000000000000000999.seg");
        assert_eq!(many, Some(999));
    }

    #[test]
    fn parse_segment_filename_invalid() {
        let rejected = [
            "wal.log",
            "wal-abc.seg",
            "other-00001.seg",
            "wal-00001.dwb",
        ];
        for name in rejected.iter() {
            assert_eq!(parse_segment_filename(name), None);
        }
    }

    #[test]
    fn discover_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let found = discover_segments(tmp.path()).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn discover_nonexistent_dir() {
        let missing = Path::new("/nonexistent/wal/dir");
        let found = discover_segments(missing).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn discover_segments_sorted() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Segment files are deliberately created out of LSN order.
        put_file(root, "wal-00000000000000000050.seg", b"seg3");
        put_file(root, "wal-00000000000000000001.seg", b"seg1");
        put_file(root, "wal-00000000000000000025.seg", b"seg2");
        // Files that are not segments must be skipped.
        put_file(root, "wal-00000000000000000001.dwb", b"dwb");
        put_file(root, "metadata.json", b"{}");

        let found = discover_segments(root).unwrap();
        assert_eq!(found.len(), 3);
        assert_eq!(found[0].first_lsn, 1);
        assert_eq!(found[1].first_lsn, 25);
        assert_eq!(found[2].first_lsn, 50);
    }

    #[test]
    fn truncate_deletes_old_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Three segments, the first with a companion DWB file.
        put_file(root, "wal-00000000000000000001.seg", b"data1");
        put_file(root, "wal-00000000000000000001.dwb", b"dwb1");
        put_file(root, "wal-00000000000000000050.seg", b"data2");
        put_file(root, "wal-00000000000000000100.seg", b"data3");

        // checkpoint_lsn=100 with the active segment starting at 100:
        // segment 1 (max_lsn < 50) and segment 50 (max_lsn < 100) must go.
        let outcome = truncate_segments(root, 100, 100).unwrap();
        assert_eq!(outcome.segments_deleted, 2);

        // The active segment is the sole survivor.
        let left = discover_segments(root).unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].first_lsn, 100);

        // Segment 1's DWB file is removed along with it.
        assert!(!root.join("wal-00000000000000000001.dwb").exists());
    }

    #[test]
    fn truncate_never_deletes_active_segment() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // The only segment on disk is the active one.
        put_file(root, "wal-00000000000000000001.seg", b"data");

        let outcome = truncate_segments(root, 999, 1).unwrap();
        assert_eq!(outcome.segments_deleted, 0);

        let left = discover_segments(root).unwrap();
        assert_eq!(left.len(), 1);
    }

    #[test]
    fn truncate_no_segments_below_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        put_file(root, "wal-00000000000000000100.seg", b"data");
        put_file(root, "wal-00000000000000000200.seg", b"data");

        // With the checkpoint at 50 nothing may be deleted: the segment
        // after 100 starts at 200, and 200 > 50.
        let outcome = truncate_segments(root, 50, 200).unwrap();
        assert_eq!(outcome.segments_deleted, 0);
    }

    #[test]
    fn migrate_legacy_wal() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_file = tmp.path().join("test.wal");
        let segment_dir = tmp.path().join("wal_segments");

        // Produce a legacy WAL holding two records; the writer is dropped at
        // the end of the scope.
        {
            let mut w = crate::writer::WalWriter::open_without_direct_io(&legacy_file).unwrap();
            w.append(crate::record::RecordType::Put as u16, 1, 0, b"hello")
                .unwrap();
            w.append(crate::record::RecordType::Put as u16, 1, 0, b"world")
                .unwrap();
            w.sync().unwrap();
        }

        // Run the migration.
        let did_migrate = super::migrate_legacy_wal(&legacy_file, &segment_dir).unwrap();
        assert!(did_migrate);

        // The legacy file has been moved away.
        assert!(!legacy_file.exists());

        // Exactly one segment now lives in the new directory.
        let found = discover_segments(&segment_dir).unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].first_lsn, 1);

        // The records survived the move and can still be read.
        let reader = crate::reader::WalReader::open(&found[0].path).unwrap();
        let records: Vec<_> = reader.records().collect::<crate::Result<_>>().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
    }

    #[test]
    fn migrate_nonexistent_legacy_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_file = tmp.path().join("nonexistent.wal");
        let segment_dir = tmp.path().join("wal_segments");

        let did_migrate = super::migrate_legacy_wal(&legacy_file, &segment_dir).unwrap();
        assert!(!did_migrate);
    }

    #[test]
    fn migrate_empty_legacy_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_file = tmp.path().join("empty.wal");
        let segment_dir = tmp.path().join("wal_segments");

        // A zero-length legacy file.
        fs::write(&legacy_file, b"").unwrap();

        let did_migrate = super::migrate_legacy_wal(&legacy_file, &segment_dir).unwrap();
        assert!(!did_migrate);
        // The empty file is cleaned up rather than migrated.
        assert!(!legacy_file.exists());
    }
}