Skip to main content

nodedb_wal/
segmented.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Segmented WAL: manages a directory of segment files with automatic rollover
4//! and truncation.
5//!
6//! This is the primary WAL interface for production use. It wraps `WalWriter`
7//! and `WalReader` to provide:
8//!
9//! - Automatic segment rollover when the active segment exceeds a target size.
10//! - Truncation of old segments after checkpoint confirmation.
11//! - Transparent multi-segment replay for crash recovery.
12//! - Legacy single-file WAL migration.
13//!
14//! ## Thread safety
15//!
16//! `SegmentedWal` is NOT `Send + Sync` by itself. The `WalManager` in the main
17//! crate wraps it in a `Mutex` for thread-safe access from the Control Plane.
18
19use std::fs;
20use std::path::{Path, PathBuf};
21
22use tracing::info;
23
24use crate::error::{Result, WalError};
25use crate::record::WalRecord;
26use crate::segment::{
27    DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
28    truncate_segments,
29};
30use crate::writer::{WalWriter, WalWriterConfig};
31
32/// Configuration for the segmented WAL.
33#[derive(Debug, Clone)]
34pub struct SegmentedWalConfig {
35    /// WAL directory path.
36    pub wal_dir: PathBuf,
37
38    /// Target segment size in bytes. When the active segment exceeds this,
39    /// a new segment is created. This is a soft limit — the current record
40    /// is always completed before rolling.
41    pub segment_target_size: u64,
42
43    /// Writer configuration (alignment, buffer size, O_DIRECT).
44    pub writer_config: WalWriterConfig,
45}
46
47impl SegmentedWalConfig {
48    /// Create a config with defaults for the given directory.
49    pub fn new(wal_dir: PathBuf) -> Self {
50        Self {
51            wal_dir,
52            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
53            writer_config: WalWriterConfig::default(),
54        }
55    }
56
57    /// Create a config for testing (no O_DIRECT).
58    pub fn for_testing(wal_dir: PathBuf) -> Self {
59        Self {
60            wal_dir,
61            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
62            writer_config: WalWriterConfig {
63                use_direct_io: false,
64                ..Default::default()
65            },
66        }
67    }
68}
69
70/// A segmented write-ahead log.
71///
72/// Manages a directory of WAL segment files. Records are appended to the active
73/// segment. When the active segment exceeds `segment_target_size`, a new segment
74/// is created automatically on the next append.
75pub struct SegmentedWal {
76    /// WAL directory.
77    wal_dir: PathBuf,
78
79    /// The currently active writer (appending to the latest segment).
80    writer: WalWriter,
81
82    /// First LSN of the active segment (used for truncation safety).
83    active_first_lsn: u64,
84
85    /// Target size for segment rollover.
86    segment_target_size: u64,
87
88    /// Writer config (for creating new segments).
89    writer_config: WalWriterConfig,
90
91    /// Optional encryption key ring.
92    encryption_ring: Option<crate::crypto::KeyRing>,
93}
94
95impl SegmentedWal {
96    /// Open or create a segmented WAL in the given directory.
97    ///
98    /// On first startup, creates the directory and the first segment.
99    /// On subsequent startups, discovers existing segments and opens the
100    /// last one for continued appending.
101    pub fn open(config: SegmentedWalConfig) -> Result<Self> {
102        fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
103
104        let segments = discover_segments(&config.wal_dir)?;
105
106        let (writer, active_first_lsn) = if segments.is_empty() {
107            // Fresh WAL — create the first segment starting at LSN 1.
108            let path = segment_path(&config.wal_dir, 1);
109            let writer = WalWriter::open(&path, config.writer_config.clone())?;
110            (writer, 1u64)
111        } else {
112            // Resume from the last segment.
113            let last = &segments[segments.len() - 1];
114            let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
115            (writer, last.first_lsn)
116        };
117
118        info!(
119            wal_dir = %config.wal_dir.display(),
120            segments = segments.len().max(1),
121            active_first_lsn,
122            next_lsn = writer.next_lsn(),
123            "segmented WAL opened"
124        );
125
126        Ok(Self {
127            wal_dir: config.wal_dir,
128            writer,
129            active_first_lsn,
130            segment_target_size: config.segment_target_size,
131            writer_config: config.writer_config,
132            encryption_ring: None,
133        })
134    }
135
136    /// Set the encryption key ring. All subsequent records will be encrypted.
137    ///
138    /// Must be called before the first `append`. Returns an error if records
139    /// have already been written to the active segment.
140    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
141        self.writer.set_encryption_ring(ring.clone())?;
142        self.encryption_ring = Some(ring);
143        Ok(())
144    }
145
146    /// Get the encryption key ring (for replay decryption).
147    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
148        self.encryption_ring.as_ref()
149    }
150
151    /// Append a record to the WAL. Returns the assigned LSN.
152    ///
153    /// If the active segment has exceeded the target size, a new segment is
154    /// created before appending. The rollover is transparent to the caller.
155    pub fn append(
156        &mut self,
157        record_type: u32,
158        tenant_id: u64,
159        vshard_id: u32,
160        payload: &[u8],
161    ) -> Result<u64> {
162        // Check if we need to roll to a new segment.
163        if self.writer.file_offset() >= self.segment_target_size {
164            self.roll_segment()?;
165        }
166
167        self.writer
168            .append(record_type, tenant_id, vshard_id, payload)
169    }
170
171    /// Flush all buffered records and fsync the active segment.
172    pub fn sync(&mut self) -> Result<()> {
173        self.writer.sync()
174    }
175
176    /// The next LSN that will be assigned.
177    pub fn next_lsn(&self) -> u64 {
178        self.writer.next_lsn()
179    }
180
181    /// First LSN of the active (currently written) segment.
182    pub fn active_segment_first_lsn(&self) -> u64 {
183        self.active_first_lsn
184    }
185
186    /// The WAL directory path.
187    pub fn wal_dir(&self) -> &Path {
188        &self.wal_dir
189    }
190
191    /// Truncate old WAL segments that are fully below the checkpoint LSN.
192    ///
193    /// Only sealed (non-active) segments are eligible for deletion.
194    /// The active segment is never deleted.
195    ///
196    /// Returns the number of segments deleted and bytes reclaimed.
197    pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
198        truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
199    }
200
201    /// Replay all committed records across all segments, in LSN order.
202    ///
203    /// Used for crash recovery on startup.
204    pub fn replay(&self) -> Result<Vec<WalRecord>> {
205        replay_all_segments(&self.wal_dir)
206    }
207
208    /// Replay only records with LSN >= `from_lsn`.
209    ///
210    /// Used when recovering from a checkpoint — records before the checkpoint
211    /// LSN have already been applied from the snapshot.
212    pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
213        let all = self.replay()?;
214        Ok(all
215            .into_iter()
216            .filter(|r| r.header.lsn >= from_lsn)
217            .collect())
218    }
219
220    /// Paginated replay (delegates to standalone `replay_from_limit_dir`).
221    pub fn replay_from_limit(
222        &self,
223        from_lsn: u64,
224        max_records: usize,
225    ) -> Result<(Vec<WalRecord>, bool)> {
226        replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
227    }
228
229    /// List all segment metadata (for monitoring / operational tooling).
230    pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
231        discover_segments(&self.wal_dir)
232    }
233
234    /// Total WAL size on disk across all segments.
235    pub fn total_size_bytes(&self) -> Result<u64> {
236        let segments = discover_segments(&self.wal_dir)?;
237        Ok(segments.iter().map(|s| s.file_size).sum())
238    }
239
240    /// Roll to a new segment: seal the current writer and create a new one.
241    fn roll_segment(&mut self) -> Result<()> {
242        // Flush and seal the current segment.
243        self.writer.seal()?;
244
245        // The new segment starts at the next LSN.
246        let new_first_lsn = self.writer.next_lsn();
247        let new_path = segment_path(&self.wal_dir, new_first_lsn);
248
249        let mut new_writer =
250            WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
251
252        // Propagate encryption to the new writer with a fresh epoch.
253        // Each segment gets a new random epoch so the per-segment nonce space
254        // is independent. The ring's key material is preserved; only the epoch
255        // changes. The new preamble is written at the head of the new segment.
256        if let Some(ref ring) = self.encryption_ring {
257            let fresh_key = ring.current().with_fresh_epoch()?;
258            let new_ring = crate::crypto::KeyRing::new(fresh_key);
259            new_writer.set_encryption_ring(new_ring.clone())?;
260            self.encryption_ring = Some(new_ring);
261        }
262
263        self.writer = new_writer;
264        self.active_first_lsn = new_first_lsn;
265
266        // Fsync the WAL directory to ensure the new segment's directory
267        // entry is durable. Without this, a power loss could cause the
268        // new segment file to "disappear" on ext4/XFS.
269        let _ = crate::segment::fsync_directory(&self.wal_dir);
270
271        info!(
272            segment = %new_path.display(),
273            first_lsn = new_first_lsn,
274            "rolled to new WAL segment"
275        );
276
277        Ok(())
278    }
279}
280
281/// Replay all records from all segments in a WAL directory, in LSN order.
282///
283/// Segments are read in order of their first_lsn. Within each segment,
284/// records are read sequentially. This produces a globally ordered stream.
285pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
286    let segments = discover_segments(wal_dir)?;
287    let mut all_records = Vec::new();
288
289    for seg in &segments {
290        let reader = crate::reader::WalReader::open(&seg.path)?;
291        for record_result in reader.records() {
292            all_records.push(record_result?);
293        }
294    }
295
296    Ok(all_records)
297}
298
299/// Paginated replay from a WAL directory: reads at most `max_records` from `from_lsn`.
300///
301/// Uses sequential I/O (not mmap) and does NOT require the WAL mutex — safe
302/// to call concurrently with writes. Sealed segments are immutable; the active
303/// segment is read via buffered I/O which sees data after the writer's fsync.
304///
305/// Returns `(records, has_more)` where `has_more` is `true` if the limit was hit.
306pub fn replay_from_limit_dir(
307    wal_dir: &Path,
308    from_lsn: u64,
309    max_records: usize,
310) -> Result<(Vec<WalRecord>, bool)> {
311    let segments = discover_segments(wal_dir)?;
312    let mut records = Vec::with_capacity(max_records.min(4096));
313
314    for seg in &segments {
315        let reader = crate::reader::WalReader::open(&seg.path)?;
316        for record_result in reader.records() {
317            let record = record_result?;
318            if record.header.lsn >= from_lsn {
319                records.push(record);
320                if records.len() >= max_records {
321                    return Ok((records, true));
322                }
323            }
324        }
325    }
326
327    Ok((records, false))
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Testing config (no O_DIRECT, default segment size) rooted at `dir`.
    fn test_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig::for_testing(dir.to_path_buf())
    }

    /// Testing config with a 100-byte segment target, so a rollover happens
    /// after roughly two records.
    fn tiny_segment_config(wal_dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig {
            wal_dir: wal_dir.to_path_buf(),
            segment_target_size: 100,
            writer_config: WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        }
    }

    /// A fresh temp dir plus the (not yet created) `wal` path inside it.
    /// The returned `TempDir` must stay alive for the whole test.
    fn scratch_wal_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let wal_dir = tmp.path().join("wal");
        (tmp, wal_dir)
    }

    /// Appends a `Put` record for tenant 1 / vshard 0 and returns its LSN.
    fn put(wal: &mut SegmentedWal, payload: &[u8]) -> u64 {
        wal.append(RecordType::Put as u32, 1, 0, payload).unwrap()
    }

    /// Appends `record-0000`, `record-0001`, … syncing after every record.
    fn put_numbered_synced(wal: &mut SegmentedWal, count: u32) {
        for i in 0..count {
            put(wal, format!("record-{i:04}").as_bytes());
            wal.sync().unwrap();
        }
    }

    /// Appends one single-byte `Put` record per value in `bytes`.
    fn put_single_bytes(wal: &mut SegmentedWal, bytes: std::ops::Range<u8>) {
        for b in bytes {
            put(wal, &[b]);
        }
    }

    /// Builds a key ring from raw key bytes.
    fn ring_for(key_bytes: &[u8; 32]) -> crate::crypto::KeyRing {
        let key = crate::crypto::WalEncryptionKey::from_bytes(key_bytes).unwrap();
        crate::crypto::KeyRing::new(key)
    }

    #[test]
    fn create_and_append() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();

        let lsns = [put(&mut wal, b"hello"), put(&mut wal, b"world")];
        wal.sync().unwrap();

        assert_eq!(lsns, [1, 2]);
        assert_eq!(wal.next_lsn(), 3);
    }

    #[test]
    fn replay_after_close() {
        let (_tmp, wal_dir) = scratch_wal_dir();

        // First lifetime: write three records, then drop the WAL.
        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            put(&mut wal, b"first");
            wal.append(RecordType::Delete as u32, 2, 1, b"second")
                .unwrap();
            put(&mut wal, b"third");
            wal.sync().unwrap();
        }

        // Second lifetime: everything comes back, in order.
        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = wal.replay().unwrap();
        let expected: [&[u8]; 3] = [b"first", b"second", b"third"];
        assert_eq!(records.len(), expected.len());
        for (record, want) in records.iter().zip(expected) {
            assert_eq!(record.payload, want);
        }
        assert_eq!(wal.next_lsn(), 4);
    }

    #[test]
    fn automatic_segment_rollover() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let mut wal = SegmentedWal::open(tiny_segment_config(&wal_dir)).unwrap();

        // 20 synced records against a 100-byte target force several rollovers.
        put_numbered_synced(&mut wal, 20);

        let segment_count = wal.list_segments().unwrap().len();
        assert!(
            segment_count > 1,
            "expected multiple segments, got {}",
            segment_count
        );

        // Replay crosses every segment and yields LSNs 1..=20 in order.
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 20);
        for (expected_lsn, record) in (1u64..).zip(&records) {
            assert_eq!(record.header.lsn, expected_lsn);
            let expected_payload = format!("record-{:04}", expected_lsn - 1);
            assert_eq!(record.payload, expected_payload.as_bytes());
        }
    }

    #[test]
    fn truncation_removes_old_segments() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let mut wal = SegmentedWal::open(tiny_segment_config(&wal_dir)).unwrap();
        put_numbered_synced(&mut wal, 20);

        let count_before = wal.list_segments().unwrap().len();
        assert!(count_before > 1);

        // Checkpoint at LSN 15: sealed segments entirely below it can go.
        let outcome = wal.truncate_before(15).unwrap();
        assert!(outcome.segments_deleted > 0);
        assert!(outcome.bytes_reclaimed > 0);
        assert!(wal.list_segments().unwrap().len() < count_before);

        // Records at or after the checkpoint must still be replayable.
        let survivors = wal.replay().unwrap();
        assert!(survivors.iter().any(|r| r.header.lsn >= 15));
    }

    #[test]
    fn replay_from_checkpoint_lsn() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        for i in 0..10u32 {
            put(&mut wal, format!("r{i}").as_bytes());
        }
        wal.sync().unwrap();

        // Only LSNs 6..=10 pass the filter.
        let lsns: Vec<u64> = wal
            .replay_from(6)
            .unwrap()
            .iter()
            .map(|r| r.header.lsn)
            .collect();
        assert_eq!(lsns.len(), 5);
        assert_eq!(lsns[0], 6);
        assert_eq!(lsns[4], 10);
    }

    #[test]
    fn total_size_bytes() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        put(&mut wal, b"data");
        wal.sync().unwrap();

        assert!(wal.total_size_bytes().unwrap() > 0);
    }

    #[test]
    fn reopen_continues_lsn() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let open = || SegmentedWal::open(test_config(&wal_dir)).unwrap();

        {
            let mut wal = open();
            put(&mut wal, b"a");
            put(&mut wal, b"b");
            wal.sync().unwrap();
        }

        {
            let mut wal = open();
            assert_eq!(wal.next_lsn(), 3);
            assert_eq!(put(&mut wal, b"c"), 3);
            wal.sync().unwrap();
        }

        assert_eq!(open().replay().unwrap().len(), 3);
    }

    #[test]
    fn encryption_persists_across_segments() {
        // Write encrypted records across several segments and drop the WAL, as
        // if the process exited. Then decrypt every segment with a brand-new
        // ring, taking the epoch from each segment's on-disk preamble.
        let (_tmp, wal_dir) = scratch_wal_dir();
        let key_bytes = [42u8; 32];

        {
            let ring = ring_for(&key_bytes);
            let mut wal = SegmentedWal::open(tiny_segment_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();

            for i in 0..10u32 {
                put(&mut wal, format!("enc-{i}").as_bytes());
                wal.sync().unwrap();
            }
            assert!(wal.list_segments().unwrap().len() > 1);
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments after rollover"
        );

        let ring_for_read = ring_for(&key_bytes);
        let mut plaintexts = Vec::new();

        for seg in &segments {
            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
            // The epoch comes from the preamble written when the segment opened.
            let epoch = *reader
                .segment_preamble()
                .expect("encrypted segment must have a preamble")
                .epoch();
            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

            for record in reader.records().map(|r| r.unwrap()) {
                assert!(record.is_encrypted(), "all records should be encrypted");
                plaintexts.push(
                    record
                        .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_for_read))
                        .unwrap(),
                );
            }
        }

        assert_eq!(plaintexts.len(), 10);
        for (i, plaintext) in plaintexts.iter().enumerate() {
            assert_eq!(plaintext, format!("enc-{i}").as_bytes());
        }
    }

    /// WAL restart roundtrip with encryption.
    ///
    /// After a simulated restart the in-memory key has a different epoch.
    /// Decryption still works because the epoch is read from the on-disk
    /// preamble rather than from the current key.
    #[test]
    fn wal_encrypted_restart_roundtrip() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let key_bytes = [0xABu8; 32];

        // First key lifetime: five encrypted records.
        {
            let ring = ring_for(&key_bytes);
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();

            for i in 0..5u32 {
                put(&mut wal, format!("restart-{i}").as_bytes());
            }
            wal.sync().unwrap();
        }

        // "Restart": same key bytes, new in-memory key with a fresh random epoch.
        let ring_restart = ring_for(&key_bytes);
        let mut plaintexts = Vec::new();

        for seg in &crate::segment::discover_segments(&wal_dir).unwrap() {
            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
            let epoch = *reader
                .segment_preamble()
                .expect("segment must have preamble after encrypted write")
                .epoch();
            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

            for record in reader.records() {
                let record = record.unwrap();
                plaintexts.push(
                    record
                        .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_restart))
                        .unwrap(),
                );
            }
        }

        assert_eq!(plaintexts.len(), 5);
        for (i, plaintext) in plaintexts.iter().enumerate() {
            assert_eq!(plaintext, format!("restart-{i}").as_bytes());
        }
    }

    /// Epoch tamper rejection.
    ///
    /// Corrupting the preamble of an encrypted segment on disk must make
    /// decryption fail, because the preamble is part of the AAD.
    #[test]
    fn epoch_tamper_rejected() {
        let (_tmp, wal_dir) = scratch_wal_dir();
        let key_bytes = [0x55u8; 32];

        {
            let ring = ring_for(&key_bytes);
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();
            put(&mut wal, b"sensitive payload");
            wal.sync().unwrap();
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert_eq!(segments.len(), 1);
        let seg_path = &segments[0].path;

        // The preamble sits at offset 0 with the epoch in bytes 8..12; flip byte 9.
        let mut raw = std::fs::read(seg_path).unwrap();
        raw[9] ^= 0xFF;
        std::fs::write(seg_path, &raw).unwrap();

        // The preamble still parses, but its epoch no longer matches the AAD
        // used at encryption time, so decryption has to fail.
        let ring_read = ring_for(&key_bytes);

        let reader = crate::reader::WalReader::open(seg_path).unwrap();
        let epoch = *reader.segment_preamble().unwrap().epoch();
        let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

        let record = reader.records().next().unwrap().unwrap();
        let outcome = record.decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_read));
        assert!(
            outcome.is_err(),
            "decryption with tampered preamble epoch must fail"
        );
    }

    #[test]
    fn replay_from_limit_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();
        put_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();

        // Limit larger than the log: everything, nothing left over.
        let (all, has_more) = wal.replay_from_limit(1, 100).unwrap();
        assert_eq!(all.len(), 10);
        assert!(!has_more);

        // Limit of 3: only the first page.
        let (page, has_more) = wal.replay_from_limit(1, 3).unwrap();
        assert_eq!(page.len(), 3);
        assert!(has_more);
        assert_eq!(page[0].header.lsn, 1);
        assert_eq!(page[2].header.lsn, 3);
    }

    #[test]
    fn replay_from_limit_with_lsn_filter() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();
        put_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();

        // From LSN 6 with a generous limit: LSNs 6-10.
        let (tail, has_more) = wal.replay_from_limit(6, 100).unwrap();
        assert_eq!(tail.len(), 5);
        assert!(!has_more);
        assert_eq!(tail[0].header.lsn, 6);

        // From LSN 6 with limit 2: a partial page.
        let (page, has_more) = wal.replay_from_limit(6, 2).unwrap();
        assert_eq!(page.len(), 2);
        assert!(has_more);
    }

    #[test]
    fn replay_from_limit_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();
        put(&mut wal, b"a");
        wal.sync().unwrap();

        // Starting past every record yields nothing.
        let (records, has_more) = wal.replay_from_limit(999, 100).unwrap();
        assert!(records.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn replay_from_limit_across_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(tmp.path())).unwrap();

        // Ten records, a forced rollover, then ten more in the new segment.
        put_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();
        wal.roll_segment().unwrap();
        put_single_bytes(&mut wal, 10..20);
        wal.sync().unwrap();

        let seg_count = wal.list_segments().unwrap().len();
        assert!(
            seg_count >= 2,
            "expected multiple segments, got {seg_count}"
        );

        // The first page must cross segments correctly.
        let (first_page, has_more) = wal.replay_from_limit(1, 5).unwrap();
        assert_eq!(first_page.len(), 5);
        assert!(has_more);

        // Resume right after the last LSN of the first page.
        let resume_from = first_page.last().unwrap().header.lsn + 1;
        let (rest, _) = wal.replay_from_limit(resume_from, 200).unwrap();
        assert_eq!(rest.len(), 15); // 20 - 5 = 15 remaining
    }
}
804        let next_lsn = records.last().unwrap().header.lsn + 1;
805        let (records2, _) = wal.replay_from_limit(next_lsn, 200).unwrap();
806        assert_eq!(records2.len(), 15); // 20 - 5 = 15 remaining
807    }
808}