Skip to main content

amaters_core/storage/
wal.rs

1//! Write-Ahead Log (WAL) implementation
2//!
3//! The WAL provides durability by logging all writes before they're applied to the memtable.
4//! In case of crash, the WAL can be replayed to recover the memtable state.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::types::{CipherBlob, Key};
8use std::fs::{File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11
/// Statistics gathered while replaying WAL files during recovery.
///
/// Returned by `Wal::recover_with_stats`. All counters start at zero via
/// `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Number of entries that decoded and passed checksum verification.
    pub entries_recovered: u64,
    /// Number of entries skipped because they were corrupted or truncated.
    pub entries_corrupted: u64,
    /// Total bytes of the recovered entries, including each entry's 4-byte
    /// length prefix.
    pub bytes_recovered: u64,
}
22
/// Kind of mutation recorded by a WAL entry.
///
/// The explicit discriminants are the tag byte that `WalEntry::encode` writes
/// to disk and `WalEntry::decode` matches on. `#[repr(u8)]` pins the enum's
/// representation to a single byte so the `as u8` cast is exactly that tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalEntryType {
    /// Insert or overwrite a key (on-disk tag `1`).
    Put = 1,
    /// Remove a key (on-disk tag `2`).
    Delete = 2,
}
29
/// A single logged mutation, as written to and read back from a WAL file.
///
/// `checksum` is a CRC32 (computed with `crc32fast`) over the sequence, the
/// entry-type tag, the key bytes and the value bytes (if any). It is filled in
/// by `WalEntry::put` / `WalEntry::delete` and re-checked by `WalEntry::decode`.
#[derive(Debug, Clone, PartialEq)]
pub struct WalEntry {
    /// Sequence number used for ordering; `Wal` assigns these in increasing order.
    pub sequence: u64,
    /// Whether this entry records a put or a delete.
    pub entry_type: WalEntryType,
    /// Key being written or deleted.
    pub key: Key,
    /// Value for a put (None for deletes).
    pub value: Option<CipherBlob>,
    /// CRC32 checksum used to detect corruption.
    pub checksum: u32,
}
44
45impl WalEntry {
46    /// Create a Put entry
47    pub fn put(sequence: u64, key: Key, value: CipherBlob) -> Self {
48        let mut entry = Self {
49            sequence,
50            entry_type: WalEntryType::Put,
51            key,
52            value: Some(value),
53            checksum: 0,
54        };
55        entry.checksum = entry.calculate_checksum();
56        entry
57    }
58
59    /// Create a Delete entry
60    pub fn delete(sequence: u64, key: Key) -> Self {
61        let mut entry = Self {
62            sequence,
63            entry_type: WalEntryType::Delete,
64            key,
65            value: None,
66            checksum: 0,
67        };
68        entry.checksum = entry.calculate_checksum();
69        entry
70    }
71
72    /// Calculate checksum for the entry
73    fn calculate_checksum(&self) -> u32 {
74        let mut hasher = crc32fast::Hasher::new();
75
76        // Hash sequence
77        hasher.update(&self.sequence.to_le_bytes());
78
79        // Hash entry type
80        hasher.update(&[self.entry_type.clone() as u8]);
81
82        // Hash key
83        hasher.update(self.key.as_bytes());
84
85        // Hash value if present
86        if let Some(ref value) = self.value {
87            hasher.update(value.as_bytes());
88        }
89
90        hasher.finalize()
91    }
92
93    /// Verify checksum
94    pub fn verify_checksum(&self) -> Result<()> {
95        let calculated = self.calculate_checksum();
96        if calculated == self.checksum {
97            Ok(())
98        } else {
99            Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
100                "WAL entry checksum mismatch: expected {}, got {}",
101                self.checksum, calculated
102            ))))
103        }
104    }
105
106    /// Encode entry to bytes
107    pub fn encode(&self) -> Vec<u8> {
108        let mut bytes = Vec::new();
109
110        // Magic number (0x57414C = "WAL" in hex)
111        bytes.extend_from_slice(&0x57414Cu32.to_le_bytes());
112
113        // Sequence
114        bytes.extend_from_slice(&self.sequence.to_le_bytes());
115
116        // Entry type
117        bytes.push(self.entry_type.clone() as u8);
118
119        // Key length and data
120        bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
121        bytes.extend_from_slice(self.key.as_bytes());
122
123        // Value length and data
124        if let Some(ref value) = self.value {
125            bytes.extend_from_slice(&(value.len() as u32).to_le_bytes());
126            bytes.extend_from_slice(value.as_bytes());
127        } else {
128            bytes.extend_from_slice(&0u32.to_le_bytes());
129        }
130
131        // Checksum
132        bytes.extend_from_slice(&self.checksum.to_le_bytes());
133
134        bytes
135    }
136
137    /// Decode entry from bytes
138    pub fn decode(bytes: &[u8]) -> Result<Self> {
139        if bytes.len() < 17 {
140            // Magic (4) + Sequence (8) + Type (1) + KeyLen (4)
141            return Err(AmateRSError::SerializationError(ErrorContext::new(
142                "WAL entry too short",
143            )));
144        }
145
146        let mut offset = 0;
147
148        // Verify magic number
149        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
150        if magic != 0x57414C {
151            return Err(AmateRSError::SerializationError(ErrorContext::new(
152                "Invalid WAL entry magic number",
153            )));
154        }
155        offset += 4;
156
157        // Sequence
158        let sequence = u64::from_le_bytes(bytes[offset..offset + 8].try_into().map_err(|_| {
159            AmateRSError::SerializationError(ErrorContext::new("Failed to read sequence"))
160        })?);
161        offset += 8;
162
163        // Entry type
164        let entry_type = match bytes[offset] {
165            1 => WalEntryType::Put,
166            2 => WalEntryType::Delete,
167            _ => {
168                return Err(AmateRSError::SerializationError(ErrorContext::new(
169                    "Invalid WAL entry type",
170                )));
171            }
172        };
173        offset += 1;
174
175        // Key
176        let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
177            AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
178        })?) as usize;
179        offset += 4;
180
181        let key_bytes = &bytes[offset..offset + key_len];
182        let key = Key::from_slice(key_bytes);
183        offset += key_len;
184
185        // Value
186        let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
187            AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
188        })?) as usize;
189        offset += 4;
190
191        let value = if value_len > 0 {
192            let value_bytes = &bytes[offset..offset + value_len];
193            Some(CipherBlob::new(value_bytes.to_vec()))
194        } else {
195            None
196        };
197        offset += value_len;
198
199        // Checksum
200        let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
201            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
202        })?);
203
204        let entry = Self {
205            sequence,
206            entry_type,
207            key,
208            value,
209            checksum,
210        };
211
212        // Verify checksum
213        entry.verify_checksum()?;
214
215        Ok(entry)
216    }
217}
218
/// Tuning knobs for a write-ahead log.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Directory that holds the WAL segment files.
    pub wal_dir: PathBuf,
    /// Size in bytes at which the active WAL file is rotated (default: 64 MiB).
    pub max_file_size: u64,
    /// How many WAL files are retained when old ones are cleaned up (default: 10).
    pub max_wal_files: usize,
    /// Whether to sync after each write (default: `true`, for durability).
    pub sync_on_write: bool,
}

impl Default for WalConfig {
    /// Defaults: directory `./wal`, 64 MiB files, 10 retained files, sync on write.
    fn default() -> Self {
        const DEFAULT_MAX_FILE_SIZE: u64 = 64 << 20; // 64 MiB
        const DEFAULT_MAX_WAL_FILES: usize = 10;

        let wal_dir = PathBuf::from("./wal");

        Self {
            sync_on_write: true,
            max_wal_files: DEFAULT_MAX_WAL_FILES,
            max_file_size: DEFAULT_MAX_FILE_SIZE,
            wal_dir,
        }
    }
}
242
/// Write-Ahead Log: an append-only log split across numbered segment files
/// (`wal_XXXXXXXX.log`) in `WalConfig::wal_dir`.
pub struct Wal {
    /// Configuration (directory, rotation size, retention, sync policy).
    config: WalConfig,
    /// Path of the WAL file currently being appended to.
    current_path: PathBuf,
    /// Buffered writer for the current WAL file (opened in append mode).
    writer: BufWriter<File>,
    /// Next sequence number to assign; global across all WAL files.
    sequence: u64,
    /// Current file size in bytes, used to decide when to rotate.
    current_file_size: u64,
    /// Number of the current WAL file (the `N` in `wal_N.log`).
    current_file_number: u64,
}
258
259impl Wal {
260    /// Create or open a WAL file (simple API for backward compatibility)
261    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
262        let path = path.as_ref().to_path_buf();
263        let parent = path.parent().ok_or_else(|| {
264            AmateRSError::IoError(ErrorContext::new("WAL path has no parent directory"))
265        })?;
266
267        let config = WalConfig {
268            wal_dir: parent.to_path_buf(),
269            ..Default::default()
270        };
271
272        Self::with_config(config)
273    }
274
275    /// Create a new WAL with custom configuration
276    pub fn with_config(config: WalConfig) -> Result<Self> {
277        // Create WAL directory if it doesn't exist
278        std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
279            AmateRSError::IoError(ErrorContext::new(format!(
280                "Failed to create WAL directory: {}",
281                e
282            )))
283        })?;
284
285        // Find the latest WAL file or create a new one
286        let (file_number, sequence) = Self::find_latest_wal(&config)?;
287
288        let current_path = Self::wal_file_path(&config.wal_dir, file_number);
289
290        let file = OpenOptions::new()
291            .create(true)
292            .append(true)
293            .open(&current_path)
294            .map_err(|e| {
295                AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL: {}", e)))
296            })?;
297
298        let current_file_size = file
299            .metadata()
300            .map_err(|e| {
301                AmateRSError::IoError(ErrorContext::new(format!(
302                    "Failed to get WAL file size: {}",
303                    e
304                )))
305            })?
306            .len();
307
308        Ok(Self {
309            config,
310            current_path,
311            writer: BufWriter::new(file),
312            sequence,
313            current_file_size,
314            current_file_number: file_number,
315        })
316    }
317
318    /// Find the latest WAL file and sequence number
319    fn find_latest_wal(config: &WalConfig) -> Result<(u64, u64)> {
320        let mut max_file_number = 0u64;
321        let mut max_sequence = 0u64;
322
323        if config.wal_dir.exists() {
324            let wal_file_numbers = Self::list_wal_file_numbers(&config.wal_dir)?;
325
326            if let Some(&last) = wal_file_numbers.last() {
327                max_file_number = last;
328            }
329
330            // Scan all WAL files to recover the max sequence number
331            for file_num in &wal_file_numbers {
332                let file_path = Self::wal_file_path(&config.wal_dir, *file_num);
333                if let Ok(mut reader) = WalReader::open(&file_path) {
334                    loop {
335                        match reader.read_entry() {
336                            Ok(Some(entry)) => {
337                                if entry.sequence >= max_sequence {
338                                    max_sequence = entry.sequence + 1;
339                                }
340                            }
341                            Ok(None) => break,
342                            Err(_) => {
343                                tracing::warn!(
344                                    "Corrupted entry found in WAL file {} during startup",
345                                    file_path.display()
346                                );
347                                continue;
348                            }
349                        }
350                    }
351                }
352            }
353        }
354
355        Ok((max_file_number, max_sequence))
356    }
357
358    /// Generate WAL file path for a given file number
359    fn wal_file_path(wal_dir: &Path, file_number: u64) -> PathBuf {
360        wal_dir.join(format!("wal_{:08}.log", file_number))
361    }
362
363    /// List all WAL file numbers in the directory, sorted ascending
364    fn list_wal_file_numbers(wal_dir: &Path) -> Result<Vec<u64>> {
365        let entries = std::fs::read_dir(wal_dir).map_err(|e| {
366            AmateRSError::IoError(ErrorContext::new(format!(
367                "Failed to read WAL directory: {}",
368                e
369            )))
370        })?;
371
372        let mut numbers = Vec::new();
373        for entry in entries {
374            let entry = entry.map_err(|e| {
375                AmateRSError::IoError(ErrorContext::new(format!(
376                    "Failed to read directory entry: {}",
377                    e
378                )))
379            })?;
380            let file_name = entry.file_name();
381            let name = file_name.to_string_lossy();
382            if name.starts_with("wal_") && name.ends_with(".log") {
383                if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
384                    numbers.push(number);
385                }
386            }
387        }
388        numbers.sort_unstable();
389        Ok(numbers)
390    }
391
392    /// Append a Put entry
393    pub fn put(&mut self, key: Key, value: CipherBlob) -> Result<u64> {
394        let sequence = self.sequence;
395        self.sequence += 1;
396
397        let entry = WalEntry::put(sequence, key, value);
398        self.write_entry(&entry)?;
399
400        Ok(sequence)
401    }
402
403    /// Append a Delete entry
404    pub fn delete(&mut self, key: Key) -> Result<u64> {
405        let sequence = self.sequence;
406        self.sequence += 1;
407
408        let entry = WalEntry::delete(sequence, key);
409        self.write_entry(&entry)?;
410
411        Ok(sequence)
412    }
413
414    /// Write an entry to the log
415    fn write_entry(&mut self, entry: &WalEntry) -> Result<()> {
416        let bytes = entry.encode();
417
418        // Write length prefix
419        let len = bytes.len() as u32;
420        self.writer.write_all(&len.to_le_bytes()).map_err(|e| {
421            AmateRSError::IoError(ErrorContext::new(format!(
422                "Failed to write WAL entry: {}",
423                e
424            )))
425        })?;
426
427        // Write entry
428        self.writer.write_all(&bytes).map_err(|e| {
429            AmateRSError::IoError(ErrorContext::new(format!(
430                "Failed to write WAL entry: {}",
431                e
432            )))
433        })?;
434
435        // Update file size
436        let entry_size = (4 + bytes.len()) as u64; // 4 bytes for length prefix
437        self.current_file_size += entry_size;
438
439        // Optional: sync after each write for durability
440        if self.config.sync_on_write {
441            self.writer.flush().map_err(|e| {
442                AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
443            })?;
444        }
445
446        // Check if rotation is needed
447        if self.current_file_size >= self.config.max_file_size {
448            self.rotate()?;
449        }
450
451        Ok(())
452    }
453
454    /// Rotate to a new WAL file
455    pub fn rotate(&mut self) -> Result<()> {
456        // Flush current file
457        self.flush()?;
458
459        // Increment file number
460        self.current_file_number += 1;
461
462        // Create new WAL file
463        let new_path = Self::wal_file_path(&self.config.wal_dir, self.current_file_number);
464
465        let file = OpenOptions::new()
466            .create(true)
467            .append(true)
468            .open(&new_path)
469            .map_err(|e| {
470                AmateRSError::IoError(ErrorContext::new(format!(
471                    "Failed to create new WAL file: {}",
472                    e
473                )))
474            })?;
475
476        self.current_path = new_path;
477        self.writer = BufWriter::new(file);
478        self.current_file_size = 0;
479
480        // Clean up old WAL files
481        self.cleanup_old_wal_files()?;
482
483        Ok(())
484    }
485
486    /// Clean up old WAL files beyond the retention limit
487    fn cleanup_old_wal_files(&self) -> Result<()> {
488        let wal_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
489
490        if wal_files.len() > self.config.max_wal_files {
491            let files_to_delete = wal_files.len() - self.config.max_wal_files;
492
493            for &file_number in wal_files.iter().take(files_to_delete) {
494                let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
495                std::fs::remove_file(&file_path).map_err(|e| {
496                    AmateRSError::IoError(ErrorContext::new(format!(
497                        "Failed to delete old WAL file: {}",
498                        e
499                    )))
500                })?;
501            }
502        }
503
504        Ok(())
505    }
506
507    /// Manually trigger cleanup of old WAL files
508    pub fn cleanup(&self) -> Result<()> {
509        self.cleanup_old_wal_files()
510    }
511
512    /// Get current WAL file size
513    pub fn current_file_size(&self) -> u64 {
514        self.current_file_size
515    }
516
517    /// Get current WAL file number
518    pub fn current_file_number(&self) -> u64 {
519        self.current_file_number
520    }
521
522    /// Flush buffered writes to disk
523    pub fn flush(&mut self) -> Result<()> {
524        self.writer.flush().map_err(|e| {
525            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
526        })?;
527
528        self.writer.get_ref().sync_all().map_err(|e| {
529            AmateRSError::IoError(ErrorContext::new(format!("Failed to sync WAL: {}", e)))
530        })?;
531
532        Ok(())
533    }
534
535    /// Get current sequence number
536    pub fn sequence(&self) -> u64 {
537        self.sequence
538    }
539
540    /// Get WAL file path
541    pub fn path(&self) -> &Path {
542        &self.current_path
543    }
544
545    /// Recover all entries from WAL files in a directory
546    ///
547    /// Reads all WAL files in sequence order and returns recovered entries.
548    /// Handles corrupted and incomplete entries gracefully by skipping them.
549    ///
550    /// Returns (entries, max_sequence) where max_sequence is the highest
551    /// sequence number found during recovery.
552    pub fn recover(wal_dir: impl AsRef<Path>) -> Result<(Vec<WalEntry>, u64)> {
553        let wal_dir = wal_dir.as_ref();
554
555        if !wal_dir.exists() {
556            return Ok((Vec::new(), 0));
557        }
558
559        let wal_files = Self::list_wal_file_numbers(wal_dir)?;
560
561        let mut all_entries = Vec::new();
562        let mut max_sequence = 0u64;
563
564        for file_number in wal_files {
565            let file_path = Self::wal_file_path(wal_dir, file_number);
566            let mut reader = WalReader::open(&file_path)?;
567
568            loop {
569                match reader.read_entry() {
570                    Ok(Some(entry)) => {
571                        if entry.sequence > max_sequence {
572                            max_sequence = entry.sequence;
573                        }
574                        all_entries.push(entry);
575                    }
576                    Ok(None) => break,
577                    Err(e) => {
578                        tracing::warn!(
579                            "Skipping corrupted entry in {}: {}",
580                            file_path.display(),
581                            e
582                        );
583                        continue;
584                    }
585                }
586            }
587        }
588
589        Ok((all_entries, max_sequence))
590    }
591
592    /// Get current active WAL file size in bytes
593    pub fn current_size(&self) -> u64 {
594        self.current_file_size
595    }
596
597    /// Get total size of all WAL files in the WAL directory
598    pub fn total_wal_size(&self) -> Result<u64> {
599        let wal_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
600        let mut total_size = 0u64;
601
602        for file_number in wal_files {
603            let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
604            let metadata = std::fs::metadata(&file_path).map_err(|e| {
605                AmateRSError::IoError(ErrorContext::new(format!(
606                    "Failed to read WAL file metadata: {}",
607                    e
608                )))
609            })?;
610            total_size += metadata.len();
611        }
612
613        Ok(total_size)
614    }
615
616    /// Truncate WAL files whose max sequence number is <= the given sequence.
617    ///
618    /// This is used after a memtable flush to remove WAL files that are no longer needed.
619    /// The current active WAL file is never removed.
620    ///
621    /// Returns the number of files truncated (removed).
622    pub fn truncate_before(&mut self, sequence: u64) -> Result<u64> {
623        self.flush()?;
624
625        let all_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
626        // Exclude the current active file
627        let wal_files: Vec<u64> = all_files
628            .into_iter()
629            .filter(|&n| n != self.current_file_number)
630            .collect();
631
632        let mut files_truncated = 0u64;
633
634        for file_number in wal_files {
635            let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
636
637            // Read the file to find its max sequence
638            let mut file_max_seq = 0u64;
639            if let Ok(mut reader) = WalReader::open(&file_path) {
640                loop {
641                    match reader.read_entry() {
642                        Ok(Some(entry)) => {
643                            if entry.sequence > file_max_seq {
644                                file_max_seq = entry.sequence;
645                            }
646                        }
647                        Ok(None) => break,
648                        Err(_) => continue,
649                    }
650                }
651            }
652
653            // If all entries in this file are <= the given sequence, remove it
654            if file_max_seq <= sequence {
655                std::fs::remove_file(&file_path).map_err(|e| {
656                    AmateRSError::IoError(ErrorContext::new(format!(
657                        "Failed to remove WAL file {}: {}",
658                        file_path.display(),
659                        e
660                    )))
661                })?;
662                files_truncated += 1;
663            }
664        }
665
666        Ok(files_truncated)
667    }
668
669    /// Recover all entries from WAL files with detailed statistics
670    ///
671    /// Like `recover()`, but also returns `RecoveryStats` with counts of
672    /// recovered entries, corrupted entries, and total bytes recovered.
673    pub fn recover_with_stats(
674        wal_dir: impl AsRef<Path>,
675    ) -> Result<(Vec<WalEntry>, u64, RecoveryStats)> {
676        let wal_dir = wal_dir.as_ref();
677        let mut stats = RecoveryStats::default();
678
679        if !wal_dir.exists() {
680            return Ok((Vec::new(), 0, stats));
681        }
682
683        let wal_files = Self::list_wal_file_numbers(wal_dir)?;
684
685        let mut all_entries = Vec::new();
686        let mut max_sequence = 0u64;
687
688        for file_number in wal_files {
689            let file_path = Self::wal_file_path(wal_dir, file_number);
690            let mut reader = WalReader::open(&file_path)?;
691
692            loop {
693                match reader.read_entry() {
694                    Ok(Some(entry)) => {
695                        let entry_bytes = entry.encode().len() as u64 + 4; // +4 for length prefix
696                        stats.bytes_recovered += entry_bytes;
697                        stats.entries_recovered += 1;
698                        if entry.sequence > max_sequence {
699                            max_sequence = entry.sequence;
700                        }
701                        all_entries.push(entry);
702                    }
703                    Ok(None) => break,
704                    Err(e) => {
705                        stats.entries_corrupted += 1;
706                        tracing::warn!(
707                            "Skipping corrupted entry in {}: {}",
708                            file_path.display(),
709                            e
710                        );
711                        continue;
712                    }
713                }
714            }
715        }
716
717        Ok((all_entries, max_sequence, stats))
718    }
719
720    /// Replay WAL entries to a memtable
721    ///
722    /// Applies all entries from the WAL directory to the provided memtable.
723    /// This is used during crash recovery to rebuild memtable state.
724    ///
725    /// Returns the maximum sequence number found during replay.
726    pub fn replay_to_memtable(
727        wal_dir: impl AsRef<Path>,
728        memtable: &crate::storage::memtable::Memtable,
729    ) -> Result<u64> {
730        let (entries, max_sequence) = Self::recover(wal_dir)?;
731
732        for entry in entries {
733            match entry.entry_type {
734                WalEntryType::Put => {
735                    if let Some(value) = entry.value {
736                        memtable.put(entry.key, value)?;
737                    }
738                }
739                WalEntryType::Delete => {
740                    memtable.delete(entry.key)?;
741                }
742            }
743        }
744
745        Ok(max_sequence)
746    }
747}
748
749/// WAL reader for reading entries from a WAL file
750pub struct WalReader {
751    reader: BufReader<File>,
752}
753
754impl WalReader {
755    /// Open a WAL file for reading
756    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
757        let file = File::open(path.as_ref()).map_err(|e| {
758            AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL file: {}", e)))
759        })?;
760
761        Ok(Self {
762            reader: BufReader::new(file),
763        })
764    }
765
766    /// Read the next entry from the WAL file
767    ///
768    /// Returns:
769    /// - Ok(Some(entry)) if an entry was successfully read
770    /// - Ok(None) if end of file reached
771    /// - Err if a corrupted or incomplete entry is encountered
772    pub fn read_entry(&mut self) -> Result<Option<WalEntry>> {
773        // Read length prefix (4 bytes)
774        let mut len_bytes = [0u8; 4];
775        match self.reader.read_exact(&mut len_bytes) {
776            Ok(()) => {}
777            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
778                // End of file or incomplete length prefix
779                return Ok(None);
780            }
781            Err(e) => {
782                return Err(AmateRSError::IoError(ErrorContext::new(format!(
783                    "Failed to read WAL entry length: {}",
784                    e
785                ))));
786            }
787        }
788
789        let len = u32::from_le_bytes(len_bytes) as usize;
790
791        // Sanity check: reject unreasonably large entries (>100MB)
792        if len > 100 * 1024 * 1024 {
793            return Err(AmateRSError::SerializationError(ErrorContext::new(
794                format!("WAL entry too large: {} bytes", len),
795            )));
796        }
797
798        // Read entry bytes
799        let mut entry_bytes = vec![0u8; len];
800        match self.reader.read_exact(&mut entry_bytes) {
801            Ok(()) => {}
802            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
803                // Incomplete entry (crash during write)
804                return Err(AmateRSError::SerializationError(ErrorContext::new(
805                    "Incomplete WAL entry (truncated file)",
806                )));
807            }
808            Err(e) => {
809                return Err(AmateRSError::IoError(ErrorContext::new(format!(
810                    "Failed to read WAL entry: {}",
811                    e
812                ))));
813            }
814        }
815
816        // Decode entry (this includes checksum verification)
817        let entry = WalEntry::decode(&entry_bytes)?;
818
819        Ok(Some(entry))
820    }
821}
822
823#[cfg(test)]
824mod tests {
825    use super::*;
826    use crate::storage::Memtable;
827    use std::fs;
828    use tempfile::tempdir;
829
830    #[test]
831    fn test_wal_entry_encode_decode() -> Result<()> {
832        let key = Key::from_str("test_key");
833        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
834        let entry = WalEntry::put(42, key.clone(), value.clone());
835
836        let bytes = entry.encode();
837        let decoded = WalEntry::decode(&bytes)?;
838
839        assert_eq!(decoded.sequence, 42);
840        assert_eq!(decoded.entry_type, WalEntryType::Put);
841        assert_eq!(decoded.key, key);
842        assert_eq!(decoded.value, Some(value));
843
844        Ok(())
845    }
846
847    #[test]
848    fn test_wal_delete_entry() -> Result<()> {
849        let key = Key::from_str("delete_me");
850        let entry = WalEntry::delete(99, key.clone());
851
852        let bytes = entry.encode();
853        let decoded = WalEntry::decode(&bytes)?;
854
855        assert_eq!(decoded.sequence, 99);
856        assert_eq!(decoded.entry_type, WalEntryType::Delete);
857        assert_eq!(decoded.key, key);
858        assert_eq!(decoded.value, None);
859
860        Ok(())
861    }
862
863    #[test]
864    fn test_wal_checksum_verification() -> Result<()> {
865        let key = Key::from_str("test");
866        let value = CipherBlob::new(vec![1, 2, 3]);
867        let entry = WalEntry::put(1, key, value);
868
869        // Verify should pass
870        entry.verify_checksum()?;
871
872        // Corrupt checksum
873        let mut corrupted = entry.clone();
874        corrupted.checksum = 0;
875
876        // Verify should fail
877        assert!(corrupted.verify_checksum().is_err());
878
879        Ok(())
880    }
881
882    #[test]
883    fn test_wal_basic_operations() -> Result<()> {
884        let temp_dir = tempdir().map_err(|e| {
885            AmateRSError::IoError(ErrorContext::new(format!(
886                "Failed to create temp dir: {}",
887                e
888            )))
889        })?;
890        let wal_path = temp_dir.path().join("test.wal");
891
892        let mut wal = Wal::create(&wal_path)?;
893
894        // Write some entries
895        let seq1 = wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
896        let seq2 = wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
897        let seq3 = wal.delete(Key::from_str("key1"))?;
898
899        assert_eq!(seq1, 0);
900        assert_eq!(seq2, 1);
901        assert_eq!(seq3, 2);
902
903        wal.flush()?;
904
905        // Verify a WAL file was created (may be rotated, so check path() returns something that exists)
906        assert!(wal.path().exists());
907
908        Ok(())
909    }
910
911    #[test]
912    fn test_wal_sequence_increment() -> Result<()> {
913        let temp_dir = tempdir().map_err(|e| {
914            AmateRSError::IoError(ErrorContext::new(format!(
915                "Failed to create temp dir: {}",
916                e
917            )))
918        })?;
919        let wal_path = temp_dir.path().join("test_seq.wal");
920
921        let mut wal = Wal::create(&wal_path)?;
922
923        assert_eq!(wal.sequence(), 0);
924
925        wal.put(Key::from_str("key"), CipherBlob::new(vec![1]))?;
926        assert_eq!(wal.sequence(), 1);
927
928        wal.delete(Key::from_str("key"))?;
929        assert_eq!(wal.sequence(), 2);
930
931        Ok(())
932    }
933
934    #[test]
935    fn test_wal_entry_large_value() -> Result<()> {
936        let key = Key::from_str("large");
937        let large_value = CipherBlob::new(vec![0u8; 10_000]);
938        let entry = WalEntry::put(1, key.clone(), large_value.clone());
939
940        let bytes = entry.encode();
941        let decoded = WalEntry::decode(&bytes)?;
942
943        assert_eq!(decoded.key, key);
944        assert_eq!(decoded.value, Some(large_value));
945
946        Ok(())
947    }
948
949    #[test]
950    fn test_wal_rotation() -> Result<()> {
951        use std::env;
952
953        let temp_dir = env::temp_dir().join("test_wal_rotation");
954        std::fs::create_dir_all(&temp_dir).ok();
955
956        let config = WalConfig {
957            wal_dir: temp_dir.clone(),
958            max_file_size: 1024,  // Small size to trigger rotation
959            sync_on_write: false, // Disable for speed
960            ..Default::default()
961        };
962
963        let mut wal = Wal::with_config(config)?;
964
965        let initial_file_number = wal.current_file_number();
966
967        // Write enough data to trigger rotation
968        for i in 0..20 {
969            wal.put(
970                Key::from_str(&format!("key_{}", i)),
971                CipherBlob::new(vec![i as u8; 100]),
972            )?;
973        }
974
975        // File number should have increased due to rotation
976        assert!(wal.current_file_number() > initial_file_number);
977
978        // Verify new file exists
979        assert!(wal.path().exists());
980
981        std::fs::remove_dir_all(&temp_dir).ok();
982        Ok(())
983    }
984
985    #[test]
986    fn test_wal_cleanup() -> Result<()> {
987        use std::env;
988
989        let temp_dir = env::temp_dir().join("test_wal_cleanup");
990        std::fs::create_dir_all(&temp_dir).ok();
991
992        let config = WalConfig {
993            wal_dir: temp_dir.clone(),
994            max_file_size: 512, // Very small to trigger many rotations
995            max_wal_files: 3,   // Keep only 3 files
996            sync_on_write: false,
997        };
998
999        let mut wal = Wal::with_config(config)?;
1000
1001        // Write enough data to create many WAL files
1002        for i in 0..100 {
1003            wal.put(
1004                Key::from_str(&format!("key_{}", i)),
1005                CipherBlob::new(vec![i as u8; 100]),
1006            )?;
1007        }
1008
1009        // Count WAL files
1010        let wal_file_count = std::fs::read_dir(&temp_dir)?
1011            .filter_map(|e| e.ok())
1012            .filter(|e| {
1013                e.file_name().to_string_lossy().starts_with("wal_")
1014                    && e.file_name().to_string_lossy().ends_with(".log")
1015            })
1016            .count();
1017
1018        // Should have at most max_wal_files
1019        assert!(wal_file_count <= 3);
1020
1021        std::fs::remove_dir_all(&temp_dir).ok();
1022        Ok(())
1023    }
1024
1025    #[test]
1026    fn test_wal_manual_cleanup() -> Result<()> {
1027        use std::env;
1028
1029        let temp_dir = env::temp_dir().join("test_wal_manual_cleanup");
1030        std::fs::create_dir_all(&temp_dir).ok();
1031
1032        let config = WalConfig {
1033            wal_dir: temp_dir.clone(),
1034            max_file_size: 512,
1035            max_wal_files: 5,
1036            sync_on_write: false,
1037        };
1038
1039        let mut wal = Wal::with_config(config)?;
1040
1041        // Create several WAL files
1042        for i in 0..80 {
1043            wal.put(
1044                Key::from_str(&format!("key_{}", i)),
1045                CipherBlob::new(vec![i as u8; 100]),
1046            )?;
1047        }
1048
1049        // Manually trigger cleanup
1050        wal.cleanup()?;
1051
1052        // Count files
1053        let wal_file_count = std::fs::read_dir(&temp_dir)?
1054            .filter_map(|e| e.ok())
1055            .filter(|e| {
1056                e.file_name().to_string_lossy().starts_with("wal_")
1057                    && e.file_name().to_string_lossy().ends_with(".log")
1058            })
1059            .count();
1060
1061        assert!(wal_file_count <= 5);
1062
1063        std::fs::remove_dir_all(&temp_dir).ok();
1064        Ok(())
1065    }
1066
1067    #[test]
1068    fn test_wal_recovery_basic() -> Result<()> {
1069        use std::env;
1070
1071        let temp_dir = env::temp_dir().join("test_wal_recovery_basic");
1072        std::fs::create_dir_all(&temp_dir).ok();
1073
1074        // Write some entries
1075        {
1076            let config = WalConfig {
1077                wal_dir: temp_dir.clone(),
1078                sync_on_write: true,
1079                ..Default::default()
1080            };
1081
1082            let mut wal = Wal::with_config(config)?;
1083
1084            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1085            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1086            wal.delete(Key::from_str("key1"))?;
1087            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1088
1089            wal.flush()?;
1090        }
1091
1092        // Recover entries
1093        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1094
1095        assert_eq!(entries.len(), 4);
1096        assert_eq!(max_sequence, 3);
1097
1098        // Verify entries
1099        assert_eq!(entries[0].key, Key::from_str("key1"));
1100        assert_eq!(entries[0].entry_type, WalEntryType::Put);
1101        assert_eq!(entries[0].value, Some(CipherBlob::new(vec![1, 2, 3])));
1102
1103        assert_eq!(entries[1].key, Key::from_str("key2"));
1104        assert_eq!(entries[1].entry_type, WalEntryType::Put);
1105
1106        assert_eq!(entries[2].key, Key::from_str("key1"));
1107        assert_eq!(entries[2].entry_type, WalEntryType::Delete);
1108        assert_eq!(entries[2].value, None);
1109
1110        assert_eq!(entries[3].key, Key::from_str("key3"));
1111        assert_eq!(entries[3].entry_type, WalEntryType::Put);
1112
1113        std::fs::remove_dir_all(&temp_dir).ok();
1114        Ok(())
1115    }
1116
1117    #[test]
1118    fn test_wal_recovery_multiple_files() -> Result<()> {
1119        use std::env;
1120
1121        let temp_dir = env::temp_dir().join("test_wal_recovery_multiple");
1122        std::fs::create_dir_all(&temp_dir).ok();
1123
1124        // Write entries across multiple WAL files
1125        {
1126            let config = WalConfig {
1127                wal_dir: temp_dir.clone(),
1128                max_file_size: 512, // Small to trigger rotation
1129                sync_on_write: true,
1130                ..Default::default()
1131            };
1132
1133            let mut wal = Wal::with_config(config)?;
1134
1135            // Write enough data to create multiple files
1136            for i in 0..20 {
1137                wal.put(
1138                    Key::from_str(&format!("key_{}", i)),
1139                    CipherBlob::new(vec![i as u8; 100]),
1140                )?;
1141            }
1142
1143            wal.flush()?;
1144        }
1145
1146        // Recover all entries
1147        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1148
1149        assert_eq!(entries.len(), 20);
1150        assert_eq!(max_sequence, 19);
1151
1152        // Verify entries are in sequence order
1153        for (i, entry) in entries.iter().enumerate() {
1154            assert_eq!(entry.sequence, i as u64);
1155            assert_eq!(entry.key, Key::from_str(&format!("key_{}", i)));
1156        }
1157
1158        std::fs::remove_dir_all(&temp_dir).ok();
1159        Ok(())
1160    }
1161
1162    #[test]
1163    fn test_wal_recovery_empty_directory() -> Result<()> {
1164        use std::env;
1165
1166        let temp_dir = env::temp_dir().join("test_wal_recovery_empty");
1167        std::fs::create_dir_all(&temp_dir).ok();
1168
1169        // Recover from empty directory
1170        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1171
1172        assert_eq!(entries.len(), 0);
1173        assert_eq!(max_sequence, 0);
1174
1175        std::fs::remove_dir_all(&temp_dir).ok();
1176        Ok(())
1177    }
1178
1179    #[test]
1180    fn test_wal_recovery_nonexistent_directory() -> Result<()> {
1181        use std::env;
1182
1183        let temp_dir = env::temp_dir().join("nonexistent_wal_dir_12345");
1184
1185        // Recover from non-existent directory
1186        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1187
1188        assert_eq!(entries.len(), 0);
1189        assert_eq!(max_sequence, 0);
1190
1191        Ok(())
1192    }
1193
1194    #[test]
1195    fn test_wal_replay_to_memtable() -> Result<()> {
1196        use std::env;
1197
1198        let temp_dir = env::temp_dir().join("test_wal_replay_memtable");
1199        std::fs::create_dir_all(&temp_dir).ok();
1200
1201        // Write some entries
1202        {
1203            let config = WalConfig {
1204                wal_dir: temp_dir.clone(),
1205                sync_on_write: true,
1206                ..Default::default()
1207            };
1208
1209            let mut wal = Wal::with_config(config)?;
1210
1211            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1212            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1213            wal.delete(Key::from_str("key1"))?;
1214            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1215
1216            wal.flush()?;
1217        }
1218
1219        // Create a new memtable and replay WAL
1220        let memtable = Memtable::new();
1221        let max_sequence = Wal::replay_to_memtable(&temp_dir, &memtable)?;
1222
1223        assert_eq!(max_sequence, 3);
1224
1225        // Verify memtable state
1226        assert_eq!(memtable.get(&Key::from_str("key1"))?, None); // Deleted
1227        assert_eq!(
1228            memtable.get(&Key::from_str("key2"))?,
1229            Some(CipherBlob::new(vec![4, 5, 6]))
1230        );
1231        assert_eq!(
1232            memtable.get(&Key::from_str("key3"))?,
1233            Some(CipherBlob::new(vec![7, 8, 9]))
1234        );
1235
1236        std::fs::remove_dir_all(&temp_dir).ok();
1237        Ok(())
1238    }
1239
1240    #[test]
1241    fn test_wal_reader_basic() -> Result<()> {
1242        use std::env;
1243
1244        let temp_dir = env::temp_dir().join("test_wal_reader_basic");
1245        std::fs::create_dir_all(&temp_dir).ok();
1246
1247        let wal_file = temp_dir.join("test.wal");
1248
1249        // Write some entries
1250        {
1251            let mut wal = Wal::create(&wal_file)?;
1252            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1253            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1254            wal.flush()?;
1255        }
1256
1257        // Read entries with WalReader
1258        let wal_file_actual = temp_dir.join("wal_00000000.log");
1259        let mut reader = WalReader::open(&wal_file_actual)?;
1260
1261        let entry1 = reader.read_entry()?.expect("Should have entry 1");
1262        assert_eq!(entry1.sequence, 0);
1263        assert_eq!(entry1.key, Key::from_str("key1"));
1264
1265        let entry2 = reader.read_entry()?.expect("Should have entry 2");
1266        assert_eq!(entry2.sequence, 1);
1267        assert_eq!(entry2.key, Key::from_str("key2"));
1268
1269        let entry3 = reader.read_entry()?;
1270        assert_eq!(entry3, None); // End of file
1271
1272        std::fs::remove_dir_all(&temp_dir).ok();
1273        Ok(())
1274    }
1275
1276    #[test]
1277    fn test_wal_recovery_with_truncated_file() -> Result<()> {
1278        use std::env;
1279        use std::io::Write as IoWrite;
1280
1281        let temp_dir = env::temp_dir().join("test_wal_recovery_truncated");
1282        std::fs::create_dir_all(&temp_dir).ok();
1283
1284        // Write some valid entries, then truncate the last one
1285        let wal_file = temp_dir.join("wal_00000000.log");
1286        {
1287            let mut wal = Wal::create(&wal_file)?;
1288            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1289            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1290            wal.flush()?;
1291
1292            // Append incomplete entry (length prefix only, no data)
1293            let mut file = OpenOptions::new().append(true).open(&wal_file)?;
1294            let incomplete_len = 1234u32;
1295            file.write_all(&incomplete_len.to_le_bytes())?;
1296            file.flush()?;
1297        }
1298
1299        // Recovery should handle truncated entry gracefully
1300        let (entries, _) = Wal::recover(&temp_dir)?;
1301
1302        // Should recover the 2 complete entries, skip the truncated one
1303        assert_eq!(entries.len(), 2);
1304        assert_eq!(entries[0].key, Key::from_str("key1"));
1305        assert_eq!(entries[1].key, Key::from_str("key2"));
1306
1307        std::fs::remove_dir_all(&temp_dir).ok();
1308        Ok(())
1309    }
1310
1311    #[test]
1312    fn test_wal_sequence_recovery_after_crash() -> Result<()> {
1313        use std::env;
1314
1315        let temp_dir = env::temp_dir().join("test_wal_seq_recovery_crash");
1316        // Clean up from any previous run
1317        std::fs::remove_dir_all(&temp_dir).ok();
1318        std::fs::create_dir_all(&temp_dir).ok();
1319
1320        // Phase 1: Write entries and then drop (simulate crash)
1321        {
1322            let config = WalConfig {
1323                wal_dir: temp_dir.clone(),
1324                sync_on_write: true,
1325                ..Default::default()
1326            };
1327
1328            let mut wal = Wal::with_config(config)?;
1329
1330            wal.put(Key::from_str("a"), CipherBlob::new(vec![1]))?;
1331            wal.put(Key::from_str("b"), CipherBlob::new(vec![2]))?;
1332            wal.put(Key::from_str("c"), CipherBlob::new(vec![3]))?;
1333            wal.put(Key::from_str("d"), CipherBlob::new(vec![4]))?;
1334            wal.put(Key::from_str("e"), CipherBlob::new(vec![5]))?;
1335            wal.flush()?;
1336            // sequences 0..4 written, next should be 5
1337        }
1338
1339        // Phase 2: Open a new WAL instance - should recover sequence
1340        {
1341            let config = WalConfig {
1342                wal_dir: temp_dir.clone(),
1343                sync_on_write: true,
1344                ..Default::default()
1345            };
1346
1347            let mut wal = Wal::with_config(config)?;
1348
1349            // Sequence should continue from 5 (max was 4, so next is 5)
1350            assert_eq!(wal.sequence(), 5);
1351
1352            // Write more entries
1353            let seq = wal.put(Key::from_str("f"), CipherBlob::new(vec![6]))?;
1354            assert_eq!(seq, 5);
1355
1356            let seq = wal.put(Key::from_str("g"), CipherBlob::new(vec![7]))?;
1357            assert_eq!(seq, 6);
1358
1359            wal.flush()?;
1360        }
1361
1362        // Phase 3: Verify all entries are recoverable
1363        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1364        assert_eq!(entries.len(), 7);
1365        assert_eq!(max_sequence, 6);
1366
1367        std::fs::remove_dir_all(&temp_dir).ok();
1368        Ok(())
1369    }
1370
1371    #[test]
1372    fn test_wal_corruption_detection_and_partial_recovery() -> Result<()> {
1373        use std::env;
1374        use std::io::Write as IoWrite;
1375
1376        let temp_dir = env::temp_dir().join("test_wal_corruption_detect");
1377        std::fs::remove_dir_all(&temp_dir).ok();
1378        std::fs::create_dir_all(&temp_dir).ok();
1379
1380        let wal_file = temp_dir.join("wal_00000000.log");
1381
1382        // Write valid entries
1383        {
1384            let config = WalConfig {
1385                wal_dir: temp_dir.clone(),
1386                sync_on_write: true,
1387                ..Default::default()
1388            };
1389
1390            let mut wal = Wal::with_config(config)?;
1391            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1392            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1393            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1394            wal.flush()?;
1395        }
1396
1397        // Corrupt the middle entry by modifying bytes in the WAL file
1398        {
1399            let data = std::fs::read(&wal_file).map_err(|e| {
1400                AmateRSError::IoError(ErrorContext::new(format!("Failed to read WAL: {}", e)))
1401            })?;
1402
1403            let mut corrupted_data = data.clone();
1404            // The first entry starts at offset 0: 4 bytes length prefix + entry data
1405            // Find the start of the second entry by reading the first entry's length
1406            let first_entry_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
1407            let second_entry_start = 4 + first_entry_len;
1408
1409            // Corrupt bytes inside the second entry (after length prefix, corrupt the checksum area)
1410            let corrupt_offset = second_entry_start + 4 + 10; // Skip length prefix and some bytes
1411            if corrupt_offset < corrupted_data.len() {
1412                corrupted_data[corrupt_offset] ^= 0xFF;
1413            }
1414
1415            let mut file = File::create(&wal_file).map_err(|e| {
1416                AmateRSError::IoError(ErrorContext::new(format!("Failed to create file: {}", e)))
1417            })?;
1418            file.write_all(&corrupted_data).map_err(|e| {
1419                AmateRSError::IoError(ErrorContext::new(format!("Failed to write file: {}", e)))
1420            })?;
1421            file.flush().map_err(|e| {
1422                AmateRSError::IoError(ErrorContext::new(format!("Failed to flush file: {}", e)))
1423            })?;
1424        }
1425
1426        // Recovery should detect corruption and skip the corrupted entry
1427        let (entries, _max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1428
1429        // We should have recovered 2 out of 3 entries (one corrupted)
1430        assert_eq!(stats.entries_corrupted, 1);
1431        assert_eq!(stats.entries_recovered, entries.len() as u64);
1432        assert!(stats.bytes_recovered > 0);
1433        // At least 2 entries should be recovered (first and possibly third)
1434        assert!(entries.len() >= 2);
1435
1436        std::fs::remove_dir_all(&temp_dir).ok();
1437        Ok(())
1438    }
1439
1440    #[test]
1441    fn test_wal_truncate_before() -> Result<()> {
1442        use std::env;
1443
1444        let temp_dir = env::temp_dir().join("test_wal_truncate_before");
1445        std::fs::remove_dir_all(&temp_dir).ok();
1446        std::fs::create_dir_all(&temp_dir).ok();
1447
1448        let config = WalConfig {
1449            wal_dir: temp_dir.clone(),
1450            max_file_size: 512, // Small to trigger rotation
1451            max_wal_files: 100, // Don't auto-cleanup
1452            sync_on_write: true,
1453        };
1454
1455        let mut wal = Wal::with_config(config)?;
1456
1457        // Write enough entries to create multiple WAL files
1458        for i in 0..30 {
1459            wal.put(
1460                Key::from_str(&format!("key_{}", i)),
1461                CipherBlob::new(vec![i as u8; 100]),
1462            )?;
1463        }
1464        wal.flush()?;
1465
1466        // Ensure multiple files were created
1467        let file_count_before = std::fs::read_dir(&temp_dir)
1468            .map_err(|e| {
1469                AmateRSError::IoError(ErrorContext::new(format!("Failed to read dir: {}", e)))
1470            })?
1471            .filter_map(|e| e.ok())
1472            .filter(|e| {
1473                let name = e.file_name().to_string_lossy().to_string();
1474                name.starts_with("wal_") && name.ends_with(".log")
1475            })
1476            .count();
1477        assert!(file_count_before > 1, "Should have multiple WAL files");
1478
1479        // Truncate all entries with sequence <= 10
1480        let truncated = wal.truncate_before(10)?;
1481
1482        // Should have truncated at least one file
1483        assert!(truncated > 0, "Should have truncated at least one file");
1484
1485        // Verify remaining entries all have sequence > 10 or are in the current file
1486        let (remaining_entries, _) = Wal::recover(&temp_dir)?;
1487        // Entries in remaining files should include those with seq > 10
1488        // (some with seq <= 10 may remain if they share a file with seq > 10 entries)
1489        let has_high_seq = remaining_entries.iter().any(|e| e.sequence > 10);
1490        assert!(has_high_seq, "Should still have entries with sequence > 10");
1491
1492        std::fs::remove_dir_all(&temp_dir).ok();
1493        Ok(())
1494    }
1495
1496    #[test]
1497    fn test_wal_size_tracking() -> Result<()> {
1498        use std::env;
1499
1500        let temp_dir = env::temp_dir().join("test_wal_size_tracking");
1501        std::fs::remove_dir_all(&temp_dir).ok();
1502        std::fs::create_dir_all(&temp_dir).ok();
1503
1504        let config = WalConfig {
1505            wal_dir: temp_dir.clone(),
1506            sync_on_write: true,
1507            ..Default::default()
1508        };
1509
1510        let mut wal = Wal::with_config(config)?;
1511
1512        // Initial size should be 0
1513        assert_eq!(wal.current_size(), 0);
1514
1515        // Write an entry
1516        wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1517        let size_after_one = wal.current_size();
1518        assert!(size_after_one > 0, "Size should increase after writing");
1519
1520        // Write another entry
1521        wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1522        let size_after_two = wal.current_size();
1523        assert!(
1524            size_after_two > size_after_one,
1525            "Size should increase with more entries"
1526        );
1527
1528        wal.flush()?;
1529
1530        // Total WAL size should match current size (single file)
1531        let total = wal.total_wal_size()?;
1532        assert_eq!(total, size_after_two);
1533
1534        std::fs::remove_dir_all(&temp_dir).ok();
1535        Ok(())
1536    }
1537
1538    #[test]
1539    fn test_wal_total_size_multiple_files() -> Result<()> {
1540        use std::env;
1541
1542        let temp_dir = env::temp_dir().join("test_wal_total_size_multi");
1543        std::fs::remove_dir_all(&temp_dir).ok();
1544        std::fs::create_dir_all(&temp_dir).ok();
1545
1546        let config = WalConfig {
1547            wal_dir: temp_dir.clone(),
1548            max_file_size: 512,
1549            max_wal_files: 100,
1550            sync_on_write: true,
1551        };
1552
1553        let mut wal = Wal::with_config(config)?;
1554
1555        for i in 0..20 {
1556            wal.put(
1557                Key::from_str(&format!("key_{}", i)),
1558                CipherBlob::new(vec![i as u8; 100]),
1559            )?;
1560        }
1561        wal.flush()?;
1562
1563        let total = wal.total_wal_size()?;
1564        assert!(total > 0, "Total WAL size should be positive");
1565
1566        // Total should be larger than current file size if we have multiple files
1567        if wal.current_file_number() > 0 {
1568            assert!(
1569                total >= wal.current_size(),
1570                "Total size should be >= current file size"
1571            );
1572        }
1573
1574        std::fs::remove_dir_all(&temp_dir).ok();
1575        Ok(())
1576    }
1577
1578    #[test]
1579    fn test_wal_empty_recovery() -> Result<()> {
1580        use std::env;
1581
1582        let temp_dir = env::temp_dir().join("test_wal_empty_recovery");
1583        std::fs::remove_dir_all(&temp_dir).ok();
1584        std::fs::create_dir_all(&temp_dir).ok();
1585
1586        // Create an empty WAL file
1587        {
1588            let config = WalConfig {
1589                wal_dir: temp_dir.clone(),
1590                sync_on_write: true,
1591                ..Default::default()
1592            };
1593            let wal = Wal::with_config(config)?;
1594            drop(wal);
1595        }
1596
1597        // Recovery from directory with empty WAL file
1598        let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1599        assert_eq!(entries.len(), 0);
1600        assert_eq!(max_seq, 0);
1601        assert_eq!(stats.entries_recovered, 0);
1602        assert_eq!(stats.entries_corrupted, 0);
1603
1604        std::fs::remove_dir_all(&temp_dir).ok();
1605        Ok(())
1606    }
1607
1608    #[test]
1609    fn test_wal_single_entry_recovery() -> Result<()> {
1610        use std::env;
1611
1612        let temp_dir = env::temp_dir().join("test_wal_single_entry_recovery");
1613        std::fs::remove_dir_all(&temp_dir).ok();
1614        std::fs::create_dir_all(&temp_dir).ok();
1615
1616        {
1617            let config = WalConfig {
1618                wal_dir: temp_dir.clone(),
1619                sync_on_write: true,
1620                ..Default::default()
1621            };
1622
1623            let mut wal = Wal::with_config(config)?;
1624            wal.put(Key::from_str("only_key"), CipherBlob::new(vec![42]))?;
1625            wal.flush()?;
1626        }
1627
1628        let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1629        assert_eq!(entries.len(), 1);
1630        assert_eq!(max_seq, 0);
1631        assert_eq!(stats.entries_recovered, 1);
1632        assert_eq!(stats.entries_corrupted, 0);
1633        assert!(stats.bytes_recovered > 0);
1634        assert_eq!(entries[0].key, Key::from_str("only_key"));
1635
1636        std::fs::remove_dir_all(&temp_dir).ok();
1637        Ok(())
1638    }
1639
1640    #[test]
1641    fn test_wal_large_recovery() -> Result<()> {
1642        use std::env;
1643
1644        let temp_dir = env::temp_dir().join("test_wal_large_recovery");
1645        std::fs::remove_dir_all(&temp_dir).ok();
1646        std::fs::create_dir_all(&temp_dir).ok();
1647
1648        let entry_count = 500;
1649
1650        {
1651            let config = WalConfig {
1652                wal_dir: temp_dir.clone(),
1653                max_file_size: 4096,
1654                max_wal_files: 1000,
1655                sync_on_write: false,
1656            };
1657
1658            let mut wal = Wal::with_config(config)?;
1659
1660            for i in 0..entry_count {
1661                wal.put(
1662                    Key::from_str(&format!("large_key_{:05}", i)),
1663                    CipherBlob::new(vec![(i % 256) as u8; 50]),
1664                )?;
1665            }
1666            wal.flush()?;
1667        }
1668
1669        // Recover and verify
1670        let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1671        assert_eq!(entries.len(), entry_count);
1672        assert_eq!(max_seq, (entry_count - 1) as u64);
1673        assert_eq!(stats.entries_recovered, entry_count as u64);
1674        assert_eq!(stats.entries_corrupted, 0);
1675        assert!(stats.bytes_recovered > 0);
1676
1677        // Verify sequence order
1678        for (i, entry) in entries.iter().enumerate() {
1679            assert_eq!(entry.sequence, i as u64);
1680        }
1681
1682        std::fs::remove_dir_all(&temp_dir).ok();
1683        Ok(())
1684    }
1685
1686    #[test]
1687    fn test_wal_truncate_keeps_current_file() -> Result<()> {
1688        use std::env;
1689
1690        let temp_dir = env::temp_dir().join("test_wal_truncate_keeps_current");
1691        std::fs::remove_dir_all(&temp_dir).ok();
1692        std::fs::create_dir_all(&temp_dir).ok();
1693
1694        let config = WalConfig {
1695            wal_dir: temp_dir.clone(),
1696            max_file_size: 512,
1697            max_wal_files: 100,
1698            sync_on_write: true,
1699        };
1700
1701        let mut wal = Wal::with_config(config)?;
1702
1703        for i in 0..30 {
1704            wal.put(
1705                Key::from_str(&format!("key_{}", i)),
1706                CipherBlob::new(vec![i as u8; 100]),
1707            )?;
1708        }
1709        wal.flush()?;
1710
1711        let current_file_num = wal.current_file_number();
1712
1713        // Truncate everything (use a very high sequence number)
1714        wal.truncate_before(u64::MAX)?;
1715
1716        // Current file should still exist
1717        let current_path = Wal::wal_file_path(&temp_dir, current_file_num);
1718        assert!(
1719            current_path.exists(),
1720            "Current active WAL file should not be removed"
1721        );
1722
1723        std::fs::remove_dir_all(&temp_dir).ok();
1724        Ok(())
1725    }
1726
1727    #[test]
1728    fn test_wal_sequence_recovery_across_rotations() -> Result<()> {
1729        use std::env;
1730
1731        let temp_dir = env::temp_dir().join("test_wal_seq_recovery_rotation");
1732        std::fs::remove_dir_all(&temp_dir).ok();
1733        std::fs::create_dir_all(&temp_dir).ok();
1734
1735        // Phase 1: Write entries across multiple rotations
1736        let entries_written;
1737        {
1738            let config = WalConfig {
1739                wal_dir: temp_dir.clone(),
1740                max_file_size: 512,
1741                max_wal_files: 100,
1742                sync_on_write: true,
1743            };
1744
1745            let mut wal = Wal::with_config(config)?;
1746
1747            for i in 0..25 {
1748                wal.put(
1749                    Key::from_str(&format!("rkey_{}", i)),
1750                    CipherBlob::new(vec![i as u8; 80]),
1751                )?;
1752            }
1753            wal.flush()?;
1754            entries_written = wal.sequence();
1755        }
1756
1757        // Phase 2: Open new WAL and verify sequence continues
1758        {
1759            let config = WalConfig {
1760                wal_dir: temp_dir.clone(),
1761                max_file_size: 512,
1762                max_wal_files: 100,
1763                sync_on_write: true,
1764            };
1765
1766            let wal = Wal::with_config(config)?;
1767            assert_eq!(
1768                wal.sequence(),
1769                entries_written,
1770                "Sequence should continue from where it left off"
1771            );
1772        }
1773
1774        std::fs::remove_dir_all(&temp_dir).ok();
1775        Ok(())
1776    }
1777
1778    #[test]
1779    fn test_wal_recovery_stats_with_corruption() -> Result<()> {
1780        use std::env;
1781        use std::io::Write as IoWrite;
1782
1783        let temp_dir = env::temp_dir().join("test_wal_recovery_stats_corrupt");
1784        std::fs::remove_dir_all(&temp_dir).ok();
1785        std::fs::create_dir_all(&temp_dir).ok();
1786
1787        let wal_file = temp_dir.join("wal_00000000.log");
1788
1789        // Write valid entries
1790        {
1791            let config = WalConfig {
1792                wal_dir: temp_dir.clone(),
1793                sync_on_write: true,
1794                ..Default::default()
1795            };
1796
1797            let mut wal = Wal::with_config(config)?;
1798            wal.put(Key::from_str("s1"), CipherBlob::new(vec![10]))?;
1799            wal.put(Key::from_str("s2"), CipherBlob::new(vec![20]))?;
1800            wal.flush()?;
1801        }
1802
1803        // Append garbage data that looks like a valid length prefix but has bad content
1804        {
1805            let mut file = OpenOptions::new()
1806                .append(true)
1807                .open(&wal_file)
1808                .map_err(|e| {
1809                    AmateRSError::IoError(ErrorContext::new(format!(
1810                        "Failed to open for corruption: {}",
1811                        e
1812                    )))
1813                })?;
1814            // Write a length prefix for 30 bytes, then 30 bytes of garbage
1815            let fake_len = 30u32;
1816            file.write_all(&fake_len.to_le_bytes()).map_err(|e| {
1817                AmateRSError::IoError(ErrorContext::new(format!("write error: {}", e)))
1818            })?;
1819            file.write_all(&[0xDE; 30]).map_err(|e| {
1820                AmateRSError::IoError(ErrorContext::new(format!("write error: {}", e)))
1821            })?;
1822            file.flush().map_err(|e| {
1823                AmateRSError::IoError(ErrorContext::new(format!("flush error: {}", e)))
1824            })?;
1825        }
1826
1827        let (_entries, _max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1828
1829        assert_eq!(stats.entries_recovered, 2);
1830        assert!(
1831            stats.entries_corrupted >= 1,
1832            "Should detect at least one corrupted entry"
1833        );
1834
1835        std::fs::remove_dir_all(&temp_dir).ok();
1836        Ok(())
1837    }
1838}