mini_lsm/
wal.rs

1use std::fs::{File, OpenOptions};
2use std::hash::Hasher;
3use std::io::{BufWriter, Read, Write};
4use std::path::Path;
5use std::sync::Arc;
6
7use anyhow::{bail, Context, Result};
8use bytes::{Buf, BufMut, Bytes};
9use crossbeam_skiplist::SkipMap;
10use parking_lot::Mutex;
11
12pub struct Wal {
13    file: Arc<Mutex<BufWriter<File>>>,
14}
15
16impl Wal {
17    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
18        Ok(Self {
19            file: Arc::new(Mutex::new(BufWriter::new(
20                OpenOptions::new()
21                    .read(true)
22                    .create_new(true)
23                    .write(true)
24                    .open(path)
25                    .context("failed to create WAL")?,
26            ))),
27        })
28    }
29
30    pub fn recover(path: impl AsRef<Path>, skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {
31        let path = path.as_ref();
32        let mut file = OpenOptions::new()
33            .read(true)
34            .append(true)
35            .open(path)
36            .context("failed to recover from WAL")?;
37        let mut buf = Vec::new();
38        file.read_to_end(&mut buf)?;
39        let mut rbuf: &[u8] = buf.as_slice();
40        while rbuf.has_remaining() {
41            let mut hasher = crc32fast::Hasher::new();
42            let key_len = rbuf.get_u16() as usize;
43            hasher.write_u16(key_len as u16);
44            let key = Bytes::copy_from_slice(&rbuf[..key_len]);
45            hasher.write(&key);
46            rbuf.advance(key_len);
47            let value_len = rbuf.get_u16() as usize;
48            hasher.write_u16(value_len as u16);
49            let value = Bytes::copy_from_slice(&rbuf[..value_len]);
50            hasher.write(&value);
51            rbuf.advance(value_len);
52            let checksum = rbuf.get_u32();
53            if hasher.finalize() != checksum {
54                bail!("checksum mismatch");
55            }
56            skiplist.insert(key, value);
57        }
58        Ok(Self {
59            file: Arc::new(Mutex::new(BufWriter::new(file))),
60        })
61    }
62
63    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
64        let mut file = self.file.lock();
65        let mut buf: Vec<u8> =
66            Vec::with_capacity(key.len() + value.len() + std::mem::size_of::<u16>());
67        let mut hasher = crc32fast::Hasher::new();
68        hasher.write_u16(key.len() as u16);
69        buf.put_u16(key.len() as u16);
70        hasher.write(key);
71        buf.put_slice(key);
72        hasher.write_u16(value.len() as u16);
73        buf.put_u16(value.len() as u16);
74        buf.put_slice(value);
75        hasher.write(value);
76        // add checksum: week 2 day 7
77        buf.put_u32(hasher.finalize());
78        file.write_all(&buf)?;
79        Ok(())
80    }
81
82    pub fn sync(&self) -> Result<()> {
83        let mut file = self.file.lock();
84        file.flush()?;
85        file.get_mut().sync_all()?;
86        Ok(())
87    }
88}