1use rustlite_core::Result;
24use serde::{Deserialize, Serialize};
25
26pub mod reader;
27pub mod record;
28pub mod recovery;
29pub mod segment;
30pub mod writer;
31
32pub use reader::WalReader;
33pub use record::{RecordPayload, RecordType, WalRecord};
34pub use recovery::{RecoveryManager, RecoveryStats};
35pub use segment::{SegmentInfo, SegmentManager};
36pub use writer::WalWriter;
37
/// Configuration for the write-ahead log.
///
/// All three fields are handed to `WalWriter::new` when a `WalManager` is
/// opened. `wal_dir` is also used on its own to build readers and segment
/// managers, and the whole config is cloned into `RecoveryManager`.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Sync mode forwarded to the writer (see `SyncMode`).
    pub sync_mode: SyncMode,
    /// Maximum size of a single segment, forwarded to the writer.
    /// Presumably measured in bytes (the default is `64 * 1024 * 1024`) —
    /// TODO confirm against `WalWriter`.
    pub max_segment_size: u64,
    /// Directory the writer, reader and segment manager operate on.
    pub wal_dir: std::path::PathBuf,
}
48
49impl Default for WalConfig {
50 fn default() -> Self {
51 Self {
52 sync_mode: SyncMode::Sync,
53 max_segment_size: 64 * 1024 * 1024, wal_dir: std::path::PathBuf::from("wal"),
55 }
56 }
57}
58
/// Sync policy selected in `WalConfig` and passed to the WAL writer.
///
/// NOTE(review): what each variant actually does is implemented by
/// `WalWriter`, which is not visible from this module. The per-variant notes
/// below are inferred from the names only and must be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SyncMode {
    /// Presumably syncs to stable storage as part of each write — TODO confirm.
    /// This is the mode chosen by `WalConfig::default()`.
    Sync,
    /// Presumably defers or batches syncing — TODO confirm.
    Async,
    /// Presumably performs no explicit syncing — TODO confirm.
    None,
}
69
/// Entry point to the write-ahead log: owns the configuration and, while the
/// log is open, the writer used for appends.
pub struct WalManager {
    /// Settings supplied at construction; the manager only reads or clones them.
    config: WalConfig,
    /// `Some` after a successful `open()` until the next `close()`; `None`
    /// otherwise.
    writer: Option<WalWriter>,
}
75
76impl WalManager {
77 pub fn new(config: WalConfig) -> Result<Self> {
78 Ok(Self {
79 config,
80 writer: None,
81 })
82 }
83
84 pub fn open(&mut self) -> Result<()> {
88 let writer = WalWriter::new(
89 &self.config.wal_dir,
90 self.config.max_segment_size,
91 self.config.sync_mode,
92 )?;
93 self.writer = Some(writer);
94
95 Ok(())
96 }
97
98 pub fn append(&mut self, record: WalRecord) -> Result<u64> {
100 let writer = self
101 .writer
102 .as_mut()
103 .ok_or_else(|| rustlite_core::Error::InvalidOperation("WAL not opened".to_string()))?;
104 writer.append(record)
105 }
106
107 pub fn sync(&mut self) -> Result<()> {
109 if let Some(writer) = &mut self.writer {
110 writer.sync()
111 } else {
112 Ok(())
113 }
114 }
115
116 pub fn close(&mut self) -> Result<()> {
118 if let Some(mut writer) = self.writer.take() {
119 writer.sync()?;
120 }
121 Ok(())
122 }
123
124 pub fn recover(&self) -> Result<Vec<WalRecord>> {
129 let recovery = RecoveryManager::new(self.config.clone())?;
130 recovery.recover()
131 }
132
133 pub fn recover_with_markers(&self) -> Result<Vec<WalRecord>> {
137 let recovery = RecoveryManager::new(self.config.clone())?;
138 recovery.recover_with_markers()
139 }
140
141 pub fn stats(&self) -> Result<RecoveryStats> {
143 let recovery = RecoveryManager::new(self.config.clone())?;
144 recovery.get_stats()
145 }
146
147 pub fn reader(&self) -> Result<WalReader> {
149 WalReader::new(&self.config.wal_dir)
150 }
151
152 pub fn segment_manager(&self) -> SegmentManager {
154 SegmentManager::new(self.config.wal_dir.clone())
155 }
156
157 pub fn config(&self) -> &WalConfig {
159 &self.config
160 }
161
162 pub fn is_open(&self) -> bool {
164 self.writer.is_some()
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a config whose WAL directory lives inside a fresh temp dir.
    ///
    /// The `TempDir` is returned alongside the config so the caller can keep
    /// it alive for the duration of the test.
    fn setup_test_config() -> (TempDir, WalConfig) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");

        let config = WalConfig {
            wal_dir: wal_path,
            sync_mode: SyncMode::Sync,
            max_segment_size: 64 * 1024 * 1024,
        };

        (temp_dir, config)
    }

    #[test]
    fn test_wal_config_default() {
        let config = WalConfig::default();
        assert_eq!(config.sync_mode, SyncMode::Sync);
        assert_eq!(config.max_segment_size, 64 * 1024 * 1024);
        assert_eq!(config.wal_dir, std::path::PathBuf::from("wal"));
    }

    #[test]
    fn test_sync_mode() {
        assert_eq!(SyncMode::Sync, SyncMode::Sync);
        assert_ne!(SyncMode::Sync, SyncMode::Async);
        assert_ne!(SyncMode::Sync, SyncMode::None);
        assert_ne!(SyncMode::Async, SyncMode::None);
    }

    #[test]
    fn test_wal_manager_lifecycle() {
        let (_temp_dir, config) = setup_test_config();

        let mut manager = WalManager::new(config).expect("Failed to create manager");
        assert!(!manager.is_open());

        manager.open().expect("Failed to open");
        assert!(manager.is_open());

        manager.close().expect("Failed to close");
        assert!(!manager.is_open());
    }

    /// Operations on a manager that was never opened: `append` must be
    /// rejected, while `sync` and `close` are successful no-ops.
    #[test]
    fn test_wal_manager_not_opened() {
        let (_temp_dir, config) = setup_test_config();

        let mut manager = WalManager::new(config).expect("Failed to create manager");

        let record = WalRecord::put(b"k".to_vec(), b"v".to_vec());
        assert!(manager.append(record).is_err());

        manager.sync().expect("sync on a closed WAL should succeed");
        manager.close().expect("close on a closed WAL should succeed");
        assert!(!manager.is_open());
    }

    #[test]
    fn test_wal_manager_write_and_recover() {
        let (_temp_dir, config) = setup_test_config();

        {
            let mut manager = WalManager::new(config.clone()).expect("Failed to create manager");
            manager.open().expect("Failed to open");

            for i in 0..5 {
                let record = WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                );
                manager.append(record).expect("Failed to append");
            }

            manager.sync().expect("Failed to sync");
            manager.close().expect("Failed to close");
        }

        {
            // A fresh manager that is never opened can still run recovery.
            let manager = WalManager::new(config).expect("Failed to create manager");
            let records = manager.recover().expect("Failed to recover");

            assert_eq!(records.len(), 5);
        }
    }

    #[test]
    fn test_wal_manager_stats() {
        let (_temp_dir, config) = setup_test_config();

        {
            let mut manager = WalManager::new(config.clone()).expect("Failed to create manager");
            manager.open().expect("Failed to open");

            manager.append(WalRecord::begin_tx(1)).expect("Failed");
            manager
                .append(WalRecord::put(b"k".to_vec(), b"v".to_vec()))
                .expect("Failed");
            manager.append(WalRecord::commit_tx(1)).expect("Failed");

            manager.close().expect("Failed to close");
        }

        let manager = WalManager::new(config).expect("Failed to create manager");
        let stats = manager.stats().expect("Failed to get stats");

        assert_eq!(stats.total_records, 3);
        assert_eq!(stats.transactions_started, 1);
        assert_eq!(stats.transactions_committed, 1);
    }

    #[test]
    fn test_wal_manager_segment_manager() {
        let (_temp_dir, config) = setup_test_config();

        {
            let mut manager = WalManager::new(config.clone()).expect("Failed to create manager");
            manager.open().expect("Failed to open");
            manager
                .append(WalRecord::put(b"k".to_vec(), b"v".to_vec()))
                .expect("Failed");
            manager.close().expect("Failed to close");
        }

        let manager = WalManager::new(config).expect("Failed to create manager");
        let seg_manager = manager.segment_manager();

        assert_eq!(seg_manager.segment_count().unwrap(), 1);
    }
}