1use crate::{SqlRsError, Result};
2use parking_lot::Mutex;
3use serde::{Deserialize, Serialize};
4use std::fs::{File, OpenOptions};
5use std::io::{BufReader, BufWriter, Read, Write};
6use std::path::{Path, PathBuf};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub enum WalEntry {
10 Put { key: Vec<u8>, value: Vec<u8> },
11 Delete { key: Vec<u8> },
12 Checkpoint,
13}
14
15pub struct WriteAheadLog {
16 path: PathBuf,
17 writer: Mutex<BufWriter<File>>,
18}
19
20impl WriteAheadLog {
21 pub fn open<P: AsRef<Path>>(db_path: P) -> Result<Self> {
22 let mut wal_path = PathBuf::from(db_path.as_ref());
23 wal_path.set_extension("wal");
24
25 let file = OpenOptions::new()
26 .create(true)
27 .append(true)
28 .open(&wal_path)?;
29
30 Ok(Self {
31 path: wal_path,
32 writer: Mutex::new(BufWriter::new(file)),
33 })
34 }
35
36 pub fn log_put(&self, key: &[u8], value: &[u8]) -> Result<()> {
37 let entry = WalEntry::Put {
38 key: key.to_vec(),
39 value: value.to_vec(),
40 };
41 self.write_entry(&entry)
42 }
43
44 pub fn log_delete(&self, key: &[u8]) -> Result<()> {
45 let entry = WalEntry::Delete {
46 key: key.to_vec(),
47 };
48 self.write_entry(&entry)
49 }
50
51 pub fn checkpoint(&self) -> Result<()> {
52 let entry = WalEntry::Checkpoint;
53 self.write_entry(&entry)?;
54 self.writer.lock().flush()?;
55 Ok(())
56 }
57
58 fn write_entry(&self, entry: &WalEntry) -> Result<()> {
59 let mut writer = self.writer.lock();
60 let data = bincode::serialize(entry)
61 .map_err(|e| SqlRsError::Serialization(format!("WAL serialize error: {}", e)))?;
62
63 let len = data.len() as u32;
64 writer.write_all(&len.to_le_bytes())?;
65 writer.write_all(&data)?;
66
67 Ok(())
68 }
69
70 pub fn replay<F>(&self, mut apply: F) -> Result<()>
71 where
72 F: FnMut(WalEntry) -> Result<()>,
73 {
74 let file = File::open(&self.path)?;
75 let mut reader = BufReader::new(file);
76
77 loop {
78 let mut len_bytes = [0u8; 4];
79 match reader.read_exact(&mut len_bytes) {
80 Ok(_) => {}
81 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
82 Err(e) => return Err(e.into()),
83 }
84
85 let len = u32::from_le_bytes(len_bytes) as usize;
86 let mut data = vec![0u8; len];
87 reader.read_exact(&mut data)?;
88
89 let entry: WalEntry = bincode::deserialize(&data)
90 .map_err(|e| SqlRsError::Serialization(format!("WAL deserialize error: {}", e)))?;
91
92 apply(entry)?;
93 }
94
95 Ok(())
96 }
97
98 pub fn truncate(&self) -> Result<()> {
99 let mut writer = self.writer.lock();
100 writer.flush()?;
101 drop(writer);
102
103 let file = OpenOptions::new()
104 .write(true)
105 .truncate(true)
106 .open(&self.path)?;
107
108 *self.writer.lock() = BufWriter::new(file);
109 Ok(())
110 }
111
112 pub fn flush(&self) -> Result<()> {
113 self.writer.lock().flush()?;
114 Ok(())
115 }
116}