Skip to main content

sochdb_storage/
wal_segment.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! WAL Segmentation and Checkpoint Manager
19//!
20//! This module provides bounded recovery time through:
21//! - WAL file segmentation (rotate after size/time threshold)
22//! - Fuzzy checkpointing (no blocking writes)
23//! - Automatic old segment cleanup after checkpoint
24//!
25//! ## Architecture
26//!
27//! ```text
28//! Active Writes → Current Segment → Rotation → Archived Segment
29//!                                                      ↓
30//!                                              Checkpoint
31//!                                                      ↓
32//!                                              Segment Cleanup
33//! ```
34//!
35//! ## Recovery Time Bound
36//!
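//! Recovery replays the segments that remain after the last checkpoint's
//! cleanup, so its cost is roughly (retained segments × segment_max_size) /
//! disk_bandwidth. With 64 MB segments and 400 MB/s sequential read, each
//! retained segment adds about 160 ms.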
39
40use std::collections::BTreeMap;
41use std::fs::{self, File, OpenOptions};
42use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
45use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
46
47use parking_lot::{Mutex, RwLock};
48
/// Default maximum segment size (64 MiB).
pub const DEFAULT_SEGMENT_MAX_SIZE: u64 = 64 * 1024 * 1024;

/// Default segment rotation interval (5 minutes).
pub const DEFAULT_ROTATION_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Default checkpoint interval (1 minute).
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Segment file header magic: the ASCII tag `WLSG` read as a big-endian `u32`
/// (0x574C5347). It is stored little-endian on disk like every other field.
const SEGMENT_MAGIC: u32 = u32::from_be_bytes(*b"WLSG");

/// Current segment header version; headers with a newer version are rejected.
const SEGMENT_VERSION: u16 = 1;

/// Encoded segment header size in bytes:
/// magic(4) + version(2) + flags(2) + sequence(8) + first_lsn(8) + created_at(8).
const SEGMENT_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 8 + 8;

/// Checkpoint file magic: the ASCII tag `CHKP` read as a big-endian `u32`
/// (0x43484B50).
const CHECKPOINT_MAGIC: u32 = u32::from_be_bytes(*b"CHKP");

/// WAL segment configuration.
///
/// Start from [`Default`] and adjust with the `with_*` builder methods, e.g.
/// `SegmentConfig::default().with_wal_dir(dir).with_max_size(1 << 20)`.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Size in bytes at which the active segment is rotated (checked before
    /// each append); also the length new segment files are preallocated to
    /// when `preallocate` is set.
    pub max_size: u64,
    /// Maximum age of the active segment before it is rotated (checked before
    /// each append).
    pub rotation_interval: Duration,
    /// Intended interval between checkpoints. Not read by the segment manager
    /// in this module — presumably used by whoever schedules checkpoints.
    pub checkpoint_interval: Duration,
    /// Directory holding the segment files and the `checkpoint` file.
    pub wal_dir: PathBuf,
    /// Flush and fsync the active segment after every append.
    pub sync_on_write: bool,
    /// Extend each newly created segment file to `max_size` up front.
    pub preallocate: bool,
}

impl Default for SegmentConfig {
    fn default() -> Self {
        Self {
            max_size: DEFAULT_SEGMENT_MAX_SIZE,
            rotation_interval: DEFAULT_ROTATION_INTERVAL,
            checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
            wal_dir: PathBuf::from("wal"),
            sync_on_write: true,
            preallocate: true,
        }
    }
}

impl SegmentConfig {
    /// Use `dir` as the WAL directory.
    pub fn with_wal_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
        self.wal_dir = dir.as_ref().to_path_buf();
        self
    }

    /// Set the size threshold (bytes) at which the active segment rotates.
    pub fn with_max_size(mut self, size: u64) -> Self {
        self.max_size = size;
        self
    }

    /// Set the maximum age of the active segment before it rotates.
    pub fn with_rotation_interval(mut self, interval: Duration) -> Self {
        self.rotation_interval = interval;
        self
    }

    /// Set the checkpoint interval.
    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
        self.checkpoint_interval = interval;
        self
    }

    /// Enable or disable flush + fsync after every append.
    pub fn with_sync_on_write(mut self, sync_on_write: bool) -> Self {
        self.sync_on_write = sync_on_write;
        self
    }

    /// Enable or disable preallocating new segment files to `max_size`.
    pub fn with_preallocate(mut self, preallocate: bool) -> Self {
        self.preallocate = preallocate;
        self
    }
}
111
112/// Segment header stored at the beginning of each segment file
113#[derive(Debug, Clone)]
114pub struct SegmentHeader {
115    /// Magic number
116    pub magic: u32,
117    /// Version
118    pub version: u16,
119    /// Flags
120    pub flags: u16,
121    /// Segment sequence number
122    pub sequence: u64,
123    /// First LSN in this segment
124    pub first_lsn: u64,
125    /// Creation timestamp (Unix millis)
126    pub created_at: u64,
127    /// Reserved for future use
128    pub reserved: [u8; 8],
129}
130
131impl SegmentHeader {
132    fn new(sequence: u64, first_lsn: u64) -> Self {
133        let now = SystemTime::now()
134            .duration_since(UNIX_EPOCH)
135            .map(|d| d.as_millis() as u64)
136            .unwrap_or(0);
137
138        Self {
139            magic: SEGMENT_MAGIC,
140            version: SEGMENT_VERSION,
141            flags: 0,
142            sequence,
143            first_lsn,
144            created_at: now,
145            reserved: [0; 8],
146        }
147    }
148
149    fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
150        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
151        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
152        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
153        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
154        buf[8..16].copy_from_slice(&self.sequence.to_le_bytes());
155        buf[16..24].copy_from_slice(&self.first_lsn.to_le_bytes());
156        buf[24..32].copy_from_slice(&self.created_at.to_le_bytes());
157        buf
158    }
159
160    fn decode(buf: &[u8]) -> Option<Self> {
161        if buf.len() < SEGMENT_HEADER_SIZE {
162            return None;
163        }
164
165        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
166        if magic != SEGMENT_MAGIC {
167            return None;
168        }
169
170        // Fail fast on an incompatible (newer) on-disk version rather than
171        // silently misinterpreting a layout we do not understand. Treating an
172        // unknown version like a magic mismatch (returning `None`) stops replay
173        // at this segment instead of decoding newer fields as if they were the
174        // current layout, which could corrupt recovery.
175        let version = u16::from_le_bytes([buf[4], buf[5]]);
176        if version > SEGMENT_VERSION {
177            return None;
178        }
179
180        Some(Self {
181            magic,
182            version,
183            flags: u16::from_le_bytes([buf[6], buf[7]]),
184            sequence: u64::from_le_bytes([
185                buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
186            ]),
187            first_lsn: u64::from_le_bytes([
188                buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23],
189            ]),
190            created_at: u64::from_le_bytes([
191                buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31],
192            ]),
193            reserved: [0; 8],
194        })
195    }
196}
197
198/// Active WAL segment being written to
199struct ActiveSegment {
200    /// Segment file
201    file: BufWriter<File>,
202    /// Segment path
203    path: PathBuf,
204    /// Segment header
205    header: SegmentHeader,
206    /// Current offset within segment
207    offset: u64,
208    /// Creation time for rotation
209    created_at: Instant,
210}
211
212/// Checkpoint record stored in checkpoint file
213#[derive(Debug, Clone)]
214pub struct CheckpointRecord {
215    /// Checkpoint LSN (all entries before this are flushed)
216    pub lsn: u64,
217    /// Last segment sequence that can be deleted
218    pub last_segment: u64,
219    /// Checkpoint timestamp
220    pub timestamp: u64,
221    /// Memtable state checksum (for validation)
222    pub memtable_checksum: u64,
223    /// Number of entries checkpointed
224    pub entry_count: u64,
225}
226
227impl CheckpointRecord {
228    fn encode(&self) -> Vec<u8> {
229        let mut buf = Vec::with_capacity(48);
230        buf.extend_from_slice(&CHECKPOINT_MAGIC.to_le_bytes());
231        buf.extend_from_slice(&self.lsn.to_le_bytes());
232        buf.extend_from_slice(&self.last_segment.to_le_bytes());
233        buf.extend_from_slice(&self.timestamp.to_le_bytes());
234        buf.extend_from_slice(&self.memtable_checksum.to_le_bytes());
235        buf.extend_from_slice(&self.entry_count.to_le_bytes());
236        // Add checksum of the record itself
237        let checksum = crc32fast::hash(&buf);
238        buf.extend_from_slice(&checksum.to_le_bytes());
239        buf
240    }
241
242    fn decode(buf: &[u8]) -> Option<Self> {
243        if buf.len() < 48 {
244            return None;
245        }
246
247        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
248        if magic != CHECKPOINT_MAGIC {
249            return None;
250        }
251
252        // Verify checksum
253        let stored_checksum = u32::from_le_bytes([buf[44], buf[45], buf[46], buf[47]]);
254        let computed_checksum = crc32fast::hash(&buf[0..44]);
255        if stored_checksum != computed_checksum {
256            return None;
257        }
258
259        Some(Self {
260            lsn: u64::from_le_bytes([
261                buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11],
262            ]),
263            last_segment: u64::from_le_bytes([
264                buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18], buf[19],
265            ]),
266            timestamp: u64::from_le_bytes([
267                buf[20], buf[21], buf[22], buf[23], buf[24], buf[25], buf[26], buf[27],
268            ]),
269            memtable_checksum: u64::from_le_bytes([
270                buf[28], buf[29], buf[30], buf[31], buf[32], buf[33], buf[34], buf[35],
271            ]),
272            entry_count: u64::from_le_bytes([
273                buf[36], buf[37], buf[38], buf[39], buf[40], buf[41], buf[42], buf[43],
274            ]),
275        })
276    }
277}
278
279/// WAL Segment Manager
280///
281/// Handles WAL segmentation, rotation, and cleanup.
282pub struct WalSegmentManager {
283    /// Configuration
284    config: SegmentConfig,
285    /// Current active segment
286    active: Mutex<Option<ActiveSegment>>,
287    /// Current LSN
288    current_lsn: AtomicU64,
289    /// Current segment sequence
290    segment_sequence: AtomicU64,
291    /// All segment metadata (sequence -> header)
292    segments: RwLock<BTreeMap<u64, SegmentMetadata>>,
293    /// Last checkpoint record
294    last_checkpoint: RwLock<Option<CheckpointRecord>>,
295    /// Shutdown flag
296    shutdown: AtomicBool,
297}
298
299/// Metadata for a WAL segment
300#[derive(Debug, Clone)]
301pub struct SegmentMetadata {
302    /// Segment sequence number
303    pub sequence: u64,
304    /// First LSN in segment
305    pub first_lsn: u64,
306    /// Last LSN in segment (None if active)
307    pub last_lsn: Option<u64>,
308    /// File path
309    pub path: PathBuf,
310    /// File size
311    pub size: u64,
312    /// Is this the active segment
313    pub is_active: bool,
314}
315
316impl WalSegmentManager {
317    /// Create a new WAL segment manager
318    pub fn new(config: SegmentConfig) -> std::io::Result<Self> {
319        // Ensure WAL directory exists
320        fs::create_dir_all(&config.wal_dir)?;
321
322        let manager = Self {
323            config,
324            active: Mutex::new(None),
325            current_lsn: AtomicU64::new(0),
326            segment_sequence: AtomicU64::new(0),
327            segments: RwLock::new(BTreeMap::new()),
328            last_checkpoint: RwLock::new(None),
329            shutdown: AtomicBool::new(false),
330        };
331
332        // Load existing segments and checkpoint
333        manager.recover()?;
334
335        Ok(manager)
336    }
337
338    /// Recover from existing WAL segments
339    fn recover(&self) -> std::io::Result<()> {
340        // Load checkpoint if exists
341        let checkpoint_path = self.config.wal_dir.join("checkpoint");
342        if checkpoint_path.exists() {
343            let mut file = File::open(&checkpoint_path)?;
344            let mut buf = Vec::new();
345            file.read_to_end(&mut buf)?;
346            if let Some(record) = CheckpointRecord::decode(&buf) {
347                *self.last_checkpoint.write() = Some(record);
348            }
349        }
350
351        // Scan for existing segments
352        let entries = fs::read_dir(&self.config.wal_dir)?;
353        let mut max_sequence = 0u64;
354        let mut max_lsn = 0u64;
355
356        for entry in entries {
357            let entry = entry?;
358            let path = entry.path();
359
360            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
361                if name.starts_with("segment_") && name.ends_with(".wal") {
362                    // Parse segment file
363                    let mut file = File::open(&path)?;
364                    let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
365                    if file.read_exact(&mut header_buf).is_ok() {
366                        if let Some(header) = SegmentHeader::decode(&header_buf) {
367                            max_sequence = max_sequence.max(header.sequence);
368                            max_lsn = max_lsn.max(header.first_lsn);
369
370                            let metadata = file.metadata()?;
371                            self.segments.write().insert(
372                                header.sequence,
373                                SegmentMetadata {
374                                    sequence: header.sequence,
375                                    first_lsn: header.first_lsn,
376                                    last_lsn: None,
377                                    path: path.clone(),
378                                    size: metadata.len(),
379                                    is_active: false,
380                                },
381                            );
382                        }
383                    }
384                }
385            }
386        }
387
388        // Set counters
389        self.segment_sequence
390            .store(max_sequence + 1, Ordering::SeqCst);
391        self.current_lsn.store(max_lsn, Ordering::SeqCst);
392
393        Ok(())
394    }
395
396    /// Append a record to the WAL, returns the LSN
397    pub fn append(&self, data: &[u8]) -> std::io::Result<u64> {
398        let mut active = self.active.lock();
399
400        // Rotate if needed
401        if self.needs_rotation(&active) {
402            self.rotate_segment(&mut active)?;
403        }
404
405        // Ensure we have an active segment
406        if active.is_none() {
407            self.create_new_segment(&mut active)?;
408        }
409
410        let segment = active.as_mut().unwrap();
411
412        // Assign LSN
413        let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
414
415        // Write record: [length: u32][lsn: u64][data][checksum: u32]
416        let record_len = 4 + 8 + data.len() + 4;
417        let mut record = Vec::with_capacity(record_len);
418        record.extend_from_slice(&(data.len() as u32).to_le_bytes());
419        record.extend_from_slice(&lsn.to_le_bytes());
420        record.extend_from_slice(data);
421        let checksum = crc32fast::hash(&record);
422        record.extend_from_slice(&checksum.to_le_bytes());
423
424        segment.file.write_all(&record)?;
425        segment.offset += record_len as u64;
426
427        if self.config.sync_on_write {
428            // `flush()` only pushes the BufWriter into the kernel page cache;
429            // a true "sync on every write" durability guarantee additionally
430            // needs an fsync, matching rotate_segment()/shutdown(). Without it
431            // an acknowledged write is lost on power loss.
432            segment.file.flush()?;
433            segment.file.get_ref().sync_all()?;
434        }
435
436        Ok(lsn)
437    }
438
439    /// Check if segment needs rotation
440    fn needs_rotation(&self, active: &Option<ActiveSegment>) -> bool {
441        match active {
442            Some(segment) => {
443                segment.offset >= self.config.max_size
444                    || segment.created_at.elapsed() >= self.config.rotation_interval
445            }
446            None => false,
447        }
448    }
449
450    /// Rotate to a new segment
451    fn rotate_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
452        if let Some(mut segment) = active.take() {
453            // Flush and sync the old segment
454            segment.file.flush()?;
455            segment
456                .file
457                .into_inner()
458                .map_err(|e| e.into_error())?
459                .sync_all()?;
460
461            // Update metadata to mark as not active
462            let current_lsn = self.current_lsn.load(Ordering::SeqCst);
463            if let Some(meta) = self.segments.write().get_mut(&segment.header.sequence) {
464                meta.is_active = false;
465                meta.last_lsn = Some(current_lsn);
466                meta.size = segment.offset;
467            }
468        }
469
470        Ok(())
471    }
472
473    /// Create a new segment
474    fn create_new_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
475        let sequence = self.segment_sequence.fetch_add(1, Ordering::SeqCst);
476        let first_lsn = self.current_lsn.load(Ordering::SeqCst);
477
478        let path = self
479            .config
480            .wal_dir
481            .join(format!("segment_{:016x}.wal", sequence));
482
483        let file = OpenOptions::new()
484            .create(true)
485            .write(true)
486            .truncate(true)
487            .open(&path)?;
488
489        // Preallocate if configured
490        if self.config.preallocate {
491            file.set_len(self.config.max_size)?;
492        }
493
494        let mut writer = BufWriter::new(file);
495
496        // Write header
497        let header = SegmentHeader::new(sequence, first_lsn);
498        writer.write_all(&header.encode())?;
499
500        let segment = ActiveSegment {
501            file: writer,
502            path: path.clone(),
503            header: header.clone(),
504            offset: SEGMENT_HEADER_SIZE as u64,
505            created_at: Instant::now(),
506        };
507
508        // Add to segments map
509        self.segments.write().insert(
510            sequence,
511            SegmentMetadata {
512                sequence,
513                first_lsn,
514                last_lsn: None,
515                path,
516                size: SEGMENT_HEADER_SIZE as u64,
517                is_active: true,
518            },
519        );
520
521        *active = Some(segment);
522
523        Ok(())
524    }
525
526    /// Create a fuzzy checkpoint
527    ///
528    /// This captures the current LSN and can be used to cleanup old segments.
529    pub fn create_checkpoint(
530        &self,
531        memtable_checksum: u64,
532        entry_count: u64,
533    ) -> std::io::Result<CheckpointRecord> {
534        let lsn = self.current_lsn.load(Ordering::SeqCst);
535
536        // Find the last segment that is fully before this LSN
537        let segments = self.segments.read();
538        let last_segment = segments
539            .values()
540            .filter(|s| s.last_lsn.map(|l| l < lsn).unwrap_or(false))
541            .map(|s| s.sequence)
542            .max()
543            .unwrap_or(0);
544
545        let now = SystemTime::now()
546            .duration_since(UNIX_EPOCH)
547            .map(|d| d.as_millis() as u64)
548            .unwrap_or(0);
549
550        let record = CheckpointRecord {
551            lsn,
552            last_segment,
553            timestamp: now,
554            memtable_checksum,
555            entry_count,
556        };
557
558        // Write checkpoint file
559        let checkpoint_path = self.config.wal_dir.join("checkpoint");
560        let temp_path = self.config.wal_dir.join("checkpoint.tmp");
561
562        let mut file = File::create(&temp_path)?;
563        file.write_all(&record.encode())?;
564        file.sync_all()?;
565
566        fs::rename(&temp_path, &checkpoint_path)?;
567
568        *self.last_checkpoint.write() = Some(record.clone());
569
570        Ok(record)
571    }
572
573    /// Cleanup old segments that are no longer needed
574    pub fn cleanup_old_segments(&self) -> std::io::Result<usize> {
575        let checkpoint = self.last_checkpoint.read().clone();
576
577        let last_safe_segment = match checkpoint {
578            Some(cp) => cp.last_segment,
579            None => return Ok(0),
580        };
581
582        let mut segments = self.segments.write();
583        let old_segments: Vec<u64> = segments
584            .keys()
585            .filter(|&&seq| seq <= last_safe_segment)
586            .copied()
587            .collect();
588
589        let mut cleaned = 0;
590        for sequence in old_segments {
591            if let Some(meta) = segments.remove(&sequence) {
592                if meta.path.exists() {
593                    fs::remove_file(&meta.path)?;
594                    cleaned += 1;
595                }
596            }
597        }
598
599        Ok(cleaned)
600    }
601
602    /// Get statistics
603    pub fn stats(&self) -> SegmentStats {
604        let segments = self.segments.read();
605        let total_size: u64 = segments.values().map(|s| s.size).sum();
606        let checkpoint = self.last_checkpoint.read().clone();
607
608        SegmentStats {
609            segment_count: segments.len(),
610            total_size,
611            current_lsn: self.current_lsn.load(Ordering::SeqCst),
612            current_sequence: self.segment_sequence.load(Ordering::SeqCst),
613            last_checkpoint_lsn: checkpoint.as_ref().map(|c| c.lsn),
614        }
615    }
616
617    /// Iterator for recovery
618    pub fn recovery_iterator(&self, from_lsn: u64) -> RecoveryIterator<'_> {
619        RecoveryIterator::new(self, from_lsn)
620    }
621
622    /// Flush all pending writes
623    pub fn flush(&self) -> std::io::Result<()> {
624        let mut active = self.active.lock();
625        if let Some(ref mut segment) = *active {
626            segment.file.flush()?;
627        }
628        Ok(())
629    }
630
631    /// Shutdown the segment manager
632    pub fn shutdown(&self) -> std::io::Result<()> {
633        self.shutdown.store(true, Ordering::SeqCst);
634
635        let mut active = self.active.lock();
636        if let Some(mut segment) = active.take() {
637            segment.file.flush()?;
638            segment
639                .file
640                .into_inner()
641                .map_err(|e| e.into_error())?
642                .sync_all()?;
643        }
644
645        Ok(())
646    }
647}
648
/// Point-in-time summary produced by `WalSegmentManager::stats`.
#[derive(Clone, Debug)]
pub struct SegmentStats {
    /// How many segments the manager is tracking.
    pub segment_count: usize,
    /// Sum of the tracked segments' recorded sizes, in bytes.
    pub total_size: u64,
    /// LSN the next append will receive.
    pub current_lsn: u64,
    /// Sequence number the next new segment will receive.
    pub current_sequence: u64,
    /// LSN stored in the most recent checkpoint, if there is one.
    pub last_checkpoint_lsn: Option<u64>,
}
663
664/// Recovery iterator for replaying WAL entries
665pub struct RecoveryIterator<'a> {
666    manager: &'a WalSegmentManager,
667    current_segment_idx: usize,
668    segment_sequences: Vec<u64>,
669    current_reader: Option<BufReader<File>>,
670    current_offset: u64,
671    from_lsn: u64,
672}
673
674impl<'a> RecoveryIterator<'a> {
675    fn new(manager: &'a WalSegmentManager, from_lsn: u64) -> Self {
676        let segments = manager.segments.read();
677        let mut sequences: Vec<u64> = segments
678            .values()
679            .filter(|s| {
680                s.first_lsn >= from_lsn || s.last_lsn.map(|l| l >= from_lsn).unwrap_or(true)
681            })
682            .map(|s| s.sequence)
683            .collect();
684        sequences.sort();
685
686        Self {
687            manager,
688            current_segment_idx: 0,
689            segment_sequences: sequences,
690            current_reader: None,
691            current_offset: SEGMENT_HEADER_SIZE as u64,
692            from_lsn,
693        }
694    }
695
696    /// Get next WAL entry
697    pub fn next_entry(&mut self) -> std::io::Result<Option<WalEntry>> {
698        loop {
699            // Open segment if needed
700            if self.current_reader.is_none() {
701                if self.current_segment_idx >= self.segment_sequences.len() {
702                    return Ok(None);
703                }
704
705                let sequence = self.segment_sequences[self.current_segment_idx];
706                let segments = self.manager.segments.read();
707                if let Some(meta) = segments.get(&sequence) {
708                    let file = File::open(&meta.path)?;
709                    let mut reader = BufReader::new(file);
710                    reader.seek(SeekFrom::Start(SEGMENT_HEADER_SIZE as u64))?;
711                    self.current_reader = Some(reader);
712                    self.current_offset = SEGMENT_HEADER_SIZE as u64;
713                } else {
714                    self.current_segment_idx += 1;
715                    continue;
716                }
717            }
718
719            let reader = self.current_reader.as_mut().unwrap();
720
721            // Try to read entry
722            let mut len_buf = [0u8; 4];
723            match reader.read_exact(&mut len_buf) {
724                Ok(_) => {}
725                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
726                    // Move to next segment
727                    self.current_reader = None;
728                    self.current_segment_idx += 1;
729                    continue;
730                }
731                Err(e) => return Err(e),
732            }
733
734            let data_len = u32::from_le_bytes(len_buf) as usize;
735            if data_len == 0 || data_len > 100 * 1024 * 1024 {
736                // Invalid or end of segment
737                self.current_reader = None;
738                self.current_segment_idx += 1;
739                continue;
740            }
741
742            // Read LSN
743            let mut lsn_buf = [0u8; 8];
744            reader.read_exact(&mut lsn_buf)?;
745            let lsn = u64::from_le_bytes(lsn_buf);
746
747            // Read data
748            let mut data = vec![0u8; data_len];
749            reader.read_exact(&mut data)?;
750
751            // Read and verify checksum
752            let mut checksum_buf = [0u8; 4];
753            reader.read_exact(&mut checksum_buf)?;
754            let stored_checksum = u32::from_le_bytes(checksum_buf);
755
756            let mut verify_buf = Vec::with_capacity(4 + 8 + data_len);
757            verify_buf.extend_from_slice(&len_buf);
758            verify_buf.extend_from_slice(&lsn_buf);
759            verify_buf.extend_from_slice(&data);
760            let computed_checksum = crc32fast::hash(&verify_buf);
761
762            if stored_checksum != computed_checksum {
763                return Err(std::io::Error::new(
764                    std::io::ErrorKind::InvalidData,
765                    "WAL entry checksum mismatch",
766                ));
767            }
768
769            self.current_offset += (4 + 8 + data_len + 4) as u64;
770
771            // Skip entries before from_lsn
772            if lsn < self.from_lsn {
773                continue;
774            }
775
776            return Ok(Some(WalEntry { lsn, data }));
777        }
778    }
779}
780
/// One record read back from the WAL during recovery.
#[derive(Clone, Debug)]
pub struct WalEntry {
    /// Log sequence number assigned when the record was appended.
    pub lsn: u64,
    /// The record payload as stored on disk.
    pub data: Vec<u8>,
}
789
790// =============================================================================
791// Tests
792// =============================================================================
793
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_segment_manager_basic() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(1024);

        let manager = WalSegmentManager::new(config).unwrap();

        // LSNs are handed out densely from 0 in a fresh directory.
        for i in 0..100 {
            let data = format!("entry_{}", i);
            let lsn = manager.append(data.as_bytes()).unwrap();
            assert_eq!(lsn, i as u64);
        }

        let stats = manager.stats();
        assert!(stats.segment_count > 0);
        assert_eq!(stats.current_lsn, 100);

        manager.shutdown().unwrap();
    }

    #[test]
    fn test_checkpoint_and_cleanup() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        let manager = WalSegmentManager::new(config).unwrap();

        // 26-byte records in 256-byte segments: `append` rotates every few
        // records, leaving several sealed segments plus one active segment.
        for i in 0..50 {
            let data = format!("entry_{:04}", i);
            manager.append(data.as_bytes()).unwrap();
        }

        // Push buffered bytes to the OS. This does not rotate; rotation
        // already happened inside `append`.
        manager.flush().unwrap();
        let segments_before = manager.stats().segment_count;
        assert!(
            segments_before > 1,
            "a 256-byte max size should have forced rotations"
        );

        let checkpoint = manager.create_checkpoint(12345, 50).unwrap();
        assert_eq!(checkpoint.lsn, 50);

        // Every sealed segment precedes the checkpoint LSN, so cleanup deletes
        // at least one file and the segment map shrinks by exactly that many.
        let cleaned = manager.cleanup_old_segments().unwrap();
        assert!(cleaned > 0);
        assert_eq!(manager.stats().segment_count, segments_before - cleaned);

        manager.shutdown().unwrap();
    }

    #[test]
    fn test_recovery() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default().with_wal_dir(dir.path());

        // Write some data.
        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..10 {
                let data = format!("data_{}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            manager.shutdown().unwrap();
        }

        // Reopen and replay everything: entries come back intact, in LSN order.
        {
            let manager = WalSegmentManager::new(config).unwrap();
            let mut iter = manager.recovery_iterator(0);
            let mut count = 0u64;

            while let Some(entry) = iter.next_entry().unwrap() {
                assert_eq!(entry.lsn, count);
                assert_eq!(entry.data, format!("data_{}", count).into_bytes());
                count += 1;
            }

            assert_eq!(count, 10);
        }
    }

    #[test]
    fn test_segment_header_encoding() {
        let header = SegmentHeader::new(42, 12345);
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.magic, SEGMENT_MAGIC);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.first_lsn, 12345);
    }

    #[test]
    fn test_segment_header_rejects_future_version() {
        // A segment written by a hypothetical newer release must not be
        // decoded with the current layout; misreading its bytes during replay
        // could corrupt recovery. It is treated like a magic mismatch (None),
        // so replay stops at this segment instead.
        let mut encoded = SegmentHeader::new(1, 1).encode();
        let future = SEGMENT_VERSION + 1;
        encoded[4..6].copy_from_slice(&future.to_le_bytes());

        assert!(SegmentHeader::decode(&encoded).is_none());
    }

    #[test]
    fn test_checkpoint_record_encoding() {
        let record = CheckpointRecord {
            lsn: 1000,
            last_segment: 5,
            timestamp: 123456789,
            memtable_checksum: 0xDEADBEEF,
            entry_count: 500,
        };

        let encoded = record.encode();
        let decoded = CheckpointRecord::decode(&encoded).unwrap();

        assert_eq!(decoded.lsn, 1000);
        assert_eq!(decoded.last_segment, 5);
        assert_eq!(decoded.timestamp, 123456789);
        assert_eq!(decoded.memtable_checksum, 0xDEADBEEF);
        assert_eq!(decoded.entry_count, 500);
    }
}