1use std::fs::{File, OpenOptions};
34use std::io::{BufReader, Read, Write};
35use std::path::{Path, PathBuf};
36
37use serde::{Deserialize, Serialize};
38
39use crate::error::{CoreError, Result};
40use crate::ids::{CollectionId, Lsn};
41use crate::page::PageCodec;
42
/// Segment file magic: the ASCII bytes `QVWL` read as a little-endian `u32`.
pub const WAL_MAGIC: u32 = u32::from_le_bytes([b'Q', b'V', b'W', b'L']);
/// On-disk format version written into, and required of, every segment header.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Segment header: magic (4) + version (2) + reserved (2) + base LSN (8).
const WAL_FILE_HEADER_SIZE: usize = 4 + 2 + 2 + 8;
/// Per-frame prefix: payload length `u32` (4) + CRC32C of the payload (4).
const FRAME_PREFIX_SIZE: usize = 4 + 4;
/// Upper bound (64 MiB) on one sealed record, checked by both writer and reader.
pub const MAX_RECORD_BYTES: u32 = 64 << 20;
53
/// One logged operation.
///
/// Entries are serialized with postcard (see `WalWriter::append` and
/// `read_all`). postcard identifies enum variants by declaration index and
/// struct fields by position. Reordering variants or fields, or inserting a
/// variant anywhere but the end, therefore changes how existing logs decode.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalOp {
    /// Creates a collection.
    CreateCollection {
        collection_id: CollectionId,
        name: String,
        /// Opaque encoded descriptor bytes.
        /// NOTE(review): encoding is defined by the caller; confirm.
        descriptor: Vec<u8>,
    },
    /// Drops a collection.
    DropCollection {
        collection_id: CollectionId,
    },
    /// Inserts or replaces the record keyed by `external_id`.
    Upsert {
        collection_id: CollectionId,
        external_id: String,
        /// Vector stored as raw bytes.
        /// NOTE(review): element type/endianness defined elsewhere; confirm.
        vector: Vec<u8>,
        /// Opaque payload bytes (the tests in this file use JSON text).
        payload: Vec<u8>,
    },
    /// Deletes the record keyed by `external_id`.
    Delete {
        collection_id: CollectionId,
        external_id: String,
    },
    /// Checkpoint marker.
    Checkpoint {
        /// NOTE(review): presumably the highest LSN covered by this checkpoint; confirm.
        last_checkpointed_lsn: Lsn,
        /// NOTE(review): presumably the manifest version written by this checkpoint; confirm.
        manifest_version: u64,
    },
}
103
/// One WAL record: an operation tagged with its log sequence number.
///
/// This is the value that is postcard-encoded, then sealed by a `PageCodec`,
/// into each frame. Field order is part of that encoding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WalEntry {
    /// Log sequence number of this record.
    pub lsn: Lsn,
    /// The operation recorded.
    pub op: WalOp,
}
112
113#[derive(Debug)]
118pub struct WalWriter {
119 file: File,
120 path: PathBuf,
121 unsynced: bool,
122}
123
124impl WalWriter {
125 pub fn create(path: &Path, base_lsn: Lsn) -> Result<Self> {
128 let mut file = OpenOptions::new()
129 .create_new(true)
130 .read(true)
131 .write(true)
132 .open(path)
133 .map_err(|e| CoreError::io(path, e))?;
134 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
135 hdr[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
136 hdr[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
137 hdr[8..16].copy_from_slice(&base_lsn.value().to_le_bytes());
139 file.write_all(&hdr).map_err(|e| CoreError::io(path, e))?;
140 file.sync_data().map_err(|e| CoreError::io(path, e))?;
141 Ok(Self {
142 file,
143 path: path.to_path_buf(),
144 unsynced: false,
145 })
146 }
147
148 pub fn open_append(path: &Path) -> Result<Self> {
151 let mut file = OpenOptions::new()
152 .read(true)
153 .append(true)
154 .open(path)
155 .map_err(|e| CoreError::io(path, e))?;
156 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
157 file.read_exact(&mut hdr)
158 .map_err(|e| CoreError::io(path, e))?;
159 let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
160 if magic != WAL_MAGIC {
161 return Err(CoreError::BadMagic {
162 expected: WAL_MAGIC,
163 found: magic,
164 });
165 }
166 let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
167 if ver != WAL_FORMAT_VERSION {
168 return Err(CoreError::UnsupportedVersion {
169 found: ver,
170 supported: WAL_FORMAT_VERSION,
171 });
172 }
173 Ok(Self {
174 file,
175 path: path.to_path_buf(),
176 unsynced: false,
177 })
178 }
179
180 pub fn append(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
184 let plaintext = postcard::to_allocvec(entry)?;
185 let sealed = codec.seal_record(&plaintext)?;
186 let len = u32::try_from(sealed.len())
187 .map_err(|_| CoreError::TooLarge(format!("wal record {} bytes", sealed.len())))?;
188 if len > MAX_RECORD_BYTES {
189 return Err(CoreError::TooLarge(format!(
190 "wal record {len} bytes exceeds cap {MAX_RECORD_BYTES}"
191 )));
192 }
193 let crc = crc32c::crc32c(&sealed);
196 let mut frame = Vec::with_capacity(FRAME_PREFIX_SIZE + sealed.len());
197 frame.extend_from_slice(&len.to_le_bytes());
198 frame.extend_from_slice(&crc.to_le_bytes());
199 frame.extend_from_slice(&sealed);
200 self.file
201 .write_all(&frame)
202 .map_err(|e| CoreError::io(&self.path, e))?;
203 self.unsynced = true;
204 Ok(())
205 }
206
207 pub fn sync(&mut self) -> Result<()> {
209 if self.unsynced {
210 self.file
211 .sync_data()
212 .map_err(|e| CoreError::io(&self.path, e))?;
213 self.unsynced = false;
214 }
215 Ok(())
216 }
217
218 pub fn append_sync(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
220 self.append(codec, entry)?;
221 self.sync()
222 }
223}
224
/// Result of replaying one WAL segment with [`read_all`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalReplay {
    /// Every intact record, in file order.
    pub entries: Vec<WalEntry>,
    /// Byte offset of the first frame that could not be replayed, or `None`
    /// if the segment ended exactly on a frame boundary.
    ///
    /// A frame fails replay when it is truncated, has an implausible length,
    /// fails its CRC, or cannot be decoded. `Some(0)` means the file was
    /// shorter than its header.
    pub torn_at: Option<u64>,
    /// Base LSN read from the segment header (`Lsn(0)` if the header was torn).
    pub base_lsn: Lsn,
}
236
237impl WalReplay {
238 #[must_use]
240 pub fn max_lsn(&self) -> Option<Lsn> {
241 self.entries.iter().map(|e| e.lsn).max()
242 }
243}
244
/// How much of the requested buffer `read_full` managed to fill.
enum ReadOutcome {
    /// End of input was reached before a single byte was read.
    Eof,
    /// End of input was reached after some, but not all, bytes were read.
    Partial,
    /// The buffer was filled completely.
    Full,
}
250
251fn read_full<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<ReadOutcome> {
252 let mut filled = 0;
253 while filled < buf.len() {
254 match reader.read(&mut buf[filled..]) {
255 Ok(0) => {
256 return Ok(if filled == 0 {
257 ReadOutcome::Eof
258 } else {
259 ReadOutcome::Partial
260 });
261 }
262 Ok(n) => filled += n,
263 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
264 Err(e) => return Err(CoreError::BareIo(e)),
265 }
266 }
267 Ok(ReadOutcome::Full)
268}
269
270pub fn read_all(path: &Path, codec: &dyn PageCodec) -> Result<WalReplay> {
281 let file = File::open(path).map_err(|e| CoreError::io(path, e))?;
282 let file_len = file.metadata().map_err(|e| CoreError::io(path, e))?.len();
283 let mut reader = BufReader::new(file);
284
285 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
286 match read_full(&mut reader, &mut hdr)? {
287 ReadOutcome::Full => {}
288 _ => {
289 return Ok(WalReplay {
298 entries: Vec::new(),
299 torn_at: Some(0),
300 base_lsn: Lsn(0),
301 });
302 }
303 }
304 let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
305 if magic != WAL_MAGIC {
306 return Err(CoreError::BadMagic {
307 expected: WAL_MAGIC,
308 found: magic,
309 });
310 }
311 let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
312 if ver != WAL_FORMAT_VERSION {
313 return Err(CoreError::UnsupportedVersion {
314 found: ver,
315 supported: WAL_FORMAT_VERSION,
316 });
317 }
318 let base_lsn = Lsn(u64::from_le_bytes([
319 hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
320 ]));
321
322 let mut entries = Vec::new();
323 let mut offset = WAL_FILE_HEADER_SIZE as u64;
324 let mut torn_at = None;
325 loop {
326 let mut prefix = [0u8; FRAME_PREFIX_SIZE];
327 match read_full(&mut reader, &mut prefix)? {
328 ReadOutcome::Eof => break, ReadOutcome::Partial => {
330 torn_at = Some(offset);
331 break;
332 }
333 ReadOutcome::Full => {}
334 }
335 let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
336 let crc = u32::from_le_bytes([prefix[4], prefix[5], prefix[6], prefix[7]]);
337 let frame_end = offset
338 .checked_add(FRAME_PREFIX_SIZE as u64)
339 .and_then(|o| o.checked_add(u64::from(len)));
340 match frame_end {
341 Some(end) if len != 0 && len <= MAX_RECORD_BYTES && end <= file_len => {}
342 _ => {
343 torn_at = Some(offset);
345 break;
346 }
347 }
348 let mut buf = vec![0u8; len as usize];
349 match read_full(&mut reader, &mut buf)? {
350 ReadOutcome::Full => {}
351 _ => {
352 torn_at = Some(offset);
353 break;
354 }
355 }
356 if crc32c::crc32c(&buf) != crc {
357 torn_at = Some(offset);
358 break;
359 }
360 let plaintext = codec.open_record(&buf)?;
365 match postcard::from_bytes::<WalEntry>(&plaintext) {
366 Ok(entry) => {
367 entries.push(entry);
368 offset += FRAME_PREFIX_SIZE as u64 + u64::from(len);
369 }
370 Err(_) => {
371 torn_at = Some(offset);
373 break;
374 }
375 }
376 }
377 Ok(WalReplay {
378 entries,
379 torn_at,
380 base_lsn,
381 })
382}
383
#[cfg(test)]
mod tests {
    //! Round-trip, torn-tail, corruption and truncation tests for the WAL.
    //!
    //! Several tests compute frame offsets as
    //! `FRAME_PREFIX_SIZE + postcard::to_allocvec(entry).len()`. That assumes
    //! `PlainCodec::seal_record` returns its input unchanged.
    //! NOTE(review): confirm against `crate::page::PlainCodec`.
    use super::*;
    use crate::page::PlainCodec;
    use proptest::prelude::*;

    /// One instance of every `WalOp` variant; the tests pick entries from it.
    fn sample_ops() -> Vec<WalOp> {
        vec![
            WalOp::CreateCollection {
                collection_id: CollectionId(1),
                name: "alpha".into(),
                descriptor: vec![1, 2, 3, 4],
            },
            WalOp::Upsert {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
                vector: vec![0u8; 32],
                payload: br#"{"k":"v"}"#.to_vec(),
            },
            WalOp::Delete {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
            },
            WalOp::Checkpoint {
                last_checkpointed_lsn: Lsn(2),
                manifest_version: 5,
            },
            WalOp::DropCollection {
                collection_id: CollectionId(1),
            },
        ]
    }

    /// Wraps `ops` into entries with consecutive LSNs starting at 1.
    fn entries_from(ops: &[WalOp]) -> Vec<WalEntry> {
        ops.iter()
            .enumerate()
            .map(|(i, op)| WalEntry {
                lsn: Lsn(i as u64 + 1),
                op: op.clone(),
            })
            .collect()
    }

    /// Creates a fresh segment at `path` with base LSN 1, appends `entries`
    /// with `PlainCodec`, and syncs once at the end.
    fn write_log(path: &Path, entries: &[WalEntry]) {
        let mut w = WalWriter::create(path, Lsn(1)).unwrap();
        for e in entries {
            w.append(&PlainCodec, e).unwrap();
        }
        w.sync().unwrap();
    }

    /// Every op variant survives a write/replay round trip with no torn tail.
    #[test]
    fn roundtrips_every_op() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(1));
        assert_eq!(replay.max_lsn(), Some(Lsn(entries.len() as u64)));
    }

    /// A header-only segment is clean (not torn) and reports its base LSN.
    #[test]
    fn empty_log_replays_to_nothing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        // The writer is kept alive only to show replay works while it is open.
        let _w = WalWriter::create(&path, Lsn(10)).unwrap();
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert!(replay.entries.is_empty());
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(10));
        assert_eq!(replay.max_lsn(), None);
    }

    /// Frames written by `open_append` follow the ones written by `create`.
    #[test]
    fn reopen_and_append_continues_the_log() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        {
            let mut w = WalWriter::create(&path, Lsn(1)).unwrap();
            w.append_sync(&PlainCodec, &entries[0]).unwrap();
            w.append_sync(&PlainCodec, &entries[1]).unwrap();
        }
        {
            let mut w = WalWriter::open_append(&path).unwrap();
            for e in &entries[2..] {
                w.append(&PlainCodec, e).unwrap();
            }
            w.sync().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
    }

    /// A tail shorter than a frame prefix is reported as torn at the clean end.
    #[test]
    fn torn_prefix_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        {
            // 3 bytes < FRAME_PREFIX_SIZE: simulates a crash mid-prefix.
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    /// A frame whose declared payload runs past end of file is torn.
    #[test]
    fn torn_payload_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        {
            // Full prefix claiming 100 payload bytes, followed by only 3.
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&100u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&[1, 2, 3]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    /// A flipped payload byte in frame 2 fails its CRC. Replay stops there,
    /// even though later frames are intact.
    #[test]
    fn corruption_stops_recovery_point_in_time() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        // Frame 1 length, relying on PlainCodec sealing as the identity.
        let len0 = postcard::to_allocvec(&entries[0]).unwrap().len() as u64;
        let second_frame_offset = WAL_FILE_HEADER_SIZE as u64 + FRAME_PREFIX_SIZE as u64 + len0;
        // Second payload byte of frame 2 (past its prefix).
        let corrupt_at = second_frame_offset + FRAME_PREFIX_SIZE as u64 + 1;

        let mut bytes = std::fs::read(&path).unwrap();
        bytes[corrupt_at as usize] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, vec![entries[0].clone()]);
        assert_eq!(replay.torn_at, Some(second_frame_offset));
    }

    /// A full-size header without the magic is an error, not a torn tail.
    #[test]
    fn foreign_file_is_rejected_by_magic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        std::fs::write(&path, vec![0u8; WAL_FILE_HEADER_SIZE + 4]).unwrap();
        assert!(matches!(
            read_all(&path, &PlainCodec),
            Err(CoreError::BadMagic { .. })
        ));
    }

    /// Files shorter than the header replay as empty, torn at offset 0.
    #[test]
    fn sub_header_segment_is_an_empty_torn_tail() {
        for len in [0usize, 1, WAL_FILE_HEADER_SIZE - 1] {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal-1.log");
            std::fs::write(&path, vec![0u8; len]).unwrap();
            let replay = read_all(&path, &PlainCodec).unwrap();
            assert!(replay.entries.is_empty(), "len {len}: expected no entries");
            assert_eq!(replay.torn_at, Some(0), "len {len}: expected torn at 0");
        }
    }

    proptest! {
        // Any sequence of up to 40 sample ops round-trips exactly.
        #[test]
        fn entries_roundtrip(seeds in proptest::collection::vec(0u8..5, 0..40)) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);
            let replay = read_all(&path, &PlainCodec).unwrap();
            prop_assert_eq!(replay.entries, entries);
            prop_assert_eq!(replay.torn_at, None);
        }

        // Truncating at any byte between the header end and the file end yields
        // exactly the frames that end at or before the cut. The replay is
        // reported clean only when the cut lands on a frame boundary.
        #[test]
        fn truncation_yields_a_clean_prefix(
            seeds in proptest::collection::vec(0u8..5, 1..20),
            cut_num in 0u64..1000,
        ) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);

            let full = std::fs::metadata(&path).unwrap().len();
            // End offset of each frame, again assuming PlainCodec is the identity.
            let mut frame_ends = Vec::new();
            let mut off = WAL_FILE_HEADER_SIZE as u64;
            for e in &entries {
                off += FRAME_PREFIX_SIZE as u64 + postcard::to_allocvec(e).unwrap().len() as u64;
                frame_ends.push(off);
            }
            // Map cut_num into [WAL_FILE_HEADER_SIZE, full], inclusive at both ends.
            let cut = WAL_FILE_HEADER_SIZE as u64
                + (cut_num % (full - WAL_FILE_HEADER_SIZE as u64 + 1));
            let f = OpenOptions::new().write(true).open(&path).unwrap();
            f.set_len(cut).unwrap();
            drop(f);

            let replay = read_all(&path, &PlainCodec).unwrap();
            let survivors = frame_ends.iter().filter(|&&end| end <= cut).count();
            prop_assert_eq!(replay.entries.as_slice(), &entries[..survivors]);
            let clean = cut == WAL_FILE_HEADER_SIZE as u64 || frame_ends.contains(&cut);
            prop_assert_eq!(replay.torn_at.is_none(), clean);
        }
    }
}