1use std::convert::TryFrom;
2
3use crate::buffer::PageId;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::storage::codec::RidCodec;
6use crate::storage::page::{RecordId, TupleMeta};
7use crate::transaction::{CommandId, TransactionId};
8
/// Identifies the relation a heap record refers to.
///
/// The codec in this file serializes it as the four little-endian bytes of
/// `root_page_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RelationIdent {
    /// Page id naming the relation.
    /// NOTE(review): presumably the first page of the table heap — confirm.
    pub root_page_id: PageId,
}
13
/// Plain-data mirror of [`TupleMeta`] used by the heap record codec.
///
/// It carries the same seven fields as `TupleMeta`; the `From` impls in this
/// file copy them across one-to-one in both directions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TupleMetaRepr {
    /// Encoded as 8 little-endian bytes.
    /// NOTE(review): presumably the transaction that created the tuple — confirm.
    pub insert_txn_id: TransactionId,
    /// Encoded as 4 little-endian bytes; command id paired with `insert_txn_id`.
    pub insert_cid: CommandId,
    /// Encoded as 8 little-endian bytes.
    /// NOTE(review): presumably the transaction that deleted the tuple — confirm.
    pub delete_txn_id: TransactionId,
    /// Encoded as 4 little-endian bytes; command id paired with `delete_txn_id`.
    pub delete_cid: CommandId,
    /// Encoded as a single byte (`0` = false, any other value decodes as true).
    pub is_deleted: bool,
    /// Optional link, encoded as a presence byte followed by the `RidCodec`
    /// encoding when present.
    pub next_version: Option<RecordId>,
    /// Optional link, encoded the same way as `next_version`.
    pub prev_version: Option<RecordId>,
}
24
25impl From<TupleMetaRepr> for TupleMeta {
26 fn from(value: TupleMetaRepr) -> Self {
27 TupleMeta {
28 insert_txn_id: value.insert_txn_id,
29 insert_cid: value.insert_cid,
30 delete_txn_id: value.delete_txn_id,
31 delete_cid: value.delete_cid,
32 is_deleted: value.is_deleted,
33 next_version: value.next_version,
34 prev_version: value.prev_version,
35 }
36 }
37}
38
39impl From<TupleMeta> for TupleMetaRepr {
40 fn from(value: TupleMeta) -> Self {
41 TupleMetaRepr {
42 insert_txn_id: value.insert_txn_id,
43 insert_cid: value.insert_cid,
44 delete_txn_id: value.delete_txn_id,
45 delete_cid: value.delete_cid,
46 is_deleted: value.is_deleted,
47 next_version: value.next_version,
48 prev_version: value.prev_version,
49 }
50 }
51}
52
/// Body of a heap `Insert` record.
///
/// `encode_heap_insert` writes the fields in declaration order and
/// `decode_heap_insert` reads them back in the same order.
#[derive(Debug, Clone)]
pub struct HeapInsertPayload {
    /// Relation the record refers to.
    pub relation: RelationIdent,
    /// Page id, encoded as 4 little-endian bytes.
    /// NOTE(review): presumably the heap page holding the tuple — confirm.
    pub page_id: PageId,
    /// Slot number, encoded as 2 little-endian bytes.
    pub slot_id: u16,
    /// Transaction id stored with the record, encoded as 8 little-endian bytes.
    /// NOTE(review): presumably the transaction performing the insert — confirm.
    pub op_txn_id: TransactionId,
    /// Tuple metadata stored with the record.
    pub tuple_meta: TupleMetaRepr,
    /// Raw tuple bytes, encoded with a `u32` little-endian length prefix.
    pub tuple_data: Vec<u8>,
}
63
/// Body of a heap `Delete` record.
///
/// `encode_heap_delete` writes the fields in declaration order and
/// `decode_heap_delete` reads them back in the same order.
#[derive(Debug, Clone)]
pub struct HeapDeletePayload {
    /// Relation the record refers to.
    pub relation: RelationIdent,
    /// Page id, encoded as 4 little-endian bytes.
    /// NOTE(review): presumably the heap page holding the tuple — confirm.
    pub page_id: PageId,
    /// Slot number, encoded as 2 little-endian bytes.
    pub slot_id: u16,
    /// Transaction id stored with the record, encoded as 8 little-endian bytes.
    /// NOTE(review): presumably the transaction performing the delete — confirm.
    pub op_txn_id: TransactionId,
    /// Tuple metadata encoded first.
    /// NOTE(review): presumably the metadata after the delete — confirm.
    pub new_tuple_meta: TupleMetaRepr,
    /// Tuple metadata encoded second.
    /// NOTE(review): presumably the metadata before the delete — confirm.
    pub old_tuple_meta: TupleMetaRepr,
    /// Raw tuple bytes, encoded with a `u32` little-endian length prefix.
    /// NOTE(review): presumably the pre-delete tuple image — confirm.
    pub old_tuple_data: Vec<u8>,
}
75
/// A decoded heap record: one variant per [`HeapRecordKind`].
///
/// Produced by `decode_heap_record` and consumed by `encode_heap_record`.
#[derive(Debug, Clone)]
pub enum HeapRecordPayload {
    /// Insert record body.
    Insert(HeapInsertPayload),
    /// Delete record body.
    Delete(HeapDeletePayload),
}
81
/// Discriminant identifying which payload layout a heap record uses.
///
/// The `u8` value is what `encode_heap_record` returns alongside the encoded
/// bytes and what `decode_heap_record` expects as its `info` argument, so the
/// numeric values are part of the encoded format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HeapRecordKind {
    /// Payload is a [`HeapInsertPayload`].
    Insert = 1,
    /// Payload is a [`HeapDeletePayload`].
    Delete = 2,
}
88
89impl TryFrom<u8> for HeapRecordKind {
90 type Error = QuillSQLError;
91
92 fn try_from(value: u8) -> Result<Self, Self::Error> {
93 match value {
94 1 => Ok(HeapRecordKind::Insert),
95 2 => Ok(HeapRecordKind::Delete),
96 other => Err(QuillSQLError::Internal(format!(
97 "Unknown heap record kind: {}",
98 other
99 ))),
100 }
101 }
102}
103
104pub fn encode_heap_record(payload: &HeapRecordPayload) -> (u8, Vec<u8>) {
105 match payload {
106 HeapRecordPayload::Insert(body) => (HeapRecordKind::Insert as u8, encode_heap_insert(body)),
107 HeapRecordPayload::Delete(body) => (HeapRecordKind::Delete as u8, encode_heap_delete(body)),
108 }
109}
110
111pub fn decode_heap_record(bytes: &[u8], info: u8) -> QuillSQLResult<HeapRecordPayload> {
112 match HeapRecordKind::try_from(info)? {
113 HeapRecordKind::Insert => Ok(HeapRecordPayload::Insert(decode_heap_insert(bytes)?)),
114 HeapRecordKind::Delete => Ok(HeapRecordPayload::Delete(decode_heap_delete(bytes)?)),
115 }
116}
117
118fn encode_relation_ident(relation: &RelationIdent, buf: &mut Vec<u8>) {
119 buf.extend_from_slice(&relation.root_page_id.to_le_bytes());
120}
121
122fn decode_relation_ident(bytes: &[u8]) -> QuillSQLResult<(RelationIdent, usize)> {
123 if bytes.len() < 4 {
124 return Err(QuillSQLError::Internal(
125 "Heap payload too short for relation ident".to_string(),
126 ));
127 }
128 let root_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as PageId;
129 Ok((RelationIdent { root_page_id }, 4))
130}
131
132fn encode_tuple_meta(meta: &TupleMetaRepr, buf: &mut Vec<u8>) {
133 buf.extend_from_slice(&meta.insert_txn_id.to_le_bytes());
134 buf.extend_from_slice(&meta.insert_cid.to_le_bytes());
135 buf.extend_from_slice(&meta.delete_txn_id.to_le_bytes());
136 buf.extend_from_slice(&meta.delete_cid.to_le_bytes());
137 buf.push(meta.is_deleted as u8);
138 if let Some(next) = meta.next_version {
139 buf.push(1);
140 buf.extend(RidCodec::encode(&next));
141 } else {
142 buf.push(0);
143 }
144 if let Some(prev) = meta.prev_version {
145 buf.push(1);
146 buf.extend(RidCodec::encode(&prev));
147 } else {
148 buf.push(0);
149 }
150}
151
152fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMetaRepr, usize)> {
153 if bytes.len() < 8 + 4 + 8 + 4 + 1 + 1 + 1 {
154 return Err(QuillSQLError::Internal(
155 "Heap payload too short for tuple meta".to_string(),
156 ));
157 }
158 let insert_txn_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as TransactionId;
159 let insert_cid = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as CommandId;
160 let delete_txn_id = u64::from_le_bytes(bytes[12..20].try_into().unwrap()) as TransactionId;
161 let delete_cid = u32::from_le_bytes(bytes[20..24].try_into().unwrap()) as CommandId;
162 let is_deleted = bytes[24] != 0;
163 let mut offset = 25;
164
165 let has_next = bytes
166 .get(offset)
167 .copied()
168 .ok_or_else(|| QuillSQLError::Internal("tuple meta missing next flag".to_string()))?
169 != 0;
170 offset += 1;
171 let next_version = if has_next {
172 let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
173 offset += consumed;
174 Some(rid)
175 } else {
176 None
177 };
178
179 let has_prev = bytes
180 .get(offset)
181 .copied()
182 .ok_or_else(|| QuillSQLError::Internal("tuple meta missing prev flag".to_string()))?
183 != 0;
184 offset += 1;
185 let prev_version = if has_prev {
186 let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
187 offset += consumed;
188 Some(rid)
189 } else {
190 None
191 };
192
193 Ok((
194 TupleMetaRepr {
195 insert_txn_id,
196 insert_cid,
197 delete_txn_id,
198 delete_cid,
199 is_deleted,
200 next_version,
201 prev_version,
202 },
203 offset,
204 ))
205}
206
/// Appends `data` to `buf` as a length-prefixed byte string.
///
/// The prefix is the length of `data` as a little-endian `u32`, followed by
/// the bytes themselves.
///
/// # Panics
///
/// Panics if `data` is longer than `u32::MAX` bytes. A plain `as u32` cast
/// would truncate the prefix while still appending every byte, producing a
/// record whose prefix disagrees with its contents.
fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
    let len = u32::try_from(data.len())
        .expect("heap payload byte string does not fit in a u32 length prefix");
    buf.extend_from_slice(&len.to_le_bytes());
    buf.extend_from_slice(data);
}
211
212fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
213 if bytes.len() < 4 {
214 return Err(QuillSQLError::Internal(
215 "Heap payload missing length prefix".to_string(),
216 ));
217 }
218 let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
219 if bytes.len() < 4 + len {
220 return Err(QuillSQLError::Internal(
221 "Heap payload length prefix out of bounds".to_string(),
222 ));
223 }
224 Ok((bytes[4..4 + len].to_vec(), 4 + len))
225}
226
227fn encode_heap_insert(body: &HeapInsertPayload) -> Vec<u8> {
228 let mut buf = Vec::new();
231 encode_relation_ident(&body.relation, &mut buf);
232 buf.extend_from_slice(&body.page_id.to_le_bytes());
233 buf.extend_from_slice(&body.slot_id.to_le_bytes());
234 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
235 encode_tuple_meta(&body.tuple_meta, &mut buf);
236 encode_bytes(&body.tuple_data, &mut buf);
237 buf
238}
239
240fn decode_heap_insert(bytes: &[u8]) -> QuillSQLResult<HeapInsertPayload> {
241 let (relation, mut offset) = decode_relation_ident(bytes)?;
242 if bytes.len() < offset + 4 + 2 {
243 return Err(QuillSQLError::Internal(
244 "Heap insert payload too short".to_string(),
245 ));
246 }
247 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
248 offset += 4;
249 let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
250 offset += 2;
251 if bytes.len() < offset + 8 {
252 return Err(QuillSQLError::Internal(
253 "Heap insert payload missing op_txn_id".to_string(),
254 ));
255 }
256 let op_txn_id =
257 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
258 offset += 8;
259 let (tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
260 offset += consumed;
261 let (tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
262 Ok(HeapInsertPayload {
263 relation,
264 page_id,
265 slot_id,
266 op_txn_id,
267 tuple_meta,
268 tuple_data,
269 })
270}
271
272fn encode_heap_delete(body: &HeapDeletePayload) -> Vec<u8> {
273 let mut buf = Vec::new();
276 encode_relation_ident(&body.relation, &mut buf);
277 buf.extend_from_slice(&body.page_id.to_le_bytes());
278 buf.extend_from_slice(&body.slot_id.to_le_bytes());
279 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
280 encode_tuple_meta(&body.new_tuple_meta, &mut buf);
281 encode_tuple_meta(&body.old_tuple_meta, &mut buf);
282 encode_bytes(&body.old_tuple_data, &mut buf);
283 buf
284}
285
286fn decode_heap_delete(bytes: &[u8]) -> QuillSQLResult<HeapDeletePayload> {
287 let (relation, mut offset) = decode_relation_ident(bytes)?;
288 if bytes.len() < offset + 4 + 2 {
289 return Err(QuillSQLError::Internal(
290 "Heap delete payload too short".to_string(),
291 ));
292 }
293 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
294 offset += 4;
295 let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
296 offset += 2;
297 if bytes.len() < offset + 8 {
298 return Err(QuillSQLError::Internal(
299 "Heap delete payload missing op_txn_id".to_string(),
300 ));
301 }
302 let op_txn_id =
303 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
304 offset += 8;
305 let (new_tuple_meta, consumed_new) = decode_tuple_meta(&bytes[offset..])?;
306 offset += consumed_new;
307 let (old_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
308 offset += consumed;
309 let (old_tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
310 Ok(HeapDeletePayload {
311 relation,
312 page_id,
313 slot_id,
314 op_txn_id,
315 new_tuple_meta,
316 old_tuple_meta,
317 old_tuple_data,
318 })
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transaction::TransactionId;

    /// Insert payload without version links, shared by several tests.
    fn sample_insert() -> HeapRecordPayload {
        HeapRecordPayload::Insert(HeapInsertPayload {
            relation: RelationIdent { root_page_id: 11 },
            page_id: 9,
            slot_id: 3,
            op_txn_id: 42,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 42,
                insert_cid: 1,
                delete_txn_id: 0,
                delete_cid: 0,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![1, 2, 3, 4],
        })
    }

    /// Delete payload without version links, shared by several tests.
    fn sample_delete() -> HeapRecordPayload {
        HeapRecordPayload::Delete(HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 5,
            slot_id: 2,
            op_txn_id: TransactionId::default(),
            new_tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 2,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: 0,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            old_tuple_data: vec![9; 6],
        })
    }

    /// Encodes through the public entry point, checks the reported kind, then
    /// decodes and compares every field with the input.
    fn roundtrip(payload: HeapRecordPayload, kind: HeapRecordKind) {
        let (info, bytes) = encode_heap_record(&payload);
        assert_eq!(info, kind as u8);

        let decoded = decode_heap_record(&bytes, info).unwrap();
        match (payload, decoded) {
            (HeapRecordPayload::Insert(a), HeapRecordPayload::Insert(b)) => {
                assert_eq!(a.relation.root_page_id, b.relation.root_page_id);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.slot_id, b.slot_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.tuple_meta, b.tuple_meta);
                assert_eq!(a.tuple_data, b.tuple_data);
            }
            (HeapRecordPayload::Delete(a), HeapRecordPayload::Delete(b)) => {
                assert_eq!(a.relation.root_page_id, b.relation.root_page_id);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.slot_id, b.slot_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.new_tuple_meta, b.new_tuple_meta);
                assert_eq!(a.old_tuple_meta, b.old_tuple_meta);
                assert_eq!(a.old_tuple_data, b.old_tuple_data);
            }
            _ => panic!("payload variant mismatch"),
        }
    }

    /// Every strict prefix of a valid record must be rejected with an error
    /// rather than decoded or allowed to panic.
    fn assert_truncations_rejected(payload: &HeapRecordPayload) {
        let (info, bytes) = encode_heap_record(payload);
        for cut in 0..bytes.len() {
            assert!(
                decode_heap_record(&bytes[..cut], info).is_err(),
                "prefix of {} bytes should not decode",
                cut
            );
        }
    }

    #[test]
    fn heap_insert_roundtrip() {
        roundtrip(sample_insert(), HeapRecordKind::Insert);
    }

    #[test]
    fn heap_delete_roundtrip() {
        roundtrip(sample_delete(), HeapRecordKind::Delete);
    }

    #[test]
    fn unknown_kind_is_rejected() {
        let (_, bytes) = encode_heap_record(&sample_insert());
        assert!(decode_heap_record(&bytes, 0).is_err());
        assert!(decode_heap_record(&bytes, 3).is_err());
        assert!(HeapRecordKind::try_from(u8::MAX).is_err());
    }

    #[test]
    fn truncated_insert_is_rejected() {
        assert_truncations_rejected(&sample_insert());
    }

    #[test]
    fn truncated_delete_is_rejected() {
        assert_truncations_rejected(&sample_delete());
    }
}