1use std::fs::{File, OpenOptions};
46use std::io::{Read, Seek, SeekFrom, Write};
47use std::path::Path;
48use std::sync::Arc;
49
50use crate::megakernel::recovery::{classify_backend_recovery_error, MegakernelRecoveryClass};
51use crate::PipelineError;
52use vyre_driver::backend::BackendError;
53
/// File magic stored at byte 0 of every replay log.
const LOG_MAGIC: &[u8; 8] = b"VRRL0001";
/// Header format version stored right after the magic; any other value is rejected on open.
const LOG_VERSION: u32 = 1;
/// Leading word of every written record slot; a zero word marks a slot that was never written.
const RECORD_MAGIC: u32 = 0xDEAD_BEEF;
/// Size of one record slot:
/// magic 4 + timestamp 8 + slot_idx/tenant_id/opcode 3x4 + args 4x4 + epoch 4,
/// followed by failure evidence status/class/code 3x4 + output digest 8.
const RECORD_BYTES: u64 = 4 + 8 + 3 * 4 + 4 * 4 + 4 + 3 * 4 + 8;
/// Size of the file header: magic 8 + version 4 + flags 4 + capacity 8 + cursor 8.
const HEADER_BYTES: u64 = 8 + 4 + 4 + 8 + 8;
/// Upper bound on record slots per log (2^20), checked before any replay allocation.
const MAX_REPLAY_RECORDS: u64 = 1 << 20;
60
/// One slot entry as persisted in a replay-log record.
///
/// Every field is written little-endian into a fixed 64-byte record slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordedSlot {
    /// Timestamp recorded for the slot (nanoseconds per the field name; the
    /// clock source is chosen by the caller).
    pub timestamp_ns: u64,
    /// Caller-supplied slot index. This is stored data and is independent of
    /// the ring position the record occupies in the log.
    pub slot_idx: u32,
    /// Tenant identifier recorded with the slot.
    pub tenant_id: u32,
    /// Opcode word recorded with the slot.
    pub opcode: u32,
    /// Four argument words recorded with the opcode.
    pub args: [u32; 4],
    /// Epoch value recorded with the slot.
    pub epoch: u32,
}
80
/// A replayed log entry: the recorded slot plus any failure evidence stored with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayRecord {
    /// The recorded slot.
    pub slot: RecordedSlot,
    /// Failure evidence. `None` when the record was appended without evidence,
    /// or when the stored evidence words were all zero (indistinguishable on disk).
    pub failure: Option<ReplayFailureEvidence>,
}
89
/// Coarse recovery classification stored with a failed record.
///
/// Mirrors `MegakernelRecoveryClass` and adds a `None` default meaning "no
/// class recorded". The class is persisted as a stable `u32` discriminant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ReplayFailureClass {
    /// No failure class recorded (on-disk value 0).
    #[default]
    None,
    /// Mapped from `MegakernelRecoveryClass::DeviceLoss`.
    DeviceLoss,
    /// Mapped from `MegakernelRecoveryClass::TransientQueue`.
    TransientQueue,
    /// Mapped from `MegakernelRecoveryClass::ProgramBug`.
    ProgramBug,
    /// Mapped from `MegakernelRecoveryClass::Unclassified`. Also produced when
    /// decoding an unrecognised on-disk value.
    Unclassified,
}
105
106impl ReplayFailureClass {
107 const NONE: u32 = 0;
108 const DEVICE_LOSS: u32 = 1;
109 const TRANSIENT_QUEUE: u32 = 2;
110 const PROGRAM_BUG: u32 = 3;
111 const UNCLASSIFIED: u32 = 4;
112
113 const fn encode(self) -> u32 {
114 match self {
115 Self::None => Self::NONE,
116 Self::DeviceLoss => Self::DEVICE_LOSS,
117 Self::TransientQueue => Self::TRANSIENT_QUEUE,
118 Self::ProgramBug => Self::PROGRAM_BUG,
119 Self::Unclassified => Self::UNCLASSIFIED,
120 }
121 }
122
123 const fn decode(raw: u32) -> Self {
124 match raw {
125 Self::NONE => Self::None,
126 Self::DEVICE_LOSS => Self::DeviceLoss,
127 Self::TRANSIENT_QUEUE => Self::TransientQueue,
128 Self::PROGRAM_BUG => Self::ProgramBug,
129 Self::UNCLASSIFIED => Self::Unclassified,
130 _ => Self::Unclassified,
131 }
132 }
133
134 const fn from_recovery_class(class: MegakernelRecoveryClass) -> Self {
135 match class {
136 MegakernelRecoveryClass::DeviceLoss => Self::DeviceLoss,
137 MegakernelRecoveryClass::TransientQueue => Self::TransientQueue,
138 MegakernelRecoveryClass::ProgramBug => Self::ProgramBug,
139 MegakernelRecoveryClass::Unclassified => Self::Unclassified,
140 }
141 }
142}
143
/// Evidence captured for a failed slot, persisted in the tail (bytes 44..64)
/// of its replay record.
///
/// An all-zero value cannot be told apart from "no evidence" on disk and is
/// replayed as `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayFailureEvidence {
    /// Caller-supplied slot status word (opaque to this module).
    pub slot_status: u32,
    /// Recovery class derived from the backend error.
    pub failure_class: ReplayFailureClass,
    /// `BackendError::code().stable_id()` of the failing error.
    pub backend_error_code: u32,
    /// First 8 bytes (little-endian) of the BLAKE3 hash of the output bytes
    /// observed at failure time.
    pub output_digest: u64,
}
156
157impl ReplayFailureEvidence {
158 #[must_use]
160 pub fn from_backend_error(slot_status: u32, error: &BackendError, output_bytes: &[u8]) -> Self {
161 Self {
162 slot_status,
163 failure_class: ReplayFailureClass::from_recovery_class(
164 classify_backend_recovery_error(error),
165 ),
166 backend_error_code: error.code().stable_id(),
167 output_digest: output_digest(output_bytes),
168 }
169 }
170
171 fn from_words(
172 slot_status: u32,
173 failure_class: u32,
174 backend_error_code: u32,
175 output_digest: u64,
176 ) -> Option<Self> {
177 if slot_status == 0 && failure_class == 0 && backend_error_code == 0 && output_digest == 0 {
178 return None;
179 }
180 Some(Self {
181 slot_status,
182 failure_class: ReplayFailureClass::decode(failure_class),
183 backend_error_code,
184 output_digest,
185 })
186 }
187}
188
/// Errors produced while opening, appending to, or replaying a [`RingLog`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ReplayLogError {
    /// An underlying filesystem operation failed.
    #[error("replay log {op} on `{path}` failed: {source}. Fix: check disk space + permissions.")]
    Io {
        /// Short name of the failing operation (e.g. "open", "read", "seek").
        op: &'static str,
        /// Lossy string form of the log path.
        path: Arc<str>,
        /// The originating I/O error.
        #[source]
        source: std::io::Error,
    },
    /// The file header (magic/version) or a record slot's magic word is not
    /// what this format expects.
    #[error("replay log `{path}` header mismatch. Fix: regenerate the log; VRRL format may have changed.")]
    HeaderMismatch {
        /// Lossy string form of the log path.
        path: Arc<str>,
    },
    /// A capacity of zero was requested or found in an existing header.
    #[error("replay log capacity must be > 0. Fix: construct with at least one slot.")]
    ZeroCapacity,
    /// A capacity above `MAX_REPLAY_RECORDS` was requested or stored, or a
    /// record offset computation overflowed.
    #[error("replay log capacity {count} exceeds max {max}. Fix: shard replay into smaller logs.")]
    CapacityOverflow {
        /// The offending capacity or record index.
        count: u64,
        /// The configured maximum (`MAX_REPLAY_RECORDS`).
        max: u64,
    },
}
225
226fn io_err(op: &'static str, path: &Path, source: std::io::Error) -> ReplayLogError {
227 ReplayLogError::Io {
228 op,
229 path: Arc::from(path.to_string_lossy().as_ref()),
230 source,
231 }
232}
233
/// Fixed-capacity, file-backed ring buffer of replay records.
///
/// On-disk layout:
/// - a 32-byte header holding magic, version, reserved flags, capacity and
///   cursor, all little-endian;
/// - then `capacity` record slots of 64 bytes each.
///
/// Once every slot has been written, each append overwrites the oldest one.
#[derive(Debug)]
pub struct RingLog {
    /// Read/write handle to the log file.
    file: File,
    /// Lossy string form of the log path, used in error values.
    path_repr: Arc<str>,
    /// Number of record slots in the ring.
    capacity: u64,
    /// Slot index the next append writes to; kept `< capacity`.
    next_slot: u64,
}
244
245impl RingLog {
246 pub fn open(path: impl AsRef<Path>, capacity: u64) -> Result<Self, ReplayLogError> {
257 if capacity == 0 {
258 return Err(ReplayLogError::ZeroCapacity);
259 }
260 validate_capacity(capacity)?;
261
262 let path = path.as_ref();
263 let path_repr: Arc<str> = Arc::from(path.to_string_lossy().as_ref());
264 let existed = path.exists();
265 let mut file = OpenOptions::new()
266 .create(true)
267 .truncate(false)
268 .read(true)
269 .write(true)
270 .open(path)
271 .map_err(|e| io_err("open", path, e))?;
272
273 if existed {
274 let mut magic = [0u8; 8];
275 file.read_exact(&mut magic)
276 .map_err(|e| io_err("read", path, e))?;
277 if &magic != LOG_MAGIC {
278 return Err(ReplayLogError::HeaderMismatch {
279 path: Arc::clone(&path_repr),
280 });
281 }
282 let mut version_bytes = [0u8; 4];
283 file.read_exact(&mut version_bytes)
284 .map_err(|e| io_err("read", path, e))?;
285 if u32::from_le_bytes(version_bytes) != LOG_VERSION {
286 return Err(ReplayLogError::HeaderMismatch {
287 path: Arc::clone(&path_repr),
288 });
289 }
290 let mut _flags = [0u8; 4];
291 file.read_exact(&mut _flags)
292 .map_err(|e| io_err("read", path, e))?;
293 let mut cap_bytes = [0u8; 8];
294 file.read_exact(&mut cap_bytes)
295 .map_err(|e| io_err("read", path, e))?;
296 let mut cursor_bytes = [0u8; 8];
297 file.read_exact(&mut cursor_bytes)
298 .map_err(|e| io_err("read", path, e))?;
299 let existing_cap = u64::from_le_bytes(cap_bytes);
300 validate_capacity(existing_cap)?;
301 let cursor = u64::from_le_bytes(cursor_bytes);
302 return Ok(Self {
303 file,
304 path_repr,
305 capacity: existing_cap,
306 next_slot: cursor % existing_cap,
307 });
308 }
309
310 let total_bytes = log_file_len(capacity)?;
314 file.set_len(total_bytes)
315 .map_err(|e| io_err("set_len", path, e))?;
316 file.seek(SeekFrom::Start(0))
317 .map_err(|e| io_err("seek", path, e))?;
318 file.write_all(LOG_MAGIC)
319 .map_err(|e| io_err("write", path, e))?;
320 file.write_all(&LOG_VERSION.to_le_bytes())
321 .map_err(|e| io_err("write", path, e))?;
322 file.write_all(&0u32.to_le_bytes())
323 .map_err(|e| io_err("write", path, e))?; file.write_all(&capacity.to_le_bytes())
325 .map_err(|e| io_err("write", path, e))?;
326 file.write_all(&0u64.to_le_bytes())
327 .map_err(|e| io_err("write", path, e))?; Ok(Self {
330 file,
331 path_repr,
332 capacity,
333 next_slot: 0,
334 })
335 }
336
337 #[must_use]
340 pub fn capacity(&self) -> u64 {
341 self.capacity
342 }
343
344 #[must_use]
346 pub fn cursor(&self) -> u64 {
347 self.next_slot
348 }
349
350 #[must_use]
352 pub fn path(&self) -> &str {
353 self.path_repr.as_ref()
354 }
355
356 pub fn append(&mut self, slot: RecordedSlot) -> Result<(), ReplayLogError> {
364 self.append_record(ReplayRecord {
365 slot,
366 failure: None,
367 })
368 }
369
370 pub fn append_with_failure(
376 &mut self,
377 slot: RecordedSlot,
378 failure: ReplayFailureEvidence,
379 ) -> Result<(), ReplayLogError> {
380 self.append_record(ReplayRecord {
381 slot,
382 failure: Some(failure),
383 })
384 }
385
386 fn append_record(&mut self, record: ReplayRecord) -> Result<(), ReplayLogError> {
387 let record_offset = log_record_offset(self.next_slot)?;
388 self.file
389 .seek(SeekFrom::Start(record_offset))
390 .map_err(|e| self.io_err("seek", e))?;
391
392 let mut buf = [0u8; RECORD_BYTES as usize];
393 buf[0..4].copy_from_slice(&RECORD_MAGIC.to_le_bytes());
394 buf[4..12].copy_from_slice(&record.slot.timestamp_ns.to_le_bytes());
395 buf[12..16].copy_from_slice(&record.slot.slot_idx.to_le_bytes());
396 buf[16..20].copy_from_slice(&record.slot.tenant_id.to_le_bytes());
397 buf[20..24].copy_from_slice(&record.slot.opcode.to_le_bytes());
398 buf[24..28].copy_from_slice(&record.slot.args[0].to_le_bytes());
399 buf[28..32].copy_from_slice(&record.slot.args[1].to_le_bytes());
400 buf[32..36].copy_from_slice(&record.slot.args[2].to_le_bytes());
401 buf[36..40].copy_from_slice(&record.slot.args[3].to_le_bytes());
402 buf[40..44].copy_from_slice(&record.slot.epoch.to_le_bytes());
403 if let Some(failure) = record.failure {
404 buf[44..48].copy_from_slice(&failure.slot_status.to_le_bytes());
405 buf[48..52].copy_from_slice(&failure.failure_class.encode().to_le_bytes());
406 buf[52..56].copy_from_slice(&failure.backend_error_code.to_le_bytes());
407 buf[56..64].copy_from_slice(&failure.output_digest.to_le_bytes());
408 }
409 self.file
410 .write_all(&buf)
411 .map_err(|e| self.io_err("write", e))?;
412
413 self.next_slot = (self.next_slot + 1) % self.capacity;
416 self.file
417 .seek(SeekFrom::Start(24)) .map_err(|e| self.io_err("seek", e))?;
419 self.file
420 .write_all(&self.next_slot.to_le_bytes())
421 .map_err(|e| self.io_err("write", e))?;
422
423 Ok(())
424 }
425
426 pub fn replay_all(&mut self) -> Result<Vec<RecordedSlot>, ReplayLogError> {
437 Ok(self
438 .replay_records()?
439 .into_iter()
440 .map(|record| record.slot)
441 .collect())
442 }
443
444 pub fn replay_records(&mut self) -> Result<Vec<ReplayRecord>, ReplayLogError> {
451 let capacity =
452 usize::try_from(self.capacity).map_err(|_| ReplayLogError::CapacityOverflow {
453 count: self.capacity,
454 max: MAX_REPLAY_RECORDS,
455 })?;
456 let mut out = Vec::with_capacity(capacity);
457 for step in 0..self.capacity {
458 let slot_index = (self.next_slot + step) % self.capacity;
459 let offset = log_record_offset(slot_index)?;
460 self.file
461 .seek(SeekFrom::Start(offset))
462 .map_err(|e| self.io_err("seek", e))?;
463 let mut buf = [0u8; RECORD_BYTES as usize];
464 self.file
465 .read_exact(&mut buf)
466 .map_err(|e| self.io_err("read", e))?;
467 let magic = read_u32(&buf, 0);
468 if magic == 0 {
469 continue;
471 }
472 if magic != RECORD_MAGIC {
473 return Err(ReplayLogError::HeaderMismatch {
474 path: self.path_repr.clone(),
475 });
476 }
477 let slot = RecordedSlot {
478 timestamp_ns: read_u64(&buf, 4),
479 slot_idx: read_u32(&buf, 12),
480 tenant_id: read_u32(&buf, 16),
481 opcode: read_u32(&buf, 20),
482 args: [
483 read_u32(&buf, 24),
484 read_u32(&buf, 28),
485 read_u32(&buf, 32),
486 read_u32(&buf, 36),
487 ],
488 epoch: read_u32(&buf, 40),
489 };
490 out.push(ReplayRecord {
491 slot,
492 failure: ReplayFailureEvidence::from_words(
493 read_u32(&buf, 44),
494 read_u32(&buf, 48),
495 read_u32(&buf, 52),
496 read_u64(&buf, 56),
497 ),
498 });
499 }
500 Ok(out)
501 }
502
503 pub fn sync(&mut self) -> Result<(), ReplayLogError> {
511 self.file.sync_all().map_err(|e| self.io_err("sync", e))?;
512 Ok(())
513 }
514
515 fn io_err(&self, op: &'static str, source: std::io::Error) -> ReplayLogError {
516 ReplayLogError::Io {
517 op,
518 path: self.path_repr.clone(),
519 source,
520 }
521 }
522}
523
524fn validate_capacity(capacity: u64) -> Result<(), ReplayLogError> {
525 if capacity == 0 {
526 return Err(ReplayLogError::ZeroCapacity);
527 }
528 if capacity > MAX_REPLAY_RECORDS {
529 return Err(ReplayLogError::CapacityOverflow {
530 count: capacity,
531 max: MAX_REPLAY_RECORDS,
532 });
533 }
534 Ok(())
535}
536
537fn log_file_len(capacity: u64) -> Result<u64, ReplayLogError> {
538 log_record_position(capacity)
539}
540
541fn log_record_offset(slot_index: u64) -> Result<u64, ReplayLogError> {
542 log_record_position(slot_index)
543}
544
545fn log_record_position(record_index: u64) -> Result<u64, ReplayLogError> {
546 let record_bytes =
547 vyre_driver::accounting::checked_mul_u64_lazy(record_index, RECORD_BYTES, || {
548 replay_capacity_overflow(record_index)
549 })?;
550 vyre_driver::accounting::checked_add_u64_lazy(HEADER_BYTES, record_bytes, || {
551 replay_capacity_overflow(record_index)
552 })
553}
554
555fn replay_capacity_overflow(count: u64) -> ReplayLogError {
556 ReplayLogError::CapacityOverflow {
557 count,
558 max: MAX_REPLAY_RECORDS,
559 }
560}
561
/// Decodes a little-endian `u32` from `buf[offset..offset + 4]`.
///
/// # Panics
///
/// Panics if that range is out of bounds.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    let field = &buf[offset..offset + 4];
    u32::from_le_bytes([field[0], field[1], field[2], field[3]])
}

/// Decodes a little-endian `u64` from `buf[offset..offset + 8]`.
///
/// # Panics
///
/// Panics if that range is out of bounds.
fn read_u64(buf: &[u8], offset: usize) -> u64 {
    let field = &buf[offset..offset + 8];
    u64::from_le_bytes([
        field[0], field[1], field[2], field[3], field[4], field[5], field[6], field[7],
    ])
}
573
574fn output_digest(bytes: &[u8]) -> u64 {
575 let digest = blake3::hash(bytes);
576 let mut out = [0u8; 8];
577 out.copy_from_slice(&digest.as_bytes()[..8]);
578 u64::from_le_bytes(out)
579}
580
581impl From<ReplayLogError> for PipelineError {
584 fn from(err: ReplayLogError) -> Self {
585 PipelineError::Backend(err.to_string())
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a deterministic slot whose fields derive from `slot_idx` (plus the
    /// explicit `epoch`), so replayed records can be matched to their append.
    fn rec(slot_idx: u32, epoch: u32) -> RecordedSlot {
        RecordedSlot {
            timestamp_ns: 1_000_000 + slot_idx as u64,
            slot_idx,
            tenant_id: 0,
            opcode: 0x4000_0000 + slot_idx,
            args: [slot_idx, slot_idx * 2, slot_idx * 3, slot_idx * 4],
            epoch,
        }
    }

    // Capacity 0 is rejected up front.
    #[test]
    fn open_rejects_zero_capacity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, 0).expect_err("zero capacity must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    // Records come back in append order; unwritten slots are skipped.
    #[test]
    fn append_and_replay_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4)
            .expect("Fix: open fresh log; restore this invariant before continuing.");
        log.append(rec(1, 10)).unwrap();
        log.append(rec(2, 11)).unwrap();
        log.sync().unwrap();

        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 2);
        assert_eq!(replay[0].slot_idx, 1);
        assert_eq!(replay[0].epoch, 10);
        assert_eq!(replay[1].slot_idx, 2);
        assert_eq!(replay[1].epoch, 11);
    }

    // Failure evidence built from a backend error survives the disk round trip
    // unchanged.
    #[test]
    fn append_with_failure_round_trips_reproduction_evidence() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4)
            .expect("Fix: open fresh log; restore this invariant before continuing.");
        let backend_error = BackendError::DispatchFailed {
            code: Some(17),
            message: "DeviceLost after queue submit".to_string(),
        };
        let failure =
            ReplayFailureEvidence::from_backend_error(3, &backend_error, b"partial-output");

        assert_eq!(failure.failure_class, ReplayFailureClass::DeviceLoss);
        assert_eq!(failure.backend_error_code, backend_error.code().stable_id());
        assert_ne!(failure.output_digest, 0);

        log.append_with_failure(rec(7, 44), failure).unwrap();
        log.sync().unwrap();

        let replay = log
            .replay_records()
            .expect("Fix: replay records; restore this invariant before continuing.");
        assert_eq!(replay.len(), 1);
        assert_eq!(replay[0].slot.slot_idx, 7);
        assert_eq!(replay[0].slot.epoch, 44);
        assert_eq!(replay[0].failure, Some(failure));
    }

    // A plain append leaves the evidence words zero, which replays as `None`.
    #[test]
    fn append_without_failure_has_no_failure_evidence() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 2)
            .expect("Fix: open fresh log; restore this invariant before continuing.");

        log.append(rec(1, 10)).unwrap();

        let replay = log
            .replay_records()
            .expect("Fix: replay records; restore this invariant before continuing.");
        assert_eq!(replay.len(), 1);
        assert_eq!(replay[0].slot.slot_idx, 1);
        assert_eq!(replay[0].failure, None);
    }

    // Five appends into three slots keep only the newest three, oldest first.
    #[test]
    fn log_rollover_preserves_most_recent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log =
            RingLog::open(&path, 3).expect("Fix: open; restore this invariant before continuing.");
        for i in 0..5 {
            log.append(rec(i, 100 + i)).unwrap();
        }
        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 3, "capacity=3 must retain exactly 3 records");
        let slot_ids: Vec<u32> = replay.iter().map(|r| r.slot_idx).collect();
        assert_eq!(slot_ids, vec![2, 3, 4]);
    }

    // The cursor persisted in the header is restored when the log is reopened.
    #[test]
    fn reopen_restores_cursor() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            let mut log = RingLog::open(&path, 4)
                .expect("Fix: open fresh; restore this invariant before continuing.");
            log.append(rec(1, 10)).unwrap();
            log.append(rec(2, 11)).unwrap();
            log.sync().unwrap();
        }
        let mut reopened = RingLog::open(&path, 4)
            .expect("Fix: reopen; restore this invariant before continuing.");
        assert_eq!(reopened.cursor(), 2);
        let replay = reopened.replay_all().unwrap();
        assert_eq!(replay.len(), 2);
    }

    // A full-size file whose header magic is wrong is rejected as a header mismatch.
    #[test]
    fn corrupted_magic_rejected() {
        use std::io::Write as _;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            // Hand-written header: bad magic, version 1, flags 0, capacity 4, cursor 0.
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(b"XXXX0001").unwrap();
            f.write_all(&1u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&4u64.to_le_bytes()).unwrap();
            f.write_all(&0u64.to_le_bytes()).unwrap();
            f.set_len(HEADER_BYTES + 4 * RECORD_BYTES).unwrap();
        }
        let err = RingLog::open(&path, 4).expect_err("wrong magic must reject");
        assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
    }

    /// Writes only a valid-magic header (no record slots) with the given
    /// capacity and cursor, for header-validation tests.
    fn write_header(path: &Path, capacity: u64, cursor: u64) {
        use std::io::Write as _;

        let mut f = std::fs::File::create(path).unwrap();
        f.write_all(LOG_MAGIC).unwrap();
        f.write_all(&LOG_VERSION.to_le_bytes()).unwrap();
        f.write_all(&0u32.to_le_bytes()).unwrap();
        f.write_all(&capacity.to_le_bytes()).unwrap();
        f.write_all(&cursor.to_le_bytes()).unwrap();
    }

    // A stored capacity of 0 must error rather than reach `cursor % capacity`.
    #[test]
    fn existing_log_zero_capacity_rejected_before_cursor_modulo() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, 0, 0);

        let err = RingLog::open(&path, 4).expect_err("header capacity=0 must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    // An oversized stored capacity is rejected at open time, before replay
    // could size an allocation from it.
    #[test]
    fn existing_log_huge_capacity_rejected_before_replay_allocation() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, MAX_REPLAY_RECORDS + 1, 0);

        let err = RingLog::open(&path, 4).expect_err("huge header capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }

    // A requested capacity above the maximum is rejected.
    #[test]
    fn capacity_overflow_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, MAX_REPLAY_RECORDS + 1)
            .expect_err("over-size capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }
}