1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::types::{CipherBlob, Key};
8use std::fs::{File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11
12#[derive(Debug, Clone, Default)]
14pub struct RecoveryStats {
15 pub entries_recovered: u64,
17 pub entries_corrupted: u64,
19 pub bytes_recovered: u64,
21}
22
23#[derive(Debug, Clone, PartialEq)]
25pub enum WalEntryType {
26 Put = 1,
27 Delete = 2,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct WalEntry {
33 pub sequence: u64,
35 pub entry_type: WalEntryType,
37 pub key: Key,
39 pub value: Option<CipherBlob>,
41 pub checksum: u32,
43}
44
45impl WalEntry {
46 pub fn put(sequence: u64, key: Key, value: CipherBlob) -> Self {
48 let mut entry = Self {
49 sequence,
50 entry_type: WalEntryType::Put,
51 key,
52 value: Some(value),
53 checksum: 0,
54 };
55 entry.checksum = entry.calculate_checksum();
56 entry
57 }
58
59 pub fn delete(sequence: u64, key: Key) -> Self {
61 let mut entry = Self {
62 sequence,
63 entry_type: WalEntryType::Delete,
64 key,
65 value: None,
66 checksum: 0,
67 };
68 entry.checksum = entry.calculate_checksum();
69 entry
70 }
71
72 fn calculate_checksum(&self) -> u32 {
74 let mut hasher = crc32fast::Hasher::new();
75
76 hasher.update(&self.sequence.to_le_bytes());
78
79 hasher.update(&[self.entry_type.clone() as u8]);
81
82 hasher.update(self.key.as_bytes());
84
85 if let Some(ref value) = self.value {
87 hasher.update(value.as_bytes());
88 }
89
90 hasher.finalize()
91 }
92
93 pub fn verify_checksum(&self) -> Result<()> {
95 let calculated = self.calculate_checksum();
96 if calculated == self.checksum {
97 Ok(())
98 } else {
99 Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
100 "WAL entry checksum mismatch: expected {}, got {}",
101 self.checksum, calculated
102 ))))
103 }
104 }
105
106 pub fn encode(&self) -> Vec<u8> {
108 let mut bytes = Vec::new();
109
110 bytes.extend_from_slice(&0x57414Cu32.to_le_bytes());
112
113 bytes.extend_from_slice(&self.sequence.to_le_bytes());
115
116 bytes.push(self.entry_type.clone() as u8);
118
119 bytes.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
121 bytes.extend_from_slice(self.key.as_bytes());
122
123 if let Some(ref value) = self.value {
125 bytes.extend_from_slice(&(value.len() as u32).to_le_bytes());
126 bytes.extend_from_slice(value.as_bytes());
127 } else {
128 bytes.extend_from_slice(&0u32.to_le_bytes());
129 }
130
131 bytes.extend_from_slice(&self.checksum.to_le_bytes());
133
134 bytes
135 }
136
137 pub fn decode(bytes: &[u8]) -> Result<Self> {
139 if bytes.len() < 17 {
140 return Err(AmateRSError::SerializationError(ErrorContext::new(
142 "WAL entry too short",
143 )));
144 }
145
146 let mut offset = 0;
147
148 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
150 if magic != 0x57414C {
151 return Err(AmateRSError::SerializationError(ErrorContext::new(
152 "Invalid WAL entry magic number",
153 )));
154 }
155 offset += 4;
156
157 let sequence = u64::from_le_bytes(bytes[offset..offset + 8].try_into().map_err(|_| {
159 AmateRSError::SerializationError(ErrorContext::new("Failed to read sequence"))
160 })?);
161 offset += 8;
162
163 let entry_type = match bytes[offset] {
165 1 => WalEntryType::Put,
166 2 => WalEntryType::Delete,
167 _ => {
168 return Err(AmateRSError::SerializationError(ErrorContext::new(
169 "Invalid WAL entry type",
170 )));
171 }
172 };
173 offset += 1;
174
175 let key_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
177 AmateRSError::SerializationError(ErrorContext::new("Failed to read key length"))
178 })?) as usize;
179 offset += 4;
180
181 let key_bytes = &bytes[offset..offset + key_len];
182 let key = Key::from_slice(key_bytes);
183 offset += key_len;
184
185 let value_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
187 AmateRSError::SerializationError(ErrorContext::new("Failed to read value length"))
188 })?) as usize;
189 offset += 4;
190
191 let value = if value_len > 0 {
192 let value_bytes = &bytes[offset..offset + value_len];
193 Some(CipherBlob::new(value_bytes.to_vec()))
194 } else {
195 None
196 };
197 offset += value_len;
198
199 let checksum = u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
201 AmateRSError::SerializationError(ErrorContext::new("Failed to read checksum"))
202 })?);
203
204 let entry = Self {
205 sequence,
206 entry_type,
207 key,
208 value,
209 checksum,
210 };
211
212 entry.verify_checksum()?;
214
215 Ok(entry)
216 }
217}
218
219#[derive(Debug, Clone)]
221pub struct WalConfig {
222 pub wal_dir: PathBuf,
224 pub max_file_size: u64,
226 pub max_wal_files: usize,
228 pub sync_on_write: bool,
230}
231
232impl Default for WalConfig {
233 fn default() -> Self {
234 Self {
235 wal_dir: PathBuf::from("./wal"),
236 max_file_size: 64 * 1024 * 1024, max_wal_files: 10,
238 sync_on_write: true,
239 }
240 }
241}
242
243pub struct Wal {
245 config: WalConfig,
247 current_path: PathBuf,
249 writer: BufWriter<File>,
251 sequence: u64,
253 current_file_size: u64,
255 current_file_number: u64,
257}
258
259impl Wal {
260 pub fn create(path: impl AsRef<Path>) -> Result<Self> {
262 let path = path.as_ref().to_path_buf();
263 let parent = path.parent().ok_or_else(|| {
264 AmateRSError::IoError(ErrorContext::new("WAL path has no parent directory"))
265 })?;
266
267 let config = WalConfig {
268 wal_dir: parent.to_path_buf(),
269 ..Default::default()
270 };
271
272 Self::with_config(config)
273 }
274
275 pub fn with_config(config: WalConfig) -> Result<Self> {
277 std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
279 AmateRSError::IoError(ErrorContext::new(format!(
280 "Failed to create WAL directory: {}",
281 e
282 )))
283 })?;
284
285 let (file_number, sequence) = Self::find_latest_wal(&config)?;
287
288 let current_path = Self::wal_file_path(&config.wal_dir, file_number);
289
290 let file = OpenOptions::new()
291 .create(true)
292 .append(true)
293 .open(¤t_path)
294 .map_err(|e| {
295 AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL: {}", e)))
296 })?;
297
298 let current_file_size = file
299 .metadata()
300 .map_err(|e| {
301 AmateRSError::IoError(ErrorContext::new(format!(
302 "Failed to get WAL file size: {}",
303 e
304 )))
305 })?
306 .len();
307
308 Ok(Self {
309 config,
310 current_path,
311 writer: BufWriter::new(file),
312 sequence,
313 current_file_size,
314 current_file_number: file_number,
315 })
316 }
317
318 fn find_latest_wal(config: &WalConfig) -> Result<(u64, u64)> {
320 let mut max_file_number = 0u64;
321 let mut max_sequence = 0u64;
322
323 if config.wal_dir.exists() {
324 let wal_file_numbers = Self::list_wal_file_numbers(&config.wal_dir)?;
325
326 if let Some(&last) = wal_file_numbers.last() {
327 max_file_number = last;
328 }
329
330 for file_num in &wal_file_numbers {
332 let file_path = Self::wal_file_path(&config.wal_dir, *file_num);
333 if let Ok(mut reader) = WalReader::open(&file_path) {
334 loop {
335 match reader.read_entry() {
336 Ok(Some(entry)) => {
337 if entry.sequence >= max_sequence {
338 max_sequence = entry.sequence + 1;
339 }
340 }
341 Ok(None) => break,
342 Err(_) => {
343 tracing::warn!(
344 "Corrupted entry found in WAL file {} during startup",
345 file_path.display()
346 );
347 continue;
348 }
349 }
350 }
351 }
352 }
353 }
354
355 Ok((max_file_number, max_sequence))
356 }
357
358 fn wal_file_path(wal_dir: &Path, file_number: u64) -> PathBuf {
360 wal_dir.join(format!("wal_{:08}.log", file_number))
361 }
362
363 fn list_wal_file_numbers(wal_dir: &Path) -> Result<Vec<u64>> {
365 let entries = std::fs::read_dir(wal_dir).map_err(|e| {
366 AmateRSError::IoError(ErrorContext::new(format!(
367 "Failed to read WAL directory: {}",
368 e
369 )))
370 })?;
371
372 let mut numbers = Vec::new();
373 for entry in entries {
374 let entry = entry.map_err(|e| {
375 AmateRSError::IoError(ErrorContext::new(format!(
376 "Failed to read directory entry: {}",
377 e
378 )))
379 })?;
380 let file_name = entry.file_name();
381 let name = file_name.to_string_lossy();
382 if name.starts_with("wal_") && name.ends_with(".log") {
383 if let Ok(number) = name[4..name.len() - 4].parse::<u64>() {
384 numbers.push(number);
385 }
386 }
387 }
388 numbers.sort_unstable();
389 Ok(numbers)
390 }
391
392 pub fn put(&mut self, key: Key, value: CipherBlob) -> Result<u64> {
394 let sequence = self.sequence;
395 self.sequence += 1;
396
397 let entry = WalEntry::put(sequence, key, value);
398 self.write_entry(&entry)?;
399
400 Ok(sequence)
401 }
402
403 pub fn delete(&mut self, key: Key) -> Result<u64> {
405 let sequence = self.sequence;
406 self.sequence += 1;
407
408 let entry = WalEntry::delete(sequence, key);
409 self.write_entry(&entry)?;
410
411 Ok(sequence)
412 }
413
414 fn write_entry(&mut self, entry: &WalEntry) -> Result<()> {
416 let bytes = entry.encode();
417
418 let len = bytes.len() as u32;
420 self.writer.write_all(&len.to_le_bytes()).map_err(|e| {
421 AmateRSError::IoError(ErrorContext::new(format!(
422 "Failed to write WAL entry: {}",
423 e
424 )))
425 })?;
426
427 self.writer.write_all(&bytes).map_err(|e| {
429 AmateRSError::IoError(ErrorContext::new(format!(
430 "Failed to write WAL entry: {}",
431 e
432 )))
433 })?;
434
435 let entry_size = (4 + bytes.len()) as u64; self.current_file_size += entry_size;
438
439 if self.config.sync_on_write {
441 self.writer.flush().map_err(|e| {
442 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
443 })?;
444 }
445
446 if self.current_file_size >= self.config.max_file_size {
448 self.rotate()?;
449 }
450
451 Ok(())
452 }
453
454 pub fn rotate(&mut self) -> Result<()> {
456 self.flush()?;
458
459 self.current_file_number += 1;
461
462 let new_path = Self::wal_file_path(&self.config.wal_dir, self.current_file_number);
464
465 let file = OpenOptions::new()
466 .create(true)
467 .append(true)
468 .open(&new_path)
469 .map_err(|e| {
470 AmateRSError::IoError(ErrorContext::new(format!(
471 "Failed to create new WAL file: {}",
472 e
473 )))
474 })?;
475
476 self.current_path = new_path;
477 self.writer = BufWriter::new(file);
478 self.current_file_size = 0;
479
480 self.cleanup_old_wal_files()?;
482
483 Ok(())
484 }
485
486 fn cleanup_old_wal_files(&self) -> Result<()> {
488 let wal_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
489
490 if wal_files.len() > self.config.max_wal_files {
491 let files_to_delete = wal_files.len() - self.config.max_wal_files;
492
493 for &file_number in wal_files.iter().take(files_to_delete) {
494 let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
495 std::fs::remove_file(&file_path).map_err(|e| {
496 AmateRSError::IoError(ErrorContext::new(format!(
497 "Failed to delete old WAL file: {}",
498 e
499 )))
500 })?;
501 }
502 }
503
504 Ok(())
505 }
506
507 pub fn cleanup(&self) -> Result<()> {
509 self.cleanup_old_wal_files()
510 }
511
512 pub fn current_file_size(&self) -> u64 {
514 self.current_file_size
515 }
516
517 pub fn current_file_number(&self) -> u64 {
519 self.current_file_number
520 }
521
522 pub fn flush(&mut self) -> Result<()> {
524 self.writer.flush().map_err(|e| {
525 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush WAL: {}", e)))
526 })?;
527
528 self.writer.get_ref().sync_all().map_err(|e| {
529 AmateRSError::IoError(ErrorContext::new(format!("Failed to sync WAL: {}", e)))
530 })?;
531
532 Ok(())
533 }
534
535 pub fn sequence(&self) -> u64 {
537 self.sequence
538 }
539
540 pub fn path(&self) -> &Path {
542 &self.current_path
543 }
544
545 pub fn recover(wal_dir: impl AsRef<Path>) -> Result<(Vec<WalEntry>, u64)> {
553 let wal_dir = wal_dir.as_ref();
554
555 if !wal_dir.exists() {
556 return Ok((Vec::new(), 0));
557 }
558
559 let wal_files = Self::list_wal_file_numbers(wal_dir)?;
560
561 let mut all_entries = Vec::new();
562 let mut max_sequence = 0u64;
563
564 for file_number in wal_files {
565 let file_path = Self::wal_file_path(wal_dir, file_number);
566 let mut reader = WalReader::open(&file_path)?;
567
568 loop {
569 match reader.read_entry() {
570 Ok(Some(entry)) => {
571 if entry.sequence > max_sequence {
572 max_sequence = entry.sequence;
573 }
574 all_entries.push(entry);
575 }
576 Ok(None) => break,
577 Err(e) => {
578 tracing::warn!(
579 "Skipping corrupted entry in {}: {}",
580 file_path.display(),
581 e
582 );
583 continue;
584 }
585 }
586 }
587 }
588
589 Ok((all_entries, max_sequence))
590 }
591
592 pub fn current_size(&self) -> u64 {
594 self.current_file_size
595 }
596
597 pub fn total_wal_size(&self) -> Result<u64> {
599 let wal_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
600 let mut total_size = 0u64;
601
602 for file_number in wal_files {
603 let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
604 let metadata = std::fs::metadata(&file_path).map_err(|e| {
605 AmateRSError::IoError(ErrorContext::new(format!(
606 "Failed to read WAL file metadata: {}",
607 e
608 )))
609 })?;
610 total_size += metadata.len();
611 }
612
613 Ok(total_size)
614 }
615
616 pub fn truncate_before(&mut self, sequence: u64) -> Result<u64> {
623 self.flush()?;
624
625 let all_files = Self::list_wal_file_numbers(&self.config.wal_dir)?;
626 let wal_files: Vec<u64> = all_files
628 .into_iter()
629 .filter(|&n| n != self.current_file_number)
630 .collect();
631
632 let mut files_truncated = 0u64;
633
634 for file_number in wal_files {
635 let file_path = Self::wal_file_path(&self.config.wal_dir, file_number);
636
637 let mut file_max_seq = 0u64;
639 if let Ok(mut reader) = WalReader::open(&file_path) {
640 loop {
641 match reader.read_entry() {
642 Ok(Some(entry)) => {
643 if entry.sequence > file_max_seq {
644 file_max_seq = entry.sequence;
645 }
646 }
647 Ok(None) => break,
648 Err(_) => continue,
649 }
650 }
651 }
652
653 if file_max_seq <= sequence {
655 std::fs::remove_file(&file_path).map_err(|e| {
656 AmateRSError::IoError(ErrorContext::new(format!(
657 "Failed to remove WAL file {}: {}",
658 file_path.display(),
659 e
660 )))
661 })?;
662 files_truncated += 1;
663 }
664 }
665
666 Ok(files_truncated)
667 }
668
669 pub fn recover_with_stats(
674 wal_dir: impl AsRef<Path>,
675 ) -> Result<(Vec<WalEntry>, u64, RecoveryStats)> {
676 let wal_dir = wal_dir.as_ref();
677 let mut stats = RecoveryStats::default();
678
679 if !wal_dir.exists() {
680 return Ok((Vec::new(), 0, stats));
681 }
682
683 let wal_files = Self::list_wal_file_numbers(wal_dir)?;
684
685 let mut all_entries = Vec::new();
686 let mut max_sequence = 0u64;
687
688 for file_number in wal_files {
689 let file_path = Self::wal_file_path(wal_dir, file_number);
690 let mut reader = WalReader::open(&file_path)?;
691
692 loop {
693 match reader.read_entry() {
694 Ok(Some(entry)) => {
695 let entry_bytes = entry.encode().len() as u64 + 4; stats.bytes_recovered += entry_bytes;
697 stats.entries_recovered += 1;
698 if entry.sequence > max_sequence {
699 max_sequence = entry.sequence;
700 }
701 all_entries.push(entry);
702 }
703 Ok(None) => break,
704 Err(e) => {
705 stats.entries_corrupted += 1;
706 tracing::warn!(
707 "Skipping corrupted entry in {}: {}",
708 file_path.display(),
709 e
710 );
711 continue;
712 }
713 }
714 }
715 }
716
717 Ok((all_entries, max_sequence, stats))
718 }
719
720 pub fn replay_to_memtable(
727 wal_dir: impl AsRef<Path>,
728 memtable: &crate::storage::memtable::Memtable,
729 ) -> Result<u64> {
730 let (entries, max_sequence) = Self::recover(wal_dir)?;
731
732 for entry in entries {
733 match entry.entry_type {
734 WalEntryType::Put => {
735 if let Some(value) = entry.value {
736 memtable.put(entry.key, value)?;
737 }
738 }
739 WalEntryType::Delete => {
740 memtable.delete(entry.key)?;
741 }
742 }
743 }
744
745 Ok(max_sequence)
746 }
747}
748
749pub struct WalReader {
751 reader: BufReader<File>,
752}
753
754impl WalReader {
755 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
757 let file = File::open(path.as_ref()).map_err(|e| {
758 AmateRSError::IoError(ErrorContext::new(format!("Failed to open WAL file: {}", e)))
759 })?;
760
761 Ok(Self {
762 reader: BufReader::new(file),
763 })
764 }
765
766 pub fn read_entry(&mut self) -> Result<Option<WalEntry>> {
773 let mut len_bytes = [0u8; 4];
775 match self.reader.read_exact(&mut len_bytes) {
776 Ok(()) => {}
777 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
778 return Ok(None);
780 }
781 Err(e) => {
782 return Err(AmateRSError::IoError(ErrorContext::new(format!(
783 "Failed to read WAL entry length: {}",
784 e
785 ))));
786 }
787 }
788
789 let len = u32::from_le_bytes(len_bytes) as usize;
790
791 if len > 100 * 1024 * 1024 {
793 return Err(AmateRSError::SerializationError(ErrorContext::new(
794 format!("WAL entry too large: {} bytes", len),
795 )));
796 }
797
798 let mut entry_bytes = vec![0u8; len];
800 match self.reader.read_exact(&mut entry_bytes) {
801 Ok(()) => {}
802 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
803 return Err(AmateRSError::SerializationError(ErrorContext::new(
805 "Incomplete WAL entry (truncated file)",
806 )));
807 }
808 Err(e) => {
809 return Err(AmateRSError::IoError(ErrorContext::new(format!(
810 "Failed to read WAL entry: {}",
811 e
812 ))));
813 }
814 }
815
816 let entry = WalEntry::decode(&entry_bytes)?;
818
819 Ok(Some(entry))
820 }
821}
822
823#[cfg(test)]
824mod tests {
825 use super::*;
826 use crate::storage::Memtable;
827 use std::fs;
828 use tempfile::tempdir;
829
830 #[test]
831 fn test_wal_entry_encode_decode() -> Result<()> {
832 let key = Key::from_str("test_key");
833 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
834 let entry = WalEntry::put(42, key.clone(), value.clone());
835
836 let bytes = entry.encode();
837 let decoded = WalEntry::decode(&bytes)?;
838
839 assert_eq!(decoded.sequence, 42);
840 assert_eq!(decoded.entry_type, WalEntryType::Put);
841 assert_eq!(decoded.key, key);
842 assert_eq!(decoded.value, Some(value));
843
844 Ok(())
845 }
846
847 #[test]
848 fn test_wal_delete_entry() -> Result<()> {
849 let key = Key::from_str("delete_me");
850 let entry = WalEntry::delete(99, key.clone());
851
852 let bytes = entry.encode();
853 let decoded = WalEntry::decode(&bytes)?;
854
855 assert_eq!(decoded.sequence, 99);
856 assert_eq!(decoded.entry_type, WalEntryType::Delete);
857 assert_eq!(decoded.key, key);
858 assert_eq!(decoded.value, None);
859
860 Ok(())
861 }
862
863 #[test]
864 fn test_wal_checksum_verification() -> Result<()> {
865 let key = Key::from_str("test");
866 let value = CipherBlob::new(vec![1, 2, 3]);
867 let entry = WalEntry::put(1, key, value);
868
869 entry.verify_checksum()?;
871
872 let mut corrupted = entry.clone();
874 corrupted.checksum = 0;
875
876 assert!(corrupted.verify_checksum().is_err());
878
879 Ok(())
880 }
881
882 #[test]
883 fn test_wal_basic_operations() -> Result<()> {
884 let temp_dir = tempdir().map_err(|e| {
885 AmateRSError::IoError(ErrorContext::new(format!(
886 "Failed to create temp dir: {}",
887 e
888 )))
889 })?;
890 let wal_path = temp_dir.path().join("test.wal");
891
892 let mut wal = Wal::create(&wal_path)?;
893
894 let seq1 = wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
896 let seq2 = wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
897 let seq3 = wal.delete(Key::from_str("key1"))?;
898
899 assert_eq!(seq1, 0);
900 assert_eq!(seq2, 1);
901 assert_eq!(seq3, 2);
902
903 wal.flush()?;
904
905 assert!(wal.path().exists());
907
908 Ok(())
909 }
910
911 #[test]
912 fn test_wal_sequence_increment() -> Result<()> {
913 let temp_dir = tempdir().map_err(|e| {
914 AmateRSError::IoError(ErrorContext::new(format!(
915 "Failed to create temp dir: {}",
916 e
917 )))
918 })?;
919 let wal_path = temp_dir.path().join("test_seq.wal");
920
921 let mut wal = Wal::create(&wal_path)?;
922
923 assert_eq!(wal.sequence(), 0);
924
925 wal.put(Key::from_str("key"), CipherBlob::new(vec![1]))?;
926 assert_eq!(wal.sequence(), 1);
927
928 wal.delete(Key::from_str("key"))?;
929 assert_eq!(wal.sequence(), 2);
930
931 Ok(())
932 }
933
934 #[test]
935 fn test_wal_entry_large_value() -> Result<()> {
936 let key = Key::from_str("large");
937 let large_value = CipherBlob::new(vec![0u8; 10_000]);
938 let entry = WalEntry::put(1, key.clone(), large_value.clone());
939
940 let bytes = entry.encode();
941 let decoded = WalEntry::decode(&bytes)?;
942
943 assert_eq!(decoded.key, key);
944 assert_eq!(decoded.value, Some(large_value));
945
946 Ok(())
947 }
948
949 #[test]
950 fn test_wal_rotation() -> Result<()> {
951 use std::env;
952
953 let temp_dir = env::temp_dir().join("test_wal_rotation");
954 std::fs::create_dir_all(&temp_dir).ok();
955
956 let config = WalConfig {
957 wal_dir: temp_dir.clone(),
958 max_file_size: 1024, sync_on_write: false, ..Default::default()
961 };
962
963 let mut wal = Wal::with_config(config)?;
964
965 let initial_file_number = wal.current_file_number();
966
967 for i in 0..20 {
969 wal.put(
970 Key::from_str(&format!("key_{}", i)),
971 CipherBlob::new(vec![i as u8; 100]),
972 )?;
973 }
974
975 assert!(wal.current_file_number() > initial_file_number);
977
978 assert!(wal.path().exists());
980
981 std::fs::remove_dir_all(&temp_dir).ok();
982 Ok(())
983 }
984
985 #[test]
986 fn test_wal_cleanup() -> Result<()> {
987 use std::env;
988
989 let temp_dir = env::temp_dir().join("test_wal_cleanup");
990 std::fs::create_dir_all(&temp_dir).ok();
991
992 let config = WalConfig {
993 wal_dir: temp_dir.clone(),
994 max_file_size: 512, max_wal_files: 3, sync_on_write: false,
997 };
998
999 let mut wal = Wal::with_config(config)?;
1000
1001 for i in 0..100 {
1003 wal.put(
1004 Key::from_str(&format!("key_{}", i)),
1005 CipherBlob::new(vec![i as u8; 100]),
1006 )?;
1007 }
1008
1009 let wal_file_count = std::fs::read_dir(&temp_dir)?
1011 .filter_map(|e| e.ok())
1012 .filter(|e| {
1013 e.file_name().to_string_lossy().starts_with("wal_")
1014 && e.file_name().to_string_lossy().ends_with(".log")
1015 })
1016 .count();
1017
1018 assert!(wal_file_count <= 3);
1020
1021 std::fs::remove_dir_all(&temp_dir).ok();
1022 Ok(())
1023 }
1024
1025 #[test]
1026 fn test_wal_manual_cleanup() -> Result<()> {
1027 use std::env;
1028
1029 let temp_dir = env::temp_dir().join("test_wal_manual_cleanup");
1030 std::fs::create_dir_all(&temp_dir).ok();
1031
1032 let config = WalConfig {
1033 wal_dir: temp_dir.clone(),
1034 max_file_size: 512,
1035 max_wal_files: 5,
1036 sync_on_write: false,
1037 };
1038
1039 let mut wal = Wal::with_config(config)?;
1040
1041 for i in 0..80 {
1043 wal.put(
1044 Key::from_str(&format!("key_{}", i)),
1045 CipherBlob::new(vec![i as u8; 100]),
1046 )?;
1047 }
1048
1049 wal.cleanup()?;
1051
1052 let wal_file_count = std::fs::read_dir(&temp_dir)?
1054 .filter_map(|e| e.ok())
1055 .filter(|e| {
1056 e.file_name().to_string_lossy().starts_with("wal_")
1057 && e.file_name().to_string_lossy().ends_with(".log")
1058 })
1059 .count();
1060
1061 assert!(wal_file_count <= 5);
1062
1063 std::fs::remove_dir_all(&temp_dir).ok();
1064 Ok(())
1065 }
1066
1067 #[test]
1068 fn test_wal_recovery_basic() -> Result<()> {
1069 use std::env;
1070
1071 let temp_dir = env::temp_dir().join("test_wal_recovery_basic");
1072 std::fs::create_dir_all(&temp_dir).ok();
1073
1074 {
1076 let config = WalConfig {
1077 wal_dir: temp_dir.clone(),
1078 sync_on_write: true,
1079 ..Default::default()
1080 };
1081
1082 let mut wal = Wal::with_config(config)?;
1083
1084 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1085 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1086 wal.delete(Key::from_str("key1"))?;
1087 wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1088
1089 wal.flush()?;
1090 }
1091
1092 let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1094
1095 assert_eq!(entries.len(), 4);
1096 assert_eq!(max_sequence, 3);
1097
1098 assert_eq!(entries[0].key, Key::from_str("key1"));
1100 assert_eq!(entries[0].entry_type, WalEntryType::Put);
1101 assert_eq!(entries[0].value, Some(CipherBlob::new(vec![1, 2, 3])));
1102
1103 assert_eq!(entries[1].key, Key::from_str("key2"));
1104 assert_eq!(entries[1].entry_type, WalEntryType::Put);
1105
1106 assert_eq!(entries[2].key, Key::from_str("key1"));
1107 assert_eq!(entries[2].entry_type, WalEntryType::Delete);
1108 assert_eq!(entries[2].value, None);
1109
1110 assert_eq!(entries[3].key, Key::from_str("key3"));
1111 assert_eq!(entries[3].entry_type, WalEntryType::Put);
1112
1113 std::fs::remove_dir_all(&temp_dir).ok();
1114 Ok(())
1115 }
1116
1117 #[test]
1118 fn test_wal_recovery_multiple_files() -> Result<()> {
1119 use std::env;
1120
1121 let temp_dir = env::temp_dir().join("test_wal_recovery_multiple");
1122 std::fs::create_dir_all(&temp_dir).ok();
1123
1124 {
1126 let config = WalConfig {
1127 wal_dir: temp_dir.clone(),
1128 max_file_size: 512, sync_on_write: true,
1130 ..Default::default()
1131 };
1132
1133 let mut wal = Wal::with_config(config)?;
1134
1135 for i in 0..20 {
1137 wal.put(
1138 Key::from_str(&format!("key_{}", i)),
1139 CipherBlob::new(vec![i as u8; 100]),
1140 )?;
1141 }
1142
1143 wal.flush()?;
1144 }
1145
1146 let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1148
1149 assert_eq!(entries.len(), 20);
1150 assert_eq!(max_sequence, 19);
1151
1152 for (i, entry) in entries.iter().enumerate() {
1154 assert_eq!(entry.sequence, i as u64);
1155 assert_eq!(entry.key, Key::from_str(&format!("key_{}", i)));
1156 }
1157
1158 std::fs::remove_dir_all(&temp_dir).ok();
1159 Ok(())
1160 }
1161
1162 #[test]
1163 fn test_wal_recovery_empty_directory() -> Result<()> {
1164 use std::env;
1165
1166 let temp_dir = env::temp_dir().join("test_wal_recovery_empty");
1167 std::fs::create_dir_all(&temp_dir).ok();
1168
1169 let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1171
1172 assert_eq!(entries.len(), 0);
1173 assert_eq!(max_sequence, 0);
1174
1175 std::fs::remove_dir_all(&temp_dir).ok();
1176 Ok(())
1177 }
1178
1179 #[test]
1180 fn test_wal_recovery_nonexistent_directory() -> Result<()> {
1181 use std::env;
1182
1183 let temp_dir = env::temp_dir().join("nonexistent_wal_dir_12345");
1184
1185 let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1187
1188 assert_eq!(entries.len(), 0);
1189 assert_eq!(max_sequence, 0);
1190
1191 Ok(())
1192 }
1193
1194 #[test]
1195 fn test_wal_replay_to_memtable() -> Result<()> {
1196 use std::env;
1197
1198 let temp_dir = env::temp_dir().join("test_wal_replay_memtable");
1199 std::fs::create_dir_all(&temp_dir).ok();
1200
1201 {
1203 let config = WalConfig {
1204 wal_dir: temp_dir.clone(),
1205 sync_on_write: true,
1206 ..Default::default()
1207 };
1208
1209 let mut wal = Wal::with_config(config)?;
1210
1211 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1212 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1213 wal.delete(Key::from_str("key1"))?;
1214 wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1215
1216 wal.flush()?;
1217 }
1218
1219 let memtable = Memtable::new();
1221 let max_sequence = Wal::replay_to_memtable(&temp_dir, &memtable)?;
1222
1223 assert_eq!(max_sequence, 3);
1224
1225 assert_eq!(memtable.get(&Key::from_str("key1"))?, None); assert_eq!(
1228 memtable.get(&Key::from_str("key2"))?,
1229 Some(CipherBlob::new(vec![4, 5, 6]))
1230 );
1231 assert_eq!(
1232 memtable.get(&Key::from_str("key3"))?,
1233 Some(CipherBlob::new(vec![7, 8, 9]))
1234 );
1235
1236 std::fs::remove_dir_all(&temp_dir).ok();
1237 Ok(())
1238 }
1239
1240 #[test]
1241 fn test_wal_reader_basic() -> Result<()> {
1242 use std::env;
1243
1244 let temp_dir = env::temp_dir().join("test_wal_reader_basic");
1245 std::fs::create_dir_all(&temp_dir).ok();
1246
1247 let wal_file = temp_dir.join("test.wal");
1248
1249 {
1251 let mut wal = Wal::create(&wal_file)?;
1252 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1253 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1254 wal.flush()?;
1255 }
1256
1257 let wal_file_actual = temp_dir.join("wal_00000000.log");
1259 let mut reader = WalReader::open(&wal_file_actual)?;
1260
1261 let entry1 = reader.read_entry()?.expect("Should have entry 1");
1262 assert_eq!(entry1.sequence, 0);
1263 assert_eq!(entry1.key, Key::from_str("key1"));
1264
1265 let entry2 = reader.read_entry()?.expect("Should have entry 2");
1266 assert_eq!(entry2.sequence, 1);
1267 assert_eq!(entry2.key, Key::from_str("key2"));
1268
1269 let entry3 = reader.read_entry()?;
1270 assert_eq!(entry3, None); std::fs::remove_dir_all(&temp_dir).ok();
1273 Ok(())
1274 }
1275
1276 #[test]
1277 fn test_wal_recovery_with_truncated_file() -> Result<()> {
1278 use std::env;
1279 use std::io::Write as IoWrite;
1280
1281 let temp_dir = env::temp_dir().join("test_wal_recovery_truncated");
1282 std::fs::create_dir_all(&temp_dir).ok();
1283
1284 let wal_file = temp_dir.join("wal_00000000.log");
1286 {
1287 let mut wal = Wal::create(&wal_file)?;
1288 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1289 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1290 wal.flush()?;
1291
1292 let mut file = OpenOptions::new().append(true).open(&wal_file)?;
1294 let incomplete_len = 1234u32;
1295 file.write_all(&incomplete_len.to_le_bytes())?;
1296 file.flush()?;
1297 }
1298
1299 let (entries, _) = Wal::recover(&temp_dir)?;
1301
1302 assert_eq!(entries.len(), 2);
1304 assert_eq!(entries[0].key, Key::from_str("key1"));
1305 assert_eq!(entries[1].key, Key::from_str("key2"));
1306
1307 std::fs::remove_dir_all(&temp_dir).ok();
1308 Ok(())
1309 }
1310
1311 #[test]
1312 fn test_wal_sequence_recovery_after_crash() -> Result<()> {
1313 use std::env;
1314
1315 let temp_dir = env::temp_dir().join("test_wal_seq_recovery_crash");
1316 std::fs::remove_dir_all(&temp_dir).ok();
1318 std::fs::create_dir_all(&temp_dir).ok();
1319
1320 {
1322 let config = WalConfig {
1323 wal_dir: temp_dir.clone(),
1324 sync_on_write: true,
1325 ..Default::default()
1326 };
1327
1328 let mut wal = Wal::with_config(config)?;
1329
1330 wal.put(Key::from_str("a"), CipherBlob::new(vec![1]))?;
1331 wal.put(Key::from_str("b"), CipherBlob::new(vec![2]))?;
1332 wal.put(Key::from_str("c"), CipherBlob::new(vec![3]))?;
1333 wal.put(Key::from_str("d"), CipherBlob::new(vec![4]))?;
1334 wal.put(Key::from_str("e"), CipherBlob::new(vec![5]))?;
1335 wal.flush()?;
1336 }
1338
1339 {
1341 let config = WalConfig {
1342 wal_dir: temp_dir.clone(),
1343 sync_on_write: true,
1344 ..Default::default()
1345 };
1346
1347 let mut wal = Wal::with_config(config)?;
1348
1349 assert_eq!(wal.sequence(), 5);
1351
1352 let seq = wal.put(Key::from_str("f"), CipherBlob::new(vec![6]))?;
1354 assert_eq!(seq, 5);
1355
1356 let seq = wal.put(Key::from_str("g"), CipherBlob::new(vec![7]))?;
1357 assert_eq!(seq, 6);
1358
1359 wal.flush()?;
1360 }
1361
1362 let (entries, max_sequence) = Wal::recover(&temp_dir)?;
1364 assert_eq!(entries.len(), 7);
1365 assert_eq!(max_sequence, 6);
1366
1367 std::fs::remove_dir_all(&temp_dir).ok();
1368 Ok(())
1369 }
1370
1371 #[test]
1372 fn test_wal_corruption_detection_and_partial_recovery() -> Result<()> {
1373 use std::env;
1374 use std::io::Write as IoWrite;
1375
1376 let temp_dir = env::temp_dir().join("test_wal_corruption_detect");
1377 std::fs::remove_dir_all(&temp_dir).ok();
1378 std::fs::create_dir_all(&temp_dir).ok();
1379
1380 let wal_file = temp_dir.join("wal_00000000.log");
1381
1382 {
1384 let config = WalConfig {
1385 wal_dir: temp_dir.clone(),
1386 sync_on_write: true,
1387 ..Default::default()
1388 };
1389
1390 let mut wal = Wal::with_config(config)?;
1391 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1392 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1393 wal.put(Key::from_str("key3"), CipherBlob::new(vec![7, 8, 9]))?;
1394 wal.flush()?;
1395 }
1396
1397 {
1399 let data = std::fs::read(&wal_file).map_err(|e| {
1400 AmateRSError::IoError(ErrorContext::new(format!("Failed to read WAL: {}", e)))
1401 })?;
1402
1403 let mut corrupted_data = data.clone();
1404 let first_entry_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
1407 let second_entry_start = 4 + first_entry_len;
1408
1409 let corrupt_offset = second_entry_start + 4 + 10; if corrupt_offset < corrupted_data.len() {
1412 corrupted_data[corrupt_offset] ^= 0xFF;
1413 }
1414
1415 let mut file = File::create(&wal_file).map_err(|e| {
1416 AmateRSError::IoError(ErrorContext::new(format!("Failed to create file: {}", e)))
1417 })?;
1418 file.write_all(&corrupted_data).map_err(|e| {
1419 AmateRSError::IoError(ErrorContext::new(format!("Failed to write file: {}", e)))
1420 })?;
1421 file.flush().map_err(|e| {
1422 AmateRSError::IoError(ErrorContext::new(format!("Failed to flush file: {}", e)))
1423 })?;
1424 }
1425
1426 let (entries, _max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1428
1429 assert_eq!(stats.entries_corrupted, 1);
1431 assert_eq!(stats.entries_recovered, entries.len() as u64);
1432 assert!(stats.bytes_recovered > 0);
1433 assert!(entries.len() >= 2);
1435
1436 std::fs::remove_dir_all(&temp_dir).ok();
1437 Ok(())
1438 }
1439
1440 #[test]
1441 fn test_wal_truncate_before() -> Result<()> {
1442 use std::env;
1443
1444 let temp_dir = env::temp_dir().join("test_wal_truncate_before");
1445 std::fs::remove_dir_all(&temp_dir).ok();
1446 std::fs::create_dir_all(&temp_dir).ok();
1447
1448 let config = WalConfig {
1449 wal_dir: temp_dir.clone(),
1450 max_file_size: 512, max_wal_files: 100, sync_on_write: true,
1453 };
1454
1455 let mut wal = Wal::with_config(config)?;
1456
1457 for i in 0..30 {
1459 wal.put(
1460 Key::from_str(&format!("key_{}", i)),
1461 CipherBlob::new(vec![i as u8; 100]),
1462 )?;
1463 }
1464 wal.flush()?;
1465
1466 let file_count_before = std::fs::read_dir(&temp_dir)
1468 .map_err(|e| {
1469 AmateRSError::IoError(ErrorContext::new(format!("Failed to read dir: {}", e)))
1470 })?
1471 .filter_map(|e| e.ok())
1472 .filter(|e| {
1473 let name = e.file_name().to_string_lossy().to_string();
1474 name.starts_with("wal_") && name.ends_with(".log")
1475 })
1476 .count();
1477 assert!(file_count_before > 1, "Should have multiple WAL files");
1478
1479 let truncated = wal.truncate_before(10)?;
1481
1482 assert!(truncated > 0, "Should have truncated at least one file");
1484
1485 let (remaining_entries, _) = Wal::recover(&temp_dir)?;
1487 let has_high_seq = remaining_entries.iter().any(|e| e.sequence > 10);
1490 assert!(has_high_seq, "Should still have entries with sequence > 10");
1491
1492 std::fs::remove_dir_all(&temp_dir).ok();
1493 Ok(())
1494 }
1495
1496 #[test]
1497 fn test_wal_size_tracking() -> Result<()> {
1498 use std::env;
1499
1500 let temp_dir = env::temp_dir().join("test_wal_size_tracking");
1501 std::fs::remove_dir_all(&temp_dir).ok();
1502 std::fs::create_dir_all(&temp_dir).ok();
1503
1504 let config = WalConfig {
1505 wal_dir: temp_dir.clone(),
1506 sync_on_write: true,
1507 ..Default::default()
1508 };
1509
1510 let mut wal = Wal::with_config(config)?;
1511
1512 assert_eq!(wal.current_size(), 0);
1514
1515 wal.put(Key::from_str("key1"), CipherBlob::new(vec![1, 2, 3]))?;
1517 let size_after_one = wal.current_size();
1518 assert!(size_after_one > 0, "Size should increase after writing");
1519
1520 wal.put(Key::from_str("key2"), CipherBlob::new(vec![4, 5, 6]))?;
1522 let size_after_two = wal.current_size();
1523 assert!(
1524 size_after_two > size_after_one,
1525 "Size should increase with more entries"
1526 );
1527
1528 wal.flush()?;
1529
1530 let total = wal.total_wal_size()?;
1532 assert_eq!(total, size_after_two);
1533
1534 std::fs::remove_dir_all(&temp_dir).ok();
1535 Ok(())
1536 }
1537
1538 #[test]
1539 fn test_wal_total_size_multiple_files() -> Result<()> {
1540 use std::env;
1541
1542 let temp_dir = env::temp_dir().join("test_wal_total_size_multi");
1543 std::fs::remove_dir_all(&temp_dir).ok();
1544 std::fs::create_dir_all(&temp_dir).ok();
1545
1546 let config = WalConfig {
1547 wal_dir: temp_dir.clone(),
1548 max_file_size: 512,
1549 max_wal_files: 100,
1550 sync_on_write: true,
1551 };
1552
1553 let mut wal = Wal::with_config(config)?;
1554
1555 for i in 0..20 {
1556 wal.put(
1557 Key::from_str(&format!("key_{}", i)),
1558 CipherBlob::new(vec![i as u8; 100]),
1559 )?;
1560 }
1561 wal.flush()?;
1562
1563 let total = wal.total_wal_size()?;
1564 assert!(total > 0, "Total WAL size should be positive");
1565
1566 if wal.current_file_number() > 0 {
1568 assert!(
1569 total >= wal.current_size(),
1570 "Total size should be >= current file size"
1571 );
1572 }
1573
1574 std::fs::remove_dir_all(&temp_dir).ok();
1575 Ok(())
1576 }
1577
1578 #[test]
1579 fn test_wal_empty_recovery() -> Result<()> {
1580 use std::env;
1581
1582 let temp_dir = env::temp_dir().join("test_wal_empty_recovery");
1583 std::fs::remove_dir_all(&temp_dir).ok();
1584 std::fs::create_dir_all(&temp_dir).ok();
1585
1586 {
1588 let config = WalConfig {
1589 wal_dir: temp_dir.clone(),
1590 sync_on_write: true,
1591 ..Default::default()
1592 };
1593 let wal = Wal::with_config(config)?;
1594 drop(wal);
1595 }
1596
1597 let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1599 assert_eq!(entries.len(), 0);
1600 assert_eq!(max_seq, 0);
1601 assert_eq!(stats.entries_recovered, 0);
1602 assert_eq!(stats.entries_corrupted, 0);
1603
1604 std::fs::remove_dir_all(&temp_dir).ok();
1605 Ok(())
1606 }
1607
1608 #[test]
1609 fn test_wal_single_entry_recovery() -> Result<()> {
1610 use std::env;
1611
1612 let temp_dir = env::temp_dir().join("test_wal_single_entry_recovery");
1613 std::fs::remove_dir_all(&temp_dir).ok();
1614 std::fs::create_dir_all(&temp_dir).ok();
1615
1616 {
1617 let config = WalConfig {
1618 wal_dir: temp_dir.clone(),
1619 sync_on_write: true,
1620 ..Default::default()
1621 };
1622
1623 let mut wal = Wal::with_config(config)?;
1624 wal.put(Key::from_str("only_key"), CipherBlob::new(vec![42]))?;
1625 wal.flush()?;
1626 }
1627
1628 let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1629 assert_eq!(entries.len(), 1);
1630 assert_eq!(max_seq, 0);
1631 assert_eq!(stats.entries_recovered, 1);
1632 assert_eq!(stats.entries_corrupted, 0);
1633 assert!(stats.bytes_recovered > 0);
1634 assert_eq!(entries[0].key, Key::from_str("only_key"));
1635
1636 std::fs::remove_dir_all(&temp_dir).ok();
1637 Ok(())
1638 }
1639
1640 #[test]
1641 fn test_wal_large_recovery() -> Result<()> {
1642 use std::env;
1643
1644 let temp_dir = env::temp_dir().join("test_wal_large_recovery");
1645 std::fs::remove_dir_all(&temp_dir).ok();
1646 std::fs::create_dir_all(&temp_dir).ok();
1647
1648 let entry_count = 500;
1649
1650 {
1651 let config = WalConfig {
1652 wal_dir: temp_dir.clone(),
1653 max_file_size: 4096,
1654 max_wal_files: 1000,
1655 sync_on_write: false,
1656 };
1657
1658 let mut wal = Wal::with_config(config)?;
1659
1660 for i in 0..entry_count {
1661 wal.put(
1662 Key::from_str(&format!("large_key_{:05}", i)),
1663 CipherBlob::new(vec![(i % 256) as u8; 50]),
1664 )?;
1665 }
1666 wal.flush()?;
1667 }
1668
1669 let (entries, max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1671 assert_eq!(entries.len(), entry_count);
1672 assert_eq!(max_seq, (entry_count - 1) as u64);
1673 assert_eq!(stats.entries_recovered, entry_count as u64);
1674 assert_eq!(stats.entries_corrupted, 0);
1675 assert!(stats.bytes_recovered > 0);
1676
1677 for (i, entry) in entries.iter().enumerate() {
1679 assert_eq!(entry.sequence, i as u64);
1680 }
1681
1682 std::fs::remove_dir_all(&temp_dir).ok();
1683 Ok(())
1684 }
1685
1686 #[test]
1687 fn test_wal_truncate_keeps_current_file() -> Result<()> {
1688 use std::env;
1689
1690 let temp_dir = env::temp_dir().join("test_wal_truncate_keeps_current");
1691 std::fs::remove_dir_all(&temp_dir).ok();
1692 std::fs::create_dir_all(&temp_dir).ok();
1693
1694 let config = WalConfig {
1695 wal_dir: temp_dir.clone(),
1696 max_file_size: 512,
1697 max_wal_files: 100,
1698 sync_on_write: true,
1699 };
1700
1701 let mut wal = Wal::with_config(config)?;
1702
1703 for i in 0..30 {
1704 wal.put(
1705 Key::from_str(&format!("key_{}", i)),
1706 CipherBlob::new(vec![i as u8; 100]),
1707 )?;
1708 }
1709 wal.flush()?;
1710
1711 let current_file_num = wal.current_file_number();
1712
1713 wal.truncate_before(u64::MAX)?;
1715
1716 let current_path = Wal::wal_file_path(&temp_dir, current_file_num);
1718 assert!(
1719 current_path.exists(),
1720 "Current active WAL file should not be removed"
1721 );
1722
1723 std::fs::remove_dir_all(&temp_dir).ok();
1724 Ok(())
1725 }
1726
1727 #[test]
1728 fn test_wal_sequence_recovery_across_rotations() -> Result<()> {
1729 use std::env;
1730
1731 let temp_dir = env::temp_dir().join("test_wal_seq_recovery_rotation");
1732 std::fs::remove_dir_all(&temp_dir).ok();
1733 std::fs::create_dir_all(&temp_dir).ok();
1734
1735 let entries_written;
1737 {
1738 let config = WalConfig {
1739 wal_dir: temp_dir.clone(),
1740 max_file_size: 512,
1741 max_wal_files: 100,
1742 sync_on_write: true,
1743 };
1744
1745 let mut wal = Wal::with_config(config)?;
1746
1747 for i in 0..25 {
1748 wal.put(
1749 Key::from_str(&format!("rkey_{}", i)),
1750 CipherBlob::new(vec![i as u8; 80]),
1751 )?;
1752 }
1753 wal.flush()?;
1754 entries_written = wal.sequence();
1755 }
1756
1757 {
1759 let config = WalConfig {
1760 wal_dir: temp_dir.clone(),
1761 max_file_size: 512,
1762 max_wal_files: 100,
1763 sync_on_write: true,
1764 };
1765
1766 let wal = Wal::with_config(config)?;
1767 assert_eq!(
1768 wal.sequence(),
1769 entries_written,
1770 "Sequence should continue from where it left off"
1771 );
1772 }
1773
1774 std::fs::remove_dir_all(&temp_dir).ok();
1775 Ok(())
1776 }
1777
1778 #[test]
1779 fn test_wal_recovery_stats_with_corruption() -> Result<()> {
1780 use std::env;
1781 use std::io::Write as IoWrite;
1782
1783 let temp_dir = env::temp_dir().join("test_wal_recovery_stats_corrupt");
1784 std::fs::remove_dir_all(&temp_dir).ok();
1785 std::fs::create_dir_all(&temp_dir).ok();
1786
1787 let wal_file = temp_dir.join("wal_00000000.log");
1788
1789 {
1791 let config = WalConfig {
1792 wal_dir: temp_dir.clone(),
1793 sync_on_write: true,
1794 ..Default::default()
1795 };
1796
1797 let mut wal = Wal::with_config(config)?;
1798 wal.put(Key::from_str("s1"), CipherBlob::new(vec![10]))?;
1799 wal.put(Key::from_str("s2"), CipherBlob::new(vec![20]))?;
1800 wal.flush()?;
1801 }
1802
1803 {
1805 let mut file = OpenOptions::new()
1806 .append(true)
1807 .open(&wal_file)
1808 .map_err(|e| {
1809 AmateRSError::IoError(ErrorContext::new(format!(
1810 "Failed to open for corruption: {}",
1811 e
1812 )))
1813 })?;
1814 let fake_len = 30u32;
1816 file.write_all(&fake_len.to_le_bytes()).map_err(|e| {
1817 AmateRSError::IoError(ErrorContext::new(format!("write error: {}", e)))
1818 })?;
1819 file.write_all(&[0xDE; 30]).map_err(|e| {
1820 AmateRSError::IoError(ErrorContext::new(format!("write error: {}", e)))
1821 })?;
1822 file.flush().map_err(|e| {
1823 AmateRSError::IoError(ErrorContext::new(format!("flush error: {}", e)))
1824 })?;
1825 }
1826
1827 let (_entries, _max_seq, stats) = Wal::recover_with_stats(&temp_dir)?;
1828
1829 assert_eq!(stats.entries_recovered, 2);
1830 assert!(
1831 stats.entries_corrupted >= 1,
1832 "Should detect at least one corrupted entry"
1833 );
1834
1835 std::fs::remove_dir_all(&temp_dir).ok();
1836 Ok(())
1837 }
1838}