sochdb_storage/
wal_segment.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! WAL Segmentation and Checkpoint Manager
16//!
17//! This module provides bounded recovery time through:
18//! - WAL file segmentation (rotate after size/time threshold)
19//! - Fuzzy checkpointing (no blocking writes)
20//! - Automatic old segment cleanup after checkpoint
21//!
22//! ## Architecture
23//!
24//! ```text
25//! Active Writes → Current Segment → Rotation → Archived Segment
26//!                                                      ↓
27//!                                              Checkpoint
28//!                                                      ↓
29//!                                              Segment Cleanup
30//! ```
31//!
32//! ## Recovery Time Bound
33//!
34//! Recovery time is bounded by segment_max_size / disk_bandwidth.
35//! With 64MB segments and 400 MB/s sequential read, recovery ≤ 160ms.
36
37use std::collections::BTreeMap;
38use std::fs::{self, File, OpenOptions};
39use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
40use std::path::{Path, PathBuf};
41use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
42use std::sync::Arc;
43use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
44
45use parking_lot::{Mutex, RwLock};
46
/// Default maximum segment size (64 MB)
///
/// `SegmentConfig::default()` uses this as `max_size`.
pub const DEFAULT_SEGMENT_MAX_SIZE: u64 = 64 * 1024 * 1024;

/// Default segment rotation interval (5 minutes)
///
/// `SegmentConfig::default()` uses this as `rotation_interval`.
pub const DEFAULT_ROTATION_INTERVAL: Duration = Duration::from_secs(300);

/// Default checkpoint interval (1 minute)
///
/// `SegmentConfig::default()` uses this as `checkpoint_interval`.
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Segment file header magic
///
/// `SegmentHeader::encode` stores it little-endian, so a segment file starts
/// with the bytes `47 53 4C 57`.
const SEGMENT_MAGIC: u32 = 0x574C5347; // "WLSG"

/// Segment header version
const SEGMENT_VERSION: u16 = 1;

/// Segment header size
///
/// 4 (magic) + 2 (version) + 2 (flags) + 8 (sequence) + 8 (first LSN)
/// + 8 (creation time) bytes.
const SEGMENT_HEADER_SIZE: usize = 32;

/// Checkpoint file magic
///
/// `CheckpointRecord::encode` stores it little-endian as the first four bytes
/// of the checkpoint file (`50 4B 48 43`).
const CHECKPOINT_MAGIC: u32 = 0x43484B50; // "CHKP"
67
/// WAL segment configuration
///
/// Obtained from `SegmentConfig::default()` and adjusted with the `with_*`
/// builder methods or by assigning the public fields directly.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Maximum segment size before rotation
    ///
    /// Compared against the active segment's write offset; it is also the
    /// length new segment files are extended to when `preallocate` is set.
    pub max_size: u64,
    /// Maximum time before rotation
    ///
    /// Measured from the moment the active segment was created.
    pub rotation_interval: Duration,
    /// Checkpoint interval
    ///
    /// NOTE(review): not read by the code visible in this file; presumably
    /// consumed by whoever schedules `create_checkpoint` — confirm.
    pub checkpoint_interval: Duration,
    /// Directory for WAL segments
    ///
    /// Also holds the `checkpoint` and `checkpoint.tmp` files.
    pub wal_dir: PathBuf,
    /// Sync on every write
    ///
    /// Consulted by `append` after each record has been written.
    pub sync_on_write: bool,
    /// Preallocate segment files
    ///
    /// When set, a new segment file is extended to `max_size` right after it
    /// is created.
    pub preallocate: bool,
}
84
85impl Default for SegmentConfig {
86    fn default() -> Self {
87        Self {
88            max_size: DEFAULT_SEGMENT_MAX_SIZE,
89            rotation_interval: DEFAULT_ROTATION_INTERVAL,
90            checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
91            wal_dir: PathBuf::from("wal"),
92            sync_on_write: true,
93            preallocate: true,
94        }
95    }
96}
97
98impl SegmentConfig {
99    pub fn with_wal_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
100        self.wal_dir = dir.as_ref().to_path_buf();
101        self
102    }
103
104    pub fn with_max_size(mut self, size: u64) -> Self {
105        self.max_size = size;
106        self
107    }
108}
109
/// Segment header stored at the beginning of each segment file
///
/// Serialized as `SEGMENT_HEADER_SIZE` little-endian bytes covering `magic`
/// through `created_at`.
#[derive(Debug, Clone)]
pub struct SegmentHeader {
    /// Magic number
    ///
    /// `decode` rejects buffers whose magic differs from `SEGMENT_MAGIC`.
    pub magic: u32,
    /// Version
    ///
    /// Written as `SEGMENT_VERSION`; `decode` reads it back without checking it.
    pub version: u16,
    /// Flags
    ///
    /// Always 0 for headers built by `new`.
    pub flags: u16,
    /// Segment sequence number
    pub sequence: u64,
    /// First LSN in this segment
    pub first_lsn: u64,
    /// Creation timestamp (Unix millis)
    ///
    /// 0 when the system clock reports a time before the Unix epoch.
    pub created_at: u64,
    /// Reserved for future use
    ///
    /// In-memory only: `encode` does not write these bytes and `decode`
    /// always yields zeros.
    pub reserved: [u8; 8],
}
128
129impl SegmentHeader {
130    fn new(sequence: u64, first_lsn: u64) -> Self {
131        let now = SystemTime::now()
132            .duration_since(UNIX_EPOCH)
133            .map(|d| d.as_millis() as u64)
134            .unwrap_or(0);
135
136        Self {
137            magic: SEGMENT_MAGIC,
138            version: SEGMENT_VERSION,
139            flags: 0,
140            sequence,
141            first_lsn,
142            created_at: now,
143            reserved: [0; 8],
144        }
145    }
146
147    fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
148        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
149        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
150        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
151        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
152        buf[8..16].copy_from_slice(&self.sequence.to_le_bytes());
153        buf[16..24].copy_from_slice(&self.first_lsn.to_le_bytes());
154        buf[24..32].copy_from_slice(&self.created_at.to_le_bytes());
155        buf
156    }
157
158    fn decode(buf: &[u8]) -> Option<Self> {
159        if buf.len() < SEGMENT_HEADER_SIZE {
160            return None;
161        }
162
163        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
164        if magic != SEGMENT_MAGIC {
165            return None;
166        }
167
168        Some(Self {
169            magic,
170            version: u16::from_le_bytes([buf[4], buf[5]]),
171            flags: u16::from_le_bytes([buf[6], buf[7]]),
172            sequence: u64::from_le_bytes([buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]]),
173            first_lsn: u64::from_le_bytes([buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23]]),
174            created_at: u64::from_le_bytes([buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31]]),
175            reserved: [0; 8],
176        })
177    }
178}
179
/// Active WAL segment being written to
///
/// Created by `create_new_segment` and retired by `rotate_segment` or
/// `shutdown`; the manager keeps it behind its `active` mutex.
struct ActiveSegment {
    /// Segment file
    ///
    /// Buffered: written bytes reach the file only when the buffer fills or
    /// is flushed.
    file: BufWriter<File>,
    /// Segment path
    ///
    /// NOTE(review): stored but not read by the code visible in this file.
    path: PathBuf,
    /// Segment header
    ///
    /// Its `sequence` is the key of this segment in the manager's segment map.
    header: SegmentHeader,
    /// Current offset within segment
    ///
    /// Starts at `SEGMENT_HEADER_SIZE` and grows by the encoded size of every
    /// appended record.
    offset: u64,
    /// Creation time for rotation
    created_at: Instant,
}
193
/// Checkpoint record stored in checkpoint file
///
/// Persisted as 48 bytes: a 4-byte magic, the five `u64` fields below in
/// declaration order (all little-endian), and a trailing CRC32 of the first
/// 44 bytes.
#[derive(Debug, Clone)]
pub struct CheckpointRecord {
    /// Checkpoint LSN (all entries before this are flushed)
    ///
    /// `create_checkpoint` fills this with the manager's LSN counter at the
    /// time of the call.
    pub lsn: u64,
    /// Last segment sequence that can be deleted
    ///
    /// `cleanup_old_segments` removes every tracked segment whose sequence is
    /// less than or equal to this value. 0 when no segment qualified.
    pub last_segment: u64,
    /// Checkpoint timestamp
    ///
    /// Unix milliseconds; 0 if the system clock is before the epoch.
    pub timestamp: u64,
    /// Memtable state checksum (for validation)
    ///
    /// Supplied by the caller of `create_checkpoint` and stored as-is.
    pub memtable_checksum: u64,
    /// Number of entries checkpointed
    ///
    /// Supplied by the caller of `create_checkpoint` and stored as-is.
    pub entry_count: u64,
}
208
209impl CheckpointRecord {
210    fn encode(&self) -> Vec<u8> {
211        let mut buf = Vec::with_capacity(48);
212        buf.extend_from_slice(&CHECKPOINT_MAGIC.to_le_bytes());
213        buf.extend_from_slice(&self.lsn.to_le_bytes());
214        buf.extend_from_slice(&self.last_segment.to_le_bytes());
215        buf.extend_from_slice(&self.timestamp.to_le_bytes());
216        buf.extend_from_slice(&self.memtable_checksum.to_le_bytes());
217        buf.extend_from_slice(&self.entry_count.to_le_bytes());
218        // Add checksum of the record itself
219        let checksum = crc32fast::hash(&buf);
220        buf.extend_from_slice(&checksum.to_le_bytes());
221        buf
222    }
223
224    fn decode(buf: &[u8]) -> Option<Self> {
225        if buf.len() < 48 {
226            return None;
227        }
228
229        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
230        if magic != CHECKPOINT_MAGIC {
231            return None;
232        }
233
234        // Verify checksum
235        let stored_checksum = u32::from_le_bytes([buf[44], buf[45], buf[46], buf[47]]);
236        let computed_checksum = crc32fast::hash(&buf[0..44]);
237        if stored_checksum != computed_checksum {
238            return None;
239        }
240
241        Some(Self {
242            lsn: u64::from_le_bytes([buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11]]),
243            last_segment: u64::from_le_bytes([buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18], buf[19]]),
244            timestamp: u64::from_le_bytes([buf[20], buf[21], buf[22], buf[23], buf[24], buf[25], buf[26], buf[27]]),
245            memtable_checksum: u64::from_le_bytes([buf[28], buf[29], buf[30], buf[31], buf[32], buf[33], buf[34], buf[35]]),
246            entry_count: u64::from_le_bytes([buf[36], buf[37], buf[38], buf[39], buf[40], buf[41], buf[42], buf[43]]),
247        })
248    }
249}
250
/// WAL Segment Manager
///
/// Handles WAL segmentation, rotation, and cleanup.
///
/// All methods take `&self`; mutable state lives behind the locks and atomics
/// below.
pub struct WalSegmentManager {
    /// Configuration
    config: SegmentConfig,
    /// Current active segment
    ///
    /// `None` until the first `append`, and again after `shutdown`. The mutex
    /// is held for the whole of each `append`.
    active: Mutex<Option<ActiveSegment>>,
    /// Current LSN
    ///
    /// The LSN the next `append` will hand out (post-incremented on use).
    current_lsn: AtomicU64,
    /// Current segment sequence
    ///
    /// The sequence number the next created segment will receive.
    segment_sequence: AtomicU64,
    /// All segment metadata (sequence -> metadata)
    segments: RwLock<BTreeMap<u64, SegmentMetadata>>,
    /// Last checkpoint record
    ///
    /// Loaded from the `checkpoint` file during recovery and replaced by every
    /// successful `create_checkpoint`.
    last_checkpoint: RwLock<Option<CheckpointRecord>>,
    /// Shutdown flag
    ///
    /// Set by `shutdown`. NOTE(review): not read by the code visible in this
    /// file.
    shutdown: AtomicBool,
}
270
/// Metadata for a WAL segment
///
/// One entry per known segment file, keyed by `sequence` in the manager's
/// segment map.
#[derive(Debug, Clone)]
pub struct SegmentMetadata {
    /// Segment sequence number
    pub sequence: u64,
    /// First LSN in segment
    pub first_lsn: u64,
    /// Last LSN in segment (None if active)
    ///
    /// `rotate_segment` stores the value of the manager's LSN counter at
    /// rotation time, i.e. the LSN the *next* append will receive. It stays
    /// `None` for the active segment and for segments discovered during
    /// recovery.
    pub last_lsn: Option<u64>,
    /// File path
    pub path: PathBuf,
    /// File size
    ///
    /// `SEGMENT_HEADER_SIZE` when the segment is created, the final write
    /// offset once it has been rotated, and the on-disk file length for
    /// segments discovered during recovery. It is not updated on every append.
    pub size: u64,
    /// Is this the active segment
    pub is_active: bool,
}
287
288impl WalSegmentManager {
289    /// Create a new WAL segment manager
290    pub fn new(config: SegmentConfig) -> std::io::Result<Self> {
291        // Ensure WAL directory exists
292        fs::create_dir_all(&config.wal_dir)?;
293
294        let manager = Self {
295            config,
296            active: Mutex::new(None),
297            current_lsn: AtomicU64::new(0),
298            segment_sequence: AtomicU64::new(0),
299            segments: RwLock::new(BTreeMap::new()),
300            last_checkpoint: RwLock::new(None),
301            shutdown: AtomicBool::new(false),
302        };
303
304        // Load existing segments and checkpoint
305        manager.recover()?;
306
307        Ok(manager)
308    }
309
310    /// Recover from existing WAL segments
311    fn recover(&self) -> std::io::Result<()> {
312        // Load checkpoint if exists
313        let checkpoint_path = self.config.wal_dir.join("checkpoint");
314        if checkpoint_path.exists() {
315            let mut file = File::open(&checkpoint_path)?;
316            let mut buf = Vec::new();
317            file.read_to_end(&mut buf)?;
318            if let Some(record) = CheckpointRecord::decode(&buf) {
319                *self.last_checkpoint.write() = Some(record);
320            }
321        }
322
323        // Scan for existing segments
324        let entries = fs::read_dir(&self.config.wal_dir)?;
325        let mut max_sequence = 0u64;
326        let mut max_lsn = 0u64;
327
328        for entry in entries {
329            let entry = entry?;
330            let path = entry.path();
331
332            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
333                if name.starts_with("segment_") && name.ends_with(".wal") {
334                    // Parse segment file
335                    let mut file = File::open(&path)?;
336                    let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
337                    if file.read_exact(&mut header_buf).is_ok() {
338                        if let Some(header) = SegmentHeader::decode(&header_buf) {
339                            max_sequence = max_sequence.max(header.sequence);
340                            max_lsn = max_lsn.max(header.first_lsn);
341
342                            let metadata = file.metadata()?;
343                            self.segments.write().insert(header.sequence, SegmentMetadata {
344                                sequence: header.sequence,
345                                first_lsn: header.first_lsn,
346                                last_lsn: None,
347                                path: path.clone(),
348                                size: metadata.len(),
349                                is_active: false,
350                            });
351                        }
352                    }
353                }
354            }
355        }
356
357        // Set counters
358        self.segment_sequence.store(max_sequence + 1, Ordering::SeqCst);
359        self.current_lsn.store(max_lsn, Ordering::SeqCst);
360
361        Ok(())
362    }
363
364    /// Append a record to the WAL, returns the LSN
365    pub fn append(&self, data: &[u8]) -> std::io::Result<u64> {
366        let mut active = self.active.lock();
367
368        // Rotate if needed
369        if self.needs_rotation(&active) {
370            self.rotate_segment(&mut active)?;
371        }
372
373        // Ensure we have an active segment
374        if active.is_none() {
375            self.create_new_segment(&mut active)?;
376        }
377
378        let segment = active.as_mut().unwrap();
379
380        // Assign LSN
381        let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
382
383        // Write record: [length: u32][lsn: u64][data][checksum: u32]
384        let record_len = 4 + 8 + data.len() + 4;
385        let mut record = Vec::with_capacity(record_len);
386        record.extend_from_slice(&(data.len() as u32).to_le_bytes());
387        record.extend_from_slice(&lsn.to_le_bytes());
388        record.extend_from_slice(data);
389        let checksum = crc32fast::hash(&record);
390        record.extend_from_slice(&checksum.to_le_bytes());
391
392        segment.file.write_all(&record)?;
393        segment.offset += record_len as u64;
394
395        if self.config.sync_on_write {
396            segment.file.flush()?;
397        }
398
399        Ok(lsn)
400    }
401
402    /// Check if segment needs rotation
403    fn needs_rotation(&self, active: &Option<ActiveSegment>) -> bool {
404        match active {
405            Some(segment) => {
406                segment.offset >= self.config.max_size
407                    || segment.created_at.elapsed() >= self.config.rotation_interval
408            }
409            None => false,
410        }
411    }
412
413    /// Rotate to a new segment
414    fn rotate_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
415        if let Some(mut segment) = active.take() {
416            // Flush and sync the old segment
417            segment.file.flush()?;
418            segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
419
420            // Update metadata to mark as not active
421            let current_lsn = self.current_lsn.load(Ordering::SeqCst);
422            if let Some(meta) = self.segments.write().get_mut(&segment.header.sequence) {
423                meta.is_active = false;
424                meta.last_lsn = Some(current_lsn);
425                meta.size = segment.offset;
426            }
427        }
428
429        Ok(())
430    }
431
432    /// Create a new segment
433    fn create_new_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
434        let sequence = self.segment_sequence.fetch_add(1, Ordering::SeqCst);
435        let first_lsn = self.current_lsn.load(Ordering::SeqCst);
436
437        let path = self.config.wal_dir.join(format!("segment_{:016x}.wal", sequence));
438
439        let file = OpenOptions::new()
440            .create(true)
441            .write(true)
442            .truncate(true)
443            .open(&path)?;
444
445        // Preallocate if configured
446        if self.config.preallocate {
447            file.set_len(self.config.max_size)?;
448        }
449
450        let mut writer = BufWriter::new(file);
451
452        // Write header
453        let header = SegmentHeader::new(sequence, first_lsn);
454        writer.write_all(&header.encode())?;
455
456        let segment = ActiveSegment {
457            file: writer,
458            path: path.clone(),
459            header: header.clone(),
460            offset: SEGMENT_HEADER_SIZE as u64,
461            created_at: Instant::now(),
462        };
463
464        // Add to segments map
465        self.segments.write().insert(sequence, SegmentMetadata {
466            sequence,
467            first_lsn,
468            last_lsn: None,
469            path,
470            size: SEGMENT_HEADER_SIZE as u64,
471            is_active: true,
472        });
473
474        *active = Some(segment);
475
476        Ok(())
477    }
478
479    /// Create a fuzzy checkpoint
480    ///
481    /// This captures the current LSN and can be used to cleanup old segments.
482    pub fn create_checkpoint(
483        &self,
484        memtable_checksum: u64,
485        entry_count: u64,
486    ) -> std::io::Result<CheckpointRecord> {
487        let lsn = self.current_lsn.load(Ordering::SeqCst);
488
489        // Find the last segment that is fully before this LSN
490        let segments = self.segments.read();
491        let last_segment = segments
492            .values()
493            .filter(|s| s.last_lsn.map(|l| l < lsn).unwrap_or(false))
494            .map(|s| s.sequence)
495            .max()
496            .unwrap_or(0);
497
498        let now = SystemTime::now()
499            .duration_since(UNIX_EPOCH)
500            .map(|d| d.as_millis() as u64)
501            .unwrap_or(0);
502
503        let record = CheckpointRecord {
504            lsn,
505            last_segment,
506            timestamp: now,
507            memtable_checksum,
508            entry_count,
509        };
510
511        // Write checkpoint file
512        let checkpoint_path = self.config.wal_dir.join("checkpoint");
513        let temp_path = self.config.wal_dir.join("checkpoint.tmp");
514
515        let mut file = File::create(&temp_path)?;
516        file.write_all(&record.encode())?;
517        file.sync_all()?;
518
519        fs::rename(&temp_path, &checkpoint_path)?;
520
521        *self.last_checkpoint.write() = Some(record.clone());
522
523        Ok(record)
524    }
525
526    /// Cleanup old segments that are no longer needed
527    pub fn cleanup_old_segments(&self) -> std::io::Result<usize> {
528        let checkpoint = self.last_checkpoint.read().clone();
529
530        let last_safe_segment = match checkpoint {
531            Some(cp) => cp.last_segment,
532            None => return Ok(0),
533        };
534
535        let mut segments = self.segments.write();
536        let old_segments: Vec<u64> = segments
537            .keys()
538            .filter(|&&seq| seq <= last_safe_segment)
539            .copied()
540            .collect();
541
542        let mut cleaned = 0;
543        for sequence in old_segments {
544            if let Some(meta) = segments.remove(&sequence) {
545                if meta.path.exists() {
546                    fs::remove_file(&meta.path)?;
547                    cleaned += 1;
548                }
549            }
550        }
551
552        Ok(cleaned)
553    }
554
555    /// Get statistics
556    pub fn stats(&self) -> SegmentStats {
557        let segments = self.segments.read();
558        let total_size: u64 = segments.values().map(|s| s.size).sum();
559        let checkpoint = self.last_checkpoint.read().clone();
560
561        SegmentStats {
562            segment_count: segments.len(),
563            total_size,
564            current_lsn: self.current_lsn.load(Ordering::SeqCst),
565            current_sequence: self.segment_sequence.load(Ordering::SeqCst),
566            last_checkpoint_lsn: checkpoint.as_ref().map(|c| c.lsn),
567        }
568    }
569
    /// Iterator for recovery
    ///
    /// Returns a `RecoveryIterator` borrowing this manager. Its `next_entry`
    /// yields the stored entries whose LSN is at least `from_lsn`, reading the
    /// segment files from disk. The list of segments to visit is fixed when
    /// this method is called.
    pub fn recovery_iterator(&self, from_lsn: u64) -> RecoveryIterator {
        RecoveryIterator::new(self, from_lsn)
    }
574
575    /// Flush all pending writes
576    pub fn flush(&self) -> std::io::Result<()> {
577        let mut active = self.active.lock();
578        if let Some(ref mut segment) = *active {
579            segment.file.flush()?;
580        }
581        Ok(())
582    }
583
584    /// Shutdown the segment manager
585    pub fn shutdown(&self) -> std::io::Result<()> {
586        self.shutdown.store(true, Ordering::SeqCst);
587
588        let mut active = self.active.lock();
589        if let Some(mut segment) = active.take() {
590            segment.file.flush()?;
591            segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
592        }
593
594        Ok(())
595    }
596}
597
/// Statistics for WAL segments
///
/// Snapshot returned by `WalSegmentManager::stats`.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Number of segments
    ///
    /// Counts the entries in the manager's segment map, active one included.
    pub segment_count: usize,
    /// Total size of all segments
    ///
    /// Sum of the `size` recorded in each segment's metadata.
    pub total_size: u64,
    /// Current LSN
    ///
    /// Raw counter value: the LSN the next append will receive.
    pub current_lsn: u64,
    /// Current segment sequence
    ///
    /// Raw counter value: the sequence the next created segment will receive.
    pub current_sequence: u64,
    /// Last checkpoint LSN
    ///
    /// `None` when no checkpoint has been loaded or created.
    pub last_checkpoint_lsn: Option<u64>,
}
612
/// Recovery iterator for replaying WAL entries
///
/// Walks a fixed list of segments in ascending sequence order and reads their
/// records straight from the segment files. Advance it with `next_entry`.
pub struct RecoveryIterator<'a> {
    /// Manager whose segment map is used to resolve sequences to file paths.
    manager: &'a WalSegmentManager,
    /// Index into `segment_sequences` of the segment being read.
    current_segment_idx: usize,
    /// Sequences of the segments to visit, sorted ascending; captured at
    /// construction.
    segment_sequences: Vec<u64>,
    /// Reader over the current segment file; `None` until a segment is opened
    /// and again once it has been exhausted.
    current_reader: Option<BufReader<File>>,
    /// Byte offset just past the last record read from the current segment.
    /// NOTE(review): maintained but not read by the code visible in this file.
    current_offset: u64,
    /// Entries with a lower LSN are skipped.
    from_lsn: u64,
}
622
623impl<'a> RecoveryIterator<'a> {
624    fn new(manager: &'a WalSegmentManager, from_lsn: u64) -> Self {
625        let segments = manager.segments.read();
626        let mut sequences: Vec<u64> = segments
627            .values()
628            .filter(|s| s.first_lsn >= from_lsn || s.last_lsn.map(|l| l >= from_lsn).unwrap_or(true))
629            .map(|s| s.sequence)
630            .collect();
631        sequences.sort();
632
633        Self {
634            manager,
635            current_segment_idx: 0,
636            segment_sequences: sequences,
637            current_reader: None,
638            current_offset: SEGMENT_HEADER_SIZE as u64,
639            from_lsn,
640        }
641    }
642
643    /// Get next WAL entry
644    pub fn next_entry(&mut self) -> std::io::Result<Option<WalEntry>> {
645        loop {
646            // Open segment if needed
647            if self.current_reader.is_none() {
648                if self.current_segment_idx >= self.segment_sequences.len() {
649                    return Ok(None);
650                }
651
652                let sequence = self.segment_sequences[self.current_segment_idx];
653                let segments = self.manager.segments.read();
654                if let Some(meta) = segments.get(&sequence) {
655                    let file = File::open(&meta.path)?;
656                    let mut reader = BufReader::new(file);
657                    reader.seek(SeekFrom::Start(SEGMENT_HEADER_SIZE as u64))?;
658                    self.current_reader = Some(reader);
659                    self.current_offset = SEGMENT_HEADER_SIZE as u64;
660                } else {
661                    self.current_segment_idx += 1;
662                    continue;
663                }
664            }
665
666            let reader = self.current_reader.as_mut().unwrap();
667
668            // Try to read entry
669            let mut len_buf = [0u8; 4];
670            match reader.read_exact(&mut len_buf) {
671                Ok(_) => {}
672                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
673                    // Move to next segment
674                    self.current_reader = None;
675                    self.current_segment_idx += 1;
676                    continue;
677                }
678                Err(e) => return Err(e),
679            }
680
681            let data_len = u32::from_le_bytes(len_buf) as usize;
682            if data_len == 0 || data_len > 100 * 1024 * 1024 {
683                // Invalid or end of segment
684                self.current_reader = None;
685                self.current_segment_idx += 1;
686                continue;
687            }
688
689            // Read LSN
690            let mut lsn_buf = [0u8; 8];
691            reader.read_exact(&mut lsn_buf)?;
692            let lsn = u64::from_le_bytes(lsn_buf);
693
694            // Read data
695            let mut data = vec![0u8; data_len];
696            reader.read_exact(&mut data)?;
697
698            // Read and verify checksum
699            let mut checksum_buf = [0u8; 4];
700            reader.read_exact(&mut checksum_buf)?;
701            let stored_checksum = u32::from_le_bytes(checksum_buf);
702
703            let mut verify_buf = Vec::with_capacity(4 + 8 + data_len);
704            verify_buf.extend_from_slice(&len_buf);
705            verify_buf.extend_from_slice(&lsn_buf);
706            verify_buf.extend_from_slice(&data);
707            let computed_checksum = crc32fast::hash(&verify_buf);
708
709            if stored_checksum != computed_checksum {
710                return Err(std::io::Error::new(
711                    std::io::ErrorKind::InvalidData,
712                    "WAL entry checksum mismatch",
713                ));
714            }
715
716            self.current_offset += (4 + 8 + data_len + 4) as u64;
717
718            // Skip entries before from_lsn
719            if lsn < self.from_lsn {
720                continue;
721            }
722
723            return Ok(Some(WalEntry { lsn, data }));
724        }
725    }
726}
727
/// A WAL entry for recovery
///
/// Produced by `RecoveryIterator::next_entry` after the record's checksum has
/// been verified.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// LSN of this entry
    ///
    /// Read from the record itself, as stored when it was appended.
    pub lsn: u64,
    /// Entry data
    ///
    /// The payload bytes exactly as they were passed to `append`.
    pub data: Vec<u8>,
}
736
737// =============================================================================
738// Tests
739// =============================================================================
740
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// LSNs are handed out densely from 0 and the counters reflect the writes.
    #[test]
    fn test_segment_manager_basic() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(1024);

        let manager = WalSegmentManager::new(config).unwrap();

        // Append some entries
        for i in 0..100 {
            let data = format!("entry_{}", i);
            let lsn = manager.append(data.as_bytes()).unwrap();
            assert_eq!(lsn, i as u64);
        }

        let stats = manager.stats();
        assert!(stats.segment_count > 0);
        assert_eq!(stats.current_lsn, 100);

        manager.shutdown().unwrap();
    }

    /// A checkpoint taken after several rotations lets cleanup delete the
    /// rotated segments while the active one stays tracked.
    #[test]
    fn test_checkpoint_and_cleanup() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        let manager = WalSegmentManager::new(config).unwrap();

        // Append enough to create multiple segments: each record is 26 bytes,
        // so a 256-byte segment rotates several times over 50 appends.
        for i in 0..50 {
            let data = format!("entry_{:04}", i);
            manager.append(data.as_bytes()).unwrap();
        }

        // Push buffered bytes out; rotation itself only happens inside `append`.
        manager.flush().unwrap();

        // Create checkpoint
        let checkpoint = manager.create_checkpoint(12345, 50).unwrap();
        assert!(checkpoint.lsn > 0);
        assert!(checkpoint.last_segment > 0);

        // Every deleted segment must also leave the segment map, and the
        // active segment must survive.
        let before = manager.stats().segment_count;
        let cleaned = manager.cleanup_old_segments().unwrap();
        let after = manager.stats().segment_count;
        assert!(cleaned > 0);
        assert!(cleaned <= before);
        assert_eq!(after, before - cleaned);
        assert!(after >= 1);

        manager.shutdown().unwrap();
    }

    /// Entries written by one manager are replayed by a fresh one.
    #[test]
    fn test_recovery() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path());

        // Write some data
        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..10 {
                let data = format!("data_{}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            manager.shutdown().unwrap();
        }

        // Recover and verify
        {
            let manager = WalSegmentManager::new(config).unwrap();
            let mut iter = manager.recovery_iterator(0);
            let mut count = 0;

            while let Some(entry) = iter.next_entry().unwrap() {
                let data = String::from_utf8_lossy(&entry.data);
                assert!(data.starts_with("data_"));
                count += 1;
            }

            assert_eq!(count, 10);
        }
    }

    /// A header survives an encode/decode round trip.
    #[test]
    fn test_segment_header_encoding() {
        let header = SegmentHeader::new(42, 12345);
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.magic, SEGMENT_MAGIC);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.first_lsn, 12345);
    }

    /// A checkpoint record survives an encode/decode round trip.
    #[test]
    fn test_checkpoint_record_encoding() {
        let record = CheckpointRecord {
            lsn: 1000,
            last_segment: 5,
            timestamp: 123456789,
            memtable_checksum: 0xDEADBEEF,
            entry_count: 500,
        };

        let encoded = record.encode();
        let decoded = CheckpointRecord::decode(&encoded).unwrap();

        assert_eq!(decoded.lsn, 1000);
        assert_eq!(decoded.last_segment, 5);
        assert_eq!(decoded.memtable_checksum, 0xDEADBEEF);
        assert_eq!(decoded.entry_count, 500);
    }
}