Skip to main content

nodedb_wal/
segmented.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Segmented WAL: manages a directory of segment files with automatic rollover
4//! and truncation.
5//!
6//! This is the primary WAL interface for production use. It wraps `WalWriter`
7//! and `WalReader` to provide:
8//!
9//! - Automatic segment rollover when the active segment exceeds a target size.
10//! - Truncation of old segments after checkpoint confirmation.
11//! - Transparent multi-segment replay for crash recovery.
12//! - Legacy single-file WAL migration.
13//!
14//! ## Thread safety
15//!
16//! `SegmentedWal` is NOT `Send + Sync` by itself. The `WalManager` in the main
17//! crate wraps it in a `Mutex` for thread-safe access from the Control Plane.
18
19use std::fs;
20use std::path::{Path, PathBuf};
21
22use tracing::info;
23
24use crate::error::{Result, WalError};
25use crate::record::WalRecord;
26use crate::segment::{
27    DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
28    truncate_segments,
29};
30use crate::writer::{WalWriter, WalWriterConfig};
31
32/// Configuration for the segmented WAL.
33#[derive(Debug, Clone)]
34pub struct SegmentedWalConfig {
35    /// WAL directory path.
36    pub wal_dir: PathBuf,
37
38    /// Target segment size in bytes. When the active segment exceeds this,
39    /// a new segment is created. This is a soft limit — the current record
40    /// is always completed before rolling.
41    pub segment_target_size: u64,
42
43    /// Writer configuration (alignment, buffer size, O_DIRECT).
44    pub writer_config: WalWriterConfig,
45}
46
47impl SegmentedWalConfig {
48    /// Create a config with defaults for the given directory.
49    pub fn new(wal_dir: PathBuf) -> Self {
50        Self {
51            wal_dir,
52            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
53            writer_config: WalWriterConfig::default(),
54        }
55    }
56
57    /// Create a config for testing (no O_DIRECT).
58    pub fn for_testing(wal_dir: PathBuf) -> Self {
59        Self {
60            wal_dir,
61            segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
62            writer_config: WalWriterConfig {
63                use_direct_io: false,
64                ..Default::default()
65            },
66        }
67    }
68}
69
70/// A segmented write-ahead log.
71///
72/// Manages a directory of WAL segment files. Records are appended to the active
73/// segment. When the active segment exceeds `segment_target_size`, a new segment
74/// is created automatically on the next append.
75pub struct SegmentedWal {
76    /// WAL directory.
77    wal_dir: PathBuf,
78
79    /// The currently active writer (appending to the latest segment).
80    writer: WalWriter,
81
82    /// First LSN of the active segment (used for truncation safety).
83    active_first_lsn: u64,
84
85    /// Target size for segment rollover.
86    segment_target_size: u64,
87
88    /// Writer config (for creating new segments).
89    writer_config: WalWriterConfig,
90
91    /// Optional encryption key ring.
92    encryption_ring: Option<crate::crypto::KeyRing>,
93}
94
95impl SegmentedWal {
96    /// Open or create a segmented WAL in the given directory.
97    ///
98    /// On first startup, creates the directory and the first segment.
99    /// On subsequent startups, discovers existing segments and opens the
100    /// last one for continued appending.
101    pub fn open(config: SegmentedWalConfig) -> Result<Self> {
102        fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
103
104        let segments = discover_segments(&config.wal_dir)?;
105
106        let (writer, active_first_lsn) = if segments.is_empty() {
107            // Fresh WAL — create the first segment starting at LSN 1.
108            let path = segment_path(&config.wal_dir, 1);
109            let writer = WalWriter::open(&path, config.writer_config.clone())?;
110            (writer, 1u64)
111        } else {
112            // Resume from the last segment.
113            let last = &segments[segments.len() - 1];
114            let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
115            (writer, last.first_lsn)
116        };
117
118        info!(
119            wal_dir = %config.wal_dir.display(),
120            segments = segments.len().max(1),
121            active_first_lsn,
122            next_lsn = writer.next_lsn(),
123            "segmented WAL opened"
124        );
125
126        Ok(Self {
127            wal_dir: config.wal_dir,
128            writer,
129            active_first_lsn,
130            segment_target_size: config.segment_target_size,
131            writer_config: config.writer_config,
132            encryption_ring: None,
133        })
134    }
135
136    /// Set the encryption key ring. All subsequent records will be encrypted.
137    ///
138    /// Must be called before the first `append`. Returns an error if records
139    /// have already been written to the active segment.
140    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
141        self.writer.set_encryption_ring(ring.clone())?;
142        self.encryption_ring = Some(ring);
143        Ok(())
144    }
145
146    /// Get the encryption key ring (for replay decryption).
147    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
148        self.encryption_ring.as_ref()
149    }
150
151    /// Append a record to the WAL. Returns the assigned LSN.
152    ///
153    /// If the active segment has exceeded the target size, a new segment is
154    /// created before appending. The rollover is transparent to the caller.
155    ///
156    /// `database_id` is stored in header bytes 34-41. Pass `0` for the
157    /// default database (backward-compatible with pre-existing records).
158    pub fn append(
159        &mut self,
160        record_type: u32,
161        tenant_id: u64,
162        vshard_id: u32,
163        database_id: u64,
164        payload: &[u8],
165    ) -> Result<u64> {
166        // Check if we need to roll to a new segment.
167        if self.writer.file_offset() >= self.segment_target_size {
168            self.roll_segment()?;
169        }
170
171        self.writer
172            .append(record_type, tenant_id, vshard_id, database_id, payload)
173    }
174
175    /// Flush all buffered records and fsync the active segment.
176    pub fn sync(&mut self) -> Result<()> {
177        self.writer.sync()
178    }
179
180    /// The next LSN that will be assigned.
181    pub fn next_lsn(&self) -> u64 {
182        self.writer.next_lsn()
183    }
184
185    /// First LSN of the active (currently written) segment.
186    pub fn active_segment_first_lsn(&self) -> u64 {
187        self.active_first_lsn
188    }
189
190    /// The WAL directory path.
191    pub fn wal_dir(&self) -> &Path {
192        &self.wal_dir
193    }
194
195    /// Truncate old WAL segments that are fully below the checkpoint LSN.
196    ///
197    /// Only sealed (non-active) segments are eligible for deletion.
198    /// The active segment is never deleted.
199    ///
200    /// Returns the number of segments deleted and bytes reclaimed.
201    pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
202        truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
203    }
204
205    /// Replay all committed records across all segments, in LSN order.
206    ///
207    /// Used for crash recovery on startup.
208    pub fn replay(&self) -> Result<Vec<WalRecord>> {
209        replay_all_segments(&self.wal_dir)
210    }
211
212    /// Replay only records with LSN >= `from_lsn`.
213    ///
214    /// Used when recovering from a checkpoint — records before the checkpoint
215    /// LSN have already been applied from the snapshot.
216    pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
217        let all = self.replay()?;
218        Ok(all
219            .into_iter()
220            .filter(|r| r.header.lsn >= from_lsn)
221            .collect())
222    }
223
224    /// Paginated replay (delegates to standalone `replay_from_limit_dir`).
225    pub fn replay_from_limit(
226        &self,
227        from_lsn: u64,
228        max_records: usize,
229    ) -> Result<(Vec<WalRecord>, bool)> {
230        replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
231    }
232
233    /// List all segment metadata (for monitoring / operational tooling).
234    pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
235        discover_segments(&self.wal_dir)
236    }
237
238    /// Total WAL size on disk across all segments.
239    pub fn total_size_bytes(&self) -> Result<u64> {
240        let segments = discover_segments(&self.wal_dir)?;
241        Ok(segments.iter().map(|s| s.file_size).sum())
242    }
243
244    /// Roll to a new segment: seal the current writer and create a new one.
245    fn roll_segment(&mut self) -> Result<()> {
246        // Flush and seal the current segment.
247        self.writer.seal()?;
248
249        // The new segment starts at the next LSN.
250        let new_first_lsn = self.writer.next_lsn();
251        let new_path = segment_path(&self.wal_dir, new_first_lsn);
252
253        let mut new_writer =
254            WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
255
256        // Propagate encryption to the new writer with a fresh epoch.
257        // Each segment gets a new random epoch so the per-segment nonce space
258        // is independent. The ring's key material is preserved; only the epoch
259        // changes. The new preamble is written at the head of the new segment.
260        if let Some(ref ring) = self.encryption_ring {
261            let fresh_key = ring.current().with_fresh_epoch()?;
262            let new_ring = crate::crypto::KeyRing::new(fresh_key);
263            new_writer.set_encryption_ring(new_ring.clone())?;
264            self.encryption_ring = Some(new_ring);
265        }
266
267        self.writer = new_writer;
268        self.active_first_lsn = new_first_lsn;
269
270        // Fsync the WAL directory to ensure the new segment's directory
271        // entry is durable. Without this, a power loss could cause the
272        // new segment file to "disappear" on ext4/XFS.
273        let _ = crate::segment::fsync_directory(&self.wal_dir);
274
275        info!(
276            segment = %new_path.display(),
277            first_lsn = new_first_lsn,
278            "rolled to new WAL segment"
279        );
280
281        Ok(())
282    }
283}
284
285/// Replay all records from all segments in a WAL directory, in LSN order.
286///
287/// Segments are read in order of their first_lsn. Within each segment,
288/// records are read sequentially. This produces a globally ordered stream.
289pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
290    let segments = discover_segments(wal_dir)?;
291    let mut all_records = Vec::new();
292
293    for seg in &segments {
294        let reader = crate::reader::WalReader::open(&seg.path)?;
295        for record_result in reader.records() {
296            all_records.push(record_result?);
297        }
298    }
299
300    Ok(all_records)
301}
302
303/// Paginated replay from a WAL directory: reads at most `max_records` from `from_lsn`.
304///
305/// Uses sequential I/O (not mmap) and does NOT require the WAL mutex — safe
306/// to call concurrently with writes. Sealed segments are immutable; the active
307/// segment is read via buffered I/O which sees data after the writer's fsync.
308///
309/// Returns `(records, has_more)` where `has_more` is `true` if the limit was hit.
310pub fn replay_from_limit_dir(
311    wal_dir: &Path,
312    from_lsn: u64,
313    max_records: usize,
314) -> Result<(Vec<WalRecord>, bool)> {
315    let segments = discover_segments(wal_dir)?;
316    let mut records = Vec::with_capacity(max_records.min(4096));
317
318    for seg in &segments {
319        let reader = crate::reader::WalReader::open(&seg.path)?;
320        for record_result in reader.records() {
321            let record = record_result?;
322            if record.header.lsn >= from_lsn {
323                records.push(record);
324                if records.len() >= max_records {
325                    return Ok((records, true));
326                }
327            }
328        }
329    }
330
331    Ok((records, false))
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use crate::record::RecordType;
338
339    fn test_config(dir: &Path) -> SegmentedWalConfig {
340        SegmentedWalConfig::for_testing(dir.to_path_buf())
341    }
342
343    #[test]
344    fn create_and_append() {
345        let dir = tempfile::tempdir().unwrap();
346        let wal_dir = dir.path().join("wal");
347
348        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
349        let lsn1 = wal
350            .append(RecordType::Put as u32, 1, 0, 0, b"hello")
351            .unwrap();
352        let lsn2 = wal
353            .append(RecordType::Put as u32, 1, 0, 0, b"world")
354            .unwrap();
355        wal.sync().unwrap();
356
357        assert_eq!(lsn1, 1);
358        assert_eq!(lsn2, 2);
359        assert_eq!(wal.next_lsn(), 3);
360    }
361
362    #[test]
363    fn replay_after_close() {
364        let dir = tempfile::tempdir().unwrap();
365        let wal_dir = dir.path().join("wal");
366
367        // Write records.
368        {
369            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
370            wal.append(RecordType::Put as u32, 1, 0, 0, b"first")
371                .unwrap();
372            wal.append(RecordType::Delete as u32, 2, 1, 0, b"second")
373                .unwrap();
374            wal.append(RecordType::Put as u32, 1, 0, 0, b"third")
375                .unwrap();
376            wal.sync().unwrap();
377        }
378
379        // Reopen and replay.
380        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
381        let records = wal.replay().unwrap();
382        assert_eq!(records.len(), 3);
383        assert_eq!(records[0].payload, b"first");
384        assert_eq!(records[1].payload, b"second");
385        assert_eq!(records[2].payload, b"third");
386        assert_eq!(wal.next_lsn(), 4);
387    }
388
389    #[test]
390    fn automatic_segment_rollover() {
391        let dir = tempfile::tempdir().unwrap();
392        let wal_dir = dir.path().join("wal");
393
394        // Use a tiny segment target to force rollover.
395        let config = SegmentedWalConfig {
396            wal_dir: wal_dir.clone(),
397            segment_target_size: 100, // 100 bytes — will roll after ~2 records.
398            writer_config: WalWriterConfig {
399                use_direct_io: false,
400                ..Default::default()
401            },
402        };
403
404        let mut wal = SegmentedWal::open(config).unwrap();
405
406        // Write enough records to trigger multiple rollovers.
407        for i in 0..20u32 {
408            let payload = format!("record-{i:04}");
409            wal.append(RecordType::Put as u32, 1, 0, 0, payload.as_bytes())
410                .unwrap();
411            wal.sync().unwrap();
412        }
413
414        // Should have created multiple segments.
415        let segments = wal.list_segments().unwrap();
416        assert!(
417            segments.len() > 1,
418            "expected multiple segments, got {}",
419            segments.len()
420        );
421
422        // Replay should return all 20 records in order.
423        let records = wal.replay().unwrap();
424        assert_eq!(records.len(), 20);
425        for (i, record) in records.iter().enumerate() {
426            assert_eq!(record.header.lsn, (i + 1) as u64);
427            let expected = format!("record-{i:04}");
428            assert_eq!(record.payload, expected.as_bytes());
429        }
430    }
431
432    #[test]
433    fn truncation_removes_old_segments() {
434        let dir = tempfile::tempdir().unwrap();
435        let wal_dir = dir.path().join("wal");
436
437        let config = SegmentedWalConfig {
438            wal_dir: wal_dir.clone(),
439            segment_target_size: 100,
440            writer_config: WalWriterConfig {
441                use_direct_io: false,
442                ..Default::default()
443            },
444        };
445
446        let mut wal = SegmentedWal::open(config).unwrap();
447
448        // Write records to create multiple segments.
449        for i in 0..20u32 {
450            let payload = format!("record-{i:04}");
451            wal.append(RecordType::Put as u32, 1, 0, 0, payload.as_bytes())
452                .unwrap();
453            wal.sync().unwrap();
454        }
455
456        let segments_before = wal.list_segments().unwrap();
457        assert!(segments_before.len() > 1);
458
459        // Truncate with a checkpoint at LSN 15.
460        let result = wal.truncate_before(15).unwrap();
461        assert!(result.segments_deleted > 0);
462        assert!(result.bytes_reclaimed > 0);
463
464        let segments_after = wal.list_segments().unwrap();
465        assert!(segments_after.len() < segments_before.len());
466
467        // Remaining records should still be replayable.
468        let records = wal.replay().unwrap();
469        // At minimum, records from LSN 15+ should be present.
470        assert!(records.iter().any(|r| r.header.lsn >= 15));
471    }
472
473    #[test]
474    fn replay_from_checkpoint_lsn() {
475        let dir = tempfile::tempdir().unwrap();
476        let wal_dir = dir.path().join("wal");
477
478        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
479        for i in 0..10u32 {
480            wal.append(RecordType::Put as u32, 1, 0, 0, format!("r{i}").as_bytes())
481                .unwrap();
482        }
483        wal.sync().unwrap();
484
485        // Replay from LSN 6 — should return LSNs 6..=10.
486        let records = wal.replay_from(6).unwrap();
487        assert_eq!(records.len(), 5);
488        assert_eq!(records[0].header.lsn, 6);
489        assert_eq!(records[4].header.lsn, 10);
490    }
491
492    #[test]
493    fn total_size_bytes() {
494        let dir = tempfile::tempdir().unwrap();
495        let wal_dir = dir.path().join("wal");
496
497        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
498        wal.append(RecordType::Put as u32, 1, 0, 0, b"data")
499            .unwrap();
500        wal.sync().unwrap();
501
502        let size = wal.total_size_bytes().unwrap();
503        assert!(size > 0);
504    }
505
506    #[test]
507    fn reopen_continues_lsn() {
508        let dir = tempfile::tempdir().unwrap();
509        let wal_dir = dir.path().join("wal");
510
511        {
512            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
513            wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
514            wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
515            wal.sync().unwrap();
516        }
517
518        {
519            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
520            assert_eq!(wal.next_lsn(), 3);
521            let lsn = wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
522            assert_eq!(lsn, 3);
523            wal.sync().unwrap();
524        }
525
526        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
527        let records = wal.replay().unwrap();
528        assert_eq!(records.len(), 3);
529    }
530
531    #[test]
532    fn encryption_persists_across_segments() {
533        // Rewritten: actually decrypts across a simulated restart.
534        //
535        // Lifecycle:
536        //  1. Write 10 records with encryption, forcing segment rollover.
537        //  2. Drop the WAL (simulating process exit).
538        //  3. Reopen a new reader and replay — records are marked encrypted.
539        //  4. For each segment, open the reader, read its preamble epoch, and
540        //     decrypt each record using that epoch. Verify payloads match.
541        let dir = tempfile::tempdir().unwrap();
542        let wal_dir = dir.path().join("wal");
543        let key_bytes = [42u8; 32];
544
545        let config = SegmentedWalConfig {
546            wal_dir: wal_dir.clone(),
547            segment_target_size: 100,
548            writer_config: WalWriterConfig {
549                use_direct_io: false,
550                ..Default::default()
551            },
552        };
553
554        // Step 1 & 2: write and drop.
555        {
556            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
557            let ring = crate::crypto::KeyRing::new(key);
558            let mut wal = SegmentedWal::open(config.clone()).unwrap();
559            wal.set_encryption_ring(ring).unwrap();
560
561            for i in 0..10u32 {
562                wal.append(
563                    RecordType::Put as u32,
564                    1,
565                    0,
566                    0,
567                    format!("enc-{i}").as_bytes(),
568                )
569                .unwrap();
570                wal.sync().unwrap();
571            }
572            assert!(wal.list_segments().unwrap().len() > 1);
573        }
574
575        // Step 3: reopen WAL (new in-memory epoch, simulates restart).
576        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
577        assert!(
578            segments.len() > 1,
579            "expected multiple segments after rollover"
580        );
581
582        let key_for_read = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
583        let ring_for_read = crate::crypto::KeyRing::new(key_for_read);
584
585        let mut all_payloads = Vec::new();
586
587        // Step 4: per-segment read + decrypt.
588        for seg in &segments {
589            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
590            // Read the epoch from the preamble written at segment open time.
591            let epoch = *reader
592                .segment_preamble()
593                .expect("encrypted segment must have a preamble")
594                .epoch();
595            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();
596
597            for record_result in reader.records() {
598                let record = record_result.unwrap();
599                assert!(record.is_encrypted(), "all records should be encrypted");
600                let plaintext = record
601                    .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_for_read))
602                    .unwrap();
603                all_payloads.push(plaintext);
604            }
605        }
606
607        assert_eq!(all_payloads.len(), 10);
608        for (i, payload) in all_payloads.iter().enumerate() {
609            assert_eq!(payload, format!("enc-{i}").as_bytes());
610        }
611    }
612
613    /// WAL restart roundtrip with encryption.
614    ///
615    /// Writes encrypted records, simulates a process restart (different in-memory
616    /// epoch), replays, and verifies that decryption succeeds because the epoch
617    /// is read from the on-disk preamble rather than the current key's epoch.
618    #[test]
619    fn wal_encrypted_restart_roundtrip() {
620        let dir = tempfile::tempdir().unwrap();
621        let wal_dir = dir.path().join("wal");
622        let key_bytes = [0xABu8; 32];
623
624        let config = SegmentedWalConfig::for_testing(wal_dir.clone());
625
626        // Write with key lifetime 1.
627        {
628            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
629            let ring = crate::crypto::KeyRing::new(key);
630            let mut wal = SegmentedWal::open(config.clone()).unwrap();
631            wal.set_encryption_ring(ring).unwrap();
632
633            for i in 0..5u32 {
634                wal.append(
635                    RecordType::Put as u32,
636                    1,
637                    0,
638                    0,
639                    format!("restart-{i}").as_bytes(),
640                )
641                .unwrap();
642            }
643            wal.sync().unwrap();
644        }
645
646        // Simulate restart: new WalEncryptionKey with same bytes (fresh random epoch).
647        let key_restart = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
648        let ring_restart = crate::crypto::KeyRing::new(key_restart);
649
650        // Replay all segments and decrypt.
651        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
652        let mut payloads = Vec::new();
653
654        for seg in &segments {
655            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
656            let epoch = *reader
657                .segment_preamble()
658                .expect("segment must have preamble after encrypted write")
659                .epoch();
660            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();
661
662            for record_result in reader.records() {
663                let record = record_result.unwrap();
664                let pt = record
665                    .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_restart))
666                    .unwrap();
667                payloads.push(pt);
668            }
669        }
670
671        assert_eq!(payloads.len(), 5);
672        for (i, pt) in payloads.iter().enumerate() {
673            assert_eq!(pt, format!("restart-{i}").as_bytes());
674        }
675    }
676
677    /// Epoch tamper rejection.
678    ///
679    /// After writing an encrypted segment, corrupt the preamble bytes on disk.
680    /// Decryption must fail because the preamble is part of the AAD.
681    #[test]
682    fn epoch_tamper_rejected() {
683        let dir = tempfile::tempdir().unwrap();
684        let wal_dir = dir.path().join("wal");
685        let key_bytes = [0x55u8; 32];
686
687        let config = SegmentedWalConfig::for_testing(wal_dir.clone());
688
689        {
690            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
691            let ring = crate::crypto::KeyRing::new(key);
692            let mut wal = SegmentedWal::open(config).unwrap();
693            wal.set_encryption_ring(ring).unwrap();
694            wal.append(RecordType::Put as u32, 1, 0, 0, b"sensitive payload")
695                .unwrap();
696            wal.sync().unwrap();
697        }
698
699        // Find the segment file and corrupt byte 9 of the preamble (epoch field).
700        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
701        assert_eq!(segments.len(), 1);
702        let seg_path = &segments[0].path;
703
704        let mut raw = std::fs::read(seg_path).unwrap();
705        // Preamble is at offset 0; epoch is bytes 8..12. Flip byte 9.
706        raw[9] ^= 0xFF;
707        std::fs::write(seg_path, &raw).unwrap();
708
709        // Reading the preamble will succeed (bytes are valid), but the epoch
710        // in the preamble will be wrong, so the AAD during decryption won't
711        // match what was used at encryption time — decryption must fail.
712        let key_read = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
713        let ring_read = crate::crypto::KeyRing::new(key_read);
714
715        let reader = crate::reader::WalReader::open(seg_path).unwrap();
716        let epoch = *reader.segment_preamble().unwrap().epoch();
717        let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();
718
719        let record = reader.records().next().unwrap().unwrap();
720        let result = record.decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_read));
721        assert!(
722            result.is_err(),
723            "decryption with tampered preamble epoch must fail"
724        );
725    }
726
727    #[test]
728    fn replay_from_limit_basic() {
729        let dir = tempfile::tempdir().unwrap();
730        let config = test_config(dir.path());
731        let mut wal = SegmentedWal::open(config).unwrap();
732
733        // Append 10 records.
734        for i in 0..10u8 {
735            wal.append(RecordType::Put as u32, 1, 0, 0, &[i]).unwrap();
736        }
737        wal.sync().unwrap();
738
739        // Read all with limit > count.
740        let (records, has_more) = wal.replay_from_limit(1, 100).unwrap();
741        assert_eq!(records.len(), 10);
742        assert!(!has_more);
743
744        // Read with limit = 3.
745        let (records, has_more) = wal.replay_from_limit(1, 3).unwrap();
746        assert_eq!(records.len(), 3);
747        assert!(has_more);
748        assert_eq!(records[0].header.lsn, 1);
749        assert_eq!(records[2].header.lsn, 3);
750    }
751
752    #[test]
753    fn replay_from_limit_with_lsn_filter() {
754        let dir = tempfile::tempdir().unwrap();
755        let config = test_config(dir.path());
756        let mut wal = SegmentedWal::open(config).unwrap();
757
758        for i in 0..10u8 {
759            wal.append(RecordType::Put as u32, 1, 0, 0, &[i]).unwrap();
760        }
761        wal.sync().unwrap();
762
763        // Start from LSN 6 with limit 100 — should get 5 records (LSNs 6-10).
764        let (records, has_more) = wal.replay_from_limit(6, 100).unwrap();
765        assert_eq!(records.len(), 5);
766        assert!(!has_more);
767        assert_eq!(records[0].header.lsn, 6);
768
769        // Start from LSN 6 with limit 2 — should get 2 records.
770        let (records, has_more) = wal.replay_from_limit(6, 2).unwrap();
771        assert_eq!(records.len(), 2);
772        assert!(has_more);
773    }
774
775    #[test]
776    fn replay_from_limit_empty() {
777        let dir = tempfile::tempdir().unwrap();
778        let config = test_config(dir.path());
779        let mut wal = SegmentedWal::open(config).unwrap();
780
781        wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
782        wal.sync().unwrap();
783
784        // Start from beyond all records.
785        let (records, has_more) = wal.replay_from_limit(999, 100).unwrap();
786        assert!(records.is_empty());
787        assert!(!has_more);
788    }
789
790    #[test]
791    fn replay_from_limit_across_segments() {
792        let dir = tempfile::tempdir().unwrap();
793        let config = test_config(dir.path());
794        let mut wal = SegmentedWal::open(config).unwrap();
795
796        // Write 10 records to first segment.
797        for i in 0..10u8 {
798            wal.append(RecordType::Put as u32, 1, 0, 0, &[i]).unwrap();
799        }
800        wal.sync().unwrap();
801        // Force a segment rollover.
802        wal.roll_segment().unwrap();
803
804        // Write 10 more records to second segment.
805        for i in 10..20u8 {
806            wal.append(RecordType::Put as u32, 1, 0, 0, &[i]).unwrap();
807        }
808        wal.sync().unwrap();
809
810        let seg_count = wal.list_segments().unwrap().len();
811        assert!(
812            seg_count >= 2,
813            "expected multiple segments, got {seg_count}"
814        );
815
816        // Paginated read should span segments correctly.
817        let (records, has_more) = wal.replay_from_limit(1, 5).unwrap();
818        assert_eq!(records.len(), 5);
819        assert!(has_more);
820
821        // Continue from where we left off.
822        let next_lsn = records.last().unwrap().header.lsn + 1;
823        let (records2, _) = wal.replay_from_limit(next_lsn, 200).unwrap();
824        assert_eq!(records2.len(), 15); // 20 - 5 = 15 remaining
825    }
826}