1use std::fs::{File, OpenOptions};
2use std::io::{self, BufReader, BufWriter, Read, Write};
3use std::path::Path;
4
5use crate::core::{DocId, LuciError, Result};
6
7use crate::storage::header::xxh3_checksum;
8
/// Size in bytes of the fixed header written before every record payload.
///
/// Layout as produced by `Wal::append`:
/// * byte `0`       – record tag
/// * bytes `1..4`   – left as zero by the writer
/// * bytes `4..8`   – payload length, little-endian `u32`
/// * bytes `8..16`  – payload checksum, little-endian `u64`
const RECORD_HEADER_SIZE: usize = 1 + 3 + 4 + 8;
11
/// How aggressively the WAL pushes appended records to stable storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Every `Wal::append` flushes the write buffer and calls `sync_all` on the
    /// file before returning.
    Full,
    /// `Wal::append` only buffers the record; nothing in this file syncs
    /// automatically. Presumably the caller invokes `Wal::sync` at batch
    /// boundaries — TODO confirm against callers.
    Batch,
    /// Treated exactly like `Batch` by the code in this file. Presumably callers
    /// skip explicit syncs in this mode — TODO confirm against callers.
    None,
}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
33pub enum WalRecord {
34 Put {
36 doc_id: DocId,
37 data: Vec<u8>,
39 },
40 Delete { doc_id: DocId },
42}
43
/// Header tag byte marking a [`WalRecord::Put`] record.
const TAG_PUT: u8 = 0x01;
/// Header tag byte marking a [`WalRecord::Delete`] record.
const TAG_DELETE: u8 = 0x02;
47
/// Append-only writer for a write-ahead log file.
///
/// Records are framed with a fixed-size header (tag, payload length, payload
/// checksum) followed by the payload bytes.
pub struct Wal {
    /// Buffered handle to the log file, which `Wal::open` opens in append mode.
    writer: BufWriter<File>,
    /// Durability policy chosen when the log was opened.
    mode: DurabilityMode,
}
59
60impl Wal {
61 pub fn open(path: impl AsRef<Path>, mode: DurabilityMode) -> Result<Self> {
67 let file = OpenOptions::new()
68 .create(true)
69 .append(true)
70 .open(path.as_ref())?;
71
72 Ok(Self {
73 writer: BufWriter::new(file),
74 mode,
75 })
76 }
77
78 pub fn append(&mut self, record: &WalRecord) -> Result<()> {
84 let (tag, payload) = encode_payload(record);
85
86 let checksum = xxh3_checksum(&payload);
87
88 let mut header = [0u8; RECORD_HEADER_SIZE];
90 header[0] = tag;
91 header[4..8].copy_from_slice(&(payload.len() as u32).to_le_bytes());
92 header[8..16].copy_from_slice(&checksum.to_le_bytes());
93
94 self.writer.write_all(&header)?;
95 self.writer.write_all(&payload)?;
96
97 if self.mode == DurabilityMode::Full {
98 self.writer.flush()?;
99 self.writer.get_ref().sync_all()?;
100 }
101
102 Ok(())
103 }
104
105 pub fn sync(&mut self) -> Result<()> {
110 self.writer.flush()?;
111 self.writer.get_ref().sync_all()?;
112 Ok(())
113 }
114
115 pub fn truncate(&mut self) -> Result<()> {
120 self.writer.flush()?;
121 self.writer.get_ref().set_len(0)?;
122 Ok(())
124 }
125
126 pub fn mode(&self) -> DurabilityMode {
128 self.mode
129 }
130}
131
132pub fn replay_wal(path: impl AsRef<Path>) -> Result<Vec<WalRecord>> {
143 let file = match File::open(path.as_ref()) {
144 Ok(f) => f,
145 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
146 Err(e) => return Err(LuciError::Io(e)),
147 };
148
149 let mut reader = BufReader::new(file);
150 let mut records = Vec::new();
151
152 loop {
153 let mut header = [0u8; RECORD_HEADER_SIZE];
155 match reader.read_exact(&mut header) {
156 Ok(()) => {}
157 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
158 Err(e) => return Err(LuciError::Io(e)),
159 }
160
161 let tag = header[0];
162 let payload_length = u32::from_le_bytes(header[4..8].try_into().unwrap()) as usize;
163 let stored_checksum = u64::from_le_bytes(header[8..16].try_into().unwrap());
164
165 let mut payload = vec![0u8; payload_length];
167 match reader.read_exact(&mut payload) {
168 Ok(()) => {}
169 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
170 Err(e) => return Err(LuciError::Io(e)),
171 }
172
173 if xxh3_checksum(&payload) != stored_checksum {
175 break;
176 }
177
178 match decode_record(tag, &payload) {
180 Some(record) => records.push(record),
181 None => break,
182 }
183 }
184
185 Ok(records)
186}
187
188fn encode_payload(record: &WalRecord) -> (u8, Vec<u8>) {
189 match record {
190 WalRecord::Put { doc_id, data } => {
191 let mut payload = Vec::with_capacity(4 + data.len());
192 payload.extend_from_slice(&doc_id.as_u32().to_le_bytes());
193 payload.extend_from_slice(data);
194 (TAG_PUT, payload)
195 }
196 WalRecord::Delete { doc_id } => {
197 let payload = doc_id.as_u32().to_le_bytes().to_vec();
198 (TAG_DELETE, payload)
199 }
200 }
201}
202
203fn decode_record(tag: u8, payload: &[u8]) -> Option<WalRecord> {
204 match tag {
205 TAG_PUT => {
206 if payload.len() < 4 {
207 return None;
208 }
209 let doc_id = DocId::new(u32::from_le_bytes(payload[0..4].try_into().unwrap()));
210 let data = payload[4..].to_vec();
211 Some(WalRecord::Put { doc_id, data })
212 }
213 TAG_DELETE => {
214 if payload.len() < 4 {
215 return None;
216 }
217 let doc_id = DocId::new(u32::from_le_bytes(payload[0..4].try_into().unwrap()));
218 Some(WalRecord::Delete { doc_id })
219 }
220 _ => None,
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::{Seek, SeekFrom};
    use std::path::PathBuf;

    /// Returns the path of `name` inside a per-process scratch directory under
    /// the system temp dir, creating the directory if needed.
    fn test_path(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!("luci_wal_test_{}", std::process::id()));
        fs::create_dir_all(&dir).unwrap();
        dir.join(name)
    }

    /// Like [`test_path`], but also removes any file left over at that path.
    fn fresh_path(name: &str) -> PathBuf {
        let path = test_path(name);
        let _ = fs::remove_file(&path);
        path
    }

    /// Shorthand for a `Put` record.
    fn put(id: u32, data: &[u8]) -> WalRecord {
        WalRecord::Put {
            doc_id: DocId::new(id),
            data: data.to_vec(),
        }
    }

    /// Shorthand for a `Delete` record.
    fn delete(id: u32) -> WalRecord {
        WalRecord::Delete {
            doc_id: DocId::new(id),
        }
    }

    /// Opens a Batch-mode WAL at `path`, appends `records` in order, syncs,
    /// and closes the log.
    fn write_batch(path: &Path, records: &[WalRecord]) {
        let mut wal = Wal::open(path, DurabilityMode::Batch).unwrap();
        for record in records {
            wal.append(record).unwrap();
        }
        wal.sync().unwrap();
    }

    #[test]
    fn write_and_replay_put() {
        let path = fresh_path("put.wal");
        write_batch(&path, &[put(1, b"hello")]);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], put(1, b"hello"));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn write_and_replay_delete() {
        let path = fresh_path("delete.wal");
        write_batch(&path, &[delete(42)]);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], delete(42));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn multiple_records() {
        let path = fresh_path("multi.wal");

        // One hundred puts followed by a single delete.
        let mut written: Vec<WalRecord> = (0..100)
            .map(|i| put(i, format!("doc-{i}").as_bytes()))
            .collect();
        written.push(delete(50));
        write_batch(&path, &written);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 101);

        assert_eq!(replayed[0], put(0, b"doc-0"));
        assert_eq!(replayed[100], delete(50));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncate_clears_wal() {
        let path = fresh_path("truncate.wal");

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&put(1, b"data")).unwrap();
            wal.sync().unwrap();
            wal.truncate().unwrap();
        }

        let replayed = replay_wal(&path).unwrap();
        assert!(replayed.is_empty());

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncated_header_is_discarded() {
        let path = fresh_path("trunc_header.wal");
        write_batch(&path, &[put(1, b"valid")]);

        // Tack on half a header's worth of bytes after the valid record.
        {
            let mut raw = OpenOptions::new().append(true).open(&path).unwrap();
            raw.write_all(&[0u8; 8]).unwrap();
        }

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], put(1, b"valid"));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncated_payload_is_discarded() {
        let path = fresh_path("trunc_payload.wal");
        write_batch(&path, &[put(1, b"first"), put(2, b"second")]);

        // Keep the first record whole, plus the second record's header and
        // only two bytes of its payload.
        {
            let first_record_size = RECORD_HEADER_SIZE + 4 + 5;
            let truncated_len = first_record_size + RECORD_HEADER_SIZE + 2;
            let on_disk_len = fs::metadata(&path).unwrap().len() as usize;
            assert!(truncated_len < on_disk_len);

            let raw = OpenOptions::new().write(true).open(&path).unwrap();
            raw.set_len(truncated_len as u64).unwrap();
        }

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], put(1, b"first"));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn corrupted_checksum_stops_replay() {
        let path = fresh_path("corrupt_checksum.wal");
        write_batch(
            &path,
            &[
                put(1, b"good"),
                put(2, b"will-be-corrupted"),
                put(3, b"after-corrupt"),
            ],
        );

        // Invert the first checksum byte in the second record's header
        // (the checksum starts 8 bytes into a header).
        {
            let first_record_size = RECORD_HEADER_SIZE + 4 + 4;
            let checksum_offset = (first_record_size + 8) as u64;

            let mut raw = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .unwrap();
            let mut byte = [0u8; 1];
            raw.seek(SeekFrom::Start(checksum_offset)).unwrap();
            raw.read_exact(&mut byte).unwrap();
            byte[0] ^= 0xFF;
            raw.seek(SeekFrom::Start(checksum_offset)).unwrap();
            raw.write_all(&byte).unwrap();
        }

        // Only the record before the damaged one is returned.
        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], put(1, b"good"));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn replay_nonexistent_file_returns_empty() {
        let path = fresh_path("nonexistent.wal");
        let replayed = replay_wal(&path).unwrap();
        assert!(replayed.is_empty());
    }

    #[test]
    fn replay_empty_file() {
        let path = fresh_path("empty.wal");
        File::create(&path).unwrap();

        let replayed = replay_wal(&path).unwrap();
        assert!(replayed.is_empty());

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn full_durability_mode() {
        let path = fresh_path("full_mode.wal");

        let mut wal = Wal::open(&path, DurabilityMode::Full).unwrap();
        assert_eq!(wal.mode(), DurabilityMode::Full);

        // No explicit sync: the append itself is expected to reach the file.
        wal.append(&put(1, b"durable")).unwrap();

        drop(wal);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn append_after_truncate() {
        let path = fresh_path("append_after_trunc.wal");

        let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();

        wal.append(&put(1, b"before")).unwrap();
        wal.sync().unwrap();
        wal.truncate().unwrap();

        wal.append(&put(2, b"after")).unwrap();
        wal.sync().unwrap();
        drop(wal);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], put(2, b"after"));

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn large_payload() {
        let path = fresh_path("large_payload.wal");

        let big_data = vec![0xCDu8; 1_000_000];
        write_batch(&path, &[put(1, &big_data)]);

        let replayed = replay_wal(&path).unwrap();
        assert_eq!(replayed.len(), 1);
        match &replayed[0] {
            WalRecord::Put { doc_id, data } => {
                assert_eq!(*doc_id, DocId::new(1));
                assert_eq!(data, &big_data);
            }
            _ => panic!("expected Put"),
        }

        fs::remove_file(&path).unwrap();
    }
}