Skip to main content

nodedb_wal/
segmented.rs

//! Segmented WAL: manages a directory of segment files with automatic rollover
//! and truncation.
//!
//! This is the primary WAL interface for production use. It wraps `WalWriter`
//! and `WalReader` to provide:
//!
//! - Automatic segment rollover when the active segment exceeds a target size.
//! - Truncation of old segments after checkpoint confirmation.
//! - Transparent multi-segment replay for crash recovery.
//! - Legacy single-file WAL migration.
//!
//! ## Thread safety
//!
//! `SegmentedWal` is NOT `Send + Sync` by itself. The `WalManager` in the main
//! crate wraps it in a `Mutex` for thread-safe access from the Control Plane.
16
17use std::fs;
18use std::path::{Path, PathBuf};
19
20use tracing::info;
21
22use crate::error::{Result, WalError};
23use crate::record::WalRecord;
24use crate::segment::{
25    DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
26    truncate_segments,
27};
28use crate::writer::{WalWriter, WalWriterConfig};
29
30/// Configuration for the segmented WAL.
31#[derive(Debug, Clone)]
32pub struct SegmentedWalConfig {
33    /// WAL directory path.
34    pub wal_dir: PathBuf,
35
36    /// Target segment size in bytes. When the active segment exceeds this,
37    /// a new segment is created. This is a soft limit — the current record
38    /// is always completed before rolling.
39    pub segment_target_size: u64,
40
41    /// Writer configuration (alignment, buffer size, O_DIRECT).
42    pub writer_config: WalWriterConfig,
43}
44
45impl SegmentedWalConfig {
46    /// Create a config with defaults for the given directory.
47    pub fn new(wal_dir: PathBuf) -> Self {
48        Self {
49            wal_dir,
50            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
51            writer_config: WalWriterConfig::default(),
52        }
53    }
54
55    /// Create a config for testing (no O_DIRECT).
56    pub fn for_testing(wal_dir: PathBuf) -> Self {
57        Self {
58            wal_dir,
59            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
60            writer_config: WalWriterConfig {
61                use_direct_io: false,
62                ..Default::default()
63            },
64        }
65    }
66}
67
68/// A segmented write-ahead log.
69///
70/// Manages a directory of WAL segment files. Records are appended to the active
71/// segment. When the active segment exceeds `segment_target_size`, a new segment
72/// is created automatically on the next append.
73pub struct SegmentedWal {
74    /// WAL directory.
75    wal_dir: PathBuf,
76
77    /// The currently active writer (appending to the latest segment).
78    writer: WalWriter,
79
80    /// First LSN of the active segment (used for truncation safety).
81    active_first_lsn: u64,
82
83    /// Target size for segment rollover.
84    segment_target_size: u64,
85
86    /// Writer config (for creating new segments).
87    writer_config: WalWriterConfig,
88
89    /// Optional encryption key ring.
90    encryption_ring: Option<crate::crypto::KeyRing>,
91}
92
93impl SegmentedWal {
94    /// Open or create a segmented WAL in the given directory.
95    ///
96    /// On first startup, creates the directory and the first segment.
97    /// On subsequent startups, discovers existing segments and opens the
98    /// last one for continued appending.
99    pub fn open(config: SegmentedWalConfig) -> Result<Self> {
100        fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
101
102        let segments = discover_segments(&config.wal_dir)?;
103
104        let (writer, active_first_lsn) = if segments.is_empty() {
105            // Fresh WAL — create the first segment starting at LSN 1.
106            let path = segment_path(&config.wal_dir, 1);
107            let writer = WalWriter::open(&path, config.writer_config.clone())?;
108            (writer, 1u64)
109        } else {
110            // Resume from the last segment.
111            let last = &segments[segments.len() - 1];
112            let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
113            (writer, last.first_lsn)
114        };
115
116        info!(
117            wal_dir = %config.wal_dir.display(),
118            segments = segments.len().max(1),
119            active_first_lsn,
120            next_lsn = writer.next_lsn(),
121            "segmented WAL opened"
122        );
123
124        Ok(Self {
125            wal_dir: config.wal_dir,
126            writer,
127            active_first_lsn,
128            segment_target_size: config.segment_target_size,
129            writer_config: config.writer_config,
130            encryption_ring: None,
131        })
132    }
133
134    /// Set the encryption key ring. All subsequent records will be encrypted.
135    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
136        self.writer.set_encryption_ring(ring.clone());
137        self.encryption_ring = Some(ring);
138    }
139
140    /// Get the encryption key ring (for replay decryption).
141    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
142        self.encryption_ring.as_ref()
143    }
144
145    /// Append a record to the WAL. Returns the assigned LSN.
146    ///
147    /// If the active segment has exceeded the target size, a new segment is
148    /// created before appending. The rollover is transparent to the caller.
149    pub fn append(
150        &mut self,
151        record_type: u16,
152        tenant_id: u32,
153        vshard_id: u16,
154        payload: &[u8],
155    ) -> Result<u64> {
156        // Check if we need to roll to a new segment.
157        if self.writer.file_offset() >= self.segment_target_size {
158            self.roll_segment()?;
159        }
160
161        self.writer
162            .append(record_type, tenant_id, vshard_id, payload)
163    }
164
165    /// Flush all buffered records and fsync the active segment.
166    pub fn sync(&mut self) -> Result<()> {
167        self.writer.sync()
168    }
169
170    /// The next LSN that will be assigned.
171    pub fn next_lsn(&self) -> u64 {
172        self.writer.next_lsn()
173    }
174
175    /// First LSN of the active (currently written) segment.
176    pub fn active_segment_first_lsn(&self) -> u64 {
177        self.active_first_lsn
178    }
179
180    /// The WAL directory path.
181    pub fn wal_dir(&self) -> &Path {
182        &self.wal_dir
183    }
184
185    /// Truncate old WAL segments that are fully below the checkpoint LSN.
186    ///
187    /// Only sealed (non-active) segments are eligible for deletion.
188    /// The active segment is never deleted.
189    ///
190    /// Returns the number of segments deleted and bytes reclaimed.
191    pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
192        truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
193    }
194
195    /// Replay all committed records across all segments, in LSN order.
196    ///
197    /// Used for crash recovery on startup.
198    pub fn replay(&self) -> Result<Vec<WalRecord>> {
199        replay_all_segments(&self.wal_dir)
200    }
201
202    /// Replay only records with LSN >= `from_lsn`.
203    ///
204    /// Used when recovering from a checkpoint — records before the checkpoint
205    /// LSN have already been applied from the snapshot.
206    pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
207        let all = self.replay()?;
208        Ok(all
209            .into_iter()
210            .filter(|r| r.header.lsn >= from_lsn)
211            .collect())
212    }
213
214    /// Paginated replay (delegates to standalone `replay_from_limit_dir`).
215    pub fn replay_from_limit(
216        &self,
217        from_lsn: u64,
218        max_records: usize,
219    ) -> Result<(Vec<WalRecord>, bool)> {
220        replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
221    }
222
223    /// List all segment metadata (for monitoring / operational tooling).
224    pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
225        discover_segments(&self.wal_dir)
226    }
227
228    /// Total WAL size on disk across all segments.
229    pub fn total_size_bytes(&self) -> Result<u64> {
230        let segments = discover_segments(&self.wal_dir)?;
231        Ok(segments.iter().map(|s| s.file_size).sum())
232    }
233
234    /// Roll to a new segment: seal the current writer and create a new one.
235    fn roll_segment(&mut self) -> Result<()> {
236        // Flush and seal the current segment.
237        self.writer.seal()?;
238
239        // The new segment starts at the next LSN.
240        let new_first_lsn = self.writer.next_lsn();
241        let new_path = segment_path(&self.wal_dir, new_first_lsn);
242
243        let mut new_writer =
244            WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
245
246        // Propagate encryption ring to the new writer.
247        if let Some(ref ring) = self.encryption_ring {
248            new_writer.set_encryption_ring(ring.clone());
249        }
250
251        self.writer = new_writer;
252        self.active_first_lsn = new_first_lsn;
253
254        // Fsync the WAL directory to ensure the new segment's directory
255        // entry is durable. Without this, a power loss could cause the
256        // new segment file to "disappear" on ext4/XFS.
257        let _ = crate::segment::fsync_directory(&self.wal_dir);
258
259        info!(
260            segment = %new_path.display(),
261            first_lsn = new_first_lsn,
262            "rolled to new WAL segment"
263        );
264
265        Ok(())
266    }
267}
268
269/// Replay all records from all segments in a WAL directory, in LSN order.
270///
271/// Segments are read in order of their first_lsn. Within each segment,
272/// records are read sequentially. This produces a globally ordered stream.
273pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
274    let segments = discover_segments(wal_dir)?;
275    let mut all_records = Vec::new();
276
277    for seg in &segments {
278        let reader = crate::reader::WalReader::open(&seg.path)?;
279        for record_result in reader.records() {
280            all_records.push(record_result?);
281        }
282    }
283
284    Ok(all_records)
285}
286
287/// Paginated replay from a WAL directory: reads at most `max_records` from `from_lsn`.
288///
289/// Uses sequential I/O (not mmap) and does NOT require the WAL mutex — safe
290/// to call concurrently with writes. Sealed segments are immutable; the active
291/// segment is read via buffered I/O which sees data after the writer's fsync.
292///
293/// Returns `(records, has_more)` where `has_more` is `true` if the limit was hit.
294pub fn replay_from_limit_dir(
295    wal_dir: &Path,
296    from_lsn: u64,
297    max_records: usize,
298) -> Result<(Vec<WalRecord>, bool)> {
299    let segments = discover_segments(wal_dir)?;
300    let mut records = Vec::with_capacity(max_records.min(4096));
301
302    for seg in &segments {
303        let reader = crate::reader::WalReader::open(&seg.path)?;
304        for record_result in reader.records() {
305            let record = record_result?;
306            if record.header.lsn >= from_lsn {
307                records.push(record);
308                if records.len() >= max_records {
309                    return Ok((records, true));
310                }
311            }
312        }
313    }
314
315    Ok((records, false))
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Default test configuration (no O_DIRECT) rooted at `dir`.
    fn test_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig::for_testing(dir.to_path_buf())
    }

    /// Test configuration with a 100-byte segment target so that a handful of
    /// records is enough to force rollovers.
    fn tiny_segment_config(wal_dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig {
            wal_dir: wal_dir.to_path_buf(),
            segment_target_size: 100,
            writer_config: WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        }
    }

    /// Append a `Put` record for tenant 1 / vshard 0 and return its LSN.
    fn put(wal: &mut SegmentedWal, payload: &[u8]) -> u64 {
        wal.append(RecordType::Put as u16, 1, 0, payload).unwrap()
    }

    #[test]
    fn create_and_append() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&root)).unwrap();
        let first = put(&mut wal, b"hello");
        let second = put(&mut wal, b"world");
        wal.sync().unwrap();

        assert_eq!(first, 1);
        assert_eq!(second, 2);
        assert_eq!(wal.next_lsn(), 3);
    }

    #[test]
    fn replay_after_close() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        // First session: write three records, then drop the WAL.
        {
            let mut writer_session = SegmentedWal::open(test_config(&root)).unwrap();
            put(&mut writer_session, b"first");
            writer_session
                .append(RecordType::Delete as u16, 2, 1, b"second")
                .unwrap();
            put(&mut writer_session, b"third");
            writer_session.sync().unwrap();
        }

        // Second session: everything written before must come back in order.
        let reopened = SegmentedWal::open(test_config(&root)).unwrap();
        let replayed = reopened.replay().unwrap();
        assert_eq!(replayed.len(), 3);
        assert_eq!(replayed[0].payload, b"first");
        assert_eq!(replayed[1].payload, b"second");
        assert_eq!(replayed[2].payload, b"third");
        assert_eq!(reopened.next_lsn(), 4);
    }

    #[test]
    fn automatic_segment_rollover() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        // 100-byte target: rolls after roughly two records.
        let mut wal = SegmentedWal::open(tiny_segment_config(&root)).unwrap();

        // Enough records to cross the target several times.
        for n in 0..20u32 {
            let body = format!("record-{n:04}");
            put(&mut wal, body.as_bytes());
            wal.sync().unwrap();
        }

        // More than one segment file must exist now.
        let segments = wal.list_segments().unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );

        // All 20 records come back, in order, with consecutive LSNs.
        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 20);
        for (idx, rec) in replayed.iter().enumerate() {
            assert_eq!(rec.header.lsn, (idx + 1) as u64);
            let expected = format!("record-{idx:04}");
            assert_eq!(rec.payload, expected.as_bytes());
        }
    }

    #[test]
    fn truncation_removes_old_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        let mut wal = SegmentedWal::open(tiny_segment_config(&root)).unwrap();

        // Spread 20 records over several segments.
        for n in 0..20u32 {
            let body = format!("record-{n:04}");
            put(&mut wal, body.as_bytes());
            wal.sync().unwrap();
        }

        let count_before = wal.list_segments().unwrap().len();
        assert!(count_before > 1);

        // Checkpoint at LSN 15: older sealed segments become removable.
        let outcome = wal.truncate_before(15).unwrap();
        assert!(outcome.segments_deleted > 0);
        assert!(outcome.bytes_reclaimed > 0);

        let count_after = wal.list_segments().unwrap().len();
        assert!(count_after < count_before);

        // What is left must still replay, and must include LSN 15 or later.
        let replayed = wal.replay().unwrap();
        assert!(replayed.iter().any(|r| r.header.lsn >= 15));
    }

    #[test]
    fn replay_from_checkpoint_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&root)).unwrap();
        for n in 0..10u32 {
            put(&mut wal, format!("r{n}").as_bytes());
        }
        wal.sync().unwrap();

        // Starting at LSN 6 yields LSNs 6..=10.
        let tail = wal.replay_from(6).unwrap();
        assert_eq!(tail.len(), 5);
        assert_eq!(tail[0].header.lsn, 6);
        assert_eq!(tail[4].header.lsn, 10);
    }

    #[test]
    fn total_size_bytes() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&root)).unwrap();
        put(&mut wal, b"data");
        wal.sync().unwrap();

        let on_disk = wal.total_size_bytes().unwrap();
        assert!(on_disk > 0);
    }

    #[test]
    fn reopen_continues_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        // Session one: two records.
        {
            let mut wal = SegmentedWal::open(test_config(&root)).unwrap();
            put(&mut wal, b"a");
            put(&mut wal, b"b");
            wal.sync().unwrap();
        }

        // Session two: the LSN sequence picks up where it stopped.
        {
            let mut wal = SegmentedWal::open(test_config(&root)).unwrap();
            assert_eq!(wal.next_lsn(), 3);
            let assigned = put(&mut wal, b"c");
            assert_eq!(assigned, 3);
            wal.sync().unwrap();
        }

        // Session three: all three records are visible.
        let wal = SegmentedWal::open(test_config(&root)).unwrap();
        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 3);
    }

    #[test]
    fn encryption_persists_across_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("wal");

        let ring = {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&[42u8; 32]);
            crate::crypto::KeyRing::new(key)
        };

        // Tiny segments so the ring has to survive rollovers.
        let mut wal = SegmentedWal::open(tiny_segment_config(&root)).unwrap();
        wal.set_encryption_ring(ring);

        for n in 0..10u32 {
            put(&mut wal, format!("enc-{n}").as_bytes());
            wal.sync().unwrap();
        }

        // Rollover really happened.
        assert!(wal.list_segments().unwrap().len() > 1);

        // Replay does not decrypt (that is left to the consumer via
        // decrypt_payload()), so the records are readable here and must all
        // carry the encrypted marker.
        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 10);
        assert!(replayed.iter().all(|r| r.is_encrypted()));
    }

    #[test]
    fn replay_from_limit_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();

        // Ten single-byte records.
        for byte in 0..10u8 {
            put(&mut wal, &[byte]);
        }
        wal.sync().unwrap();

        // Limit larger than the record count: everything, no more pages.
        let (page, has_more) = wal.replay_from_limit(1, 100).unwrap();
        assert_eq!(page.len(), 10);
        assert!(!has_more);

        // Limit of three: first three LSNs, more to come.
        let (page, has_more) = wal.replay_from_limit(1, 3).unwrap();
        assert_eq!(page.len(), 3);
        assert!(has_more);
        assert_eq!(page[0].header.lsn, 1);
        assert_eq!(page[2].header.lsn, 3);
    }

    #[test]
    fn replay_from_limit_with_lsn_filter() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();

        for byte in 0..10u8 {
            put(&mut wal, &[byte]);
        }
        wal.sync().unwrap();

        // From LSN 6 with a generous limit: LSNs 6-10, five records.
        let (page, has_more) = wal.replay_from_limit(6, 100).unwrap();
        assert_eq!(page.len(), 5);
        assert!(!has_more);
        assert_eq!(page[0].header.lsn, 6);

        // From LSN 6 with a limit of two: exactly two records.
        let (page, has_more) = wal.replay_from_limit(6, 2).unwrap();
        assert_eq!(page.len(), 2);
        assert!(has_more);
    }

    #[test]
    fn replay_from_limit_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();

        put(&mut wal, b"a");
        wal.sync().unwrap();

        // Starting LSN lies past every record.
        let (page, has_more) = wal.replay_from_limit(999, 100).unwrap();
        assert!(page.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn replay_from_limit_across_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();

        // First segment: ten records.
        for byte in 0..10u8 {
            put(&mut wal, &[byte]);
        }
        wal.sync().unwrap();
        // Explicit rollover.
        wal.roll_segment().unwrap();

        // Second segment: ten more records.
        for byte in 10..20u8 {
            put(&mut wal, &[byte]);
        }
        wal.sync().unwrap();

        let seg_count = wal.list_segments().unwrap().len();
        assert!(
            seg_count >= 2,
            "expected multiple segments, got {seg_count}"
        );

        // A page must be able to start in one segment...
        let (first_page, has_more) = wal.replay_from_limit(1, 5).unwrap();
        assert_eq!(first_page.len(), 5);
        assert!(has_more);

        // ...and the follow-up page continues into the next one.
        let resume_at = first_page.last().unwrap().header.lsn + 1;
        let (second_page, _) = wal.replay_from_limit(resume_at, 200).unwrap();
        assert_eq!(second_page.len(), 15); // 20 - 5 = 15 remaining
    }
}