Skip to main content

actionqueue_storage/wal/
fs_writer.rs

1//! WAL file system writer.
2//!
3//! This module provides a file system backed WAL writer that appends events
4//! to a file using the encoded binary format from the codec module.
5//!
6//! The writer maintains an open file handle and appends encoded events
7//! sequentially. Flush operations ensure data is persisted to disk.
8
9use std::fs::{File, OpenOptions};
10use std::io::{Read, Seek, SeekFrom, Write};
11
12use crate::wal::codec::encode;
13use crate::wal::event::WalEvent;
14use crate::wal::repair::RepairPolicy;
15use crate::wal::tail_validation::WalCorruption;
16use crate::wal::writer::{WalWriter, WalWriterError};
17
/// Internal result type for streaming WAL record reads during bootstrap.
///
/// Produced once per call to `WalFsWriter::read_next_record_sequence`; the
/// bootstrap loops keep reading until they see anything other than `Record`.
enum StreamingReadResult {
    /// A complete, valid record was read with the given sequence number.
    Record { sequence: u64 },
    /// Clean end of file (no more data): zero bytes were available where the
    /// next record header would start.
    Eof,
    /// Corruption detected; the carried [`WalCorruption`] holds the offset of
    /// the record's first byte and a reason code.
    Corruption(WalCorruption),
    /// An I/O error occurred. Only the error's string rendering is kept.
    IoError(String),
}
29
/// Errors that can occur when creating a [`WalFsWriter`].
///
/// Conversions from [`std::io::Error`] and [`WalCorruption`] are provided so
/// both can be propagated with `?` during bootstrap.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalFsWriterInitError {
    /// I/O error when opening or bootstrapping the WAL file.
    ///
    /// Holds the string rendering of the underlying error.
    IoError(String),
    /// Strict corruption detected while validating existing WAL bytes.
    Corruption(WalCorruption),
}
38
39impl std::fmt::Display for WalFsWriterInitError {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            WalFsWriterInitError::IoError(e) => {
43                write!(f, "I/O error when opening or bootstrapping WAL file: {e}")
44            }
45            WalFsWriterInitError::Corruption(details) => {
46                write!(f, "strict WAL tail validation failed during bootstrap: {details}")
47            }
48        }
49    }
50}
51
52impl std::error::Error for WalFsWriterInitError {}
53
54impl std::convert::From<std::io::Error> for WalFsWriterInitError {
55    fn from(err: std::io::Error) -> Self {
56        WalFsWriterInitError::IoError(err.to_string())
57    }
58}
59
60impl std::convert::From<WalCorruption> for WalFsWriterInitError {
61    fn from(err: WalCorruption) -> Self {
62        WalFsWriterInitError::Corruption(err)
63    }
64}
65
/// A file system backed WAL writer.
///
/// The writer opens a file in read-write mode and positions the cursor at the
/// end of the file once bootstrap validation has finished; each append then
/// writes at the current cursor. On write failure, it truncates back to the
/// pre-write position and re-seeks to the end to ensure atomicity. If that
/// truncation or seek itself fails, the writer becomes permanently poisoned
/// and rejects all subsequent appends.
pub struct WalFsWriter {
    /// Open read-write handle to the WAL file.
    file: File,
    /// Sequence of the last record seen at bootstrap or appended since
    /// (0 when the WAL is empty).
    current_sequence: u64,
    /// Set by `close`; makes `Drop` skip its best-effort sync.
    is_closed: bool,
    /// Set when rollback after a failed write could not be completed.
    poisoned: bool,
}
78
79impl WalFsWriter {
    /// Creates a new WAL writer at the given path.
    ///
    /// If the file does not exist, it will be created. If it exists, events
    /// will be appended to the end of the file.
    ///
    /// During initialization, the writer strictly validates all existing framed
    /// records from the start of the WAL. Any corruption (partial header,
    /// partial payload, unsupported version, CRC mismatch, or decode failure)
    /// hard-fails bootstrap with typed corruption diagnostics.
    ///
    /// This is shorthand for [`new_with_repair`](Self::new_with_repair) with
    /// [`RepairPolicy::Strict`].
    ///
    /// # Arguments
    ///
    /// * `path` - The filesystem path where the WAL file should be located
    ///
    /// # Errors
    ///
    /// Returns [`WalFsWriterInitError::IoError`] if the WAL file cannot be
    /// opened or read during bootstrap.
    ///
    /// Returns [`WalFsWriterInitError::Corruption`] if strict WAL tail
    /// validation detects corruption in existing bytes.
    pub fn new(path: std::path::PathBuf) -> Result<Self, WalFsWriterInitError> {
        Self::new_with_repair(path, RepairPolicy::Strict)
    }
104
105    /// Creates a new WAL writer with the specified repair policy.
106    ///
107    /// Under [`RepairPolicy::Strict`], behaves identically to [`new`](Self::new).
108    ///
109    /// Under [`RepairPolicy::TruncatePartial`], if only trailing bytes are
110    /// corrupt (incomplete record at the end), they are truncated and the
111    /// writer opens normally from the last valid record. Mid-stream
112    /// corruption (corrupt record followed by valid records) still hard-fails.
113    pub fn new_with_repair(
114        path: std::path::PathBuf,
115        policy: RepairPolicy,
116    ) -> Result<Self, WalFsWriterInitError> {
117        match policy {
118            RepairPolicy::Strict => {
119                let mut file = OpenOptions::new()
120                    .create(true)
121                    .truncate(false)
122                    .read(true)
123                    .write(true)
124                    .open(&path)?;
125                let current_sequence = Self::load_current_sequence_strict(&file)?;
126                file.seek(SeekFrom::End(0))?;
127                Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
128            }
129            RepairPolicy::TruncatePartial => {
130                let file = OpenOptions::new()
131                    .create(true)
132                    .truncate(false)
133                    .read(true)
134                    .write(true)
135                    .open(&path)?;
136                let (current_sequence, needs_truncation) =
137                    Self::load_current_sequence_lenient(&file)?;
138
139                if let Some(valid_end_offset) = needs_truncation {
140                    drop(file);
141                    crate::wal::repair::truncate_to_last_valid(&path, valid_end_offset)
142                        .map_err(|e| WalFsWriterInitError::IoError(e.to_string()))?;
143                    let mut file = OpenOptions::new()
144                        .create(true)
145                        .truncate(false)
146                        .read(true)
147                        .write(true)
148                        .open(&path)?;
149                    file.seek(SeekFrom::End(0))?;
150                    Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
151                } else {
152                    let mut file = file;
153                    file.seek(SeekFrom::End(0))?;
154                    Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
155                }
156            }
157        }
158    }
159
160    /// Loads the current sequence number from the file using streaming
161    /// record-by-record validation. Any corruption returns a typed bootstrap error.
162    fn load_current_sequence_strict(file: &File) -> Result<u64, WalFsWriterInitError> {
163        let mut file_ref = file.try_clone()?;
164        file_ref.seek(SeekFrom::Start(0))?;
165
166        let mut last_sequence = 0u64;
167        loop {
168            match Self::read_next_record_sequence(&mut file_ref) {
169                StreamingReadResult::Record { sequence } => last_sequence = sequence,
170                StreamingReadResult::Eof => return Ok(last_sequence),
171                StreamingReadResult::Corruption(corruption) => {
172                    return Err(WalFsWriterInitError::Corruption(corruption));
173                }
174                StreamingReadResult::IoError(e) => {
175                    return Err(WalFsWriterInitError::IoError(e));
176                }
177            }
178        }
179    }
180
181    /// Loads sequence via lenient streaming validation, returning the valid end
182    /// offset if truncation is needed.
183    fn load_current_sequence_lenient(
184        file: &File,
185    ) -> Result<(u64, Option<u64>), WalFsWriterInitError> {
186        let mut file_ref = file.try_clone()?;
187        file_ref.seek(SeekFrom::Start(0))?;
188
189        let mut last_sequence = 0u64;
190        loop {
191            let record_start = file_ref
192                .stream_position()
193                .map_err(|e| WalFsWriterInitError::IoError(e.to_string()))?;
194            match Self::read_next_record_sequence(&mut file_ref) {
195                StreamingReadResult::Record { sequence } => {
196                    last_sequence = sequence;
197                }
198                StreamingReadResult::Eof => return Ok((last_sequence, None)),
199                StreamingReadResult::Corruption(_) => {
200                    // Lenient: trailing corruption is truncatable
201                    return Ok((last_sequence, Some(record_start)));
202                }
203                StreamingReadResult::IoError(e) => {
204                    return Err(WalFsWriterInitError::IoError(e));
205                }
206            }
207        }
208    }
209
210    /// Reads a single WAL record header+payload, returning the sequence number.
211    /// Uses two-phase read: header first, then payload.
212    fn read_next_record_sequence(file: &mut File) -> StreamingReadResult {
213        use crate::wal::codec::{HEADER_LEN, VERSION};
214        use crate::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
215
216        let record_start = match file.stream_position() {
217            Ok(pos) => pos,
218            Err(e) => return StreamingReadResult::IoError(e.to_string()),
219        };
220
221        // Phase 1: Read 12-byte header
222        let mut header = [0u8; HEADER_LEN];
223        let mut total = 0;
224        while total < HEADER_LEN {
225            match file.read(&mut header[total..]) {
226                Ok(0) => {
227                    return if total == 0 {
228                        StreamingReadResult::Eof
229                    } else {
230                        StreamingReadResult::Corruption(WalCorruption {
231                            offset: record_start,
232                            reason: WalCorruptionReasonCode::IncompleteHeader,
233                        })
234                    };
235                }
236                Ok(n) => total += n,
237                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
238                Err(e) => return StreamingReadResult::IoError(e.to_string()),
239            }
240        }
241
242        let version = u32::from_le_bytes(header[0..4].try_into().unwrap());
243        let payload_len = u32::from_le_bytes(header[4..8].try_into().unwrap());
244
245        if version != VERSION {
246            return StreamingReadResult::Corruption(WalCorruption {
247                offset: record_start,
248                reason: WalCorruptionReasonCode::UnsupportedVersion,
249            });
250        }
251
252        let payload_len_usize = payload_len as usize;
253
254        // Guard against unreasonable allocation from corrupted length field
255        const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; // 256 MiB
256        if payload_len_usize > MAX_REASONABLE_PAYLOAD {
257            return StreamingReadResult::Corruption(WalCorruption {
258                offset: record_start,
259                reason: WalCorruptionReasonCode::DecodeFailure,
260            });
261        }
262
263        // Phase 2: Read payload bytes
264        let mut payload = vec![0u8; payload_len_usize];
265        match file.read_exact(&mut payload) {
266            Ok(()) => {}
267            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
268                return StreamingReadResult::Corruption(WalCorruption {
269                    offset: record_start,
270                    reason: WalCorruptionReasonCode::IncompletePayload,
271                });
272            }
273            Err(e) => return StreamingReadResult::IoError(e.to_string()),
274        }
275
276        // Assemble full record and decode to extract sequence
277        let mut record = Vec::with_capacity(HEADER_LEN + payload_len_usize);
278        record.extend_from_slice(&header);
279        record.extend_from_slice(&payload);
280
281        match crate::wal::codec::decode(&record) {
282            Ok(event) => StreamingReadResult::Record { sequence: event.sequence() },
283            Err(crate::wal::codec::DecodeError::CrcMismatch { .. }) => {
284                StreamingReadResult::Corruption(WalCorruption {
285                    offset: record_start,
286                    reason: WalCorruptionReasonCode::CrcMismatch,
287                })
288            }
289            Err(_) => StreamingReadResult::Corruption(WalCorruption {
290                offset: record_start,
291                reason: WalCorruptionReasonCode::DecodeFailure,
292            }),
293        }
294    }
295
    /// Returns the writer's current sequence number (0 for empty WAL).
    ///
    /// This is the sequence of the last record read during bootstrap, or of
    /// the most recent successful append. Because appends must be strictly
    /// increasing, it is also the highest sequence written through this
    /// writer.
    pub fn current_sequence(&self) -> u64 {
        self.current_sequence
    }
300}
301
302impl WalWriter for WalFsWriter {
303    /// Append an event to the WAL.
304    ///
305    /// The event is encoded using the codec's `encode` function and
306    /// appended to the underlying file. The current sequence tracker
307    /// is updated to the event's sequence number if it's higher.
308    ///
309    /// # Arguments
310    ///
311    /// * `event` - The WAL event to append
312    ///
313    /// # Errors
314    ///
315    /// Returns `WalWriterError::Closed` if the writer has been closed.
316    /// Returns `WalWriterError::IoError` if the write operation fails.
317    fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
318        if self.is_closed {
319            return Err(WalWriterError::Closed);
320        }
321        if self.poisoned {
322            return Err(WalWriterError::Poisoned);
323        }
324
325        // Enforce strict sequence monotonicity
326        if event.sequence() <= self.current_sequence {
327            return Err(WalWriterError::SequenceViolation {
328                expected: self.current_sequence.saturating_add(1),
329                provided: event.sequence(),
330            });
331        }
332
333        // Encode the event to bytes
334        let bytes = encode(event).map_err(|e| WalWriterError::EncodeError(e.to_string()))?;
335
336        // Record pre-write position for truncation on failure
337        let pre_write_pos =
338            self.file.stream_position().map_err(|e| WalWriterError::IoError(e.to_string()))?;
339
340        // Write the encoded bytes to the file
341        if let Err(write_err) = self.file.write_all(&bytes) {
342            // Attempt to truncate back to pre-write position
343            let truncate_result = self.file.set_len(pre_write_pos);
344            let seek_result = if truncate_result.is_ok() {
345                self.file.seek(SeekFrom::End(0))
346            } else {
347                Err(std::io::Error::other("skipped after truncate failure"))
348            };
349
350            if truncate_result.is_err() || seek_result.is_err() {
351                tracing::error!(
352                    write_error = %write_err,
353                    truncate_ok = truncate_result.is_ok(),
354                    seek_ok = seek_result.is_ok(),
355                    pre_write_pos,
356                    "WAL write failed and recovery truncation also failed; \
357                     writer is permanently poisoned"
358                );
359                self.poisoned = true;
360                return Err(WalWriterError::Poisoned);
361            }
362            return Err(WalWriterError::IoError(write_err.to_string()));
363        }
364
365        // Update current sequence
366        self.current_sequence = event.sequence();
367
368        tracing::debug!(sequence = event.sequence(), "WAL event appended");
369
370        Ok(())
371    }
372
373    /// Flushes pending writes to durable storage.
374    ///
375    /// This ensures that all buffered data is written to disk before
376    /// returning. The flush operation is synchronous and will block
377    /// until the data is persisted.
378    ///
379    /// # Errors
380    ///
381    /// Returns `WalWriterError::Closed` if the writer has been closed.
382    /// Returns `WalWriterError::IoError` if the flush operation fails.
383    fn flush(&mut self) -> Result<(), WalWriterError> {
384        if self.is_closed {
385            return Err(WalWriterError::Closed);
386        }
387
388        // Sync the file data to disk
389        self.file.sync_all().map_err(|e| WalWriterError::IoError(e.to_string()))?;
390
391        Ok(())
392    }
393
394    /// Closes the writer, releasing the file handle.
395    ///
396    /// A flush is performed before closing to ensure all data is
397    /// persisted. Once closed, the writer cannot be used for further
398    /// operations.
399    ///
400    /// # Errors
401    ///
402    /// Returns `WalWriterError::IoError` if the flush or close operation fails.
403    fn close(self) -> Result<(), WalWriterError> {
404        let mut this = self;
405
406        // Flush before closing
407        this.flush()?;
408
409        // Mark as closed so Drop does not redundantly call sync_all
410        this.is_closed = true;
411
412        Ok(())
413    }
414}
415
416impl Drop for WalFsWriter {
417    fn drop(&mut self) {
418        // If the writer was not explicitly closed, try to flush on drop
419        // This is a best-effort approach since we can't return errors from drop
420        if !self.is_closed {
421            let _ = self.file.sync_all();
422        }
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use std::fs;
429    use std::io::Write;
430    use std::path::PathBuf;
431    use std::sync::atomic::{AtomicUsize, Ordering};
432
433    use super::*;
434    use crate::wal::codec;
435    use crate::wal::tail_validation::WalCorruptionReasonCode;
436
437    // Atomic counter to ensure unique test file paths across parallel tests.
438    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
439
440    fn temp_wal_path() -> PathBuf {
441        let dir = std::env::temp_dir();
442        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
443        // Unique per invocation to avoid cross-test collisions under parallel execution.
444        let path =
445            dir.join(format!("actionqueue_wal_writer_test_{}_{}.tmp", std::process::id(), count));
446        // Clean up if exists from previous test runs
447        let _ = fs::remove_file(&path);
448        path
449    }
450
451    fn create_test_task_spec(payload: Vec<u8>) -> actionqueue_core::task::task_spec::TaskSpec {
452        actionqueue_core::task::task_spec::TaskSpec::new(
453            actionqueue_core::ids::TaskId::new(),
454            actionqueue_core::task::task_spec::TaskPayload::with_content_type(
455                payload,
456                "application/octet-stream",
457            ),
458            actionqueue_core::task::run_policy::RunPolicy::Once,
459            actionqueue_core::task::constraints::TaskConstraints::default(),
460            actionqueue_core::task::metadata::TaskMetadata::default(),
461        )
462        .expect("test task spec should be valid")
463    }
464
465    #[test]
466    fn test_creates_new_file() {
467        let path = temp_wal_path();
468        let writer =
469            WalFsWriter::new(path.clone()).expect("writer creation should succeed for new file");
470
471        assert!(path.exists());
472        drop(writer);
473        let _ = fs::remove_file(path);
474    }
475
476    #[test]
477    fn test_append_writes_encoded_event() {
478        let path = temp_wal_path();
479        let mut writer =
480            WalFsWriter::new(path.clone()).expect("writer creation should succeed for append test");
481
482        let event = WalEvent::new(
483            1,
484            crate::wal::event::WalEventType::TaskCreated {
485                task_spec: create_test_task_spec(vec![1, 2, 3]),
486                timestamp: 0,
487            },
488        );
489
490        writer.append(&event).expect("Append should succeed");
491
492        drop(writer);
493        let _ = fs::remove_file(path);
494    }
495
496    #[test]
497    fn test_flush_succeeds() {
498        let path = temp_wal_path();
499        let mut writer =
500            WalFsWriter::new(path.clone()).expect("writer creation should succeed for flush test");
501
502        let event = WalEvent::new(
503            1,
504            crate::wal::event::WalEventType::TaskCreated {
505                task_spec: create_test_task_spec(vec![1, 2, 3]),
506                timestamp: 0,
507            },
508        );
509
510        writer.append(&event).expect("Append should succeed");
511        writer.flush().expect("Flush should succeed");
512
513        drop(writer);
514        let _ = fs::remove_file(path);
515    }
516
517    #[test]
518    fn test_close_succeeds() {
519        let path = temp_wal_path();
520        let writer =
521            WalFsWriter::new(path.clone()).expect("writer creation should succeed for close test");
522        writer.close().expect("Close should succeed");
523
524        let _ = fs::remove_file(path);
525    }
526
527    #[test]
528    fn test_bootstrap_empty_wal() {
529        let path = temp_wal_path();
530        let mut writer = WalFsWriter::new(path.clone())
531            .expect("writer creation should succeed for empty bootstrap test");
532
533        // For an empty file, current_sequence should be 0
534        // The writer's current_sequence field is not public, so we test via
535        // the behavior of appending events starting from sequence 0
536
537        // Add an event and verify the sequence continues correctly
538        let event = WalEvent::new(
539            5,
540            crate::wal::event::WalEventType::TaskCreated {
541                task_spec: create_test_task_spec(vec![1, 2, 3]),
542                timestamp: 0,
543            },
544        );
545
546        writer.append(&event).expect("Append should succeed");
547        drop(writer);
548
549        // Reopen and verify sequence tracking works
550        let mut writer2 = WalFsWriter::new(path.clone())
551            .expect("writer reopening should succeed for empty bootstrap test");
552        let event2 = WalEvent::new(
553            10,
554            crate::wal::event::WalEventType::TaskCreated {
555                task_spec: create_test_task_spec(vec![4, 5, 6]),
556                timestamp: 0,
557            },
558        );
559        writer2.append(&event2).expect("Append should succeed");
560        drop(writer2);
561
562        let _ = fs::remove_file(path);
563    }
564
565    #[test]
566    fn test_bootstrap_with_existing_events() {
567        let path = temp_wal_path();
568
569        // First, create events and write them using a temporary writer
570        {
571            let mut writer = WalFsWriter::new(path.clone())
572                .expect("initial writer creation should succeed for bootstrap test");
573            let event1 = WalEvent::new(
574                3,
575                crate::wal::event::WalEventType::TaskCreated {
576                    task_spec: create_test_task_spec(vec![1, 2, 3]),
577                    timestamp: 0,
578                },
579            );
580            let event2 = WalEvent::new(
581                7,
582                crate::wal::event::WalEventType::RunStateChanged {
583                    run_id: actionqueue_core::ids::RunId::new(),
584                    previous_state: actionqueue_core::run::state::RunState::Scheduled,
585                    new_state: actionqueue_core::run::state::RunState::Running,
586                    timestamp: 1000,
587                },
588            );
589            let event3 = WalEvent::new(
590                12,
591                crate::wal::event::WalEventType::AttemptStarted {
592                    run_id: actionqueue_core::ids::RunId::new(),
593                    attempt_id: actionqueue_core::ids::AttemptId::new(),
594                    timestamp: 2000,
595                },
596            );
597            writer.append(&event1).expect("Append should succeed");
598            writer.append(&event2).expect("Append should succeed");
599            writer.append(&event3).expect("Append should succeed");
600            writer.flush().expect("Flush should succeed");
601        }
602
603        // Reopen the file and verify that the sequence is properly bootstrapped
604        // The writer should now have current_sequence = 12
605        let mut writer = WalFsWriter::new(path.clone())
606            .expect("writer reopening should succeed for bootstrap test");
607
608        // Add a new event and verify it continues from the bootstrap sequence
609        let event = WalEvent::new(
610            15, // Higher than the last sequence
611            crate::wal::event::WalEventType::TaskCreated {
612                task_spec: create_test_task_spec(vec![7, 8, 9]),
613                timestamp: 0,
614            },
615        );
616        writer.append(&event).expect("Append should succeed");
617
618        drop(writer);
619        let _ = fs::remove_file(path);
620    }
621
622    #[test]
623    fn test_bootstrap_fails_with_partial_record() {
624        let path = temp_wal_path();
625
626        // Write some complete events first
627        {
628            let mut writer = WalFsWriter::new(path.clone())
629                .expect("initial writer creation should succeed for partial bootstrap test");
630            let event1 = WalEvent::new(
631                5,
632                crate::wal::event::WalEventType::TaskCreated {
633                    task_spec: create_test_task_spec(vec![1, 2, 3]),
634                    timestamp: 0,
635                },
636            );
637            let event2 = WalEvent::new(
638                10,
639                crate::wal::event::WalEventType::RunStateChanged {
640                    run_id: actionqueue_core::ids::RunId::new(),
641                    previous_state: actionqueue_core::run::state::RunState::Scheduled,
642                    new_state: actionqueue_core::run::state::RunState::Running,
643                    timestamp: 1000,
644                },
645            );
646            writer.append(&event1).expect("Append should succeed");
647            writer.append(&event2).expect("Append should succeed");
648            writer.flush().expect("Flush should succeed");
649        }
650
651        let expected_offset = {
652            let first = codec::encode(&WalEvent::new(
653                5,
654                crate::wal::event::WalEventType::TaskCreated {
655                    task_spec: create_test_task_spec(vec![1, 2, 3]),
656                    timestamp: 0,
657                },
658            ))
659            .expect("encode should succeed")
660            .len() as u64;
661
662            let second = codec::encode(&WalEvent::new(
663                10,
664                crate::wal::event::WalEventType::RunStateChanged {
665                    run_id: actionqueue_core::ids::RunId::new(),
666                    previous_state: actionqueue_core::run::state::RunState::Scheduled,
667                    new_state: actionqueue_core::run::state::RunState::Running,
668                    timestamp: 1000,
669                },
670            ))
671            .expect("encode should succeed")
672            .len() as u64;
673
674            first + second
675        };
676
677        // Append a trailing partial payload record.
678        {
679            use std::fs::OpenOptions;
680            let mut file = OpenOptions::new()
681                .append(true)
682                .create(true)
683                .open(&path)
684                .expect("Failed to open file");
685
686            file.write_all(&codec::VERSION.to_le_bytes()).expect("Failed to write version");
687            file.write_all(&50u32.to_le_bytes()).expect("Failed to write length");
688            file.write_all(&0u32.to_le_bytes()).expect("Failed to write CRC");
689            file.write_all(&[0u8; 20]).expect("Failed to write partial payload");
690            file.sync_all().expect("Failed to flush");
691        }
692
693        let result = WalFsWriter::new(path.clone());
694        assert!(matches!(
695            result,
696            Err(WalFsWriterInitError::Corruption(WalCorruption {
697                offset,
698                reason: WalCorruptionReasonCode::IncompletePayload
699            })) if offset == expected_offset
700        ));
701
702        let _ = fs::remove_file(path);
703    }
704
705    #[test]
706    fn test_new_returns_error_when_parent_directory_is_missing() {
707        let parent = std::env::temp_dir().join(format!(
708            "actionqueue_wal_writer_missing_parent_{}_{}",
709            std::process::id(),
710            TEST_COUNTER.fetch_add(1, Ordering::SeqCst)
711        ));
712        let _ = fs::remove_dir_all(&parent);
713        let path = parent.join("wal.log");
714
715        let result = WalFsWriter::new(path);
716        assert!(matches!(result, Err(WalFsWriterInitError::IoError(_))));
717    }
718
719    #[test]
720    fn test_rejects_duplicate_sequence() {
721        let path = temp_wal_path();
722        let mut writer = WalFsWriter::new(path.clone())
723            .expect("writer creation should succeed for duplicate test");
724
725        // First event should succeed
726        let event1 = WalEvent::new(
727            1,
728            crate::wal::event::WalEventType::TaskCreated {
729                task_spec: create_test_task_spec(vec![1, 2, 3]),
730                timestamp: 0,
731            },
732        );
733        writer.append(&event1).expect("First append should succeed");
734
735        // Duplicate sequence should be rejected
736        let event2 = WalEvent::new(
737            1, // Same as event1
738            crate::wal::event::WalEventType::TaskCreated {
739                task_spec: create_test_task_spec(vec![4, 5, 6]),
740                timestamp: 0,
741            },
742        );
743        let result = writer.append(&event2);
744        assert!(matches!(
745            result,
746            Err(WalWriterError::SequenceViolation { expected: 2, provided: 1 })
747        ));
748
749        drop(writer);
750        let _ = fs::remove_file(path);
751    }
752
753    #[test]
754    fn test_rejects_non_increasing_sequence() {
755        let path = temp_wal_path();
756        let mut writer = WalFsWriter::new(path.clone())
757            .expect("writer creation should succeed for non-increasing test");
758
759        // First event should succeed
760        let event1 = WalEvent::new(
761            5,
762            crate::wal::event::WalEventType::TaskCreated {
763                task_spec: create_test_task_spec(vec![1, 2, 3]),
764                timestamp: 0,
765            },
766        );
767        writer.append(&event1).expect("First append should succeed");
768
769        // Lower sequence should be rejected
770        let event2 = WalEvent::new(
771            3, // Lower than event1
772            crate::wal::event::WalEventType::TaskCreated {
773                task_spec: create_test_task_spec(vec![4, 5, 6]),
774                timestamp: 0,
775            },
776        );
777        let result = writer.append(&event2);
778        assert!(matches!(
779            result,
780            Err(WalWriterError::SequenceViolation { expected: 6, provided: 3 })
781        ));
782
783        // Same sequence should also be rejected
784        let event3 = WalEvent::new(
785            5, // Same as event1
786            crate::wal::event::WalEventType::TaskCreated {
787                task_spec: create_test_task_spec(vec![7, 8, 9]),
788                timestamp: 0,
789            },
790        );
791        let result = writer.append(&event3);
792        assert!(matches!(
793            result,
794            Err(WalWriterError::SequenceViolation { expected: 6, provided: 5 })
795        ));
796
797        drop(writer);
798        let _ = fs::remove_file(path);
799    }
800
801    #[test]
802    fn test_sequence_rejection_preserves_file() {
803        let path = temp_wal_path();
804        let mut writer = WalFsWriter::new(path.clone())
805            .expect("writer creation should succeed for preservation test");
806
807        // Valid event should be written
808        let event1 = WalEvent::new(
809            1,
810            crate::wal::event::WalEventType::TaskCreated {
811                task_spec: create_test_task_spec(vec![1, 2, 3]),
812                timestamp: 0,
813            },
814        );
815        writer.append(&event1).expect("First append should succeed");
816        writer.flush().expect("Flush should succeed");
817
818        // Invalid event should not corrupt file
819        let event2 = WalEvent::new(
820            1, // Duplicate
821            crate::wal::event::WalEventType::TaskCreated {
822                task_spec: create_test_task_spec(vec![4, 5, 6]),
823                timestamp: 0,
824            },
825        );
826        let result = writer.append(&event2);
827        assert!(matches!(result, Err(WalWriterError::SequenceViolation { .. })));
828
829        // Writer should still be usable with correct sequence
830        let event3 = WalEvent::new(
831            2,
832            crate::wal::event::WalEventType::TaskCreated {
833                task_spec: create_test_task_spec(vec![7, 8, 9]),
834                timestamp: 0,
835            },
836        );
837        writer.append(&event3).expect("Append after rejection should succeed");
838
839        drop(writer);
840        let _ = fs::remove_file(path);
841    }
842
843    #[test]
844    fn test_sequence_rejection_across_writer_restart() {
845        let path = temp_wal_path();
846
847        // First writer session - write some events
848        {
849            let mut writer = WalFsWriter::new(path.clone())
850                .expect("writer creation should succeed for restart test");
851            let event1 = WalEvent::new(
852                1,
853                crate::wal::event::WalEventType::TaskCreated {
854                    task_spec: create_test_task_spec(vec![1, 2, 3]),
855                    timestamp: 0,
856                },
857            );
858            writer.append(&event1).expect("First append should succeed");
859            let event2 = WalEvent::new(
860                2,
861                crate::wal::event::WalEventType::TaskCreated {
862                    task_spec: create_test_task_spec(vec![4, 5, 6]),
863                    timestamp: 0,
864                },
865            );
866            writer.append(&event2).expect("Second append should succeed");
867        }
868
869        // Second writer session (bootstrap from file) - should reject old sequences
870        {
871            let mut writer = WalFsWriter::new(path.clone())
872                .expect("writer reopening should succeed for restart test");
873
874            // Sequence 1 should be rejected (already in file)
875            let event1 = WalEvent::new(
876                1,
877                crate::wal::event::WalEventType::TaskCreated {
878                    task_spec: create_test_task_spec(vec![7, 8, 9]),
879                    timestamp: 0,
880                },
881            );
882            let result = writer.append(&event1);
883            assert!(matches!(
884                result,
885                Err(WalWriterError::SequenceViolation { expected: 3, provided: 1 })
886            ));
887
888            // Sequence 2 should also be rejected
889            let event2 = WalEvent::new(
890                2,
891                crate::wal::event::WalEventType::TaskCreated {
892                    task_spec: create_test_task_spec(vec![10, 11, 12]),
893                    timestamp: 0,
894                },
895            );
896            let result = writer.append(&event2);
897            assert!(matches!(
898                result,
899                Err(WalWriterError::SequenceViolation { expected: 3, provided: 2 })
900            ));
901
902            // Sequence 3 should succeed (continues from last sequence in file)
903            let event3 = WalEvent::new(
904                3,
905                crate::wal::event::WalEventType::TaskCreated {
906                    task_spec: create_test_task_spec(vec![13, 14, 15]),
907                    timestamp: 0,
908                },
909            );
910            writer.append(&event3).expect("Append after bootstrap should succeed");
911        }
912
913        let _ = fs::remove_file(path);
914    }
915
916    #[test]
917    fn test_strict_policy_fails_on_partial_record() {
918        let path = temp_wal_path();
919
920        // Write a valid event then a partial record
921        {
922            let mut writer = WalFsWriter::new(path.clone()).expect("initial writer should succeed");
923            let event = WalEvent::new(
924                1,
925                crate::wal::event::WalEventType::TaskCreated {
926                    task_spec: create_test_task_spec(vec![1, 2, 3]),
927                    timestamp: 0,
928                },
929            );
930            writer.append(&event).expect("append should succeed");
931            writer.flush().expect("flush should succeed");
932        }
933
934        // Append partial record (version + length + CRC + partial payload)
935        {
936            let mut file = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
937            file.write_all(&codec::VERSION.to_le_bytes()).unwrap();
938            file.write_all(&50u32.to_le_bytes()).unwrap();
939            file.write_all(&0u32.to_le_bytes()).unwrap(); // CRC (dummy)
940            file.write_all(&[0u8; 10]).unwrap(); // only 10 of 50 bytes
941            file.sync_all().unwrap();
942        }
943
944        let result = WalFsWriter::new_with_repair(path.clone(), RepairPolicy::Strict);
945        assert!(matches!(result, Err(WalFsWriterInitError::Corruption(_))));
946
947        let _ = fs::remove_file(path);
948    }
949
950    #[test]
951    fn test_truncate_partial_repairs_and_continues() {
952        let path = temp_wal_path();
953
954        // Write 2 valid events
955        {
956            let mut writer = WalFsWriter::new(path.clone()).expect("initial writer should succeed");
957            let event1 = WalEvent::new(
958                1,
959                crate::wal::event::WalEventType::TaskCreated {
960                    task_spec: create_test_task_spec(vec![1, 2, 3]),
961                    timestamp: 0,
962                },
963            );
964            let event2 = WalEvent::new(
965                2,
966                crate::wal::event::WalEventType::TaskCreated {
967                    task_spec: create_test_task_spec(vec![4, 5, 6]),
968                    timestamp: 0,
969                },
970            );
971            writer.append(&event1).expect("append 1 should succeed");
972            writer.append(&event2).expect("append 2 should succeed");
973            writer.flush().expect("flush should succeed");
974        }
975
976        // Append partial 3rd record (version + length + CRC + partial payload)
977        {
978            let mut file = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
979            file.write_all(&codec::VERSION.to_le_bytes()).unwrap();
980            file.write_all(&100u32.to_le_bytes()).unwrap();
981            file.write_all(&0u32.to_le_bytes()).unwrap(); // CRC (dummy)
982            file.write_all(&[0xAB; 20]).unwrap(); // partial payload
983            file.sync_all().unwrap();
984        }
985
986        // TruncatePartial should succeed
987        let mut writer = WalFsWriter::new_with_repair(path.clone(), RepairPolicy::TruncatePartial)
988            .expect("repair should succeed");
989
990        // Should be able to append event 3 after repair
991        let event3 = WalEvent::new(
992            3,
993            crate::wal::event::WalEventType::TaskCreated {
994                task_spec: create_test_task_spec(vec![7, 8, 9]),
995                timestamp: 0,
996            },
997        );
998        writer.append(&event3).expect("append after repair should succeed");
999        writer.flush().expect("flush should succeed");
1000
1001        drop(writer);
1002
1003        // Verify the file is valid by opening with strict mode
1004        let writer2 =
1005            WalFsWriter::new(path.clone()).expect("strict open after repair should succeed");
1006        assert_eq!(writer2.current_sequence(), 3);
1007
1008        let _ = fs::remove_file(path);
1009    }
1010
1011    #[test]
1012    fn test_truncate_partial_no_corruption_matches_strict() {
1013        let path = temp_wal_path();
1014
1015        // Write clean events
1016        {
1017            let mut writer = WalFsWriter::new(path.clone()).expect("initial writer should succeed");
1018            let event = WalEvent::new(
1019                1,
1020                crate::wal::event::WalEventType::TaskCreated {
1021                    task_spec: create_test_task_spec(vec![1, 2, 3]),
1022                    timestamp: 0,
1023                },
1024            );
1025            writer.append(&event).expect("append should succeed");
1026            writer.flush().expect("flush should succeed");
1027        }
1028
1029        // TruncatePartial on clean file should behave like Strict
1030        let writer = WalFsWriter::new_with_repair(path.clone(), RepairPolicy::TruncatePartial)
1031            .expect("repair on clean file should succeed");
1032        assert_eq!(writer.current_sequence(), 1);
1033
1034        let _ = fs::remove_file(path);
1035    }
1036}