1use std::fs::{File, OpenOptions};
43use std::io::{Read, Seek, SeekFrom, Write};
44use std::path::Path;
45use std::sync::Arc;
46
47use crate::PipelineError;
48
/// Eight-byte signature written at offset 0 of a fresh log and compared on reopen.
const LOG_MAGIC: &[u8; 8] = b"VRRL0001";
/// Format version stored little-endian right after the signature; reopen rejects any other value.
const LOG_VERSION: u32 = 1;
/// Marker stored in the first four bytes of every appended record. Replay treats a zero
/// marker as "slot never written" and any other value as corruption.
const RECORD_MAGIC: u32 = 0xDEAD_BEEF;
/// Fixed on-disk size of one record slot. `append` fills the first 44 bytes; the rest stay zero.
const RECORD_BYTES: u64 = 64;
/// Header size: magic (8) + version (4) + flags (4) + capacity (8) + cursor (8).
const HEADER_BYTES: u64 = 8 + 4 + 4 + 8 + 8;
/// Largest slot count accepted by `validate_capacity` (2^20 records).
const MAX_REPLAY_RECORDS: u64 = 1 << 20;
55
/// One record of the replay log, as stored by `RingLog::append` and returned by
/// `RingLog::replay_all`.
///
/// All fields are serialized little-endian after a 4-byte record marker; the byte
/// ranges below are offsets inside the 64-byte on-disk record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordedSlot {
    /// Stored at bytes 4..12. Presumably a capture time in nanoseconds — the clock
    /// and epoch are not visible here; confirm with the producer.
    pub timestamp_ns: u64,
    /// Stored at bytes 12..16. Caller-supplied slot identifier; it is independent of
    /// the ring position the record lands in.
    pub slot_idx: u32,
    /// Stored at bytes 16..20.
    pub tenant_id: u32,
    /// Stored at bytes 20..24.
    pub opcode: u32,
    /// Four argument words stored back-to-back at bytes 24..40.
    pub args: [u32; 4],
    /// Stored at bytes 40..44.
    pub epoch: u32,
}
75
/// Errors produced while opening, appending to, replaying, or syncing a replay log.
///
/// Every message ends with a `Fix:` hint aimed at the operator.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ReplayLogError {
    /// A file operation failed.
    #[error("replay log {op} on `{path}` failed: {source}. Fix: check disk space + permissions.")]
    Io {
        /// Short name of the failing operation (e.g. `"open"`, `"read"`, `"seek"`, `"write"`).
        op: &'static str,
        /// Lossy string form of the log path, for display only.
        path: Arc<str>,
        /// The underlying I/O error.
        #[source]
        source: std::io::Error,
    },
    /// The file magic or version did not match on open, or a record marker was neither
    /// zero nor the expected record magic during replay.
    #[error("replay log `{path}` header mismatch. Fix: regenerate the log; VRRL format may have changed.")]
    HeaderMismatch {
        /// Lossy string form of the log path, for display only.
        path: Arc<str>,
    },
    /// A capacity of zero was requested or found in an existing header.
    #[error("replay log capacity must be > 0. Fix: construct with at least one slot.")]
    ZeroCapacity,
    /// A capacity above the supported maximum was requested or found in an existing
    /// header; also used when a byte-offset computation would overflow.
    #[error("replay log capacity {count} exceeds max {max}. Fix: shard replay into smaller logs.")]
    CapacityOverflow {
        /// The offending slot count (or record index, for offset overflow).
        count: u64,
        /// The maximum number of records a log may hold.
        max: u64,
    },
}
112
113fn io_err(op: &'static str, path: &Path, source: std::io::Error) -> ReplayLogError {
114 ReplayLogError::Io {
115 op,
116 path: Arc::from(path.to_string_lossy().as_ref()),
117 source,
118 }
119}
120
/// File-backed ring buffer of fixed-size [`RecordedSlot`] records.
///
/// The file starts with a 32-byte header (magic, version, flags, capacity, cursor)
/// followed by `capacity` record slots of 64 bytes each.
#[derive(Debug)]
pub struct RingLog {
    /// Handle opened for both reading and writing.
    file: File,
    /// Lossy string form of the log path, used in error values and by `path()`.
    path_repr: Arc<str>,
    /// Number of record slots; taken from the header when an existing log is opened.
    capacity: u64,
    /// Index of the slot the next `append` writes to; always kept below `capacity`.
    next_slot: u64,
}
131
132impl RingLog {
133 pub fn open(path: impl AsRef<Path>, capacity: u64) -> Result<Self, ReplayLogError> {
144 if capacity == 0 {
145 return Err(ReplayLogError::ZeroCapacity);
146 }
147 validate_capacity(capacity)?;
148
149 let path = path.as_ref();
150 let path_repr: Arc<str> = Arc::from(path.to_string_lossy().as_ref());
151 let existed = path.exists();
152 let mut file = OpenOptions::new()
153 .create(true)
154 .truncate(false)
155 .read(true)
156 .write(true)
157 .open(path)
158 .map_err(|e| io_err("open", path, e))?;
159
160 if existed {
161 let mut magic = [0u8; 8];
162 file.read_exact(&mut magic)
163 .map_err(|e| io_err("read", path, e))?;
164 if &magic != LOG_MAGIC {
165 return Err(ReplayLogError::HeaderMismatch {
166 path: Arc::clone(&path_repr),
167 });
168 }
169 let mut version_bytes = [0u8; 4];
170 file.read_exact(&mut version_bytes)
171 .map_err(|e| io_err("read", path, e))?;
172 if u32::from_le_bytes(version_bytes) != LOG_VERSION {
173 return Err(ReplayLogError::HeaderMismatch {
174 path: Arc::clone(&path_repr),
175 });
176 }
177 let mut _flags = [0u8; 4];
178 file.read_exact(&mut _flags)
179 .map_err(|e| io_err("read", path, e))?;
180 let mut cap_bytes = [0u8; 8];
181 file.read_exact(&mut cap_bytes)
182 .map_err(|e| io_err("read", path, e))?;
183 let mut cursor_bytes = [0u8; 8];
184 file.read_exact(&mut cursor_bytes)
185 .map_err(|e| io_err("read", path, e))?;
186 let existing_cap = u64::from_le_bytes(cap_bytes);
187 validate_capacity(existing_cap)?;
188 let cursor = u64::from_le_bytes(cursor_bytes);
189 return Ok(Self {
190 file,
191 path_repr,
192 capacity: existing_cap,
193 next_slot: cursor % existing_cap,
194 });
195 }
196
197 let total_bytes = log_file_len(capacity)?;
201 file.set_len(total_bytes)
202 .map_err(|e| io_err("set_len", path, e))?;
203 file.seek(SeekFrom::Start(0))
204 .map_err(|e| io_err("seek", path, e))?;
205 file.write_all(LOG_MAGIC)
206 .map_err(|e| io_err("write", path, e))?;
207 file.write_all(&LOG_VERSION.to_le_bytes())
208 .map_err(|e| io_err("write", path, e))?;
209 file.write_all(&0u32.to_le_bytes())
210 .map_err(|e| io_err("write", path, e))?; file.write_all(&capacity.to_le_bytes())
212 .map_err(|e| io_err("write", path, e))?;
213 file.write_all(&0u64.to_le_bytes())
214 .map_err(|e| io_err("write", path, e))?; Ok(Self {
217 file,
218 path_repr,
219 capacity,
220 next_slot: 0,
221 })
222 }
223
224 #[must_use]
227 pub fn capacity(&self) -> u64 {
228 self.capacity
229 }
230
231 #[must_use]
233 pub fn cursor(&self) -> u64 {
234 self.next_slot
235 }
236
237 #[must_use]
239 pub fn path(&self) -> &str {
240 self.path_repr.as_ref()
241 }
242
243 pub fn append(&mut self, slot: RecordedSlot) -> Result<(), ReplayLogError> {
251 let record_offset = log_record_offset(self.next_slot)?;
252 self.file
253 .seek(SeekFrom::Start(record_offset))
254 .map_err(|e| self.io_err("seek", e))?;
255
256 let mut buf = [0u8; RECORD_BYTES as usize];
257 buf[0..4].copy_from_slice(&RECORD_MAGIC.to_le_bytes());
258 buf[4..12].copy_from_slice(&slot.timestamp_ns.to_le_bytes());
259 buf[12..16].copy_from_slice(&slot.slot_idx.to_le_bytes());
260 buf[16..20].copy_from_slice(&slot.tenant_id.to_le_bytes());
261 buf[20..24].copy_from_slice(&slot.opcode.to_le_bytes());
262 buf[24..28].copy_from_slice(&slot.args[0].to_le_bytes());
263 buf[28..32].copy_from_slice(&slot.args[1].to_le_bytes());
264 buf[32..36].copy_from_slice(&slot.args[2].to_le_bytes());
265 buf[36..40].copy_from_slice(&slot.args[3].to_le_bytes());
266 buf[40..44].copy_from_slice(&slot.epoch.to_le_bytes());
267 self.file
269 .write_all(&buf)
270 .map_err(|e| self.io_err("write", e))?;
271
272 self.next_slot = (self.next_slot + 1) % self.capacity;
275 self.file
276 .seek(SeekFrom::Start(24)) .map_err(|e| self.io_err("seek", e))?;
278 self.file
279 .write_all(&self.next_slot.to_le_bytes())
280 .map_err(|e| self.io_err("write", e))?;
281
282 Ok(())
283 }
284
285 pub fn replay_all(&mut self) -> Result<Vec<RecordedSlot>, ReplayLogError> {
296 let capacity =
297 usize::try_from(self.capacity).map_err(|_| ReplayLogError::CapacityOverflow {
298 count: self.capacity,
299 max: MAX_REPLAY_RECORDS,
300 })?;
301 let mut out = Vec::with_capacity(capacity);
302 for step in 0..self.capacity {
303 let slot_index = (self.next_slot + step) % self.capacity;
304 let offset = log_record_offset(slot_index)?;
305 self.file
306 .seek(SeekFrom::Start(offset))
307 .map_err(|e| self.io_err("seek", e))?;
308 let mut buf = [0u8; RECORD_BYTES as usize];
309 self.file
310 .read_exact(&mut buf)
311 .map_err(|e| self.io_err("read", e))?;
312 let magic = read_u32(&buf, 0);
313 if magic == 0 {
314 continue;
316 }
317 if magic != RECORD_MAGIC {
318 return Err(ReplayLogError::HeaderMismatch {
319 path: self.path_repr.clone(),
320 });
321 }
322 out.push(RecordedSlot {
323 timestamp_ns: read_u64(&buf, 4),
324 slot_idx: read_u32(&buf, 12),
325 tenant_id: read_u32(&buf, 16),
326 opcode: read_u32(&buf, 20),
327 args: [
328 read_u32(&buf, 24),
329 read_u32(&buf, 28),
330 read_u32(&buf, 32),
331 read_u32(&buf, 36),
332 ],
333 epoch: read_u32(&buf, 40),
334 });
335 }
336 Ok(out)
337 }
338
339 pub fn sync(&mut self) -> Result<(), ReplayLogError> {
347 self.file.sync_all().map_err(|e| self.io_err("sync", e))?;
348 Ok(())
349 }
350
351 fn io_err(&self, op: &'static str, source: std::io::Error) -> ReplayLogError {
352 ReplayLogError::Io {
353 op,
354 path: self.path_repr.clone(),
355 source,
356 }
357 }
358}
359
360fn validate_capacity(capacity: u64) -> Result<(), ReplayLogError> {
361 if capacity == 0 {
362 return Err(ReplayLogError::ZeroCapacity);
363 }
364 if capacity > MAX_REPLAY_RECORDS {
365 return Err(ReplayLogError::CapacityOverflow {
366 count: capacity,
367 max: MAX_REPLAY_RECORDS,
368 });
369 }
370 Ok(())
371}
372
/// Total file length in bytes for a log with `capacity` slots.
///
/// Delegates to `log_record_position`, i.e. the position just past the last slot.
fn log_file_len(capacity: u64) -> Result<u64, ReplayLogError> {
    log_record_position(capacity)
}
376
/// Byte offset in the file at which slot `slot_index` begins.
///
/// Delegates to `log_record_position`.
fn log_record_offset(slot_index: u64) -> Result<u64, ReplayLogError> {
    log_record_position(slot_index)
}
380
381fn log_record_position(record_index: u64) -> Result<u64, ReplayLogError> {
382 let record_bytes =
383 vyre_driver::accounting::checked_mul_u64_lazy(record_index, RECORD_BYTES, || {
384 replay_capacity_overflow(record_index)
385 })?;
386 vyre_driver::accounting::checked_add_u64_lazy(HEADER_BYTES, record_bytes, || {
387 replay_capacity_overflow(record_index)
388 })
389}
390
391fn replay_capacity_overflow(count: u64) -> ReplayLogError {
392 ReplayLogError::CapacityOverflow {
393 count,
394 max: MAX_REPLAY_RECORDS,
395 }
396}
397
/// Decodes a little-endian `u32` from `buf[offset..offset + 4]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `buf`.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    let field = &buf[offset..offset + 4];
    u32::from_le_bytes(field.try_into().expect("range above is exactly 4 bytes"))
}
403
/// Decodes a little-endian `u64` from `buf[offset..offset + 8]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `buf`.
fn read_u64(buf: &[u8], offset: usize) -> u64 {
    let field = &buf[offset..offset + 8];
    u64::from_le_bytes(field.try_into().expect("range above is exactly 8 bytes"))
}
409
410impl From<ReplayLogError> for PipelineError {
413 fn from(err: ReplayLogError) -> Self {
414 PipelineError::Backend(err.to_string())
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421
422 fn rec(slot_idx: u32, epoch: u32) -> RecordedSlot {
423 RecordedSlot {
424 timestamp_ns: 1_000_000 + slot_idx as u64,
425 slot_idx,
426 tenant_id: 0,
427 opcode: 0x4000_0000 + slot_idx,
428 args: [slot_idx, slot_idx * 2, slot_idx * 3, slot_idx * 4],
429 epoch,
430 }
431 }
432
433 #[test]
434 fn open_rejects_zero_capacity() {
435 let dir = tempfile::tempdir().unwrap();
436 let path = dir.path().join("log.vrrl");
437 let err = RingLog::open(&path, 0).expect_err("zero capacity must reject");
438 assert!(matches!(err, ReplayLogError::ZeroCapacity));
439 }
440
441 #[test]
442 fn append_and_replay_round_trip() {
443 let dir = tempfile::tempdir().unwrap();
444 let path = dir.path().join("log.vrrl");
445 let mut log = RingLog::open(&path, 4)
446 .expect("Fix: open fresh log; restore this invariant before continuing.");
447 log.append(rec(1, 10)).unwrap();
448 log.append(rec(2, 11)).unwrap();
449 log.sync().unwrap();
450
451 let replay = log
452 .replay_all()
453 .expect("Fix: replay; restore this invariant before continuing.");
454 assert_eq!(replay.len(), 2);
455 assert_eq!(replay[0].slot_idx, 1);
456 assert_eq!(replay[0].epoch, 10);
457 assert_eq!(replay[1].slot_idx, 2);
458 assert_eq!(replay[1].epoch, 11);
459 }
460
461 #[test]
462 fn log_rollover_preserves_most_recent() {
463 let dir = tempfile::tempdir().unwrap();
464 let path = dir.path().join("log.vrrl");
465 let mut log =
466 RingLog::open(&path, 3).expect("Fix: open; restore this invariant before continuing.");
467 for i in 0..5 {
468 log.append(rec(i, 100 + i)).unwrap();
469 }
470 let replay = log
471 .replay_all()
472 .expect("Fix: replay; restore this invariant before continuing.");
473 assert_eq!(replay.len(), 3, "capacity=3 must retain exactly 3 records");
474 let slot_ids: Vec<u32> = replay.iter().map(|r| r.slot_idx).collect();
475 assert_eq!(slot_ids, vec![2, 3, 4]);
479 }
480
481 #[test]
482 fn reopen_restores_cursor() {
483 let dir = tempfile::tempdir().unwrap();
484 let path = dir.path().join("log.vrrl");
485 {
486 let mut log = RingLog::open(&path, 4)
487 .expect("Fix: open fresh; restore this invariant before continuing.");
488 log.append(rec(1, 10)).unwrap();
489 log.append(rec(2, 11)).unwrap();
490 log.sync().unwrap();
491 }
492 let mut reopened = RingLog::open(&path, 4)
493 .expect("Fix: reopen; restore this invariant before continuing.");
494 assert_eq!(reopened.cursor(), 2);
495 let replay = reopened.replay_all().unwrap();
496 assert_eq!(replay.len(), 2);
497 }
498
499 #[test]
500 fn corrupted_magic_rejected() {
501 use std::io::Write as _;
502
503 let dir = tempfile::tempdir().unwrap();
504 let path = dir.path().join("log.vrrl");
505 {
506 let mut f = std::fs::File::create(&path).unwrap();
508 f.write_all(b"XXXX0001").unwrap();
509 f.write_all(&1u32.to_le_bytes()).unwrap();
510 f.write_all(&0u32.to_le_bytes()).unwrap();
511 f.write_all(&4u64.to_le_bytes()).unwrap();
512 f.write_all(&0u64.to_le_bytes()).unwrap();
513 f.set_len(HEADER_BYTES + 4 * RECORD_BYTES).unwrap();
515 }
516 let err = RingLog::open(&path, 4).expect_err("wrong magic must reject");
517 assert!(matches!(err, ReplayLogError::HeaderMismatch { .. }));
518 }
519
520 fn write_header(path: &Path, capacity: u64, cursor: u64) {
521 use std::io::Write as _;
522
523 let mut f = std::fs::File::create(path).unwrap();
524 f.write_all(LOG_MAGIC).unwrap();
525 f.write_all(&LOG_VERSION.to_le_bytes()).unwrap();
526 f.write_all(&0u32.to_le_bytes()).unwrap();
527 f.write_all(&capacity.to_le_bytes()).unwrap();
528 f.write_all(&cursor.to_le_bytes()).unwrap();
529 }
530
531 #[test]
532 fn existing_log_zero_capacity_rejected_before_cursor_modulo() {
533 let dir = tempfile::tempdir().unwrap();
534 let path = dir.path().join("log.vrrl");
535 write_header(&path, 0, 0);
536
537 let err = RingLog::open(&path, 4).expect_err("header capacity=0 must reject");
538 assert!(matches!(err, ReplayLogError::ZeroCapacity));
539 }
540
541 #[test]
542 fn existing_log_huge_capacity_rejected_before_replay_allocation() {
543 let dir = tempfile::tempdir().unwrap();
544 let path = dir.path().join("log.vrrl");
545 write_header(&path, MAX_REPLAY_RECORDS + 1, 0);
546
547 let err = RingLog::open(&path, 4).expect_err("huge header capacity must reject");
548 assert!(matches!(
549 err,
550 ReplayLogError::CapacityOverflow {
551 count,
552 max: MAX_REPLAY_RECORDS
553 } if count == MAX_REPLAY_RECORDS + 1
554 ));
555 }
556
557 #[test]
558 fn capacity_overflow_rejected() {
559 let dir = tempfile::tempdir().unwrap();
560 let path = dir.path().join("log.vrrl");
561 let err = RingLog::open(&path, MAX_REPLAY_RECORDS + 1)
562 .expect_err("over-size capacity must reject");
563 assert!(matches!(
564 err,
565 ReplayLogError::CapacityOverflow {
566 count,
567 max: MAX_REPLAY_RECORDS
568 } if count == MAX_REPLAY_RECORDS + 1
569 ));
570 }
571}