quill_sql/recovery/wal/codec/
mod.rs

1use std::convert::TryFrom;
2
3use crc32fast::Hasher;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal_record::WalRecordPayload;
7use crate::recovery::Lsn;
8use crate::storage::heap::wal_codec::{decode_heap_record, encode_heap_record, HeapRecordKind};
9use crate::storage::index::wal_codec::{decode_index_record, encode_index_record};
10
11pub mod checkpoint;
12pub mod clr;
13pub mod page;
14pub mod txn;
15
16pub use checkpoint::{decode_checkpoint, encode_checkpoint, CheckpointPayload};
17pub use clr::{decode_clr, encode_clr, ClrPayload};
18pub use page::{decode_page_write, encode_page_write, PageWritePayload};
19pub use txn::{decode_transaction, encode_transaction, TransactionPayload, TransactionRecordKind};
20
/// Magic number opening every WAL frame. The hex digits spell "QWAL"; the
/// value is written with `to_le_bytes`, so the bytes on disk are
/// `4C 41 57 51`.
pub const WAL_MAGIC: u32 = 0x5157_414C;
/// Legacy frame format version, still accepted by `decode_frame`.
pub const WAL_VERSION_V1: u16 = 1;
/// Frame format version written by `build_frame`.
pub const WAL_VERSION: u16 = 2;
/// Size in bytes of the fixed header that precedes the body in a v2 frame.
pub const WAL_HEADER_LEN: usize = 4 // magic
    + 2 // version
    + 8 // lsn
    + 8 // prev_lsn
    + 1 // resource manager id
    + 1 // info
    + 4; // body length
/// Size in bytes of the CRC32 trailer that follows the body.
pub const WAL_CRC_LEN: usize = 4;
26
/// One-byte tag stored in the frame header that selects which codec
/// interprets the frame body.
///
/// The discriminants are part of the on-disk format (they are written with
/// `rmid as u8`), so existing values must never be renumbered.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ResourceManagerId {
    /// Page-write records.
    Page = 1,
    /// Transaction records; the frame `info` byte carries the record kind.
    Transaction = 2,
    /// Heap records; the frame `info` byte carries the record kind.
    Heap = 3,
    /// Checkpoint records.
    Checkpoint = 4,
    /// Compensation log records.
    Clr = 5,
    /// Index records; the frame `info` byte carries the record kind.
    Index = 6,
}
37
38impl TryFrom<u8> for ResourceManagerId {
39    type Error = QuillSQLError;
40
41    fn try_from(value: u8) -> Result<Self, Self::Error> {
42        match value {
43            1 => Ok(ResourceManagerId::Page),
44            2 => Ok(ResourceManagerId::Transaction),
45            3 => Ok(ResourceManagerId::Heap),
46            4 => Ok(ResourceManagerId::Checkpoint),
47            5 => Ok(ResourceManagerId::Clr),
48            6 => Ok(ResourceManagerId::Index),
49            other => Err(QuillSQLError::Internal(format!(
50                "Unknown WAL resource manager id: {}",
51                other
52            ))),
53        }
54    }
55}
56
57#[derive(Debug, Clone)]
58pub struct WalFrame {
59    pub lsn: Lsn,
60    pub prev_lsn: Lsn,
61    pub rmid: ResourceManagerId,
62    pub info: u8,
63    pub body: Vec<u8>,
64}
65
66pub fn encode_frame(lsn: Lsn, prev_lsn: Lsn, payload: &WalRecordPayload) -> Vec<u8> {
67    let (rmid, info, body_bytes) = encode_body(payload);
68    build_frame(lsn, prev_lsn, rmid, info, &body_bytes)
69}
70
71pub fn decode_frame(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
72    if bytes.len() < 6 {
73        return Err(QuillSQLError::Internal(
74            "WAL frame too short to contain version".to_string(),
75        ));
76    }
77    let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
78    if magic != WAL_MAGIC {
79        return Err(QuillSQLError::Internal(format!(
80            "Invalid WAL magic: {:x}",
81            magic
82        )));
83    }
84    let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
85    match version {
86        WAL_VERSION => decode_frame_v2(bytes),
87        WAL_VERSION_V1 => decode_frame_v1(bytes),
88        other => Err(QuillSQLError::Internal(format!(
89            "Unsupported WAL version: {}",
90            other
91        ))),
92    }
93}
94
95pub(crate) fn encode_body(payload: &WalRecordPayload) -> (ResourceManagerId, u8, Vec<u8>) {
96    match payload {
97        WalRecordPayload::PageWrite(body) => (ResourceManagerId::Page, 0, encode_page_write(body)),
98        WalRecordPayload::Transaction(body) => {
99            let (info, buf) = encode_transaction(body);
100            (ResourceManagerId::Transaction, info, buf)
101        }
102        WalRecordPayload::Heap(body) => {
103            let (info, buf) = encode_heap_record(body);
104            (ResourceManagerId::Heap, info, buf)
105        }
106        WalRecordPayload::Index(body) => {
107            let (info, buf) = encode_index_record(body);
108            (ResourceManagerId::Index, info, buf)
109        }
110        WalRecordPayload::Checkpoint(body) => {
111            (ResourceManagerId::Checkpoint, 0, encode_checkpoint(body))
112        }
113        WalRecordPayload::Clr(body) => (ResourceManagerId::Clr, 0, encode_clr(body)),
114    }
115}
116
117fn build_frame(
118    lsn: Lsn,
119    prev_lsn: Lsn,
120    rmid: ResourceManagerId,
121    info: u8,
122    body_bytes: &[u8],
123) -> Vec<u8> {
124    let mut frame = Vec::with_capacity(WAL_HEADER_LEN + body_bytes.len() + WAL_CRC_LEN);
125    frame.extend_from_slice(&WAL_MAGIC.to_le_bytes());
126    frame.extend_from_slice(&WAL_VERSION.to_le_bytes());
127    frame.extend_from_slice(&lsn.to_le_bytes());
128    frame.extend_from_slice(&prev_lsn.to_le_bytes());
129    frame.push(rmid as u8);
130    frame.push(info);
131    frame.extend_from_slice(&(body_bytes.len() as u32).to_le_bytes());
132    frame.extend_from_slice(body_bytes);
133
134    let mut hasher = Hasher::new();
135    hasher.update(&frame);
136    let crc = hasher.finalize();
137    frame.extend_from_slice(&crc.to_le_bytes());
138    frame
139}
140
141fn decode_frame_v2(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
142    if bytes.len() < WAL_HEADER_LEN + WAL_CRC_LEN {
143        return Err(QuillSQLError::Internal(
144            "WAL frame too short to contain header".to_string(),
145        ));
146    }
147    let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
148    let prev_lsn = u64::from_le_bytes(bytes[14..22].try_into().unwrap());
149    let rmid = ResourceManagerId::try_from(bytes[22])?;
150    let info = bytes[23];
151    let body_len = u32::from_le_bytes(bytes[24..28].try_into().unwrap()) as usize;
152    let total_len = WAL_HEADER_LEN + body_len + WAL_CRC_LEN;
153    if bytes.len() < total_len {
154        return Err(QuillSQLError::Internal(
155            "WAL frame truncated before body end".to_string(),
156        ));
157    }
158
159    let body = &bytes[WAL_HEADER_LEN..WAL_HEADER_LEN + body_len];
160    let expected_crc = u32::from_le_bytes(
161        bytes[WAL_HEADER_LEN + body_len..total_len]
162            .try_into()
163            .unwrap(),
164    );
165    let mut hasher = Hasher::new();
166    hasher.update(&bytes[0..WAL_HEADER_LEN + body_len]);
167    let actual_crc = hasher.finalize();
168    if expected_crc != actual_crc {
169        return Err(QuillSQLError::Internal(
170            "CRC mismatch for WAL frame".to_string(),
171        ));
172    }
173
174    Ok((
175        WalFrame {
176            lsn,
177            prev_lsn,
178            rmid,
179            info,
180            body: body.to_vec(),
181        },
182        total_len,
183    ))
184}
185
186fn decode_frame_v1(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
187    const HEADER_LEN_V1: usize = 4 + 2 + 8 + 1 + 4; // magic + version + lsn + kind + body_len
188    if bytes.len() < HEADER_LEN_V1 + WAL_CRC_LEN {
189        return Err(QuillSQLError::Internal(
190            "WAL frame too short to contain header".to_string(),
191        ));
192    }
193    let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
194    let kind = bytes[14];
195    let rmid = ResourceManagerId::try_from(kind)?;
196    let body_len = u32::from_le_bytes(bytes[15..19].try_into().unwrap()) as usize;
197    let total_len = HEADER_LEN_V1 + body_len + WAL_CRC_LEN;
198    if bytes.len() < total_len {
199        return Err(QuillSQLError::Internal(
200            "WAL frame truncated before body end".to_string(),
201        ));
202    }
203
204    let body = &bytes[HEADER_LEN_V1..HEADER_LEN_V1 + body_len];
205    let expected_crc = u32::from_le_bytes(
206        bytes[HEADER_LEN_V1 + body_len..total_len]
207            .try_into()
208            .unwrap(),
209    );
210    let mut hasher = Hasher::new();
211    hasher.update(&bytes[0..HEADER_LEN_V1 + body_len]);
212    let actual_crc = hasher.finalize();
213    if expected_crc != actual_crc {
214        return Err(QuillSQLError::Internal(
215            "CRC mismatch for WAL frame".to_string(),
216        ));
217    }
218
219    let info = match rmid {
220        ResourceManagerId::Page => 0,
221        ResourceManagerId::Transaction => {
222            if body.len() != 9 {
223                return Err(QuillSQLError::Internal(
224                    "Legacy transaction payload must be 9 bytes".to_string(),
225                ));
226            }
227            body[8]
228        }
229        ResourceManagerId::Heap => {
230            if body.is_empty() {
231                return Err(QuillSQLError::Internal(
232                    "Legacy heap payload missing kind byte".to_string(),
233                ));
234            }
235            body[0]
236        }
237        ResourceManagerId::Index => {
238            if body.is_empty() {
239                return Err(QuillSQLError::Internal(
240                    "Legacy index payload missing kind byte".to_string(),
241                ));
242            }
243            body[0]
244        }
245        ResourceManagerId::Checkpoint | ResourceManagerId::Clr => 0,
246    };
247
248    Ok((
249        WalFrame {
250            lsn,
251            prev_lsn: lsn.saturating_sub(1),
252            rmid,
253            info,
254            body: match rmid {
255                ResourceManagerId::Page
256                | ResourceManagerId::Checkpoint
257                | ResourceManagerId::Clr => body.to_vec(),
258                ResourceManagerId::Transaction => body[..8].to_vec(),
259                ResourceManagerId::Heap => body[1..].to_vec(),
260                ResourceManagerId::Index => body[1..].to_vec(),
261            },
262        },
263        total_len,
264    ))
265}
266
267pub fn decode_payload(frame: &WalFrame) -> QuillSQLResult<WalRecordPayload> {
268    match frame.rmid {
269        ResourceManagerId::Page => match frame.info {
270            0 => Ok(WalRecordPayload::PageWrite(decode_page_write(&frame.body)?)),
271            other => Err(QuillSQLError::Internal(format!(
272                "Unknown Page info kind: {}",
273                other
274            ))),
275        },
276        ResourceManagerId::Transaction => Ok(WalRecordPayload::Transaction(decode_transaction(
277            &frame.body,
278            frame.info,
279        )?)),
280        ResourceManagerId::Heap => Ok(WalRecordPayload::Heap(decode_heap_record(
281            &frame.body,
282            frame.info,
283        )?)),
284        ResourceManagerId::Index => Ok(WalRecordPayload::Index(decode_index_record(
285            &frame.body,
286            frame.info,
287        )?)),
288        ResourceManagerId::Checkpoint => Ok(WalRecordPayload::Checkpoint(decode_checkpoint(
289            &frame.body,
290        )?)),
291        ResourceManagerId::Clr => Ok(WalRecordPayload::Clr(decode_clr(&frame.body)?)),
292    }
293}
294
295pub fn heap_record_kind_to_info(kind: HeapRecordKind) -> u8 {
296    kind as u8
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal_record::WalRecordPayload;
    use crate::storage::heap::wal_codec::{
        HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, RelationIdent, TupleMetaRepr,
    };
    use crate::transaction::INVALID_COMMAND_ID;

    /// Encodes `payload`, decodes the resulting frame, checks that the whole
    /// buffer was consumed, and returns the frame with its decoded payload.
    fn round_trip(
        payload: WalRecordPayload,
        lsn: Lsn,
        prev_lsn: Lsn,
    ) -> (WalFrame, WalRecordPayload) {
        let bytes = payload.encode(lsn, prev_lsn);
        let (frame, consumed) = decode_frame(&bytes).unwrap();
        assert_eq!(consumed, bytes.len());
        let decoded = decode_payload(&frame).unwrap();
        (frame, decoded)
    }

    /// A small, valid v2 frame used as the starting point for corruption tests.
    fn sample_frame_bytes() -> Vec<u8> {
        let payload = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 1,
            prev_page_lsn: 0,
            page_image: vec![0xAB; 16],
        });
        encode_frame(10, 9, &payload)
    }

    #[test]
    fn encode_decode_page_write() {
        let payload = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 42,
            prev_page_lsn: 7,
            page_image: vec![1, 2, 3, 4, 5],
        });
        let (frame, decoded) = round_trip(payload, 100, 99);
        assert_eq!(frame.lsn, 100);
        assert_eq!(frame.prev_lsn, 99);
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 0);
        match decoded {
            WalRecordPayload::PageWrite(body) => {
                assert_eq!(body.page_id, 42);
                assert_eq!(body.prev_page_lsn, 7);
                assert_eq!(body.page_image, vec![1, 2, 3, 4, 5]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_transaction() {
        let payload = WalRecordPayload::Transaction(TransactionPayload {
            marker: TransactionRecordKind::Commit,
            txn_id: 99,
        });
        let (frame, decoded) = round_trip(payload, 300, 200);
        assert_eq!(frame.rmid, ResourceManagerId::Transaction);
        assert_eq!(frame.info, TransactionRecordKind::Commit as u8);
        match decoded {
            WalRecordPayload::Transaction(body) => {
                assert_eq!(body.marker, TransactionRecordKind::Commit);
                assert_eq!(body.txn_id, 99);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_heap_insert() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Insert(HeapInsertPayload {
            relation: RelationIdent { root_page_id: 10 },
            page_id: 12,
            slot_id: 2,
            op_txn_id: 1,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![7, 8, 9],
        }));
        let (frame, decoded) = round_trip(payload, 123, 100);
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Insert as u8);
        match decoded {
            WalRecordPayload::Heap(HeapRecordPayload::Insert(body)) => {
                assert_eq!(body.relation.root_page_id, 10);
                assert_eq!(body.page_id, 12);
                assert_eq!(body.slot_id, 2);
                assert_eq!(body.tuple_data, vec![7, 8, 9]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_heap_delete() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Delete(HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 3,
            slot_id: 1,
            op_txn_id: 4,
            new_tuple_meta: TupleMetaRepr {
                insert_txn_id: 2,
                insert_cid: 0,
                delete_txn_id: 4,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_meta: TupleMetaRepr {
                insert_txn_id: 2,
                insert_cid: 0,
                delete_txn_id: 4,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_data: vec![1, 2, 3],
        }));
        let (frame, decoded) = round_trip(payload, 80, 60);
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Delete as u8);
        match decoded {
            WalRecordPayload::Heap(HeapRecordPayload::Delete(body)) => {
                assert_eq!(body.relation.root_page_id, 7);
                assert!(body.new_tuple_meta.is_deleted);
                assert!(body.old_tuple_meta.is_deleted);
                assert_eq!(body.old_tuple_data, vec![1, 2, 3]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_checkpoint() {
        let payload = WalRecordPayload::Checkpoint(CheckpointPayload {
            last_lsn: 123,
            dirty_pages: vec![10, 11, 12],
            active_transactions: vec![1, 2, 3],
            dpt: vec![(10, 1000), (11, 1100)],
        });
        let (frame, decoded) = round_trip(payload, 999, 900);
        assert_eq!(frame.rmid, ResourceManagerId::Checkpoint);
        match decoded {
            WalRecordPayload::Checkpoint(body) => {
                assert_eq!(body.last_lsn, 123);
                assert_eq!(body.dirty_pages, vec![10, 11, 12]);
                assert_eq!(body.active_transactions, vec![1, 2, 3]);
                assert_eq!(body.dpt, vec![(10, 1000), (11, 1100)]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_clr() {
        let clr = ClrPayload {
            txn_id: 11,
            undone_lsn: 1234,
            undo_next_lsn: 0,
        };
        let (frame, decoded) = round_trip(WalRecordPayload::Clr(clr.clone()), 200, 150);
        assert_eq!(frame.rmid, ResourceManagerId::Clr);
        match decoded {
            WalRecordPayload::Clr(body) => {
                assert_eq!(body.txn_id, clr.txn_id);
                assert_eq!(body.undone_lsn, clr.undone_lsn);
                assert_eq!(body.undo_next_lsn, clr.undo_next_lsn);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn decode_rejects_bad_magic() {
        let mut bytes = sample_frame_bytes();
        bytes[0] ^= 0xFF;
        assert!(decode_frame(&bytes).is_err());
    }

    #[test]
    fn decode_rejects_unknown_version() {
        let mut bytes = sample_frame_bytes();
        bytes[4..6].copy_from_slice(&u16::MAX.to_le_bytes());
        assert!(decode_frame(&bytes).is_err());
    }

    #[test]
    fn decode_rejects_corrupted_contents() {
        // Flip a byte inside the body: the stored CRC no longer matches.
        let mut corrupt_body = sample_frame_bytes();
        corrupt_body[WAL_HEADER_LEN] ^= 0x01;
        assert!(decode_frame(&corrupt_body).is_err());

        // Flip a byte of the CRC trailer itself.
        let mut corrupt_crc = sample_frame_bytes();
        let last = corrupt_crc.len() - 1;
        corrupt_crc[last] ^= 0x01;
        assert!(decode_frame(&corrupt_crc).is_err());
    }

    #[test]
    fn decode_rejects_truncated_input() {
        let bytes = sample_frame_bytes();
        assert!(decode_frame(&[]).is_err());
        assert!(decode_frame(&bytes[..5]).is_err());
        assert!(decode_frame(&bytes[..WAL_HEADER_LEN]).is_err());
        assert!(decode_frame(&bytes[..bytes.len() - 1]).is_err());
    }

    #[test]
    fn decode_consumes_only_the_first_frame() {
        let first = sample_frame_bytes();
        let mut buffer = first.clone();
        buffer.extend_from_slice(&sample_frame_bytes());
        let (frame, consumed) = decode_frame(&buffer).unwrap();
        assert_eq!(consumed, first.len());
        assert_eq!(frame.lsn, 10);
        assert_eq!(frame.prev_lsn, 9);
    }

    #[test]
    fn decode_legacy_v1_transaction_frame() {
        // v1 layout: magic | version | lsn | kind | body_len | body | crc.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&WAL_MAGIC.to_le_bytes());
        bytes.extend_from_slice(&WAL_VERSION_V1.to_le_bytes());
        bytes.extend_from_slice(&77u64.to_le_bytes());
        bytes.push(ResourceManagerId::Transaction as u8);
        bytes.extend_from_slice(&9u32.to_le_bytes());
        bytes.extend_from_slice(&99u64.to_le_bytes());
        bytes.push(TransactionRecordKind::Commit as u8);
        let mut hasher = Hasher::new();
        hasher.update(&bytes);
        let crc = hasher.finalize();
        bytes.extend_from_slice(&crc.to_le_bytes());

        let (frame, consumed) = decode_frame(&bytes).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(frame.lsn, 77);
        assert_eq!(frame.prev_lsn, 76);
        assert_eq!(frame.rmid, ResourceManagerId::Transaction);
        assert_eq!(frame.info, TransactionRecordKind::Commit as u8);
        assert_eq!(frame.body, 99u64.to_le_bytes().to_vec());
    }
}