reddb_server/storage/wal/
checkpoint.rs1use std::collections::{HashMap, HashSet};
22use std::io;
23use std::path::Path;
24
25use super::reader::WalReader;
26use super::record::WalRecord;
27use super::writer::WalWriter;
28use crate::storage::engine::{Page, Pager, PAGE_SIZE};
29
/// Selects what a checkpoint does with the WAL once committed pages are applied.
///
/// In this file the only behavioral split is WAL truncation: `Restart` and
/// `Truncate` truncate the WAL after the checkpoint, while `Passive` and
/// `Full` leave it in place.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointMode {
    /// Apply committed pages and keep the WAL file as it is.
    Passive,
    /// Apply committed pages and keep the WAL file as it is (the default mode).
    Full,
    /// Apply committed pages, then truncate the WAL and start it over with a
    /// checkpoint record.
    Restart,
    /// Apply committed pages, then truncate the WAL and start it over with a
    /// checkpoint record.
    Truncate,
}
42
/// Summary of one checkpoint run.
///
/// `Default` yields an all-zero result with `wal_truncated == false`; that is
/// what `Checkpointer::checkpoint` returns when the WAL file does not exist.
#[derive(Clone, Debug, Default)]
pub struct CheckpointResult {
    /// Number of transactions whose final state in the WAL was "committed".
    pub transactions_processed: u64,
    /// Number of distinct pages written to the pager.
    pub pages_checkpointed: u64,
    /// Number of WAL records read during the scan.
    pub records_processed: u64,
    /// LSN of the last WAL record read (0 when no record was read).
    pub checkpoint_lsn: u64,
    /// Whether the WAL was truncated at the end of the checkpoint.
    pub wal_truncated: bool,
}
57
/// Failures that can occur while checkpointing or recovering from the WAL.
#[derive(Debug)]
pub enum CheckpointError {
    /// An I/O error raised while opening, reading, or writing the WAL.
    Io(io::Error),
    /// A pager operation failed; carries the pager error rendered as text.
    Pager(String),
    /// The WAL content is invalid (for example a page image of the wrong size).
    CorruptedWal(String),
    /// No WAL file was found.
    NoWal,
}
70
71impl std::fmt::Display for CheckpointError {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 Self::Io(e) => write!(f, "I/O error: {}", e),
75 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
76 Self::CorruptedWal(msg) => write!(f, "Corrupted WAL: {}", msg),
77 Self::NoWal => write!(f, "No WAL file found"),
78 }
79 }
80}
81
// Empty impl: every `std::error::Error` method keeps its default, so
// `source()` reports no underlying cause (the `Io` payload is only reachable
// by matching on the variant or through the `Display` text).
impl std::error::Error for CheckpointError {}
83
84impl From<io::Error> for CheckpointError {
85 fn from(e: io::Error) -> Self {
86 Self::Io(e)
87 }
88}
89
/// Most recent state recorded for a transaction while scanning the WAL.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TxState {
    /// A `Begin` record was seen and no terminal record has followed yet.
    Active,
    /// A `Commit` record was seen; the transaction's page writes are applied.
    Committed,
    /// A `Rollback` record was seen; the transaction's page writes are dropped.
    Aborted,
}
97
/// A page write read from the WAL, held until its transaction's final state
/// is known.
#[derive(Debug)]
struct PendingWrite {
    /// Transaction that issued the write.
    tx_id: u64,
    /// Page the write targets.
    page_id: u32,
    /// Page image carried by the WAL record.
    data: Vec<u8>,
    /// LSN of the WAL record; stored alongside the write but not read by the
    /// checkpoint logic in this file.
    lsn: u64,
}
106
107pub struct Checkpointer {
111 mode: CheckpointMode,
113}
114
115impl Checkpointer {
116 pub fn new(mode: CheckpointMode) -> Self {
118 Self { mode }
119 }
120
121 pub fn default_mode() -> Self {
123 Self::new(CheckpointMode::Full)
124 }
125
126 pub fn checkpoint(
139 &self,
140 pager: &Pager,
141 wal_path: &Path,
142 ) -> Result<CheckpointResult, CheckpointError> {
143 let wal_reader = match WalReader::open(wal_path) {
145 Ok(r) => r,
146 Err(e) if e.kind() == io::ErrorKind::NotFound => {
147 return Ok(CheckpointResult::default());
149 }
150 Err(e) => return Err(CheckpointError::Io(e)),
151 };
152
153 let mut tx_states: HashMap<u64, TxState> = HashMap::new();
155 let mut pending_writes: Vec<PendingWrite> = Vec::new();
156 let mut records_processed: u64 = 0;
157 let mut last_lsn: u64 = 0;
158
159 for record_result in wal_reader.iter() {
160 let (lsn, record) = record_result.map_err(CheckpointError::Io)?;
161 records_processed += 1;
162 last_lsn = lsn;
163
164 match record {
165 WalRecord::Begin { tx_id } => {
166 tx_states.insert(tx_id, TxState::Active);
167 }
168 WalRecord::Commit { tx_id } => {
169 tx_states.insert(tx_id, TxState::Committed);
170 }
171 WalRecord::Rollback { tx_id } => {
172 tx_states.insert(tx_id, TxState::Aborted);
173 }
174 WalRecord::PageWrite {
175 tx_id,
176 page_id,
177 data,
178 } => {
179 pending_writes.push(PendingWrite {
180 tx_id,
181 page_id,
182 data,
183 lsn,
184 });
185 }
186 WalRecord::Checkpoint {
187 lsn: _checkpoint_lsn,
188 } => {
189 }
192 WalRecord::TxCommitBatch { .. } => {
193 }
196 WalRecord::VectorInsert { .. } => {
197 crate::runtime::turbo_crash_inject::fire(
201 crate::runtime::turbo_crash_inject::InjectionPoint::MidCheckpoint,
202 );
203 }
204 WalRecord::FullPageImage { .. } => {
205 }
209 }
210 }
211
212 let committed_txs: HashSet<u64> = tx_states
214 .iter()
215 .filter(|(_, state)| **state == TxState::Committed)
216 .map(|(tx_id, _)| *tx_id)
217 .collect();
218
219 let mut latest_writes: HashMap<u32, Vec<u8>> = HashMap::new();
222
223 for write in pending_writes {
224 if committed_txs.contains(&write.tx_id) {
225 latest_writes.insert(write.page_id, write.data);
227 }
228 }
229
230 if !latest_writes.is_empty() {
232 pager
233 .set_checkpoint_in_progress(true, last_lsn)
234 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
235 }
236
237 let mut pages_checkpointed: u64 = 0;
239
240 for (page_id, data) in &latest_writes {
241 if data.len() != PAGE_SIZE {
243 return Err(CheckpointError::CorruptedWal(format!(
244 "Page {} has wrong size: {} (expected {})",
245 page_id,
246 data.len(),
247 PAGE_SIZE
248 )));
249 }
250
251 let mut page_data = [0u8; PAGE_SIZE];
252 page_data.copy_from_slice(data);
253 let page = Page::from_bytes(page_data);
254
255 pager
257 .write_page(*page_id, page)
258 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
259
260 pages_checkpointed += 1;
261 }
262
263 pager
265 .sync()
266 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
267
268 if !latest_writes.is_empty() {
270 pager
271 .complete_checkpoint(last_lsn)
272 .map_err(|e| CheckpointError::Pager(e.to_string()))?;
273 }
274
275 let wal_truncated = matches!(
277 self.mode,
278 CheckpointMode::Restart | CheckpointMode::Truncate
279 );
280
281 if wal_truncated {
282 let mut wal_writer = WalWriter::open(wal_path)?;
283 wal_writer.truncate()?;
284
285 let checkpoint_record = WalRecord::Checkpoint { lsn: last_lsn };
287 wal_writer.append(&checkpoint_record)?;
288 wal_writer.sync()?;
289 }
290
291 Ok(CheckpointResult {
292 transactions_processed: committed_txs.len() as u64,
293 pages_checkpointed,
294 records_processed,
295 checkpoint_lsn: last_lsn,
296 wal_truncated,
297 })
298 }
299
300 pub fn recover(pager: &Pager, wal_path: &Path) -> Result<CheckpointResult, CheckpointError> {
315 if let Ok(header) = pager.header() {
317 if header.checkpoint_in_progress {
318 let _ = pager.set_checkpoint_in_progress(false, 0);
321 }
322 }
323 let checkpointer = Self::new(CheckpointMode::Truncate);
324 checkpointer.checkpoint(pager, wal_path)
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::PageType;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Per-process counter that keeps directories distinct even when two
    /// tests read the same clock value.
    static NEXT_DIR_ID: AtomicUsize = AtomicUsize::new(0);

    /// Returns a fresh, not-yet-created directory path under the system temp
    /// directory.
    ///
    /// The name combines a timestamp, the process id, and a counter: tests run
    /// on parallel threads, and a timestamp alone can repeat on platforms with
    /// a coarse clock.
    fn temp_dir() -> std::path::PathBuf {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let sequence = NEXT_DIR_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "reddb_checkpoint_test_{}_{}_{}",
            timestamp,
            std::process::id(),
            sequence
        ))
    }

    /// Removes the test directory; failures are ignored because cleanup is
    /// best effort.
    fn cleanup(dir: &Path) {
        let _ = fs::remove_dir_all(dir);
    }

    /// Builds the WAL path for the test labelled `name` inside `dir`.
    fn temp_wal_path(dir: &Path, name: &str) -> std::path::PathBuf {
        reddb_file::layout::wal_component_temp_path(dir, "checkpoint", name, std::process::id())
    }

    /// A missing WAL file yields an empty result rather than an error.
    #[test]
    fn test_checkpoint_empty_wal() {
        let dir = temp_dir();
        fs::create_dir_all(&dir).expect("create test directory");
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "empty");

        let pager = Pager::open_default(&db_path).unwrap();

        // The WAL file is never created in this test.
        let checkpointer = Checkpointer::default_mode();
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);

        cleanup(&dir);
    }

    /// A committed transaction's page write reaches the pager.
    #[test]
    fn test_checkpoint_committed_transaction() {
        let dir = temp_dir();
        fs::create_dir_all(&dir).expect("create test directory");
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "committed");

        let pager = Pager::open_default(&db_path).unwrap();

        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        {
            let mut wal_writer = WalWriter::open(&wal_path).unwrap();

            wal_writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();

            let mut page_data = [0u8; PAGE_SIZE];
            page_data[0] = 0x42;
            wal_writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id,
                    data: page_data.to_vec(),
                })
                .unwrap();

            wal_writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

            wal_writer.sync().unwrap();
        }

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 1);
        assert_eq!(result.pages_checkpointed, 1);
        assert_eq!(result.records_processed, 3);

        let read_page = pager.read_page(page_id).unwrap();
        assert_eq!(read_page.as_bytes()[0], 0x42);

        cleanup(&dir);
    }

    /// A rolled-back transaction's page write never reaches the pager.
    #[test]
    fn test_checkpoint_aborted_transaction() {
        let dir = temp_dir();
        fs::create_dir_all(&dir).expect("create test directory");
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "aborted");

        let pager = Pager::open_default(&db_path).unwrap();

        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        {
            let mut wal_writer = WalWriter::open(&wal_path).unwrap();

            wal_writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();

            let mut page_data = [0u8; PAGE_SIZE];
            page_data[0] = 0x42;
            wal_writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id,
                    data: page_data.to_vec(),
                })
                .unwrap();

            wal_writer
                .append(&WalRecord::Rollback { tx_id: 1 })
                .unwrap();

            wal_writer.sync().unwrap();
        }

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);

        let read_page = pager.read_page(page_id).unwrap();
        assert_ne!(read_page.as_bytes()[0], 0x42);

        cleanup(&dir);
    }

    /// With committed and aborted transactions interleaved, only committed
    /// writes are applied and the last committed write to a page wins.
    #[test]
    fn test_checkpoint_mixed_transactions() {
        let dir = temp_dir();
        fs::create_dir_all(&dir).expect("create test directory");
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "mixed");

        let pager = Pager::open_default(&db_path).unwrap();

        let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page1_id = page1.page_id();
        let page2_id = page2.page_id();

        {
            let mut wal_writer = WalWriter::open(&wal_path).unwrap();

            // tx 1: committed write to page 1.
            wal_writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            let mut page_data1 = [0u8; PAGE_SIZE];
            page_data1[0] = 0x11;
            wal_writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: page1_id,
                    data: page_data1.to_vec(),
                })
                .unwrap();
            wal_writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

            // tx 2: rolled-back write to page 2.
            wal_writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
            let mut page_data2 = [0u8; PAGE_SIZE];
            page_data2[0] = 0x22;
            wal_writer
                .append(&WalRecord::PageWrite {
                    tx_id: 2,
                    page_id: page2_id,
                    data: page_data2.to_vec(),
                })
                .unwrap();
            wal_writer
                .append(&WalRecord::Rollback { tx_id: 2 })
                .unwrap();

            // tx 3: committed write to page 2.
            wal_writer.append(&WalRecord::Begin { tx_id: 3 }).unwrap();
            let mut page_data3 = [0u8; PAGE_SIZE];
            page_data3[0] = 0x33;
            wal_writer
                .append(&WalRecord::PageWrite {
                    tx_id: 3,
                    page_id: page2_id,
                    data: page_data3.to_vec(),
                })
                .unwrap();
            wal_writer.append(&WalRecord::Commit { tx_id: 3 }).unwrap();

            wal_writer.sync().unwrap();
        }

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 2);
        assert_eq!(result.pages_checkpointed, 2);

        let read_page1 = pager.read_page(page1_id).unwrap();
        assert_eq!(read_page1.as_bytes()[0], 0x11);

        // Page 2 carries tx 3's image, not the rolled-back tx 2's.
        let read_page2 = pager.read_page(page2_id).unwrap();
        assert_eq!(read_page2.as_bytes()[0], 0x33);

        cleanup(&dir);
    }

    /// Truncate mode leaves the WAL holding exactly one checkpoint record.
    #[test]
    fn test_checkpoint_truncate() {
        let dir = temp_dir();
        fs::create_dir_all(&dir).expect("create test directory");
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "truncate");

        let pager = Pager::open_default(&db_path).unwrap();

        {
            let mut wal_writer = WalWriter::open(&wal_path).unwrap();
            wal_writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal_writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal_writer.sync().unwrap();
        }

        let checkpointer = Checkpointer::new(CheckpointMode::Truncate);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert!(result.wal_truncated);

        let records: Vec<_> = WalReader::open(&wal_path)
            .unwrap()
            .iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0].1,
            WalRecord::Checkpoint {
                lsn: result.checkpoint_lsn
            }
        );

        cleanup(&dir);
    }
}