amaters_core/storage/
wal.rs

1//! Write-Ahead Log (WAL) implementation
2//!
3//! The WAL provides durability by logging all writes before they're applied to the memtable.
4//! In case of crash, the WAL can be replayed to recover the memtable state.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::types::{CipherBlob, Key};
8use std::fs::{File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11
/// Kind of mutation recorded by a WAL entry.
///
/// The explicit discriminants double as the one-byte type tag used by the
/// entry encoding, so they must not be renumbered.
#[derive(Clone, Debug, PartialEq)]
pub enum WalEntryType {
    /// Stores a value under a key.
    Put = 1,
    /// Removes a key.
    Delete = 2,
}
18
19/// WAL entry
20#[derive(Debug, Clone, PartialEq)]
21pub struct WalEntry {
22    /// Sequence number for ordering
23    pub sequence: u64,
24    /// Entry type
25    pub entry_type: WalEntryType,
26    /// Key
27    pub key: Key,
28    /// Value (None for deletes)
29    pub value: Option<CipherBlob>,
30    /// CRC32 checksum for integrity
31    pub checksum: u32,
32}
33
34impl WalEntry {
35    /// Create a Put entry
36    pub fn put(sequence: u64, key: Key, value: CipherBlob) -> Self {
37        let mut entry = Self {
38            sequence,
39            entry_type: WalEntryType::Put,
40            key,
41            value: Some(value),
42            checksum: 0,
43        };
44        entry.checksum = entry.calculate_checksum();
45        entry
46    }
47
48    /// Create a Delete entry
49    pub fn delete(sequence: u64, key: Key) -> Self {
50        let mut entry = Self {
51            sequence,
52            entry_type: WalEntryType::Delete,
53            key,
54            value: None,
55            checksum: 0,
56        };
57        entry.checksum = entry.calculate_checksum();
58        entry
59    }
60
61    /// Calculate checksum for the entry
62    fn calculate_checksum(&self) -> u32 {
63        let mut hasher = crc32fast::Hasher::new();
64
65        // Hash sequence
66        hasher.update(&self.sequence.to_le_bytes());
67
68        // Hash entry type
69        hasher.update(&[self.entry_type.clone() as u8]);
70
71        // Hash key
72        hasher.update(self.key.as_bytes());
73
74        // Hash value if present
75        if let Some(ref value) = self.value {
76            hasher.update(value.as_bytes());
77        }
78
79        hasher.finalize()
80    }
81
82    /// Verify checksum
83    pub fn verify_checksum(&self) -> Result<()> {
84        let calculated = self.calculate_checksum();
85        if calculated == self.checksum {
86            Ok(())
87        } else {
88            Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
89                "WAL entry checksum mismatch: expected {}, got {}",
90                self.checksum, calculated
91            ))))
92        }
93    }
94
95    /// Encode entry to bytes
96    pub fn encode(&self) -> Vec<u8> {
97        let mut bytes = Vec::new();
98
99        // Magic number (0x57414C = "WAL" in hex)
100        bytes.extend_from_slice(&0x57414Cu32.to_le_bytes());
101
102        // Sequence
103        bytes.extend_from_slice(&self.sequence.to_le_bytes());
104
105        // Entry type
106        bytes.push(self.entry_type.clone() as u8);
107
108        // Key length and data
109        bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
110        bytes.extend_from_slice(self.key.as_bytes());
111
112        // Value length and data
113        if let Some(ref value) = self.value {
114            bytes.extend_from_slice(&(value.len() as u32).to_le_bytes());
115            bytes.extend_from_slice(value.as_bytes());
116        } else {
117            bytes.extend_from_slice(&0u32.to_le_bytes());
118        }
119
120        // Checksum
121        bytes.extend_from_slice(&self.checksum.to_le_bytes());
122
123        bytes
124    }
125
126    /// Decode entry from bytes
127    pub fn decode(bytes: &[u8]) -> Result<Self> {
128        if bytes.len() < 17 {
129            // Magic (4) + Sequence (8) + Type (1) + KeyLen (4)
130            return Err(AmateRSError::SerializationError(ErrorContext::new(
131                "WAL entry too short",
132            )));
133        }
134
135        let mut offset = 0;
136
137        // Verify magic number
138        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
139        if magic != 0x57414C {
140            return Err(AmateRSError::SerializationError(ErrorContext::new(
141                "Invalid WAL entry magic number",
142            )));
143        }
144        offset += 4;
145
146        // Sequence
147        let sequence = u64::from_le_bytes(bytes[offset..offset + 8].try_into().map_err(|_| {
148            AmateRSError::SerializationError(ErrorContext::new("Failed to read sequence"))
149        })?);
150        offset += 8;
151
152        // Entry type
153        let entry_type = match bytes[offset] {
154            1 => WalEntryType::Put,
155            2 => WalEntryType::Delete,
156            _ => {
157                return Err(AmateRSError::SerializationError(ErrorContext::new(
158                    "Invalid WAL entry type",
159                )));
160            }
161        };
162        offset += 1;
163
164        // Key
165        let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
166            AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
167        })?) as usize;
168        offset += 4;
169
170        let key_bytes = &bytes[offset..offset + key_len];
171        let key = Key::from_slice(key_bytes);
172        offset += key_len;
173
174        // Value
175        let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
176            AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
177        })?) as usize;
178        offset += 4;
179
180        let value = if value_len > 0 {
181            let value_bytes = &bytes[offset..offset + value_len];
182            Some(CipherBlob::new(value_bytes.to_vec()))
183        } else {
184            None
185        };
186        offset += value_len;
187
188        // Checksum
189        let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
190            AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
191        })?);
192
193        let entry = Self {
194            sequence,
195            entry_type,
196            key,
197            value,
198            checksum,
199        };
200
201        // Verify checksum
202        entry.verify_checksum()?;
203
204        Ok(entry)
205    }
206}
207
/// Settings that control where WAL files live and how they are managed.
#[derive(Clone, Debug)]
pub struct WalConfig {
    /// Directory that holds the WAL files.
    pub wal_dir: PathBuf,
    /// Size in bytes at which the current WAL file is rotated (default: 64MB).
    pub max_file_size: u64,
    /// Number of WAL files retained by cleanup (default: 10).
    pub max_wal_files: usize,
    /// Whether each write is followed by a sync (default: true for durability).
    pub sync_on_write: bool,
}
220
221impl Default for WalConfig {
222    fn default() -> Self {
223        Self {
224            wal_dir: PathBuf::from("./wal"),
225            max_file_size: 64 * 1024 * 1024, // 64MB
226            max_wal_files: 10,
227            sync_on_write: true,
228        }
229    }
230}
231
232/// Write-Ahead Log
233pub struct Wal {
234    /// Configuration
235    config: WalConfig,
236    /// Current WAL file path
237    current_path: PathBuf,
238    /// Writer for current WAL file
239    writer: BufWriter<File>,
240    /// Global sequence number across all WAL files
241    sequence: u64,
242    /// Current file size in bytes
243    current_file_size: u64,
244    /// Current WAL file number
245    current_file_number: u64,
246}
247
248impl Wal {
249    /// Create or open a WAL file (simple API for backward compatibility)
250    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
251        let path = path.as_ref().to_path_buf();
252        let parent = path.parent().ok_or_else(|| {
253            AmateRSError::IoError(ErrorContext::new("WAL path has no parent directory"))
254        })?;
255
256        let config = WalConfig {
257            wal_dir: parent.to_path_buf(),
258            ..Default::default()
259        };
260
261        Self::with_config(config)
262    }
263
264    /// Create a new WAL with custom configuration
265    pub fn with_config(config: WalConfig) -> Result<Self> {
266        // Create WAL directory if it doesn't exist
267        std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
268            AmateRSError::IoError(ErrorContext::new(format!(
269                "Failed to create WAL directory: {}",
270                e
271            )))
272        })?;
273
274        // Find the latest WAL file or create a new one
275        let (file_number, sequence) = Self::find_latest_wal(&config)?;
276
277        let current_path = Self::wal_file_path(&config.wal_dir, file_number);
278
279        let file = OpenOptions::new()
280            .create(true)
281            .append(true)
282            .open(&current_path)
283            .map_err(|e| {
284                AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL: {}", e)))
285            })?;
286
287        let current_file_size = file
288            .metadata()
289            .map_err(|e| {
290                AmateRSError::IoError(ErrorContext::new(format!(
291                    "Failed to get WAL file size: {}",
292                    e
293                )))
294            })?
295            .len();
296
297        Ok(Self {
298            config,
299            current_path,
300            writer: BufWriter::new(file),
301            sequence,
302            current_file_size,
303            current_file_number: file_number,
304        })
305    }
306
307    /// Find the latest WAL file and sequence number
308    fn find_latest_wal(config: &WalConfig) -> Result<(u64, u64)> {
309        let mut max_file_number = 0u64;
310        let mut max_sequence = 0u64;
311
312        if config.wal_dir.exists() {
313            let entries = std::fs::read_dir(&config.wal_dir).map_err(|e| {
314                AmateRSError::IoError(ErrorContext::new(format!(
315                    "Failed to read WAL directory: {}",
316                    e
317                )))
318            })?;
319
320            for entry in entries {
321                let entry = entry.map_err(|e| {
322                    AmateRSError::IoError(ErrorContext::new(format!(
323                        "Failed to read directory entry: {}",
324                        e
325                    )))
326                })?;
327
328                let file_name = entry.file_name();
329                let name = file_name.to_string_lossy();
330
331                // Parse WAL file names: wal_NNNNNNNN.log
332                if name.starts_with("wal_") && name.ends_with(".log") {
333                    if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
334                        if number > max_file_number {
335                            max_file_number = number;
336                        }
337                    }
338                }
339            }
340
341            // TODO: Read the latest WAL file to get the max sequence number
342            // For now, we'll start from 0
343        }
344
345        Ok((max_file_number, max_sequence))
346    }
347
348    /// Generate WAL file path for a given file number
349    fn wal_file_path(wal_dir: &Path, file_number: u64) -> PathBuf {
350        wal_dir.join(format!("wal_{:08}.log", file_number))
351    }
352
353    /// Append a Put entry
354    pub fn put(&mut self, key: Key, value: CipherBlob) -> Result<u64> {
355        let sequence = self.sequence;
356        self.sequence += 1;
357
358        let entry = WalEntry::put(sequence, key, value);
359        self.write_entry(&entry)?;
360
361        Ok(sequence)
362    }
363
364    /// Append a Delete entry
365    pub fn delete(&mut self, key: Key) -> Result<u64> {
366        let sequence = self.sequence;
367        self.sequence += 1;
368
369        let entry = WalEntry::delete(sequence, key);
370        self.write_entry(&entry)?;
371
372        Ok(sequence)
373    }
374
375    /// Write an entry to the log
376    fn write_entry(&mut self, entry: &WalEntry) -> Result<()> {
377        let bytes = entry.encode();
378
379        // Write length prefix
380        let len = bytes.len() as u32;
381        self.writer.write_all(&len.to_le_bytes()).map_err(|e| {
382            AmateRSError::IoError(ErrorContext::new(format!(
383                "Failed to write WAL entry: {}",
384                e
385            )))
386        })?;
387
388        // Write entry
389        self.writer.write_all(&bytes).map_err(|e| {
390            AmateRSError::IoError(ErrorContext::new(format!(
391                "Failed to write WAL entry: {}",
392                e
393            )))
394        })?;
395
396        // Update file size
397        let entry_size = (4 + bytes.len()) as u64; // 4 bytes for length prefix
398        self.current_file_size += entry_size;
399
400        // Optional: sync after each write for durability
401        if self.config.sync_on_write {
402            self.writer.flush().map_err(|e| {
403                AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
404            })?;
405        }
406
407        // Check if rotation is needed
408        if self.current_file_size >= self.config.max_file_size {
409            self.rotate()?;
410        }
411
412        Ok(())
413    }
414
415    /// Rotate to a new WAL file
416    pub fn rotate(&mut self) -> Result<()> {
417        // Flush current file
418        self.flush()?;
419
420        // Increment file number
421        self.current_file_number += 1;
422
423        // Create new WAL file
424        let new_path = Self::wal_file_path(&self.config.wal_dir, self.current_file_number);
425
426        let file = OpenOptions::new()
427            .create(true)
428            .append(true)
429            .open(&new_path)
430            .map_err(|e| {
431                AmateRSError::IoError(ErrorContext::new(format!(
432                    "Failed to create new WAL file: {}",
433                    e
434                )))
435            })?;
436
437        self.current_path = new_path;
438        self.writer = BufWriter::new(file);
439        self.current_file_size = 0;
440
441        // Clean up old WAL files
442        self.cleanup_old_wal_files()?;
443
444        Ok(())
445    }
446
447    /// Clean up old WAL files beyond the retention limit
448    fn cleanup_old_wal_files(&self) -> Result<()> {
449        let entries = std::fs::read_dir(&self.config.wal_dir).map_err(|e| {
450            AmateRSError::IoError(ErrorContext::new(format!(
451                "Failed to read WAL directory: {}",
452                e
453            )))
454        })?;
455
456        // Collect all WAL file numbers
457        let mut wal_files: Vec<u64> = Vec::new();
458
459        for entry in entries {
460            let entry = entry.map_err(|e| {
461                AmateRSError::IoError(ErrorContext::new(format!(
462                    "Failed to read directory entry: {}",
463                    e
464                )))
465            })?;
466
467            let file_name = entry.file_name();
468            let name = file_name.to_string_lossy();
469
470            // Parse WAL file names: wal_NNNNNNNN.log
471            if name.starts_with("wal_") && name.ends_with(".log") {
472                if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
473                    wal_files.push(number);
474                }
475            }
476        }
477
478        // Sort by file number (oldest first)
479        wal_files.sort_unstable();
480
481        // Keep only the latest max_wal_files
482        if wal_files.len() > self.config.max_wal_files {
483            let files_to_delete = wal_files.len() - self.config.max_wal_files;
484
485            for &file_number in wal_files.iter().take(files_to_delete) {
486                let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
487                std::fs::remove_file(&file_path).map_err(|e| {
488                    AmateRSError::IoError(ErrorContext::new(format!(
489                        "Failed to delete old WAL file: {}",
490                        e
491                    )))
492                })?;
493            }
494        }
495
496        Ok(())
497    }
498
499    /// Manually trigger cleanup of old WAL files
500    pub fn cleanup(&self) -> Result<()> {
501        self.cleanup_old_wal_files()
502    }
503
504    /// Get current WAL file size
505    pub fn current_file_size(&self) -> u64 {
506        self.current_file_size
507    }
508
509    /// Get current WAL file number
510    pub fn current_file_number(&self) -> u64 {
511        self.current_file_number
512    }
513
514    /// Flush buffered writes to disk
515    pub fn flush(&mut self) -> Result<()> {
516        self.writer.flush().map_err(|e| {
517            AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
518        })?;
519
520        self.writer.get_ref().sync_all().map_err(|e| {
521            AmateRSError::IoError(ErrorContext::new(format!("Failed to sync WAL: {}", e)))
522        })?;
523
524        Ok(())
525    }
526
527    /// Get current sequence number
528    pub fn sequence(&self) -> u64 {
529        self.sequence
530    }
531
532    /// Get WAL file path
533    pub fn path(&self) -> &Path {
534        &self.current_path
535    }
536
537    /// Recover all entries from WAL files in a directory
538    ///
539    /// Reads all WAL files in sequence order and returns recovered entries.
540    /// Handles corrupted and incomplete entries gracefully by skipping them.
541    ///
542    /// Returns (entries, max_sequence) where max_sequence is the highest
543    /// sequence number found during recovery.
544    pub fn recover(wal_dir: impl AsRef<Path>) -> Result<(Vec<WalEntry>, u64)> {
545        let wal_dir = wal_dir.as_ref();
546
547        if !wal_dir.exists() {
548            return Ok((Vec::new(), 0));
549        }
550
551        let entries = std::fs::read_dir(wal_dir).map_err(|e| {
552            AmateRSError::IoError(ErrorContext::new(format!(
553                "Failed to read WAL directory: {}",
554                e
555            )))
556        })?;
557
558        // Collect all WAL file numbers
559        let mut wal_files: Vec<u64> = Vec::new();
560
561        for entry in entries {
562            let entry = entry.map_err(|e| {
563                AmateRSError::IoError(ErrorContext::new(format!(
564                    "Failed to read directory entry: {}",
565                    e
566                )))
567            })?;
568
569            let file_name = entry.file_name();
570            let name = file_name.to_string_lossy();
571
572            // Parse WAL file names: wal_NNNNNNNN.log
573            if name.starts_with("wal_") && name.ends_with(".log") {
574                if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
575                    wal_files.push(number);
576                }
577            }
578        }
579
580        // Sort by file number (oldest first)
581        wal_files.sort_unstable();
582
583        // Read all entries from all files
584        let mut all_entries = Vec::new();
585        let mut max_sequence = 0u64;
586
587        for file_number in wal_files {
588            let file_path = Self::wal_file_path(wal_dir, file_number);
589            let mut reader = WalReader::open(&file_path)?;
590
591            // Read all entries from this file
592            loop {
593                match reader.read_entry() {
594                    Ok(Some(entry)) => {
595                        if entry.sequence > max_sequence {
596                            max_sequence = entry.sequence;
597                        }
598                        all_entries.push(entry);
599                    }
600                    Ok(None) => {
601                        // End of file
602                        break;
603                    }
604                    Err(e) => {
605                        // Log error but continue recovery
606                        eprintln!(
607                            "Warning: Skipping corrupted entry in {}: {}",
608                            file_path.display(),
609                            e
610                        );
611                        // Try to continue reading after error
612                        continue;
613                    }
614                }
615            }
616        }
617
618        Ok((all_entries, max_sequence))
619    }
620
621    /// Replay WAL entries to a memtable
622    ///
623    /// Applies all entries from the WAL directory to the provided memtable.
624    /// This is used during crash recovery to rebuild memtable state.
625    ///
626    /// Returns the maximum sequence number found during replay.
627    pub fn replay_to_memtable(
628        wal_dir: impl AsRef<Path>,
629        memtable: &crate::storage::memtable::Memtable,
630    ) -> Result<u64> {
631        let (entries, max_sequence) = Self::recover(wal_dir)?;
632
633        for entry in entries {
634            match entry.entry_type {
635                WalEntryType::Put => {
636                    if let Some(value) = entry.value {
637                        memtable.put(entry.key, value)?;
638                    }
639                }
640                WalEntryType::Delete => {
641                    memtable.delete(entry.key)?;
642                }
643            }
644        }
645
646        Ok(max_sequence)
647    }
648}
649
/// Sequential reader over the entries stored in a single WAL file.
pub struct WalReader {
    /// Buffered handle on the file being read.
    reader: BufReader<File>,
}
654
655impl WalReader {
656    /// Open a WAL file for reading
657    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
658        let file = File::open(path.as_ref()).map_err(|e| {
659            AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL file: {}", e)))
660        })?;
661
662        Ok(Self {
663            reader: BufReader::new(file),
664        })
665    }
666
667    /// Read the next entry from the WAL file
668    ///
669    /// Returns:
670    /// - Ok(Some(entry)) if an entry was successfully read
671    /// - Ok(None) if end of file reached
672    /// - Err if a corrupted or incomplete entry is encountered
673    pub fn read_entry(&mut self) -> Result<Option<WalEntry>> {
674        // Read length prefix (4 bytes)
675        let mut len_bytes = [0u8; 4];
676        match self.reader.read_exact(&mut len_bytes) {
677            Ok(()) => {}
678            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
679                // End of file or incomplete length prefix
680                return Ok(None);
681            }
682            Err(e) => {
683                return Err(AmateRSError::IoError(ErrorContext::new(format!(
684                    "Failed to read WAL entry length: {}",
685                    e
686                ))));
687            }
688        }
689
690        let len = u32::from_le_bytes(len_bytes) as usize;
691
692        // Sanity check: reject unreasonably large entries (>100MB)
693        if len > 100 * 1024 * 1024 {
694            return Err(AmateRSError::SerializationError(ErrorContext::new(
695                format!("WAL entry too large: {} bytes", len),
696            )));
697        }
698
699        // Read entry bytes
700        let mut entry_bytes = vec![0u8; len];
701        match self.reader.read_exact(&mut entry_bytes) {
702            Ok(()) => {}
703            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
704                // Incomplete entry (crash during write)
705                return Err(AmateRSError::SerializationError(ErrorContext::new(
706                    "Incomplete WAL entry (truncated file)",
707                )));
708            }
709            Err(e) => {
710                return Err(AmateRSError::IoError(ErrorContext::new(format!(
711                    "Failed to read WAL entry: {}",
712                    e
713                ))));
714            }
715        }
716
717        // Decode entry (this includes checksum verification)
718        let entry = WalEntry::decode(&entry_bytes)?;
719
720        Ok(Some(entry))
721    }
722}
723
724#[cfg(test)]
725mod tests {
726    use super::*;
727    use crate::storage::Memtable;
728    use std::fs;
729    use tempfile::tempdir;
730
731    #[test]
732    fn test_wal_entry_encode_decode() -> Result<()> {
733        let key = Key::from_str("test_key");
734        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
735        let entry = WalEntry::put(42, key.clone(), value.clone());
736
737        let bytes = entry.encode();
738        let decoded = WalEntry::decode(&bytes)?;
739
740        assert_eq!(decoded.sequence, 42);
741        assert_eq!(decoded.entry_type, WalEntryType::Put);
742        assert_eq!(decoded.key, key);
743        assert_eq!(decoded.value, Some(value));
744
745        Ok(())
746    }
747
748    #[test]
749    fn test_wal_delete_entry() -> Result<()> {
750        let key = Key::from_str("delete_me");
751        let entry = WalEntry::delete(99, key.clone());
752
753        let bytes = entry.encode();
754        let decoded = WalEntry::decode(&bytes)?;
755
756        assert_eq!(decoded.sequence, 99);
757        assert_eq!(decoded.entry_type, WalEntryType::Delete);
758        assert_eq!(decoded.key, key);
759        assert_eq!(decoded.value, None);
760
761        Ok(())
762    }
763
764    #[test]
765    fn test_wal_checksum_verification() -> Result<()> {
766        let key = Key::from_str("test");
767        let value = CipherBlob::new(vec![1, 2, 3]);
768        let entry = WalEntry::put(1, key, value);
769
770        // Verify should pass
771        entry.verify_checksum()?;
772
773        // Corrupt checksum
774        let mut corrupted = entry.clone();
775        corrupted.checksum = 0;
776
777        // Verify should fail
778        assert!(corrupted.verify_checksum().is_err());
779
780        Ok(())
781    }
782
783    #[test]
784    fn test_wal_basic_operations() -> Result<()> {
785        let temp_dir = tempdir().map_err(|e| {
786            AmateRSError::IoError(ErrorContext::new(format!(
787                "Failed to create temp dir: {}",
788                e
789            )))
790        })?;
791        let wal_path = temp_dir.path().join("test.wal");
792
793        let mut wal = Wal::create(&wal_path)?;
794
795        // Write some entries
796        let seq1 = wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
797        let seq2 = wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
798        let seq3 = wal.delete(Key::from_str("key1"))?;
799
800        assert_eq!(seq1, 0);
801        assert_eq!(seq2, 1);
802        assert_eq!(seq3, 2);
803
804        wal.flush()?;
805
806        // Verify a WAL file was created (may be rotated, so check path() returns something that exists)
807        assert!(wal.path().exists());
808
809        Ok(())
810    }
811
812    #[test]
813    fn test_wal_sequence_increment() -> Result<()> {
814        let temp_dir = tempdir().map_err(|e| {
815            AmateRSError::IoError(ErrorContext::new(format!(
816                "Failed to create temp dir: {}",
817                e
818            )))
819        })?;
820        let wal_path = temp_dir.path().join("test_seq.wal");
821
822        let mut wal = Wal::create(&wal_path)?;
823
824        assert_eq!(wal.sequence(), 0);
825
826        wal.put(Key::from_str("key"), CipherBlob::new(vec![1]))?;
827        assert_eq!(wal.sequence(), 1);
828
829        wal.delete(Key::from_str("key"))?;
830        assert_eq!(wal.sequence(), 2);
831
832        Ok(())
833    }
834
835    #[test]
836    fn test_wal_entry_large_value() -> Result<()> {
837        let key = Key::from_str("large");
838        let large_value = CipherBlob::new(vec![0u8; 10_000]);
839        let entry = WalEntry::put(1, key.clone(), large_value.clone());
840
841        let bytes = entry.encode();
842        let decoded = WalEntry::decode(&bytes)?;
843
844        assert_eq!(decoded.key, key);
845        assert_eq!(decoded.value, Some(large_value));
846
847        Ok(())
848    }
849
850    #[test]
851    fn test_wal_rotation() -> Result<()> {
852        use std::env;
853
854        let temp_dir = env::temp_dir().join("test_wal_rotation");
855        std::fs::create_dir_all(&temp_dir).ok();
856
857        let config = WalConfig {
858            wal_dir: temp_dir.clone(),
859            max_file_size: 1024,  // Small size to trigger rotation
860            sync_on_write: false, // Disable for speed
861            ..Default::default()
862        };
863
864        let mut wal = Wal::with_config(config)?;
865
866        let initial_file_number = wal.current_file_number();
867
868        // Write enough data to trigger rotation
869        for i in 0..20 {
870            wal.put(
871                Key::from_str(&format!("key_{}", i)),
872                CipherBlob::new(vec![i as u8; 100]),
873            )?;
874        }
875
876        // File number should have increased due to rotation
877        assert!(wal.current_file_number() > initial_file_number);
878
879        // Verify new file exists
880        assert!(wal.path().exists());
881
882        std::fs::remove_dir_all(&temp_dir).ok();
883        Ok(())
884    }
885
886    #[test]
887    fn test_wal_cleanup() -> Result<()> {
888        use std::env;
889
890        let temp_dir = env::temp_dir().join("test_wal_cleanup");
891        std::fs::create_dir_all(&temp_dir).ok();
892
893        let config = WalConfig {
894            wal_dir: temp_dir.clone(),
895            max_file_size: 512, // Very small to trigger many rotations
896            max_wal_files: 3,   // Keep only 3 files
897            sync_on_write: false,
898        };
899
900        let mut wal = Wal::with_config(config)?;
901
902        // Write enough data to create many WAL files
903        for i in 0..100 {
904            wal.put(
905                Key::from_str(&format!("key_{}", i)),
906                CipherBlob::new(vec![i as u8; 100]),
907            )?;
908        }
909
910        // Count WAL files
911        let wal_file_count = std::fs::read_dir(&temp_dir)?
912            .filter_map(|e| e.ok())
913            .filter(|e| {
914                e.file_name().to_string_lossy().starts_with("wal_")
915                    && e.file_name().to_string_lossy().ends_with(".log")
916            })
917            .count();
918
919        // Should have at most max_wal_files
920        assert!(wal_file_count <= 3);
921
922        std::fs::remove_dir_all(&temp_dir).ok();
923        Ok(())
924    }
925
926    #[test]
927    fn test_wal_manual_cleanup() -> Result<()> {
928        use std::env;
929
930        let temp_dir = env::temp_dir().join("test_wal_manual_cleanup");
931        std::fs::create_dir_all(&temp_dir).ok();
932
933        let config = WalConfig {
934            wal_dir: temp_dir.clone(),
935            max_file_size: 512,
936            max_wal_files: 5,
937            sync_on_write: false,
938        };
939
940        let mut wal = Wal::with_config(config)?;
941
942        // Create several WAL files
943        for i in 0..80 {
944            wal.put(
945                Key::from_str(&format!("key_{}", i)),
946                CipherBlob::new(vec![i as u8; 100]),
947            )?;
948        }
949
950        // Manually trigger cleanup
951        wal.cleanup()?;
952
953        // Count files
954        let wal_file_count = std::fs::read_dir(&temp_dir)?
955            .filter_map(|e| e.ok())
956            .filter(|e| {
957                e.file_name().to_string_lossy().starts_with("wal_")
958                    && e.file_name().to_string_lossy().ends_with(".log")
959            })
960            .count();
961
962        assert!(wal_file_count <= 5);
963
964        std::fs::remove_dir_all(&temp_dir).ok();
965        Ok(())
966    }
967
968    #[test]
969    fn test_wal_recovery_basic() -> Result<()> {
970        use std::env;
971
972        let temp_dir = env::temp_dir().join("test_wal_recovery_basic");
973        std::fs::create_dir_all(&temp_dir).ok();
974
975        // Write some entries
976        {
977            let config = WalConfig {
978                wal_dir: temp_dir.clone(),
979                sync_on_write: true,
980                ..Default::default()
981            };
982
983            let mut wal = Wal::with_config(config)?;
984
985            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
986            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
987            wal.delete(Key::from_str("key1"))?;
988            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
989
990            wal.flush()?;
991        }
992
993        // Recover entries
994        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
995
996        assert_eq!(entries.len(), 4);
997        assert_eq!(max_sequence, 3);
998
999        // Verify entries
1000        assert_eq!(entries[0].key, Key::from_str("key1"));
1001        assert_eq!(entries[0].entry_type, WalEntryType::Put);
1002        assert_eq!(entries[0].value, Some(CipherBlob::new(vec![1, 2, 3])));
1003
1004        assert_eq!(entries[1].key, Key::from_str("key2"));
1005        assert_eq!(entries[1].entry_type, WalEntryType::Put);
1006
1007        assert_eq!(entries[2].key, Key::from_str("key1"));
1008        assert_eq!(entries[2].entry_type, WalEntryType::Delete);
1009        assert_eq!(entries[2].value, None);
1010
1011        assert_eq!(entries[3].key, Key::from_str("key3"));
1012        assert_eq!(entries[3].entry_type, WalEntryType::Put);
1013
1014        std::fs::remove_dir_all(&temp_dir).ok();
1015        Ok(())
1016    }
1017
1018    #[test]
1019    fn test_wal_recovery_multiple_files() -> Result<()> {
1020        use std::env;
1021
1022        let temp_dir = env::temp_dir().join("test_wal_recovery_multiple");
1023        std::fs::create_dir_all(&temp_dir).ok();
1024
1025        // Write entries across multiple WAL files
1026        {
1027            let config = WalConfig {
1028                wal_dir: temp_dir.clone(),
1029                max_file_size: 512, // Small to trigger rotation
1030                sync_on_write: true,
1031                ..Default::default()
1032            };
1033
1034            let mut wal = Wal::with_config(config)?;
1035
1036            // Write enough data to create multiple files
1037            for i in 0..20 {
1038                wal.put(
1039                    Key::from_str(&format!("key_{}", i)),
1040                    CipherBlob::new(vec![i as u8; 100]),
1041                )?;
1042            }
1043
1044            wal.flush()?;
1045        }
1046
1047        // Recover all entries
1048        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1049
1050        assert_eq!(entries.len(), 20);
1051        assert_eq!(max_sequence, 19);
1052
1053        // Verify entries are in sequence order
1054        for (i, entry) in entries.iter().enumerate() {
1055            assert_eq!(entry.sequence, i as u64);
1056            assert_eq!(entry.key, Key::from_str(&format!("key_{}", i)));
1057        }
1058
1059        std::fs::remove_dir_all(&temp_dir).ok();
1060        Ok(())
1061    }
1062
1063    #[test]
1064    fn test_wal_recovery_empty_directory() -> Result<()> {
1065        use std::env;
1066
1067        let temp_dir = env::temp_dir().join("test_wal_recovery_empty");
1068        std::fs::create_dir_all(&temp_dir).ok();
1069
1070        // Recover from empty directory
1071        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1072
1073        assert_eq!(entries.len(), 0);
1074        assert_eq!(max_sequence, 0);
1075
1076        std::fs::remove_dir_all(&temp_dir).ok();
1077        Ok(())
1078    }
1079
1080    #[test]
1081    fn test_wal_recovery_nonexistent_directory() -> Result<()> {
1082        use std::env;
1083
1084        let temp_dir = env::temp_dir().join("nonexistent_wal_dir_12345");
1085
1086        // Recover from non-existent directory
1087        let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1088
1089        assert_eq!(entries.len(), 0);
1090        assert_eq!(max_sequence, 0);
1091
1092        Ok(())
1093    }
1094
1095    #[test]
1096    fn test_wal_replay_to_memtable() -> Result<()> {
1097        use std::env;
1098
1099        let temp_dir = env::temp_dir().join("test_wal_replay_memtable");
1100        std::fs::create_dir_all(&temp_dir).ok();
1101
1102        // Write some entries
1103        {
1104            let config = WalConfig {
1105                wal_dir: temp_dir.clone(),
1106                sync_on_write: true,
1107                ..Default::default()
1108            };
1109
1110            let mut wal = Wal::with_config(config)?;
1111
1112            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1113            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1114            wal.delete(Key::from_str("key1"))?;
1115            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1116
1117            wal.flush()?;
1118        }
1119
1120        // Create a new memtable and replay WAL
1121        let memtable = Memtable::new();
1122        let max_sequence = Wal::replay_to_memtable(&temp_dir, &memtable)?;
1123
1124        assert_eq!(max_sequence, 3);
1125
1126        // Verify memtable state
1127        assert_eq!(memtable.get(&Key::from_str("key1"))?, None); // Deleted
1128        assert_eq!(
1129            memtable.get(&Key::from_str("key2"))?,
1130            Some(CipherBlob::new(vec![4, 5, 6]))
1131        );
1132        assert_eq!(
1133            memtable.get(&Key::from_str("key3"))?,
1134            Some(CipherBlob::new(vec![7, 8, 9]))
1135        );
1136
1137        std::fs::remove_dir_all(&temp_dir).ok();
1138        Ok(())
1139    }
1140
1141    #[test]
1142    fn test_wal_reader_basic() -> Result<()> {
1143        use std::env;
1144
1145        let temp_dir = env::temp_dir().join("test_wal_reader_basic");
1146        std::fs::create_dir_all(&temp_dir).ok();
1147
1148        let wal_file = temp_dir.join("test.wal");
1149
1150        // Write some entries
1151        {
1152            let mut wal = Wal::create(&wal_file)?;
1153            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1154            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1155            wal.flush()?;
1156        }
1157
1158        // Read entries with WalReader
1159        let wal_file_actual = temp_dir.join("wal_00000000.log");
1160        let mut reader = WalReader::open(&wal_file_actual)?;
1161
1162        let entry1 = reader.read_entry()?.expect("Should have entry 1");
1163        assert_eq!(entry1.sequence, 0);
1164        assert_eq!(entry1.key, Key::from_str("key1"));
1165
1166        let entry2 = reader.read_entry()?.expect("Should have entry 2");
1167        assert_eq!(entry2.sequence, 1);
1168        assert_eq!(entry2.key, Key::from_str("key2"));
1169
1170        let entry3 = reader.read_entry()?;
1171        assert_eq!(entry3, None); // End of file
1172
1173        std::fs::remove_dir_all(&temp_dir).ok();
1174        Ok(())
1175    }
1176
1177    #[test]
1178    fn test_wal_recovery_with_truncated_file() -> Result<()> {
1179        use std::env;
1180        use std::io::Write as IoWrite;
1181
1182        let temp_dir = env::temp_dir().join("test_wal_recovery_truncated");
1183        std::fs::create_dir_all(&temp_dir).ok();
1184
1185        // Write some valid entries, then truncate the last one
1186        let wal_file = temp_dir.join("wal_00000000.log");
1187        {
1188            let mut wal = Wal::create(&wal_file)?;
1189            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1190            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1191            wal.flush()?;
1192
1193            // Append incomplete entry (length prefix only, no data)
1194            let mut file = OpenOptions::new().append(true).open(&wal_file)?;
1195            let incomplete_len = 1234u32;
1196            file.write_all(&incomplete_len.to_le_bytes())?;
1197            file.flush()?;
1198        }
1199
1200        // Recovery should handle truncated entry gracefully
1201        let (entries, _) = Wal::recover(&temp_dir)?;
1202
1203        // Should recover the 2 complete entries, skip the truncated one
1204        assert_eq!(entries.len(), 2);
1205        assert_eq!(entries[0].key, Key::from_str("key1"));
1206        assert_eq!(entries[1].key, Key::from_str("key2"));
1207
1208        std::fs::remove_dir_all(&temp_dir).ok();
1209        Ok(())
1210    }
1211}