// reddb_server/storage/wal/checkpoint.rs

use std::collections::{HashMap, HashSet};
use std::io;
use std::path::Path;

use super::reader::WalReader;
use super::record::WalRecord;
use super::writer::WalWriter;
use crate::storage::engine::{Page, Pager, PAGE_SIZE};

/// Selects what a checkpoint pass does with the WAL once committed pages
/// have been copied into the database file.
///
/// Every mode applies committed page images. `Restart` and `Truncate` then
/// reset the WAL to a single `Checkpoint` record; `Passive` and `Full` leave
/// the WAL file as it is.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointMode {
    /// Apply committed pages and keep the WAL.
    Passive,
    /// Apply committed pages and keep the WAL (used by `Checkpointer::default_mode`).
    Full,
    /// Apply committed pages, then truncate the WAL.
    Restart,
    /// Apply committed pages, then truncate the WAL.
    Truncate,
}

/// Counters describing one checkpoint pass.
#[derive(Clone, Debug, Default)]
pub struct CheckpointResult {
    /// Number of transactions whose last status record in the WAL was `Commit`.
    pub transactions_processed: u64,
    /// Number of pages written back to the database file.
    pub pages_checkpointed: u64,
    /// Number of WAL records read, of any type.
    pub records_processed: u64,
    /// LSN of the last WAL record read (0 when nothing was read).
    pub checkpoint_lsn: u64,
    /// `true` when the WAL was truncated after the pages were applied.
    pub wal_truncated: bool,
}

/// Errors that abort a checkpoint or recovery pass.
#[derive(Debug)]
pub enum CheckpointError {
    /// Opening, reading, or rewriting the WAL failed.
    Io(io::Error),
    /// A pager operation failed; holds the pager error's text.
    Pager(String),
    /// The WAL holds data that cannot be applied, e.g. a page image whose
    /// length is not `PAGE_SIZE`.
    CorruptedWal(String),
    /// No WAL file was found. (`Checkpointer::checkpoint` treats a missing
    /// WAL as empty instead of returning this.)
    NoWal,
}

impl std::fmt::Display for CheckpointError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(e) => write!(f, "I/O error: {}", e),
            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
            Self::CorruptedWal(msg) => write!(f, "Corrupted WAL: {}", msg),
            Self::NoWal => write!(f, "No WAL file found"),
        }
    }
}

impl std::error::Error for CheckpointError {
    /// Returns the wrapped `io::Error` for [`CheckpointError::Io`] so callers
    /// walking the error chain can reach the underlying cause; the other
    /// variants carry no source error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) => Some(e),
            Self::Pager(_) | Self::CorruptedWal(_) | Self::NoWal => None,
        }
    }
}

impl From<io::Error> for CheckpointError {
    /// Wraps an I/O error so `?` can be used on WAL reader/writer calls.
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}

/// Most recent status record seen for a transaction during the WAL scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TxState {
    /// `Begin` seen, no `Commit`/`Rollback` yet.
    Active,
    /// `Commit` seen; the transaction's page writes are applied.
    Committed,
    /// `Rollback` seen; the transaction's page writes are discarded.
    Aborted,
}

/// A `PageWrite` record held back until its transaction's outcome is known.
#[derive(Debug)]
struct PendingWrite {
    /// Transaction that logged the write.
    tx_id: u64,
    /// Page in the database file the image targets.
    page_id: u32,
    /// Page image from the WAL; the checkpointer requires `PAGE_SIZE` bytes.
    data: Vec<u8>,
    /// LSN of the `PageWrite` record (recorded, but not read by the checkpointer).
    lsn: u64,
}

107pub struct Checkpointer {
111 mode: CheckpointMode,
113}
114
115impl Checkpointer {
116 pub fn new(mode: CheckpointMode) -> Self {
118 Self { mode }
119 }
120
121 pub fn default_mode() -> Self {
123 Self::new(CheckpointMode::Full)
124 }
125
126 pub fn checkpoint(
139 &self,
140 pager: &Pager,
141 wal_path: &Path,
142 ) -> Result<CheckpointResult, CheckpointError> {
143 let wal_reader = match WalReader::open(wal_path) {
145 Ok(r) => r,
146 Err(e) if e.kind() == io::ErrorKind::NotFound => {
147 return Ok(CheckpointResult::default());
149 }
150 Err(e) => return Err(CheckpointError::Io(e)),
151 };
152
153 let mut tx_states: HashMap<u64, TxState> = HashMap::new();
155 let mut pending_writes: Vec<PendingWrite> = Vec::new();
156 let mut records_processed: u64 = 0;
157 let mut last_lsn: u64 = 0;
158
159 for record_result in wal_reader.iter() {
160 let (lsn, record) = record_result.map_err(CheckpointError::Io)?;
161 records_processed += 1;
162 last_lsn = lsn;
163
164 match record {
165 WalRecord::Begin { tx_id } => {
166 tx_states.insert(tx_id, TxState::Active);
167 }
168 WalRecord::Commit { tx_id } => {
169 tx_states.insert(tx_id, TxState::Committed);
170 }
171 WalRecord::Rollback { tx_id } => {
172 tx_states.insert(tx_id, TxState::Aborted);
173 }
174 WalRecord::PageWrite {
175 tx_id,
176 page_id,
177 data,
178 } => {
179 pending_writes.push(PendingWrite {
180 tx_id,
181 page_id,
182 data,
183 lsn,
184 });
185 }
186 WalRecord::Checkpoint {
187 lsn: _checkpoint_lsn,
188 } => {
189 }
192 WalRecord::TxCommitBatch { .. } => {
193 }
196 WalRecord::VectorInsert { .. } => {
197 crate::runtime::turbo_crash_inject::fire(
201 crate::runtime::turbo_crash_inject::InjectionPoint::MidCheckpoint,
202 );
203 }
204 WalRecord::FullPageImage { .. } => {
205 }
209 }
210 }
211
212 let committed_txs: HashSet<u64> = tx_states
214 .iter()
215 .filter(|(_, state)| **state == TxState::Committed)
216 .map(|(tx_id, _)| *tx_id)
217 .collect();
218
219 let mut latest_writes: HashMap<u32, Vec<u8>> = HashMap::new();
222
223 for write in pending_writes {
224 if committed_txs.contains(&write.tx_id) {
225 latest_writes.insert(write.page_id, write.data);
227 }
228 }
229
230 if !latest_writes.is_empty() {
232 pager
233 .set_checkpoint_in_progress(true, last_lsn)
234 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
235 }
236
237 let mut pages_checkpointed: u64 = 0;
239
240 for (page_id, data) in &latest_writes {
241 if data.len() != PAGE_SIZE {
243 return Err(CheckpointError::CorruptedWal(format!(
244 "Page {} has wrong size: {} (expected {})",
245 page_id,
246 data.len(),
247 PAGE_SIZE
248 )));
249 }
250
251 let mut page_data = [0u8; PAGE_SIZE];
252 page_data.copy_from_slice(data);
253 let page = Page::from_bytes(page_data);
254
255 pager
257 .write_page(*page_id, page)
258 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
259
260 pages_checkpointed += 1;
261 }
262
263 pager
265 .sync()
266 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
267
268 if !latest_writes.is_empty() {
270 pager
271 .complete_checkpoint(last_lsn)
272 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
273 }
274
275 let wal_truncated = matches!(
277 self.mode,
278 CheckpointMode::Restart | CheckpointMode::Truncate
279 );
280
281 if wal_truncated {
282 let mut wal_writer = WalWriter::open(wal_path)?;
283 wal_writer.truncate()?;
284
285 let checkpoint_record = WalRecord::Checkpoint { lsn: last_lsn };
287 wal_writer.append(&checkpoint_record)?;
288 wal_writer.sync()?;
289 }
290
291 Ok(CheckpointResult {
292 transactions_processed: committed_txs.len() as u64,
293 pages_checkpointed,
294 records_processed,
295 checkpoint_lsn: last_lsn,
296 wal_truncated,
297 })
298 }
299
300 pub fn recover(pager: &Pager, wal_path: &Path) -> Result<CheckpointResult, CheckpointError> {
315 if let Ok(header) = pager.header() {
317 if header.checkpoint_in_progress {
318 let _ = pager.set_checkpoint_in_progress(false, 0);
321 }
322 }
323 let checkpointer = Self::new(CheckpointMode::Truncate);
324 checkpointer.checkpoint(pager, wal_path)
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::PageType;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Scratch directory that is removed on drop, so it is cleaned up even
    /// when an assertion panics.
    struct TestDir(PathBuf);

    impl TestDir {
        /// Creates a fresh directory unique to this process and call.
        fn new() -> Self {
            // A timestamp alone can collide when tests run in parallel on a
            // coarse clock; the pid and counter make every name unique.
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let dir = std::env::temp_dir().join(format!(
                "reddb_checkpoint_test_{}_{}_{}",
                std::process::id(),
                timestamp,
                COUNTER.fetch_add(1, Ordering::Relaxed)
            ));
            fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn db_path(&self) -> PathBuf {
            self.0.join("test.db")
        }

        fn wal_path(&self) -> PathBuf {
            self.0.join("test.wal")
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Builds a full-size page image whose first byte is `marker`.
    fn page_image(marker: u8) -> Vec<u8> {
        let mut data = vec![0u8; PAGE_SIZE];
        data[0] = marker;
        data
    }

    /// Logs `Begin`, one `PageWrite` of `marker` to `page_id`, then `Commit`
    /// (if `commit`) or `Rollback` for `tx_id`.
    fn log_tx(wal: &mut WalWriter, tx_id: u64, page_id: u32, marker: u8, commit: bool) {
        wal.append(&WalRecord::Begin { tx_id }).unwrap();
        wal.append(&WalRecord::PageWrite {
            tx_id,
            page_id,
            data: page_image(marker),
        })
        .unwrap();
        let end = if commit {
            WalRecord::Commit { tx_id }
        } else {
            WalRecord::Rollback { tx_id }
        };
        wal.append(&end).unwrap();
    }

    #[test]
    fn test_checkpoint_empty_wal() {
        let dir = TestDir::new();
        let pager = Pager::open_default(&dir.db_path()).unwrap();

        let result = Checkpointer::default_mode()
            .checkpoint(&pager, &dir.wal_path())
            .unwrap();

        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);
    }

    #[test]
    fn test_checkpoint_committed_transaction() {
        let dir = TestDir::new();
        let wal_path = dir.wal_path();
        let pager = Pager::open_default(&dir.db_path()).unwrap();
        let page_id = pager.allocate_page(PageType::BTreeLeaf).unwrap().page_id();

        {
            let mut wal = WalWriter::open(&wal_path).unwrap();
            log_tx(&mut wal, 1, page_id, 0x42, true);
            wal.sync().unwrap();
        }

        let result = Checkpointer::new(CheckpointMode::Full)
            .checkpoint(&pager, &wal_path)
            .unwrap();

        assert_eq!(result.transactions_processed, 1);
        assert_eq!(result.pages_checkpointed, 1);
        assert_eq!(result.records_processed, 3);
        assert_eq!(pager.read_page(page_id).unwrap().as_bytes()[0], 0x42);
    }

    #[test]
    fn test_checkpoint_aborted_transaction() {
        let dir = TestDir::new();
        let wal_path = dir.wal_path();
        let pager = Pager::open_default(&dir.db_path()).unwrap();
        let page_id = pager.allocate_page(PageType::BTreeLeaf).unwrap().page_id();

        {
            let mut wal = WalWriter::open(&wal_path).unwrap();
            log_tx(&mut wal, 1, page_id, 0x42, false);
            wal.sync().unwrap();
        }

        let result = Checkpointer::new(CheckpointMode::Full)
            .checkpoint(&pager, &wal_path)
            .unwrap();

        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);
        assert_ne!(pager.read_page(page_id).unwrap().as_bytes()[0], 0x42);
    }

    #[test]
    fn test_checkpoint_mixed_transactions() {
        let dir = TestDir::new();
        let wal_path = dir.wal_path();
        let pager = Pager::open_default(&dir.db_path()).unwrap();
        let page1_id = pager.allocate_page(PageType::BTreeLeaf).unwrap().page_id();
        let page2_id = pager.allocate_page(PageType::BTreeLeaf).unwrap().page_id();

        {
            let mut wal = WalWriter::open(&wal_path).unwrap();
            log_tx(&mut wal, 1, page1_id, 0x11, true);
            log_tx(&mut wal, 2, page2_id, 0x22, false);
            log_tx(&mut wal, 3, page2_id, 0x33, true);
            wal.sync().unwrap();
        }

        let result = Checkpointer::new(CheckpointMode::Full)
            .checkpoint(&pager, &wal_path)
            .unwrap();

        assert_eq!(result.transactions_processed, 2);
        assert_eq!(result.pages_checkpointed, 2);
        assert_eq!(pager.read_page(page1_id).unwrap().as_bytes()[0], 0x11);
        assert_eq!(pager.read_page(page2_id).unwrap().as_bytes()[0], 0x33);
    }

    #[test]
    fn test_checkpoint_later_committed_write_wins() {
        let dir = TestDir::new();
        let wal_path = dir.wal_path();
        let pager = Pager::open_default(&dir.db_path()).unwrap();
        let page_id = pager.allocate_page(PageType::BTreeLeaf).unwrap().page_id();

        {
            let mut wal = WalWriter::open(&wal_path).unwrap();
            log_tx(&mut wal, 1, page_id, 0x11, true);
            log_tx(&mut wal, 2, page_id, 0x22, true);
            wal.sync().unwrap();
        }

        let result = Checkpointer::new(CheckpointMode::Full)
            .checkpoint(&pager, &wal_path)
            .unwrap();

        assert_eq!(result.transactions_processed, 2);
        assert_eq!(result.pages_checkpointed, 1);
        assert_eq!(pager.read_page(page_id).unwrap().as_bytes()[0], 0x22);
    }

    #[test]
    fn test_checkpoint_truncate() {
        let dir = TestDir::new();
        let wal_path = dir.wal_path();
        let pager = Pager::open_default(&dir.db_path()).unwrap();

        {
            let mut wal = WalWriter::open(&wal_path).unwrap();
            wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal.sync().unwrap();
        }

        let result = Checkpointer::new(CheckpointMode::Truncate)
            .checkpoint(&pager, &wal_path)
            .unwrap();

        assert!(result.wal_truncated);

        let wal_size = fs::metadata(&wal_path).unwrap().len();
        assert!(
            wal_size < 50,
            "WAL should be truncated, but size is {}",
            wal_size
        );
    }

    #[test]
    fn test_recover_without_wal() {
        let dir = TestDir::new();
        let pager = Pager::open_default(&dir.db_path()).unwrap();

        let result = Checkpointer::recover(&pager, &dir.wal_path()).unwrap();

        assert_eq!(result.records_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);
        assert!(!result.wal_truncated);
    }
}