quill_sql/recovery/wal/codec/
mod.rs

1use std::convert::TryFrom;
2
3use crc32fast::Hasher;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal_record::WalRecordPayload;
7use crate::recovery::Lsn;
8use crate::storage::heap::wal_codec::{decode_heap_record, encode_heap_record, HeapRecordKind};
9
10pub mod checkpoint;
11pub mod clr;
12pub mod page;
13pub mod txn;
14
15pub use checkpoint::{decode_checkpoint, encode_checkpoint, CheckpointPayload};
16pub use clr::{decode_clr, encode_clr, ClrPayload};
17pub use page::{
18    decode_page_delta, decode_page_write, encode_page_delta, encode_page_write, PageDeltaPayload,
19    PageWritePayload,
20};
21pub use txn::{decode_transaction, encode_transaction, TransactionPayload, TransactionRecordKind};
22
/// Magic number that opens every WAL frame.
///
/// The value is the ASCII text "QWAL" read as a big-endian integer
/// (`0x5157_414c`). Frames store it little-endian, so the bytes appear in the
/// log as `4c 41 57 51`.
pub const WAL_MAGIC: u32 = u32::from_be_bytes(*b"QWAL");
/// Legacy frame layout; still accepted when decoding.
pub const WAL_VERSION_V1: u16 = 1;
/// Frame layout written by the encoder.
pub const WAL_VERSION: u16 = 2;
/// Size in bytes of the current (v2) frame header.
pub const WAL_HEADER_LEN: usize = std::mem::size_of::<u32>() // magic
    + std::mem::size_of::<u16>() // version
    + std::mem::size_of::<u64>() // lsn
    + std::mem::size_of::<u64>() // prev_lsn
    + std::mem::size_of::<u8>() // resource manager id
    + std::mem::size_of::<u8>() // info
    + std::mem::size_of::<u32>(); // body length
/// Size in bytes of the CRC32 trailer that follows the frame body.
pub const WAL_CRC_LEN: usize = std::mem::size_of::<u32>();
28
/// Identifies which subsystem owns a WAL record.
///
/// The discriminant is the single tag byte stored in the frame header, so the
/// numeric values are part of the on-disk format and must not be renumbered.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ResourceManagerId {
    /// Page-level records (full page writes and page deltas).
    Page = 1,
    /// Transaction marker records.
    Transaction = 2,
    /// Heap tuple records.
    Heap = 3,
    /// Checkpoint records.
    Checkpoint = 4,
    /// Compensation log records.
    Clr = 5,
}
38
39impl TryFrom<u8> for ResourceManagerId {
40    type Error = QuillSQLError;
41
42    fn try_from(value: u8) -> Result<Self, Self::Error> {
43        match value {
44            1 => Ok(ResourceManagerId::Page),
45            2 => Ok(ResourceManagerId::Transaction),
46            3 => Ok(ResourceManagerId::Heap),
47            4 => Ok(ResourceManagerId::Checkpoint),
48            5 => Ok(ResourceManagerId::Clr),
49            other => Err(QuillSQLError::Internal(format!(
50                "Unknown WAL resource manager id: {}",
51                other
52            ))),
53        }
54    }
55}
56
57#[derive(Debug, Clone)]
58pub struct WalFrame {
59    pub lsn: Lsn,
60    pub prev_lsn: Lsn,
61    pub rmid: ResourceManagerId,
62    pub info: u8,
63    pub body: Vec<u8>,
64}
65
66pub fn encode_frame(lsn: Lsn, prev_lsn: Lsn, payload: &WalRecordPayload) -> Vec<u8> {
67    let (rmid, info, body_bytes) = encode_body(payload);
68    build_frame(lsn, prev_lsn, rmid, info, &body_bytes)
69}
70
71pub fn decode_frame(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
72    if bytes.len() < 6 {
73        return Err(QuillSQLError::Internal(
74            "WAL frame too short to contain version".to_string(),
75        ));
76    }
77    let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
78    if magic != WAL_MAGIC {
79        return Err(QuillSQLError::Internal(format!(
80            "Invalid WAL magic: {:x}",
81            magic
82        )));
83    }
84    let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
85    match version {
86        WAL_VERSION => decode_frame_v2(bytes),
87        WAL_VERSION_V1 => decode_frame_v1(bytes),
88        other => Err(QuillSQLError::Internal(format!(
89            "Unsupported WAL version: {}",
90            other
91        ))),
92    }
93}
94
95pub(crate) fn encode_body(payload: &WalRecordPayload) -> (ResourceManagerId, u8, Vec<u8>) {
96    match payload {
97        WalRecordPayload::PageWrite(body) => (ResourceManagerId::Page, 0, encode_page_write(body)),
98        WalRecordPayload::PageDelta(body) => (ResourceManagerId::Page, 1, encode_page_delta(body)),
99        WalRecordPayload::Transaction(body) => {
100            let (info, buf) = encode_transaction(body);
101            (ResourceManagerId::Transaction, info, buf)
102        }
103        WalRecordPayload::Heap(body) => {
104            let (info, buf) = encode_heap_record(body);
105            (ResourceManagerId::Heap, info, buf)
106        }
107        WalRecordPayload::Checkpoint(body) => {
108            (ResourceManagerId::Checkpoint, 0, encode_checkpoint(body))
109        }
110        WalRecordPayload::Clr(body) => (ResourceManagerId::Clr, 0, encode_clr(body)),
111    }
112}
113
114fn build_frame(
115    lsn: Lsn,
116    prev_lsn: Lsn,
117    rmid: ResourceManagerId,
118    info: u8,
119    body_bytes: &[u8],
120) -> Vec<u8> {
121    let mut frame = Vec::with_capacity(WAL_HEADER_LEN + body_bytes.len() + WAL_CRC_LEN);
122    frame.extend_from_slice(&WAL_MAGIC.to_le_bytes());
123    frame.extend_from_slice(&WAL_VERSION.to_le_bytes());
124    frame.extend_from_slice(&lsn.to_le_bytes());
125    frame.extend_from_slice(&prev_lsn.to_le_bytes());
126    frame.push(rmid as u8);
127    frame.push(info);
128    frame.extend_from_slice(&(body_bytes.len() as u32).to_le_bytes());
129    frame.extend_from_slice(body_bytes);
130
131    let mut hasher = Hasher::new();
132    hasher.update(&frame);
133    let crc = hasher.finalize();
134    frame.extend_from_slice(&crc.to_le_bytes());
135    frame
136}
137
138fn decode_frame_v2(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
139    if bytes.len() < WAL_HEADER_LEN + WAL_CRC_LEN {
140        return Err(QuillSQLError::Internal(
141            "WAL frame too short to contain header".to_string(),
142        ));
143    }
144    let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
145    let prev_lsn = u64::from_le_bytes(bytes[14..22].try_into().unwrap());
146    let rmid = ResourceManagerId::try_from(bytes[22])?;
147    let info = bytes[23];
148    let body_len = u32::from_le_bytes(bytes[24..28].try_into().unwrap()) as usize;
149    let total_len = WAL_HEADER_LEN + body_len + WAL_CRC_LEN;
150    if bytes.len() < total_len {
151        return Err(QuillSQLError::Internal(
152            "WAL frame truncated before body end".to_string(),
153        ));
154    }
155
156    let body = &bytes[WAL_HEADER_LEN..WAL_HEADER_LEN + body_len];
157    let expected_crc = u32::from_le_bytes(
158        bytes[WAL_HEADER_LEN + body_len..total_len]
159            .try_into()
160            .unwrap(),
161    );
162    let mut hasher = Hasher::new();
163    hasher.update(&bytes[0..WAL_HEADER_LEN + body_len]);
164    let actual_crc = hasher.finalize();
165    if expected_crc != actual_crc {
166        return Err(QuillSQLError::Internal(
167            "CRC mismatch for WAL frame".to_string(),
168        ));
169    }
170
171    Ok((
172        WalFrame {
173            lsn,
174            prev_lsn,
175            rmid,
176            info,
177            body: body.to_vec(),
178        },
179        total_len,
180    ))
181}
182
183fn decode_frame_v1(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
184    const HEADER_LEN_V1: usize = 4 + 2 + 8 + 1 + 4; // magic + version + lsn + kind + body_len
185    if bytes.len() < HEADER_LEN_V1 + WAL_CRC_LEN {
186        return Err(QuillSQLError::Internal(
187            "WAL frame too short to contain header".to_string(),
188        ));
189    }
190    let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
191    let kind = bytes[14];
192    let rmid = ResourceManagerId::try_from(kind)?;
193    let body_len = u32::from_le_bytes(bytes[15..19].try_into().unwrap()) as usize;
194    let total_len = HEADER_LEN_V1 + body_len + WAL_CRC_LEN;
195    if bytes.len() < total_len {
196        return Err(QuillSQLError::Internal(
197            "WAL frame truncated before body end".to_string(),
198        ));
199    }
200
201    let body = &bytes[HEADER_LEN_V1..HEADER_LEN_V1 + body_len];
202    let expected_crc = u32::from_le_bytes(
203        bytes[HEADER_LEN_V1 + body_len..total_len]
204            .try_into()
205            .unwrap(),
206    );
207    let mut hasher = Hasher::new();
208    hasher.update(&bytes[0..HEADER_LEN_V1 + body_len]);
209    let actual_crc = hasher.finalize();
210    if expected_crc != actual_crc {
211        return Err(QuillSQLError::Internal(
212            "CRC mismatch for WAL frame".to_string(),
213        ));
214    }
215
216    let info = match rmid {
217        ResourceManagerId::Page => 0,
218        ResourceManagerId::Transaction => {
219            if body.len() != 9 {
220                return Err(QuillSQLError::Internal(
221                    "Legacy transaction payload must be 9 bytes".to_string(),
222                ));
223            }
224            body[8]
225        }
226        ResourceManagerId::Heap => {
227            if body.is_empty() {
228                return Err(QuillSQLError::Internal(
229                    "Legacy heap payload missing kind byte".to_string(),
230                ));
231            }
232            body[0]
233        }
234        ResourceManagerId::Checkpoint | ResourceManagerId::Clr => 0,
235    };
236
237    Ok((
238        WalFrame {
239            lsn,
240            prev_lsn: lsn.saturating_sub(1),
241            rmid,
242            info,
243            body: match rmid {
244                ResourceManagerId::Page
245                | ResourceManagerId::Checkpoint
246                | ResourceManagerId::Clr => body.to_vec(),
247                ResourceManagerId::Transaction => body[..8].to_vec(),
248                ResourceManagerId::Heap => body[1..].to_vec(),
249            },
250        },
251        total_len,
252    ))
253}
254
255pub fn decode_payload(frame: &WalFrame) -> QuillSQLResult<WalRecordPayload> {
256    match frame.rmid {
257        ResourceManagerId::Page => match frame.info {
258            0 => Ok(WalRecordPayload::PageWrite(decode_page_write(&frame.body)?)),
259            1 => Ok(WalRecordPayload::PageDelta(decode_page_delta(&frame.body)?)),
260            other => Err(QuillSQLError::Internal(format!(
261                "Unknown Page info kind: {}",
262                other
263            ))),
264        },
265        ResourceManagerId::Transaction => Ok(WalRecordPayload::Transaction(decode_transaction(
266            &frame.body,
267            frame.info,
268        )?)),
269        ResourceManagerId::Heap => Ok(WalRecordPayload::Heap(decode_heap_record(
270            &frame.body,
271            frame.info,
272        )?)),
273        ResourceManagerId::Checkpoint => Ok(WalRecordPayload::Checkpoint(decode_checkpoint(
274            &frame.body,
275        )?)),
276        ResourceManagerId::Clr => Ok(WalRecordPayload::Clr(decode_clr(&frame.body)?)),
277    }
278}
279
280pub fn heap_record_kind_to_info(kind: HeapRecordKind) -> u8 {
281    kind as u8
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal_record::WalRecordPayload;
    use crate::storage::heap::wal_codec::{
        HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
        TupleMetaRepr,
    };
    use crate::transaction::INVALID_COMMAND_ID;

    /// Decodes `bytes` as one frame and checks that the decoder consumed the
    /// entire buffer.
    fn decode_whole(bytes: &[u8]) -> WalFrame {
        let (frame, consumed) = decode_frame(bytes).unwrap();
        assert_eq!(consumed, bytes.len());
        frame
    }

    // Full page image survives an encode/decode round trip, header included.
    #[test]
    fn encode_decode_page_write() {
        let payload = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 42,
            prev_page_lsn: 7,
            page_image: vec![1, 2, 3, 4, 5],
        });
        let bytes = payload.encode(100, 99);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.lsn, 100);
        assert_eq!(frame.prev_lsn, 99);
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 0);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::PageWrite(write) => {
                assert_eq!(write.page_id, 42);
                assert_eq!(write.prev_page_lsn, 7);
                assert_eq!(write.page_image, vec![1, 2, 3, 4, 5]);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    // Page deltas share the Page resource manager but use info byte 1.
    #[test]
    fn encode_decode_page_delta() {
        let payload = WalRecordPayload::PageDelta(PageDeltaPayload {
            page_id: 17,
            prev_page_lsn: 8,
            offset: 5,
            data: vec![9, 9, 9],
        });
        let bytes = payload.encode(200, 150);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 1);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::PageDelta(delta) => {
                assert_eq!(delta.page_id, 17);
                assert_eq!(delta.offset, 5);
                assert_eq!(delta.data, vec![9, 9, 9]);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    // The transaction marker travels in the frame's info byte.
    #[test]
    fn encode_decode_transaction() {
        let payload = WalRecordPayload::Transaction(TransactionPayload {
            marker: TransactionRecordKind::Commit,
            txn_id: 99,
        });
        let bytes = payload.encode(300, 200);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Transaction);
        assert_eq!(frame.info, TransactionRecordKind::Commit as u8);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Transaction(txn) => {
                assert_eq!(txn.marker, TransactionRecordKind::Commit);
                assert_eq!(txn.txn_id, 99);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    #[test]
    fn encode_decode_heap_insert() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Insert(HeapInsertPayload {
            relation: RelationIdent { root_page_id: 10 },
            page_id: 12,
            slot_id: 2,
            op_txn_id: 1,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![7, 8, 9],
        }));
        let bytes = payload.encode(123, 100);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Insert as u8);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Heap(HeapRecordPayload::Insert(insert)) => {
                assert_eq!(insert.relation.root_page_id, 10);
                assert_eq!(insert.page_id, 12);
                assert_eq!(insert.slot_id, 2);
                assert_eq!(insert.tuple_data, vec![7, 8, 9]);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    // Updates carry both the new tuple and the optional old tuple.
    #[test]
    fn encode_decode_heap_update() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Update(HeapUpdatePayload {
            relation: RelationIdent { root_page_id: 99 },
            page_id: 44,
            slot_id: 5,
            op_txn_id: 11,
            new_tuple_meta: TupleMetaRepr {
                insert_txn_id: 11,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            new_tuple_data: vec![1, 2, 3, 4],
            old_tuple_meta: Some(TupleMetaRepr {
                insert_txn_id: 5,
                insert_cid: 0,
                delete_txn_id: 7,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            }),
            old_tuple_data: Some(vec![9, 9, 9]),
        }));
        let bytes = payload.encode(200, 150);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Update as u8);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Heap(HeapRecordPayload::Update(update)) => {
                assert_eq!(update.relation.root_page_id, 99);
                assert_eq!(update.new_tuple_data, vec![1, 2, 3, 4]);
                assert!(update.old_tuple_meta.unwrap().is_deleted);
                assert_eq!(update.old_tuple_data.unwrap(), vec![9, 9, 9]);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    // A delete without an old tuple image must decode back to `None`.
    #[test]
    fn encode_decode_heap_delete() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Delete(HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 3,
            slot_id: 1,
            op_txn_id: 4,
            old_tuple_meta: TupleMetaRepr {
                insert_txn_id: 2,
                insert_cid: 0,
                delete_txn_id: 4,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_data: None,
        }));
        let bytes = payload.encode(80, 60);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Delete as u8);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Heap(HeapRecordPayload::Delete(delete)) => {
                assert_eq!(delete.relation.root_page_id, 7);
                assert!(delete.old_tuple_meta.is_deleted);
                assert!(delete.old_tuple_data.is_none());
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    #[test]
    fn encode_decode_checkpoint() {
        let payload = WalRecordPayload::Checkpoint(CheckpointPayload {
            last_lsn: 123,
            dirty_pages: vec![10, 11, 12],
            active_transactions: vec![1, 2, 3],
            dpt: vec![(10, 1000), (11, 1100)],
        });
        let bytes = payload.encode(999, 900);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Checkpoint);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Checkpoint(checkpoint) => {
                assert_eq!(checkpoint.last_lsn, 123);
                assert_eq!(checkpoint.dirty_pages, vec![10, 11, 12]);
                assert_eq!(checkpoint.active_transactions, vec![1, 2, 3]);
                assert_eq!(checkpoint.dpt, vec![(10, 1000), (11, 1100)]);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }

    #[test]
    fn encode_decode_clr() {
        let clr = ClrPayload {
            txn_id: 11,
            undone_lsn: 1234,
            undo_next_lsn: 0,
        };
        let payload = WalRecordPayload::Clr(clr.clone());
        let bytes = payload.encode(200, 150);
        let frame = decode_whole(&bytes);
        assert_eq!(frame.rmid, ResourceManagerId::Clr);
        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Clr(decoded) => {
                assert_eq!(decoded.txn_id, clr.txn_id);
                assert_eq!(decoded.undone_lsn, clr.undone_lsn);
                assert_eq!(decoded.undo_next_lsn, clr.undo_next_lsn);
            }
            unexpected => panic!("unexpected payload variant: {:?}", unexpected),
        }
    }
}