1use std::convert::TryFrom;
2
3use crate::buffer::PageId;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::storage::codec::RidCodec;
6use crate::storage::page::{RecordId, TupleMeta};
7use crate::transaction::{CommandId, TransactionId};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub struct RelationIdent {
11 pub root_page_id: PageId,
12}
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct TupleMetaRepr {
16 pub insert_txn_id: TransactionId,
17 pub insert_cid: CommandId,
18 pub delete_txn_id: TransactionId,
19 pub delete_cid: CommandId,
20 pub is_deleted: bool,
21 pub next_version: Option<RecordId>,
22 pub prev_version: Option<RecordId>,
23}
24
25impl From<TupleMetaRepr> for TupleMeta {
26 fn from(value: TupleMetaRepr) -> Self {
27 TupleMeta {
28 insert_txn_id: value.insert_txn_id,
29 insert_cid: value.insert_cid,
30 delete_txn_id: value.delete_txn_id,
31 delete_cid: value.delete_cid,
32 is_deleted: value.is_deleted,
33 next_version: value.next_version,
34 prev_version: value.prev_version,
35 }
36 }
37}
38
39impl From<TupleMeta> for TupleMetaRepr {
40 fn from(value: TupleMeta) -> Self {
41 TupleMetaRepr {
42 insert_txn_id: value.insert_txn_id,
43 insert_cid: value.insert_cid,
44 delete_txn_id: value.delete_txn_id,
45 delete_cid: value.delete_cid,
46 is_deleted: value.is_deleted,
47 next_version: value.next_version,
48 prev_version: value.prev_version,
49 }
50 }
51}
52
53#[derive(Debug, Clone)]
54pub struct HeapInsertPayload {
55 pub relation: RelationIdent,
56 pub page_id: PageId,
57 pub slot_id: u16,
58 pub op_txn_id: TransactionId,
60 pub tuple_meta: TupleMetaRepr,
61 pub tuple_data: Vec<u8>,
62}
63
64#[derive(Debug, Clone)]
65pub struct HeapUpdatePayload {
66 pub relation: RelationIdent,
67 pub page_id: PageId,
68 pub slot_id: u16,
69 pub op_txn_id: TransactionId,
71 pub new_tuple_meta: TupleMetaRepr,
72 pub new_tuple_data: Vec<u8>,
73 pub old_tuple_meta: Option<TupleMetaRepr>,
74 pub old_tuple_data: Option<Vec<u8>>,
75}
76
77#[derive(Debug, Clone)]
78pub struct HeapDeletePayload {
79 pub relation: RelationIdent,
80 pub page_id: PageId,
81 pub slot_id: u16,
82 pub op_txn_id: TransactionId,
84 pub old_tuple_meta: TupleMetaRepr,
85 pub old_tuple_data: Option<Vec<u8>>,
86}
87
88#[derive(Debug, Clone)]
89pub enum HeapRecordPayload {
90 Insert(HeapInsertPayload),
91 Update(HeapUpdatePayload),
92 Delete(HeapDeletePayload),
93}
94
/// One-byte tag naming the layout of a heap record body.
///
/// The discriminants are the values `encode_heap_record` returns next to the
/// encoded bytes and the values `TryFrom<u8>` accepts, so they must not be
/// renumbered.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum HeapRecordKind {
    /// Tag for an insert body.
    Insert = 1,
    /// Tag for an update body.
    Update = 2,
    /// Tag for a delete body.
    Delete = 3,
}
102
103impl TryFrom<u8> for HeapRecordKind {
104 type Error = QuillSQLError;
105
106 fn try_from(value: u8) -> Result<Self, Self::Error> {
107 match value {
108 1 => Ok(HeapRecordKind::Insert),
109 2 => Ok(HeapRecordKind::Update),
110 3 => Ok(HeapRecordKind::Delete),
111 other => Err(QuillSQLError::Internal(format!(
112 "Unknown heap record kind: {}",
113 other
114 ))),
115 }
116 }
117}
118
119pub fn encode_heap_record(payload: &HeapRecordPayload) -> (u8, Vec<u8>) {
120 match payload {
121 HeapRecordPayload::Insert(body) => (HeapRecordKind::Insert as u8, encode_heap_insert(body)),
122 HeapRecordPayload::Update(body) => (HeapRecordKind::Update as u8, encode_heap_update(body)),
123 HeapRecordPayload::Delete(body) => (HeapRecordKind::Delete as u8, encode_heap_delete(body)),
124 }
125}
126
127pub fn decode_heap_record(bytes: &[u8], info: u8) -> QuillSQLResult<HeapRecordPayload> {
128 match HeapRecordKind::try_from(info)? {
129 HeapRecordKind::Insert => Ok(HeapRecordPayload::Insert(decode_heap_insert(bytes)?)),
130 HeapRecordKind::Update => Ok(HeapRecordPayload::Update(decode_heap_update(bytes)?)),
131 HeapRecordKind::Delete => Ok(HeapRecordPayload::Delete(decode_heap_delete(bytes)?)),
132 }
133}
134
135fn encode_relation_ident(relation: &RelationIdent, buf: &mut Vec<u8>) {
136 buf.extend_from_slice(&relation.root_page_id.to_le_bytes());
137}
138
139fn decode_relation_ident(bytes: &[u8]) -> QuillSQLResult<(RelationIdent, usize)> {
140 if bytes.len() < 4 {
141 return Err(QuillSQLError::Internal(
142 "Heap payload too short for relation ident".to_string(),
143 ));
144 }
145 let root_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as PageId;
146 Ok((RelationIdent { root_page_id }, 4))
147}
148
149fn encode_tuple_meta(meta: &TupleMetaRepr, buf: &mut Vec<u8>) {
150 buf.extend_from_slice(&meta.insert_txn_id.to_le_bytes());
151 buf.extend_from_slice(&meta.insert_cid.to_le_bytes());
152 buf.extend_from_slice(&meta.delete_txn_id.to_le_bytes());
153 buf.extend_from_slice(&meta.delete_cid.to_le_bytes());
154 buf.push(meta.is_deleted as u8);
155 if let Some(next) = meta.next_version {
156 buf.push(1);
157 buf.extend(RidCodec::encode(&next));
158 } else {
159 buf.push(0);
160 }
161 if let Some(prev) = meta.prev_version {
162 buf.push(1);
163 buf.extend(RidCodec::encode(&prev));
164 } else {
165 buf.push(0);
166 }
167}
168
169fn decode_tuple_meta(bytes: &[u8]) -> QuillSQLResult<(TupleMetaRepr, usize)> {
170 if bytes.len() < 8 + 4 + 8 + 4 + 1 + 1 + 1 {
171 return Err(QuillSQLError::Internal(
172 "Heap payload too short for tuple meta".to_string(),
173 ));
174 }
175 let insert_txn_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as TransactionId;
176 let insert_cid = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as CommandId;
177 let delete_txn_id = u64::from_le_bytes(bytes[12..20].try_into().unwrap()) as TransactionId;
178 let delete_cid = u32::from_le_bytes(bytes[20..24].try_into().unwrap()) as CommandId;
179 let is_deleted = bytes[24] != 0;
180 let mut offset = 25;
181
182 let has_next = bytes
183 .get(offset)
184 .copied()
185 .ok_or_else(|| QuillSQLError::Internal("tuple meta missing next flag".to_string()))?
186 != 0;
187 offset += 1;
188 let next_version = if has_next {
189 let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
190 offset += consumed;
191 Some(rid)
192 } else {
193 None
194 };
195
196 let has_prev = bytes
197 .get(offset)
198 .copied()
199 .ok_or_else(|| QuillSQLError::Internal("tuple meta missing prev flag".to_string()))?
200 != 0;
201 offset += 1;
202 let prev_version = if has_prev {
203 let (rid, consumed) = RidCodec::decode(&bytes[offset..])?;
204 offset += consumed;
205 Some(rid)
206 } else {
207 None
208 };
209
210 Ok((
211 TupleMetaRepr {
212 insert_txn_id,
213 insert_cid,
214 delete_txn_id,
215 delete_cid,
216 is_deleted,
217 next_version,
218 prev_version,
219 },
220 offset,
221 ))
222}
223
/// Appends `data` to `buf` as a length-prefixed byte string: a little-endian
/// `u32` length followed by the bytes themselves.
///
/// NOTE: the length is narrowed with `as u32`, so a slice longer than
/// `u32::MAX` bytes would be written with a truncated prefix.
fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
    let prefix = (data.len() as u32).to_le_bytes();
    buf.extend(prefix.iter().chain(data));
}
228
229fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
230 if bytes.len() < 4 {
231 return Err(QuillSQLError::Internal(
232 "Heap payload missing length prefix".to_string(),
233 ));
234 }
235 let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
236 if bytes.len() < 4 + len {
237 return Err(QuillSQLError::Internal(
238 "Heap payload length prefix out of bounds".to_string(),
239 ));
240 }
241 Ok((bytes[4..4 + len].to_vec(), 4 + len))
242}
243
244fn encode_optional_bytes(opt: &Option<Vec<u8>>, buf: &mut Vec<u8>) {
245 match opt {
246 Some(data) => {
247 buf.push(1);
248 encode_bytes(data, buf);
249 }
250 None => buf.push(0),
251 }
252}
253
254fn decode_optional_bytes(bytes: &[u8]) -> QuillSQLResult<(Option<Vec<u8>>, usize)> {
255 if bytes.is_empty() {
256 return Err(QuillSQLError::Internal(
257 "Heap payload missing option flag".to_string(),
258 ));
259 }
260 let flag = bytes[0];
261 if flag == 0 {
262 Ok((None, 1))
263 } else {
264 let (data, consumed) = decode_bytes(&bytes[1..])?;
265 Ok((Some(data), consumed + 1))
266 }
267}
268
269fn encode_optional_meta(opt: &Option<TupleMetaRepr>, buf: &mut Vec<u8>) {
270 match opt {
271 Some(meta) => {
272 buf.push(1);
273 encode_tuple_meta(meta, buf);
274 }
275 None => buf.push(0),
276 }
277}
278
279fn decode_optional_meta(bytes: &[u8]) -> QuillSQLResult<(Option<TupleMetaRepr>, usize)> {
280 if bytes.is_empty() {
281 return Err(QuillSQLError::Internal(
282 "Heap payload missing option flag".to_string(),
283 ));
284 }
285 let flag = bytes[0];
286 if flag == 0 {
287 Ok((None, 1))
288 } else {
289 let (meta, consumed) = decode_tuple_meta(&bytes[1..])?;
290 Ok((Some(meta), consumed + 1))
291 }
292}
293
294fn encode_heap_insert(body: &HeapInsertPayload) -> Vec<u8> {
295 let mut buf = Vec::new();
298 encode_relation_ident(&body.relation, &mut buf);
299 buf.extend_from_slice(&body.page_id.to_le_bytes());
300 buf.extend_from_slice(&body.slot_id.to_le_bytes());
301 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
302 encode_tuple_meta(&body.tuple_meta, &mut buf);
303 encode_bytes(&body.tuple_data, &mut buf);
304 buf
305}
306
307fn decode_heap_insert(bytes: &[u8]) -> QuillSQLResult<HeapInsertPayload> {
308 let (relation, mut offset) = decode_relation_ident(bytes)?;
309 if bytes.len() < offset + 4 + 2 {
310 return Err(QuillSQLError::Internal(
311 "Heap insert payload too short".to_string(),
312 ));
313 }
314 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
315 offset += 4;
316 let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
317 offset += 2;
318 if bytes.len() < offset + 8 {
319 return Err(QuillSQLError::Internal(
320 "Heap insert payload missing op_txn_id".to_string(),
321 ));
322 }
323 let op_txn_id =
324 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
325 offset += 8;
326 let (tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
327 offset += consumed;
328 let (tuple_data, _consumed) = decode_bytes(&bytes[offset..])?;
329 Ok(HeapInsertPayload {
330 relation,
331 page_id,
332 slot_id,
333 op_txn_id,
334 tuple_meta,
335 tuple_data,
336 })
337}
338
339fn encode_heap_update(body: &HeapUpdatePayload) -> Vec<u8> {
340 let mut buf = Vec::new();
343 encode_relation_ident(&body.relation, &mut buf);
344 buf.extend_from_slice(&body.page_id.to_le_bytes());
345 buf.extend_from_slice(&body.slot_id.to_le_bytes());
346 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
347 encode_tuple_meta(&body.new_tuple_meta, &mut buf);
348 encode_bytes(&body.new_tuple_data, &mut buf);
349 encode_optional_meta(&body.old_tuple_meta, &mut buf);
350 encode_optional_bytes(&body.old_tuple_data, &mut buf);
351 buf
352}
353
354fn decode_heap_update(bytes: &[u8]) -> QuillSQLResult<HeapUpdatePayload> {
355 let (relation, mut offset) = decode_relation_ident(bytes)?;
356 if bytes.len() < offset + 4 + 2 {
357 return Err(QuillSQLError::Internal(
358 "Heap update payload too short".to_string(),
359 ));
360 }
361 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
362 offset += 4;
363 let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
364 offset += 2;
365 if bytes.len() < offset + 8 {
366 return Err(QuillSQLError::Internal(
367 "Heap update payload missing op_txn_id".to_string(),
368 ));
369 }
370 let op_txn_id =
371 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
372 offset += 8;
373 let (new_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
374 offset += consumed;
375 let (new_tuple_data, consumed) = decode_bytes(&bytes[offset..])?;
376 offset += consumed;
377 let (old_tuple_meta, consumed) = decode_optional_meta(&bytes[offset..])?;
378 offset += consumed;
379 let (old_tuple_data, _consumed) = decode_optional_bytes(&bytes[offset..])?;
380 Ok(HeapUpdatePayload {
381 relation,
382 page_id,
383 slot_id,
384 op_txn_id,
385 new_tuple_meta,
386 new_tuple_data,
387 old_tuple_meta,
388 old_tuple_data,
389 })
390}
391
392fn encode_heap_delete(body: &HeapDeletePayload) -> Vec<u8> {
393 let mut buf = Vec::new();
396 encode_relation_ident(&body.relation, &mut buf);
397 buf.extend_from_slice(&body.page_id.to_le_bytes());
398 buf.extend_from_slice(&body.slot_id.to_le_bytes());
399 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
400 encode_tuple_meta(&body.old_tuple_meta, &mut buf);
401 encode_optional_bytes(&body.old_tuple_data, &mut buf);
402 buf
403}
404
405fn decode_heap_delete(bytes: &[u8]) -> QuillSQLResult<HeapDeletePayload> {
406 let (relation, mut offset) = decode_relation_ident(bytes)?;
407 if bytes.len() < offset + 4 + 2 {
408 return Err(QuillSQLError::Internal(
409 "Heap delete payload too short".to_string(),
410 ));
411 }
412 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as PageId;
413 offset += 4;
414 let slot_id = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
415 offset += 2;
416 if bytes.len() < offset + 8 {
417 return Err(QuillSQLError::Internal(
418 "Heap delete payload missing op_txn_id".to_string(),
419 ));
420 }
421 let op_txn_id =
422 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
423 offset += 8;
424 let (old_tuple_meta, consumed) = decode_tuple_meta(&bytes[offset..])?;
425 offset += consumed;
426 let (old_tuple_data, _consumed) = decode_optional_bytes(&bytes[offset..])?;
427 Ok(HeapDeletePayload {
428 relation,
429 page_id,
430 slot_id,
431 op_txn_id,
432 old_tuple_meta,
433 old_tuple_data,
434 })
435}