1use std::fs::{File, OpenOptions};
2use std::hash::Hasher;
3use std::io::{BufWriter, Read, Write};
4use std::path::Path;
5use std::sync::Arc;
6
7use anyhow::{bail, Context, Result};
8use bytes::{Buf, BufMut, Bytes};
9use crossbeam_skiplist::SkipMap;
10use parking_lot::Mutex;
11
/// Handle to a write-ahead log file.
///
/// The underlying file is wrapped in a `BufWriter` and guarded by a mutex
/// behind an `Arc`, so every write and sync goes through the same lock.
pub struct Wal {
    /// Buffered writer over the log file, shared behind a mutex.
    file: Arc<Mutex<BufWriter<File>>>,
}
15
16impl Wal {
17 pub fn create(path: impl AsRef<Path>) -> Result<Self> {
18 Ok(Self {
19 file: Arc::new(Mutex::new(BufWriter::new(
20 OpenOptions::new()
21 .read(true)
22 .create_new(true)
23 .write(true)
24 .open(path)
25 .context("failed to create WAL")?,
26 ))),
27 })
28 }
29
30 pub fn recover(path: impl AsRef<Path>, skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {
31 let path = path.as_ref();
32 let mut file = OpenOptions::new()
33 .read(true)
34 .append(true)
35 .open(path)
36 .context("failed to recover from WAL")?;
37 let mut buf = Vec::new();
38 file.read_to_end(&mut buf)?;
39 let mut rbuf: &[u8] = buf.as_slice();
40 while rbuf.has_remaining() {
41 let mut hasher = crc32fast::Hasher::new();
42 let key_len = rbuf.get_u16() as usize;
43 hasher.write_u16(key_len as u16);
44 let key = Bytes::copy_from_slice(&rbuf[..key_len]);
45 hasher.write(&key);
46 rbuf.advance(key_len);
47 let value_len = rbuf.get_u16() as usize;
48 hasher.write_u16(value_len as u16);
49 let value = Bytes::copy_from_slice(&rbuf[..value_len]);
50 hasher.write(&value);
51 rbuf.advance(value_len);
52 let checksum = rbuf.get_u32();
53 if hasher.finalize() != checksum {
54 bail!("checksum mismatch");
55 }
56 skiplist.insert(key, value);
57 }
58 Ok(Self {
59 file: Arc::new(Mutex::new(BufWriter::new(file))),
60 })
61 }
62
63 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
64 let mut file = self.file.lock();
65 let mut buf: Vec<u8> =
66 Vec::with_capacity(key.len() + value.len() + std::mem::size_of::<u16>());
67 let mut hasher = crc32fast::Hasher::new();
68 hasher.write_u16(key.len() as u16);
69 buf.put_u16(key.len() as u16);
70 hasher.write(key);
71 buf.put_slice(key);
72 hasher.write_u16(value.len() as u16);
73 buf.put_u16(value.len() as u16);
74 buf.put_slice(value);
75 hasher.write(value);
76 buf.put_u32(hasher.finalize());
78 file.write_all(&buf)?;
79 Ok(())
80 }
81
82 pub fn sync(&self) -> Result<()> {
83 let mut file = self.file.lock();
84 file.flush()?;
85 file.get_mut().sync_all()?;
86 Ok(())
87 }
88}