1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::types::{CipherBlob, Key};
8use std::fs::{File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11
/// Kind of mutation recorded by a [`WalEntry`].
///
/// The explicit discriminants double as the one-byte type tag that
/// `WalEntry::encode` writes and `WalEntry::decode` matches on, so changing
/// them changes the on-disk format.
#[derive(Debug, Clone, PartialEq)]
pub enum WalEntryType {
    /// A key/value write (tag `1`).
    Put = 1,
    /// A key removal (tag `2`).
    Delete = 2,
}
18
19#[derive(Debug, Clone, PartialEq)]
21pub struct WalEntry {
22 pub sequence: u64,
24 pub entry_type: WalEntryType,
26 pub key: Key,
28 pub value: Option<CipherBlob>,
30 pub checksum: u32,
32}
33
34impl WalEntry {
35 pub fn put(sequence: u64, key: Key, value: CipherBlob) -> Self {
37 let mut entry = Self {
38 sequence,
39 entry_type: WalEntryType::Put,
40 key,
41 value: Some(value),
42 checksum: 0,
43 };
44 entry.checksum = entry.calculate_checksum();
45 entry
46 }
47
48 pub fn delete(sequence: u64, key: Key) -> Self {
50 let mut entry = Self {
51 sequence,
52 entry_type: WalEntryType::Delete,
53 key,
54 value: None,
55 checksum: 0,
56 };
57 entry.checksum = entry.calculate_checksum();
58 entry
59 }
60
61 fn calculate_checksum(&self) -> u32 {
63 let mut hasher = crc32fast::Hasher::new();
64
65 hasher.update(&self.sequence.to_le_bytes());
67
68 hasher.update(&[self.entry_type.clone() as u8]);
70
71 hasher.update(self.key.as_bytes());
73
74 if let Some(ref value) = self.value {
76 hasher.update(value.as_bytes());
77 }
78
79 hasher.finalize()
80 }
81
82 pub fn verify_checksum(&self) -> Result<()> {
84 let calculated = self.calculate_checksum();
85 if calculated == self.checksum {
86 Ok(())
87 } else {
88 Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
89 "WAL entry checksum mismatch: expected {}, got {}",
90 self.checksum, calculated
91 ))))
92 }
93 }
94
95 pub fn encode(&self) -> Vec<u8> {
97 let mut bytes = Vec::new();
98
99 bytes.extend_from_slice(&0x57414Cu32.to_le_bytes());
101
102 bytes.extend_from_slice(&self.sequence.to_le_bytes());
104
105 bytes.push(self.entry_type.clone() as u8);
107
108 bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
110 bytes.extend_from_slice(self.key.as_bytes());
111
112 if let Some(ref value) = self.value {
114 bytes.extend_from_slice(&(value.len() as u32).to_le_bytes());
115 bytes.extend_from_slice(value.as_bytes());
116 } else {
117 bytes.extend_from_slice(&0u32.to_le_bytes());
118 }
119
120 bytes.extend_from_slice(&self.checksum.to_le_bytes());
122
123 bytes
124 }
125
126 pub fn decode(bytes: &[u8]) -> Result<Self> {
128 if bytes.len() < 17 {
129 return Err(AmateRSError::SerializationError(ErrorContext::new(
131 "WAL entry too short",
132 )));
133 }
134
135 let mut offset = 0;
136
137 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
139 if magic != 0x57414C {
140 return Err(AmateRSError::SerializationError(ErrorContext::new(
141 "Invalid WAL entry magic number",
142 )));
143 }
144 offset += 4;
145
146 let sequence = u64::from_le_bytes(bytes[offset..offset + 8].try_into().map_err(|_| {
148 AmateRSError::SerializationError(ErrorContext::new("Failed to read sequence"))
149 })?);
150 offset += 8;
151
152 let entry_type = match bytes[offset] {
154 1 => WalEntryType::Put,
155 2 => WalEntryType::Delete,
156 _ => {
157 return Err(AmateRSError::SerializationError(ErrorContext::new(
158 "Invalid WAL entry type",
159 )));
160 }
161 };
162 offset += 1;
163
164 let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
166 AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
167 })?) as usize;
168 offset += 4;
169
170 let key_bytes = &bytes[offset..offset + key_len];
171 let key = Key::from_slice(key_bytes);
172 offset += key_len;
173
174 let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
176 AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
177 })?) as usize;
178 offset += 4;
179
180 let value = if value_len > 0 {
181 let value_bytes = &bytes[offset..offset + value_len];
182 Some(CipherBlob::new(value_bytes.to_vec()))
183 } else {
184 None
185 };
186 offset += value_len;
187
188 let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
190 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
191 })?);
192
193 let entry = Self {
194 sequence,
195 entry_type,
196 key,
197 value,
198 checksum,
199 };
200
201 entry.verify_checksum()?;
203
204 Ok(entry)
205 }
206}
207
/// Settings that control where WAL files live and how they are managed.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Directory that holds the `wal_<number>.log` files.
    pub wal_dir: PathBuf,
    /// Size in bytes at which the current file is rotated: once the bytes
    /// tracked for the current file reach this value, a new file is started.
    pub max_file_size: u64,
    /// Number of WAL files kept on disk; cleanup removes the lowest-numbered
    /// files beyond this count.
    pub max_wal_files: usize,
    /// When `true`, the buffered writer is flushed after every appended entry.
    pub sync_on_write: bool,
}
220
221impl Default for WalConfig {
222 fn default() -> Self {
223 Self {
224 wal_dir: PathBuf::from("./wal"),
225 max_file_size: 64 * 1024 * 1024, max_wal_files: 10,
227 sync_on_write: true,
228 }
229 }
230}
231
232pub struct Wal {
234 config: WalConfig,
236 current_path: PathBuf,
238 writer: BufWriter<File>,
240 sequence: u64,
242 current_file_size: u64,
244 current_file_number: u64,
246}
247
248impl Wal {
249 pub fn create(path: impl AsRef<Path>) -> Result<Self> {
251 let path = path.as_ref().to_path_buf();
252 let parent = path.parent().ok_or_else(|| {
253 AmateRSError::IoError(ErrorContext::new("WAL path has no parent directory"))
254 })?;
255
256 let config = WalConfig {
257 wal_dir: parent.to_path_buf(),
258 ..Default::default()
259 };
260
261 Self::with_config(config)
262 }
263
264 pub fn with_config(config: WalConfig) -> Result<Self> {
266 std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
268 AmateRSError::IoError(ErrorContext::new(format!(
269 "Failed to create WAL directory: {}",
270 e
271 )))
272 })?;
273
274 let (file_number, sequence) = Self::find_latest_wal(&config)?;
276
277 let current_path = Self::wal_file_path(&config.wal_dir, file_number);
278
279 let file = OpenOptions::new()
280 .create(true)
281 .append(true)
282 .open(¤t_path)
283 .map_err(|e| {
284 AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL: {}", e)))
285 })?;
286
287 let current_file_size = file
288 .metadata()
289 .map_err(|e| {
290 AmateRSError::IoError(ErrorContext::new(format!(
291 "Failed to get WAL file size: {}",
292 e
293 )))
294 })?
295 .len();
296
297 Ok(Self {
298 config,
299 current_path,
300 writer: BufWriter::new(file),
301 sequence,
302 current_file_size,
303 current_file_number: file_number,
304 })
305 }
306
307 fn find_latest_wal(config: &WalConfig) -> Result<(u64, u64)> {
309 let mut max_file_number = 0u64;
310 let mut max_sequence = 0u64;
311
312 if config.wal_dir.exists() {
313 let entries = std::fs::read_dir(&config.wal_dir).map_err(|e| {
314 AmateRSError::IoError(ErrorContext::new(format!(
315 "Failed to read WAL directory: {}",
316 e
317 )))
318 })?;
319
320 for entry in entries {
321 let entry = entry.map_err(|e| {
322 AmateRSError::IoError(ErrorContext::new(format!(
323 "Failed to read directory entry: {}",
324 e
325 )))
326 })?;
327
328 let file_name = entry.file_name();
329 let name = file_name.to_string_lossy();
330
331 if name.starts_with("wal_") && name.ends_with(".log") {
333 if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
334 if number > max_file_number {
335 max_file_number = number;
336 }
337 }
338 }
339 }
340
341 }
344
345 Ok((max_file_number, max_sequence))
346 }
347
348 fn wal_file_path(wal_dir: &Path, file_number: u64) -> PathBuf {
350 wal_dir.join(format!("wal_{:08}.log", file_number))
351 }
352
353 pub fn put(&mut self, key: Key, value: CipherBlob) -> Result<u64> {
355 let sequence = self.sequence;
356 self.sequence += 1;
357
358 let entry = WalEntry::put(sequence, key, value);
359 self.write_entry(&entry)?;
360
361 Ok(sequence)
362 }
363
364 pub fn delete(&mut self, key: Key) -> Result<u64> {
366 let sequence = self.sequence;
367 self.sequence += 1;
368
369 let entry = WalEntry::delete(sequence, key);
370 self.write_entry(&entry)?;
371
372 Ok(sequence)
373 }
374
375 fn write_entry(&mut self, entry: &WalEntry) -> Result<()> {
377 let bytes = entry.encode();
378
379 let len = bytes.len() as u32;
381 self.writer.write_all(&len.to_le_bytes()).map_err(|e| {
382 AmateRSError::IoError(ErrorContext::new(format!(
383 "Failed to write WAL entry: {}",
384 e
385 )))
386 })?;
387
388 self.writer.write_all(&bytes).map_err(|e| {
390 AmateRSError::IoError(ErrorContext::new(format!(
391 "Failed to write WAL entry: {}",
392 e
393 )))
394 })?;
395
396 let entry_size = (4 + bytes.len()) as u64; self.current_file_size += entry_size;
399
400 if self.config.sync_on_write {
402 self.writer.flush().map_err(|e| {
403 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
404 })?;
405 }
406
407 if self.current_file_size >= self.config.max_file_size {
409 self.rotate()?;
410 }
411
412 Ok(())
413 }
414
415 pub fn rotate(&mut self) -> Result<()> {
417 self.flush()?;
419
420 self.current_file_number += 1;
422
423 let new_path = Self::wal_file_path(&self.config.wal_dir, self.current_file_number);
425
426 let file = OpenOptions::new()
427 .create(true)
428 .append(true)
429 .open(&new_path)
430 .map_err(|e| {
431 AmateRSError::IoError(ErrorContext::new(format!(
432 "Failed to create new WAL file: {}",
433 e
434 )))
435 })?;
436
437 self.current_path = new_path;
438 self.writer = BufWriter::new(file);
439 self.current_file_size = 0;
440
441 self.cleanup_old_wal_files()?;
443
444 Ok(())
445 }
446
447 fn cleanup_old_wal_files(&self) -> Result<()> {
449 let entries = std::fs::read_dir(&self.config.wal_dir).map_err(|e| {
450 AmateRSError::IoError(ErrorContext::new(format!(
451 "Failed to read WAL directory: {}",
452 e
453 )))
454 })?;
455
456 let mut wal_files: Vec<u64> = Vec::new();
458
459 for entry in entries {
460 let entry = entry.map_err(|e| {
461 AmateRSError::IoError(ErrorContext::new(format!(
462 "Failed to read directory entry: {}",
463 e
464 )))
465 })?;
466
467 let file_name = entry.file_name();
468 let name = file_name.to_string_lossy();
469
470 if name.starts_with("wal_") && name.ends_with(".log") {
472 if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
473 wal_files.push(number);
474 }
475 }
476 }
477
478 wal_files.sort_unstable();
480
481 if wal_files.len() > self.config.max_wal_files {
483 let files_to_delete = wal_files.len() - self.config.max_wal_files;
484
485 for &file_number in wal_files.iter().take(files_to_delete) {
486 let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
487 std::fs::remove_file(&file_path).map_err(|e| {
488 AmateRSError::IoError(ErrorContext::new(format!(
489 "Failed to delete old WAL file: {}",
490 e
491 )))
492 })?;
493 }
494 }
495
496 Ok(())
497 }
498
499 pub fn cleanup(&self) -> Result<()> {
501 self.cleanup_old_wal_files()
502 }
503
504 pub fn current_file_size(&self) -> u64 {
506 self.current_file_size
507 }
508
509 pub fn current_file_number(&self) -> u64 {
511 self.current_file_number
512 }
513
514 pub fn flush(&mut self) -> Result<()> {
516 self.writer.flush().map_err(|e| {
517 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
518 })?;
519
520 self.writer.get_ref().sync_all().map_err(|e| {
521 AmateRSError::IoError(ErrorContext::new(format!("Failed to sync WAL: {}", e)))
522 })?;
523
524 Ok(())
525 }
526
527 pub fn sequence(&self) -> u64 {
529 self.sequence
530 }
531
532 pub fn path(&self) -> &Path {
534 &self.current_path
535 }
536
537 pub fn recover(wal_dir: impl AsRef<Path>) -> Result<(Vec<WalEntry>, u64)> {
545 let wal_dir = wal_dir.as_ref();
546
547 if !wal_dir.exists() {
548 return Ok((Vec::new(), 0));
549 }
550
551 let entries = std::fs::read_dir(wal_dir).map_err(|e| {
552 AmateRSError::IoError(ErrorContext::new(format!(
553 "Failed to read WAL directory: {}",
554 e
555 )))
556 })?;
557
558 let mut wal_files: Vec<u64> = Vec::new();
560
561 for entry in entries {
562 let entry = entry.map_err(|e| {
563 AmateRSError::IoError(ErrorContext::new(format!(
564 "Failed to read directory entry: {}",
565 e
566 )))
567 })?;
568
569 let file_name = entry.file_name();
570 let name = file_name.to_string_lossy();
571
572 if name.starts_with("wal_") && name.ends_with(".log") {
574 if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
575 wal_files.push(number);
576 }
577 }
578 }
579
580 wal_files.sort_unstable();
582
583 let mut all_entries = Vec::new();
585 let mut max_sequence = 0u64;
586
587 for file_number in wal_files {
588 let file_path = Self::wal_file_path(wal_dir, file_number);
589 let mut reader = WalReader::open(&file_path)?;
590
591 loop {
593 match reader.read_entry() {
594 Ok(Some(entry)) => {
595 if entry.sequence > max_sequence {
596 max_sequence = entry.sequence;
597 }
598 all_entries.push(entry);
599 }
600 Ok(None) => {
601 break;
603 }
604 Err(e) => {
605 eprintln!(
607 "Warning: Skipping corrupted entry in {}: {}",
608 file_path.display(),
609 e
610 );
611 continue;
613 }
614 }
615 }
616 }
617
618 Ok((all_entries, max_sequence))
619 }
620
621 pub fn replay_to_memtable(
628 wal_dir: impl AsRef<Path>,
629 memtable: &crate::storage::memtable::Memtable,
630 ) -> Result<u64> {
631 let (entries, max_sequence) = Self::recover(wal_dir)?;
632
633 for entry in entries {
634 match entry.entry_type {
635 WalEntryType::Put => {
636 if let Some(value) = entry.value {
637 memtable.put(entry.key, value)?;
638 }
639 }
640 WalEntryType::Delete => {
641 memtable.delete(entry.key)?;
642 }
643 }
644 }
645
646 Ok(max_sequence)
647 }
648}
649
/// Sequential reader over the entries of a single WAL file.
pub struct WalReader {
    /// Buffered handle on the file being read.
    reader: BufReader<File>,
}
654
655impl WalReader {
656 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
658 let file = File::open(path.as_ref()).map_err(|e| {
659 AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL file: {}", e)))
660 })?;
661
662 Ok(Self {
663 reader: BufReader::new(file),
664 })
665 }
666
667 pub fn read_entry(&mut self) -> Result<Option<WalEntry>> {
674 let mut len_bytes = [0u8; 4];
676 match self.reader.read_exact(&mut len_bytes) {
677 Ok(()) => {}
678 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
679 return Ok(None);
681 }
682 Err(e) => {
683 return Err(AmateRSError::IoError(ErrorContext::new(format!(
684 "Failed to read WAL entry length: {}",
685 e
686 ))));
687 }
688 }
689
690 let len = u32::from_le_bytes(len_bytes) as usize;
691
692 if len > 100 * 1024 * 1024 {
694 return Err(AmateRSError::SerializationError(ErrorContext::new(
695 format!("WAL entry too large: {} bytes", len),
696 )));
697 }
698
699 let mut entry_bytes = vec![0u8; len];
701 match self.reader.read_exact(&mut entry_bytes) {
702 Ok(()) => {}
703 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
704 return Err(AmateRSError::SerializationError(ErrorContext::new(
706 "Incomplete WAL entry (truncated file)",
707 )));
708 }
709 Err(e) => {
710 return Err(AmateRSError::IoError(ErrorContext::new(format!(
711 "Failed to read WAL entry: {}",
712 e
713 ))));
714 }
715 }
716
717 let entry = WalEntry::decode(&entry_bytes)?;
719
720 Ok(Some(entry))
721 }
722}
723
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Memtable;
    use std::fs;
    use tempfile::tempdir;

    /// Creates a unique scratch directory that is deleted when the returned
    /// guard is dropped, so tests neither see leftovers from earlier runs nor
    /// collide with each other or leak files when an assertion fails.
    fn scratch_dir() -> Result<tempfile::TempDir> {
        tempdir().map_err(|e| {
            AmateRSError::IoError(ErrorContext::new(format!(
                "Failed to create temp dir: {}",
                e
            )))
        })
    }

    /// Counts the `wal_*.log` files directly inside `dir`.
    fn count_wal_files(dir: &Path) -> Result<usize> {
        let count = fs::read_dir(dir)?
            .filter_map(|e| e.ok())
            .filter(|e| {
                let name = e.file_name();
                let name = name.to_string_lossy();
                name.starts_with("wal_") && name.ends_with(".log")
            })
            .count();
        Ok(count)
    }

    #[test]
    fn test_wal_entry_encode_decode() -> Result<()> {
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        let entry = WalEntry::put(42, key.clone(), value.clone());

        let bytes = entry.encode();
        let decoded = WalEntry::decode(&bytes)?;

        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.entry_type, WalEntryType::Put);
        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, Some(value));

        Ok(())
    }

    #[test]
    fn test_wal_delete_entry() -> Result<()> {
        let key = Key::from_str("delete_me");
        let entry = WalEntry::delete(99, key.clone());

        let bytes = entry.encode();
        let decoded = WalEntry::decode(&bytes)?;

        assert_eq!(decoded.sequence, 99);
        assert_eq!(decoded.entry_type, WalEntryType::Delete);
        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, None);

        Ok(())
    }

    #[test]
    fn test_wal_checksum_verification() -> Result<()> {
        let key = Key::from_str("test");
        let value = CipherBlob::new(vec![1, 2, 3]);
        let entry = WalEntry::put(1, key, value);

        // A freshly built entry carries a valid checksum.
        entry.verify_checksum()?;

        // Overwriting the checksum must be detected.
        let mut corrupted = entry.clone();
        corrupted.checksum = 0;

        assert!(corrupted.verify_checksum().is_err());

        Ok(())
    }

    #[test]
    fn test_wal_basic_operations() -> Result<()> {
        let dir = scratch_dir()?;
        let wal_path = dir.path().join("test.wal");

        let mut wal = Wal::create(&wal_path)?;

        let seq1 = wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
        let seq2 = wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
        let seq3 = wal.delete(Key::from_str("key1"))?;

        assert_eq!(seq1, 0);
        assert_eq!(seq2, 1);
        assert_eq!(seq3, 2);

        wal.flush()?;

        assert!(wal.path().exists());

        Ok(())
    }

    #[test]
    fn test_wal_sequence_increment() -> Result<()> {
        let dir = scratch_dir()?;
        let wal_path = dir.path().join("test_seq.wal");

        let mut wal = Wal::create(&wal_path)?;

        assert_eq!(wal.sequence(), 0);

        wal.put(Key::from_str("key"), CipherBlob::new(vec![1]))?;
        assert_eq!(wal.sequence(), 1);

        wal.delete(Key::from_str("key"))?;
        assert_eq!(wal.sequence(), 2);

        Ok(())
    }

    #[test]
    fn test_wal_entry_large_value() -> Result<()> {
        let key = Key::from_str("large");
        let large_value = CipherBlob::new(vec![0u8; 10_000]);
        let entry = WalEntry::put(1, key.clone(), large_value.clone());

        let bytes = entry.encode();
        let decoded = WalEntry::decode(&bytes)?;

        assert_eq!(decoded.key, key);
        assert_eq!(decoded.value, Some(large_value));

        Ok(())
    }

    #[test]
    fn test_wal_rotation() -> Result<()> {
        let dir = scratch_dir()?;

        let config = WalConfig {
            wal_dir: dir.path().to_path_buf(),
            // Small limit so that a handful of entries forces a rotation.
            max_file_size: 1024,
            sync_on_write: false,
            ..Default::default()
        };

        let mut wal = Wal::with_config(config)?;

        let initial_file_number = wal.current_file_number();

        for i in 0..20 {
            wal.put(
                Key::from_str(&format!("key_{}", i)),
                CipherBlob::new(vec![i as u8; 100]),
            )?;
        }

        assert!(wal.current_file_number() > initial_file_number);
        assert!(wal.path().exists());

        Ok(())
    }

    #[test]
    fn test_wal_cleanup() -> Result<()> {
        let dir = scratch_dir()?;

        let config = WalConfig {
            wal_dir: dir.path().to_path_buf(),
            max_file_size: 512,
            max_wal_files: 3,
            sync_on_write: false,
        };

        let mut wal = Wal::with_config(config)?;

        for i in 0..100 {
            wal.put(
                Key::from_str(&format!("key_{}", i)),
                CipherBlob::new(vec![i as u8; 100]),
            )?;
        }

        // Rotation runs the cleanup, so the limit holds without an explicit call.
        assert!(count_wal_files(dir.path())? <= 3);

        Ok(())
    }

    #[test]
    fn test_wal_manual_cleanup() -> Result<()> {
        let dir = scratch_dir()?;

        let config = WalConfig {
            wal_dir: dir.path().to_path_buf(),
            max_file_size: 512,
            max_wal_files: 5,
            sync_on_write: false,
        };

        let mut wal = Wal::with_config(config)?;

        for i in 0..80 {
            wal.put(
                Key::from_str(&format!("key_{}", i)),
                CipherBlob::new(vec![i as u8; 100]),
            )?;
        }

        wal.cleanup()?;

        assert!(count_wal_files(dir.path())? <= 5);

        Ok(())
    }

    #[test]
    fn test_wal_recovery_basic() -> Result<()> {
        let dir = scratch_dir()?;

        {
            let config = WalConfig {
                wal_dir: dir.path().to_path_buf(),
                sync_on_write: true,
                ..Default::default()
            };

            let mut wal = Wal::with_config(config)?;

            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
            wal.delete(Key::from_str("key1"))?;
            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;

            wal.flush()?;
        }

        let (entries, max_sequence) = Wal::recover(dir.path())?;

        assert_eq!(entries.len(), 4);
        assert_eq!(max_sequence, 3);

        assert_eq!(entries[0].key, Key::from_str("key1"));
        assert_eq!(entries[0].entry_type, WalEntryType::Put);
        assert_eq!(entries[0].value, Some(CipherBlob::new(vec![1, 2, 3])));

        assert_eq!(entries[1].key, Key::from_str("key2"));
        assert_eq!(entries[1].entry_type, WalEntryType::Put);

        assert_eq!(entries[2].key, Key::from_str("key1"));
        assert_eq!(entries[2].entry_type, WalEntryType::Delete);
        assert_eq!(entries[2].value, None);

        assert_eq!(entries[3].key, Key::from_str("key3"));
        assert_eq!(entries[3].entry_type, WalEntryType::Put);

        Ok(())
    }

    #[test]
    fn test_wal_recovery_multiple_files() -> Result<()> {
        let dir = scratch_dir()?;

        {
            let config = WalConfig {
                wal_dir: dir.path().to_path_buf(),
                // Small limit so the entries are spread over several files.
                max_file_size: 512,
                sync_on_write: true,
                ..Default::default()
            };

            let mut wal = Wal::with_config(config)?;

            for i in 0..20 {
                wal.put(
                    Key::from_str(&format!("key_{}", i)),
                    CipherBlob::new(vec![i as u8; 100]),
                )?;
            }

            wal.flush()?;
        }

        let (entries, max_sequence) = Wal::recover(dir.path())?;

        assert_eq!(entries.len(), 20);
        assert_eq!(max_sequence, 19);

        // Entries must come back in write order across file boundaries.
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.sequence, i as u64);
            assert_eq!(entry.key, Key::from_str(&format!("key_{}", i)));
        }

        Ok(())
    }

    #[test]
    fn test_wal_recovery_empty_directory() -> Result<()> {
        let dir = scratch_dir()?;

        let (entries, max_sequence) = Wal::recover(dir.path())?;

        assert_eq!(entries.len(), 0);
        assert_eq!(max_sequence, 0);

        Ok(())
    }

    #[test]
    fn test_wal_recovery_nonexistent_directory() -> Result<()> {
        let dir = scratch_dir()?;
        // A path inside a fresh scratch directory is guaranteed not to exist.
        let missing = dir.path().join("nonexistent_wal_dir_12345");

        let (entries, max_sequence) = Wal::recover(&missing)?;

        assert_eq!(entries.len(), 0);
        assert_eq!(max_sequence, 0);

        Ok(())
    }

    #[test]
    fn test_wal_replay_to_memtable() -> Result<()> {
        let dir = scratch_dir()?;

        {
            let config = WalConfig {
                wal_dir: dir.path().to_path_buf(),
                sync_on_write: true,
                ..Default::default()
            };

            let mut wal = Wal::with_config(config)?;

            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
            wal.delete(Key::from_str("key1"))?;
            wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;

            wal.flush()?;
        }

        let memtable = Memtable::new();
        let max_sequence = Wal::replay_to_memtable(dir.path(), &memtable)?;

        assert_eq!(max_sequence, 3);

        // key1 was written and then deleted.
        assert_eq!(memtable.get(&Key::from_str("key1"))?, None);
        assert_eq!(
            memtable.get(&Key::from_str("key2"))?,
            Some(CipherBlob::new(vec![4, 5, 6]))
        );
        assert_eq!(
            memtable.get(&Key::from_str("key3"))?,
            Some(CipherBlob::new(vec![7, 8, 9]))
        );

        Ok(())
    }

    #[test]
    fn test_wal_reader_basic() -> Result<()> {
        let dir = scratch_dir()?;
        let wal_file = dir.path().join("test.wal");

        {
            let mut wal = Wal::create(&wal_file)?;
            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
            wal.flush()?;
        }

        // `Wal::create` only uses the parent directory; the data lands in
        // the first numbered file.
        let wal_file_actual = dir.path().join("wal_00000000.log");
        let mut reader = WalReader::open(&wal_file_actual)?;

        let entry1 = reader.read_entry()?.expect("Should have entry 1");
        assert_eq!(entry1.sequence, 0);
        assert_eq!(entry1.key, Key::from_str("key1"));

        let entry2 = reader.read_entry()?.expect("Should have entry 2");
        assert_eq!(entry2.sequence, 1);
        assert_eq!(entry2.key, Key::from_str("key2"));

        let entry3 = reader.read_entry()?;
        assert_eq!(entry3, None);

        Ok(())
    }

    #[test]
    fn test_wal_recovery_with_truncated_file() -> Result<()> {
        use std::io::Write as IoWrite;

        let dir = scratch_dir()?;
        let wal_file = dir.path().join("wal_00000000.log");

        {
            let mut wal = Wal::create(&wal_file)?;
            wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
            wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
            wal.flush()?;

            // Simulate a crash in the middle of a write: a length prefix with
            // no entry body behind it.
            let mut file = OpenOptions::new().append(true).open(&wal_file)?;
            let incomplete_len = 1234u32;
            file.write_all(&incomplete_len.to_le_bytes())?;
            file.flush()?;
        }

        let (entries, _) = Wal::recover(dir.path())?;

        // The two complete entries survive; the truncated tail is skipped.
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Key::from_str("key1"));
        assert_eq!(entries[1].key, Key::from_str("key2"));

        Ok(())
    }
}