Skip to main content

regulus_db/persistence/
wal.rs

1use std::fs::{File, OpenOptions};
2use std::io::{BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use serde::{Serialize, Deserialize};
5use crate::storage::Row;
6use crate::types::{TableSchema, DbResult, DbError};
7
8/// WAL 操作类型
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub enum WalOperation {
11    CreateTable { schema: TableSchema },
12    DropTable { name: String },
13    Insert { table: String, row_id: u64, row: Row },
14    Update { table: String, row_id: u64, row: Row },
15    Delete { table: String, row_id: u64 },
16}
17
18impl WalOperation {
19    /// 转换为没有 row_id 包装的版本(用于序列化到 WAL)
20    pub fn to_serializable(&self) -> SerializableWalOperation {
21        match self {
22            WalOperation::CreateTable { schema } => SerializableWalOperation::CreateTable { schema: schema.clone() },
23            WalOperation::DropTable { name } => SerializableWalOperation::DropTable { name: name.clone() },
24            WalOperation::Insert { table, row_id, row } => SerializableWalOperation::Insert {
25                table: table.clone(),
26                row_id: *row_id,
27                row: row.0.clone(),
28            },
29            WalOperation::Update { table, row_id, row } => SerializableWalOperation::Update {
30                table: table.clone(),
31                row_id: *row_id,
32                row: row.0.clone(),
33            },
34            WalOperation::Delete { table, row_id } => SerializableWalOperation::Delete {
35                table: table.clone(),
36                row_id: *row_id,
37            },
38        }
39    }
40
41    pub fn from_serializable(op: SerializableWalOperation) -> Self {
42        match op {
43            SerializableWalOperation::CreateTable { schema } => WalOperation::CreateTable { schema },
44            SerializableWalOperation::DropTable { name } => WalOperation::DropTable { name },
45            SerializableWalOperation::Insert { table, row_id, row } => WalOperation::Insert {
46                table,
47                row_id,
48                row: Row(row),
49            },
50            SerializableWalOperation::Update { table, row_id, row } => WalOperation::Update {
51                table,
52                row_id,
53                row: Row(row),
54            },
55            SerializableWalOperation::Delete { table, row_id } => WalOperation::Delete {
56                table,
57                row_id,
58            },
59        }
60    }
61}
62
63/// 可序列化的 WAL 操作
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub enum SerializableWalOperation {
66    CreateTable { schema: TableSchema },
67    DropTable { name: String },
68    Insert { table: String, row_id: u64, row: IndexMap<String, crate::types::DbValue> },
69    Update { table: String, row_id: u64, row: IndexMap<String, crate::types::DbValue> },
70    Delete { table: String, row_id: u64 },
71}
72
73use indexmap::IndexMap;
74
75/// WAL 管理器
76pub struct WalManager {
77    file_path: PathBuf,
78    file: Option<BufWriter<File>>,
79    lsn: u64,  // Log Sequence Number
80}
81
82impl WalManager {
83    /// 创建或打开 WAL 文件
84    pub fn new(path: &Path) -> DbResult<Self> {
85        let file = OpenOptions::new()
86            .create(true)
87            .append(true)
88            .read(true)
89            .open(path)
90            .map_err(|e| DbError::IoError(e))?;
91
92        Ok(WalManager {
93            file_path: path.to_path_buf(),
94            file: Some(BufWriter::new(file)),
95            lsn: 0,
96        })
97    }
98
99    /// 追加操作到 WAL
100    pub fn append(&mut self, op: WalOperation) -> DbResult<u64> {
101        let writer = self.file.as_mut().ok_or_else(|| {
102            DbError::InternalError("WAL file not opened".to_string())
103        })?;
104
105        // 转换为可序列化的格式
106        let serializable_op = op.to_serializable();
107
108        // 序列化操作
109        let data = bincode::serialize(&serializable_op)
110            .map_err(|e| DbError::InternalError(format!("Serialization error: {}", e)))?;
111
112        // 写入格式:[长度 4 字节][数据]
113        let len = data.len() as u32;
114        writer.write_all(&len.to_le_bytes())?;
115        writer.write_all(&data)?;
116        writer.flush()?;
117
118        let current_lsn = self.lsn;
119        self.lsn += 1;
120
121        Ok(current_lsn)
122    }
123
124    /// 重放所有 WAL 操作
125    pub fn replay<F>(&mut self, mut apply: F) -> DbResult<()>
126    where
127        F: FnMut(WalOperation) -> DbResult<()>,
128    {
129        // 关闭当前的写入器
130        self.file = None;
131
132        // 重新打开文件用于读取
133        let file = File::open(&self.file_path)
134            .map_err(|e| DbError::IoError(e))?;
135        let mut reader = BufReader::new(file);
136
137        // 读取并重放所有操作
138        loop {
139            // 读取长度
140            let mut len_buf = [0u8; 4];
141            match reader.read_exact(&mut len_buf) {
142                Ok(_) => {}
143                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
144                Err(e) => return Err(DbError::IoError(e)),
145            }
146            let len = u32::from_le_bytes(len_buf) as usize;
147
148            // 读取数据
149            let mut data = vec![0u8; len];
150            reader.read_exact(&mut data)?;
151
152            // 反序列化为可序列化类型
153            let serializable_op: SerializableWalOperation = bincode::deserialize(&data)
154                .map_err(|e| DbError::InternalError(format!("Deserialization error: {}", e)))?;
155
156            // 转换为 WalOperation 并应用
157            let op = WalOperation::from_serializable(serializable_op);
158            apply(op)?;
159        }
160
161        // 重新打开写入器
162        let file = OpenOptions::new()
163            .create(true)
164            .append(true)
165            .read(true)
166            .open(&self.file_path)
167            .map_err(|e| DbError::IoError(e))?;
168        self.file = Some(BufWriter::new(file));
169
170        Ok(())
171    }
172
173    /// 截断 WAL 文件
174    pub fn truncate(&mut self) -> DbResult<()> {
175        // 刷新缓冲区
176        if let Some(writer) = &mut self.file {
177            writer.flush()?;
178        }
179
180        // 关闭文件
181        self.file = None;
182
183        // 截断并重新创建
184        let file = File::create(&self.file_path)
185            .map_err(|e| DbError::IoError(e))?;
186
187        // 重新打开写入器
188        self.file = Some(BufWriter::new(file));
189        self.lsn = 0;
190
191        Ok(())
192    }
193
194    /// 获取 WAL 文件大小(字节数)
195    pub fn size(&self) -> DbResult<u64> {
196        let metadata = std::fs::metadata(&self.file_path)
197            .map_err(|e| DbError::IoError(e))?;
198        Ok(metadata.len())
199    }
200}