quill_sql/recovery/wal/codec/
checkpoint.rs

1use crate::buffer::PageId;
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::recovery::Lsn;
4use crate::transaction::TransactionId;
5
6#[derive(Debug, Clone)]
7pub struct CheckpointPayload {
8    pub last_lsn: Lsn,
9    pub dirty_pages: Vec<PageId>,
10    pub active_transactions: Vec<TransactionId>,
11    /// Dirty Page Table: (page_id, recLSN)
12    pub dpt: Vec<(PageId, Lsn)>,
13}
14
15pub fn encode_checkpoint(body: &CheckpointPayload) -> Vec<u8> {
16    // Checkpoint (rmid=Checkpoint, info=0)
17    // body: last_lsn(8) + dirty_pages_count(4) + dirty_pages[] + active_txns_count(4) + active_txns[] + dpt_count(4) + dpt[]
18    let mut buf = Vec::new();
19    buf.extend_from_slice(&body.last_lsn.to_le_bytes());
20    buf.extend_from_slice(&(body.dirty_pages.len() as u32).to_le_bytes());
21    for page_id in &body.dirty_pages {
22        buf.extend_from_slice(&page_id.to_le_bytes());
23    }
24    buf.extend_from_slice(&(body.active_transactions.len() as u32).to_le_bytes());
25    for txn_id in &body.active_transactions {
26        buf.extend_from_slice(&txn_id.to_le_bytes());
27    }
28    buf.extend_from_slice(&(body.dpt.len() as u32).to_le_bytes());
29    for (page_id, rec_lsn) in &body.dpt {
30        buf.extend_from_slice(&page_id.to_le_bytes());
31        buf.extend_from_slice(&rec_lsn.to_le_bytes());
32    }
33    buf
34}
35
36pub fn decode_checkpoint(bytes: &[u8]) -> QuillSQLResult<CheckpointPayload> {
37    if bytes.len() < 8 + 4 + 4 + 4 {
38        return Err(QuillSQLError::Internal(
39            "Checkpoint payload too short".to_string(),
40        ));
41    }
42    let last_lsn = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
43    let mut offset = 8;
44    let dirty_pages_len =
45        u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
46    offset += 4;
47    let mut dirty_pages = Vec::with_capacity(dirty_pages_len);
48    for _ in 0..dirty_pages_len {
49        if bytes.len() < offset + 4 {
50            return Err(QuillSQLError::Internal(
51                "Checkpoint dirty pages truncated".to_string(),
52            ));
53        }
54        let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
55        offset += 4;
56        dirty_pages.push(page_id);
57    }
58    if bytes.len() < offset + 4 {
59        return Err(QuillSQLError::Internal(
60            "Checkpoint active transactions truncated".to_string(),
61        ));
62    }
63    let active_txn_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
64    offset += 4;
65    let mut active_transactions = Vec::with_capacity(active_txn_len);
66    for _ in 0..active_txn_len {
67        if bytes.len() < offset + 8 {
68            return Err(QuillSQLError::Internal(
69                "Checkpoint active transactions truncated".to_string(),
70            ));
71        }
72        let txn_id = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
73        offset += 8;
74        active_transactions.push(txn_id);
75    }
76    if bytes.len() < offset + 4 {
77        return Err(QuillSQLError::Internal(
78            "Checkpoint DPT length missing".to_string(),
79        ));
80    }
81    let dpt_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
82    offset += 4;
83    let required_dpt = offset + dpt_len * (4 + 8);
84    if bytes.len() < required_dpt {
85        return Err(QuillSQLError::Internal(
86            "Checkpoint DPT truncated".to_string(),
87        ));
88    }
89    let mut dpt = Vec::with_capacity(dpt_len);
90    let mut cur = offset;
91    for _ in 0..dpt_len {
92        let pid = u32::from_le_bytes(bytes[cur..cur + 4].try_into().unwrap());
93        cur += 4;
94        let lsn = u64::from_le_bytes(bytes[cur..cur + 8].try_into().unwrap());
95        cur += 8;
96        dpt.push((pid, lsn));
97    }
98    Ok(CheckpointPayload {
99        last_lsn,
100        dirty_pages,
101        active_transactions,
102        dpt,
103    })
104}