Skip to main content

vyre_runtime/
replay.rs

1//! Differential megakernel replay log.
2//!
3//! Every slot the host publishes into the megakernel ring is also
4//! appended to a circular log on disk. A later replay run can feed
5//! the log into a fresh megakernel + backend pair and diff the
6//! epoch-by-epoch observable stream against the original. This
7//! catches schedule-dependent bugs  -  GPU nondeterminism, atomic
8//! ordering hazards, cache-line races  -  that unit tests cannot hit
9//! by construction.
10//!
11//! ## Layout
12//!
13//! ```text
14//! header (32 bytes, aligned to 4 KiB):
15//!     magic:          b"VRRL0001"        (8 bytes)    -  "Vyre Ring-Replay Log"
16//!     version:        u32 = 1            (4 bytes)
17//!     flags:          u32 = 0            (4 bytes)
18//!     capacity:       u64                (8 bytes)    -  total record slots
19//!     next_slot:      u64                (8 bytes)    -  write cursor (mod capacity)
20//! records:                                          (capacity × RECORD_BYTES)
21//!     magic:          u32 = 0xDEADBEEF  (4 bytes)   -  sync marker for forward scan
22//!     timestamp_ns:   u64                (8 bytes)
23//!     slot_idx:       u32                (4 bytes)
24//!     tenant_id:      u32                (4 bytes)
25//!     opcode:         u32                (4 bytes)
26//!     args:           [u32; 4]           (16 bytes)
27//!     epoch:          u32                (4 bytes)   -  observed at publish time
28//!     slot_status:    u32                (4 bytes)   -  terminal ring status, zero when unknown
29//!     failure_class:  u32                (4 bytes)   -  [`ReplayFailureClass`] discriminant
30//!     backend_code:   u32                (4 bytes)   -  stable [`vyre_driver::backend::ErrorCode`]
31//!     output_digest:  u64                (8 bytes)   -  digest of output bytes observed at failure
32//! ```
33//!
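//! Record size = 64 bytes: 44 bytes of slot capture (magic through
//! `epoch`) followed by 20 bytes of failure evidence, with no padding.
//! Records are therefore spaced `RECORD_BYTES = 64` apart and start
//! right after the 32-byte header, at byte offset `32 + index * 64`.
//! Note that, because of the 32-byte header, a record's file offset is
//! congruent to 32 (mod 64) rather than a multiple of 64.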
37//!
38//! ## Rollover
39//!
40//! The log is a fixed-capacity ring. `next_slot = (next_slot + 1) %
41//! capacity`; a replay iterates from `next_slot` through all records
42//! that have a live magic word. Records that predate the first wrap
43//! are overwritten in publish order.
44
45use std::fs::{File, OpenOptions};
46use std::io::{Read, Seek, SeekFrom, Write};
47use std::path::Path;
48use std::sync::Arc;
49
50use crate::megakernel::recovery::{classify_backend_recovery_error, MegakernelRecoveryClass};
51use crate::PipelineError;
52use vyre_driver::backend::BackendError;
53
/// Magic bytes that open every log file header.
const LOG_MAGIC: &[u8; 8] = b"VRRL0001";
/// Header format version written to new logs and required of existing ones.
const LOG_VERSION: u32 = 1;
/// Sync marker stored in the first four bytes of every written record.
/// A value of `0` in that position marks a slot that was never written.
const RECORD_MAGIC: u32 = 0xDEAD_BEEF;
/// On-disk size of one record, and the stride between consecutive records.
const RECORD_BYTES: u64 = 64;
/// On-disk size of the file header; record 0 starts at this byte offset.
const HEADER_BYTES: u64 = 32;
/// Largest record capacity accepted by `validate_capacity`.
const MAX_REPLAY_RECORDS: u64 = 1_048_576;
60
/// One published ring slot as captured by the replay log.
///
/// Every field is stored little-endian inside the 64-byte record; the
/// byte ranges quoted below are relative to the start of that record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordedSlot {
    /// Host wall-clock timestamp, nanoseconds since UNIX epoch.
    /// Record bytes 4..12.
    pub timestamp_ns: u64,
    /// Ring slot index the host published into. Record bytes 12..16.
    pub slot_idx: u32,
    /// Tenant id from the slot's TENANT_WORD. Record bytes 16..20.
    pub tenant_id: u32,
    /// Opcode from the slot's OPCODE_WORD. Record bytes 20..24.
    pub opcode: u32,
    /// First four argument words (the rest of the 13-word arg space
    /// lives in a packed-slot extension and is captured separately).
    /// Record bytes 24..40.
    pub args: [u32; 4],
    /// Megakernel EPOCH word observed at publish time. A replay run
    /// on the same backend must reach the same epoch in the same
    /// order; divergence is the load-bearing signal.
    /// Record bytes 40..44.
    pub epoch: u32,
}
80
/// One replay record including optional failure evidence.
///
/// This is the unit that `RingLog::append_record` serialises and that
/// `RingLog::replay_records` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayRecord {
    /// Published ring slot.
    pub slot: RecordedSlot,
    /// Backend/runtime failure evidence captured for this slot.
    /// Reads back as `None` when all four evidence words stored in the
    /// record tail are zero.
    pub failure: Option<ReplayFailureEvidence>,
}
89
/// Backend/runtime failure class encoded into the replay record tail.
///
/// Each variant has a fixed `u32` discriminant used on disk (see
/// `encode` / `decode`); raw values that match no variant decode to
/// [`ReplayFailureClass::Unclassified`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ReplayFailureClass {
    /// No failure evidence was recorded for this published slot.
    /// Encoded as `0`.
    #[default]
    None,
    /// Backend context, adapter, or compiled-pipeline state was lost or stale.
    /// Encoded as `1`.
    DeviceLoss,
    /// Queue/resource pressure that can be retried without recompilation.
    /// Encoded as `2`.
    TransientQueue,
    /// Program/lowering/kernel-source failure that should not be retried as-is.
    /// Encoded as `3`.
    ProgramBug,
    /// Failure did not match a known automated recovery class.
    /// Encoded as `4`.
    Unclassified,
}
105
106impl ReplayFailureClass {
107    const NONE: u32 = 0;
108    const DEVICE_LOSS: u32 = 1;
109    const TRANSIENT_QUEUE: u32 = 2;
110    const PROGRAM_BUG: u32 = 3;
111    const UNCLASSIFIED: u32 = 4;
112
113    const fn encode(self) -> u32 {
114        match self {
115            Self::None => Self::NONE,
116            Self::DeviceLoss => Self::DEVICE_LOSS,
117            Self::TransientQueue => Self::TRANSIENT_QUEUE,
118            Self::ProgramBug => Self::PROGRAM_BUG,
119            Self::Unclassified => Self::UNCLASSIFIED,
120        }
121    }
122
123    const fn decode(raw: u32) -> Self {
124        match raw {
125            Self::NONE => Self::None,
126            Self::DEVICE_LOSS => Self::DeviceLoss,
127            Self::TRANSIENT_QUEUE => Self::TransientQueue,
128            Self::PROGRAM_BUG => Self::ProgramBug,
129            Self::UNCLASSIFIED => Self::Unclassified,
130            _ => Self::Unclassified,
131        }
132    }
133
134    const fn from_recovery_class(class: MegakernelRecoveryClass) -> Self {
135        match class {
136            MegakernelRecoveryClass::DeviceLoss => Self::DeviceLoss,
137            MegakernelRecoveryClass::TransientQueue => Self::TransientQueue,
138            MegakernelRecoveryClass::ProgramBug => Self::ProgramBug,
139            MegakernelRecoveryClass::Unclassified => Self::Unclassified,
140        }
141    }
142}
143
/// Failure evidence captured in a replay record.
///
/// Stored little-endian in the tail of the 64-byte record; the byte
/// ranges quoted below are relative to the start of that record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayFailureEvidence {
    /// Terminal or observed ring status word for the failed slot.
    /// Record bytes 44..48.
    pub slot_status: u32,
    /// Recovery-oriented failure class. Record bytes 48..52.
    pub failure_class: ReplayFailureClass,
    /// Stable backend error code. Zero means no backend error was known.
    /// Record bytes 52..56.
    pub backend_error_code: u32,
    /// Stable digest over output bytes observed before/at failure.
    /// Record bytes 56..64.
    pub output_digest: u64,
}
156
157impl ReplayFailureEvidence {
158    /// Build replay failure evidence from a backend error and observed output bytes.
159    #[must_use]
160    pub fn from_backend_error(slot_status: u32, error: &BackendError, output_bytes: &[u8]) -> Self {
161        Self {
162            slot_status,
163            failure_class: ReplayFailureClass::from_recovery_class(
164                classify_backend_recovery_error(error),
165            ),
166            backend_error_code: error.code().stable_id(),
167            output_digest: output_digest(output_bytes),
168        }
169    }
170
171    fn from_words(
172        slot_status: u32,
173        failure_class: u32,
174        backend_error_code: u32,
175        output_digest: u64,
176    ) -> Option<Self> {
177        if slot_status == 0 && failure_class == 0 && backend_error_code == 0 && output_digest == 0 {
178            return None;
179        }
180        Some(Self {
181            slot_status,
182            failure_class: ReplayFailureClass::decode(failure_class),
183            backend_error_code,
184            output_digest,
185        })
186    }
187}
188
/// Errors surfaced by the replay-log surface. Every variant carries
/// an actionable `Fix:` hint in its display message.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ReplayLogError {
    /// I/O syscall on the log file failed.
    #[error("replay log {op} on `{path}` failed: {source}. Fix: check disk space + permissions.")]
    Io {
        /// Name of the failed operation. This file uses `open`, `read`,
        /// `write`, `seek`, `set_len` and `sync`.
        op: &'static str,
        /// Path the syscall was issued against (lossy UTF-8 rendering).
        path: Arc<str>,
        /// Underlying io::Error.
        #[source]
        source: std::io::Error,
    },
    /// Log header magic or version mismatch. Also returned during
    /// replay when a record's magic word is neither `0` nor the record
    /// sync marker.
    #[error("replay log `{path}` header mismatch. Fix: regenerate the log; VRRL format may have changed.")]
    HeaderMismatch {
        /// Log path.
        path: Arc<str>,
    },
    /// Capacity of `0` is rejected: a zero-capacity log never accepts writes.
    #[error("replay log capacity must be > 0. Fix: construct with at least one slot.")]
    ZeroCapacity,
    /// Record capacity exceeds the replay-log bound. Capping here
    /// prevents malformed log headers from forcing host OOM during
    /// replay and keeps record offsets within checked arithmetic.
    #[error("replay log capacity {count} exceeds max {max}. Fix: shard replay into smaller logs.")]
    CapacityOverflow {
        /// Requested capacity.
        count: u64,
        /// Maximum accepted capacity.
        max: u64,
    },
}
225
226fn io_err(op: &'static str, path: &Path, source: std::io::Error) -> ReplayLogError {
227    ReplayLogError::Io {
228        op,
229        path: Arc::from(path.to_string_lossy().as_ref()),
230        source,
231    }
232}
233
/// Append-only circular replay log backed by a real file. Callers
/// drive `append` on every host-side `publish_slot` and `replay_all`
/// at cert-time to walk the captured slot stream.
#[derive(Debug)]
pub struct RingLog {
    /// Open handle to the log file (read + write).
    file: File,
    /// Lossy UTF-8 rendering of the path, reused in error values.
    path_repr: Arc<str>,
    /// Number of record slots in the file.
    capacity: u64,
    /// Index of the slot the next append writes; always `< capacity`.
    next_slot: u64,
}
244
245impl RingLog {
246    /// Open a log at `path`, creating + preallocating one with
247    /// `capacity` records if no file exists yet.
248    ///
249    /// # Errors
250    ///
251    /// - [`ReplayLogError::ZeroCapacity`] if `capacity == 0`.
252    /// - [`ReplayLogError::CapacityOverflow`] if `capacity > u32::MAX`.
253    /// - [`ReplayLogError::Io`] on any syscall failure.
254    /// - [`ReplayLogError::HeaderMismatch`] when an existing file
255    ///   has the wrong magic or version.
256    pub fn open(path: impl AsRef<Path>, capacity: u64) -> Result<Self, ReplayLogError> {
257        if capacity == 0 {
258            return Err(ReplayLogError::ZeroCapacity);
259        }
260        validate_capacity(capacity)?;
261
262        let path = path.as_ref();
263        let path_repr: Arc<str> = Arc::from(path.to_string_lossy().as_ref());
264        let existed = path.exists();
265        let mut file = OpenOptions::new()
266            .create(true)
267            .truncate(false)
268            .read(true)
269            .write(true)
270            .open(path)
271            .map_err(|e| io_err("open", path, e))?;
272
273        if existed {
274            let mut magic = [0u8; 8];
275            file.read_exact(&mut magic)
276                .map_err(|e| io_err("read", path, e))?;
277            if &magic != LOG_MAGIC {
278                return Err(ReplayLogError::HeaderMismatch {
279                    path: Arc::clone(&path_repr),
280                });
281            }
282            let mut version_bytes = [0u8; 4];
283            file.read_exact(&mut version_bytes)
284                .map_err(|e| io_err("read", path, e))?;
285            if u32::from_le_bytes(version_bytes) != LOG_VERSION {
286                return Err(ReplayLogError::HeaderMismatch {
287                    path: Arc::clone(&path_repr),
288                });
289            }
290            let mut _flags = [0u8; 4];
291            file.read_exact(&mut _flags)
292                .map_err(|e| io_err("read", path, e))?;
293            let mut cap_bytes = [0u8; 8];
294            file.read_exact(&mut cap_bytes)
295                .map_err(|e| io_err("read", path, e))?;
296            let mut cursor_bytes = [0u8; 8];
297            file.read_exact(&mut cursor_bytes)
298                .map_err(|e| io_err("read", path, e))?;
299            let existing_cap = u64::from_le_bytes(cap_bytes);
300            validate_capacity(existing_cap)?;
301            let cursor = u64::from_le_bytes(cursor_bytes);
302            return Ok(Self {
303                file,
304                path_repr,
305                capacity: existing_cap,
306                next_slot: cursor % existing_cap,
307            });
308        }
309
310        // Fresh log: write the header + zero the body so every record
311        // magic starts at `0` (the uninitialised sentinel the replay
312        // scanner treats as EMPTY).
313        let total_bytes = log_file_len(capacity)?;
314        file.set_len(total_bytes)
315            .map_err(|e| io_err("set_len", path, e))?;
316        file.seek(SeekFrom::Start(0))
317            .map_err(|e| io_err("seek", path, e))?;
318        file.write_all(LOG_MAGIC)
319            .map_err(|e| io_err("write", path, e))?;
320        file.write_all(&LOG_VERSION.to_le_bytes())
321            .map_err(|e| io_err("write", path, e))?;
322        file.write_all(&0u32.to_le_bytes())
323            .map_err(|e| io_err("write", path, e))?; // flags
324        file.write_all(&capacity.to_le_bytes())
325            .map_err(|e| io_err("write", path, e))?;
326        file.write_all(&0u64.to_le_bytes())
327            .map_err(|e| io_err("write", path, e))?; // cursor
328
329        Ok(Self {
330            file,
331            path_repr,
332            capacity,
333            next_slot: 0,
334        })
335    }
336
    /// Number of record slots in the log. Appends beyond this many
    /// records wrap around and overwrite the oldest entry.
    ///
    /// For a log opened from an existing file this is the capacity
    /// stored in that file's header.
    #[must_use]
    pub fn capacity(&self) -> u64 {
        self.capacity
    }
343
    /// Current write cursor: the index of the slot the next append
    /// will write (and, once the log has wrapped, overwrite).
    #[must_use]
    pub fn cursor(&self) -> u64 {
        self.next_slot
    }
349
350    /// Path representation this log was opened against.
351    #[must_use]
352    pub fn path(&self) -> &str {
353        self.path_repr.as_ref()
354    }
355
356    /// Append a record. Overwrites the oldest slot when the log
357    /// wraps. The cursor is persisted to disk on every append so a
358    /// crash mid-session does not desynchronise the replay.
359    ///
360    /// # Errors
361    ///
362    /// Propagates [`ReplayLogError::Io`] on any file I/O failure.
363    pub fn append(&mut self, slot: RecordedSlot) -> Result<(), ReplayLogError> {
364        self.append_record(ReplayRecord {
365            slot,
366            failure: None,
367        })
368    }
369
370    /// Append a record with backend/runtime failure evidence.
371    ///
372    /// # Errors
373    ///
374    /// Propagates [`ReplayLogError::Io`] on any file I/O failure.
375    pub fn append_with_failure(
376        &mut self,
377        slot: RecordedSlot,
378        failure: ReplayFailureEvidence,
379    ) -> Result<(), ReplayLogError> {
380        self.append_record(ReplayRecord {
381            slot,
382            failure: Some(failure),
383        })
384    }
385
386    fn append_record(&mut self, record: ReplayRecord) -> Result<(), ReplayLogError> {
387        let record_offset = log_record_offset(self.next_slot)?;
388        self.file
389            .seek(SeekFrom::Start(record_offset))
390            .map_err(|e| self.io_err("seek", e))?;
391
392        let mut buf = [0u8; RECORD_BYTES as usize];
393        buf[0..4].copy_from_slice(&RECORD_MAGIC.to_le_bytes());
394        buf[4..12].copy_from_slice(&record.slot.timestamp_ns.to_le_bytes());
395        buf[12..16].copy_from_slice(&record.slot.slot_idx.to_le_bytes());
396        buf[16..20].copy_from_slice(&record.slot.tenant_id.to_le_bytes());
397        buf[20..24].copy_from_slice(&record.slot.opcode.to_le_bytes());
398        buf[24..28].copy_from_slice(&record.slot.args[0].to_le_bytes());
399        buf[28..32].copy_from_slice(&record.slot.args[1].to_le_bytes());
400        buf[32..36].copy_from_slice(&record.slot.args[2].to_le_bytes());
401        buf[36..40].copy_from_slice(&record.slot.args[3].to_le_bytes());
402        buf[40..44].copy_from_slice(&record.slot.epoch.to_le_bytes());
403        if let Some(failure) = record.failure {
404            buf[44..48].copy_from_slice(&failure.slot_status.to_le_bytes());
405            buf[48..52].copy_from_slice(&failure.failure_class.encode().to_le_bytes());
406            buf[52..56].copy_from_slice(&failure.backend_error_code.to_le_bytes());
407            buf[56..64].copy_from_slice(&failure.output_digest.to_le_bytes());
408        }
409        self.file
410            .write_all(&buf)
411            .map_err(|e| self.io_err("write", e))?;
412
413        // Persist the advanced cursor. Readers that mmap the log see
414        // this value and use it to know how far to scan.
415        self.next_slot = (self.next_slot + 1) % self.capacity;
416        self.file
417            .seek(SeekFrom::Start(24)) // header cursor offset
418            .map_err(|e| self.io_err("seek", e))?;
419        self.file
420            .write_all(&self.next_slot.to_le_bytes())
421            .map_err(|e| self.io_err("write", e))?;
422
423        Ok(())
424    }
425
426    /// Walk the log in publish order starting at the record
427    /// immediately after the current cursor (oldest still-live
428    /// record). Stops at the first record whose magic differs from
429    /// the crate-private `RECORD_MAGIC` sentinel  -  meaning the log
430    /// is still before wraparound at that position  -  unless every record
431    /// has been written.
432    ///
433    /// # Errors
434    ///
435    /// Propagates [`ReplayLogError::Io`] on read failure.
436    pub fn replay_all(&mut self) -> Result<Vec<RecordedSlot>, ReplayLogError> {
437        Ok(self
438            .replay_records()?
439            .into_iter()
440            .map(|record| record.slot)
441            .collect())
442    }
443
444    /// Walk the log in publish order and return full records, including
445    /// optional failure evidence.
446    ///
447    /// # Errors
448    ///
449    /// Propagates [`ReplayLogError::Io`] on read failure.
450    pub fn replay_records(&mut self) -> Result<Vec<ReplayRecord>, ReplayLogError> {
451        let capacity =
452            usize::try_from(self.capacity).map_err(|_| ReplayLogError::CapacityOverflow {
453                count: self.capacity,
454                max: MAX_REPLAY_RECORDS,
455            })?;
456        let mut out = Vec::with_capacity(capacity);
457        for step in 0..self.capacity {
458            let slot_index = (self.next_slot + step) % self.capacity;
459            let offset = log_record_offset(slot_index)?;
460            self.file
461                .seek(SeekFrom::Start(offset))
462                .map_err(|e| self.io_err("seek", e))?;
463            let mut buf = [0u8; RECORD_BYTES as usize];
464            self.file
465                .read_exact(&mut buf)
466                .map_err(|e| self.io_err("read", e))?;
467            let magic = read_u32(&buf, 0);
468            if magic == 0 {
469                // Untouched record  -  log has not wrapped past this slot yet.
470                continue;
471            }
472            if magic != RECORD_MAGIC {
473                return Err(ReplayLogError::HeaderMismatch {
474                    path: self.path_repr.clone(),
475                });
476            }
477            let slot = RecordedSlot {
478                timestamp_ns: read_u64(&buf, 4),
479                slot_idx: read_u32(&buf, 12),
480                tenant_id: read_u32(&buf, 16),
481                opcode: read_u32(&buf, 20),
482                args: [
483                    read_u32(&buf, 24),
484                    read_u32(&buf, 28),
485                    read_u32(&buf, 32),
486                    read_u32(&buf, 36),
487                ],
488                epoch: read_u32(&buf, 40),
489            };
490            out.push(ReplayRecord {
491                slot,
492                failure: ReplayFailureEvidence::from_words(
493                    read_u32(&buf, 44),
494                    read_u32(&buf, 48),
495                    read_u32(&buf, 52),
496                    read_u64(&buf, 56),
497                ),
498            });
499        }
500        Ok(out)
501    }
502
503    /// Flush + sync the file to durable storage. Callers invoke this
504    /// when they want the log guaranteed on disk  -  the hot-path
505    /// `append` does not fsync per-record.
506    ///
507    /// # Errors
508    ///
509    /// Propagates [`ReplayLogError::Io`] on fsync failure.
510    pub fn sync(&mut self) -> Result<(), ReplayLogError> {
511        self.file.sync_all().map_err(|e| self.io_err("sync", e))?;
512        Ok(())
513    }
514
515    fn io_err(&self, op: &'static str, source: std::io::Error) -> ReplayLogError {
516        ReplayLogError::Io {
517            op,
518            path: self.path_repr.clone(),
519            source,
520        }
521    }
522}
523
524fn validate_capacity(capacity: u64) -> Result<(), ReplayLogError> {
525    if capacity == 0 {
526        return Err(ReplayLogError::ZeroCapacity);
527    }
528    if capacity > MAX_REPLAY_RECORDS {
529        return Err(ReplayLogError::CapacityOverflow {
530            count: capacity,
531            max: MAX_REPLAY_RECORDS,
532        });
533    }
534    Ok(())
535}
536
/// Total file length in bytes for a log with `capacity` record slots:
/// the position one past the last record, i.e. the offset a record at
/// index `capacity` would have.
fn log_file_len(capacity: u64) -> Result<u64, ReplayLogError> {
    log_record_position(capacity)
}
540
/// Byte offset in the file at which the record for `slot_index` begins.
/// The index is not checked against any log's capacity here.
fn log_record_offset(slot_index: u64) -> Result<u64, ReplayLogError> {
    log_record_position(slot_index)
}
544
545fn log_record_position(record_index: u64) -> Result<u64, ReplayLogError> {
546    let record_bytes =
547        vyre_driver::accounting::checked_mul_u64_lazy(record_index, RECORD_BYTES, || {
548            replay_capacity_overflow(record_index)
549        })?;
550    vyre_driver::accounting::checked_add_u64_lazy(HEADER_BYTES, record_bytes, || {
551        replay_capacity_overflow(record_index)
552    })
553}
554
/// Build a [`ReplayLogError::CapacityOverflow`] reporting `count`
/// against the `MAX_REPLAY_RECORDS` bound.
fn replay_capacity_overflow(count: u64) -> ReplayLogError {
    ReplayLogError::CapacityOverflow {
        count,
        max: MAX_REPLAY_RECORDS,
    }
}
561
/// Decode the little-endian `u32` stored at `buf[offset..offset + 4]`.
///
/// Panics if that range does not lie inside `buf`.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    // Fold from the most significant (last) byte down to the first.
    buf[offset..offset + 4]
        .iter()
        .rev()
        .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte))
}
567
/// Decode the little-endian `u64` stored at `buf[offset..offset + 8]`.
///
/// Panics if that range does not lie inside `buf`.
fn read_u64(buf: &[u8], offset: usize) -> u64 {
    // Fold from the most significant (last) byte down to the first.
    buf[offset..offset + 8]
        .iter()
        .rev()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
573
574fn output_digest(bytes: &[u8]) -> u64 {
575    let digest = blake3::hash(bytes);
576    let mut out = [0u8; 8];
577    out.copy_from_slice(&digest.as_bytes()[..8]);
578    u64::from_le_bytes(out)
579}
580
581/// Let callers bridge ReplayLogError into the unified PipelineError
582/// surface when driving the log from the megakernel pump loop.
583impl From<ReplayLogError> for PipelineError {
584    fn from(err: ReplayLogError) -> Self {
585        PipelineError::Backend(err.to_string())
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a slot whose fields are all derived from `slot_idx`, so a
    /// replayed record can be identified by any one of them.
    fn rec(slot_idx: u32, epoch: u32) -> RecordedSlot {
        RecordedSlot {
            timestamp_ns: 1_000_000 + slot_idx as u64,
            slot_idx,
            tenant_id: 0,
            opcode: 0x4000_0000 + slot_idx,
            args: [slot_idx, slot_idx * 2, slot_idx * 3, slot_idx * 4],
            epoch,
        }
    }

    /// A requested capacity of zero is rejected when opening a log.
    #[test]
    fn open_rejects_zero_capacity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, 0).expect_err("zero capacity must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    /// Two appended slots come back from `replay_all` in publish order.
    #[test]
    fn append_and_replay_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4)
            .expect("Fix: open fresh log; restore this invariant before continuing.");
        log.append(rec(1, 10)).unwrap();
        log.append(rec(2, 11)).unwrap();
        log.sync().unwrap();

        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 2);
        assert_eq!(replay[0].slot_idx, 1);
        assert_eq!(replay[0].epoch, 10);
        assert_eq!(replay[1].slot_idx, 2);
        assert_eq!(replay[1].epoch, 11);
    }

    /// Failure evidence built from a backend error survives a
    /// write/replay round trip unchanged.
    #[test]
    fn append_with_failure_round_trips_reproduction_evidence() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4)
            .expect("Fix: open fresh log; restore this invariant before continuing.");
        let backend_error = BackendError::DispatchFailed {
            code: Some(17),
            message: "DeviceLost after queue submit".to_string(),
        };
        let failure =
            ReplayFailureEvidence::from_backend_error(3, &backend_error, b"partial-output");

        assert_eq!(failure.failure_class, ReplayFailureClass::DeviceLoss);
        assert_eq!(failure.backend_error_code, backend_error.code().stable_id());
        assert_ne!(failure.output_digest, 0);

        log.append_with_failure(rec(7, 44), failure).unwrap();
        log.sync().unwrap();

        let replay = log
            .replay_records()
            .expect("Fix: replay records; restore this invariant before continuing.");
        assert_eq!(replay.len(), 1);
        assert_eq!(replay[0].slot.slot_idx, 7);
        assert_eq!(replay[0].slot.epoch, 44);
        assert_eq!(replay[0].failure, Some(failure));
    }

    /// A plain `append` leaves the evidence tail zeroed, which replays
    /// as `None`.
    #[test]
    fn append_without_failure_has_no_failure_evidence() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 2)
            .expect("Fix: open fresh log; restore this invariant before continuing.");

        log.append(rec(1, 10)).unwrap();

        let replay = log
            .replay_records()
            .expect("Fix: replay records; restore this invariant before continuing.");
        assert_eq!(replay.len(), 1);
        assert_eq!(replay[0].slot.slot_idx, 1);
        assert_eq!(replay[0].failure, None);
    }

    /// Writing more records than the capacity keeps only the newest
    /// `capacity` records, replayed oldest first.
    #[test]
    fn log_rollover_preserves_most_recent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log =
            RingLog::open(&path, 3).expect("Fix: open; restore this invariant before continuing.");
        for i in 0..5 {
            log.append(rec(i, 100 + i)).unwrap();
        }
        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 3, "capacity=3 must retain exactly 3 records");
        let slot_ids: Vec<u32> = replay.iter().map(|r| r.slot_idx).collect();
        // Publish order: 0, 1, 2, 3, 4. The cursor wraps once and two
        // records are overwritten, so the live records are [3, 4, 2]
        // in ring-physical order; replay starts at next_slot = 2 so
        // the visible order is [2, 3, 4].
        assert_eq!(slot_ids, vec![2, 3, 4]);
    }

    /// Reopening an existing log restores the cursor persisted in its
    /// header and still replays the earlier records.
    #[test]
    fn reopen_restores_cursor() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            let mut log = RingLog::open(&path, 4)
                .expect("Fix: open fresh; restore this invariant before continuing.");
            log.append(rec(1, 10)).unwrap();
            log.append(rec(2, 11)).unwrap();
            log.sync().unwrap();
        }
        let mut reopened = RingLog::open(&path, 4)
            .expect("Fix: reopen; restore this invariant before continuing.");
        assert_eq!(reopened.cursor(), 2);
        let replay = reopened.replay_all().unwrap();
        assert_eq!(replay.len(), 2);
    }

    /// An existing file whose header magic is wrong is rejected with
    /// `HeaderMismatch`.
    #[test]
    fn corrupted_magic_rejected() {
        use std::io::Write as _;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            // Create a "log" file with the wrong magic.
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(b"XXXX0001").unwrap();
            f.write_all(&1u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&4u64.to_le_bytes()).unwrap();
            f.write_all(&0u64.to_le_bytes()).unwrap();
            // Size the file as a full 4-record log so it is long enough
            // for every header read in open() (header is 32 B).
            f.set_len(HEADER_BYTES + 4 * RECORD_BYTES).unwrap();
        }
        let err = RingLog::open(&path, 4).expect_err("wrong magic must reject");
        assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
    }

    /// Write a header-only log file with valid magic and version and
    /// the given `capacity` and `cursor`; no record body is written.
    fn write_header(path: &Path, capacity: u64, cursor: u64) {
        use std::io::Write as _;

        let mut f = std::fs::File::create(path).unwrap();
        f.write_all(LOG_MAGIC).unwrap();
        f.write_all(&LOG_VERSION.to_le_bytes()).unwrap();
        f.write_all(&0u32.to_le_bytes()).unwrap();
        f.write_all(&capacity.to_le_bytes()).unwrap();
        f.write_all(&cursor.to_le_bytes()).unwrap();
    }

    /// A stored capacity of zero must surface as `ZeroCapacity` rather
    /// than reaching the `cursor % capacity` computation in `open`.
    #[test]
    fn existing_log_zero_capacity_rejected_before_cursor_modulo() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, 0, 0);

        let err = RingLog::open(&path, 4).expect_err("header capacity=0 must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    /// A stored capacity above the bound is rejected at open time, before
    /// any replay could size an allocation from it.
    #[test]
    fn existing_log_huge_capacity_rejected_before_replay_allocation() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, MAX_REPLAY_RECORDS + 1, 0);

        let err = RingLog::open(&path, 4).expect_err("huge header capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }

    /// A requested capacity above the bound is rejected with the
    /// offending count and the maximum reported in the error.
    #[test]
    fn capacity_overflow_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, MAX_REPLAY_RECORDS + 1)
            .expect_err("over-size capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }
}