1use std::fs::{File, OpenOptions};
10use std::io::{Read, Seek, SeekFrom, Write};
11
12use crate::wal::codec::encode;
13use crate::wal::event::WalEvent;
14use crate::wal::repair::RepairPolicy;
15use crate::wal::tail_validation::WalCorruption;
16use crate::wal::writer::{WalWriter, WalWriterError};
17
18enum StreamingReadResult {
20 Record { sequence: u64 },
22 Eof,
24 Corruption(WalCorruption),
26 IoError(String),
28}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
32pub enum WalFsWriterInitError {
33 IoError(String),
35 Corruption(WalCorruption),
37}
38
39impl std::fmt::Display for WalFsWriterInitError {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 WalFsWriterInitError::IoError(e) => {
43 write!(f, "I/O error when opening or bootstrapping WAL file: {e}")
44 }
45 WalFsWriterInitError::Corruption(details) => {
46 write!(f, "strict WAL tail validation failed during bootstrap: {details}")
47 }
48 }
49 }
50}
51
52impl std::error::Error for WalFsWriterInitError {}
53
54impl std::convert::From<std::io::Error> for WalFsWriterInitError {
55 fn from(err: std::io::Error) -> Self {
56 WalFsWriterInitError::IoError(err.to_string())
57 }
58}
59
60impl std::convert::From<WalCorruption> for WalFsWriterInitError {
61 fn from(err: WalCorruption) -> Self {
62 WalFsWriterInitError::Corruption(err)
63 }
64}
65
/// Write-ahead-log writer backed by a single file on the filesystem.
pub struct WalFsWriter {
    /// Open read/write handle to the WAL file; positioned at end-of-file after bootstrap.
    file: File,
    /// Sequence number of the last record found during bootstrap or successfully appended
    /// (0 when the file contained no records).
    current_sequence: u64,
    /// Set once `close` has flushed successfully; suppresses the best-effort sync on drop.
    is_closed: bool,
    /// Set when a failed write could not be rolled back; all later appends are refused.
    poisoned: bool,
}
78
79impl WalFsWriter {
80 pub fn new(path: std::path::PathBuf) -> Result<Self, WalFsWriterInitError> {
102 Self::new_with_repair(path, RepairPolicy::Strict)
103 }
104
105 pub fn new_with_repair(
114 path: std::path::PathBuf,
115 policy: RepairPolicy,
116 ) -> Result<Self, WalFsWriterInitError> {
117 match policy {
118 RepairPolicy::Strict => {
119 let mut file = OpenOptions::new()
120 .create(true)
121 .truncate(false)
122 .read(true)
123 .write(true)
124 .open(&path)?;
125 let current_sequence = Self::load_current_sequence_strict(&file)?;
126 file.seek(SeekFrom::End(0))?;
127 Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
128 }
129 RepairPolicy::TruncatePartial => {
130 let file = OpenOptions::new()
131 .create(true)
132 .truncate(false)
133 .read(true)
134 .write(true)
135 .open(&path)?;
136 let (current_sequence, needs_truncation) =
137 Self::load_current_sequence_lenient(&file)?;
138
139 if let Some(valid_end_offset) = needs_truncation {
140 drop(file);
141 crate::wal::repair::truncate_to_last_valid(&path, valid_end_offset)
142 .map_err(|e| WalFsWriterInitError::IoError(e.to_string()))?;
143 let mut file = OpenOptions::new()
144 .create(true)
145 .truncate(false)
146 .read(true)
147 .write(true)
148 .open(&path)?;
149 file.seek(SeekFrom::End(0))?;
150 Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
151 } else {
152 let mut file = file;
153 file.seek(SeekFrom::End(0))?;
154 Ok(WalFsWriter { file, current_sequence, is_closed: false, poisoned: false })
155 }
156 }
157 }
158 }
159
160 fn load_current_sequence_strict(file: &File) -> Result<u64, WalFsWriterInitError> {
163 let mut file_ref = file.try_clone()?;
164 file_ref.seek(SeekFrom::Start(0))?;
165
166 let mut last_sequence = 0u64;
167 loop {
168 match Self::read_next_record_sequence(&mut file_ref) {
169 StreamingReadResult::Record { sequence } => last_sequence = sequence,
170 StreamingReadResult::Eof => return Ok(last_sequence),
171 StreamingReadResult::Corruption(corruption) => {
172 return Err(WalFsWriterInitError::Corruption(corruption));
173 }
174 StreamingReadResult::IoError(e) => {
175 return Err(WalFsWriterInitError::IoError(e));
176 }
177 }
178 }
179 }
180
181 fn load_current_sequence_lenient(
184 file: &File,
185 ) -> Result<(u64, Option<u64>), WalFsWriterInitError> {
186 let mut file_ref = file.try_clone()?;
187 file_ref.seek(SeekFrom::Start(0))?;
188
189 let mut last_sequence = 0u64;
190 loop {
191 let record_start = file_ref
192 .stream_position()
193 .map_err(|e| WalFsWriterInitError::IoError(e.to_string()))?;
194 match Self::read_next_record_sequence(&mut file_ref) {
195 StreamingReadResult::Record { sequence } => {
196 last_sequence = sequence;
197 }
198 StreamingReadResult::Eof => return Ok((last_sequence, None)),
199 StreamingReadResult::Corruption(_) => {
200 return Ok((last_sequence, Some(record_start)));
202 }
203 StreamingReadResult::IoError(e) => {
204 return Err(WalFsWriterInitError::IoError(e));
205 }
206 }
207 }
208 }
209
210 fn read_next_record_sequence(file: &mut File) -> StreamingReadResult {
213 use crate::wal::codec::{HEADER_LEN, VERSION};
214 use crate::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
215
216 let record_start = match file.stream_position() {
217 Ok(pos) => pos,
218 Err(e) => return StreamingReadResult::IoError(e.to_string()),
219 };
220
221 let mut header = [0u8; HEADER_LEN];
223 let mut total = 0;
224 while total < HEADER_LEN {
225 match file.read(&mut header[total..]) {
226 Ok(0) => {
227 return if total == 0 {
228 StreamingReadResult::Eof
229 } else {
230 StreamingReadResult::Corruption(WalCorruption {
231 offset: record_start,
232 reason: WalCorruptionReasonCode::IncompleteHeader,
233 })
234 };
235 }
236 Ok(n) => total += n,
237 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
238 Err(e) => return StreamingReadResult::IoError(e.to_string()),
239 }
240 }
241
242 let version = u32::from_le_bytes(header[0..4].try_into().unwrap());
243 let payload_len = u32::from_le_bytes(header[4..8].try_into().unwrap());
244
245 if version != VERSION {
246 return StreamingReadResult::Corruption(WalCorruption {
247 offset: record_start,
248 reason: WalCorruptionReasonCode::UnsupportedVersion,
249 });
250 }
251
252 let payload_len_usize = payload_len as usize;
253
254 const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; if payload_len_usize > MAX_REASONABLE_PAYLOAD {
257 return StreamingReadResult::Corruption(WalCorruption {
258 offset: record_start,
259 reason: WalCorruptionReasonCode::DecodeFailure,
260 });
261 }
262
263 let mut payload = vec![0u8; payload_len_usize];
265 match file.read_exact(&mut payload) {
266 Ok(()) => {}
267 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
268 return StreamingReadResult::Corruption(WalCorruption {
269 offset: record_start,
270 reason: WalCorruptionReasonCode::IncompletePayload,
271 });
272 }
273 Err(e) => return StreamingReadResult::IoError(e.to_string()),
274 }
275
276 let mut record = Vec::with_capacity(HEADER_LEN + payload_len_usize);
278 record.extend_from_slice(&header);
279 record.extend_from_slice(&payload);
280
281 match crate::wal::codec::decode(&record) {
282 Ok(event) => StreamingReadResult::Record { sequence: event.sequence() },
283 Err(crate::wal::codec::DecodeError::CrcMismatch { .. }) => {
284 StreamingReadResult::Corruption(WalCorruption {
285 offset: record_start,
286 reason: WalCorruptionReasonCode::CrcMismatch,
287 })
288 }
289 Err(_) => StreamingReadResult::Corruption(WalCorruption {
290 offset: record_start,
291 reason: WalCorruptionReasonCode::DecodeFailure,
292 }),
293 }
294 }
295
296 pub fn current_sequence(&self) -> u64 {
298 self.current_sequence
299 }
300}
301
302impl WalWriter for WalFsWriter {
303 fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
318 if self.is_closed {
319 return Err(WalWriterError::Closed);
320 }
321 if self.poisoned {
322 return Err(WalWriterError::Poisoned);
323 }
324
325 if event.sequence() <= self.current_sequence {
327 return Err(WalWriterError::SequenceViolation {
328 expected: self.current_sequence.saturating_add(1),
329 provided: event.sequence(),
330 });
331 }
332
333 let bytes = encode(event).map_err(|e| WalWriterError::EncodeError(e.to_string()))?;
335
336 let pre_write_pos =
338 self.file.stream_position().map_err(|e| WalWriterError::IoError(e.to_string()))?;
339
340 if let Err(write_err) = self.file.write_all(&bytes) {
342 let truncate_result = self.file.set_len(pre_write_pos);
344 let seek_result = if truncate_result.is_ok() {
345 self.file.seek(SeekFrom::End(0))
346 } else {
347 Err(std::io::Error::other("skipped after truncate failure"))
348 };
349
350 if truncate_result.is_err() || seek_result.is_err() {
351 tracing::error!(
352 write_error = %write_err,
353 truncate_ok = truncate_result.is_ok(),
354 seek_ok = seek_result.is_ok(),
355 pre_write_pos,
356 "WAL write failed and recovery truncation also failed; \
357 writer is permanently poisoned"
358 );
359 self.poisoned = true;
360 return Err(WalWriterError::Poisoned);
361 }
362 return Err(WalWriterError::IoError(write_err.to_string()));
363 }
364
365 self.current_sequence = event.sequence();
367
368 tracing::debug!(sequence = event.sequence(), "WAL event appended");
369
370 Ok(())
371 }
372
373 fn flush(&mut self) -> Result<(), WalWriterError> {
384 if self.is_closed {
385 return Err(WalWriterError::Closed);
386 }
387
388 self.file.sync_all().map_err(|e| WalWriterError::IoError(e.to_string()))?;
390
391 Ok(())
392 }
393
394 fn close(self) -> Result<(), WalWriterError> {
404 let mut this = self;
405
406 this.flush()?;
408
409 this.is_closed = true;
411
412 Ok(())
413 }
414}
415
416impl Drop for WalFsWriter {
417 fn drop(&mut self) {
418 if !self.is_closed {
421 let _ = self.file.sync_all();
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::wal::codec;
    use crate::wal::event::WalEventType;
    use crate::wal::tail_validation::WalCorruptionReasonCode;

    /// Makes every temp path unique within this test process.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// A unique temp-file path that is removed on drop, so files are cleaned up even when
    /// an assertion panics. Declare it before any writer so the writer is dropped first.
    struct TempWal(PathBuf);

    impl TempWal {
        fn new() -> Self {
            let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
            let path = std::env::temp_dir().join(format!(
                "actionqueue_wal_writer_test_{}_{}.tmp",
                std::process::id(),
                count
            ));
            // Clear any leftover from an earlier run that reused this pid/counter pair.
            let _ = fs::remove_file(&path);
            Self(path)
        }

        /// Owned copy of the path, as required by `WalFsWriter::new`.
        fn path(&self) -> PathBuf {
            self.0.clone()
        }

        fn as_path(&self) -> &Path {
            &self.0
        }

        /// Current length of the file in bytes.
        fn len(&self) -> u64 {
            fs::metadata(&self.0).expect("WAL file metadata should be readable").len()
        }
    }

    impl Drop for TempWal {
        fn drop(&mut self) {
            let _ = fs::remove_file(&self.0);
        }
    }

    fn create_test_task_spec(payload: Vec<u8>) -> actionqueue_core::task::task_spec::TaskSpec {
        actionqueue_core::task::task_spec::TaskSpec::new(
            actionqueue_core::ids::TaskId::new(),
            actionqueue_core::task::task_spec::TaskPayload::with_content_type(
                payload,
                "application/octet-stream",
            ),
            actionqueue_core::task::run_policy::RunPolicy::Once,
            actionqueue_core::task::constraints::TaskConstraints::default(),
            actionqueue_core::task::metadata::TaskMetadata::default(),
        )
        .expect("test task spec should be valid")
    }

    /// `TaskCreated` event with the given sequence number and payload (timestamp 0).
    fn task_created(sequence: u64, payload: Vec<u8>) -> WalEvent {
        WalEvent::new(
            sequence,
            WalEventType::TaskCreated { task_spec: create_test_task_spec(payload), timestamp: 0 },
        )
    }

    /// `RunStateChanged` event (Scheduled -> Running, timestamp 1000) for a fresh run id.
    fn run_state_changed(sequence: u64) -> WalEvent {
        WalEvent::new(
            sequence,
            WalEventType::RunStateChanged {
                run_id: actionqueue_core::ids::RunId::new(),
                previous_state: actionqueue_core::run::state::RunState::Scheduled,
                new_state: actionqueue_core::run::state::RunState::Running,
                timestamp: 1000,
            },
        )
    }

    /// Appends a torn record to an existing WAL file: a header declaring `declared_len`
    /// payload bytes (with a zero CRC) followed by only `partial_payload`.
    fn append_torn_record(path: &Path, declared_len: u32, partial_payload: &[u8]) {
        let mut file =
            fs::OpenOptions::new().append(true).open(path).expect("Failed to open file");
        file.write_all(&codec::VERSION.to_le_bytes()).expect("Failed to write version");
        file.write_all(&declared_len.to_le_bytes()).expect("Failed to write length");
        file.write_all(&0u32.to_le_bytes()).expect("Failed to write CRC");
        file.write_all(partial_payload).expect("Failed to write partial payload");
        file.sync_all().expect("Failed to flush");
    }

    #[test]
    fn test_creates_new_file() {
        let wal = TempWal::new();
        let writer =
            WalFsWriter::new(wal.path()).expect("writer creation should succeed for new file");

        assert!(wal.as_path().exists());
        drop(writer);
    }

    #[test]
    fn test_append_writes_encoded_event() {
        let wal = TempWal::new();
        let mut writer =
            WalFsWriter::new(wal.path()).expect("writer creation should succeed for append test");

        let event = task_created(1, vec![1, 2, 3]);
        writer.append(&event).expect("Append should succeed");
        drop(writer);

        // The file must contain exactly the codec's encoding of the appended event.
        let encoded = codec::encode(&event).expect("encode should succeed");
        let expected: &[u8] = &encoded;
        let on_disk = fs::read(wal.as_path()).expect("WAL file should be readable");
        assert_eq!(on_disk.as_slice(), expected);
    }

    #[test]
    fn test_flush_succeeds() {
        let wal = TempWal::new();
        let mut writer =
            WalFsWriter::new(wal.path()).expect("writer creation should succeed for flush test");

        writer.append(&task_created(1, vec![1, 2, 3])).expect("Append should succeed");
        writer.flush().expect("Flush should succeed");
    }

    #[test]
    fn test_close_succeeds() {
        let wal = TempWal::new();
        let writer =
            WalFsWriter::new(wal.path()).expect("writer creation should succeed for close test");
        writer.close().expect("Close should succeed");
    }

    #[test]
    fn test_bootstrap_empty_wal() {
        let wal = TempWal::new();
        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer creation should succeed for empty bootstrap test");
        assert_eq!(writer.current_sequence(), 0);

        writer.append(&task_created(5, vec![1, 2, 3])).expect("Append should succeed");
        drop(writer);

        let mut writer2 = WalFsWriter::new(wal.path())
            .expect("writer reopening should succeed for empty bootstrap test");
        assert_eq!(writer2.current_sequence(), 5);
        writer2.append(&task_created(10, vec![4, 5, 6])).expect("Append should succeed");
    }

    #[test]
    fn test_bootstrap_with_existing_events() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path())
                .expect("initial writer creation should succeed for bootstrap test");
            let attempt_started = WalEvent::new(
                12,
                WalEventType::AttemptStarted {
                    run_id: actionqueue_core::ids::RunId::new(),
                    attempt_id: actionqueue_core::ids::AttemptId::new(),
                    timestamp: 2000,
                },
            );
            writer.append(&task_created(3, vec![1, 2, 3])).expect("Append should succeed");
            writer.append(&run_state_changed(7)).expect("Append should succeed");
            writer.append(&attempt_started).expect("Append should succeed");
            writer.flush().expect("Flush should succeed");
        }

        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer reopening should succeed for bootstrap test");
        assert_eq!(writer.current_sequence(), 12);

        writer.append(&task_created(15, vec![7, 8, 9])).expect("Append should succeed");
    }

    #[test]
    fn test_bootstrap_fails_with_partial_record() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path())
                .expect("initial writer creation should succeed for partial bootstrap test");
            writer.append(&task_created(5, vec![1, 2, 3])).expect("Append should succeed");
            writer.append(&run_state_changed(10)).expect("Append should succeed");
            writer.flush().expect("Flush should succeed");
        }

        // The torn record starts exactly where the valid data ends.
        let expected_offset = wal.len();
        append_torn_record(wal.as_path(), 50, &[0u8; 20]);

        let result = WalFsWriter::new(wal.path());
        assert!(matches!(
            result,
            Err(WalFsWriterInitError::Corruption(WalCorruption {
                offset,
                reason: WalCorruptionReasonCode::IncompletePayload
            })) if offset == expected_offset
        ));
    }

    #[test]
    fn test_new_returns_error_when_parent_directory_is_missing() {
        let parent = std::env::temp_dir().join(format!(
            "actionqueue_wal_writer_missing_parent_{}_{}",
            std::process::id(),
            TEST_COUNTER.fetch_add(1, Ordering::SeqCst)
        ));
        let _ = fs::remove_dir_all(&parent);
        let path = parent.join("wal.log");

        let result = WalFsWriter::new(path);
        assert!(matches!(result, Err(WalFsWriterInitError::IoError(_))));
    }

    #[test]
    fn test_rejects_duplicate_sequence() {
        let wal = TempWal::new();
        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer creation should succeed for duplicate test");

        writer.append(&task_created(1, vec![1, 2, 3])).expect("First append should succeed");

        let result = writer.append(&task_created(1, vec![4, 5, 6]));
        assert!(matches!(
            result,
            Err(WalWriterError::SequenceViolation { expected: 2, provided: 1 })
        ));
    }

    #[test]
    fn test_rejects_non_increasing_sequence() {
        let wal = TempWal::new();
        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer creation should succeed for non-increasing test");

        writer.append(&task_created(5, vec![1, 2, 3])).expect("First append should succeed");

        // Lower than the current sequence.
        let result = writer.append(&task_created(3, vec![4, 5, 6]));
        assert!(matches!(
            result,
            Err(WalWriterError::SequenceViolation { expected: 6, provided: 3 })
        ));

        // Equal to the current sequence.
        let result = writer.append(&task_created(5, vec![7, 8, 9]));
        assert!(matches!(
            result,
            Err(WalWriterError::SequenceViolation { expected: 6, provided: 5 })
        ));
    }

    #[test]
    fn test_sequence_rejection_preserves_file() {
        let wal = TempWal::new();
        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer creation should succeed for preservation test");

        writer.append(&task_created(1, vec![1, 2, 3])).expect("First append should succeed");
        writer.flush().expect("Flush should succeed");
        let len_before = wal.len();

        let result = writer.append(&task_created(1, vec![4, 5, 6]));
        assert!(matches!(result, Err(WalWriterError::SequenceViolation { .. })));
        // A rejected append must not have written anything.
        assert_eq!(wal.len(), len_before);

        writer
            .append(&task_created(2, vec![7, 8, 9]))
            .expect("Append after rejection should succeed");
    }

    #[test]
    fn test_sequence_rejection_across_writer_restart() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path())
                .expect("writer creation should succeed for restart test");
            writer.append(&task_created(1, vec![1, 2, 3])).expect("First append should succeed");
            writer.append(&task_created(2, vec![4, 5, 6])).expect("Second append should succeed");
        }

        let mut writer = WalFsWriter::new(wal.path())
            .expect("writer reopening should succeed for restart test");

        let result = writer.append(&task_created(1, vec![7, 8, 9]));
        assert!(matches!(
            result,
            Err(WalWriterError::SequenceViolation { expected: 3, provided: 1 })
        ));

        let result = writer.append(&task_created(2, vec![10, 11, 12]));
        assert!(matches!(
            result,
            Err(WalWriterError::SequenceViolation { expected: 3, provided: 2 })
        ));

        writer
            .append(&task_created(3, vec![13, 14, 15]))
            .expect("Append after bootstrap should succeed");
    }

    #[test]
    fn test_strict_policy_fails_on_partial_record() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path()).expect("initial writer should succeed");
            writer.append(&task_created(1, vec![1, 2, 3])).expect("append should succeed");
            writer.flush().expect("flush should succeed");
        }

        append_torn_record(wal.as_path(), 50, &[0u8; 10]);

        let result = WalFsWriter::new_with_repair(wal.path(), RepairPolicy::Strict);
        assert!(matches!(result, Err(WalFsWriterInitError::Corruption(_))));
    }

    #[test]
    fn test_truncate_partial_repairs_and_continues() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path()).expect("initial writer should succeed");
            writer.append(&task_created(1, vec![1, 2, 3])).expect("append 1 should succeed");
            writer.append(&task_created(2, vec![4, 5, 6])).expect("append 2 should succeed");
            writer.flush().expect("flush should succeed");
        }

        append_torn_record(wal.as_path(), 100, &[0xAB; 20]);

        let mut writer = WalFsWriter::new_with_repair(wal.path(), RepairPolicy::TruncatePartial)
            .expect("repair should succeed");
        assert_eq!(writer.current_sequence(), 2);

        writer.append(&task_created(3, vec![7, 8, 9])).expect("append after repair should succeed");
        writer.flush().expect("flush should succeed");
        drop(writer);

        // A strict open only succeeds if the torn record is really gone.
        let writer2 =
            WalFsWriter::new(wal.path()).expect("strict open after repair should succeed");
        assert_eq!(writer2.current_sequence(), 3);
    }

    #[test]
    fn test_truncate_partial_no_corruption_matches_strict() {
        let wal = TempWal::new();

        {
            let mut writer = WalFsWriter::new(wal.path()).expect("initial writer should succeed");
            writer.append(&task_created(1, vec![1, 2, 3])).expect("append should succeed");
            writer.flush().expect("flush should succeed");
        }

        let writer = WalFsWriter::new_with_repair(wal.path(), RepairPolicy::TruncatePartial)
            .expect("repair on clean file should succeed");
        assert_eq!(writer.current_sequence(), 1);
    }
}