// reddb_server/storage/wal/checkpoint.rs

//! Checkpoint Manager
//!
//! Responsible for transferring committed transactions from the WAL to the main
//! database file. Checkpointing ensures durability and allows WAL truncation.
//!
//! # Algorithm
//!
//! 1. Read all WAL records sequentially
//! 2. Track transaction states (Begin, Commit, Rollback)
//! 3. For committed transactions, collect PageWrite records, keeping only the
//!    newest (highest-LSN) image of each page
//! 4. Apply those page images to the Pager
//! 5. Sync Pager to disk
//! 6. Update checkpoint LSN in database header
//! 7. Truncate WAL (Restart and Truncate modes only)
//!
//! # References
//!
//! - Turso `core/storage/wal.rs:checkpoint()` - Checkpoint logic
//! - SQLite WAL documentation

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::io;
use std::path::Path;

use super::reader::WalReader;
use super::record::WalRecord;
use super::writer::WalWriter;
use crate::storage::engine::{Page, Pager, PAGE_SIZE};

/// Checkpoint mode.
///
/// [`Checkpointer::checkpoint`] distinguishes modes only by what happens to the
/// WAL afterwards: `Passive` and `Full` leave it in place, while `Restart` and
/// `Truncate` truncate it and write a fresh `Checkpoint` marker. Any coordination
/// with active writers implied by the variant names is not performed by the
/// checkpointer itself and must be arranged by the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CheckpointMode {
    /// Passive: intended for checkpointing only when there are no active writers.
    /// The WAL is left in place afterwards.
    Passive,
    /// Full: checkpoint all committed work. The WAL is left in place afterwards.
    Full,
    /// Restart: like `Full`, but the WAL is truncated afterwards.
    Restart,
    /// Truncate: checkpoint all committed work and truncate the WAL.
    Truncate,
}

/// Statistics describing a completed checkpoint.
///
/// `CheckpointResult::default()` (all zeros, `wal_truncated == false`) is what a
/// checkpoint against a missing WAL file reports.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CheckpointResult {
    /// Number of committed transactions found in the WAL
    /// (rolled-back and still-active transactions are not counted).
    pub transactions_processed: u64,
    /// Number of distinct pages written to the pager.
    pub pages_checkpointed: u64,
    /// Number of WAL records read, of any record type.
    pub records_processed: u64,
    /// LSN of the last WAL record read (0 if the WAL was missing or empty).
    pub checkpoint_lsn: u64,
    /// Whether the WAL was truncated after applying the checkpoint.
    pub wal_truncated: bool,
}

/// Errors produced while checkpointing or recovering from the WAL.
#[derive(Debug)]
pub enum CheckpointError {
    /// Reading, truncating, or appending to the WAL file failed.
    Io(io::Error),
    /// The pager rejected an operation; carries the pager's error message.
    Pager(String),
    /// The WAL contains data that cannot be applied
    /// (e.g. a page image of the wrong size).
    CorruptedWal(String),
    /// No WAL file found.
    ///
    /// Note: [`Checkpointer::checkpoint`] treats a missing WAL as empty and does
    /// not return this variant.
    NoWal,
}

impl std::fmt::Display for CheckpointError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(e) => write!(f, "I/O error: {}", e),
            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
            Self::CorruptedWal(msg) => write!(f, "Corrupted WAL: {}", msg),
            Self::NoWal => write!(f, "No WAL file found"),
        }
    }
}

impl std::error::Error for CheckpointError {
    /// Expose the wrapped I/O error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) => Some(e),
            Self::Pager(_) | Self::CorruptedWal(_) | Self::NoWal => None,
        }
    }
}

impl From<io::Error> for CheckpointError {
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}

/// Outcome of a transaction as observed while scanning the WAL.
///
/// The state is overwritten by each Begin/Commit/Rollback record for the same
/// transaction id, so the last such record seen wins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxState {
    /// A `Begin` record was seen and no later `Commit`/`Rollback`.
    Active,
    /// A `Commit` record was the last state record; its page writes are applied.
    Committed,
    /// A `Rollback` record was the last state record; its page writes are discarded.
    Aborted,
}

/// A `PageWrite` record buffered during the WAL scan until the fate of its
/// transaction is known.
#[derive(Debug)]
struct PendingWrite {
    /// Transaction that produced the write.
    tx_id: u64,
    /// Page the image belongs to.
    page_id: u32,
    /// Raw page image exactly as stored in the WAL record.
    data: Vec<u8>,
    /// LSN of the WAL record that carried this write.
    lsn: u64,
}

107/// Checkpoint manager
108///
109/// Responsible for transferring committed transactions from the WAL to the main database file.
110pub struct Checkpointer {
111    /// Checkpoint mode
112    mode: CheckpointMode,
113}
114
115impl Checkpointer {
116    /// Create a new checkpointer with the given mode
117    pub fn new(mode: CheckpointMode) -> Self {
118        Self { mode }
119    }
120
121    /// Create a checkpointer with default mode (Full)
122    pub fn default_mode() -> Self {
123        Self::new(CheckpointMode::Full)
124    }
125
126    /// Perform a checkpoint
127    ///
128    /// Reads all records from the WAL and applies committed changes to the database.
129    ///
130    /// # Arguments
131    ///
132    /// * `pager` - The Pager to write committed pages to
133    /// * `wal_path` - Path to the WAL file
134    ///
135    /// # Returns
136    ///
137    /// Checkpoint statistics or error
138    pub fn checkpoint(
139        &self,
140        pager: &Pager,
141        wal_path: &Path,
142    ) -> Result<CheckpointResult, CheckpointError> {
143        // Open WAL for reading
144        let wal_reader = match WalReader::open(wal_path) {
145            Ok(r) => r,
146            Err(e) if e.kind() == io::ErrorKind::NotFound => {
147                // No WAL file - nothing to checkpoint
148                return Ok(CheckpointResult::default());
149            }
150            Err(e) => return Err(CheckpointError::Io(e)),
151        };
152
153        // Phase 1: Read and categorize all records
154        let mut tx_states: HashMap<u64, TxState> = HashMap::new();
155        let mut pending_writes: Vec<PendingWrite> = Vec::new();
156        let mut records_processed: u64 = 0;
157        let mut last_lsn: u64 = 0;
158
159        for record_result in wal_reader.iter() {
160            let (lsn, record) = record_result.map_err(CheckpointError::Io)?;
161            records_processed += 1;
162            last_lsn = lsn;
163
164            match record {
165                WalRecord::Begin { tx_id } => {
166                    tx_states.insert(tx_id, TxState::Active);
167                }
168                WalRecord::Commit { tx_id } => {
169                    tx_states.insert(tx_id, TxState::Committed);
170                }
171                WalRecord::Rollback { tx_id } => {
172                    tx_states.insert(tx_id, TxState::Aborted);
173                }
174                WalRecord::PageWrite {
175                    tx_id,
176                    page_id,
177                    data,
178                } => {
179                    pending_writes.push(PendingWrite {
180                        tx_id,
181                        page_id,
182                        data,
183                        lsn,
184                    });
185                }
186                WalRecord::Checkpoint {
187                    lsn: _checkpoint_lsn,
188                } => {
189                    // Checkpoint marker - we can skip records before this LSN
190                    // For now, we process everything
191                }
192                WalRecord::TxCommitBatch { .. } => {
193                    // Store-level logical commit batches are replayed by
194                    // UnifiedStore, not by the pager page checkpoint path.
195                }
196                WalRecord::VectorInsert { .. } => {
197                    // Vector-turbo logical inserts are replayed by the
198                    // vector index layer, not by page checkpointing.
199                    // Issue #694 — named crash boundary for #673.
200                    crate::runtime::turbo_crash_inject::fire(
201                        crate::runtime::turbo_crash_inject::InjectionPoint::MidCheckpoint,
202                    );
203                }
204                WalRecord::FullPageImage { .. } => {
205                    // FPI records (gh-478) are consumed by the pager
206                    // recovery path before redo, not by checkpoint
207                    // accounting.
208                }
209            }
210        }
211
212        // Phase 2: Filter for committed transactions only
213        let committed_txs: HashSet<u64> = tx_states
214            .iter()
215            .filter(|(_, state)| **state == TxState::Committed)
216            .map(|(tx_id, _)| *tx_id)
217            .collect();
218
219        // Phase 3: Collect pages from committed transactions
220        // Keep only the latest write for each page (from committed txs)
221        let mut latest_writes: HashMap<u32, Vec<u8>> = HashMap::new();
222
223        for write in pending_writes {
224            if committed_txs.contains(&write.tx_id) {
225                // Always overwrite with later writes (they have higher LSN)
226                latest_writes.insert(write.page_id, write.data);
227            }
228        }
229
230        // Phase 4 (PREPARE): Mark checkpoint in progress in header
231        if !latest_writes.is_empty() {
232            pager
233                .set_checkpoint_in_progress(true, last_lsn)
234                .map_err(|e| CheckpointError::Pager(e.to_string()))?;
235        }
236
237        // Phase 5 (APPLY): Write committed pages to Pager
238        let mut pages_checkpointed: u64 = 0;
239
240        for (page_id, data) in &latest_writes {
241            // Reconstruct page from WAL data
242            if data.len() != PAGE_SIZE {
243                return Err(CheckpointError::CorruptedWal(format!(
244                    "Page {} has wrong size: {} (expected {})",
245                    page_id,
246                    data.len(),
247                    PAGE_SIZE
248                )));
249            }
250
251            let mut page_data = [0u8; PAGE_SIZE];
252            page_data.copy_from_slice(data);
253            let page = Page::from_bytes(page_data);
254
255            // Write to pager
256            pager
257                .write_page(*page_id, page)
258                .map_err(|e| CheckpointError::Pager(e.to_string()))?;
259
260            pages_checkpointed += 1;
261        }
262
263        // Phase 6: Sync Pager to disk
264        pager
265            .sync()
266            .map_err(|e| CheckpointError::Pager(e.to_string()))?;
267
268        // Phase 7 (COMPLETE): Clear in-progress flag and update checkpoint LSN
269        if !latest_writes.is_empty() {
270            pager
271                .complete_checkpoint(last_lsn)
272                .map_err(|e| CheckpointError::Pager(e.to_string()))?;
273        }
274
275        // Phase 8: Truncate WAL if requested
276        let wal_truncated = matches!(
277            self.mode,
278            CheckpointMode::Restart | CheckpointMode::Truncate
279        );
280
281        if wal_truncated {
282            let mut wal_writer = WalWriter::open(wal_path)?;
283            wal_writer.truncate()?;
284
285            // Write checkpoint marker with current LSN
286            let checkpoint_record = WalRecord::Checkpoint { lsn: last_lsn };
287            wal_writer.append(&checkpoint_record)?;
288            wal_writer.sync()?;
289        }
290
291        Ok(CheckpointResult {
292            transactions_processed: committed_txs.len() as u64,
293            pages_checkpointed,
294            records_processed,
295            checkpoint_lsn: last_lsn,
296            wal_truncated,
297        })
298    }
299
300    /// Perform crash recovery
301    ///
302    /// Called on database open to apply any committed transactions from the WAL
303    /// that weren't checkpointed before the crash. If a checkpoint was interrupted
304    /// (checkpoint_in_progress flag set), re-applies all WAL records from scratch.
305    ///
306    /// # Arguments
307    ///
308    /// * `pager` - The Pager to recover into
309    /// * `wal_path` - Path to the WAL file
310    ///
311    /// # Returns
312    ///
313    /// Recovery statistics or error
314    pub fn recover(pager: &Pager, wal_path: &Path) -> Result<CheckpointResult, CheckpointError> {
315        // Check if a checkpoint was interrupted
316        if let Ok(header) = pager.header() {
317            if header.checkpoint_in_progress {
318                // Previous checkpoint was interrupted — re-apply everything from WAL
319                // This is safe because page writes are idempotent (last-write-wins)
320                let _ = pager.set_checkpoint_in_progress(false, 0);
321            }
322        }
323        let checkpointer = Self::new(CheckpointMode::Truncate);
324        checkpointer.checkpoint(pager, wal_path)
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::PageType;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Scratch directory for one test, removed when dropped.
    ///
    /// The name combines the process id, a timestamp and a per-process counter,
    /// so tests running on parallel threads never share (and delete) each other's
    /// directory. `Drop` cleans up even when an assertion panics. Declare it
    /// before the pager so the pager is dropped (and its files closed) first.
    struct TestDir(PathBuf);

    impl TestDir {
        fn new() -> Self {
            static NEXT_ID: AtomicU64 = AtomicU64::new(0);
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
            let dir = std::env::temp_dir().join(format!(
                "reddb_checkpoint_test_{}_{}_{}",
                std::process::id(),
                timestamp,
                id
            ));
            fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp dir must not fail a test.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    fn temp_wal_path(dir: &Path, name: &str) -> PathBuf {
        reddb_file::layout::wal_component_temp_path(dir, "checkpoint", name, std::process::id())
    }

    /// A full-size page image whose first byte is `marker` and the rest zero.
    fn page_image(marker: u8) -> Vec<u8> {
        let mut data = vec![0u8; PAGE_SIZE];
        data[0] = marker;
        data
    }

    /// Create the WAL at `wal_path`, append `records` in order, and sync it.
    fn write_wal(wal_path: &Path, records: &[WalRecord]) {
        let mut wal_writer = WalWriter::open(wal_path).unwrap();
        for record in records {
            wal_writer.append(record).unwrap();
        }
        wal_writer.sync().unwrap();
    }

    #[test]
    fn test_checkpoint_empty_wal() {
        let dir = TestDir::new();
        let db_path = dir.path().join("test.db");
        // The WAL path is never created: a missing WAL behaves like an empty one.
        let wal_path = temp_wal_path(dir.path(), "empty");

        let pager = Pager::open_default(&db_path).unwrap();

        let checkpointer = Checkpointer::default_mode();
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);
    }

    #[test]
    fn test_checkpoint_committed_transaction() {
        let dir = TestDir::new();
        let db_path = dir.path().join("test.db");
        let wal_path = temp_wal_path(dir.path(), "committed");

        let pager = Pager::open_default(&db_path).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        write_wal(
            &wal_path,
            &[
                WalRecord::Begin { tx_id: 1 },
                WalRecord::PageWrite {
                    tx_id: 1,
                    page_id,
                    data: page_image(0x42),
                },
                WalRecord::Commit { tx_id: 1 },
            ],
        );

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert_eq!(result.transactions_processed, 1);
        assert_eq!(result.pages_checkpointed, 1);
        assert_eq!(result.records_processed, 3);

        let read_page = pager.read_page(page_id).unwrap();
        assert_eq!(read_page.as_bytes()[0], 0x42);
    }

    #[test]
    fn test_checkpoint_aborted_transaction() {
        let dir = TestDir::new();
        let db_path = dir.path().join("test.db");
        let wal_path = temp_wal_path(dir.path(), "aborted");

        let pager = Pager::open_default(&db_path).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        write_wal(
            &wal_path,
            &[
                WalRecord::Begin { tx_id: 1 },
                WalRecord::PageWrite {
                    tx_id: 1,
                    page_id,
                    data: page_image(0x42),
                },
                WalRecord::Rollback { tx_id: 1 },
            ],
        );

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        // Aborted transaction should not be checkpointed.
        assert_eq!(result.transactions_processed, 0);
        assert_eq!(result.pages_checkpointed, 0);

        // The rolled-back image must not have reached the page.
        let read_page = pager.read_page(page_id).unwrap();
        assert_ne!(read_page.as_bytes()[0], 0x42);
    }

    #[test]
    fn test_checkpoint_mixed_transactions() {
        let dir = TestDir::new();
        let db_path = dir.path().join("test.db");
        let wal_path = temp_wal_path(dir.path(), "mixed");

        let pager = Pager::open_default(&db_path).unwrap();
        let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page1_id = page1.page_id();
        let page2_id = page2.page_id();

        write_wal(
            &wal_path,
            &[
                // Transaction 1: committed
                WalRecord::Begin { tx_id: 1 },
                WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: page1_id,
                    data: page_image(0x11),
                },
                WalRecord::Commit { tx_id: 1 },
                // Transaction 2: aborted
                WalRecord::Begin { tx_id: 2 },
                WalRecord::PageWrite {
                    tx_id: 2,
                    page_id: page2_id,
                    data: page_image(0x22),
                },
                WalRecord::Rollback { tx_id: 2 },
                // Transaction 3: committed, same page as transaction 2
                WalRecord::Begin { tx_id: 3 },
                WalRecord::PageWrite {
                    tx_id: 3,
                    page_id: page2_id,
                    data: page_image(0x33),
                },
                WalRecord::Commit { tx_id: 3 },
            ],
        );

        let checkpointer = Checkpointer::new(CheckpointMode::Full);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        // Only committed transactions (1 and 3) should be processed.
        assert_eq!(result.transactions_processed, 2);
        assert_eq!(result.pages_checkpointed, 2);

        let read_page1 = pager.read_page(page1_id).unwrap();
        assert_eq!(read_page1.as_bytes()[0], 0x11);

        let read_page2 = pager.read_page(page2_id).unwrap();
        assert_eq!(read_page2.as_bytes()[0], 0x33); // From tx 3, not tx 2
    }

    #[test]
    fn test_checkpoint_truncate() {
        let dir = TestDir::new();
        let db_path = dir.path().join("test.db");
        let wal_path = temp_wal_path(dir.path(), "truncate");

        let pager = Pager::open_default(&db_path).unwrap();

        write_wal(
            &wal_path,
            &[WalRecord::Begin { tx_id: 1 }, WalRecord::Commit { tx_id: 1 }],
        );

        let checkpointer = Checkpointer::new(CheckpointMode::Truncate);
        let result = checkpointer.checkpoint(&pager, &wal_path).unwrap();

        assert!(result.wal_truncated);

        // After truncation, the WAL holds only the fresh checkpoint marker.
        let records: Vec<_> = WalReader::open(&wal_path)
            .unwrap()
            .iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0].1,
            WalRecord::Checkpoint {
                lsn: result.checkpoint_lsn
            }
        );
    }
}