Skip to main content

sql_rs/storage/
wal.rs

1use crate::{SqlRsError, Result};
2use parking_lot::Mutex;
3use serde::{Deserialize, Serialize};
4use std::fs::{File, OpenOptions};
5use std::io::{BufReader, BufWriter, Read, Write};
6use std::path::{Path, PathBuf};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub enum WalEntry {
10    Put { key: Vec<u8>, value: Vec<u8> },
11    Delete { key: Vec<u8> },
12    Checkpoint,
13}
14
15pub struct WriteAheadLog {
16    path: PathBuf,
17    writer: Mutex<BufWriter<File>>,
18}
19
20impl WriteAheadLog {
21    pub fn open<P: AsRef<Path>>(db_path: P) -> Result<Self> {
22        let mut wal_path = PathBuf::from(db_path.as_ref());
23        wal_path.set_extension("wal");
24
25        let file = OpenOptions::new()
26            .create(true)
27            .append(true)
28            .open(&wal_path)?;
29
30        Ok(Self {
31            path: wal_path,
32            writer: Mutex::new(BufWriter::new(file)),
33        })
34    }
35
36    pub fn log_put(&self, key: &[u8], value: &[u8]) -> Result<()> {
37        let entry = WalEntry::Put {
38            key: key.to_vec(),
39            value: value.to_vec(),
40        };
41        self.write_entry(&entry)
42    }
43
44    pub fn log_delete(&self, key: &[u8]) -> Result<()> {
45        let entry = WalEntry::Delete {
46            key: key.to_vec(),
47        };
48        self.write_entry(&entry)
49    }
50
51    pub fn checkpoint(&self) -> Result<()> {
52        let entry = WalEntry::Checkpoint;
53        self.write_entry(&entry)?;
54        self.writer.lock().flush()?;
55        Ok(())
56    }
57
58    fn write_entry(&self, entry: &WalEntry) -> Result<()> {
59        let mut writer = self.writer.lock();
60        let data = bincode::serialize(entry)
61            .map_err(|e| SqlRsError::Serialization(format!("WAL serialize error: {}", e)))?;
62
63        let len = data.len() as u32;
64        writer.write_all(&len.to_le_bytes())?;
65        writer.write_all(&data)?;
66
67        Ok(())
68    }
69
70    pub fn replay<F>(&self, mut apply: F) -> Result<()>
71    where
72        F: FnMut(WalEntry) -> Result<()>,
73    {
74        let file = File::open(&self.path)?;
75        let mut reader = BufReader::new(file);
76
77        loop {
78            let mut len_bytes = [0u8; 4];
79            match reader.read_exact(&mut len_bytes) {
80                Ok(_) => {}
81                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
82                Err(e) => return Err(e.into()),
83            }
84
85            let len = u32::from_le_bytes(len_bytes) as usize;
86            let mut data = vec![0u8; len];
87            reader.read_exact(&mut data)?;
88
89            let entry: WalEntry = bincode::deserialize(&data)
90                .map_err(|e| SqlRsError::Serialization(format!("WAL deserialize error: {}", e)))?;
91
92            apply(entry)?;
93        }
94
95        Ok(())
96    }
97
98    pub fn truncate(&self) -> Result<()> {
99        let mut writer = self.writer.lock();
100        writer.flush()?;
101        drop(writer);
102
103        let file = OpenOptions::new()
104            .write(true)
105            .truncate(true)
106            .open(&self.path)?;
107
108        *self.writer.lock() = BufWriter::new(file);
109        Ok(())
110    }
111
112    pub fn flush(&self) -> Result<()> {
113        self.writer.lock().flush()?;
114        Ok(())
115    }
116}