1use crate::buffer::PageId;
2use crate::catalog::{Column, DataType, Schema, SchemaRef};
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, RidCodec};
5use crate::storage::page::RecordId;
6use crate::transaction::TransactionId;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct IndexRelationIdent {
11 pub header_page_id: PageId,
12 pub schema: SchemaRepr,
13}
14
15impl IndexRelationIdent {
16 pub fn schema_ref(&self) -> SchemaRef {
17 self.schema.to_schema_ref()
18 }
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub struct SchemaRepr {
23 pub columns: Vec<ColumnRepr>,
24}
25
26impl From<SchemaRef> for SchemaRepr {
27 fn from(schema: SchemaRef) -> Self {
28 let columns = schema
29 .columns
30 .iter()
31 .map(|col| ColumnRepr {
32 name: col.name.clone(),
33 data_type: col.data_type,
34 nullable: col.nullable,
35 })
36 .collect();
37 Self { columns }
38 }
39}
40
41impl SchemaRepr {
42 pub fn to_schema_ref(&self) -> SchemaRef {
43 let cols = self
44 .columns
45 .iter()
46 .map(|c| Column::new(&c.name, c.data_type, c.nullable))
47 .collect::<Vec<_>>();
48 Arc::new(Schema::new(cols))
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ColumnRepr {
54 pub name: String,
55 pub data_type: DataType,
56 pub nullable: bool,
57}
58
59#[derive(Debug, Clone)]
60pub struct IndexLeafInsertPayload {
61 pub relation: IndexRelationIdent,
62 pub page_id: PageId,
63 pub op_txn_id: TransactionId,
64 pub key_data: Vec<u8>,
65 pub rid: RecordId,
66}
67
68#[derive(Debug, Clone)]
69pub struct IndexLeafDeletePayload {
70 pub relation: IndexRelationIdent,
71 pub page_id: PageId,
72 pub op_txn_id: TransactionId,
73 pub key_data: Vec<u8>,
74 pub old_rid: RecordId,
75}
76
77#[derive(Debug, Clone)]
78pub struct IndexLeafSplitEntryPayload {
79 pub key_data: Vec<u8>,
80 pub rid: RecordId,
81}
82
83#[derive(Debug, Clone)]
84pub struct IndexLeafSplitPayload {
85 pub relation: IndexRelationIdent,
86 pub left_page_id: PageId,
87 pub right_page_id: PageId,
88 pub leaf_max_size: u32,
89 pub split_index: u16,
90 pub right_next_page_id: PageId,
91 pub entries: Vec<IndexLeafSplitEntryPayload>,
92}
93
94#[derive(Debug, Clone)]
95pub struct IndexLeafMergePayload {
96 pub relation: IndexRelationIdent,
97 pub left_page_id: PageId,
98 pub right_page_id: PageId,
99 pub leaf_max_size: u32,
100 pub left_next_page_id: PageId,
101 pub entries: Vec<IndexLeafSplitEntryPayload>,
102}
103
104#[derive(Debug, Clone)]
105pub struct IndexInternalEntryPayload {
106 pub key_data: Vec<u8>,
107 pub child_page_id: PageId,
108}
109
110#[derive(Debug, Clone)]
111pub struct IndexInternalSplitPayload {
112 pub relation: IndexRelationIdent,
113 pub left_page_id: PageId,
114 pub left_new_size: u16,
115 pub left_high_key: Option<Vec<u8>>,
116 pub left_next_page_id: PageId,
117 pub right_page_id: PageId,
118 pub internal_max_size: u32,
119 pub right_entries: Vec<IndexInternalEntryPayload>,
120 pub right_high_key: Option<Vec<u8>>,
121 pub right_next_page_id: PageId,
122}
123
124#[derive(Debug, Clone)]
125pub struct IndexInternalMergePayload {
126 pub relation: IndexRelationIdent,
127 pub left_page_id: PageId,
128 pub right_page_id: PageId,
129 pub internal_max_size: u32,
130 pub left_entries: Vec<IndexInternalEntryPayload>,
131 pub high_key: Option<Vec<u8>>,
132 pub next_page_id: PageId,
133}
134
135#[derive(Debug, Clone)]
136pub struct IndexParentInsertPayload {
137 pub relation: IndexRelationIdent,
138 pub parent_page_id: PageId,
139 pub left_child_page_id: PageId,
140 pub right_child_page_id: PageId,
141 pub key_data: Vec<u8>,
142}
143
144#[derive(Debug, Clone)]
145pub struct IndexParentDeletePayload {
146 pub relation: IndexRelationIdent,
147 pub parent_page_id: PageId,
148 pub child_page_id: PageId,
149}
150
151#[derive(Debug, Clone)]
152pub struct IndexParentUpdatePayload {
153 pub relation: IndexRelationIdent,
154 pub parent_page_id: PageId,
155 pub child_page_id: PageId,
156 pub key_data: Vec<u8>,
157}
158
159#[derive(Debug, Clone)]
160pub struct IndexLeafRedistributePayload {
161 pub relation: IndexRelationIdent,
162 pub from_page_id: PageId,
163 pub to_page_id: PageId,
164 pub from_is_left: bool,
165 pub moved_entry: IndexLeafSplitEntryPayload,
166}
167
168#[derive(Debug, Clone)]
169pub struct IndexInternalRedistributePayload {
170 pub relation: IndexRelationIdent,
171 pub from_page_id: PageId,
172 pub to_page_id: PageId,
173 pub parent_page_id: PageId,
174 pub from_is_left: bool,
175 pub from_old_sentinel: PageId,
176 pub to_old_sentinel: PageId,
177 pub moved_entry: IndexInternalEntryPayload,
178 pub separator_key_data: Vec<u8>,
179 pub to_new_high_key: Option<Vec<u8>>,
180 pub to_new_next_page_id: PageId,
181 pub from_new_high_key: Option<Vec<u8>>,
182 pub from_new_next_page_id: PageId,
183}
184
185#[derive(Debug, Clone)]
186pub struct IndexRootInstallLeafPayload {
187 pub relation: IndexRelationIdent,
188 pub page_id: PageId,
189 pub leaf_max_size: u32,
190 pub next_page_id: PageId,
191 pub entries: Vec<IndexLeafSplitEntryPayload>,
192}
193
194#[derive(Debug, Clone)]
195pub struct IndexRootInstallInternalPayload {
196 pub relation: IndexRelationIdent,
197 pub page_id: PageId,
198 pub internal_max_size: u32,
199 pub entries: Vec<IndexInternalEntryPayload>,
200 pub high_key: Option<Vec<u8>>,
201 pub next_page_id: PageId,
202}
203
204#[derive(Debug, Clone)]
205pub struct IndexRootAdoptPayload {
206 pub relation: IndexRelationIdent,
207 pub new_root_page_id: PageId,
208}
209
210#[derive(Debug, Clone)]
211pub struct IndexRootResetPayload {
212 pub relation: IndexRelationIdent,
213}
214
215#[derive(Debug, Clone)]
216pub enum IndexRecordPayload {
217 LeafInsert(IndexLeafInsertPayload),
218 LeafDelete(IndexLeafDeletePayload),
219 LeafSplit(IndexLeafSplitPayload),
220 InternalSplit(IndexInternalSplitPayload),
221 ParentInsert(IndexParentInsertPayload),
222 LeafMerge(IndexLeafMergePayload),
223 InternalMerge(IndexInternalMergePayload),
224 ParentDelete(IndexParentDeletePayload),
225 ParentUpdate(IndexParentUpdatePayload),
226 LeafRedistribute(IndexLeafRedistributePayload),
227 InternalRedistribute(IndexInternalRedistributePayload),
228 RootInstallLeaf(IndexRootInstallLeafPayload),
229 RootInstallInternal(IndexRootInstallInternalPayload),
230 RootAdopt(IndexRootAdoptPayload),
231 RootReset(IndexRootResetPayload),
232}
233
/// One-byte tag identifying which [`IndexRecordPayload`] variant a record
/// body holds.
///
/// The discriminants are the values returned by `encode_index_record` and
/// accepted by the `TryFrom<u8>` impl; changing one changes the tag byte that
/// is produced and recognised.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum IndexRecordKind {
    LeafInsert = 1,
    LeafDelete = 2,
    LeafSplit = 3,
    InternalSplit = 4,
    ParentInsert = 5,
    LeafMerge = 6,
    InternalMerge = 7,
    ParentDelete = 8,
    ParentUpdate = 9,
    LeafRedistribute = 10,
    InternalRedistribute = 11,
    RootInstallLeaf = 12,
    RootInstallInternal = 13,
    RootAdopt = 14,
    RootReset = 15,
}
253
254impl TryFrom<u8> for IndexRecordKind {
255 type Error = QuillSQLError;
256
257 fn try_from(value: u8) -> Result<Self, Self::Error> {
258 match value {
259 1 => Ok(IndexRecordKind::LeafInsert),
260 2 => Ok(IndexRecordKind::LeafDelete),
261 3 => Ok(IndexRecordKind::LeafSplit),
262 4 => Ok(IndexRecordKind::InternalSplit),
263 5 => Ok(IndexRecordKind::ParentInsert),
264 6 => Ok(IndexRecordKind::LeafMerge),
265 7 => Ok(IndexRecordKind::InternalMerge),
266 8 => Ok(IndexRecordKind::ParentDelete),
267 9 => Ok(IndexRecordKind::ParentUpdate),
268 10 => Ok(IndexRecordKind::LeafRedistribute),
269 11 => Ok(IndexRecordKind::InternalRedistribute),
270 12 => Ok(IndexRecordKind::RootInstallLeaf),
271 13 => Ok(IndexRecordKind::RootInstallInternal),
272 14 => Ok(IndexRecordKind::RootAdopt),
273 15 => Ok(IndexRecordKind::RootReset),
274 other => Err(QuillSQLError::Internal(format!(
275 "Unknown index record kind: {}",
276 other
277 ))),
278 }
279 }
280}
281
282pub fn encode_index_record(payload: &IndexRecordPayload) -> (u8, Vec<u8>) {
283 match payload {
284 IndexRecordPayload::LeafInsert(body) => {
285 (IndexRecordKind::LeafInsert as u8, encode_leaf_insert(body))
286 }
287 IndexRecordPayload::LeafDelete(body) => {
288 (IndexRecordKind::LeafDelete as u8, encode_leaf_delete(body))
289 }
290 IndexRecordPayload::LeafSplit(body) => {
291 (IndexRecordKind::LeafSplit as u8, encode_leaf_split(body))
292 }
293 IndexRecordPayload::InternalSplit(body) => (
294 IndexRecordKind::InternalSplit as u8,
295 encode_internal_split(body),
296 ),
297 IndexRecordPayload::ParentInsert(body) => (
298 IndexRecordKind::ParentInsert as u8,
299 encode_parent_insert(body),
300 ),
301 IndexRecordPayload::LeafMerge(body) => {
302 (IndexRecordKind::LeafMerge as u8, encode_leaf_merge(body))
303 }
304 IndexRecordPayload::InternalMerge(body) => (
305 IndexRecordKind::InternalMerge as u8,
306 encode_internal_merge(body),
307 ),
308 IndexRecordPayload::ParentDelete(body) => (
309 IndexRecordKind::ParentDelete as u8,
310 encode_parent_delete(body),
311 ),
312 IndexRecordPayload::ParentUpdate(body) => (
313 IndexRecordKind::ParentUpdate as u8,
314 encode_parent_update(body),
315 ),
316 IndexRecordPayload::LeafRedistribute(body) => (
317 IndexRecordKind::LeafRedistribute as u8,
318 encode_leaf_redistribute(body),
319 ),
320 IndexRecordPayload::InternalRedistribute(body) => (
321 IndexRecordKind::InternalRedistribute as u8,
322 encode_internal_redistribute(body),
323 ),
324 IndexRecordPayload::RootInstallLeaf(body) => (
325 IndexRecordKind::RootInstallLeaf as u8,
326 encode_root_install_leaf(body),
327 ),
328 IndexRecordPayload::RootInstallInternal(body) => (
329 IndexRecordKind::RootInstallInternal as u8,
330 encode_root_install_internal(body),
331 ),
332 IndexRecordPayload::RootAdopt(body) => {
333 (IndexRecordKind::RootAdopt as u8, encode_root_adopt(body))
334 }
335 IndexRecordPayload::RootReset(body) => {
336 (IndexRecordKind::RootReset as u8, encode_root_reset(body))
337 }
338 }
339}
340
341pub fn decode_index_record(bytes: &[u8], info: u8) -> QuillSQLResult<IndexRecordPayload> {
342 match IndexRecordKind::try_from(info)? {
343 IndexRecordKind::LeafInsert => {
344 Ok(IndexRecordPayload::LeafInsert(decode_leaf_insert(bytes)?))
345 }
346 IndexRecordKind::LeafDelete => {
347 Ok(IndexRecordPayload::LeafDelete(decode_leaf_delete(bytes)?))
348 }
349 IndexRecordKind::LeafSplit => Ok(IndexRecordPayload::LeafSplit(decode_leaf_split(bytes)?)),
350 IndexRecordKind::InternalSplit => Ok(IndexRecordPayload::InternalSplit(
351 decode_internal_split(bytes)?,
352 )),
353 IndexRecordKind::ParentInsert => Ok(IndexRecordPayload::ParentInsert(
354 decode_parent_insert(bytes)?,
355 )),
356 IndexRecordKind::LeafMerge => Ok(IndexRecordPayload::LeafMerge(decode_leaf_merge(bytes)?)),
357 IndexRecordKind::InternalMerge => Ok(IndexRecordPayload::InternalMerge(
358 decode_internal_merge(bytes)?,
359 )),
360 IndexRecordKind::ParentDelete => Ok(IndexRecordPayload::ParentDelete(
361 decode_parent_delete(bytes)?,
362 )),
363 IndexRecordKind::ParentUpdate => Ok(IndexRecordPayload::ParentUpdate(
364 decode_parent_update(bytes)?,
365 )),
366 IndexRecordKind::LeafRedistribute => Ok(IndexRecordPayload::LeafRedistribute(
367 decode_leaf_redistribute(bytes)?,
368 )),
369 IndexRecordKind::InternalRedistribute => Ok(IndexRecordPayload::InternalRedistribute(
370 decode_internal_redistribute(bytes)?,
371 )),
372 IndexRecordKind::RootInstallLeaf => Ok(IndexRecordPayload::RootInstallLeaf(
373 decode_root_install_leaf(bytes)?,
374 )),
375 IndexRecordKind::RootInstallInternal => Ok(IndexRecordPayload::RootInstallInternal(
376 decode_root_install_internal(bytes)?,
377 )),
378 IndexRecordKind::RootAdopt => Ok(IndexRecordPayload::RootAdopt(decode_root_adopt(bytes)?)),
379 IndexRecordKind::RootReset => Ok(IndexRecordPayload::RootReset(decode_root_reset(bytes)?)),
380 }
381}
382
383fn encode_leaf_insert(body: &IndexLeafInsertPayload) -> Vec<u8> {
384 let mut buf = Vec::new();
385 encode_relation(&body.relation, &mut buf);
386 buf.extend_from_slice(&body.page_id.to_le_bytes());
387 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
388 encode_bytes(&body.key_data, &mut buf);
389 buf.extend(RidCodec::encode(&body.rid));
390 buf
391}
392
393fn decode_leaf_insert(bytes: &[u8]) -> QuillSQLResult<IndexLeafInsertPayload> {
394 let (relation, mut offset) = decode_relation(bytes)?;
395 if bytes.len() < offset + 4 + 8 {
396 return Err(QuillSQLError::Internal(
397 "Index leaf insert payload too short".to_string(),
398 ));
399 }
400 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
401 offset += 4;
402 let op_txn_id =
403 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
404 offset += 8;
405 let (key_data, consumed) = decode_bytes(&bytes[offset..])?;
406 offset += consumed;
407 let (rid, _) = RidCodec::decode(&bytes[offset..])?;
408 Ok(IndexLeafInsertPayload {
409 relation,
410 page_id,
411 op_txn_id,
412 key_data,
413 rid,
414 })
415}
416
417fn encode_leaf_delete(body: &IndexLeafDeletePayload) -> Vec<u8> {
418 let mut buf = Vec::new();
419 encode_relation(&body.relation, &mut buf);
420 buf.extend_from_slice(&body.page_id.to_le_bytes());
421 buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
422 encode_bytes(&body.key_data, &mut buf);
423 buf.extend(RidCodec::encode(&body.old_rid));
424 buf
425}
426
427fn decode_leaf_delete(bytes: &[u8]) -> QuillSQLResult<IndexLeafDeletePayload> {
428 let (relation, mut offset) = decode_relation(bytes)?;
429 if bytes.len() < offset + 4 + 8 {
430 return Err(QuillSQLError::Internal(
431 "Index leaf delete payload too short".to_string(),
432 ));
433 }
434 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
435 offset += 4;
436 let op_txn_id =
437 u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
438 offset += 8;
439 let (key_data, consumed) = decode_bytes(&bytes[offset..])?;
440 offset += consumed;
441 let (old_rid, _) = RidCodec::decode(&bytes[offset..])?;
442 Ok(IndexLeafDeletePayload {
443 relation,
444 page_id,
445 op_txn_id,
446 key_data,
447 old_rid,
448 })
449}
450
451fn encode_leaf_split_entry(entry: &IndexLeafSplitEntryPayload, buf: &mut Vec<u8>) {
452 encode_bytes(&entry.key_data, buf);
453 buf.extend(RidCodec::encode(&entry.rid));
454}
455
456fn decode_leaf_split_entry(bytes: &[u8]) -> QuillSQLResult<(IndexLeafSplitEntryPayload, usize)> {
457 let (key_data, consumed_key) = decode_bytes(bytes)?;
458 let (rid, consumed_rid) = RidCodec::decode(&bytes[consumed_key..])?;
459 Ok((
460 IndexLeafSplitEntryPayload { key_data, rid },
461 consumed_key + consumed_rid,
462 ))
463}
464
465fn encode_leaf_split(body: &IndexLeafSplitPayload) -> Vec<u8> {
466 let mut buf = Vec::new();
467 encode_relation(&body.relation, &mut buf);
468 buf.extend_from_slice(&body.left_page_id.to_le_bytes());
469 buf.extend_from_slice(&body.right_page_id.to_le_bytes());
470 buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
471 buf.extend_from_slice(&body.split_index.to_le_bytes());
472 buf.extend_from_slice(&body.right_next_page_id.to_le_bytes());
473 buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
474 for entry in &body.entries {
475 encode_leaf_split_entry(entry, &mut buf);
476 }
477 buf
478}
479
480fn decode_leaf_split(bytes: &[u8]) -> QuillSQLResult<IndexLeafSplitPayload> {
481 let (relation, mut offset) = decode_relation(bytes)?;
482 let required = offset + 4 + 4 + 4 + 2 + 4;
483 if bytes.len() < required {
484 return Err(QuillSQLError::Internal(
485 "Index leaf split payload too short".to_string(),
486 ));
487 }
488 let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
489 offset += 4;
490 let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
491 offset += 4;
492 let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
493 offset += 4;
494 let split_index = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
495 offset += 2;
496 let right_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
497 offset += 4;
498 if bytes.len() < offset + 4 {
499 return Err(QuillSQLError::Internal(
500 "Index leaf split entries length missing".to_string(),
501 ));
502 }
503 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
504 offset += 4;
505 let mut entries = Vec::with_capacity(entry_count);
506 let mut cursor = offset;
507 for _ in 0..entry_count {
508 if cursor >= bytes.len() {
509 return Err(QuillSQLError::Internal(
510 "Index leaf split entries truncated".to_string(),
511 ));
512 }
513 let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
514 entries.push(entry);
515 cursor += consumed;
516 }
517 Ok(IndexLeafSplitPayload {
518 relation,
519 left_page_id,
520 right_page_id,
521 leaf_max_size,
522 split_index,
523 right_next_page_id,
524 entries,
525 })
526}
527
528fn encode_leaf_merge(body: &IndexLeafMergePayload) -> Vec<u8> {
529 let mut buf = Vec::new();
530 encode_relation(&body.relation, &mut buf);
531 buf.extend_from_slice(&body.left_page_id.to_le_bytes());
532 buf.extend_from_slice(&body.right_page_id.to_le_bytes());
533 buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
534 buf.extend_from_slice(&body.left_next_page_id.to_le_bytes());
535 buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
536 for entry in &body.entries {
537 encode_leaf_split_entry(entry, &mut buf);
538 }
539 buf
540}
541
542fn decode_leaf_merge(bytes: &[u8]) -> QuillSQLResult<IndexLeafMergePayload> {
543 let (relation, mut offset) = decode_relation(bytes)?;
544 let required = offset + 4 + 4 + 4 + 4;
545 if bytes.len() < required {
546 return Err(QuillSQLError::Internal(
547 "Index leaf merge payload too short".to_string(),
548 ));
549 }
550 let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
551 offset += 4;
552 let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
553 offset += 4;
554 let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
555 offset += 4;
556 let left_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
557 offset += 4;
558 if bytes.len() < offset + 4 {
559 return Err(QuillSQLError::Internal(
560 "Index leaf merge entries length missing".to_string(),
561 ));
562 }
563 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
564 offset += 4;
565 let mut entries = Vec::with_capacity(entry_count);
566 let mut cursor = offset;
567 for _ in 0..entry_count {
568 if cursor >= bytes.len() {
569 return Err(QuillSQLError::Internal(
570 "Index leaf merge entries truncated".to_string(),
571 ));
572 }
573 let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
574 entries.push(entry);
575 cursor += consumed;
576 }
577 Ok(IndexLeafMergePayload {
578 relation,
579 left_page_id,
580 right_page_id,
581 leaf_max_size,
582 left_next_page_id,
583 entries,
584 })
585}
586
587fn encode_internal_entry(entry: &IndexInternalEntryPayload, buf: &mut Vec<u8>) {
588 encode_bytes(&entry.key_data, buf);
589 buf.extend_from_slice(&entry.child_page_id.to_le_bytes());
590}
591
592fn decode_internal_entry(bytes: &[u8]) -> QuillSQLResult<(IndexInternalEntryPayload, usize)> {
593 let (key_data, consumed_key) = decode_bytes(bytes)?;
594 if bytes.len() < consumed_key + 4 {
595 return Err(QuillSQLError::Internal(
596 "Index internal entry truncated".to_string(),
597 ));
598 }
599 let child_page_id =
600 u32::from_le_bytes(bytes[consumed_key..consumed_key + 4].try_into().unwrap());
601 Ok((
602 IndexInternalEntryPayload {
603 key_data,
604 child_page_id,
605 },
606 consumed_key + 4,
607 ))
608}
609
610fn encode_internal_split(body: &IndexInternalSplitPayload) -> Vec<u8> {
611 let mut buf = Vec::new();
612 encode_relation(&body.relation, &mut buf);
613 buf.extend_from_slice(&body.left_page_id.to_le_bytes());
614 buf.extend_from_slice(&body.left_new_size.to_le_bytes());
615 match &body.left_high_key {
616 Some(bytes) => {
617 buf.push(1);
618 encode_bytes(bytes, &mut buf);
619 }
620 None => buf.push(0),
621 }
622 buf.extend_from_slice(&body.left_next_page_id.to_le_bytes());
623 buf.extend_from_slice(&body.right_page_id.to_le_bytes());
624 buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
625 buf.extend_from_slice(&(body.right_entries.len() as u32).to_le_bytes());
626 for entry in &body.right_entries {
627 encode_internal_entry(entry, &mut buf);
628 }
629 match &body.right_high_key {
630 Some(bytes) => {
631 buf.push(1);
632 encode_bytes(bytes, &mut buf);
633 }
634 None => buf.push(0),
635 }
636 buf.extend_from_slice(&body.right_next_page_id.to_le_bytes());
637 buf
638}
639
640fn decode_internal_split(bytes: &[u8]) -> QuillSQLResult<IndexInternalSplitPayload> {
641 let (relation, mut offset) = decode_relation(bytes)?;
642 if bytes.len() < offset + 4 + 2 + 1 + 4 + 4 + 4 {
643 return Err(QuillSQLError::Internal(
644 "Index internal split payload too short".to_string(),
645 ));
646 }
647 let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
648 offset += 4;
649 let left_new_size = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
650 offset += 2;
651 let has_left_high = bytes[offset] != 0;
652 offset += 1;
653 let left_high_key = if has_left_high {
654 let (data, consumed) = decode_bytes(&bytes[offset..])?;
655 offset += consumed;
656 Some(data)
657 } else {
658 None
659 };
660 let left_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
661 offset += 4;
662 let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
663 offset += 4;
664 let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
665 offset += 4;
666 if bytes.len() < offset + 4 {
667 return Err(QuillSQLError::Internal(
668 "Index internal split entries length missing".to_string(),
669 ));
670 }
671 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
672 offset += 4;
673 let mut entries = Vec::with_capacity(entry_count);
674 let mut cursor = offset;
675 for _ in 0..entry_count {
676 if cursor >= bytes.len() {
677 return Err(QuillSQLError::Internal(
678 "Index internal split entries truncated".to_string(),
679 ));
680 }
681 let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
682 entries.push(entry);
683 cursor += consumed;
684 }
685 if bytes.len() <= cursor {
686 return Err(QuillSQLError::Internal(
687 "Index internal split missing right high key flag".to_string(),
688 ));
689 }
690 let has_right_high = bytes[cursor] != 0;
691 cursor += 1;
692 let right_high_key = if has_right_high {
693 let (data, consumed) = decode_bytes(&bytes[cursor..])?;
694 cursor += consumed;
695 Some(data)
696 } else {
697 None
698 };
699 if bytes.len() < cursor + 4 {
700 return Err(QuillSQLError::Internal(
701 "Index internal split missing right_next_page_id".to_string(),
702 ));
703 }
704 let right_next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
705 Ok(IndexInternalSplitPayload {
706 relation,
707 left_page_id,
708 left_new_size,
709 left_high_key,
710 left_next_page_id,
711 right_page_id,
712 internal_max_size,
713 right_entries: entries,
714 right_high_key,
715 right_next_page_id,
716 })
717}
718
719fn encode_internal_merge(body: &IndexInternalMergePayload) -> Vec<u8> {
720 let mut buf = Vec::new();
721 encode_relation(&body.relation, &mut buf);
722 buf.extend_from_slice(&body.left_page_id.to_le_bytes());
723 buf.extend_from_slice(&body.right_page_id.to_le_bytes());
724 buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
725 buf.extend_from_slice(&(body.left_entries.len() as u32).to_le_bytes());
726 for entry in &body.left_entries {
727 encode_internal_entry(entry, &mut buf);
728 }
729 match &body.high_key {
730 Some(bytes) => {
731 buf.push(1);
732 encode_bytes(bytes, &mut buf);
733 }
734 None => buf.push(0),
735 }
736 buf.extend_from_slice(&body.next_page_id.to_le_bytes());
737 buf
738}
739
740fn decode_internal_merge(bytes: &[u8]) -> QuillSQLResult<IndexInternalMergePayload> {
741 let (relation, mut offset) = decode_relation(bytes)?;
742 if bytes.len() < offset + 4 + 4 + 4 {
743 return Err(QuillSQLError::Internal(
744 "Index internal merge payload too short".to_string(),
745 ));
746 }
747 let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
748 offset += 4;
749 let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
750 offset += 4;
751 let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
752 offset += 4;
753 if bytes.len() < offset + 4 {
754 return Err(QuillSQLError::Internal(
755 "Index internal merge entry length missing".to_string(),
756 ));
757 }
758 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
759 offset += 4;
760 let mut left_entries = Vec::with_capacity(entry_count);
761 let mut cursor = offset;
762 for _ in 0..entry_count {
763 if cursor >= bytes.len() {
764 return Err(QuillSQLError::Internal(
765 "Index internal merge entries truncated".to_string(),
766 ));
767 }
768 let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
769 left_entries.push(entry);
770 cursor += consumed;
771 }
772 if bytes.len() <= cursor {
773 return Err(QuillSQLError::Internal(
774 "Index internal merge missing high key flag".to_string(),
775 ));
776 }
777 let has_high_key = bytes[cursor] != 0;
778 cursor += 1;
779 let high_key = if has_high_key {
780 let (data, consumed) = decode_bytes(&bytes[cursor..])?;
781 cursor += consumed;
782 Some(data)
783 } else {
784 None
785 };
786 if bytes.len() < cursor + 4 {
787 return Err(QuillSQLError::Internal(
788 "Index internal merge missing next_page_id".to_string(),
789 ));
790 }
791 let next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
792 Ok(IndexInternalMergePayload {
793 relation,
794 left_page_id,
795 right_page_id,
796 internal_max_size,
797 left_entries,
798 high_key,
799 next_page_id,
800 })
801}
802
803fn encode_parent_insert(body: &IndexParentInsertPayload) -> Vec<u8> {
804 let mut buf = Vec::new();
805 encode_relation(&body.relation, &mut buf);
806 buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
807 buf.extend_from_slice(&body.left_child_page_id.to_le_bytes());
808 buf.extend_from_slice(&body.right_child_page_id.to_le_bytes());
809 encode_bytes(&body.key_data, &mut buf);
810 buf
811}
812
813fn decode_parent_insert(bytes: &[u8]) -> QuillSQLResult<IndexParentInsertPayload> {
814 let (relation, mut offset) = decode_relation(bytes)?;
815 if bytes.len() < offset + 4 * 3 {
816 return Err(QuillSQLError::Internal(
817 "Index parent insert payload too short".to_string(),
818 ));
819 }
820 let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
821 offset += 4;
822 let left_child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
823 offset += 4;
824 let right_child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
825 offset += 4;
826 let (key_data, _) = decode_bytes(&bytes[offset..])?;
827 Ok(IndexParentInsertPayload {
828 relation,
829 parent_page_id,
830 left_child_page_id,
831 right_child_page_id,
832 key_data,
833 })
834}
835
836fn encode_parent_delete(body: &IndexParentDeletePayload) -> Vec<u8> {
837 let mut buf = Vec::new();
838 encode_relation(&body.relation, &mut buf);
839 buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
840 buf.extend_from_slice(&body.child_page_id.to_le_bytes());
841 buf
842}
843
844fn decode_parent_delete(bytes: &[u8]) -> QuillSQLResult<IndexParentDeletePayload> {
845 let (relation, mut offset) = decode_relation(bytes)?;
846 if bytes.len() < offset + 8 {
847 return Err(QuillSQLError::Internal(
848 "Index parent delete payload too short".to_string(),
849 ));
850 }
851 let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
852 offset += 4;
853 let child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
854 Ok(IndexParentDeletePayload {
855 relation,
856 parent_page_id,
857 child_page_id,
858 })
859}
860
861fn encode_parent_update(body: &IndexParentUpdatePayload) -> Vec<u8> {
862 let mut buf = Vec::new();
863 encode_relation(&body.relation, &mut buf);
864 buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
865 buf.extend_from_slice(&body.child_page_id.to_le_bytes());
866 encode_bytes(&body.key_data, &mut buf);
867 buf
868}
869
870fn decode_parent_update(bytes: &[u8]) -> QuillSQLResult<IndexParentUpdatePayload> {
871 let (relation, mut offset) = decode_relation(bytes)?;
872 if bytes.len() < offset + 8 {
873 return Err(QuillSQLError::Internal(
874 "Index parent update payload too short".to_string(),
875 ));
876 }
877 let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
878 offset += 4;
879 let child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
880 offset += 4;
881 let (key_data, _) = decode_bytes(&bytes[offset..])?;
882 Ok(IndexParentUpdatePayload {
883 relation,
884 parent_page_id,
885 child_page_id,
886 key_data,
887 })
888}
889
890fn encode_leaf_redistribute(body: &IndexLeafRedistributePayload) -> Vec<u8> {
891 let mut buf = Vec::new();
892 encode_relation(&body.relation, &mut buf);
893 buf.extend_from_slice(&body.from_page_id.to_le_bytes());
894 buf.extend_from_slice(&body.to_page_id.to_le_bytes());
895 buf.push(if body.from_is_left { 1 } else { 0 });
896 encode_leaf_split_entry(&body.moved_entry, &mut buf);
897 buf
898}
899
900fn decode_leaf_redistribute(bytes: &[u8]) -> QuillSQLResult<IndexLeafRedistributePayload> {
901 let (relation, mut offset) = decode_relation(bytes)?;
902 if bytes.len() < offset + 4 + 4 + 1 {
903 return Err(QuillSQLError::Internal(
904 "Index leaf redistribute payload too short".to_string(),
905 ));
906 }
907 let from_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
908 offset += 4;
909 let to_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
910 offset += 4;
911 let from_is_left = bytes[offset] != 0;
912 offset += 1;
913 let (moved_entry, _) = decode_leaf_split_entry(&bytes[offset..])?;
914 Ok(IndexLeafRedistributePayload {
915 relation,
916 from_page_id,
917 to_page_id,
918 from_is_left,
919 moved_entry,
920 })
921}
922
923fn encode_internal_redistribute(body: &IndexInternalRedistributePayload) -> Vec<u8> {
924 let mut buf = Vec::new();
925 encode_relation(&body.relation, &mut buf);
926 buf.extend_from_slice(&body.from_page_id.to_le_bytes());
927 buf.extend_from_slice(&body.to_page_id.to_le_bytes());
928 buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
929 buf.push(if body.from_is_left { 1 } else { 0 });
930 buf.extend_from_slice(&body.from_old_sentinel.to_le_bytes());
931 buf.extend_from_slice(&body.to_old_sentinel.to_le_bytes());
932 encode_internal_entry(&body.moved_entry, &mut buf);
933 encode_bytes(&body.separator_key_data, &mut buf);
934 match &body.to_new_high_key {
935 Some(bytes) => {
936 buf.push(1);
937 encode_bytes(bytes, &mut buf);
938 }
939 None => buf.push(0),
940 }
941 buf.extend_from_slice(&body.to_new_next_page_id.to_le_bytes());
942 match &body.from_new_high_key {
943 Some(bytes) => {
944 buf.push(1);
945 encode_bytes(bytes, &mut buf);
946 }
947 None => buf.push(0),
948 }
949 buf.extend_from_slice(&body.from_new_next_page_id.to_le_bytes());
950 buf
951}
952
953fn decode_internal_redistribute(bytes: &[u8]) -> QuillSQLResult<IndexInternalRedistributePayload> {
954 let (relation, mut offset) = decode_relation(bytes)?;
955 if bytes.len() < offset + 4 + 4 + 4 + 1 + 4 + 4 {
956 return Err(QuillSQLError::Internal(
957 "Index internal redistribute payload too short".to_string(),
958 ));
959 }
960 let from_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
961 offset += 4;
962 let to_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
963 offset += 4;
964 let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
965 offset += 4;
966 let from_is_left = bytes[offset] != 0;
967 offset += 1;
968 let from_old_sentinel = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
969 offset += 4;
970 let to_old_sentinel = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
971 offset += 4;
972 let (moved_entry, consumed_entry) = decode_internal_entry(&bytes[offset..])?;
973 offset += consumed_entry;
974 let (separator_key_data, consumed_sep) = decode_bytes(&bytes[offset..])?;
975 offset += consumed_sep;
976 if bytes.len() <= offset {
977 return Err(QuillSQLError::Internal(
978 "Index internal redistribute missing to high key flag".to_string(),
979 ));
980 }
981 let has_to_high = bytes[offset] != 0;
982 offset += 1;
983 let to_new_high_key = if has_to_high {
984 let (data, consumed) = decode_bytes(&bytes[offset..])?;
985 offset += consumed;
986 Some(data)
987 } else {
988 None
989 };
990 if bytes.len() < offset + 4 {
991 return Err(QuillSQLError::Internal(
992 "Index internal redistribute missing to next page".to_string(),
993 ));
994 }
995 let to_new_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
996 offset += 4;
997 if bytes.len() <= offset {
998 return Err(QuillSQLError::Internal(
999 "Index internal redistribute missing from high key flag".to_string(),
1000 ));
1001 }
1002 let has_from_high = bytes[offset] != 0;
1003 offset += 1;
1004 let from_new_high_key = if has_from_high {
1005 let (data, consumed) = decode_bytes(&bytes[offset..])?;
1006 offset += consumed;
1007 Some(data)
1008 } else {
1009 None
1010 };
1011 if bytes.len() < offset + 4 {
1012 return Err(QuillSQLError::Internal(
1013 "Index internal redistribute missing from next page".to_string(),
1014 ));
1015 }
1016 let from_new_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1017 Ok(IndexInternalRedistributePayload {
1018 relation,
1019 from_page_id,
1020 to_page_id,
1021 parent_page_id,
1022 from_is_left,
1023 from_old_sentinel,
1024 to_old_sentinel,
1025 moved_entry,
1026 separator_key_data,
1027 to_new_high_key,
1028 to_new_next_page_id,
1029 from_new_high_key,
1030 from_new_next_page_id,
1031 })
1032}
1033
1034fn encode_root_install_leaf(body: &IndexRootInstallLeafPayload) -> Vec<u8> {
1035 let mut buf = Vec::new();
1036 encode_relation(&body.relation, &mut buf);
1037 buf.extend_from_slice(&body.page_id.to_le_bytes());
1038 buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
1039 buf.extend_from_slice(&body.next_page_id.to_le_bytes());
1040 buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
1041 for entry in &body.entries {
1042 encode_leaf_split_entry(entry, &mut buf);
1043 }
1044 buf
1045}
1046
1047fn decode_root_install_leaf(bytes: &[u8]) -> QuillSQLResult<IndexRootInstallLeafPayload> {
1048 let (relation, mut offset) = decode_relation(bytes)?;
1049 if bytes.len() < offset + 4 + 4 + 4 {
1050 return Err(QuillSQLError::Internal(
1051 "Index root install leaf payload too short".to_string(),
1052 ));
1053 }
1054 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1055 offset += 4;
1056 let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1057 offset += 4;
1058 let next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1059 offset += 4;
1060 if bytes.len() < offset + 4 {
1061 return Err(QuillSQLError::Internal(
1062 "Index root install leaf entries length missing".to_string(),
1063 ));
1064 }
1065 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
1066 offset += 4;
1067 let mut entries = Vec::with_capacity(entry_count);
1068 let mut cursor = offset;
1069 for _ in 0..entry_count {
1070 if cursor >= bytes.len() {
1071 return Err(QuillSQLError::Internal(
1072 "Index root install leaf entries truncated".to_string(),
1073 ));
1074 }
1075 let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
1076 entries.push(entry);
1077 cursor += consumed;
1078 }
1079 Ok(IndexRootInstallLeafPayload {
1080 relation,
1081 page_id,
1082 leaf_max_size,
1083 next_page_id,
1084 entries,
1085 })
1086}
1087
1088fn encode_root_install_internal(body: &IndexRootInstallInternalPayload) -> Vec<u8> {
1089 let mut buf = Vec::new();
1090 encode_relation(&body.relation, &mut buf);
1091 buf.extend_from_slice(&body.page_id.to_le_bytes());
1092 buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
1093 buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
1094 for entry in &body.entries {
1095 encode_internal_entry(entry, &mut buf);
1096 }
1097 match &body.high_key {
1098 Some(bytes) => {
1099 buf.push(1);
1100 encode_bytes(bytes, &mut buf);
1101 }
1102 None => buf.push(0),
1103 }
1104 buf.extend_from_slice(&body.next_page_id.to_le_bytes());
1105 buf
1106}
1107
1108fn decode_root_install_internal(bytes: &[u8]) -> QuillSQLResult<IndexRootInstallInternalPayload> {
1109 let (relation, mut offset) = decode_relation(bytes)?;
1110 if bytes.len() < offset + 4 + 4 {
1111 return Err(QuillSQLError::Internal(
1112 "Index root install internal payload too short".to_string(),
1113 ));
1114 }
1115 let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1116 offset += 4;
1117 let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1118 offset += 4;
1119 if bytes.len() < offset + 4 {
1120 return Err(QuillSQLError::Internal(
1121 "Index root install internal entries length missing".to_string(),
1122 ));
1123 }
1124 let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
1125 offset += 4;
1126 let mut entries = Vec::with_capacity(entry_count);
1127 let mut cursor = offset;
1128 for _ in 0..entry_count {
1129 if cursor >= bytes.len() {
1130 return Err(QuillSQLError::Internal(
1131 "Index root install internal entries truncated".to_string(),
1132 ));
1133 }
1134 let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
1135 entries.push(entry);
1136 cursor += consumed;
1137 }
1138 if bytes.len() <= cursor {
1139 return Err(QuillSQLError::Internal(
1140 "Index root install internal missing high key flag".to_string(),
1141 ));
1142 }
1143 let has_high_key = bytes[cursor] != 0;
1144 cursor += 1;
1145 let high_key = if has_high_key {
1146 let (data, consumed) = decode_bytes(&bytes[cursor..])?;
1147 cursor += consumed;
1148 Some(data)
1149 } else {
1150 None
1151 };
1152 if bytes.len() < cursor + 4 {
1153 return Err(QuillSQLError::Internal(
1154 "Index root install internal missing next_page_id".to_string(),
1155 ));
1156 }
1157 let next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
1158 Ok(IndexRootInstallInternalPayload {
1159 relation,
1160 page_id,
1161 internal_max_size,
1162 entries,
1163 high_key,
1164 next_page_id,
1165 })
1166}
1167
1168fn encode_root_adopt(body: &IndexRootAdoptPayload) -> Vec<u8> {
1169 let mut buf = Vec::new();
1170 encode_relation(&body.relation, &mut buf);
1171 buf.extend_from_slice(&body.new_root_page_id.to_le_bytes());
1172 buf
1173}
1174
1175fn decode_root_adopt(bytes: &[u8]) -> QuillSQLResult<IndexRootAdoptPayload> {
1176 let (relation, offset) = decode_relation(bytes)?;
1177 if bytes.len() < offset + 4 {
1178 return Err(QuillSQLError::Internal(
1179 "Index root adopt payload too short".to_string(),
1180 ));
1181 }
1182 let new_root_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1183 Ok(IndexRootAdoptPayload {
1184 relation,
1185 new_root_page_id,
1186 })
1187}
1188
1189fn encode_root_reset(body: &IndexRootResetPayload) -> Vec<u8> {
1190 let mut buf = Vec::new();
1191 encode_relation(&body.relation, &mut buf);
1192 buf
1193}
1194
1195fn decode_root_reset(bytes: &[u8]) -> QuillSQLResult<IndexRootResetPayload> {
1196 let (relation, _) = decode_relation(bytes)?;
1197 Ok(IndexRootResetPayload { relation })
1198}
1199
1200fn encode_relation(relation: &IndexRelationIdent, buf: &mut Vec<u8>) {
1201 buf.extend_from_slice(&relation.header_page_id.to_le_bytes());
1202 encode_schema(&relation.schema, buf);
1203}
1204
1205fn decode_relation(bytes: &[u8]) -> QuillSQLResult<(IndexRelationIdent, usize)> {
1206 if bytes.len() < 4 {
1207 return Err(QuillSQLError::Internal(
1208 "Index relation ident too short".to_string(),
1209 ));
1210 }
1211 let header_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
1212 let (schema, consumed) = decode_schema(&bytes[4..])?;
1213 Ok((
1214 IndexRelationIdent {
1215 header_page_id,
1216 schema,
1217 },
1218 4 + consumed,
1219 ))
1220}
1221
1222fn encode_schema(schema: &SchemaRepr, buf: &mut Vec<u8>) {
1223 buf.extend(CommonCodec::encode_u16(schema.columns.len() as u16));
1224 for column in &schema.columns {
1225 encode_string(&column.name, buf);
1226 encode_data_type(column.data_type, buf);
1227 buf.push(column.nullable as u8);
1228 }
1229}
1230
1231fn encode_data_type(data_type: DataType, buf: &mut Vec<u8>) {
1232 match data_type {
1233 DataType::Boolean => buf.push(0),
1234 DataType::Int8 => buf.push(1),
1235 DataType::Int16 => buf.push(2),
1236 DataType::Int32 => buf.push(3),
1237 DataType::Int64 => buf.push(4),
1238 DataType::UInt8 => buf.push(5),
1239 DataType::UInt16 => buf.push(6),
1240 DataType::UInt32 => buf.push(7),
1241 DataType::UInt64 => buf.push(8),
1242 DataType::Float32 => buf.push(9),
1243 DataType::Float64 => buf.push(10),
1244 DataType::Varchar(len) => {
1245 buf.push(11);
1246 match len {
1247 Some(n) => {
1248 buf.push(1);
1249 buf.extend(CommonCodec::encode_u32(n as u32));
1250 }
1251 None => buf.push(0),
1252 }
1253 }
1254 }
1255}
1256
1257fn decode_schema(bytes: &[u8]) -> QuillSQLResult<(SchemaRepr, usize)> {
1258 let (col_count, mut offset) = CommonCodec::decode_u16(bytes)?;
1259 let mut columns = Vec::with_capacity(col_count as usize);
1260 for _ in 0..col_count {
1261 let (name, consumed_name) = decode_string(&bytes[offset..])?;
1262 offset += consumed_name;
1263 let (data_type, consumed_dt) = decode_data_type(&bytes[offset..])?;
1264 offset += consumed_dt;
1265 if offset >= bytes.len() {
1266 return Err(QuillSQLError::Internal(
1267 "Schema payload truncated (nullable)".to_string(),
1268 ));
1269 }
1270 let nullable = bytes[offset] != 0;
1271 offset += 1;
1272 columns.push(ColumnRepr {
1273 name,
1274 data_type,
1275 nullable,
1276 });
1277 }
1278 Ok((SchemaRepr { columns }, offset))
1279}
1280
1281fn decode_data_type(bytes: &[u8]) -> QuillSQLResult<(DataType, usize)> {
1282 if bytes.is_empty() {
1283 return Err(QuillSQLError::Internal(
1284 "Schema payload truncated (datatype tag)".to_string(),
1285 ));
1286 }
1287 let tag = bytes[0];
1288 let mut consumed = 1;
1289 let data_type = match tag {
1290 0 => DataType::Boolean,
1291 1 => DataType::Int8,
1292 2 => DataType::Int16,
1293 3 => DataType::Int32,
1294 4 => DataType::Int64,
1295 5 => DataType::UInt8,
1296 6 => DataType::UInt16,
1297 7 => DataType::UInt32,
1298 8 => DataType::UInt64,
1299 9 => DataType::Float32,
1300 10 => DataType::Float64,
1301 11 => {
1302 if bytes.len() < consumed + 1 {
1303 return Err(QuillSQLError::Internal(
1304 "Schema payload truncated (varchar flag)".to_string(),
1305 ));
1306 }
1307 let has_len = bytes[consumed] != 0;
1308 consumed += 1;
1309 if has_len {
1310 let (len, len_consumed) = CommonCodec::decode_u32(&bytes[consumed..])?;
1311 consumed += len_consumed;
1312 DataType::Varchar(Some(len as usize))
1313 } else {
1314 DataType::Varchar(None)
1315 }
1316 }
1317 other => {
1318 return Err(QuillSQLError::Internal(format!(
1319 "Unknown datatype tag {} in schema repr",
1320 other
1321 )))
1322 }
1323 };
1324 Ok((data_type, consumed))
1325}
1326
1327fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
1328 buf.extend(CommonCodec::encode_u32(data.len() as u32));
1329 buf.extend_from_slice(data);
1330}
1331
1332fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
1333 let (len, mut offset) = CommonCodec::decode_u32(bytes)?;
1334 let len = len as usize;
1335 if bytes.len() < offset + len {
1336 return Err(QuillSQLError::Internal(
1337 "Index WAL payload truncated while decoding bytes".to_string(),
1338 ));
1339 }
1340 let data = bytes[offset..offset + len].to_vec();
1341 offset += len;
1342 Ok((data, offset))
1343}
1344
1345fn encode_string(value: &str, buf: &mut Vec<u8>) {
1346 let bytes = value.as_bytes();
1347 buf.extend(CommonCodec::encode_u16(bytes.len() as u16));
1348 buf.extend_from_slice(bytes);
1349}
1350
1351fn decode_string(bytes: &[u8]) -> QuillSQLResult<(String, usize)> {
1352 let (len, mut offset) = CommonCodec::decode_u16(bytes)?;
1353 let len = len as usize;
1354 if bytes.len() < offset + len {
1355 return Err(QuillSQLError::Internal(
1356 "Index WAL payload truncated while decoding string".to_string(),
1357 ));
1358 }
1359 let value = std::str::from_utf8(&bytes[offset..offset + len])
1360 .map_err(|e| QuillSQLError::Internal(format!("Invalid utf8 in schema repr: {}", e)))?
1361 .to_string();
1362 offset += len;
1363 Ok((value, offset))
1364}
1365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;
    use std::sync::Arc;

    /// Schema with a single non-nullable Int32 key column, shared by all tests.
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Column::new("k", DataType::Int32, false)]))
    }

    /// Encodes `payload`, checks the info byte against `kind`, decodes the
    /// bytes again and compares every field of the supported variants.
    ///
    /// The whole relation ident (header page id and schema) is compared, and
    /// internal entries are compared field by field, so a regression in the
    /// schema or entry codec fails the round trip instead of passing silently.
    fn roundtrip(payload: IndexRecordPayload, kind: IndexRecordKind) {
        let (info, bytes) = encode_index_record(&payload);
        assert_eq!(info, kind as u8);
        let decoded = decode_index_record(&bytes, info).unwrap();
        match (payload, decoded) {
            (IndexRecordPayload::LeafInsert(a), IndexRecordPayload::LeafInsert(b)) => {
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.key_data, b.key_data);
                assert_eq!(a.rid, b.rid);
            }
            (IndexRecordPayload::LeafDelete(a), IndexRecordPayload::LeafDelete(b)) => {
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.key_data, b.key_data);
                assert_eq!(a.old_rid, b.old_rid);
            }
            (
                IndexRecordPayload::RootInstallInternal(a),
                IndexRecordPayload::RootInstallInternal(b),
            ) => {
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.internal_max_size, b.internal_max_size);
                assert_eq!(a.entries.len(), b.entries.len());
                for (lhs, rhs) in a.entries.iter().zip(b.entries.iter()) {
                    assert_eq!(lhs.key_data, rhs.key_data);
                    assert_eq!(lhs.child_page_id, rhs.child_page_id);
                }
                assert_eq!(a.high_key, b.high_key);
                assert_eq!(a.next_page_id, b.next_page_id);
            }
            (lhs, rhs) => panic!("payload variant mismatch: {:?} vs {:?}", lhs, rhs),
        }
    }

    #[test]
    fn leaf_insert_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
            relation: IndexRelationIdent {
                header_page_id: 99,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 7,
            op_txn_id: 1,
            key_data: TupleCodec::encode(&Tuple::new(schema, vec![ScalarValue::Int32(Some(3))])),
            rid: RecordId::new(4, 2),
        });
        roundtrip(payload, IndexRecordKind::LeafInsert);
    }

    #[test]
    fn leaf_delete_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
            relation: IndexRelationIdent {
                header_page_id: 3,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 2,
            op_txn_id: 4,
            key_data: TupleCodec::encode(&Tuple::new(schema, vec![ScalarValue::Int32(Some(5))])),
            old_rid: RecordId::new(9, 1),
        });
        roundtrip(payload, IndexRecordKind::LeafDelete);
    }

    #[test]
    fn root_install_internal_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
            relation: IndexRelationIdent {
                header_page_id: 5,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 6,
            internal_max_size: 8,
            entries: vec![IndexInternalEntryPayload {
                key_data: TupleCodec::encode(&Tuple::new(
                    schema.clone(),
                    vec![ScalarValue::Int32(Some(10))],
                )),
                child_page_id: 11,
            }],
            high_key: None,
            next_page_id: 12,
        });
        roundtrip(payload, IndexRecordKind::RootInstallInternal);
    }

    /// Same as `root_install_internal_roundtrip`, but with a high key present
    /// so the presence-byte-plus-key branch is exercised as well.
    #[test]
    fn root_install_internal_roundtrip_with_high_key() {
        let schema = schema();
        let high_key = TupleCodec::encode(&Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int32(Some(20))],
        ));
        let payload = IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
            relation: IndexRelationIdent {
                header_page_id: 5,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 6,
            internal_max_size: 8,
            entries: vec![IndexInternalEntryPayload {
                key_data: TupleCodec::encode(&Tuple::new(
                    schema.clone(),
                    vec![ScalarValue::Int32(Some(10))],
                )),
                child_page_id: 11,
            }],
            high_key: Some(high_key),
            next_page_id: 12,
        });
        roundtrip(payload, IndexRecordKind::RootInstallInternal);
    }
}