1use std::fs::{File, OpenOptions};
37use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
38use std::path::{Path, PathBuf};
39use std::time::{Duration, Instant};
40
41use rkyv::{
42 api::high, rancor::Error as RkyvError, util::AlignedVec, Archive,
43 Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
44};
45
// The serializable record type lives in its own module so that the
// `missing_docs` allowance below is scoped to this module only.
// NOTE(review): the allowance is presumably there for items generated by the
// rkyv derives — confirm before removing it.
mod wal_types {
    #![allow(missing_docs)]
    use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
    // NOTE(review): `HashMap` appears to be on the project's clippy
    // disallowed-types list; it is allowed explicitly for this import.
    #[allow(clippy::disallowed_types)]
    use std::collections::HashMap;

    /// One logical operation stored in the write-ahead log.
    ///
    /// Entries are serialized with rkyv. NOTE(review): reordering variants or
    /// fields may change the encoded form of existing logs — confirm before
    /// editing this declaration.
    #[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
    pub enum WalEntry {
        /// Records that `value` was written under `key`.
        Put {
            /// Raw key bytes.
            key: Vec<u8>,
            /// Raw value bytes.
            value: Vec<u8>,
        },
        /// Records that `key` was removed.
        Delete {
            /// Raw key bytes.
            key: Vec<u8>,
        },
        /// Marker entry carrying a checkpoint identifier.
        Checkpoint {
            /// Identifier of the checkpoint.
            id: u64,
        },
        /// Records a set of named offsets together with an optional watermark.
        Commit {
            /// Offset per name (the tests in this file use topic-like names).
            offsets: HashMap<String, u64>,
            /// Optional watermark value; `None` when no watermark is recorded.
            watermark: Option<i64>,
        },
    }
}
83
84pub use wal_types::WalEntry;
85
/// Size in bytes of the fixed header written before every record payload:
/// a 4-byte little-endian payload length followed by a 4-byte little-endian
/// CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = 8;

/// Largest payload length (256 MiB) that the record readers accept; a header
/// announcing a longer payload is reported as corruption.
const MAX_WAL_ENTRY_SIZE: u64 = 256 * 1024 * 1024;
93
/// A byte offset into the WAL file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
#[rkyv(compare(PartialEq))]
pub struct WalPosition {
    /// Offset in bytes from the start of the log file.
    /// NOTE(review): presumably a record start as returned by
    /// `WriteAheadLog::append` — confirm against callers.
    pub offset: u64,
}
101
/// Append-only log of [`WalEntry`] records backed by a single file.
///
/// Each record is framed as `[len: u32 LE][crc32c: u32 LE][payload]`.
pub struct WriteAheadLog {
    /// Buffered writer over the log file, opened in append mode.
    writer: BufWriter<File>,
    /// Location of the log file; used to reopen it for reading and truncation.
    path: PathBuf,
    /// Maximum time allowed between syncs before `append` forces one.
    sync_interval: Duration,
    /// Instant of the most recent successful `sync`.
    last_sync: Instant,
    /// Logical end of the log in bytes: file length at open time plus the
    /// size of every record appended since.
    position: u64,
    /// When `true`, every `append` is followed by a `sync`.
    sync_on_write: bool,
    /// Scratch buffer in which header and payload are assembled before being
    /// handed to the writer in one call.
    write_buffer: Vec<u8>,
    /// Reusable aligned buffer that rkyv serializes entries into.
    serialize_buffer: AlignedVec,
}
122
/// Errors produced by the write-ahead log and its readers.
#[derive(Debug, thiserror::Error)]
pub enum WalError {
    /// An underlying file operation failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// An entry could not be serialized (or was rejected before writing).
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// A payload passed its checksum but could not be decoded into a `WalEntry`.
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// A record header is implausible (for example an oversized length field).
    #[error("Corrupted WAL entry at position {position}: {reason}")]
    Corrupted {
        /// Offset of the start of the offending record.
        position: u64,
        /// Human-readable description of the problem.
        reason: String,
    },

    /// The checksum stored in the header does not match the payload.
    #[error("CRC32 checksum mismatch at position {position}: expected {expected:#010x}, got {actual:#010x}")]
    ChecksumMismatch {
        /// Offset of the start of the offending record.
        position: u64,
        /// Checksum read from the record header.
        expected: u32,
        /// Checksum computed over the payload that was read.
        actual: u32,
    },

    /// The file ends in the middle of a record header or payload.
    #[error("Torn write detected at position {position}: {reason}")]
    TornWrite {
        /// Offset of the start of the incomplete record.
        position: u64,
        /// Human-readable description of what was missing.
        reason: String,
    },
}
167
168impl WriteAheadLog {
169 pub fn new<P: AsRef<Path>>(path: P, sync_interval: Duration) -> Result<Self, WalError> {
180 let path = path.as_ref().to_path_buf();
181 let file = OpenOptions::new().create(true).append(true).open(&path)?;
182
183 let position = file.metadata()?.len();
184
185 Ok(Self {
186 writer: BufWriter::new(file),
187 path,
188 sync_interval,
189 last_sync: Instant::now(),
190 position,
191 sync_on_write: false,
192 write_buffer: Vec::with_capacity(4096),
193 serialize_buffer: AlignedVec::with_capacity(256),
194 })
195 }
196
/// Enables or disables syncing after every `append`.
///
/// When disabled, `append` only syncs once `sync_interval` has elapsed since
/// the last sync.
pub fn set_sync_on_write(&mut self, enabled: bool) {
    self.sync_on_write = enabled;
}
205
206 pub fn append(&mut self, entry: &WalEntry) -> Result<u64, WalError> {
219 let start_pos = self.position;
220
221 self.serialize_buffer.clear();
223 let taken = std::mem::take(&mut self.serialize_buffer);
224 let bytes = high::to_bytes_in::<_, RkyvError>(entry, taken)
225 .map_err(|e| WalError::Serialization(e.to_string()))?;
226
227 let crc = crc32c::crc32c(&bytes);
229
230 if bytes.len() > u32::MAX as usize {
232 return Err(WalError::Serialization(format!(
233 "Entry too large: {} bytes (max {})",
234 bytes.len(),
235 u32::MAX
236 )));
237 }
238 #[allow(clippy::cast_possible_truncation)] let len = bytes.len() as u32;
240
241 self.write_buffer.clear();
243 #[allow(clippy::cast_possible_truncation)] self.write_buffer
245 .reserve(RECORD_HEADER_SIZE as usize + bytes.len());
246 self.write_buffer.extend_from_slice(&len.to_le_bytes());
247 self.write_buffer.extend_from_slice(&crc.to_le_bytes());
248 self.write_buffer.extend_from_slice(&bytes);
249
250 self.writer.write_all(&self.write_buffer)?;
251
252 let bytes_len = bytes.len() as u64;
253 self.position += RECORD_HEADER_SIZE + bytes_len;
254
255 self.serialize_buffer = bytes;
257
258 if self.sync_on_write || self.last_sync.elapsed() >= self.sync_interval {
260 self.sync()?;
261 }
262
263 Ok(start_pos)
264 }
265
266 pub fn sync(&mut self) -> Result<(), WalError> {
275 self.writer.flush()?;
276 self.writer.get_ref().sync_data()?;
279 self.last_sync = Instant::now();
280 Ok(())
281 }
282
283 pub fn read_from(&self, position: u64) -> Result<WalReader, WalError> {
293 let file = File::open(&self.path)?;
294 let file_len = file.metadata()?.len();
295 let mut reader = BufReader::new(file);
296
297 reader.seek(SeekFrom::Start(position))?;
299
300 Ok(WalReader {
301 reader,
302 position,
303 file_len,
304 })
305 }
306
/// Returns the logical end of the log in bytes, i.e. the offset at which the
/// next appended record will start.
#[must_use]
pub fn position(&self) -> u64 {
    self.position
}
312
/// Returns the path of the log file.
#[must_use]
pub fn path(&self) -> &Path {
    &self.path
}
318
319 pub fn truncate(&mut self, position: u64) -> Result<(), WalError> {
327 self.sync()?;
328
329 let file = OpenOptions::new()
331 .write(true)
332 .truncate(false)
333 .open(&self.path)?;
334
335 file.set_len(position)?;
338 file.sync_all()?;
339
340 let file = OpenOptions::new().append(true).open(&self.path)?;
342
343 self.writer = BufWriter::new(file);
344 self.position = position;
345
346 Ok(())
347 }
348
349 pub fn repair(&mut self) -> Result<u64, WalError> {
364 self.sync()?;
365
366 let file = File::open(&self.path)?;
367 let file_len = file.metadata()?.len();
368 let mut reader = BufReader::new(file);
369
370 let mut valid_position: u64 = 0;
371 let mut current_position: u64 = 0;
372
373 loop {
374 match Self::validate_record(&mut reader, current_position, file_len) {
376 Ok(record_len) => {
377 current_position += record_len;
378 valid_position = current_position;
379 }
380 Err(WalError::TornWrite { .. }) => {
381 break;
383 }
384 Err(WalError::ChecksumMismatch { .. }) => {
385 break;
387 }
388 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
389 break;
391 }
392 Err(e) => return Err(e),
393 }
394 }
395
396 if valid_position < file_len {
398 self.truncate(valid_position)?;
399 }
400
401 Ok(valid_position)
402 }
403
404 fn validate_record(
408 reader: &mut BufReader<File>,
409 position: u64,
410 file_len: u64,
411 ) -> Result<u64, WalError> {
412 let remaining = file_len.saturating_sub(position);
413
414 if remaining < RECORD_HEADER_SIZE {
416 if remaining == 0 {
417 return Err(WalError::Io(std::io::Error::new(
419 std::io::ErrorKind::UnexpectedEof,
420 "end of file",
421 )));
422 }
423 return Err(WalError::TornWrite {
424 position,
425 reason: format!(
426 "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
427 ),
428 });
429 }
430
431 let mut len_bytes = [0u8; 4];
433 reader.read_exact(&mut len_bytes)?;
434 let len = u64::from(u32::from_le_bytes(len_bytes));
435
436 let mut crc_bytes = [0u8; 4];
438 reader.read_exact(&mut crc_bytes)?;
439 let expected_crc = u32::from_le_bytes(crc_bytes);
440
441 if len > MAX_WAL_ENTRY_SIZE {
443 return Err(WalError::Corrupted {
444 position,
445 reason: format!(
446 "[LDB-6006] WAL entry length {len} exceeds maximum \
447 {MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
448 ),
449 });
450 }
451
452 let data_remaining = remaining - RECORD_HEADER_SIZE;
454 if data_remaining < len {
455 return Err(WalError::TornWrite {
456 position,
457 reason: format!(
458 "incomplete data: only {data_remaining} bytes remaining, need {len}"
459 ),
460 });
461 }
462
463 #[allow(clippy::cast_possible_truncation)] let mut data = vec![0u8; len as usize];
466 reader.read_exact(&mut data)?;
467
468 let actual_crc = crc32c::crc32c(&data);
469 if actual_crc != expected_crc {
470 return Err(WalError::ChecksumMismatch {
471 position,
472 expected: expected_crc,
473 actual: actual_crc,
474 });
475 }
476
477 Ok(RECORD_HEADER_SIZE + len)
478 }
479}
480
/// Sequential reader over the records of a WAL file.
///
/// Created by `WriteAheadLog::read_from`.
pub struct WalReader {
    /// Buffered handle on the log file, positioned at the next unread byte.
    reader: BufReader<File>,
    /// Byte offset of the next unread byte, tracked alongside `reader`.
    position: u64,
    /// Length of the file when the reader was created; reads never go past it.
    file_len: u64,
}
487
impl WalReader {
    /// Returns the reader's current byte offset in the log file.
    ///
    /// After a successfully read record this is the start of the next record.
    #[must_use]
    pub fn position(&self) -> u64 {
        self.position
    }
}
495
/// Outcome of reading a single record with `WalReader::read_next`.
///
/// Problems with the record itself are reported as variants here rather than
/// as errors, so callers can tell them apart from I/O failures.
#[derive(Debug)]
pub enum WalReadResult {
    /// A record was read, verified and decoded.
    Entry(WalEntry),
    /// No bytes remain; the log ended cleanly.
    Eof,
    /// The file ends in the middle of a record.
    TornWrite {
        /// Offset of the start of the incomplete record.
        position: u64,
        /// Human-readable description of what was missing.
        reason: String,
    },
    /// The payload does not match the checksum stored in its header.
    ChecksumMismatch {
        /// Offset of the start of the offending record.
        position: u64,
        /// Checksum read from the record header.
        expected: u32,
        /// Checksum computed over the payload that was read.
        actual: u32,
    },
    /// The record header is implausible (oversized length field).
    Corrupted {
        /// Offset of the start of the offending record.
        position: u64,
        /// Human-readable description of the problem.
        reason: String,
    },
}
527
528impl WalReader {
529 pub fn read_next(&mut self) -> Result<WalReadResult, WalError> {
539 let remaining = self.file_len.saturating_sub(self.position);
540
541 if remaining == 0 {
543 return Ok(WalReadResult::Eof);
544 }
545
546 if remaining < RECORD_HEADER_SIZE {
548 return Ok(WalReadResult::TornWrite {
549 position: self.position,
550 reason: format!(
551 "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
552 ),
553 });
554 }
555
556 let record_start = self.position;
557
558 let mut len_bytes = [0u8; 4];
560 self.reader.read_exact(&mut len_bytes)?;
561 let len = u64::from(u32::from_le_bytes(len_bytes));
562 self.position += 4;
563
564 let mut crc_bytes = [0u8; 4];
566 self.reader.read_exact(&mut crc_bytes)?;
567 let expected_crc = u32::from_le_bytes(crc_bytes);
568 self.position += 4;
569
570 if len > MAX_WAL_ENTRY_SIZE {
572 return Ok(WalReadResult::Corrupted {
573 position: record_start,
574 reason: format!(
575 "[LDB-6006] WAL entry length {len} exceeds maximum \
576 {MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
577 ),
578 });
579 }
580
581 let data_remaining = self.file_len.saturating_sub(self.position);
583 if data_remaining < len {
584 return Ok(WalReadResult::TornWrite {
585 position: record_start,
586 reason: format!(
587 "incomplete data: only {data_remaining} bytes remaining, need {len}"
588 ),
589 });
590 }
591
592 #[allow(clippy::cast_possible_truncation)] let mut data = vec![0u8; len as usize];
595 self.reader.read_exact(&mut data)?;
596 self.position += len;
597
598 let actual_crc = crc32c::crc32c(&data);
600 if actual_crc != expected_crc {
601 return Ok(WalReadResult::ChecksumMismatch {
602 position: record_start,
603 expected: expected_crc,
604 actual: actual_crc,
605 });
606 }
607
608 match rkyv::from_bytes::<WalEntry, RkyvError>(&data) {
610 Ok(entry) => Ok(WalReadResult::Entry(entry)),
611 Err(e) => Err(WalError::Deserialization(e.to_string())),
612 }
613 }
614}
615
616impl Iterator for WalReader {
617 type Item = Result<WalEntry, WalError>;
618
619 fn next(&mut self) -> Option<Self::Item> {
620 match self.read_next() {
621 Ok(WalReadResult::Entry(entry)) => Some(Ok(entry)),
622 Ok(WalReadResult::Eof) => None,
623 Ok(WalReadResult::TornWrite { position, reason }) => {
624 Some(Err(WalError::TornWrite { position, reason }))
625 }
626 Ok(WalReadResult::ChecksumMismatch {
627 position,
628 expected,
629 actual,
630 }) => Some(Err(WalError::ChecksumMismatch {
631 position,
632 expected,
633 actual,
634 })),
635 Ok(WalReadResult::Corrupted { position, reason }) => {
636 Some(Err(WalError::Corrupted { position, reason }))
637 }
638 Err(e) => Some(Err(e)),
639 }
640 }
641}
642
643#[cfg(test)]
644mod tests {
645 #[allow(clippy::disallowed_types)] use std::collections::HashMap;
647
648 use super::*;
649 use tempfile::NamedTempFile;
650
/// Appends a Put and a Delete, then reads both back from the first offset.
#[test]
fn test_wal_append_and_read() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let put = WalEntry::Put {
        key: b"key1".to_vec(),
        value: b"value1".to_vec(),
    };
    let delete = WalEntry::Delete {
        key: b"key2".to_vec(),
    };

    let first_offset = wal.append(&put).unwrap();
    wal.append(&delete).unwrap();
    wal.sync().unwrap();

    let entries = wal
        .read_from(first_offset)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 2);

    let WalEntry::Put { key, value } = &entries[0] else {
        panic!("Expected Put entry")
    };
    assert_eq!(key, b"key1");
    assert_eq!(value, b"value1");

    let WalEntry::Delete { key } = &entries[1] else {
        panic!("Expected Delete entry")
    };
    assert_eq!(key, b"key2");
}
693
/// Reading from a checkpoint's offset yields the checkpoint and what follows it.
#[test]
fn test_wal_checkpoint() {
    fn put(key: &[u8], value: &[u8]) -> WalEntry {
        WalEntry::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        }
    }

    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    wal.append(&put(b"key1", b"value1")).unwrap();
    let checkpoint_pos = wal.append(&WalEntry::Checkpoint { id: 1 }).unwrap();
    wal.append(&put(b"key2", b"value2")).unwrap();
    wal.sync().unwrap();

    let entries = wal
        .read_from(checkpoint_pos)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // The checkpoint itself plus the Put written after it.
    assert_eq!(entries.len(), 2);
    let WalEntry::Checkpoint { id } = &entries[0] else {
        panic!("Expected Checkpoint entry")
    };
    assert_eq!(*id, 1);
}
729
/// Truncating at the end of the first record leaves exactly one entry.
#[test]
fn test_wal_truncate() {
    fn put(key: &[u8], value: &[u8]) -> WalEntry {
        WalEntry::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        }
    }

    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    wal.append(&put(b"key1", b"value1")).unwrap();
    // End of the first record: everything after this offset is dropped below.
    let truncate_pos = wal.position();

    wal.append(&put(b"key2", b"value2")).unwrap();
    wal.sync().unwrap();

    wal.truncate(truncate_pos).unwrap();

    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 1);
}
761
/// A Commit entry round-trips its offsets map and watermark.
#[test]
fn test_wal_commit_offsets() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let offsets: HashMap<String, u64> = [("topic1", 100), ("topic2", 200)]
        .into_iter()
        .map(|(topic, offset)| (topic.to_string(), offset))
        .collect();

    let commit = WalEntry::Commit {
        offsets,
        watermark: Some(1000),
    };
    wal.append(&commit).unwrap();
    wal.sync().unwrap();

    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 1);

    let WalEntry::Commit {
        offsets: read_offsets,
        watermark,
    } = &entries[0]
    else {
        panic!("Expected Commit entry")
    };
    assert_eq!(read_offsets.get("topic1"), Some(&100));
    assert_eq!(read_offsets.get("topic2"), Some(&200));
    assert_eq!(*watermark, Some(1000));
}
797
/// Overwriting a payload byte is reported as a checksum mismatch at offset 0.
#[test]
fn test_wal_crc32_validation() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let entry = WalEntry::Put {
        key: b"key1".to_vec(),
        value: b"value1".to_vec(),
    };
    wal.append(&entry).unwrap();
    wal.sync().unwrap();

    {
        use std::io::Write;
        // Offset 10 lies past the 8-byte header, i.e. inside the payload.
        let mut raw = OpenOptions::new()
            .write(true)
            .open(temp_file.path())
            .unwrap();
        raw.seek(SeekFrom::Start(10)).unwrap();
        raw.write_all(&[0xFF]).unwrap();
        raw.sync_all().unwrap();
    }

    let mut reader = wal.read_from(0).unwrap();
    let outcome = reader.read_next().unwrap();
    match outcome {
        WalReadResult::ChecksumMismatch {
            position,
            expected,
            actual,
        } => {
            assert_eq!(position, 0);
            assert_ne!(expected, actual);
        }
        other => panic!("Expected ChecksumMismatch, got {other:?}"),
    }
}
838
/// Three stray bytes after a valid record are reported as a torn header.
#[test]
fn test_wal_torn_write_detection_incomplete_header() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let entry = WalEntry::Put {
        key: b"key1".to_vec(),
        value: b"value1".to_vec(),
    };
    wal.append(&entry).unwrap();
    wal.sync().unwrap();

    let valid_pos = wal.position();

    {
        use std::io::Write;
        // Fewer bytes than a full 8-byte header.
        let partial_header = [0x10, 0x00, 0x00];
        let mut raw = OpenOptions::new()
            .append(true)
            .open(temp_file.path())
            .unwrap();
        raw.write_all(&partial_header).unwrap();
        raw.sync_all().unwrap();
    }

    let mut reader = wal.read_from(0).unwrap();

    let first = reader.read_next().unwrap();
    match first {
        WalReadResult::Entry(WalEntry::Put { key, .. }) => {
            assert_eq!(key, b"key1");
        }
        other => panic!("Expected valid entry, got {other:?}"),
    }

    let second = reader.read_next().unwrap();
    match second {
        WalReadResult::TornWrite { position, .. } => {
            assert_eq!(position, valid_pos);
        }
        other => panic!("Expected TornWrite, got {other:?}"),
    }
}
884
/// A full header announcing 100 bytes followed by only 10 is a torn payload.
#[test]
fn test_wal_torn_write_detection_incomplete_data() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let entry = WalEntry::Put {
        key: b"key1".to_vec(),
        value: b"value1".to_vec(),
    };
    wal.append(&entry).unwrap();
    wal.sync().unwrap();

    let valid_pos = wal.position();

    {
        use std::io::Write;
        // Header (len = 100, arbitrary CRC) plus a payload that stops short.
        let mut partial_record = Vec::new();
        partial_record.extend_from_slice(&100u32.to_le_bytes());
        partial_record.extend_from_slice(&0x1234_5678u32.to_le_bytes());
        partial_record.extend_from_slice(&[0u8; 10]);

        let mut raw = OpenOptions::new()
            .append(true)
            .open(temp_file.path())
            .unwrap();
        raw.write_all(&partial_record).unwrap();
        raw.sync_all().unwrap();
    }

    let mut reader = wal.read_from(0).unwrap();

    let first = reader.read_next().unwrap();
    match first {
        WalReadResult::Entry(WalEntry::Put { key, .. }) => {
            assert_eq!(key, b"key1");
        }
        other => panic!("Expected valid entry, got {other:?}"),
    }

    let second = reader.read_next().unwrap();
    match second {
        WalReadResult::TornWrite { position, reason } => {
            assert_eq!(position, valid_pos);
            assert!(reason.contains("incomplete data"));
        }
        other => panic!("Expected TornWrite, got {other:?}"),
    }
}
936
/// `repair` drops a short garbage tail and keeps both valid records.
#[test]
fn test_wal_repair() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    for (key, value) in [("key1", "value1"), ("key2", "value2")] {
        let entry = WalEntry::Put {
            key: key.as_bytes().to_vec(),
            value: value.as_bytes().to_vec(),
        };
        wal.append(&entry).unwrap();
    }
    wal.sync().unwrap();

    let valid_len = wal.position();

    {
        use std::io::Write;
        // Too short to be a header, so it reads as a torn write.
        let mut raw = OpenOptions::new()
            .append(true)
            .open(temp_file.path())
            .unwrap();
        raw.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
        raw.sync_all().unwrap();
    }

    let repaired_len = wal.repair().unwrap();
    assert_eq!(repaired_len, valid_len);

    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 2);
}
977
/// `repair` truncates at a record whose stored CRC has been overwritten.
#[test]
fn test_wal_repair_with_crc_corruption() {
    fn put(key: &[u8], value: &[u8]) -> WalEntry {
        WalEntry::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        }
    }

    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    wal.append(&put(b"key1", b"value1")).unwrap();
    wal.sync().unwrap();

    let first_entry_end = wal.position();

    wal.append(&put(b"key2", b"value2")).unwrap();
    wal.sync().unwrap();

    {
        use std::io::Write;
        // Skip the 4-byte length of the second record and clobber its CRC field.
        let crc_field_offset = first_entry_end + 4;
        let mut raw = OpenOptions::new()
            .write(true)
            .open(temp_file.path())
            .unwrap();
        raw.seek(SeekFrom::Start(crc_field_offset)).unwrap();
        raw.write_all(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap();
        raw.sync_all().unwrap();
    }

    let repaired_len = wal.repair().unwrap();
    assert_eq!(repaired_len, first_entry_end);

    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 1);

    let WalEntry::Put { key, value } = &entries[0] else {
        panic!("Expected Put entry")
    };
    assert_eq!(key, b"key1");
    assert_eq!(value, b"value1");
}
1031
/// `read_next` and the iterator agree on a log holding a single entry.
#[test]
fn test_wal_read_next_vs_iterator() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let entry = WalEntry::Put {
        key: b"key1".to_vec(),
        value: b"value1".to_vec(),
    };
    wal.append(&entry).unwrap();
    wal.sync().unwrap();

    // Explicit API: one entry, then end of file.
    let mut reader1 = wal.read_from(0).unwrap();
    let first = reader1.read_next().unwrap();
    match first {
        WalReadResult::Entry(WalEntry::Put { key, .. }) => {
            assert_eq!(key, b"key1");
        }
        other => panic!("Expected Entry, got {other:?}"),
    }
    let second = reader1.read_next().unwrap();
    match second {
        WalReadResult::Eof => {}
        other => panic!("Expected Eof, got {other:?}"),
    }

    // Iterator API over the same file.
    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 1);
}
1062
/// Reading a freshly created, empty log reports end of file immediately.
#[test]
fn test_wal_empty_file() {
    let temp_file = NamedTempFile::new().unwrap();
    let wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let outcome = wal.read_from(0).unwrap().read_next().unwrap();
    match outcome {
        WalReadResult::Eof => {}
        other => panic!("Expected Eof, got {other:?}"),
    }
}
1075
/// Commit entries preserve both an absent and a present watermark.
#[test]
fn test_wal_watermark_in_commit() {
    let temp_file = NamedTempFile::new().unwrap();
    let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();

    let none_commit = WalEntry::Commit {
        offsets: HashMap::new(),
        watermark: None,
    };
    let some_commit = WalEntry::Commit {
        offsets: HashMap::new(),
        watermark: Some(12345),
    };

    wal.append(&none_commit).unwrap();
    wal.append(&some_commit).unwrap();
    wal.sync().unwrap();

    let entries = wal
        .read_from(0)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries.len(), 2);

    let WalEntry::Commit { watermark, .. } = &entries[0] else {
        panic!("Expected Commit")
    };
    assert_eq!(*watermark, None);

    let WalEntry::Commit { watermark, .. } = &entries[1] else {
        panic!("Expected Commit")
    };
    assert_eq!(*watermark, Some(12345));
}
1110}