use std::fs::{File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};

use crate::error::{CoreError, Result};
use crate::ids::{CollectionId, Lsn};
use crate::page::PageCodec;
42
/// File magic: the ASCII bytes `QVWL` interpreted as a little-endian `u32`.
pub const WAL_MAGIC: u32 = u32::from_le_bytes([b'Q', b'V', b'W', b'L']);
/// Format version written into, and required in, every WAL file header.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Header size in bytes: magic (4) + version (2) + two zero bytes (2) + base LSN (8).
const WAL_FILE_HEADER_SIZE: usize = 16;
/// Per-frame prefix size in bytes: sealed length `u32` + CRC-32C `u32`.
const FRAME_PREFIX_SIZE: usize = 8;
/// Upper bound on one sealed record (64 MiB). Larger appends are rejected, and larger
/// lengths found during replay are treated as a torn tail.
pub const MAX_RECORD_BYTES: u32 = 64 << 20;
53
/// One logical mutation recorded in the write-ahead log.
///
/// Entries are serialized with postcard (see `WalWriter::append` / `read_all`). Postcard
/// encodes enum variants by index and struct fields by position, so the order of the
/// variants and fields below is part of the on-disk format. Add new variants only at the
/// end, and never reorder or remove existing ones without bumping `WAL_FORMAT_VERSION`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalOp {
    /// Records the creation of a collection.
    CreateCollection {
        /// Identifier of the new collection.
        collection_id: CollectionId,
        /// Name of the new collection.
        name: String,
        /// Encoded collection descriptor. This module stores it without interpreting it;
        /// the encoding is defined by the caller.
        descriptor: Vec<u8>,
    },
    /// Records the removal of a collection.
    DropCollection {
        /// Identifier of the collection being dropped.
        collection_id: CollectionId,
    },
    /// Records an upsert of the record keyed by `external_id`. Insert-or-replace semantics
    /// are presumably applied by whoever replays the log; this module only stores the op.
    Upsert {
        /// Collection the record belongs to.
        collection_id: CollectionId,
        /// Caller-supplied record key.
        external_id: String,
        /// Encoded vector bytes (encoding defined outside this module).
        vector: Vec<u8>,
        /// Opaque payload bytes (the tests store JSON text here).
        payload: Vec<u8>,
    },
    /// Records the deletion of the record keyed by `external_id`.
    Delete {
        /// Collection the record belongs to.
        collection_id: CollectionId,
        /// Caller-supplied record key.
        external_id: String,
    },
    /// Records a checkpoint marker.
    Checkpoint {
        // NOTE(review): presumably the highest LSN whose effects the checkpoint has made
        // durable elsewhere — confirm against the checkpointing code.
        last_checkpointed_lsn: Lsn,
        // NOTE(review): presumably the manifest version produced by that checkpoint — confirm.
        manifest_version: u64,
    },
}
103
/// One WAL record: a [`WalOp`] tagged with its log sequence number.
///
/// This is the value serialized with postcard into each frame, so field order is part of
/// the on-disk format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WalEntry {
    /// Log sequence number assigned to this entry by the caller.
    pub lsn: Lsn,
    /// The logged mutation.
    pub op: WalOp,
}
112
113#[derive(Debug)]
118pub struct WalWriter {
119 file: File,
120 path: PathBuf,
121 unsynced: bool,
122}
123
124impl WalWriter {
125 pub fn create(path: &Path, base_lsn: Lsn) -> Result<Self> {
128 let mut file = OpenOptions::new()
129 .create_new(true)
130 .read(true)
131 .write(true)
132 .open(path)
133 .map_err(|e| CoreError::io(path, e))?;
134 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
135 hdr[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
136 hdr[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
137 hdr[8..16].copy_from_slice(&base_lsn.value().to_le_bytes());
139 file.write_all(&hdr).map_err(|e| CoreError::io(path, e))?;
140 file.sync_data().map_err(|e| CoreError::io(path, e))?;
141 Ok(Self {
142 file,
143 path: path.to_path_buf(),
144 unsynced: false,
145 })
146 }
147
148 pub fn open_append(path: &Path) -> Result<Self> {
151 let mut file = OpenOptions::new()
152 .read(true)
153 .append(true)
154 .open(path)
155 .map_err(|e| CoreError::io(path, e))?;
156 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
157 file.read_exact(&mut hdr)
158 .map_err(|e| CoreError::io(path, e))?;
159 let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
160 if magic != WAL_MAGIC {
161 return Err(CoreError::BadMagic {
162 expected: WAL_MAGIC,
163 found: magic,
164 });
165 }
166 let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
167 if ver != WAL_FORMAT_VERSION {
168 return Err(CoreError::UnsupportedVersion {
169 found: ver,
170 supported: WAL_FORMAT_VERSION,
171 });
172 }
173 Ok(Self {
174 file,
175 path: path.to_path_buf(),
176 unsynced: false,
177 })
178 }
179
180 pub fn append(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
184 let plaintext = postcard::to_allocvec(entry)?;
185 let sealed = codec.seal_record(&plaintext)?;
186 let len = u32::try_from(sealed.len())
187 .map_err(|_| CoreError::TooLarge(format!("wal record {} bytes", sealed.len())))?;
188 if len > MAX_RECORD_BYTES {
189 return Err(CoreError::TooLarge(format!(
190 "wal record {len} bytes exceeds cap {MAX_RECORD_BYTES}"
191 )));
192 }
193 let crc = crc32c::crc32c(&sealed);
196 let mut frame = Vec::with_capacity(FRAME_PREFIX_SIZE + sealed.len());
197 frame.extend_from_slice(&len.to_le_bytes());
198 frame.extend_from_slice(&crc.to_le_bytes());
199 frame.extend_from_slice(&sealed);
200 self.file
201 .write_all(&frame)
202 .map_err(|e| CoreError::io(&self.path, e))?;
203 self.unsynced = true;
204 Ok(())
205 }
206
207 pub fn sync(&mut self) -> Result<()> {
209 if self.unsynced {
210 self.file
211 .sync_data()
212 .map_err(|e| CoreError::io(&self.path, e))?;
213 self.unsynced = false;
214 }
215 Ok(())
216 }
217
218 pub fn append_sync(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
220 self.append(codec, entry)?;
221 self.sync()
222 }
223}
224
225#[derive(Debug, Clone, PartialEq, Eq)]
227pub struct WalReplay {
228 pub entries: Vec<WalEntry>,
230 pub torn_at: Option<u64>,
233 pub base_lsn: Lsn,
235}
236
237impl WalReplay {
238 #[must_use]
240 pub fn max_lsn(&self) -> Option<Lsn> {
241 self.entries.iter().map(|e| e.lsn).max()
242 }
243}
244
/// How much of the requested buffer a `read_full` call managed to fill.
enum ReadOutcome {
    /// The reader was already at end-of-file; no bytes were read.
    Eof,
    /// End-of-file arrived after some, but not all, of the requested bytes.
    Partial,
    /// Every requested byte was read.
    Full,
}
250
251fn read_full<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<ReadOutcome> {
252 let mut filled = 0;
253 while filled < buf.len() {
254 match reader.read(&mut buf[filled..]) {
255 Ok(0) => {
256 return Ok(if filled == 0 {
257 ReadOutcome::Eof
258 } else {
259 ReadOutcome::Partial
260 });
261 }
262 Ok(n) => filled += n,
263 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
264 Err(e) => return Err(CoreError::BareIo(e)),
265 }
266 }
267 Ok(ReadOutcome::Full)
268}
269
270pub fn read_all(path: &Path, codec: &dyn PageCodec) -> Result<WalReplay> {
278 let file = File::open(path).map_err(|e| CoreError::io(path, e))?;
279 let file_len = file.metadata().map_err(|e| CoreError::io(path, e))?.len();
280 let mut reader = BufReader::new(file);
281
282 let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
283 match read_full(&mut reader, &mut hdr)? {
284 ReadOutcome::Full => {}
285 _ => {
286 return Err(CoreError::MalformedPage(format!(
287 "wal {} is shorter than its header",
288 path.display()
289 )));
290 }
291 }
292 let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
293 if magic != WAL_MAGIC {
294 return Err(CoreError::BadMagic {
295 expected: WAL_MAGIC,
296 found: magic,
297 });
298 }
299 let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
300 if ver != WAL_FORMAT_VERSION {
301 return Err(CoreError::UnsupportedVersion {
302 found: ver,
303 supported: WAL_FORMAT_VERSION,
304 });
305 }
306 let base_lsn = Lsn(u64::from_le_bytes([
307 hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
308 ]));
309
310 let mut entries = Vec::new();
311 let mut offset = WAL_FILE_HEADER_SIZE as u64;
312 let mut torn_at = None;
313 loop {
314 let mut prefix = [0u8; FRAME_PREFIX_SIZE];
315 match read_full(&mut reader, &mut prefix)? {
316 ReadOutcome::Eof => break, ReadOutcome::Partial => {
318 torn_at = Some(offset);
319 break;
320 }
321 ReadOutcome::Full => {}
322 }
323 let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
324 let crc = u32::from_le_bytes([prefix[4], prefix[5], prefix[6], prefix[7]]);
325 let frame_end = offset
326 .checked_add(FRAME_PREFIX_SIZE as u64)
327 .and_then(|o| o.checked_add(u64::from(len)));
328 match frame_end {
329 Some(end) if len != 0 && len <= MAX_RECORD_BYTES && end <= file_len => {}
330 _ => {
331 torn_at = Some(offset);
333 break;
334 }
335 }
336 let mut buf = vec![0u8; len as usize];
337 match read_full(&mut reader, &mut buf)? {
338 ReadOutcome::Full => {}
339 _ => {
340 torn_at = Some(offset);
341 break;
342 }
343 }
344 if crc32c::crc32c(&buf) != crc {
345 torn_at = Some(offset);
346 break;
347 }
348 let plaintext = codec.open_record(&buf)?;
353 match postcard::from_bytes::<WalEntry>(&plaintext) {
354 Ok(entry) => {
355 entries.push(entry);
356 offset += FRAME_PREFIX_SIZE as u64 + u64::from(len);
357 }
358 Err(_) => {
359 torn_at = Some(offset);
361 break;
362 }
363 }
364 }
365 Ok(WalReplay {
366 entries,
367 torn_at,
368 base_lsn,
369 })
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page::PlainCodec;
    use proptest::prelude::*;

    // One instance of every `WalOp` variant. The property tests below pick from these by
    // index (0..5), so keep the length in sync with those ranges.
    fn sample_ops() -> Vec<WalOp> {
        vec![
            WalOp::CreateCollection {
                collection_id: CollectionId(1),
                name: "alpha".into(),
                descriptor: vec![1, 2, 3, 4],
            },
            WalOp::Upsert {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
                vector: vec![0u8; 32],
                payload: br#"{"k":"v"}"#.to_vec(),
            },
            WalOp::Delete {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
            },
            WalOp::Checkpoint {
                last_checkpointed_lsn: Lsn(2),
                manifest_version: 5,
            },
            WalOp::DropCollection {
                collection_id: CollectionId(1),
            },
        ]
    }

    // Wraps `ops` in entries with consecutive LSNs starting at 1.
    fn entries_from(ops: &[WalOp]) -> Vec<WalEntry> {
        ops.iter()
            .enumerate()
            .map(|(i, op)| WalEntry {
                lsn: Lsn(i as u64 + 1),
                op: op.clone(),
            })
            .collect()
    }

    // Creates a fresh log at `path` with base LSN 1, appends `entries` through
    // `PlainCodec`, and syncs once at the end.
    fn write_log(path: &Path, entries: &[WalEntry]) {
        let mut w = WalWriter::create(path, Lsn(1)).unwrap();
        for e in entries {
            w.append(&PlainCodec, e).unwrap();
        }
        w.sync().unwrap();
    }

    // Every variant survives a write/replay cycle, and the header's base LSN is reported.
    #[test]
    fn roundtrips_every_op() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(1));
        assert_eq!(replay.max_lsn(), Some(Lsn(entries.len() as u64)));
    }

    // A header-only file replays as empty and clean, with the base LSN preserved.
    #[test]
    fn empty_log_replays_to_nothing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let _w = WalWriter::create(&path, Lsn(10)).unwrap();
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert!(replay.entries.is_empty());
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(10));
        assert_eq!(replay.max_lsn(), None);
    }

    // Frames written through a reopened writer follow the ones written before closing.
    #[test]
    fn reopen_and_append_continues_the_log() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        {
            let mut w = WalWriter::create(&path, Lsn(1)).unwrap();
            w.append_sync(&PlainCodec, &entries[0]).unwrap();
            w.append_sync(&PlainCodec, &entries[1]).unwrap();
        }
        {
            let mut w = WalWriter::open_append(&path).unwrap();
            for e in &entries[2..] {
                w.append(&PlainCodec, e).unwrap();
            }
            w.sync().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
    }

    // A trailing fragment shorter than a frame prefix is reported as torn at the clean
    // end of the log, and all earlier entries are kept.
    #[test]
    fn torn_prefix_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        {
            // Three bytes: less than the 8-byte frame prefix.
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    // A complete prefix whose body runs past end-of-file is reported as torn.
    #[test]
    fn torn_payload_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        {
            // The prefix claims a 100-byte body, but only 3 body bytes follow.
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&100u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&[1, 2, 3]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    // Corruption in a middle frame stops replay there, even though later frames are
    // intact: recovery yields a point-in-time prefix.
    #[test]
    fn corruption_stops_recovery_point_in_time() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        // Frame offsets are computed from postcard lengths. This relies on PlainCodec
        // sealing being length-preserving.
        let len0 = postcard::to_allocvec(&entries[0]).unwrap().len() as u64;
        let second_frame_offset = WAL_FILE_HEADER_SIZE as u64 + FRAME_PREFIX_SIZE as u64 + len0;
        // A byte inside the second frame's body, so its CRC no longer matches.
        let corrupt_at = second_frame_offset + FRAME_PREFIX_SIZE as u64 + 1;

        let mut bytes = std::fs::read(&path).unwrap();
        bytes[corrupt_at as usize] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, vec![entries[0].clone()]);
        assert_eq!(replay.torn_at, Some(second_frame_offset));
    }

    // An all-zero header is rejected by the magic check.
    #[test]
    fn foreign_file_is_rejected_by_magic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        std::fs::write(&path, vec![0u8; WAL_FILE_HEADER_SIZE + 4]).unwrap();
        assert!(matches!(
            read_all(&path, &PlainCodec),
            Err(CoreError::BadMagic { .. })
        ));
    }

    proptest! {
        // Any sequence of up to 39 ops round-trips exactly, with no torn tail.
        #[test]
        fn entries_roundtrip(seeds in proptest::collection::vec(0u8..5, 0..40)) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);
            let replay = read_all(&path, &PlainCodec).unwrap();
            prop_assert_eq!(replay.entries, entries);
            prop_assert_eq!(replay.torn_at, None);
        }

        // Truncating the file anywhere after the header yields exactly the frames that
        // fit. The tail is reported as torn unless the cut lands on a frame boundary.
        #[test]
        fn truncation_yields_a_clean_prefix(
            seeds in proptest::collection::vec(0u8..5, 1..20),
            cut_num in 0u64..1000,
        ) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);

            let full = std::fs::metadata(&path).unwrap().len();
            let mut frame_ends = Vec::new();
            // End offset of each frame. Like the corruption test, this assumes PlainCodec
            // adds no bytes when sealing.
            let mut off = WAL_FILE_HEADER_SIZE as u64;
            for e in &entries {
                off += FRAME_PREFIX_SIZE as u64 + postcard::to_allocvec(e).unwrap().len() as u64;
                frame_ends.push(off);
            }
            // Cut lies in [header size, full]. Because cut_num < 1000, long logs are only
            // sampled within roughly the first 1000 bytes after the header.
            let cut = WAL_FILE_HEADER_SIZE as u64
                + (cut_num % (full - WAL_FILE_HEADER_SIZE as u64 + 1));
            let f = OpenOptions::new().write(true).open(&path).unwrap();
            f.set_len(cut).unwrap();
            drop(f);

            let replay = read_all(&path, &PlainCodec).unwrap();
            let survivors = frame_ends.iter().filter(|&&end| end <= cut).count();
            prop_assert_eq!(replay.entries.as_slice(), &entries[..survivors]);
            // A cut exactly at the header or at a frame end looks like a clean EOF.
            let clean = cut == WAL_FILE_HEADER_SIZE as u64 || frame_ends.contains(&cut);
            prop_assert_eq!(replay.torn_at.is_none(), clean);
        }
    }
}