quill_sql/recovery/wal/codec/
checkpoint.rs1use crate::buffer::PageId;
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::recovery::Lsn;
4use crate::transaction::TransactionId;
5
6#[derive(Debug, Clone)]
7pub struct CheckpointPayload {
8 pub last_lsn: Lsn,
9 pub dirty_pages: Vec<PageId>,
10 pub active_transactions: Vec<TransactionId>,
11 pub dpt: Vec<(PageId, Lsn)>,
13}
14
15pub fn encode_checkpoint(body: &CheckpointPayload) -> Vec<u8> {
16 let mut buf = Vec::new();
19 buf.extend_from_slice(&body.last_lsn.to_le_bytes());
20 buf.extend_from_slice(&(body.dirty_pages.len() as u32).to_le_bytes());
21 for page_id in &body.dirty_pages {
22 buf.extend_from_slice(&page_id.to_le_bytes());
23 }
24 buf.extend_from_slice(&(body.active_transactions.len() as u32).to_le_bytes());
25 for txn_id in &body.active_transactions {
26 buf.extend_from_slice(&txn_id.to_le_bytes());
27 }
28 buf.extend_from_slice(&(body.dpt.len() as u32).to_le_bytes());
29 for (page_id, rec_lsn) in &body.dpt {
30 buf.extend_from_slice(&page_id.to_le_bytes());
31 buf.extend_from_slice(&rec_lsn.to_le_bytes());
32 }
33 buf
34}
35
36pub fn decode_checkpoint(bytes: &[u8]) -> QuillSQLResult<CheckpointPayload> {
37 if bytes.len() < 8 + 4 + 4 + 4 {
38 return Err(QuillSQLError::Internal(
39 "Checkpoint payload too short".to_string(),
40 ));
41 }
42 let last_lsn = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
43 let mut offset = 8;
44 let dirty_pages_len =
45 u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
46 offset += 4;
47 let mut dirty_pages = Vec::with_capacity(dirty_pages_len);
48 for _ in 0..dirty_pages_len {
49 if bytes.len() < offset + 4 {
50 return Err(QuillSQLError::Internal(
51 "Checkpoint dirty pages truncated".to_string(),
52 ));
53 }
54 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
55 offset += 4;
56 dirty_pages.push(page_id);
57 }
58 if bytes.len() < offset + 4 {
59 return Err(QuillSQLError::Internal(
60 "Checkpoint active transactions truncated".to_string(),
61 ));
62 }
63 let active_txn_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
64 offset += 4;
65 let mut active_transactions = Vec::with_capacity(active_txn_len);
66 for _ in 0..active_txn_len {
67 if bytes.len() < offset + 8 {
68 return Err(QuillSQLError::Internal(
69 "Checkpoint active transactions truncated".to_string(),
70 ));
71 }
72 let txn_id = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
73 offset += 8;
74 active_transactions.push(txn_id);
75 }
76 if bytes.len() < offset + 4 {
77 return Err(QuillSQLError::Internal(
78 "Checkpoint DPT length missing".to_string(),
79 ));
80 }
81 let dpt_len = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
82 offset += 4;
83 let required_dpt = offset + dpt_len * (4 + 8);
84 if bytes.len() < required_dpt {
85 return Err(QuillSQLError::Internal(
86 "Checkpoint DPT truncated".to_string(),
87 ));
88 }
89 let mut dpt = Vec::with_capacity(dpt_len);
90 let mut cur = offset;
91 for _ in 0..dpt_len {
92 let pid = u32::from_le_bytes(bytes[cur..cur + 4].try_into().unwrap());
93 cur += 4;
94 let lsn = u64::from_le_bytes(bytes[cur..cur + 8].try_into().unwrap());
95 cur += 8;
96 dpt.push((pid, lsn));
97 }
98 Ok(CheckpointPayload {
99 last_lsn,
100 dirty_pages,
101 active_transactions,
102 dpt,
103 })
104}