Skip to main content

vyre_runtime/
replay.rs

1//! Differential megakernel replay log.
2//!
3//! Every slot the host publishes into the megakernel ring is also
4//! appended to a circular log on disk. A later replay run can feed
5//! the log into a fresh megakernel + backend pair and diff the
6//! epoch-by-epoch observable stream against the original. This
7//! catches schedule-dependent bugs  -  GPU nondeterminism, atomic
8//! ordering hazards, cache-line races  -  that unit tests cannot hit
9//! by construction.
10//!
11//! ## Layout
12//!
13//! ```text
14//! header (32 bytes, aligned to 4 KiB):
15//!     magic:          b"VRRL0001"        (8 bytes)    -  "Vyre Ring-Replay Log"
16//!     version:        u32 = 1            (4 bytes)
17//!     flags:          u32 = 0            (4 bytes)
18//!     capacity:       u64                (8 bytes)    -  total record slots
19//!     next_slot:      u64                (8 bytes)    -  write cursor (mod capacity)
20//! records:                                          (capacity × RECORD_BYTES)
21//!     magic:          u32 = 0xDEADBEEF  (4 bytes)   -  sync marker for forward scan
22//!     timestamp_ns:   u64                (8 bytes)
23//!     slot_idx:       u32                (4 bytes)
24//!     tenant_id:      u32                (4 bytes)
25//!     opcode:         u32                (4 bytes)
26//!     args:           [u32; 4]           (16 bytes)
27//!     epoch:          u32                (4 bytes)   -  observed at publish time
28//!     reserved:       u32                (4 bytes)   -  future use; zero for v1
29//! ```
30//!
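//! The fields above total 48 bytes. Each record is padded with zeroed
//! reserved bytes to a fixed 64-byte stride so record offsets are a
//! simple multiply. Records begin immediately after the 32-byte
//! header, so record `i` lives at file offset `32 + 64 * i`; a
//! consumer that `mmap`s the log must account for that 32-byte skew
//! rather than assume each record starts on a 64-byte boundary.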
34//!
35//! ## Rollover
36//!
37//! The log is a fixed-capacity ring. `next_slot = (next_slot + 1) %
38//! capacity`; a replay iterates from `next_slot` through all records
39//! that have a live magic word. Records that predate the first wrap
40//! are overwritten in publish order.
41
42use std::fs::{File, OpenOptions};
43use std::io::{Read, Seek, SeekFrom, Write};
44use std::path::Path;
45use std::sync::Arc;
46
47use crate::PipelineError;
48
/// Eight-byte magic written at the very start of the log file header.
const LOG_MAGIC: &[u8; 8] = b"VRRL0001";
/// Header format version written to fresh logs and required when reopening.
const LOG_VERSION: u32 = 1;
/// Marker stored in the first four bytes of every written record. A
/// zero word in that position means the slot has never been written.
const RECORD_MAGIC: u32 = 0xDEAD_BEEF;
/// On-disk stride of one record, in bytes (payload plus zeroed padding).
const RECORD_BYTES: u64 = 64;
/// Size of the file header in bytes; the first record starts at this offset.
const HEADER_BYTES: u64 = 32;
/// Upper bound on the number of record slots accepted by `validate_capacity`.
const MAX_REPLAY_RECORDS: u64 = 1_048_576;
55
/// One published ring slot as captured by the replay log.
///
/// On disk every field is stored little-endian inside a 64-byte record
/// that starts with a 4-byte magic word; the byte ranges quoted on each
/// field are relative to the start of that record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordedSlot {
    /// Host wall-clock timestamp, nanoseconds since UNIX epoch.
    /// Record bytes 4..12.
    pub timestamp_ns: u64,
    /// Ring slot index the host published into. Record bytes 12..16.
    pub slot_idx: u32,
    /// Tenant id from the slot's TENANT_WORD. Record bytes 16..20.
    pub tenant_id: u32,
    /// Opcode from the slot's OPCODE_WORD. Record bytes 20..24.
    pub opcode: u32,
    /// First four argument words (the rest of the 13-word arg space
    /// lives in a packed-slot extension and is captured separately).
    /// Record bytes 24..40, four bytes per word in array order.
    pub args: [u32; 4],
    /// Megakernel EPOCH word observed at publish time. A replay run
    /// on the same backend must reach the same epoch in the same
    /// order - divergence is the load-bearing signal.
    /// Record bytes 40..44.
    pub epoch: u32,
}
75
/// Errors surfaced by the replay-log surface. Every variant carries
/// an actionable `Fix:` hint in its display message.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ReplayLogError {
    /// An I/O operation on the log file failed.
    #[error("replay log {op} on `{path}` failed: {source}. Fix: check disk space + permissions.")]
    Io {
        /// Short label of the failing operation, e.g. `open`, `seek`,
        /// `read`, `write`, `set_len` or `sync`.
        op: &'static str,
        /// Lossy UTF-8 rendering of the path the operation targeted.
        path: Arc<str>,
        /// Underlying io::Error.
        #[source]
        source: std::io::Error,
    },
    /// Log header magic or version mismatch. Also returned by
    /// `RingLog::replay_all` when a record's magic word is neither
    /// zero (never written) nor the expected record marker.
    #[error("replay log `{path}` header mismatch. Fix: regenerate the log; VRRL format may have changed.")]
    HeaderMismatch {
        /// Log path.
        path: Arc<str>,
    },
    /// Capacity of `0` is rejected - a zero-capacity log never accepts
    /// writes. Raised both for the caller-supplied capacity and for a
    /// zero capacity read back from an existing header.
    #[error("replay log capacity must be > 0. Fix: construct with at least one slot.")]
    ZeroCapacity,
    /// Record capacity exceeds the replay-log bound. Capping here
    /// prevents malformed log headers from forcing host OOM during
    /// replay and keeps record offsets within checked arithmetic.
    #[error("replay log capacity {count} exceeds max {max}. Fix: shard replay into smaller logs.")]
    CapacityOverflow {
        /// Requested capacity.
        count: u64,
        /// Maximum accepted capacity.
        max: u64,
    },
}
112
113fn io_err(op: &'static str, path: &Path, source: std::io::Error) -> ReplayLogError {
114    ReplayLogError::Io {
115        op,
116        path: Arc::from(path.to_string_lossy().as_ref()),
117        source,
118    }
119}
120
/// Append-only circular replay log backed by a real file. Callers
/// drive `append` on every host-side `publish_slot` and `replay_all`
/// at cert-time to walk the captured slot stream.
#[derive(Debug)]
pub struct RingLog {
    /// Read/write handle to the backing log file.
    file: File,
    /// Lossy UTF-8 rendering of the path, reused by `path()` and in errors.
    path_repr: Arc<str>,
    /// Number of record slots; for an existing file this is the value
    /// stored in its header, not the one passed to `open`.
    capacity: u64,
    /// Index of the slot the next `append` writes; always `< capacity`.
    next_slot: u64,
}
131
132impl RingLog {
133    /// Open a log at `path`, creating + preallocating one with
134    /// `capacity` records if no file exists yet.
135    ///
136    /// # Errors
137    ///
138    /// - [`ReplayLogError::ZeroCapacity`] if `capacity == 0`.
139    /// - [`ReplayLogError::CapacityOverflow`] if `capacity > u32::MAX`.
140    /// - [`ReplayLogError::Io`] on any syscall failure.
141    /// - [`ReplayLogError::HeaderMismatch`] when an existing file
142    ///   has the wrong magic or version.
143    pub fn open(path: impl AsRef<Path>, capacity: u64) -> Result<Self, ReplayLogError> {
144        if capacity == 0 {
145            return Err(ReplayLogError::ZeroCapacity);
146        }
147        validate_capacity(capacity)?;
148
149        let path = path.as_ref();
150        let path_repr: Arc<str> = Arc::from(path.to_string_lossy().as_ref());
151        let existed = path.exists();
152        let mut file = OpenOptions::new()
153            .create(true)
154            .truncate(false)
155            .read(true)
156            .write(true)
157            .open(path)
158            .map_err(|e| io_err("open", path, e))?;
159
160        if existed {
161            let mut magic = [0u8; 8];
162            file.read_exact(&mut magic)
163                .map_err(|e| io_err("read", path, e))?;
164            if &magic != LOG_MAGIC {
165                return Err(ReplayLogError::HeaderMismatch {
166                    path: Arc::clone(&path_repr),
167                });
168            }
169            let mut version_bytes = [0u8; 4];
170            file.read_exact(&mut version_bytes)
171                .map_err(|e| io_err("read", path, e))?;
172            if u32::from_le_bytes(version_bytes) != LOG_VERSION {
173                return Err(ReplayLogError::HeaderMismatch {
174                    path: Arc::clone(&path_repr),
175                });
176            }
177            let mut _flags = [0u8; 4];
178            file.read_exact(&mut _flags)
179                .map_err(|e| io_err("read", path, e))?;
180            let mut cap_bytes = [0u8; 8];
181            file.read_exact(&mut cap_bytes)
182                .map_err(|e| io_err("read", path, e))?;
183            let mut cursor_bytes = [0u8; 8];
184            file.read_exact(&mut cursor_bytes)
185                .map_err(|e| io_err("read", path, e))?;
186            let existing_cap = u64::from_le_bytes(cap_bytes);
187            validate_capacity(existing_cap)?;
188            let cursor = u64::from_le_bytes(cursor_bytes);
189            return Ok(Self {
190                file,
191                path_repr,
192                capacity: existing_cap,
193                next_slot: cursor % existing_cap,
194            });
195        }
196
197        // Fresh log: write the header + zero the body so every record
198        // magic starts at `0` (the uninitialised sentinel the replay
199        // scanner treats as EMPTY).
200        let total_bytes = log_file_len(capacity)?;
201        file.set_len(total_bytes)
202            .map_err(|e| io_err("set_len", path, e))?;
203        file.seek(SeekFrom::Start(0))
204            .map_err(|e| io_err("seek", path, e))?;
205        file.write_all(LOG_MAGIC)
206            .map_err(|e| io_err("write", path, e))?;
207        file.write_all(&LOG_VERSION.to_le_bytes())
208            .map_err(|e| io_err("write", path, e))?;
209        file.write_all(&0u32.to_le_bytes())
210            .map_err(|e| io_err("write", path, e))?; // flags
211        file.write_all(&capacity.to_le_bytes())
212            .map_err(|e| io_err("write", path, e))?;
213        file.write_all(&0u64.to_le_bytes())
214            .map_err(|e| io_err("write", path, e))?; // cursor
215
216        Ok(Self {
217            file,
218            path_repr,
219            capacity,
220            next_slot: 0,
221        })
222    }
223
224    /// Number of record slots in the log. Records past this capacity
225    /// wrap and overwrite the oldest entry.
226    #[must_use]
227    pub fn capacity(&self) -> u64 {
228        self.capacity
229    }
230
231    /// Current write cursor (next slot to be overwritten).
232    #[must_use]
233    pub fn cursor(&self) -> u64 {
234        self.next_slot
235    }
236
237    /// Path representation this log was opened against.
238    #[must_use]
239    pub fn path(&self) -> &str {
240        self.path_repr.as_ref()
241    }
242
243    /// Append a record. Overwrites the oldest slot when the log
244    /// wraps. The cursor is persisted to disk on every append so a
245    /// crash mid-session does not desynchronise the replay.
246    ///
247    /// # Errors
248    ///
249    /// Propagates [`ReplayLogError::Io`] on any file I/O failure.
250    pub fn append(&mut self, slot: RecordedSlot) -> Result<(), ReplayLogError> {
251        let record_offset = log_record_offset(self.next_slot)?;
252        self.file
253            .seek(SeekFrom::Start(record_offset))
254            .map_err(|e| self.io_err("seek", e))?;
255
256        let mut buf = [0u8; RECORD_BYTES as usize];
257        buf[0..4].copy_from_slice(&RECORD_MAGIC.to_le_bytes());
258        buf[4..12].copy_from_slice(&slot.timestamp_ns.to_le_bytes());
259        buf[12..16].copy_from_slice(&slot.slot_idx.to_le_bytes());
260        buf[16..20].copy_from_slice(&slot.tenant_id.to_le_bytes());
261        buf[20..24].copy_from_slice(&slot.opcode.to_le_bytes());
262        buf[24..28].copy_from_slice(&slot.args[0].to_le_bytes());
263        buf[28..32].copy_from_slice(&slot.args[1].to_le_bytes());
264        buf[32..36].copy_from_slice(&slot.args[2].to_le_bytes());
265        buf[36..40].copy_from_slice(&slot.args[3].to_le_bytes());
266        buf[40..44].copy_from_slice(&slot.epoch.to_le_bytes());
267        // bytes 44..64 reserved  -  explicitly zeroed by the buf init.
268        self.file
269            .write_all(&buf)
270            .map_err(|e| self.io_err("write", e))?;
271
272        // Persist the advanced cursor. Readers that mmap the log see
273        // this value and use it to know how far to scan.
274        self.next_slot = (self.next_slot + 1) % self.capacity;
275        self.file
276            .seek(SeekFrom::Start(24)) // header cursor offset
277            .map_err(|e| self.io_err("seek", e))?;
278        self.file
279            .write_all(&self.next_slot.to_le_bytes())
280            .map_err(|e| self.io_err("write", e))?;
281
282        Ok(())
283    }
284
285    /// Walk the log in publish order starting at the record
286    /// immediately after the current cursor (oldest still-live
287    /// record). Stops at the first record whose magic differs from
288    /// the crate-private `RECORD_MAGIC` sentinel  -  meaning the log
289    /// is still before wraparound at that position  -  unless every record
290    /// has been written.
291    ///
292    /// # Errors
293    ///
294    /// Propagates [`ReplayLogError::Io`] on read failure.
295    pub fn replay_all(&mut self) -> Result<Vec<RecordedSlot>, ReplayLogError> {
296        let capacity =
297            usize::try_from(self.capacity).map_err(|_| ReplayLogError::CapacityOverflow {
298                count: self.capacity,
299                max: MAX_REPLAY_RECORDS,
300            })?;
301        let mut out = Vec::with_capacity(capacity);
302        for step in 0..self.capacity {
303            let slot_index = (self.next_slot + step) % self.capacity;
304            let offset = log_record_offset(slot_index)?;
305            self.file
306                .seek(SeekFrom::Start(offset))
307                .map_err(|e| self.io_err("seek", e))?;
308            let mut buf = [0u8; RECORD_BYTES as usize];
309            self.file
310                .read_exact(&mut buf)
311                .map_err(|e| self.io_err("read", e))?;
312            let magic = read_u32(&buf, 0);
313            if magic == 0 {
314                // Untouched record  -  log has not wrapped past this slot yet.
315                continue;
316            }
317            if magic != RECORD_MAGIC {
318                return Err(ReplayLogError::HeaderMismatch {
319                    path: self.path_repr.clone(),
320                });
321            }
322            out.push(RecordedSlot {
323                timestamp_ns: read_u64(&buf, 4),
324                slot_idx: read_u32(&buf, 12),
325                tenant_id: read_u32(&buf, 16),
326                opcode: read_u32(&buf, 20),
327                args: [
328                    read_u32(&buf, 24),
329                    read_u32(&buf, 28),
330                    read_u32(&buf, 32),
331                    read_u32(&buf, 36),
332                ],
333                epoch: read_u32(&buf, 40),
334            });
335        }
336        Ok(out)
337    }
338
339    /// Flush + sync the file to durable storage. Callers invoke this
340    /// when they want the log guaranteed on disk  -  the hot-path
341    /// `append` does not fsync per-record.
342    ///
343    /// # Errors
344    ///
345    /// Propagates [`ReplayLogError::Io`] on fsync failure.
346    pub fn sync(&mut self) -> Result<(), ReplayLogError> {
347        self.file.sync_all().map_err(|e| self.io_err("sync", e))?;
348        Ok(())
349    }
350
351    fn io_err(&self, op: &'static str, source: std::io::Error) -> ReplayLogError {
352        ReplayLogError::Io {
353            op,
354            path: self.path_repr.clone(),
355            source,
356        }
357    }
358}
359
360fn validate_capacity(capacity: u64) -> Result<(), ReplayLogError> {
361    if capacity == 0 {
362        return Err(ReplayLogError::ZeroCapacity);
363    }
364    if capacity > MAX_REPLAY_RECORDS {
365        return Err(ReplayLogError::CapacityOverflow {
366            count: capacity,
367            max: MAX_REPLAY_RECORDS,
368        });
369    }
370    Ok(())
371}
372
/// Total file size in bytes for a log with `capacity` record slots.
///
/// Delegates to `log_record_position`: the end of the file is the
/// offset at which record number `capacity` would begin.
fn log_file_len(capacity: u64) -> Result<u64, ReplayLogError> {
    log_record_position(capacity)
}
376
/// Byte offset from the start of the file at which record
/// `slot_index` begins. Thin, intention-revealing alias for
/// `log_record_position`.
fn log_record_offset(slot_index: u64) -> Result<u64, ReplayLogError> {
    log_record_position(slot_index)
}
380
381fn log_record_position(record_index: u64) -> Result<u64, ReplayLogError> {
382    let record_bytes =
383        vyre_driver::accounting::checked_mul_u64_lazy(record_index, RECORD_BYTES, || {
384            replay_capacity_overflow(record_index)
385        })?;
386    vyre_driver::accounting::checked_add_u64_lazy(HEADER_BYTES, record_bytes, || {
387        replay_capacity_overflow(record_index)
388    })
389}
390
391fn replay_capacity_overflow(count: u64) -> ReplayLogError {
392    ReplayLogError::CapacityOverflow {
393        count,
394        max: MAX_REPLAY_RECORDS,
395    }
396}
397
/// Decode the little-endian `u32` stored at `buf[offset..offset + 4]`.
///
/// Panics if that range is out of bounds for `buf`.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    // Walk the four bytes from most to least significant, shifting the
    // accumulator up one byte each step.
    buf[offset..offset + 4]
        .iter()
        .rev()
        .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte))
}
403
/// Decode the little-endian `u64` stored at `buf[offset..offset + 8]`.
///
/// Panics if that range is out of bounds for `buf`.
fn read_u64(buf: &[u8], offset: usize) -> u64 {
    // Byte `i` of a little-endian value contributes bits `8*i..8*i+8`.
    buf[offset..offset + 8]
        .iter()
        .enumerate()
        .fold(0u64, |acc, (i, &byte)| acc | (u64::from(byte) << (8 * i)))
}
409
410/// Let callers bridge ReplayLogError into the unified PipelineError
411/// surface when driving the log from the megakernel pump loop.
412impl From<ReplayLogError> for PipelineError {
413    fn from(err: ReplayLogError) -> Self {
414        PipelineError::Backend(err.to_string())
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a deterministic record whose fields are all derived from
    /// `slot_idx`, with the given `epoch`.
    fn rec(slot_idx: u32, epoch: u32) -> RecordedSlot {
        RecordedSlot {
            timestamp_ns: 1_000_000 + slot_idx as u64,
            slot_idx,
            tenant_id: 0,
            opcode: 0x4000_0000 + slot_idx,
            args: [slot_idx, slot_idx * 2, slot_idx * 3, slot_idx * 4],
            epoch,
        }
    }

    #[test]
    fn open_rejects_zero_capacity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, 0).expect_err("zero capacity must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    #[test]
    fn fresh_log_reports_accessors_and_replays_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4).unwrap();
        assert_eq!(log.capacity(), 4);
        assert_eq!(log.cursor(), 0);
        assert_eq!(log.path(), path.to_string_lossy().as_ref());
        // Every slot still has a zero magic, so nothing is replayed.
        assert!(log.replay_all().unwrap().is_empty());
    }

    #[test]
    fn append_and_replay_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log = RingLog::open(&path, 4)
            .expect("Fix: open fresh log; restore this invariant before continuing.");
        log.append(rec(1, 10)).unwrap();
        log.append(rec(2, 11)).unwrap();
        log.sync().unwrap();

        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 2);
        assert_eq!(replay[0].slot_idx, 1);
        assert_eq!(replay[0].epoch, 10);
        assert_eq!(replay[1].slot_idx, 2);
        assert_eq!(replay[1].epoch, 11);
    }

    #[test]
    fn log_rollover_preserves_most_recent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let mut log =
            RingLog::open(&path, 3).expect("Fix: open; restore this invariant before continuing.");
        for i in 0..5 {
            log.append(rec(i, 100 + i)).unwrap();
        }
        let replay = log
            .replay_all()
            .expect("Fix: replay; restore this invariant before continuing.");
        assert_eq!(replay.len(), 3, "capacity=3 must retain exactly 3 records");
        let slot_ids: Vec<u32> = replay.iter().map(|r| r.slot_idx).collect();
        // Publish order: 0, 1, 2, 3, 4. Five appends into three slots
        // wrap the ring once, leaving [3, 4, 2] in ring-physical order
        // with next_slot = 5 % 3 = 2; replay starts at the cursor, so
        // the visible order is [2, 3, 4].
        assert_eq!(slot_ids, vec![2, 3, 4]);
    }

    #[test]
    fn reopen_restores_cursor() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            let mut log = RingLog::open(&path, 4)
                .expect("Fix: open fresh; restore this invariant before continuing.");
            log.append(rec(1, 10)).unwrap();
            log.append(rec(2, 11)).unwrap();
            log.sync().unwrap();
        }
        let mut reopened = RingLog::open(&path, 4)
            .expect("Fix: reopen; restore this invariant before continuing.");
        assert_eq!(reopened.cursor(), 2);
        let replay = reopened.replay_all().unwrap();
        assert_eq!(replay.len(), 2);
    }

    #[test]
    fn corrupted_magic_rejected() {
        use std::io::Write as _;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            // Create a "log" file with the wrong magic.
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(b"XXXX0001").unwrap();
            f.write_all(&1u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&4u64.to_le_bytes()).unwrap();
            f.write_all(&0u64.to_le_bytes()).unwrap();
            // Ensure enough bytes for the subsequent reads in open() (headers ≥ 32 B).
            f.set_len(HEADER_BYTES + 4 * RECORD_BYTES).unwrap();
        }
        let err = RingLog::open(&path, 4).expect_err("wrong magic must reject");
        assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
    }

    #[test]
    fn version_mismatch_rejected() {
        use std::io::Write as _;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            // Correct magic, but a version this build does not understand.
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(LOG_MAGIC).unwrap();
            f.write_all(&(LOG_VERSION + 1).to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&4u64.to_le_bytes()).unwrap();
            f.write_all(&0u64.to_le_bytes()).unwrap();
        }
        let err = RingLog::open(&path, 4).expect_err("wrong version must reject");
        assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
    }

    #[test]
    fn corrupted_record_magic_rejected_on_replay() {
        use std::io::{Seek as _, Write as _};

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        {
            let mut log = RingLog::open(&path, 2).unwrap();
            log.append(rec(1, 10)).unwrap();
            log.sync().unwrap();
        }
        {
            // Stomp the magic word of record 0 with a value that is
            // neither zero (empty) nor RECORD_MAGIC.
            let mut f = std::fs::OpenOptions::new()
                .write(true)
                .open(&path)
                .unwrap();
            f.seek(std::io::SeekFrom::Start(HEADER_BYTES)).unwrap();
            f.write_all(&0x1234_5678u32.to_le_bytes()).unwrap();
        }
        let mut reopened = RingLog::open(&path, 2).unwrap();
        let err = reopened
            .replay_all()
            .expect_err("corrupt record magic must reject");
        assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
    }

    /// Write a header-only log file with the given capacity and cursor.
    fn write_header(path: &Path, capacity: u64, cursor: u64) {
        use std::io::Write as _;

        let mut f = std::fs::File::create(path).unwrap();
        f.write_all(LOG_MAGIC).unwrap();
        f.write_all(&LOG_VERSION.to_le_bytes()).unwrap();
        f.write_all(&0u32.to_le_bytes()).unwrap();
        f.write_all(&capacity.to_le_bytes()).unwrap();
        f.write_all(&cursor.to_le_bytes()).unwrap();
    }

    #[test]
    fn existing_log_zero_capacity_rejected_before_cursor_modulo() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, 0, 0);

        let err = RingLog::open(&path, 4).expect_err("header capacity=0 must reject");
        assert!(matches!(err, ReplayLogError::ZeroCapacity));
    }

    #[test]
    fn existing_log_huge_capacity_rejected_before_replay_allocation() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        write_header(&path, MAX_REPLAY_RECORDS + 1, 0);

        let err = RingLog::open(&path, 4).expect_err("huge header capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }

    #[test]
    fn capacity_overflow_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.vrrl");
        let err = RingLog::open(&path, MAX_REPLAY_RECORDS + 1)
            .expect_err("over-size capacity must reject");
        assert!(matches!(
            err,
            ReplayLogError::CapacityOverflow {
                count,
                max: MAX_REPLAY_RECORDS
            } if count == MAX_REPLAY_RECORDS + 1
        ));
    }
}