Skip to main content

laminar_storage/
wal.rs

1//! Write-Ahead Log for durability and exactly-once semantics.
2//!
3//! The WAL provides durability by persisting all state mutations before they
4//! are applied. This enables recovery after crashes and supports exactly-once
5//! processing semantics.
6//!
7//! ## Record Format
8//!
9//! Each WAL record is stored as:
10//! ```text
11//! +----------+----------+------------------+
12//! | Length   | CRC32C   | Entry Data       |
13//! | (4 bytes)| (4 bytes)| (Length bytes)   |
14//! +----------+----------+------------------+
15//! ```
16//!
17//! - **Length**: 4-byte little-endian u32, size of Entry Data
18//! - **CRC32C**: 4-byte little-endian u32, CRC32C checksum of Entry Data
19//! - **Entry Data**: rkyv-serialized `WalEntry`
20//!
21//! ## Durability
22//!
23//! Uses `fdatasync` (via `sync_data()`) instead of `fsync` for better performance.
24//! This syncs file data without updating metadata (atime, mtime), saving 50-100μs per sync.
25//!
26//! ## Torn Write Detection
27//!
28//! On recovery, the WAL reader detects partial writes:
29//! - Incomplete length prefix (< 4 bytes at EOF)
30//! - Incomplete CRC field (< 4 bytes after length)
31//! - Incomplete data (< length bytes after CRC)
//! - CRC32C mismatch (data corruption)
33//!
34//! Use `repair()` to truncate the WAL to the last valid record.
35
36use std::fs::{File, OpenOptions};
37use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
38use std::path::{Path, PathBuf};
39use std::time::{Duration, Instant};
40
41use rkyv::{
42    api::high, rancor::Error as RkyvError, util::AlignedVec, Archive,
43    Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
44};
45
46// Module to contain types that use derive macros with generated code
47mod wal_types {
48    #![allow(missing_docs)] // Allow for derive-generated code
49
50    use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
51    #[allow(clippy::disallowed_types)] // cold path: WAL operations
52    use std::collections::HashMap;
53
54    /// WAL entry types representing different operations.
55    #[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
56    pub enum WalEntry {
57        /// Put a key-value pair.
58        Put {
59            /// The key to put.
60            key: Vec<u8>,
61            /// The value to associate with the key.
62            value: Vec<u8>,
63        },
64        /// Delete a key.
65        Delete {
66            /// The key to delete.
67            key: Vec<u8>,
68        },
69        /// Checkpoint marker.
70        Checkpoint {
71            /// Checkpoint identifier.
72            id: u64,
73        },
74        /// Commit offsets for exactly-once semantics.
75        Commit {
76            /// Map of topic/partition to offset.
77            offsets: HashMap<String, u64>,
78            /// Current watermark at commit time (for recovery).
79            watermark: Option<i64>,
80        },
81    }
82}
83
84pub use wal_types::WalEntry;
85
/// Bytes occupied by a record header: a 4-byte length followed by a 4-byte CRC32C.
const RECORD_HEADER_SIZE: u64 = 4 + 4;

/// Upper bound on the payload size of a single WAL entry (256 MiB).
///
/// A length field above this bound is treated as corruption, which keeps a
/// garbage length in a damaged file from triggering a huge allocation.
const MAX_WAL_ENTRY_SIZE: u64 = 256 << 20;
93
94/// WAL position for checkpointing.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
96#[rkyv(compare(PartialEq))]
97pub struct WalPosition {
98    /// Offset in the WAL file.
99    pub offset: u64,
100}
101
102/// Write-Ahead Log implementation.
103pub struct WriteAheadLog {
104    /// Buffered writer for efficient writes.
105    writer: BufWriter<File>,
106    /// Path to the current log file.
107    path: PathBuf,
108    /// Sync interval for group commit.
109    sync_interval: Duration,
110    /// Last sync time.
111    last_sync: Instant,
112    /// Current file position.
113    position: u64,
114    /// Sync on every write for stronger durability (at the cost of throughput).
115    sync_on_write: bool,
116    /// Pre-allocated write buffer reused across `append()` calls.
117    /// Grows to high-water mark and stays, eliminating per-append allocation.
118    write_buffer: Vec<u8>,
119    /// Reusable rkyv serialization buffer (avoids `AlignedVec` alloc per append).
120    serialize_buffer: AlignedVec,
121}
122
123/// Error type for WAL operations.
124#[derive(Debug, thiserror::Error)]
125pub enum WalError {
126    /// IO error during WAL operations.
127    #[error("IO error: {0}")]
128    Io(#[from] std::io::Error),
129
130    /// Serialization error when writing entries.
131    #[error("Serialization error: {0}")]
132    Serialization(String),
133
134    /// Deserialization error when reading entries.
135    #[error("Deserialization error: {0}")]
136    Deserialization(String),
137
138    /// Corrupted WAL entry detected (incomplete record).
139    #[error("Corrupted WAL entry at position {position}: {reason}")]
140    Corrupted {
141        /// Position where corruption was detected.
142        position: u64,
143        /// Reason for corruption.
144        reason: String,
145    },
146
147    /// CRC32 checksum mismatch.
148    #[error("CRC32 checksum mismatch at position {position}: expected {expected:#010x}, got {actual:#010x}")]
149    ChecksumMismatch {
150        /// Position of the corrupted record.
151        position: u64,
152        /// Expected CRC32 value.
153        expected: u32,
154        /// Actual CRC32 value.
155        actual: u32,
156    },
157
158    /// Torn write detected (partial record at end of WAL).
159    #[error("Torn write detected at position {position}: {reason}")]
160    TornWrite {
161        /// Position where torn write was detected.
162        position: u64,
163        /// Description of the torn write.
164        reason: String,
165    },
166}
167
168impl WriteAheadLog {
169    /// Create a new WAL instance.
170    ///
171    /// # Arguments
172    ///
173    /// * `path` - Path to the WAL file
174    /// * `sync_interval` - Interval for group commit
175    ///
176    /// # Errors
177    ///
178    /// Returns `WalError::Io` if the file cannot be created or opened.
179    pub fn new<P: AsRef<Path>>(path: P, sync_interval: Duration) -> Result<Self, WalError> {
180        let path = path.as_ref().to_path_buf();
181        let file = OpenOptions::new().create(true).append(true).open(&path)?;
182
183        let position = file.metadata()?.len();
184
185        Ok(Self {
186            writer: BufWriter::new(file),
187            path,
188            sync_interval,
189            last_sync: Instant::now(),
190            position,
191            sync_on_write: false,
192            write_buffer: Vec::with_capacity(4096),
193            serialize_buffer: AlignedVec::with_capacity(256),
194        })
195    }
196
197    /// Enable sync on every write for stronger durability.
198    ///
199    /// When enabled, every `append()` call triggers an immediate `sync()`,
200    /// providing per-write durability at the cost of throughput (~10x lower).
201    /// Without this, up to `sync_interval` of data may be lost on crash.
202    pub fn set_sync_on_write(&mut self, enabled: bool) {
203        self.sync_on_write = enabled;
204    }
205
206    /// Append an entry to the WAL.
207    ///
208    /// Record format: `[length: 4 bytes][crc32: 4 bytes][data: length bytes]`
209    ///
210    /// # Arguments
211    ///
212    /// * `entry` - The entry to append
213    ///
214    /// # Errors
215    ///
216    /// Returns `WalError::Serialization` if the entry cannot be serialized,
217    /// or `WalError::Io` if the write fails.
218    pub fn append(&mut self, entry: &WalEntry) -> Result<u64, WalError> {
219        let start_pos = self.position;
220
221        // Serialize into reusable buffer (avoids AlignedVec alloc per append)
222        self.serialize_buffer.clear();
223        let taken = std::mem::take(&mut self.serialize_buffer);
224        let bytes = high::to_bytes_in::<_, RkyvError>(entry, taken)
225            .map_err(|e| WalError::Serialization(e.to_string()))?;
226
227        // Compute CRC32C checksum of the serialized data
228        let crc = crc32c::crc32c(&bytes);
229
230        // Write record: [length: 4 bytes][crc32: 4 bytes][data: length bytes]
231        if bytes.len() > u32::MAX as usize {
232            return Err(WalError::Serialization(format!(
233                "Entry too large: {} bytes (max {})",
234                bytes.len(),
235                u32::MAX
236            )));
237        }
238        #[allow(clippy::cast_possible_truncation)] // Validated < u32::MAX on line 215
239        let len = bytes.len() as u32;
240
241        // Coalesce header + data into self.write_buffer (reused across calls).
242        self.write_buffer.clear();
243        #[allow(clippy::cast_possible_truncation)] // RECORD_HEADER_SIZE is 8, always fits usize
244        self.write_buffer
245            .reserve(RECORD_HEADER_SIZE as usize + bytes.len());
246        self.write_buffer.extend_from_slice(&len.to_le_bytes());
247        self.write_buffer.extend_from_slice(&crc.to_le_bytes());
248        self.write_buffer.extend_from_slice(&bytes);
249
250        self.writer.write_all(&self.write_buffer)?;
251
252        let bytes_len = bytes.len() as u64;
253        self.position += RECORD_HEADER_SIZE + bytes_len;
254
255        // Restore serialize buffer for reuse
256        self.serialize_buffer = bytes;
257
258        // Check if we need to sync (group commit or sync-on-write mode)
259        if self.sync_on_write || self.last_sync.elapsed() >= self.sync_interval {
260            self.sync()?;
261        }
262
263        Ok(start_pos)
264    }
265
266    /// Force sync to disk using fdatasync.
267    ///
268    /// Uses `sync_data()` instead of `sync_all()` for better performance.
269    /// This syncs file data without updating metadata, saving 50-100μs per sync.
270    ///
271    /// # Errors
272    ///
273    /// Returns `WalError::Io` if the sync fails.
274    pub fn sync(&mut self) -> Result<(), WalError> {
275        self.writer.flush()?;
276        // Use sync_data() (fdatasync) instead of sync_all() (fsync)
277        // This avoids updating file metadata (atime, mtime), saving ~50-100μs per sync
278        self.writer.get_ref().sync_data()?;
279        self.last_sync = Instant::now();
280        Ok(())
281    }
282
283    /// Read entries from a specific position.
284    ///
285    /// # Arguments
286    ///
287    /// * `position` - Starting position to read from
288    ///
289    /// # Errors
290    ///
291    /// Returns `WalError::Io` if the file cannot be opened or seeked.
292    pub fn read_from(&self, position: u64) -> Result<WalReader, WalError> {
293        let file = File::open(&self.path)?;
294        let file_len = file.metadata()?.len();
295        let mut reader = BufReader::new(file);
296
297        // Seek to position
298        reader.seek(SeekFrom::Start(position))?;
299
300        Ok(WalReader {
301            reader,
302            position,
303            file_len,
304        })
305    }
306
307    /// Get the current position in the log.
308    #[must_use]
309    pub fn position(&self) -> u64 {
310        self.position
311    }
312
313    /// Get the path to the WAL file.
314    #[must_use]
315    pub fn path(&self) -> &Path {
316        &self.path
317    }
318
319    /// Truncate the log at the specified position.
320    ///
321    /// Used after successful checkpointing to remove old entries.
322    ///
323    /// # Errors
324    ///
325    /// Returns `WalError::Io` if the truncation or file operations fail.
326    pub fn truncate(&mut self, position: u64) -> Result<(), WalError> {
327        self.sync()?;
328
329        // Close current writer
330        let file = OpenOptions::new()
331            .write(true)
332            .truncate(false)
333            .open(&self.path)?;
334
335        // Truncate file and sync to make the truncation durable.
336        // Without this sync, a crash could leave the file at its old length.
337        file.set_len(position)?;
338        file.sync_all()?;
339
340        // Reopen for append
341        let file = OpenOptions::new().append(true).open(&self.path)?;
342
343        self.writer = BufWriter::new(file);
344        self.position = position;
345
346        Ok(())
347    }
348
349    /// Repair the WAL by truncating to the last valid record.
350    ///
351    /// This should be called during recovery to handle torn writes from crashes.
352    /// It reads through the WAL, validates each record, and truncates at the
353    /// first invalid record (torn write or corruption).
354    ///
355    /// # Returns
356    ///
357    /// Returns `Ok(valid_position)` where `valid_position` is the end of the
358    /// last valid record (and the new WAL length).
359    ///
360    /// # Errors
361    ///
362    /// Returns `WalError::Io` if file operations fail.
363    pub fn repair(&mut self) -> Result<u64, WalError> {
364        self.sync()?;
365
366        let file = File::open(&self.path)?;
367        let file_len = file.metadata()?.len();
368        let mut reader = BufReader::new(file);
369
370        let mut valid_position: u64 = 0;
371        let mut current_position: u64 = 0;
372
373        loop {
374            // Try to read a complete record
375            match Self::validate_record(&mut reader, current_position, file_len) {
376                Ok(record_len) => {
377                    current_position += record_len;
378                    valid_position = current_position;
379                }
380                Err(WalError::TornWrite { .. }) => {
381                    // Torn write detected - truncate here
382                    break;
383                }
384                Err(WalError::ChecksumMismatch { .. }) => {
385                    // Corruption detected - truncate here
386                    break;
387                }
388                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
389                    // Clean EOF - we're done
390                    break;
391                }
392                Err(e) => return Err(e),
393            }
394        }
395
396        // Truncate to last valid position if needed
397        if valid_position < file_len {
398            self.truncate(valid_position)?;
399        }
400
401        Ok(valid_position)
402    }
403
404    /// Validate a single record at the current position.
405    ///
406    /// Returns the total size of the record (header + data) if valid.
407    fn validate_record(
408        reader: &mut BufReader<File>,
409        position: u64,
410        file_len: u64,
411    ) -> Result<u64, WalError> {
412        let remaining = file_len.saturating_sub(position);
413
414        // Check if we have enough bytes for the header
415        if remaining < RECORD_HEADER_SIZE {
416            if remaining == 0 {
417                // Clean EOF
418                return Err(WalError::Io(std::io::Error::new(
419                    std::io::ErrorKind::UnexpectedEof,
420                    "end of file",
421                )));
422            }
423            return Err(WalError::TornWrite {
424                position,
425                reason: format!(
426                    "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
427                ),
428            });
429        }
430
431        // Read length
432        let mut len_bytes = [0u8; 4];
433        reader.read_exact(&mut len_bytes)?;
434        let len = u64::from(u32::from_le_bytes(len_bytes));
435
436        // Read expected CRC32
437        let mut crc_bytes = [0u8; 4];
438        reader.read_exact(&mut crc_bytes)?;
439        let expected_crc = u32::from_le_bytes(crc_bytes);
440
441        // Reject unreasonably large entries (corrupted length field)
442        if len > MAX_WAL_ENTRY_SIZE {
443            return Err(WalError::Corrupted {
444                position,
445                reason: format!(
446                    "[LDB-6006] WAL entry length {len} exceeds maximum \
447                     {MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
448                ),
449            });
450        }
451
452        // Check if we have enough bytes for the data
453        let data_remaining = remaining - RECORD_HEADER_SIZE;
454        if data_remaining < len {
455            return Err(WalError::TornWrite {
456                position,
457                reason: format!(
458                    "incomplete data: only {data_remaining} bytes remaining, need {len}"
459                ),
460            });
461        }
462
463        // Read data and validate CRC
464        #[allow(clippy::cast_possible_truncation)] // guarded by MAX_WAL_ENTRY_SIZE above
465        let mut data = vec![0u8; len as usize];
466        reader.read_exact(&mut data)?;
467
468        let actual_crc = crc32c::crc32c(&data);
469        if actual_crc != expected_crc {
470            return Err(WalError::ChecksumMismatch {
471                position,
472                expected: expected_crc,
473                actual: actual_crc,
474            });
475        }
476
477        Ok(RECORD_HEADER_SIZE + len)
478    }
479}
480
/// Sequential reader over a WAL file, used to replay records on recovery.
pub struct WalReader {
    /// Buffered handle on the WAL file.
    reader: BufReader<File>,
    /// Offset the reader has advanced to.
    position: u64,
    /// File length captured when the reader was created.
    file_len: u64,
}
487
488impl WalReader {
489    /// Get the current position in the WAL.
490    #[must_use]
491    pub fn position(&self) -> u64 {
492        self.position
493    }
494}
495
496/// Result of reading a WAL record.
497#[derive(Debug)]
498pub enum WalReadResult {
499    /// Successfully read an entry.
500    Entry(WalEntry),
501    /// Reached end of valid records.
502    Eof,
503    /// Torn write detected (partial record at end).
504    TornWrite {
505        /// Position where torn write was detected.
506        position: u64,
507        /// Description of what was incomplete.
508        reason: String,
509    },
510    /// CRC32 checksum mismatch.
511    ChecksumMismatch {
512        /// Position of the corrupted record.
513        position: u64,
514        /// Expected CRC32C value from the record header.
515        expected: u32,
516        /// Actual CRC32C computed from the data.
517        actual: u32,
518    },
519    /// Corrupted entry (e.g. unreasonable length).
520    Corrupted {
521        /// Position of the corrupted record.
522        position: u64,
523        /// Description of the corruption.
524        reason: String,
525    },
526}
527
528impl WalReader {
529    /// Read the next entry, with detailed status.
530    ///
531    /// Unlike the Iterator implementation, this method distinguishes between
532    /// clean EOF, torn writes, and checksum errors.
533    ///
534    /// # Errors
535    ///
536    /// Returns `WalError::Io` if file reading fails, or `WalError::Deserialization`
537    /// if the entry data cannot be deserialized.
538    pub fn read_next(&mut self) -> Result<WalReadResult, WalError> {
539        let remaining = self.file_len.saturating_sub(self.position);
540
541        // Check for EOF
542        if remaining == 0 {
543            return Ok(WalReadResult::Eof);
544        }
545
546        // Check for incomplete header
547        if remaining < RECORD_HEADER_SIZE {
548            return Ok(WalReadResult::TornWrite {
549                position: self.position,
550                reason: format!(
551                    "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
552                ),
553            });
554        }
555
556        let record_start = self.position;
557
558        // Read length
559        let mut len_bytes = [0u8; 4];
560        self.reader.read_exact(&mut len_bytes)?;
561        let len = u64::from(u32::from_le_bytes(len_bytes));
562        self.position += 4;
563
564        // Read expected CRC32
565        let mut crc_bytes = [0u8; 4];
566        self.reader.read_exact(&mut crc_bytes)?;
567        let expected_crc = u32::from_le_bytes(crc_bytes);
568        self.position += 4;
569
570        // Reject unreasonably large entries (corrupted length field)
571        if len > MAX_WAL_ENTRY_SIZE {
572            return Ok(WalReadResult::Corrupted {
573                position: record_start,
574                reason: format!(
575                    "[LDB-6006] WAL entry length {len} exceeds maximum \
576                     {MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
577                ),
578            });
579        }
580
581        // Check for incomplete data
582        let data_remaining = self.file_len.saturating_sub(self.position);
583        if data_remaining < len {
584            return Ok(WalReadResult::TornWrite {
585                position: record_start,
586                reason: format!(
587                    "incomplete data: only {data_remaining} bytes remaining, need {len}"
588                ),
589            });
590        }
591
592        // Read data
593        #[allow(clippy::cast_possible_truncation)] // guarded by MAX_WAL_ENTRY_SIZE above
594        let mut data = vec![0u8; len as usize];
595        self.reader.read_exact(&mut data)?;
596        self.position += len;
597
598        // Validate CRC
599        let actual_crc = crc32c::crc32c(&data);
600        if actual_crc != expected_crc {
601            return Ok(WalReadResult::ChecksumMismatch {
602                position: record_start,
603                expected: expected_crc,
604                actual: actual_crc,
605            });
606        }
607
608        // Deserialize entry
609        match rkyv::from_bytes::<WalEntry, RkyvError>(&data) {
610            Ok(entry) => Ok(WalReadResult::Entry(entry)),
611            Err(e) => Err(WalError::Deserialization(e.to_string())),
612        }
613    }
614}
615
616impl Iterator for WalReader {
617    type Item = Result<WalEntry, WalError>;
618
619    fn next(&mut self) -> Option<Self::Item> {
620        match self.read_next() {
621            Ok(WalReadResult::Entry(entry)) => Some(Ok(entry)),
622            Ok(WalReadResult::Eof) => None,
623            Ok(WalReadResult::TornWrite { position, reason }) => {
624                Some(Err(WalError::TornWrite { position, reason }))
625            }
626            Ok(WalReadResult::ChecksumMismatch {
627                position,
628                expected,
629                actual,
630            }) => Some(Err(WalError::ChecksumMismatch {
631                position,
632                expected,
633                actual,
634            })),
635            Ok(WalReadResult::Corrupted { position, reason }) => {
636                Some(Err(WalError::Corrupted { position, reason }))
637            }
638            Err(e) => Some(Err(e)),
639        }
640    }
641}
642
643#[cfg(test)]
644mod tests {
645    #[allow(clippy::disallowed_types)] // cold path: WAL operations
646    use std::collections::HashMap;
647
648    use super::*;
649    use tempfile::NamedTempFile;
650
651    #[test]
652    fn test_wal_append_and_read() {
653        let temp_file = NamedTempFile::new().unwrap();
654        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
655
656        // Append entries
657        let pos1 = wal
658            .append(&WalEntry::Put {
659                key: b"key1".to_vec(),
660                value: b"value1".to_vec(),
661            })
662            .unwrap();
663
664        let _pos2 = wal
665            .append(&WalEntry::Delete {
666                key: b"key2".to_vec(),
667            })
668            .unwrap();
669
670        wal.sync().unwrap();
671
672        // Read entries
673        let reader = wal.read_from(pos1).unwrap();
674        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
675
676        assert_eq!(entries.len(), 2);
677
678        match &entries[0] {
679            WalEntry::Put { key, value } => {
680                assert_eq!(key, b"key1");
681                assert_eq!(value, b"value1");
682            }
683            _ => panic!("Expected Put entry"),
684        }
685
686        match &entries[1] {
687            WalEntry::Delete { key } => {
688                assert_eq!(key, b"key2");
689            }
690            _ => panic!("Expected Delete entry"),
691        }
692    }
693
694    #[test]
695    fn test_wal_checkpoint() {
696        let temp_file = NamedTempFile::new().unwrap();
697        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
698
699        // Append entries
700        wal.append(&WalEntry::Put {
701            key: b"key1".to_vec(),
702            value: b"value1".to_vec(),
703        })
704        .unwrap();
705
706        let checkpoint_pos = wal.append(&WalEntry::Checkpoint { id: 1 }).unwrap();
707
708        wal.append(&WalEntry::Put {
709            key: b"key2".to_vec(),
710            value: b"value2".to_vec(),
711        })
712        .unwrap();
713
714        wal.sync().unwrap();
715
716        // Read from checkpoint
717        let reader = wal.read_from(checkpoint_pos).unwrap();
718        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
719
720        assert_eq!(entries.len(), 2); // Checkpoint and Put
721
722        match &entries[0] {
723            WalEntry::Checkpoint { id } => {
724                assert_eq!(*id, 1);
725            }
726            _ => panic!("Expected Checkpoint entry"),
727        }
728    }
729
730    #[test]
731    fn test_wal_truncate() {
732        let temp_file = NamedTempFile::new().unwrap();
733        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
734
735        // Append entries
736        wal.append(&WalEntry::Put {
737            key: b"key1".to_vec(),
738            value: b"value1".to_vec(),
739        })
740        .unwrap();
741
742        let truncate_pos = wal.position();
743
744        wal.append(&WalEntry::Put {
745            key: b"key2".to_vec(),
746            value: b"value2".to_vec(),
747        })
748        .unwrap();
749
750        wal.sync().unwrap();
751
752        // Truncate at second entry
753        wal.truncate(truncate_pos).unwrap();
754
755        // Read all entries
756        let reader = wal.read_from(0).unwrap();
757        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
758
759        assert_eq!(entries.len(), 1); // Only first entry remains
760    }
761
762    #[test]
763    fn test_wal_commit_offsets() {
764        let temp_file = NamedTempFile::new().unwrap();
765        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
766
767        // Append commit entry with watermark
768        let mut offsets = HashMap::new();
769        offsets.insert("topic1".to_string(), 100);
770        offsets.insert("topic2".to_string(), 200);
771
772        wal.append(&WalEntry::Commit {
773            offsets: offsets.clone(),
774            watermark: Some(1000),
775        })
776        .unwrap();
777        wal.sync().unwrap();
778
779        // Read and verify
780        let reader = wal.read_from(0).unwrap();
781        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
782
783        assert_eq!(entries.len(), 1);
784
785        match &entries[0] {
786            WalEntry::Commit {
787                offsets: read_offsets,
788                watermark,
789            } => {
790                assert_eq!(read_offsets.get("topic1"), Some(&100));
791                assert_eq!(read_offsets.get("topic2"), Some(&200));
792                assert_eq!(*watermark, Some(1000));
793            }
794            _ => panic!("Expected Commit entry"),
795        }
796    }
797
798    #[test]
799    fn test_wal_crc32_validation() {
800        let temp_file = NamedTempFile::new().unwrap();
801        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
802
803        // Write a valid entry
804        wal.append(&WalEntry::Put {
805            key: b"key1".to_vec(),
806            value: b"value1".to_vec(),
807        })
808        .unwrap();
809        wal.sync().unwrap();
810
811        // Corrupt the data by modifying a byte in the middle
812        {
813            use std::io::Write;
814            let mut file = OpenOptions::new()
815                .write(true)
816                .open(temp_file.path())
817                .unwrap();
818            // Seek past header (8 bytes) and corrupt the data
819            file.seek(SeekFrom::Start(10)).unwrap();
820            file.write_all(&[0xFF]).unwrap();
821            file.sync_all().unwrap();
822        }
823
824        // Reading should detect CRC mismatch
825        let mut reader = wal.read_from(0).unwrap();
826        match reader.read_next().unwrap() {
827            WalReadResult::ChecksumMismatch {
828                position,
829                expected,
830                actual,
831            } => {
832                assert_eq!(position, 0);
833                assert_ne!(expected, actual);
834            }
835            other => panic!("Expected ChecksumMismatch, got {other:?}"),
836        }
837    }
838
839    #[test]
840    fn test_wal_torn_write_detection_incomplete_header() {
841        let temp_file = NamedTempFile::new().unwrap();
842        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
843
844        // Write a valid entry
845        wal.append(&WalEntry::Put {
846            key: b"key1".to_vec(),
847            value: b"value1".to_vec(),
848        })
849        .unwrap();
850        wal.sync().unwrap();
851
852        let valid_pos = wal.position();
853
854        // Simulate torn write: write only 3 bytes of next header (incomplete)
855        {
856            use std::io::Write;
857            let mut file = OpenOptions::new()
858                .append(true)
859                .open(temp_file.path())
860                .unwrap();
861            file.write_all(&[0x10, 0x00, 0x00]).unwrap(); // 3 bytes of length
862            file.sync_all().unwrap();
863        }
864
865        // Reading should get the first entry, then detect torn write
866        let mut reader = wal.read_from(0).unwrap();
867
868        // First entry should be valid
869        match reader.read_next().unwrap() {
870            WalReadResult::Entry(WalEntry::Put { key, .. }) => {
871                assert_eq!(key, b"key1");
872            }
873            other => panic!("Expected valid entry, got {other:?}"),
874        }
875
876        // Second read should detect torn write
877        match reader.read_next().unwrap() {
878            WalReadResult::TornWrite { position, .. } => {
879                assert_eq!(position, valid_pos);
880            }
881            other => panic!("Expected TornWrite, got {other:?}"),
882        }
883    }
884
885    #[test]
886    fn test_wal_torn_write_detection_incomplete_data() {
887        let temp_file = NamedTempFile::new().unwrap();
888        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
889
890        // Write a valid entry
891        wal.append(&WalEntry::Put {
892            key: b"key1".to_vec(),
893            value: b"value1".to_vec(),
894        })
895        .unwrap();
896        wal.sync().unwrap();
897
898        let valid_pos = wal.position();
899
900        // Simulate torn write: write header claiming 100 bytes but only provide 10
901        {
902            use std::io::Write;
903            let mut file = OpenOptions::new()
904                .append(true)
905                .open(temp_file.path())
906                .unwrap();
907            // Write length (100 bytes) + CRC (dummy) + only 10 bytes of data
908            let len: u32 = 100;
909            let crc: u32 = 0x1234_5678;
910            file.write_all(&len.to_le_bytes()).unwrap();
911            file.write_all(&crc.to_le_bytes()).unwrap();
912            file.write_all(&[0u8; 10]).unwrap(); // Only 10 bytes, not 100
913            file.sync_all().unwrap();
914        }
915
916        // Reading should get the first entry, then detect torn write
917        let mut reader = wal.read_from(0).unwrap();
918
919        // First entry should be valid
920        match reader.read_next().unwrap() {
921            WalReadResult::Entry(WalEntry::Put { key, .. }) => {
922                assert_eq!(key, b"key1");
923            }
924            other => panic!("Expected valid entry, got {other:?}"),
925        }
926
927        // Second read should detect torn write (incomplete data)
928        match reader.read_next().unwrap() {
929            WalReadResult::TornWrite { position, reason } => {
930                assert_eq!(position, valid_pos);
931                assert!(reason.contains("incomplete data"));
932            }
933            other => panic!("Expected TornWrite, got {other:?}"),
934        }
935    }
936
937    #[test]
938    fn test_wal_repair() {
939        let temp_file = NamedTempFile::new().unwrap();
940        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
941
942        // Write two valid entries
943        wal.append(&WalEntry::Put {
944            key: b"key1".to_vec(),
945            value: b"value1".to_vec(),
946        })
947        .unwrap();
948        wal.append(&WalEntry::Put {
949            key: b"key2".to_vec(),
950            value: b"value2".to_vec(),
951        })
952        .unwrap();
953        wal.sync().unwrap();
954
955        let valid_len = wal.position();
956
957        // Simulate torn write: append garbage
958        {
959            use std::io::Write;
960            let mut file = OpenOptions::new()
961                .append(true)
962                .open(temp_file.path())
963                .unwrap();
964            file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
965            file.sync_all().unwrap();
966        }
967
968        // Repair should truncate to last valid record
969        let repaired_len = wal.repair().unwrap();
970        assert_eq!(repaired_len, valid_len);
971
972        // Verify we can still read both valid entries
973        let reader = wal.read_from(0).unwrap();
974        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
975        assert_eq!(entries.len(), 2);
976    }
977
978    #[test]
979    fn test_wal_repair_with_crc_corruption() {
980        let temp_file = NamedTempFile::new().unwrap();
981        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
982
983        // Write one valid entry
984        wal.append(&WalEntry::Put {
985            key: b"key1".to_vec(),
986            value: b"value1".to_vec(),
987        })
988        .unwrap();
989        wal.sync().unwrap();
990
991        let first_entry_end = wal.position();
992
993        // Write another entry
994        wal.append(&WalEntry::Put {
995            key: b"key2".to_vec(),
996            value: b"value2".to_vec(),
997        })
998        .unwrap();
999        wal.sync().unwrap();
1000
1001        // Corrupt the CRC of the second entry
1002        {
1003            use std::io::Write;
1004            let mut file = OpenOptions::new()
1005                .write(true)
1006                .open(temp_file.path())
1007                .unwrap();
1008            // Seek to CRC of second entry (first_entry_end + 4 bytes for length)
1009            file.seek(SeekFrom::Start(first_entry_end + 4)).unwrap();
1010            file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap();
1011            file.sync_all().unwrap();
1012        }
1013
1014        // Repair should truncate at the corrupted entry
1015        let repaired_len = wal.repair().unwrap();
1016        assert_eq!(repaired_len, first_entry_end);
1017
1018        // Verify only first entry remains
1019        let reader = wal.read_from(0).unwrap();
1020        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
1021        assert_eq!(entries.len(), 1);
1022
1023        match &entries[0] {
1024            WalEntry::Put { key, value } => {
1025                assert_eq!(key, b"key1");
1026                assert_eq!(value, b"value1");
1027            }
1028            _ => panic!("Expected Put entry"),
1029        }
1030    }
1031
1032    #[test]
1033    fn test_wal_read_next_vs_iterator() {
1034        let temp_file = NamedTempFile::new().unwrap();
1035        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
1036
1037        wal.append(&WalEntry::Put {
1038            key: b"key1".to_vec(),
1039            value: b"value1".to_vec(),
1040        })
1041        .unwrap();
1042        wal.sync().unwrap();
1043
1044        // Test read_next()
1045        let mut reader1 = wal.read_from(0).unwrap();
1046        match reader1.read_next().unwrap() {
1047            WalReadResult::Entry(WalEntry::Put { key, .. }) => {
1048                assert_eq!(key, b"key1");
1049            }
1050            other => panic!("Expected Entry, got {other:?}"),
1051        }
1052        match reader1.read_next().unwrap() {
1053            WalReadResult::Eof => {}
1054            other => panic!("Expected Eof, got {other:?}"),
1055        }
1056
1057        // Test Iterator
1058        let reader2 = wal.read_from(0).unwrap();
1059        let entries: Vec<_> = reader2.collect::<Result<Vec<_>, _>>().unwrap();
1060        assert_eq!(entries.len(), 1);
1061    }
1062
1063    #[test]
1064    fn test_wal_empty_file() {
1065        let temp_file = NamedTempFile::new().unwrap();
1066        let wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
1067
1068        // Reading from empty WAL should return no entries
1069        let mut reader = wal.read_from(0).unwrap();
1070        match reader.read_next().unwrap() {
1071            WalReadResult::Eof => {}
1072            other => panic!("Expected Eof, got {other:?}"),
1073        }
1074    }
1075
1076    #[test]
1077    fn test_wal_watermark_in_commit() {
1078        let temp_file = NamedTempFile::new().unwrap();
1079        let mut wal = WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap();
1080
1081        // Commit without watermark
1082        wal.append(&WalEntry::Commit {
1083            offsets: HashMap::new(),
1084            watermark: None,
1085        })
1086        .unwrap();
1087
1088        // Commit with watermark
1089        wal.append(&WalEntry::Commit {
1090            offsets: HashMap::new(),
1091            watermark: Some(12345),
1092        })
1093        .unwrap();
1094        wal.sync().unwrap();
1095
1096        let reader = wal.read_from(0).unwrap();
1097        let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
1098        assert_eq!(entries.len(), 2);
1099
1100        match &entries[0] {
1101            WalEntry::Commit { watermark, .. } => assert_eq!(*watermark, None),
1102            _ => panic!("Expected Commit"),
1103        }
1104
1105        match &entries[1] {
1106            WalEntry::Commit { watermark, .. } => assert_eq!(*watermark, Some(12345)),
1107            _ => panic!("Expected Commit"),
1108        }
1109    }
1110}