// sochdb_storage/wal_segment.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! WAL Segmentation and Checkpoint Manager
//!
//! This module provides bounded recovery time through:
//! - WAL file segmentation (rotate after a size or age threshold)
//! - Fuzzy checkpointing (no blocking writes)
//! - Automatic old segment cleanup after checkpoint
//!
//! ## Architecture
//!
//! ```text
//! Active Writes → Current Segment → Rotation → Archived Segment
//!                                                      ↓
//!                                              Checkpoint
//!                                                      ↓
//!                                              Segment Cleanup
//! ```
//!
//! ## Recovery Time Bound
//!
//! Recovery replays only the segments holding entries at or after the starting LSN
//! (typically the last checkpoint), so its cost is roughly
//! (WAL bytes written since the last checkpoint) / disk_bandwidth. When checkpoints keep
//! that backlog to about one segment, this becomes segment_max_size / disk_bandwidth:
//! with 64MB segments and 400 MB/s sequential read, recovery ≤ 160ms.
39
40use std::collections::BTreeMap;
41use std::fs::{self, File, OpenOptions};
42use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
45use std::sync::Arc;
46use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
47
48use parking_lot::{Mutex, RwLock};
49
/// Default size at which the active segment is rotated: 64 MiB.
pub const DEFAULT_SEGMENT_MAX_SIZE: u64 = 64 << 20;

/// Default maximum age of the active segment before rotation: five minutes.
pub const DEFAULT_ROTATION_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Default interval between checkpoints: one minute.
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Magic number opening every segment header: the ASCII bytes "WLSG" read big-endian.
/// Headers are encoded little-endian, so a segment file's first four bytes read "GSLW".
const SEGMENT_MAGIC: u32 = u32::from_be_bytes(*b"WLSG");

/// Format version written into new segment headers.
const SEGMENT_VERSION: u16 = 1;

/// Encoded segment header length in bytes:
/// magic (4) + version (2) + flags (2) + sequence (8) + first LSN (8) + created-at (8).
const SEGMENT_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 8 + 8;

/// Magic number opening the checkpoint record: the ASCII bytes "CHKP" read big-endian.
const CHECKPOINT_MAGIC: u32 = u32::from_be_bytes(*b"CHKP");
70
71/// WAL segment configuration
72#[derive(Debug, Clone)]
73pub struct SegmentConfig {
74    /// Maximum segment size before rotation
75    pub max_size: u64,
76    /// Maximum time before rotation
77    pub rotation_interval: Duration,
78    /// Checkpoint interval
79    pub checkpoint_interval: Duration,
80    /// Directory for WAL segments
81    pub wal_dir: PathBuf,
82    /// Sync on every write
83    pub sync_on_write: bool,
84    /// Preallocate segment files
85    pub preallocate: bool,
86}
87
88impl Default for SegmentConfig {
89    fn default() -> Self {
90        Self {
91            max_size: DEFAULT_SEGMENT_MAX_SIZE,
92            rotation_interval: DEFAULT_ROTATION_INTERVAL,
93            checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
94            wal_dir: PathBuf::from("wal"),
95            sync_on_write: true,
96            preallocate: true,
97        }
98    }
99}
100
101impl SegmentConfig {
102    pub fn with_wal_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
103        self.wal_dir = dir.as_ref().to_path_buf();
104        self
105    }
106
107    pub fn with_max_size(mut self, size: u64) -> Self {
108        self.max_size = size;
109        self
110    }
111}
112
113/// Segment header stored at the beginning of each segment file
114#[derive(Debug, Clone)]
115pub struct SegmentHeader {
116    /// Magic number
117    pub magic: u32,
118    /// Version
119    pub version: u16,
120    /// Flags
121    pub flags: u16,
122    /// Segment sequence number
123    pub sequence: u64,
124    /// First LSN in this segment
125    pub first_lsn: u64,
126    /// Creation timestamp (Unix millis)
127    pub created_at: u64,
128    /// Reserved for future use
129    pub reserved: [u8; 8],
130}
131
132impl SegmentHeader {
133    fn new(sequence: u64, first_lsn: u64) -> Self {
134        let now = SystemTime::now()
135            .duration_since(UNIX_EPOCH)
136            .map(|d| d.as_millis() as u64)
137            .unwrap_or(0);
138
139        Self {
140            magic: SEGMENT_MAGIC,
141            version: SEGMENT_VERSION,
142            flags: 0,
143            sequence,
144            first_lsn,
145            created_at: now,
146            reserved: [0; 8],
147        }
148    }
149
150    fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
151        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
152        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
153        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
154        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
155        buf[8..16].copy_from_slice(&self.sequence.to_le_bytes());
156        buf[16..24].copy_from_slice(&self.first_lsn.to_le_bytes());
157        buf[24..32].copy_from_slice(&self.created_at.to_le_bytes());
158        buf
159    }
160
161    fn decode(buf: &[u8]) -> Option<Self> {
162        if buf.len() < SEGMENT_HEADER_SIZE {
163            return None;
164        }
165
166        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
167        if magic != SEGMENT_MAGIC {
168            return None;
169        }
170
171        Some(Self {
172            magic,
173            version: u16::from_le_bytes([buf[4], buf[5]]),
174            flags: u16::from_le_bytes([buf[6], buf[7]]),
175            sequence: u64::from_le_bytes([buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]]),
176            first_lsn: u64::from_le_bytes([buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23]]),
177            created_at: u64::from_le_bytes([buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31]]),
178            reserved: [0; 8],
179        })
180    }
181}
182
183/// Active WAL segment being written to
184struct ActiveSegment {
185    /// Segment file
186    file: BufWriter<File>,
187    /// Segment path
188    path: PathBuf,
189    /// Segment header
190    header: SegmentHeader,
191    /// Current offset within segment
192    offset: u64,
193    /// Creation time for rotation
194    created_at: Instant,
195}
196
197/// Checkpoint record stored in checkpoint file
198#[derive(Debug, Clone)]
199pub struct CheckpointRecord {
200    /// Checkpoint LSN (all entries before this are flushed)
201    pub lsn: u64,
202    /// Last segment sequence that can be deleted
203    pub last_segment: u64,
204    /// Checkpoint timestamp
205    pub timestamp: u64,
206    /// Memtable state checksum (for validation)
207    pub memtable_checksum: u64,
208    /// Number of entries checkpointed
209    pub entry_count: u64,
210}
211
212impl CheckpointRecord {
213    fn encode(&self) -> Vec<u8> {
214        let mut buf = Vec::with_capacity(48);
215        buf.extend_from_slice(&CHECKPOINT_MAGIC.to_le_bytes());
216        buf.extend_from_slice(&self.lsn.to_le_bytes());
217        buf.extend_from_slice(&self.last_segment.to_le_bytes());
218        buf.extend_from_slice(&self.timestamp.to_le_bytes());
219        buf.extend_from_slice(&self.memtable_checksum.to_le_bytes());
220        buf.extend_from_slice(&self.entry_count.to_le_bytes());
221        // Add checksum of the record itself
222        let checksum = crc32fast::hash(&buf);
223        buf.extend_from_slice(&checksum.to_le_bytes());
224        buf
225    }
226
227    fn decode(buf: &[u8]) -> Option<Self> {
228        if buf.len() < 48 {
229            return None;
230        }
231
232        let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
233        if magic != CHECKPOINT_MAGIC {
234            return None;
235        }
236
237        // Verify checksum
238        let stored_checksum = u32::from_le_bytes([buf[44], buf[45], buf[46], buf[47]]);
239        let computed_checksum = crc32fast::hash(&buf[0..44]);
240        if stored_checksum != computed_checksum {
241            return None;
242        }
243
244        Some(Self {
245            lsn: u64::from_le_bytes([buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11]]),
246            last_segment: u64::from_le_bytes([buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18], buf[19]]),
247            timestamp: u64::from_le_bytes([buf[20], buf[21], buf[22], buf[23], buf[24], buf[25], buf[26], buf[27]]),
248            memtable_checksum: u64::from_le_bytes([buf[28], buf[29], buf[30], buf[31], buf[32], buf[33], buf[34], buf[35]]),
249            entry_count: u64::from_le_bytes([buf[36], buf[37], buf[38], buf[39], buf[40], buf[41], buf[42], buf[43]]),
250        })
251    }
252}
253
254/// WAL Segment Manager
255///
256/// Handles WAL segmentation, rotation, and cleanup.
257pub struct WalSegmentManager {
258    /// Configuration
259    config: SegmentConfig,
260    /// Current active segment
261    active: Mutex<Option<ActiveSegment>>,
262    /// Current LSN
263    current_lsn: AtomicU64,
264    /// Current segment sequence
265    segment_sequence: AtomicU64,
266    /// All segment metadata (sequence -> header)
267    segments: RwLock<BTreeMap<u64, SegmentMetadata>>,
268    /// Last checkpoint record
269    last_checkpoint: RwLock<Option<CheckpointRecord>>,
270    /// Shutdown flag
271    shutdown: AtomicBool,
272}
273
/// In-memory metadata for one WAL segment file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMetadata {
    /// Segment sequence number (also the key it is indexed under).
    pub sequence: u64,
    /// LSN of the first record in the segment.
    pub first_lsn: u64,
    /// LSN of the last record in the segment; `None` while the segment is active or when
    /// its last LSN has not been recorded.
    pub last_lsn: Option<u64>,
    /// Path of the segment file.
    pub path: PathBuf,
    /// Size in bytes as last recorded. It is not refreshed on every append, so it lags
    /// behind the active segment. For segments loaded from disk it is the file length,
    /// including any preallocated padding.
    pub size: u64,
    /// Whether this segment is the one currently receiving appends.
    pub is_active: bool,
}
290
291impl WalSegmentManager {
292    /// Create a new WAL segment manager
293    pub fn new(config: SegmentConfig) -> std::io::Result<Self> {
294        // Ensure WAL directory exists
295        fs::create_dir_all(&config.wal_dir)?;
296
297        let manager = Self {
298            config,
299            active: Mutex::new(None),
300            current_lsn: AtomicU64::new(0),
301            segment_sequence: AtomicU64::new(0),
302            segments: RwLock::new(BTreeMap::new()),
303            last_checkpoint: RwLock::new(None),
304            shutdown: AtomicBool::new(false),
305        };
306
307        // Load existing segments and checkpoint
308        manager.recover()?;
309
310        Ok(manager)
311    }
312
313    /// Recover from existing WAL segments
314    fn recover(&self) -> std::io::Result<()> {
315        // Load checkpoint if exists
316        let checkpoint_path = self.config.wal_dir.join("checkpoint");
317        if checkpoint_path.exists() {
318            let mut file = File::open(&checkpoint_path)?;
319            let mut buf = Vec::new();
320            file.read_to_end(&mut buf)?;
321            if let Some(record) = CheckpointRecord::decode(&buf) {
322                *self.last_checkpoint.write() = Some(record);
323            }
324        }
325
326        // Scan for existing segments
327        let entries = fs::read_dir(&self.config.wal_dir)?;
328        let mut max_sequence = 0u64;
329        let mut max_lsn = 0u64;
330
331        for entry in entries {
332            let entry = entry?;
333            let path = entry.path();
334
335            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
336                if name.starts_with("segment_") && name.ends_with(".wal") {
337                    // Parse segment file
338                    let mut file = File::open(&path)?;
339                    let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
340                    if file.read_exact(&mut header_buf).is_ok() {
341                        if let Some(header) = SegmentHeader::decode(&header_buf) {
342                            max_sequence = max_sequence.max(header.sequence);
343                            max_lsn = max_lsn.max(header.first_lsn);
344
345                            let metadata = file.metadata()?;
346                            self.segments.write().insert(header.sequence, SegmentMetadata {
347                                sequence: header.sequence,
348                                first_lsn: header.first_lsn,
349                                last_lsn: None,
350                                path: path.clone(),
351                                size: metadata.len(),
352                                is_active: false,
353                            });
354                        }
355                    }
356                }
357            }
358        }
359
360        // Set counters
361        self.segment_sequence.store(max_sequence + 1, Ordering::SeqCst);
362        self.current_lsn.store(max_lsn, Ordering::SeqCst);
363
364        Ok(())
365    }
366
367    /// Append a record to the WAL, returns the LSN
368    pub fn append(&self, data: &[u8]) -> std::io::Result<u64> {
369        let mut active = self.active.lock();
370
371        // Rotate if needed
372        if self.needs_rotation(&active) {
373            self.rotate_segment(&mut active)?;
374        }
375
376        // Ensure we have an active segment
377        if active.is_none() {
378            self.create_new_segment(&mut active)?;
379        }
380
381        let segment = active.as_mut().unwrap();
382
383        // Assign LSN
384        let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
385
386        // Write record: [length: u32][lsn: u64][data][checksum: u32]
387        let record_len = 4 + 8 + data.len() + 4;
388        let mut record = Vec::with_capacity(record_len);
389        record.extend_from_slice(&(data.len() as u32).to_le_bytes());
390        record.extend_from_slice(&lsn.to_le_bytes());
391        record.extend_from_slice(data);
392        let checksum = crc32fast::hash(&record);
393        record.extend_from_slice(&checksum.to_le_bytes());
394
395        segment.file.write_all(&record)?;
396        segment.offset += record_len as u64;
397
398        if self.config.sync_on_write {
399            segment.file.flush()?;
400        }
401
402        Ok(lsn)
403    }
404
405    /// Check if segment needs rotation
406    fn needs_rotation(&self, active: &Option<ActiveSegment>) -> bool {
407        match active {
408            Some(segment) => {
409                segment.offset >= self.config.max_size
410                    || segment.created_at.elapsed() >= self.config.rotation_interval
411            }
412            None => false,
413        }
414    }
415
416    /// Rotate to a new segment
417    fn rotate_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
418        if let Some(mut segment) = active.take() {
419            // Flush and sync the old segment
420            segment.file.flush()?;
421            segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
422
423            // Update metadata to mark as not active
424            let current_lsn = self.current_lsn.load(Ordering::SeqCst);
425            if let Some(meta) = self.segments.write().get_mut(&segment.header.sequence) {
426                meta.is_active = false;
427                meta.last_lsn = Some(current_lsn);
428                meta.size = segment.offset;
429            }
430        }
431
432        Ok(())
433    }
434
435    /// Create a new segment
436    fn create_new_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
437        let sequence = self.segment_sequence.fetch_add(1, Ordering::SeqCst);
438        let first_lsn = self.current_lsn.load(Ordering::SeqCst);
439
440        let path = self.config.wal_dir.join(format!("segment_{:016x}.wal", sequence));
441
442        let file = OpenOptions::new()
443            .create(true)
444            .write(true)
445            .truncate(true)
446            .open(&path)?;
447
448        // Preallocate if configured
449        if self.config.preallocate {
450            file.set_len(self.config.max_size)?;
451        }
452
453        let mut writer = BufWriter::new(file);
454
455        // Write header
456        let header = SegmentHeader::new(sequence, first_lsn);
457        writer.write_all(&header.encode())?;
458
459        let segment = ActiveSegment {
460            file: writer,
461            path: path.clone(),
462            header: header.clone(),
463            offset: SEGMENT_HEADER_SIZE as u64,
464            created_at: Instant::now(),
465        };
466
467        // Add to segments map
468        self.segments.write().insert(sequence, SegmentMetadata {
469            sequence,
470            first_lsn,
471            last_lsn: None,
472            path,
473            size: SEGMENT_HEADER_SIZE as u64,
474            is_active: true,
475        });
476
477        *active = Some(segment);
478
479        Ok(())
480    }
481
482    /// Create a fuzzy checkpoint
483    ///
484    /// This captures the current LSN and can be used to cleanup old segments.
485    pub fn create_checkpoint(
486        &self,
487        memtable_checksum: u64,
488        entry_count: u64,
489    ) -> std::io::Result<CheckpointRecord> {
490        let lsn = self.current_lsn.load(Ordering::SeqCst);
491
492        // Find the last segment that is fully before this LSN
493        let segments = self.segments.read();
494        let last_segment = segments
495            .values()
496            .filter(|s| s.last_lsn.map(|l| l < lsn).unwrap_or(false))
497            .map(|s| s.sequence)
498            .max()
499            .unwrap_or(0);
500
501        let now = SystemTime::now()
502            .duration_since(UNIX_EPOCH)
503            .map(|d| d.as_millis() as u64)
504            .unwrap_or(0);
505
506        let record = CheckpointRecord {
507            lsn,
508            last_segment,
509            timestamp: now,
510            memtable_checksum,
511            entry_count,
512        };
513
514        // Write checkpoint file
515        let checkpoint_path = self.config.wal_dir.join("checkpoint");
516        let temp_path = self.config.wal_dir.join("checkpoint.tmp");
517
518        let mut file = File::create(&temp_path)?;
519        file.write_all(&record.encode())?;
520        file.sync_all()?;
521
522        fs::rename(&temp_path, &checkpoint_path)?;
523
524        *self.last_checkpoint.write() = Some(record.clone());
525
526        Ok(record)
527    }
528
529    /// Cleanup old segments that are no longer needed
530    pub fn cleanup_old_segments(&self) -> std::io::Result<usize> {
531        let checkpoint = self.last_checkpoint.read().clone();
532
533        let last_safe_segment = match checkpoint {
534            Some(cp) => cp.last_segment,
535            None => return Ok(0),
536        };
537
538        let mut segments = self.segments.write();
539        let old_segments: Vec<u64> = segments
540            .keys()
541            .filter(|&&seq| seq <= last_safe_segment)
542            .copied()
543            .collect();
544
545        let mut cleaned = 0;
546        for sequence in old_segments {
547            if let Some(meta) = segments.remove(&sequence) {
548                if meta.path.exists() {
549                    fs::remove_file(&meta.path)?;
550                    cleaned += 1;
551                }
552            }
553        }
554
555        Ok(cleaned)
556    }
557
558    /// Get statistics
559    pub fn stats(&self) -> SegmentStats {
560        let segments = self.segments.read();
561        let total_size: u64 = segments.values().map(|s| s.size).sum();
562        let checkpoint = self.last_checkpoint.read().clone();
563
564        SegmentStats {
565            segment_count: segments.len(),
566            total_size,
567            current_lsn: self.current_lsn.load(Ordering::SeqCst),
568            current_sequence: self.segment_sequence.load(Ordering::SeqCst),
569            last_checkpoint_lsn: checkpoint.as_ref().map(|c| c.lsn),
570        }
571    }
572
573    /// Iterator for recovery
574    pub fn recovery_iterator(&self, from_lsn: u64) -> RecoveryIterator {
575        RecoveryIterator::new(self, from_lsn)
576    }
577
578    /// Flush all pending writes
579    pub fn flush(&self) -> std::io::Result<()> {
580        let mut active = self.active.lock();
581        if let Some(ref mut segment) = *active {
582            segment.file.flush()?;
583        }
584        Ok(())
585    }
586
587    /// Shutdown the segment manager
588    pub fn shutdown(&self) -> std::io::Result<()> {
589        self.shutdown.store(true, Ordering::SeqCst);
590
591        let mut active = self.active.lock();
592        if let Some(mut segment) = active.take() {
593            segment.file.flush()?;
594            segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
595        }
596
597        Ok(())
598    }
599}
600
/// Point-in-time statistics for a [`WalSegmentManager`], as returned by its `stats` method.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SegmentStats {
    /// Number of segments currently tracked, active and archived.
    pub segment_count: usize,
    /// Sum of the recorded sizes of all tracked segments, in bytes. Recorded sizes are not
    /// refreshed on every append, so this can lag behind the active segment.
    pub total_size: u64,
    /// LSN the next appended record will receive.
    pub current_lsn: u64,
    /// Sequence number the next new segment will receive.
    pub current_sequence: u64,
    /// LSN stored in the most recent checkpoint, if any.
    pub last_checkpoint_lsn: Option<u64>,
}
615
616/// Recovery iterator for replaying WAL entries
617pub struct RecoveryIterator<'a> {
618    manager: &'a WalSegmentManager,
619    current_segment_idx: usize,
620    segment_sequences: Vec<u64>,
621    current_reader: Option<BufReader<File>>,
622    current_offset: u64,
623    from_lsn: u64,
624}
625
626impl<'a> RecoveryIterator<'a> {
627    fn new(manager: &'a WalSegmentManager, from_lsn: u64) -> Self {
628        let segments = manager.segments.read();
629        let mut sequences: Vec<u64> = segments
630            .values()
631            .filter(|s| s.first_lsn >= from_lsn || s.last_lsn.map(|l| l >= from_lsn).unwrap_or(true))
632            .map(|s| s.sequence)
633            .collect();
634        sequences.sort();
635
636        Self {
637            manager,
638            current_segment_idx: 0,
639            segment_sequences: sequences,
640            current_reader: None,
641            current_offset: SEGMENT_HEADER_SIZE as u64,
642            from_lsn,
643        }
644    }
645
646    /// Get next WAL entry
647    pub fn next_entry(&mut self) -> std::io::Result<Option<WalEntry>> {
648        loop {
649            // Open segment if needed
650            if self.current_reader.is_none() {
651                if self.current_segment_idx >= self.segment_sequences.len() {
652                    return Ok(None);
653                }
654
655                let sequence = self.segment_sequences[self.current_segment_idx];
656                let segments = self.manager.segments.read();
657                if let Some(meta) = segments.get(&sequence) {
658                    let file = File::open(&meta.path)?;
659                    let mut reader = BufReader::new(file);
660                    reader.seek(SeekFrom::Start(SEGMENT_HEADER_SIZE as u64))?;
661                    self.current_reader = Some(reader);
662                    self.current_offset = SEGMENT_HEADER_SIZE as u64;
663                } else {
664                    self.current_segment_idx += 1;
665                    continue;
666                }
667            }
668
669            let reader = self.current_reader.as_mut().unwrap();
670
671            // Try to read entry
672            let mut len_buf = [0u8; 4];
673            match reader.read_exact(&mut len_buf) {
674                Ok(_) => {}
675                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
676                    // Move to next segment
677                    self.current_reader = None;
678                    self.current_segment_idx += 1;
679                    continue;
680                }
681                Err(e) => return Err(e),
682            }
683
684            let data_len = u32::from_le_bytes(len_buf) as usize;
685            if data_len == 0 || data_len > 100 * 1024 * 1024 {
686                // Invalid or end of segment
687                self.current_reader = None;
688                self.current_segment_idx += 1;
689                continue;
690            }
691
692            // Read LSN
693            let mut lsn_buf = [0u8; 8];
694            reader.read_exact(&mut lsn_buf)?;
695            let lsn = u64::from_le_bytes(lsn_buf);
696
697            // Read data
698            let mut data = vec![0u8; data_len];
699            reader.read_exact(&mut data)?;
700
701            // Read and verify checksum
702            let mut checksum_buf = [0u8; 4];
703            reader.read_exact(&mut checksum_buf)?;
704            let stored_checksum = u32::from_le_bytes(checksum_buf);
705
706            let mut verify_buf = Vec::with_capacity(4 + 8 + data_len);
707            verify_buf.extend_from_slice(&len_buf);
708            verify_buf.extend_from_slice(&lsn_buf);
709            verify_buf.extend_from_slice(&data);
710            let computed_checksum = crc32fast::hash(&verify_buf);
711
712            if stored_checksum != computed_checksum {
713                return Err(std::io::Error::new(
714                    std::io::ErrorKind::InvalidData,
715                    "WAL entry checksum mismatch",
716                ));
717            }
718
719            self.current_offset += (4 + 8 + data_len + 4) as u64;
720
721            // Skip entries before from_lsn
722            if lsn < self.from_lsn {
723                continue;
724            }
725
726            return Ok(Some(WalEntry { lsn, data }));
727        }
728    }
729}
730
/// A WAL record decoded during recovery.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WalEntry {
    /// Log sequence number assigned when the record was appended.
    pub lsn: u64,
    /// Record payload, exactly as it was appended.
    pub data: Vec<u8>,
}
739
740// =============================================================================
741// Tests
742// =============================================================================
743
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_segment_manager_basic() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(1024);

        let manager = WalSegmentManager::new(config).unwrap();

        // A fresh directory hands out dense LSNs starting at 0.
        for i in 0..100u64 {
            let data = format!("entry_{}", i);
            let lsn = manager.append(data.as_bytes()).unwrap();
            assert_eq!(lsn, i);
        }

        let stats = manager.stats();
        // 1 KiB segments cannot hold 100 records, so rotation must have happened.
        assert!(stats.segment_count > 1);
        assert_eq!(stats.current_lsn, 100);

        manager.shutdown().unwrap();
    }

    #[test]
    fn test_checkpoint_and_cleanup() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        let manager = WalSegmentManager::new(config).unwrap();

        // 256-byte segments fill after a handful of records, so this rotates several times.
        for i in 0..50 {
            let data = format!("entry_{:04}", i);
            manager.append(data.as_bytes()).unwrap();
        }
        let segments_before = manager.stats().segment_count;
        assert!(segments_before > 1);

        let checkpoint = manager.create_checkpoint(12345, 50).unwrap();
        assert_eq!(checkpoint.lsn, 50);
        assert!(checkpoint.last_segment > 0);

        // Every rotated segment lies entirely before the checkpoint LSN, so only the active
        // segment should survive cleanup.
        let cleaned = manager.cleanup_old_segments().unwrap();
        assert!(cleaned > 0);
        assert_eq!(manager.stats().segment_count, segments_before - cleaned);
        assert_eq!(manager.stats().segment_count, 1);

        manager.shutdown().unwrap();
    }

    #[test]
    fn test_checkpoint_survives_reopen() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..20 {
                let data = format!("entry_{:04}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            let checkpoint = manager.create_checkpoint(7, 20).unwrap();
            assert_eq!(checkpoint.lsn, 20);
            manager.shutdown().unwrap();
        }

        let manager = WalSegmentManager::new(config).unwrap();
        assert_eq!(manager.stats().last_checkpoint_lsn, Some(20));
    }

    #[test]
    fn test_recovery() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default().with_wal_dir(dir.path());

        // Write some data
        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..10 {
                let data = format!("data_{}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            manager.shutdown().unwrap();
        }

        // Reopen and replay everything in order
        {
            let manager = WalSegmentManager::new(config).unwrap();
            let mut iter = manager.recovery_iterator(0);
            let mut count = 0u64;

            while let Some(entry) = iter.next_entry().unwrap() {
                assert_eq!(entry.lsn, count);
                let data = String::from_utf8_lossy(&entry.data);
                assert_eq!(data, format!("data_{}", count));
                count += 1;
            }

            assert_eq!(count, 10);
        }
    }

    #[test]
    fn test_segment_header_encoding() {
        let header = SegmentHeader::new(42, 12345);
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.magic, SEGMENT_MAGIC);
        assert_eq!(decoded.version, SEGMENT_VERSION);
        assert_eq!(decoded.flags, 0);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.first_lsn, 12345);
        assert_eq!(decoded.created_at, header.created_at);
    }

    #[test]
    fn test_checkpoint_record_encoding() {
        let record = CheckpointRecord {
            lsn: 1000,
            last_segment: 5,
            timestamp: 123456789,
            memtable_checksum: 0xDEADBEEF,
            entry_count: 500,
        };

        let encoded = record.encode();
        let decoded = CheckpointRecord::decode(&encoded).unwrap();

        assert_eq!(decoded.lsn, 1000);
        assert_eq!(decoded.last_segment, 5);
        assert_eq!(decoded.timestamp, 123456789);
        assert_eq!(decoded.memtable_checksum, 0xDEADBEEF);
        assert_eq!(decoded.entry_count, 500);

        // A flipped byte must be caught by the record checksum.
        let mut corrupt = encoded.clone();
        corrupt[10] ^= 0xFF;
        assert!(CheckpointRecord::decode(&corrupt).is_none());
    }
}