// regulus_db/persistence/wal.rs
use std::fs::{File, OpenOptions};
2use std::io::{BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use serde::{Serialize, Deserialize};
5use crate::storage::Row;
6use crate::types::{TableSchema, DbResult, DbError};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub enum WalOperation {
11 CreateTable { schema: TableSchema },
12 DropTable { name: String },
13 Insert { table: String, row_id: u64, row: Row },
14 Update { table: String, row_id: u64, row: Row },
15 Delete { table: String, row_id: u64 },
16}
17
18impl WalOperation {
19 pub fn to_serializable(&self) -> SerializableWalOperation {
21 match self {
22 WalOperation::CreateTable { schema } => SerializableWalOperation::CreateTable { schema: schema.clone() },
23 WalOperation::DropTable { name } => SerializableWalOperation::DropTable { name: name.clone() },
24 WalOperation::Insert { table, row_id, row } => SerializableWalOperation::Insert {
25 table: table.clone(),
26 row_id: *row_id,
27 row: row.0.clone(),
28 },
29 WalOperation::Update { table, row_id, row } => SerializableWalOperation::Update {
30 table: table.clone(),
31 row_id: *row_id,
32 row: row.0.clone(),
33 },
34 WalOperation::Delete { table, row_id } => SerializableWalOperation::Delete {
35 table: table.clone(),
36 row_id: *row_id,
37 },
38 }
39 }
40
41 pub fn from_serializable(op: SerializableWalOperation) -> Self {
42 match op {
43 SerializableWalOperation::CreateTable { schema } => WalOperation::CreateTable { schema },
44 SerializableWalOperation::DropTable { name } => WalOperation::DropTable { name },
45 SerializableWalOperation::Insert { table, row_id, row } => WalOperation::Insert {
46 table,
47 row_id,
48 row: Row(row),
49 },
50 SerializableWalOperation::Update { table, row_id, row } => WalOperation::Update {
51 table,
52 row_id,
53 row: Row(row),
54 },
55 SerializableWalOperation::Delete { table, row_id } => WalOperation::Delete {
56 table,
57 row_id,
58 },
59 }
60 }
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
65pub enum SerializableWalOperation {
66 CreateTable { schema: TableSchema },
67 DropTable { name: String },
68 Insert { table: String, row_id: u64, row: IndexMap<String, crate::types::DbValue> },
69 Update { table: String, row_id: u64, row: IndexMap<String, crate::types::DbValue> },
70 Delete { table: String, row_id: u64 },
71}
72
73use indexmap::IndexMap;
74
/// Owner of a single write-ahead log file.
///
/// Records are appended through a buffered handle and can be read back in
/// order with `replay`.
pub struct WalManager {
    /// Location of the log file on disk.
    file_path: PathBuf,
    /// Buffered write handle; `None` when no handle is currently open, in
    /// which case `append` reports an error.
    file: Option<BufWriter<File>>,
    /// Sequence number that the next appended record will be assigned.
    lsn: u64,
}
81
82impl WalManager {
83 pub fn new(path: &Path) -> DbResult<Self> {
85 let file = OpenOptions::new()
86 .create(true)
87 .append(true)
88 .read(true)
89 .open(path)
90 .map_err(|e| DbError::IoError(e))?;
91
92 Ok(WalManager {
93 file_path: path.to_path_buf(),
94 file: Some(BufWriter::new(file)),
95 lsn: 0,
96 })
97 }
98
99 pub fn append(&mut self, op: WalOperation) -> DbResult<u64> {
101 let writer = self.file.as_mut().ok_or_else(|| {
102 DbError::InternalError("WAL file not opened".to_string())
103 })?;
104
105 let serializable_op = op.to_serializable();
107
108 let data = bincode::serialize(&serializable_op)
110 .map_err(|e| DbError::InternalError(format!("Serialization error: {}", e)))?;
111
112 let len = data.len() as u32;
114 writer.write_all(&len.to_le_bytes())?;
115 writer.write_all(&data)?;
116 writer.flush()?;
117
118 let current_lsn = self.lsn;
119 self.lsn += 1;
120
121 Ok(current_lsn)
122 }
123
124 pub fn replay<F>(&mut self, mut apply: F) -> DbResult<()>
126 where
127 F: FnMut(WalOperation) -> DbResult<()>,
128 {
129 self.file = None;
131
132 let file = File::open(&self.file_path)
134 .map_err(|e| DbError::IoError(e))?;
135 let mut reader = BufReader::new(file);
136
137 loop {
139 let mut len_buf = [0u8; 4];
141 match reader.read_exact(&mut len_buf) {
142 Ok(_) => {}
143 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
144 Err(e) => return Err(DbError::IoError(e)),
145 }
146 let len = u32::from_le_bytes(len_buf) as usize;
147
148 let mut data = vec![0u8; len];
150 reader.read_exact(&mut data)?;
151
152 let serializable_op: SerializableWalOperation = bincode::deserialize(&data)
154 .map_err(|e| DbError::InternalError(format!("Deserialization error: {}", e)))?;
155
156 let op = WalOperation::from_serializable(serializable_op);
158 apply(op)?;
159 }
160
161 let file = OpenOptions::new()
163 .create(true)
164 .append(true)
165 .read(true)
166 .open(&self.file_path)
167 .map_err(|e| DbError::IoError(e))?;
168 self.file = Some(BufWriter::new(file));
169
170 Ok(())
171 }
172
173 pub fn truncate(&mut self) -> DbResult<()> {
175 if let Some(writer) = &mut self.file {
177 writer.flush()?;
178 }
179
180 self.file = None;
182
183 let file = File::create(&self.file_path)
185 .map_err(|e| DbError::IoError(e))?;
186
187 self.file = Some(BufWriter::new(file));
189 self.lsn = 0;
190
191 Ok(())
192 }
193
194 pub fn size(&self) -> DbResult<u64> {
196 let metadata = std::fs::metadata(&self.file_path)
197 .map_err(|e| DbError::IoError(e))?;
198 Ok(metadata.len())
199 }
200}