quill_sql/storage/heap/
wal_codec.rs

1use std::convert::TryFrom;
2
3use crate::buffer::PageId;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::storage::codec::RidCodec;
6use crate::storage::page::{RecordId, TupleMeta};
7use crate::transaction::{CommandId, TransactionId};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub struct RelationIdent {
11    pub root_page_id: PageId,
12}
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct TupleMetaRepr {
16    pub insert_txn_id: TransactionId,
17    pub insert_cid: CommandId,
18    pub delete_txn_id: TransactionId,
19    pub delete_cid: CommandId,
20    pub is_deleted: bool,
21    pub next_version: Option<RecordId>,
22    pub prev_version: Option<RecordId>,
23}
24
25impl From<TupleMetaRepr> for TupleMeta {
26    fn from(value: TupleMetaRepr) -> Self {
27        TupleMeta {
28            insert_txn_id: value.insert_txn_id,
29            insert_cid: value.insert_cid,
30            delete_txn_id: value.delete_txn_id,
31            delete_cid: value.delete_cid,
32            is_deleted: value.is_deleted,
33            next_version: value.next_version,
34            prev_version: value.prev_version,
35        }
36    }
37}
38
39impl From<TupleMeta> for TupleMetaRepr {
40    fn from(value: TupleMeta) -> Self {
41        TupleMetaRepr {
42            insert_txn_id: value.insert_txn_id,
43            insert_cid: value.insert_cid,
44            delete_txn_id: value.delete_txn_id,
45            delete_cid: value.delete_cid,
46            is_deleted: value.is_deleted,
47            next_version: value.next_version,
48            prev_version: value.prev_version,
49        }
50    }
51}
52
53#[derive(Debug, Clone)]
54pub struct HeapInsertPayload {
55    pub relation: RelationIdent,
56    pub page_id: PageId,
57    pub slot_id: u16,
58    /// transaction id that produced this heap operation
59    pub op_txn_id: TransactionId,
60    pub tuple_meta: TupleMetaRepr,
61    pub tuple_data: Vec<u8>,
62}
63
64#[derive(Debug, Clone)]
65pub struct HeapUpdatePayload {
66    pub relation: RelationIdent,
67    pub page_id: PageId,
68    pub slot_id: u16,
69    /// transaction id that produced this heap operation
70    pub op_txn_id: TransactionId,
71    pub new_tuple_meta: TupleMetaRepr,
72    pub new_tuple_data: Vec<u8>,
73    pub old_tuple_meta: Option<TupleMetaRepr>,
74    pub old_tuple_data: Option<Vec<u8>>,
75}
76
77#[derive(Debug, Clone)]
78pub struct HeapDeletePayload {
79    pub relation: RelationIdent,
80    pub page_id: PageId,
81    pub slot_id: u16,
82    /// transaction id that produced this heap operation
83    pub op_txn_id: TransactionId,
84    pub old_tuple_meta: TupleMetaRepr,
85    pub old_tuple_data: Option<Vec<u8>>,
86}
87
88#[derive(Debug, Clone)]
89pub enum HeapRecordPayload {
90    Insert(HeapInsertPayload),
91    Update(HeapUpdatePayload),
92    Delete(HeapDeletePayload),
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96#[repr(u8)]
97pub enum HeapRecordKind {
98    Insert = 1,
99    Update = 2,
100    Delete = 3,
101}
102
103impl TryFrom<u8> for HeapRecordKind {
104    type Error = QuillSQLError;
105
106    fn try_from(value: u8) -> Result<Self, Self::Error> {
107        match value {
108            1 => Ok(HeapRecordKind::Insert),
109            2 => Ok(HeapRecordKind::Update),
110            3 => Ok(HeapRecordKind::Delete),
111            other => Err(QuillSQLError::Internal(format!(
112                "Unknown heap record kind: {}",
113                other
114            ))),
115        }
116    }
117}
118
119pub fn encode_heap_record(payload: &HeapRecordPayload) -> (u8, Vec<u8>) {
120    match payload {
121        HeapRecordPayload::Insert(body) => (HeapRecordKind::Insert as u8, encode_heap_insert(body)),
122        HeapRecordPayload::Update(body) => (HeapRecordKind::Update as u8, encode_heap_update(body)),
123        HeapRecordPayload::Delete(body) => (HeapRecordKind::Delete as u8, encode_heap_delete(body)),
124    }
125}
126
127pub fn decode_heap_record(bytes: &[u8], info: u8) -> QuillSQLResult<HeapRecordPayload> {
128    match HeapRecordKind::try_from(info)? {
129        HeapRecordKind::Insert => Ok(HeapRecordPayload::Insert(decode_heap_insert(bytes)?)),
130        HeapRecordKind::Update => Ok(HeapRecordPayload::Update(decode_heap_update(bytes)?)),
131        HeapRecordKind::Delete => Ok(HeapRecordPayload::Delete(decode_heap_delete(bytes)?)),
132    }
133}
134
135fn encode_relation_ident(relation: &RelationIdent, buf: &mut Vec<u8>) {
136    buf.extend_from_slice(&relation.root_page_id.to_le_bytes());
137}
138
139fn decode_relation_ident(bytes: &[u8]) -> QuillSQLResult<(RelationIdent, usize)> {
140    if bytes.len() < 4 {
141        return Err(QuillSQLError::Internal(
142            "Heap payload too short for relation ident".to_string(),
143        ));
144    }
145    let root_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as PageId;
146    Ok((RelationIdent { root_page_id }, 4))
147}
148
149fn encode_tuple_meta(meta: &TupleMetaRepr, buf: &mut Vec<u8>) {
150    buf.extend_from_slice(&meta.insert_txn_id.to_le_bytes());
151    buf.extend_from_slice(&meta.insert_cid.to_le_bytes());
152    buf.extend_from_slice(&meta.delete_txn_id.to_le_bytes());
153    buf.extend_from_slice(&meta.delete_cid.to_le_bytes());
154    buf.push(meta.is_deleted as u8);
155    if let Some(next) = meta.next_version {
156        buf.push(1);
157        buf.extend(RidCodec::encode(&next));
158    } else {
159        buf.push(0);
160    }
161    if let Some(prev) = meta.prev_version {
162        buf.push(1);
163        buf.extend(RidCodec::encode(&prev));
164    } else {
165        buf.push(0);
166    }
167}
168
169fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMetaRepr, usize)> {
170    if bytes.len() < 8 + 4 + 8 + 4 + 1 + 1 + 1 {
171        return Err(QuillSQLError::Internal(
172            "Heap payload too short for tuple meta".to_string(),
173        ));
174    }
175    let insert_txn_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as TransactionId;
176    let insert_cid = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as CommandId;
177    let delete_txn_id = u64::from_le_bytes(bytes[12..20].try_into().unwrap()) as TransactionId;
178    let delete_cid = u32::from_le_bytes(bytes[20..24].try_into().unwrap()) as CommandId;
179    let is_deleted = bytes[24] != 0;
180    let mut offset = 25;
181
182    let has_next = bytes
183        .get(offset)
184        .copied()
185        .ok_or_else(|| QuillSQLError::Internal("tuple meta missing next flag".to_string()))?
186        != 0;
187    offset += 1;
188    let next_version = if has_next {
189        let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
190        offset += consumed;
191        Some(rid)
192    } else {
193        None
194    };
195
196    let has_prev = bytes
197        .get(offset)
198        .copied()
199        .ok_or_else(|| QuillSQLError::Internal("tuple meta missing prev flag".to_string()))?
200        != 0;
201    offset += 1;
202    let prev_version = if has_prev {
203        let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
204        offset += consumed;
205        Some(rid)
206    } else {
207        None
208    };
209
210    Ok((
211        TupleMetaRepr {
212            insert_txn_id,
213            insert_cid,
214            delete_txn_id,
215            delete_cid,
216            is_deleted,
217            next_version,
218            prev_version,
219        },
220        offset,
221    ))
222}
223
224fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
225    buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
226    buf.extend_from_slice(data);
227}
228
229fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
230    if bytes.len() < 4 {
231        return Err(QuillSQLError::Internal(
232            "Heap payload missing length prefix".to_string(),
233        ));
234    }
235    let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
236    if bytes.len() < 4 + len {
237        return Err(QuillSQLError::Internal(
238            "Heap payload length prefix out of bounds".to_string(),
239        ));
240    }
241    Ok((bytes[4..4 + len].to_vec(), 4 + len))
242}
243
244fn encode_optional_bytes(opt: &Option<Vec<u8>>, buf: &mut Vec<u8>) {
245    match opt {
246        Some(data) => {
247            buf.push(1);
248            encode_bytes(data, buf);
249        }
250        None => buf.push(0),
251    }
252}
253
254fn decode_optional_bytes(bytes: &[u8]) -> QuillSQLResult<(Option<Vec<u8>>, usize)> {
255    if bytes.is_empty() {
256        return Err(QuillSQLError::Internal(
257            "Heap payload missing option flag".to_string(),
258        ));
259    }
260    let flag = bytes[0];
261    if flag == 0 {
262        Ok((None, 1))
263    } else {
264        let (data, consumed) = decode_bytes(&bytes[1..])?;
265        Ok((Some(data), consumed + 1))
266    }
267}
268
269fn encode_optional_meta(opt: &Option<TupleMetaRepr>, buf: &mut Vec<u8>) {
270    match opt {
271        Some(meta) => {
272            buf.push(1);
273            encode_tuple_meta(meta, buf);
274        }
275        None => buf.push(0),
276    }
277}
278
279fn decode_optional_meta(bytes: &[u8]) -> QuillSQLResult<(Option<TupleMetaRepr>, usize)> {
280    if bytes.is_empty() {
281        return Err(QuillSQLError::Internal(
282            "Heap payload missing option flag".to_string(),
283        ));
284    }
285    let flag = bytes[0];
286    if flag == 0 {
287        Ok((None, 1))
288    } else {
289        let (meta, consumed) = decode_tuple_meta(&bytes[1..])?;
290        Ok((Some(meta), consumed + 1))
291    }
292}
293
294fn encode_heap_insert(body: &HeapInsertPayload) -> Vec<u8> {
295    // Heap/Insert (rmid=Heap, info=1)
296    // body: relation(root_id u32) + page_id(4) + slot_id(2) + op_txn_id(8) + tuple_meta(17B) + tuple_data_len+data
297    let mut buf = Vec::new();
298    encode_relation_ident(&body.relation, &mut buf);
299    buf.extend_from_slice(&body.page_id.to_le_bytes());
300    buf.extend_from_slice(&body.slot_id.to_le_bytes());
301    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
302    encode_tuple_meta(&body.tuple_meta, &mut buf);
303    encode_bytes(&body.tuple_data, &mut buf);
304    buf
305}
306
307fn decode_heap_insert(bytes: &[u8]) -> QuillSQLResult<HeapInsertPayload> {
308    let (relation, mut offset) = decode_relation_ident(bytes)?;
309    if bytes.len() < offset + 4 + 2 {
310        return Err(QuillSQLError::Internal(
311            "Heap insert payload too short".to_string(),
312        ));
313    }
314    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
315    offset += 4;
316    let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
317    offset += 2;
318    if bytes.len() < offset + 8 {
319        return Err(QuillSQLError::Internal(
320            "Heap insert payload missing op_txn_id".to_string(),
321        ));
322    }
323    let op_txn_id =
324        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
325    offset += 8;
326    let (tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
327    offset += consumed;
328    let (tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
329    Ok(HeapInsertPayload {
330        relation,
331        page_id,
332        slot_id,
333        op_txn_id,
334        tuple_meta,
335        tuple_data,
336    })
337}
338
339fn encode_heap_update(body: &HeapUpdatePayload) -> Vec<u8> {
340    // Heap/Update (rmid=Heap, info=2)
341    // body: relation + page_id + slot_id + op_txn_id + new_meta(17B) + new_data + has_old_meta+old_meta + has_old_data+old_data
342    let mut buf = Vec::new();
343    encode_relation_ident(&body.relation, &mut buf);
344    buf.extend_from_slice(&body.page_id.to_le_bytes());
345    buf.extend_from_slice(&body.slot_id.to_le_bytes());
346    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
347    encode_tuple_meta(&body.new_tuple_meta, &mut buf);
348    encode_bytes(&body.new_tuple_data, &mut buf);
349    encode_optional_meta(&body.old_tuple_meta, &mut buf);
350    encode_optional_bytes(&body.old_tuple_data, &mut buf);
351    buf
352}
353
354fn decode_heap_update(bytes: &[u8]) -> QuillSQLResult<HeapUpdatePayload> {
355    let (relation, mut offset) = decode_relation_ident(bytes)?;
356    if bytes.len() < offset + 4 + 2 {
357        return Err(QuillSQLError::Internal(
358            "Heap update payload too short".to_string(),
359        ));
360    }
361    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
362    offset += 4;
363    let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
364    offset += 2;
365    if bytes.len() < offset + 8 {
366        return Err(QuillSQLError::Internal(
367            "Heap update payload missing op_txn_id".to_string(),
368        ));
369    }
370    let op_txn_id =
371        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
372    offset += 8;
373    let (new_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
374    offset += consumed;
375    let (new_tuple_data, consumed) = decode_bytes(&bytes[offset..])?;
376    offset += consumed;
377    let (old_tuple_meta, consumed) = decode_optional_meta(&bytes[offset..])?;
378    offset += consumed;
379    let (old_tuple_data, _consumed) = decode_optional_bytes(&bytes[offset..])?;
380    Ok(HeapUpdatePayload {
381        relation,
382        page_id,
383        slot_id,
384        op_txn_id,
385        new_tuple_meta,
386        new_tuple_data,
387        old_tuple_meta,
388        old_tuple_data,
389    })
390}
391
392fn encode_heap_delete(body: &HeapDeletePayload) -> Vec<u8> {
393    // Heap/Delete (rmid=Heap, info=3)
394    // body: relation + page_id + slot_id + op_txn_id + old_meta(17B) + has_old_data+len+data
395    let mut buf = Vec::new();
396    encode_relation_ident(&body.relation, &mut buf);
397    buf.extend_from_slice(&body.page_id.to_le_bytes());
398    buf.extend_from_slice(&body.slot_id.to_le_bytes());
399    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
400    encode_tuple_meta(&body.old_tuple_meta, &mut buf);
401    encode_optional_bytes(&body.old_tuple_data, &mut buf);
402    buf
403}
404
405fn decode_heap_delete(bytes: &[u8]) -> QuillSQLResult<HeapDeletePayload> {
406    let (relation, mut offset) = decode_relation_ident(bytes)?;
407    if bytes.len() < offset + 4 + 2 {
408        return Err(QuillSQLError::Internal(
409            "Heap delete payload too short".to_string(),
410        ));
411    }
412    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
413    offset += 4;
414    let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
415    offset += 2;
416    if bytes.len() < offset + 8 {
417        return Err(QuillSQLError::Internal(
418            "Heap delete payload missing op_txn_id".to_string(),
419        ));
420    }
421    let op_txn_id =
422        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
423    offset += 8;
424    let (old_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
425    offset += consumed;
426    let (old_tuple_data, _consumed) = decode_optional_bytes(&bytes[offset..])?;
427    Ok(HeapDeletePayload {
428        relation,
429        page_id,
430        slot_id,
431        op_txn_id,
432        old_tuple_meta,
433        old_tuple_data,
434    })
435}