quill_sql/storage/heap/
wal_codec.rs

1use std::convert::TryFrom;
2
3use crate::buffer::PageId;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::storage::codec::RidCodec;
6use crate::storage::page::{RecordId, TupleMeta};
7use crate::transaction::{CommandId, TransactionId};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub struct RelationIdent {
11    pub root_page_id: PageId,
12}
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct TupleMetaRepr {
16    pub insert_txn_id: TransactionId,
17    pub insert_cid: CommandId,
18    pub delete_txn_id: TransactionId,
19    pub delete_cid: CommandId,
20    pub is_deleted: bool,
21    pub next_version: Option<RecordId>,
22    pub prev_version: Option<RecordId>,
23}
24
25impl From<TupleMetaRepr> for TupleMeta {
26    fn from(value: TupleMetaRepr) -> Self {
27        TupleMeta {
28            insert_txn_id: value.insert_txn_id,
29            insert_cid: value.insert_cid,
30            delete_txn_id: value.delete_txn_id,
31            delete_cid: value.delete_cid,
32            is_deleted: value.is_deleted,
33            next_version: value.next_version,
34            prev_version: value.prev_version,
35        }
36    }
37}
38
39impl From<TupleMeta> for TupleMetaRepr {
40    fn from(value: TupleMeta) -> Self {
41        TupleMetaRepr {
42            insert_txn_id: value.insert_txn_id,
43            insert_cid: value.insert_cid,
44            delete_txn_id: value.delete_txn_id,
45            delete_cid: value.delete_cid,
46            is_deleted: value.is_deleted,
47            next_version: value.next_version,
48            prev_version: value.prev_version,
49        }
50    }
51}
52
53#[derive(Debug, Clone)]
54pub struct HeapInsertPayload {
55    pub relation: RelationIdent,
56    pub page_id: PageId,
57    pub slot_id: u16,
58    /// transaction id that produced this heap operation
59    pub op_txn_id: TransactionId,
60    pub tuple_meta: TupleMetaRepr,
61    pub tuple_data: Vec<u8>,
62}
63
64#[derive(Debug, Clone)]
65pub struct HeapDeletePayload {
66    pub relation: RelationIdent,
67    pub page_id: PageId,
68    pub slot_id: u16,
69    /// transaction id that produced this heap operation
70    pub op_txn_id: TransactionId,
71    pub new_tuple_meta: TupleMetaRepr,
72    pub old_tuple_meta: TupleMetaRepr,
73    pub old_tuple_data: Vec<u8>,
74}
75
76#[derive(Debug, Clone)]
77pub enum HeapRecordPayload {
78    Insert(HeapInsertPayload),
79    Delete(HeapDeletePayload),
80}
81
/// One-byte tag that selects how a heap record body is decoded.
///
/// The discriminants are the on-disk values: they are written by
/// `encode_heap_record` and parsed back through `TryFrom<u8>`, so they must
/// not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HeapRecordKind {
    /// Tag value `1`.
    Insert = 1,
    /// Tag value `2`.
    Delete = 2,
}
88
89impl TryFrom<u8> for HeapRecordKind {
90    type Error = QuillSQLError;
91
92    fn try_from(value: u8) -> Result<Self, Self::Error> {
93        match value {
94            1 => Ok(HeapRecordKind::Insert),
95            2 => Ok(HeapRecordKind::Delete),
96            other => Err(QuillSQLError::Internal(format!(
97                "Unknown heap record kind: {}",
98                other
99            ))),
100        }
101    }
102}
103
104pub fn encode_heap_record(payload: &HeapRecordPayload) -> (u8, Vec<u8>) {
105    match payload {
106        HeapRecordPayload::Insert(body) => (HeapRecordKind::Insert as u8, encode_heap_insert(body)),
107        HeapRecordPayload::Delete(body) => (HeapRecordKind::Delete as u8, encode_heap_delete(body)),
108    }
109}
110
111pub fn decode_heap_record(bytes: &[u8], info: u8) -> QuillSQLResult<HeapRecordPayload> {
112    match HeapRecordKind::try_from(info)? {
113        HeapRecordKind::Insert => Ok(HeapRecordPayload::Insert(decode_heap_insert(bytes)?)),
114        HeapRecordKind::Delete => Ok(HeapRecordPayload::Delete(decode_heap_delete(bytes)?)),
115    }
116}
117
118fn encode_relation_ident(relation: &RelationIdent, buf: &mut Vec<u8>) {
119    buf.extend_from_slice(&relation.root_page_id.to_le_bytes());
120}
121
122fn decode_relation_ident(bytes: &[u8]) -> QuillSQLResult<(RelationIdent, usize)> {
123    if bytes.len() < 4 {
124        return Err(QuillSQLError::Internal(
125            "Heap payload too short for relation ident".to_string(),
126        ));
127    }
128    let root_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as PageId;
129    Ok((RelationIdent { root_page_id }, 4))
130}
131
132fn encode_tuple_meta(meta: &TupleMetaRepr, buf: &mut Vec<u8>) {
133    buf.extend_from_slice(&meta.insert_txn_id.to_le_bytes());
134    buf.extend_from_slice(&meta.insert_cid.to_le_bytes());
135    buf.extend_from_slice(&meta.delete_txn_id.to_le_bytes());
136    buf.extend_from_slice(&meta.delete_cid.to_le_bytes());
137    buf.push(meta.is_deleted as u8);
138    if let Some(next) = meta.next_version {
139        buf.push(1);
140        buf.extend(RidCodec::encode(&next));
141    } else {
142        buf.push(0);
143    }
144    if let Some(prev) = meta.prev_version {
145        buf.push(1);
146        buf.extend(RidCodec::encode(&prev));
147    } else {
148        buf.push(0);
149    }
150}
151
152fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMetaRepr, usize)> {
153    if bytes.len() < 8 + 4 + 8 + 4 + 1 + 1 + 1 {
154        return Err(QuillSQLError::Internal(
155            "Heap payload too short for tuple meta".to_string(),
156        ));
157    }
158    let insert_txn_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as TransactionId;
159    let insert_cid = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as CommandId;
160    let delete_txn_id = u64::from_le_bytes(bytes[12..20].try_into().unwrap()) as TransactionId;
161    let delete_cid = u32::from_le_bytes(bytes[20..24].try_into().unwrap()) as CommandId;
162    let is_deleted = bytes[24] != 0;
163    let mut offset = 25;
164
165    let has_next = bytes
166        .get(offset)
167        .copied()
168        .ok_or_else(|| QuillSQLError::Internal("tuple meta missing next flag".to_string()))?
169        != 0;
170    offset += 1;
171    let next_version = if has_next {
172        let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
173        offset += consumed;
174        Some(rid)
175    } else {
176        None
177    };
178
179    let has_prev = bytes
180        .get(offset)
181        .copied()
182        .ok_or_else(|| QuillSQLError::Internal("tuple meta missing prev flag".to_string()))?
183        != 0;
184    offset += 1;
185    let prev_version = if has_prev {
186        let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
187        offset += consumed;
188        Some(rid)
189    } else {
190        None
191    };
192
193    Ok((
194        TupleMetaRepr {
195            insert_txn_id,
196            insert_cid,
197            delete_txn_id,
198            delete_cid,
199            is_deleted,
200            next_version,
201            prev_version,
202        },
203        offset,
204    ))
205}
206
/// Appends `data` to `buf` as a length-prefixed blob: a little-endian `u32`
/// byte count followed by the bytes themselves.
fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
    // NOTE(review): the `as u32` cast truncates lengths above `u32::MAX`;
    // callers are assumed to pass far smaller blobs.
    let prefix = (data.len() as u32).to_le_bytes();
    buf.extend(prefix.iter().chain(data.iter()));
}
211
212fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
213    if bytes.len() < 4 {
214        return Err(QuillSQLError::Internal(
215            "Heap payload missing length prefix".to_string(),
216        ));
217    }
218    let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
219    if bytes.len() < 4 + len {
220        return Err(QuillSQLError::Internal(
221            "Heap payload length prefix out of bounds".to_string(),
222        ));
223    }
224    Ok((bytes[4..4 + len].to_vec(), 4 + len))
225}
226
227fn encode_heap_insert(body: &HeapInsertPayload) -> Vec<u8> {
228    // Heap/Insert (rmid=Heap, info=1)
229    // body: relation(root_id u32) + page_id(4) + slot_id(2) + op_txn_id(8) + tuple_meta(17B) + tuple_data_len+data
230    let mut buf = Vec::new();
231    encode_relation_ident(&body.relation, &mut buf);
232    buf.extend_from_slice(&body.page_id.to_le_bytes());
233    buf.extend_from_slice(&body.slot_id.to_le_bytes());
234    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
235    encode_tuple_meta(&body.tuple_meta, &mut buf);
236    encode_bytes(&body.tuple_data, &mut buf);
237    buf
238}
239
240fn decode_heap_insert(bytes: &[u8]) -> QuillSQLResult<HeapInsertPayload> {
241    let (relation, mut offset) = decode_relation_ident(bytes)?;
242    if bytes.len() < offset + 4 + 2 {
243        return Err(QuillSQLError::Internal(
244            "Heap insert payload too short".to_string(),
245        ));
246    }
247    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
248    offset += 4;
249    let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
250    offset += 2;
251    if bytes.len() < offset + 8 {
252        return Err(QuillSQLError::Internal(
253            "Heap insert payload missing op_txn_id".to_string(),
254        ));
255    }
256    let op_txn_id =
257        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
258    offset += 8;
259    let (tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
260    offset += consumed;
261    let (tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
262    Ok(HeapInsertPayload {
263        relation,
264        page_id,
265        slot_id,
266        op_txn_id,
267        tuple_meta,
268        tuple_data,
269    })
270}
271
272fn encode_heap_delete(body: &HeapDeletePayload) -> Vec<u8> {
273    // Heap/Delete (rmid=Heap, info=3)
274    // body: relation + page_id + slot_id + op_txn_id + new_meta + old_meta + has_old_data+len+data
275    let mut buf = Vec::new();
276    encode_relation_ident(&body.relation, &mut buf);
277    buf.extend_from_slice(&body.page_id.to_le_bytes());
278    buf.extend_from_slice(&body.slot_id.to_le_bytes());
279    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
280    encode_tuple_meta(&body.new_tuple_meta, &mut buf);
281    encode_tuple_meta(&body.old_tuple_meta, &mut buf);
282    encode_bytes(&body.old_tuple_data, &mut buf);
283    buf
284}
285
286fn decode_heap_delete(bytes: &[u8]) -> QuillSQLResult<HeapDeletePayload> {
287    let (relation, mut offset) = decode_relation_ident(bytes)?;
288    if bytes.len() < offset + 4 + 2 {
289        return Err(QuillSQLError::Internal(
290            "Heap delete payload too short".to_string(),
291        ));
292    }
293    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
294    offset += 4;
295    let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
296    offset += 2;
297    if bytes.len() < offset + 8 {
298        return Err(QuillSQLError::Internal(
299            "Heap delete payload missing op_txn_id".to_string(),
300        ));
301    }
302    let op_txn_id =
303        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
304    offset += 8;
305    let (new_tuple_meta, consumed_new) = decode_tuple_meta(&bytes[offset..])?;
306    offset += consumed_new;
307    let (old_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
308    offset += consumed;
309    let (old_tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
310    Ok(HeapDeletePayload {
311        relation,
312        page_id,
313        slot_id,
314        op_txn_id,
315        new_tuple_meta,
316        old_tuple_meta,
317        old_tuple_data,
318    })
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transaction::TransactionId;

    /// Insert payload without version links, shared by several tests.
    fn sample_insert() -> HeapRecordPayload {
        HeapRecordPayload::Insert(HeapInsertPayload {
            relation: RelationIdent { root_page_id: 11 },
            page_id: 9,
            slot_id: 3,
            op_txn_id: 42,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 42,
                insert_cid: 1,
                delete_txn_id: 0,
                delete_cid: 0,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![1, 2, 3, 4],
        })
    }

    /// Delete payload without version links, shared by several tests.
    fn sample_delete() -> HeapRecordPayload {
        HeapRecordPayload::Delete(HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 5,
            slot_id: 2,
            op_txn_id: TransactionId::default(),
            new_tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 2,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: 0,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            old_tuple_data: vec![9; 6],
        })
    }

    /// Encodes `payload` through the public entry point, checks that the
    /// emitted tag is `kind`, decodes it again and compares every field.
    fn roundtrip(payload: HeapRecordPayload, kind: HeapRecordKind) {
        let (info, bytes) = encode_heap_record(&payload);
        assert_eq!(info, kind as u8);

        let decoded = decode_heap_record(&bytes, info).unwrap();
        match (payload, decoded) {
            (HeapRecordPayload::Insert(a), HeapRecordPayload::Insert(b)) => {
                assert_eq!(a.relation.root_page_id, b.relation.root_page_id);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.slot_id, b.slot_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.tuple_meta, b.tuple_meta);
                assert_eq!(a.tuple_data, b.tuple_data);
            }
            (HeapRecordPayload::Delete(a), HeapRecordPayload::Delete(b)) => {
                assert_eq!(a.relation.root_page_id, b.relation.root_page_id);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.slot_id, b.slot_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.new_tuple_meta, b.new_tuple_meta);
                assert_eq!(a.old_tuple_meta, b.old_tuple_meta);
                assert_eq!(a.old_tuple_data, b.old_tuple_data);
            }
            _ => panic!("payload variant mismatch"),
        }
    }

    /// Every strict prefix of a valid record must be rejected with an error
    /// rather than decoded or panicking.
    fn assert_every_truncation_fails(payload: &HeapRecordPayload) {
        let (info, bytes) = encode_heap_record(payload);
        for cut in 0..bytes.len() {
            assert!(
                decode_heap_record(&bytes[..cut], info).is_err(),
                "decoding a {}-byte prefix of a {}-byte record should fail",
                cut,
                bytes.len()
            );
        }
    }

    #[test]
    fn heap_insert_roundtrip() {
        roundtrip(sample_insert(), HeapRecordKind::Insert);
    }

    #[test]
    fn heap_delete_roundtrip() {
        roundtrip(sample_delete(), HeapRecordKind::Delete);
    }

    #[test]
    fn unknown_record_kind_is_rejected() {
        assert!(HeapRecordKind::try_from(0).is_err());
        assert!(HeapRecordKind::try_from(3).is_err());
        assert!(decode_heap_record(&[], 0).is_err());
        assert!(decode_heap_record(&[], 3).is_err());
    }

    #[test]
    fn truncated_insert_is_rejected() {
        assert_every_truncation_fails(&sample_insert());
    }

    #[test]
    fn truncated_delete_is_rejected() {
        assert_every_truncation_fails(&sample_delete());
    }
}