quill_sql/storage/index/
wal_codec.rs

1use crate::buffer::PageId;
2use crate::catalog::{Column, DataType, Schema, SchemaRef};
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, RidCodec};
5use crate::storage::page::RecordId;
6use crate::transaction::TransactionId;
7use std::sync::Arc;
8
/// Relation descriptor carried as the `relation` field of each index payload struct
/// defined below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexRelationIdent {
    /// Page id of the index header page (meaning taken from the field name — TODO confirm).
    pub header_page_id: PageId,
    /// Column layout of the relation; convertible to a `SchemaRef` via [`Self::schema_ref`].
    pub schema: SchemaRepr,
}
14
15impl IndexRelationIdent {
16    pub fn schema_ref(&self) -> SchemaRef {
17        self.schema.to_schema_ref()
18    }
19}
20
/// Plain-data representation of a schema: an ordered list of [`ColumnRepr`] values.
///
/// Built from a `SchemaRef` through the `From` impl and turned back into one with
/// `to_schema_ref`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaRepr {
    /// Columns in the same order as they appear in the source schema.
    pub columns: Vec<ColumnRepr>,
}
25
26impl From<SchemaRef> for SchemaRepr {
27    fn from(schema: SchemaRef) -> Self {
28        let columns = schema
29            .columns
30            .iter()
31            .map(|col| ColumnRepr {
32                name: col.name.clone(),
33                data_type: col.data_type,
34                nullable: col.nullable,
35            })
36            .collect();
37        Self { columns }
38    }
39}
40
41impl SchemaRepr {
42    pub fn to_schema_ref(&self) -> SchemaRef {
43        let cols = self
44            .columns
45            .iter()
46            .map(|c| Column::new(&c.name, c.data_type, c.nullable))
47            .collect::<Vec<_>>();
48        Arc::new(Schema::new(cols))
49    }
50}
51
/// One column of a [`SchemaRepr`]: exactly the three values that
/// `SchemaRepr::to_schema_ref` hands to `Column::new`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnRepr {
    /// Column name, cloned from the source column.
    pub name: String,
    /// Column data type, copied from the source column.
    pub data_type: DataType,
    /// Nullability flag, copied from the source column.
    pub nullable: bool,
}
58
/// Body of an `IndexRecordPayload::LeafInsert` record.
///
/// `encode_leaf_insert` writes the fields in declaration order: relation, `page_id`
/// (little-endian), `op_txn_id` (little-endian), `key_data` via `encode_bytes`, then `rid`
/// via `RidCodec`.
#[derive(Debug, Clone)]
pub struct IndexLeafInsertPayload {
    pub relation: IndexRelationIdent,
    /// Presumably the leaf page receiving the entry — confirm against the caller.
    pub page_id: PageId,
    /// Decoded from 8 little-endian bytes and cast to `TransactionId`.
    pub op_txn_id: TransactionId,
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
    pub rid: RecordId,
}
67
/// Body of an `IndexRecordPayload::LeafDelete` record.
///
/// `encode_leaf_delete` writes the fields in declaration order: relation, `page_id`
/// (little-endian), `op_txn_id` (little-endian), `key_data` via `encode_bytes`, then
/// `old_rid` via `RidCodec`.
#[derive(Debug, Clone)]
pub struct IndexLeafDeletePayload {
    pub relation: IndexRelationIdent,
    /// Presumably the leaf page the entry is removed from — confirm against the caller.
    pub page_id: PageId,
    /// Decoded from 8 little-endian bytes and cast to `TransactionId`.
    pub op_txn_id: TransactionId,
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
    pub old_rid: RecordId,
}
76
/// A single (key bytes, record id) pair.
///
/// Used as the element type of the `entries` lists in the leaf split, leaf merge and root
/// leaf install payloads, and as `moved_entry` in the leaf redistribute payload.
/// Serialized by `encode_leaf_split_entry` as the key via `encode_bytes` followed by the
/// record id via `RidCodec`.
#[derive(Debug, Clone)]
pub struct IndexLeafSplitEntryPayload {
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
    pub rid: RecordId,
}
82
/// Body of an `IndexRecordPayload::LeafSplit` record.
///
/// `encode_leaf_split` writes, in order: relation, `left_page_id`, `right_page_id`,
/// `leaf_max_size`, `split_index` (2 bytes), `right_next_page_id`, a `u32` entry count and
/// then each entry. All integers are little-endian.
#[derive(Debug, Clone)]
pub struct IndexLeafSplitPayload {
    pub relation: IndexRelationIdent,
    pub left_page_id: PageId,
    pub right_page_id: PageId,
    pub leaf_max_size: u32,
    pub split_index: u16,
    pub right_next_page_id: PageId,
    /// Written with a `u32` length prefix, so the encoder casts `entries.len()` to `u32`.
    pub entries: Vec<IndexLeafSplitEntryPayload>,
}
93
/// Body of an `IndexRecordPayload::LeafMerge` record.
///
/// `encode_leaf_merge` writes, in order: relation, `left_page_id`, `right_page_id`,
/// `leaf_max_size`, `left_next_page_id`, a `u32` entry count and then each entry. All
/// integers are little-endian.
#[derive(Debug, Clone)]
pub struct IndexLeafMergePayload {
    pub relation: IndexRelationIdent,
    pub left_page_id: PageId,
    pub right_page_id: PageId,
    pub leaf_max_size: u32,
    pub left_next_page_id: PageId,
    /// Written with a `u32` length prefix, so the encoder casts `entries.len()` to `u32`.
    pub entries: Vec<IndexLeafSplitEntryPayload>,
}
103
/// A single (key bytes, child page id) pair used by the internal-node payloads.
///
/// Serialized by `encode_internal_entry` as the key via `encode_bytes` followed by the
/// child page id as 4 little-endian bytes.
#[derive(Debug, Clone)]
pub struct IndexInternalEntryPayload {
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
    pub child_page_id: PageId,
}
109
/// Body of an `IndexRecordPayload::InternalSplit` record.
///
/// `encode_internal_split` writes the fields in declaration order. Integers are
/// little-endian; each optional high key is a one-byte presence flag (`1`/`0`) followed by
/// the key via `encode_bytes` when present; `right_entries` is preceded by a `u32` count.
#[derive(Debug, Clone)]
pub struct IndexInternalSplitPayload {
    pub relation: IndexRelationIdent,
    pub left_page_id: PageId,
    pub left_new_size: u16,
    /// `None` is encoded as a single `0` byte.
    pub left_high_key: Option<Vec<u8>>,
    pub left_next_page_id: PageId,
    pub right_page_id: PageId,
    pub internal_max_size: u32,
    pub right_entries: Vec<IndexInternalEntryPayload>,
    /// `None` is encoded as a single `0` byte.
    pub right_high_key: Option<Vec<u8>>,
    pub right_next_page_id: PageId,
}
123
/// Body of an `IndexRecordPayload::InternalMerge` record.
///
/// `encode_internal_merge` writes the fields in declaration order. Integers are
/// little-endian; `left_entries` is preceded by a `u32` count; `high_key` is a one-byte
/// presence flag followed by the key via `encode_bytes` when present.
#[derive(Debug, Clone)]
pub struct IndexInternalMergePayload {
    pub relation: IndexRelationIdent,
    pub left_page_id: PageId,
    pub right_page_id: PageId,
    pub internal_max_size: u32,
    pub left_entries: Vec<IndexInternalEntryPayload>,
    /// `None` is encoded as a single `0` byte.
    pub high_key: Option<Vec<u8>>,
    pub next_page_id: PageId,
}
134
/// Body of an `IndexRecordPayload::ParentInsert` record.
///
/// `encode_parent_insert` writes the fields in declaration order: relation, the three page
/// ids as little-endian `u32`s, then `key_data` via `encode_bytes`.
#[derive(Debug, Clone)]
pub struct IndexParentInsertPayload {
    pub relation: IndexRelationIdent,
    pub parent_page_id: PageId,
    pub left_child_page_id: PageId,
    pub right_child_page_id: PageId,
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
}
143
/// Body of an `IndexRecordPayload::ParentDelete` record.
///
/// `encode_parent_delete` writes the relation followed by the two page ids as
/// little-endian `u32`s.
#[derive(Debug, Clone)]
pub struct IndexParentDeletePayload {
    pub relation: IndexRelationIdent,
    pub parent_page_id: PageId,
    pub child_page_id: PageId,
}
150
/// Body of an `IndexRecordPayload::ParentUpdate` record.
///
/// `encode_parent_update` writes the relation, the two page ids as little-endian `u32`s,
/// then `key_data` via `encode_bytes`.
#[derive(Debug, Clone)]
pub struct IndexParentUpdatePayload {
    pub relation: IndexRelationIdent,
    pub parent_page_id: PageId,
    pub child_page_id: PageId,
    /// Opaque key bytes; this codec does not interpret them.
    pub key_data: Vec<u8>,
}
158
/// Body of an `IndexRecordPayload::LeafRedistribute` record.
///
/// `encode_leaf_redistribute` writes the relation, `from_page_id` and `to_page_id` as
/// little-endian `u32`s, `from_is_left` as a single `1`/`0` byte, then `moved_entry` via
/// `encode_leaf_split_entry`.
#[derive(Debug, Clone)]
pub struct IndexLeafRedistributePayload {
    pub relation: IndexRelationIdent,
    pub from_page_id: PageId,
    pub to_page_id: PageId,
    /// Presumably true when the donor page is the left sibling — confirm against the caller.
    pub from_is_left: bool,
    pub moved_entry: IndexLeafSplitEntryPayload,
}
167
/// Body of an `IndexRecordPayload::InternalRedistribute` record.
///
/// Encoded by `encode_internal_redistribute` and decoded by `decode_internal_redistribute`,
/// both dispatched from `encode_index_record` / `decode_index_record`.
// NOTE(review): the wire layout and the exact meaning of the sentinel / high-key fields are
// defined by the encoder, which is not visible in this part of the file — confirm there.
#[derive(Debug, Clone)]
pub struct IndexInternalRedistributePayload {
    pub relation: IndexRelationIdent,
    pub from_page_id: PageId,
    pub to_page_id: PageId,
    pub parent_page_id: PageId,
    pub from_is_left: bool,
    pub from_old_sentinel: PageId,
    pub to_old_sentinel: PageId,
    pub moved_entry: IndexInternalEntryPayload,
    pub separator_key_data: Vec<u8>,
    pub to_new_high_key: Option<Vec<u8>>,
    pub to_new_next_page_id: PageId,
    pub from_new_high_key: Option<Vec<u8>>,
    pub from_new_next_page_id: PageId,
}
184
/// Body of an `IndexRecordPayload::RootInstallLeaf` record.
///
/// Encoded by `encode_root_install_leaf` and decoded by `decode_root_install_leaf`, both
/// dispatched from `encode_index_record` / `decode_index_record`.
#[derive(Debug, Clone)]
pub struct IndexRootInstallLeafPayload {
    pub relation: IndexRelationIdent,
    pub page_id: PageId,
    pub leaf_max_size: u32,
    pub next_page_id: PageId,
    pub entries: Vec<IndexLeafSplitEntryPayload>,
}
193
/// Body of an `IndexRecordPayload::RootInstallInternal` record.
///
/// Encoded by `encode_root_install_internal` and decoded by `decode_root_install_internal`,
/// both dispatched from `encode_index_record` / `decode_index_record`.
#[derive(Debug, Clone)]
pub struct IndexRootInstallInternalPayload {
    pub relation: IndexRelationIdent,
    pub page_id: PageId,
    pub internal_max_size: u32,
    pub entries: Vec<IndexInternalEntryPayload>,
    pub high_key: Option<Vec<u8>>,
    pub next_page_id: PageId,
}
203
/// Body of an `IndexRecordPayload::RootAdopt` record.
///
/// Encoded by `encode_root_adopt` and decoded by `decode_root_adopt`, both dispatched from
/// `encode_index_record` / `decode_index_record`.
#[derive(Debug, Clone)]
pub struct IndexRootAdoptPayload {
    pub relation: IndexRelationIdent,
    /// Presumably the page that becomes the new root — confirm against the caller.
    pub new_root_page_id: PageId,
}
209
/// Body of an `IndexRecordPayload::RootReset` record; it carries only the relation.
///
/// Encoded by `encode_root_reset` and decoded by `decode_root_reset`, both dispatched from
/// `encode_index_record` / `decode_index_record`.
#[derive(Debug, Clone)]
pub struct IndexRootResetPayload {
    pub relation: IndexRelationIdent,
}
214
/// Typed body of an index record, with one variant per [`IndexRecordKind`].
///
/// `encode_index_record` turns a value into a `(kind byte, bytes)` pair and
/// `decode_index_record` performs the reverse, selecting the variant from the kind byte.
#[derive(Debug, Clone)]
pub enum IndexRecordPayload {
    LeafInsert(IndexLeafInsertPayload),
    LeafDelete(IndexLeafDeletePayload),
    LeafSplit(IndexLeafSplitPayload),
    InternalSplit(IndexInternalSplitPayload),
    ParentInsert(IndexParentInsertPayload),
    LeafMerge(IndexLeafMergePayload),
    InternalMerge(IndexInternalMergePayload),
    ParentDelete(IndexParentDeletePayload),
    ParentUpdate(IndexParentUpdatePayload),
    LeafRedistribute(IndexLeafRedistributePayload),
    InternalRedistribute(IndexInternalRedistributePayload),
    RootInstallLeaf(IndexRootInstallLeafPayload),
    RootInstallInternal(IndexRootInstallInternalPayload),
    RootAdopt(IndexRootAdoptPayload),
    RootReset(IndexRootResetPayload),
}
233
/// One-byte tag that identifies which [`IndexRecordPayload`] variant a record holds.
///
/// `encode_index_record` returns the discriminant as its first tuple element and
/// `decode_index_record` parses it back from its `info` argument. The explicit values
/// `1..=15` are what gets written by the encoder, so they must not be renumbered; any other
/// byte is rejected by the `TryFrom<u8>` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum IndexRecordKind {
    LeafInsert = 1,
    LeafDelete = 2,
    LeafSplit = 3,
    InternalSplit = 4,
    ParentInsert = 5,
    LeafMerge = 6,
    InternalMerge = 7,
    ParentDelete = 8,
    ParentUpdate = 9,
    LeafRedistribute = 10,
    InternalRedistribute = 11,
    RootInstallLeaf = 12,
    RootInstallInternal = 13,
    RootAdopt = 14,
    RootReset = 15,
}
253
254impl TryFrom<u8> for IndexRecordKind {
255    type Error = QuillSQLError;
256
257    fn try_from(value: u8) -> Result<Self, Self::Error> {
258        match value {
259            1 => Ok(IndexRecordKind::LeafInsert),
260            2 => Ok(IndexRecordKind::LeafDelete),
261            3 => Ok(IndexRecordKind::LeafSplit),
262            4 => Ok(IndexRecordKind::InternalSplit),
263            5 => Ok(IndexRecordKind::ParentInsert),
264            6 => Ok(IndexRecordKind::LeafMerge),
265            7 => Ok(IndexRecordKind::InternalMerge),
266            8 => Ok(IndexRecordKind::ParentDelete),
267            9 => Ok(IndexRecordKind::ParentUpdate),
268            10 => Ok(IndexRecordKind::LeafRedistribute),
269            11 => Ok(IndexRecordKind::InternalRedistribute),
270            12 => Ok(IndexRecordKind::RootInstallLeaf),
271            13 => Ok(IndexRecordKind::RootInstallInternal),
272            14 => Ok(IndexRecordKind::RootAdopt),
273            15 => Ok(IndexRecordKind::RootReset),
274            other => Err(QuillSQLError::Internal(format!(
275                "Unknown index record kind: {}",
276                other
277            ))),
278        }
279    }
280}
281
282pub fn encode_index_record(payload: &IndexRecordPayload) -> (u8, Vec<u8>) {
283    match payload {
284        IndexRecordPayload::LeafInsert(body) => {
285            (IndexRecordKind::LeafInsert as u8, encode_leaf_insert(body))
286        }
287        IndexRecordPayload::LeafDelete(body) => {
288            (IndexRecordKind::LeafDelete as u8, encode_leaf_delete(body))
289        }
290        IndexRecordPayload::LeafSplit(body) => {
291            (IndexRecordKind::LeafSplit as u8, encode_leaf_split(body))
292        }
293        IndexRecordPayload::InternalSplit(body) => (
294            IndexRecordKind::InternalSplit as u8,
295            encode_internal_split(body),
296        ),
297        IndexRecordPayload::ParentInsert(body) => (
298            IndexRecordKind::ParentInsert as u8,
299            encode_parent_insert(body),
300        ),
301        IndexRecordPayload::LeafMerge(body) => {
302            (IndexRecordKind::LeafMerge as u8, encode_leaf_merge(body))
303        }
304        IndexRecordPayload::InternalMerge(body) => (
305            IndexRecordKind::InternalMerge as u8,
306            encode_internal_merge(body),
307        ),
308        IndexRecordPayload::ParentDelete(body) => (
309            IndexRecordKind::ParentDelete as u8,
310            encode_parent_delete(body),
311        ),
312        IndexRecordPayload::ParentUpdate(body) => (
313            IndexRecordKind::ParentUpdate as u8,
314            encode_parent_update(body),
315        ),
316        IndexRecordPayload::LeafRedistribute(body) => (
317            IndexRecordKind::LeafRedistribute as u8,
318            encode_leaf_redistribute(body),
319        ),
320        IndexRecordPayload::InternalRedistribute(body) => (
321            IndexRecordKind::InternalRedistribute as u8,
322            encode_internal_redistribute(body),
323        ),
324        IndexRecordPayload::RootInstallLeaf(body) => (
325            IndexRecordKind::RootInstallLeaf as u8,
326            encode_root_install_leaf(body),
327        ),
328        IndexRecordPayload::RootInstallInternal(body) => (
329            IndexRecordKind::RootInstallInternal as u8,
330            encode_root_install_internal(body),
331        ),
332        IndexRecordPayload::RootAdopt(body) => {
333            (IndexRecordKind::RootAdopt as u8, encode_root_adopt(body))
334        }
335        IndexRecordPayload::RootReset(body) => {
336            (IndexRecordKind::RootReset as u8, encode_root_reset(body))
337        }
338    }
339}
340
341pub fn decode_index_record(bytes: &[u8], info: u8) -> QuillSQLResult<IndexRecordPayload> {
342    match IndexRecordKind::try_from(info)? {
343        IndexRecordKind::LeafInsert => {
344            Ok(IndexRecordPayload::LeafInsert(decode_leaf_insert(bytes)?))
345        }
346        IndexRecordKind::LeafDelete => {
347            Ok(IndexRecordPayload::LeafDelete(decode_leaf_delete(bytes)?))
348        }
349        IndexRecordKind::LeafSplit => Ok(IndexRecordPayload::LeafSplit(decode_leaf_split(bytes)?)),
350        IndexRecordKind::InternalSplit => Ok(IndexRecordPayload::InternalSplit(
351            decode_internal_split(bytes)?,
352        )),
353        IndexRecordKind::ParentInsert => Ok(IndexRecordPayload::ParentInsert(
354            decode_parent_insert(bytes)?,
355        )),
356        IndexRecordKind::LeafMerge => Ok(IndexRecordPayload::LeafMerge(decode_leaf_merge(bytes)?)),
357        IndexRecordKind::InternalMerge => Ok(IndexRecordPayload::InternalMerge(
358            decode_internal_merge(bytes)?,
359        )),
360        IndexRecordKind::ParentDelete => Ok(IndexRecordPayload::ParentDelete(
361            decode_parent_delete(bytes)?,
362        )),
363        IndexRecordKind::ParentUpdate => Ok(IndexRecordPayload::ParentUpdate(
364            decode_parent_update(bytes)?,
365        )),
366        IndexRecordKind::LeafRedistribute => Ok(IndexRecordPayload::LeafRedistribute(
367            decode_leaf_redistribute(bytes)?,
368        )),
369        IndexRecordKind::InternalRedistribute => Ok(IndexRecordPayload::InternalRedistribute(
370            decode_internal_redistribute(bytes)?,
371        )),
372        IndexRecordKind::RootInstallLeaf => Ok(IndexRecordPayload::RootInstallLeaf(
373            decode_root_install_leaf(bytes)?,
374        )),
375        IndexRecordKind::RootInstallInternal => Ok(IndexRecordPayload::RootInstallInternal(
376            decode_root_install_internal(bytes)?,
377        )),
378        IndexRecordKind::RootAdopt => Ok(IndexRecordPayload::RootAdopt(decode_root_adopt(bytes)?)),
379        IndexRecordKind::RootReset => Ok(IndexRecordPayload::RootReset(decode_root_reset(bytes)?)),
380    }
381}
382
383fn encode_leaf_insert(body: &IndexLeafInsertPayload) -> Vec<u8> {
384    let mut buf = Vec::new();
385    encode_relation(&body.relation, &mut buf);
386    buf.extend_from_slice(&body.page_id.to_le_bytes());
387    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
388    encode_bytes(&body.key_data, &mut buf);
389    buf.extend(RidCodec::encode(&body.rid));
390    buf
391}
392
393fn decode_leaf_insert(bytes: &[u8]) -> QuillSQLResult<IndexLeafInsertPayload> {
394    let (relation, mut offset) = decode_relation(bytes)?;
395    if bytes.len() < offset + 4 + 8 {
396        return Err(QuillSQLError::Internal(
397            "Index leaf insert payload too short".to_string(),
398        ));
399    }
400    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
401    offset += 4;
402    let op_txn_id =
403        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
404    offset += 8;
405    let (key_data, consumed) = decode_bytes(&bytes[offset..])?;
406    offset += consumed;
407    let (rid, _) = RidCodec::decode(&bytes[offset..])?;
408    Ok(IndexLeafInsertPayload {
409        relation,
410        page_id,
411        op_txn_id,
412        key_data,
413        rid,
414    })
415}
416
417fn encode_leaf_delete(body: &IndexLeafDeletePayload) -> Vec<u8> {
418    let mut buf = Vec::new();
419    encode_relation(&body.relation, &mut buf);
420    buf.extend_from_slice(&body.page_id.to_le_bytes());
421    buf.extend_from_slice(&body.op_txn_id.to_le_bytes());
422    encode_bytes(&body.key_data, &mut buf);
423    buf.extend(RidCodec::encode(&body.old_rid));
424    buf
425}
426
427fn decode_leaf_delete(bytes: &[u8]) -> QuillSQLResult<IndexLeafDeletePayload> {
428    let (relation, mut offset) = decode_relation(bytes)?;
429    if bytes.len() < offset + 4 + 8 {
430        return Err(QuillSQLError::Internal(
431            "Index leaf delete payload too short".to_string(),
432        ));
433    }
434    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
435    offset += 4;
436    let op_txn_id =
437        u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as TransactionId;
438    offset += 8;
439    let (key_data, consumed) = decode_bytes(&bytes[offset..])?;
440    offset += consumed;
441    let (old_rid, _) = RidCodec::decode(&bytes[offset..])?;
442    Ok(IndexLeafDeletePayload {
443        relation,
444        page_id,
445        op_txn_id,
446        key_data,
447        old_rid,
448    })
449}
450
451fn encode_leaf_split_entry(entry: &IndexLeafSplitEntryPayload, buf: &mut Vec<u8>) {
452    encode_bytes(&entry.key_data, buf);
453    buf.extend(RidCodec::encode(&entry.rid));
454}
455
456fn decode_leaf_split_entry(bytes: &[u8]) -> QuillSQLResult<(IndexLeafSplitEntryPayload, usize)> {
457    let (key_data, consumed_key) = decode_bytes(bytes)?;
458    let (rid, consumed_rid) = RidCodec::decode(&bytes[consumed_key..])?;
459    Ok((
460        IndexLeafSplitEntryPayload { key_data, rid },
461        consumed_key + consumed_rid,
462    ))
463}
464
465fn encode_leaf_split(body: &IndexLeafSplitPayload) -> Vec<u8> {
466    let mut buf = Vec::new();
467    encode_relation(&body.relation, &mut buf);
468    buf.extend_from_slice(&body.left_page_id.to_le_bytes());
469    buf.extend_from_slice(&body.right_page_id.to_le_bytes());
470    buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
471    buf.extend_from_slice(&body.split_index.to_le_bytes());
472    buf.extend_from_slice(&body.right_next_page_id.to_le_bytes());
473    buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
474    for entry in &body.entries {
475        encode_leaf_split_entry(entry, &mut buf);
476    }
477    buf
478}
479
480fn decode_leaf_split(bytes: &[u8]) -> QuillSQLResult<IndexLeafSplitPayload> {
481    let (relation, mut offset) = decode_relation(bytes)?;
482    let required = offset + 4 + 4 + 4 + 2 + 4;
483    if bytes.len() < required {
484        return Err(QuillSQLError::Internal(
485            "Index leaf split payload too short".to_string(),
486        ));
487    }
488    let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
489    offset += 4;
490    let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
491    offset += 4;
492    let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
493    offset += 4;
494    let split_index = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
495    offset += 2;
496    let right_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
497    offset += 4;
498    if bytes.len() < offset + 4 {
499        return Err(QuillSQLError::Internal(
500            "Index leaf split entries length missing".to_string(),
501        ));
502    }
503    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
504    offset += 4;
505    let mut entries = Vec::with_capacity(entry_count);
506    let mut cursor = offset;
507    for _ in 0..entry_count {
508        if cursor >= bytes.len() {
509            return Err(QuillSQLError::Internal(
510                "Index leaf split entries truncated".to_string(),
511            ));
512        }
513        let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
514        entries.push(entry);
515        cursor += consumed;
516    }
517    Ok(IndexLeafSplitPayload {
518        relation,
519        left_page_id,
520        right_page_id,
521        leaf_max_size,
522        split_index,
523        right_next_page_id,
524        entries,
525    })
526}
527
528fn encode_leaf_merge(body: &IndexLeafMergePayload) -> Vec<u8> {
529    let mut buf = Vec::new();
530    encode_relation(&body.relation, &mut buf);
531    buf.extend_from_slice(&body.left_page_id.to_le_bytes());
532    buf.extend_from_slice(&body.right_page_id.to_le_bytes());
533    buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
534    buf.extend_from_slice(&body.left_next_page_id.to_le_bytes());
535    buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
536    for entry in &body.entries {
537        encode_leaf_split_entry(entry, &mut buf);
538    }
539    buf
540}
541
542fn decode_leaf_merge(bytes: &[u8]) -> QuillSQLResult<IndexLeafMergePayload> {
543    let (relation, mut offset) = decode_relation(bytes)?;
544    let required = offset + 4 + 4 + 4 + 4;
545    if bytes.len() < required {
546        return Err(QuillSQLError::Internal(
547            "Index leaf merge payload too short".to_string(),
548        ));
549    }
550    let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
551    offset += 4;
552    let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
553    offset += 4;
554    let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
555    offset += 4;
556    let left_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
557    offset += 4;
558    if bytes.len() < offset + 4 {
559        return Err(QuillSQLError::Internal(
560            "Index leaf merge entries length missing".to_string(),
561        ));
562    }
563    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
564    offset += 4;
565    let mut entries = Vec::with_capacity(entry_count);
566    let mut cursor = offset;
567    for _ in 0..entry_count {
568        if cursor >= bytes.len() {
569            return Err(QuillSQLError::Internal(
570                "Index leaf merge entries truncated".to_string(),
571            ));
572        }
573        let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
574        entries.push(entry);
575        cursor += consumed;
576    }
577    Ok(IndexLeafMergePayload {
578        relation,
579        left_page_id,
580        right_page_id,
581        leaf_max_size,
582        left_next_page_id,
583        entries,
584    })
585}
586
587fn encode_internal_entry(entry: &IndexInternalEntryPayload, buf: &mut Vec<u8>) {
588    encode_bytes(&entry.key_data, buf);
589    buf.extend_from_slice(&entry.child_page_id.to_le_bytes());
590}
591
592fn decode_internal_entry(bytes: &[u8]) -> QuillSQLResult<(IndexInternalEntryPayload, usize)> {
593    let (key_data, consumed_key) = decode_bytes(bytes)?;
594    if bytes.len() < consumed_key + 4 {
595        return Err(QuillSQLError::Internal(
596            "Index internal entry truncated".to_string(),
597        ));
598    }
599    let child_page_id =
600        u32::from_le_bytes(bytes[consumed_key..consumed_key + 4].try_into().unwrap());
601    Ok((
602        IndexInternalEntryPayload {
603            key_data,
604            child_page_id,
605        },
606        consumed_key + 4,
607    ))
608}
609
610fn encode_internal_split(body: &IndexInternalSplitPayload) -> Vec<u8> {
611    let mut buf = Vec::new();
612    encode_relation(&body.relation, &mut buf);
613    buf.extend_from_slice(&body.left_page_id.to_le_bytes());
614    buf.extend_from_slice(&body.left_new_size.to_le_bytes());
615    match &body.left_high_key {
616        Some(bytes) => {
617            buf.push(1);
618            encode_bytes(bytes, &mut buf);
619        }
620        None => buf.push(0),
621    }
622    buf.extend_from_slice(&body.left_next_page_id.to_le_bytes());
623    buf.extend_from_slice(&body.right_page_id.to_le_bytes());
624    buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
625    buf.extend_from_slice(&(body.right_entries.len() as u32).to_le_bytes());
626    for entry in &body.right_entries {
627        encode_internal_entry(entry, &mut buf);
628    }
629    match &body.right_high_key {
630        Some(bytes) => {
631            buf.push(1);
632            encode_bytes(bytes, &mut buf);
633        }
634        None => buf.push(0),
635    }
636    buf.extend_from_slice(&body.right_next_page_id.to_le_bytes());
637    buf
638}
639
640fn decode_internal_split(bytes: &[u8]) -> QuillSQLResult<IndexInternalSplitPayload> {
641    let (relation, mut offset) = decode_relation(bytes)?;
642    if bytes.len() < offset + 4 + 2 + 1 + 4 + 4 + 4 {
643        return Err(QuillSQLError::Internal(
644            "Index internal split payload too short".to_string(),
645        ));
646    }
647    let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
648    offset += 4;
649    let left_new_size = u16::from_le_bytes(bytes[offset..offset + 2].try_into().unwrap());
650    offset += 2;
651    let has_left_high = bytes[offset] != 0;
652    offset += 1;
653    let left_high_key = if has_left_high {
654        let (data, consumed) = decode_bytes(&bytes[offset..])?;
655        offset += consumed;
656        Some(data)
657    } else {
658        None
659    };
660    let left_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
661    offset += 4;
662    let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
663    offset += 4;
664    let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
665    offset += 4;
666    if bytes.len() < offset + 4 {
667        return Err(QuillSQLError::Internal(
668            "Index internal split entries length missing".to_string(),
669        ));
670    }
671    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
672    offset += 4;
673    let mut entries = Vec::with_capacity(entry_count);
674    let mut cursor = offset;
675    for _ in 0..entry_count {
676        if cursor >= bytes.len() {
677            return Err(QuillSQLError::Internal(
678                "Index internal split entries truncated".to_string(),
679            ));
680        }
681        let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
682        entries.push(entry);
683        cursor += consumed;
684    }
685    if bytes.len() <= cursor {
686        return Err(QuillSQLError::Internal(
687            "Index internal split missing right high key flag".to_string(),
688        ));
689    }
690    let has_right_high = bytes[cursor] != 0;
691    cursor += 1;
692    let right_high_key = if has_right_high {
693        let (data, consumed) = decode_bytes(&bytes[cursor..])?;
694        cursor += consumed;
695        Some(data)
696    } else {
697        None
698    };
699    if bytes.len() < cursor + 4 {
700        return Err(QuillSQLError::Internal(
701            "Index internal split missing right_next_page_id".to_string(),
702        ));
703    }
704    let right_next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
705    Ok(IndexInternalSplitPayload {
706        relation,
707        left_page_id,
708        left_new_size,
709        left_high_key,
710        left_next_page_id,
711        right_page_id,
712        internal_max_size,
713        right_entries: entries,
714        right_high_key,
715        right_next_page_id,
716    })
717}
718
719fn encode_internal_merge(body: &IndexInternalMergePayload) -> Vec<u8> {
720    let mut buf = Vec::new();
721    encode_relation(&body.relation, &mut buf);
722    buf.extend_from_slice(&body.left_page_id.to_le_bytes());
723    buf.extend_from_slice(&body.right_page_id.to_le_bytes());
724    buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
725    buf.extend_from_slice(&(body.left_entries.len() as u32).to_le_bytes());
726    for entry in &body.left_entries {
727        encode_internal_entry(entry, &mut buf);
728    }
729    match &body.high_key {
730        Some(bytes) => {
731            buf.push(1);
732            encode_bytes(bytes, &mut buf);
733        }
734        None => buf.push(0),
735    }
736    buf.extend_from_slice(&body.next_page_id.to_le_bytes());
737    buf
738}
739
740fn decode_internal_merge(bytes: &[u8]) -> QuillSQLResult<IndexInternalMergePayload> {
741    let (relation, mut offset) = decode_relation(bytes)?;
742    if bytes.len() < offset + 4 + 4 + 4 {
743        return Err(QuillSQLError::Internal(
744            "Index internal merge payload too short".to_string(),
745        ));
746    }
747    let left_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
748    offset += 4;
749    let right_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
750    offset += 4;
751    let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
752    offset += 4;
753    if bytes.len() < offset + 4 {
754        return Err(QuillSQLError::Internal(
755            "Index internal merge entry length missing".to_string(),
756        ));
757    }
758    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
759    offset += 4;
760    let mut left_entries = Vec::with_capacity(entry_count);
761    let mut cursor = offset;
762    for _ in 0..entry_count {
763        if cursor >= bytes.len() {
764            return Err(QuillSQLError::Internal(
765                "Index internal merge entries truncated".to_string(),
766            ));
767        }
768        let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
769        left_entries.push(entry);
770        cursor += consumed;
771    }
772    if bytes.len() <= cursor {
773        return Err(QuillSQLError::Internal(
774            "Index internal merge missing high key flag".to_string(),
775        ));
776    }
777    let has_high_key = bytes[cursor] != 0;
778    cursor += 1;
779    let high_key = if has_high_key {
780        let (data, consumed) = decode_bytes(&bytes[cursor..])?;
781        cursor += consumed;
782        Some(data)
783    } else {
784        None
785    };
786    if bytes.len() < cursor + 4 {
787        return Err(QuillSQLError::Internal(
788            "Index internal merge missing next_page_id".to_string(),
789        ));
790    }
791    let next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
792    Ok(IndexInternalMergePayload {
793        relation,
794        left_page_id,
795        right_page_id,
796        internal_max_size,
797        left_entries,
798        high_key,
799        next_page_id,
800    })
801}
802
803fn encode_parent_insert(body: &IndexParentInsertPayload) -> Vec<u8> {
804    let mut buf = Vec::new();
805    encode_relation(&body.relation, &mut buf);
806    buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
807    buf.extend_from_slice(&body.left_child_page_id.to_le_bytes());
808    buf.extend_from_slice(&body.right_child_page_id.to_le_bytes());
809    encode_bytes(&body.key_data, &mut buf);
810    buf
811}
812
813fn decode_parent_insert(bytes: &[u8]) -> QuillSQLResult<IndexParentInsertPayload> {
814    let (relation, mut offset) = decode_relation(bytes)?;
815    if bytes.len() < offset + 4 * 3 {
816        return Err(QuillSQLError::Internal(
817            "Index parent insert payload too short".to_string(),
818        ));
819    }
820    let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
821    offset += 4;
822    let left_child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
823    offset += 4;
824    let right_child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
825    offset += 4;
826    let (key_data, _) = decode_bytes(&bytes[offset..])?;
827    Ok(IndexParentInsertPayload {
828        relation,
829        parent_page_id,
830        left_child_page_id,
831        right_child_page_id,
832        key_data,
833    })
834}
835
836fn encode_parent_delete(body: &IndexParentDeletePayload) -> Vec<u8> {
837    let mut buf = Vec::new();
838    encode_relation(&body.relation, &mut buf);
839    buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
840    buf.extend_from_slice(&body.child_page_id.to_le_bytes());
841    buf
842}
843
844fn decode_parent_delete(bytes: &[u8]) -> QuillSQLResult<IndexParentDeletePayload> {
845    let (relation, mut offset) = decode_relation(bytes)?;
846    if bytes.len() < offset + 8 {
847        return Err(QuillSQLError::Internal(
848            "Index parent delete payload too short".to_string(),
849        ));
850    }
851    let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
852    offset += 4;
853    let child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
854    Ok(IndexParentDeletePayload {
855        relation,
856        parent_page_id,
857        child_page_id,
858    })
859}
860
861fn encode_parent_update(body: &IndexParentUpdatePayload) -> Vec<u8> {
862    let mut buf = Vec::new();
863    encode_relation(&body.relation, &mut buf);
864    buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
865    buf.extend_from_slice(&body.child_page_id.to_le_bytes());
866    encode_bytes(&body.key_data, &mut buf);
867    buf
868}
869
870fn decode_parent_update(bytes: &[u8]) -> QuillSQLResult<IndexParentUpdatePayload> {
871    let (relation, mut offset) = decode_relation(bytes)?;
872    if bytes.len() < offset + 8 {
873        return Err(QuillSQLError::Internal(
874            "Index parent update payload too short".to_string(),
875        ));
876    }
877    let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
878    offset += 4;
879    let child_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
880    offset += 4;
881    let (key_data, _) = decode_bytes(&bytes[offset..])?;
882    Ok(IndexParentUpdatePayload {
883        relation,
884        parent_page_id,
885        child_page_id,
886        key_data,
887    })
888}
889
890fn encode_leaf_redistribute(body: &IndexLeafRedistributePayload) -> Vec<u8> {
891    let mut buf = Vec::new();
892    encode_relation(&body.relation, &mut buf);
893    buf.extend_from_slice(&body.from_page_id.to_le_bytes());
894    buf.extend_from_slice(&body.to_page_id.to_le_bytes());
895    buf.push(if body.from_is_left { 1 } else { 0 });
896    encode_leaf_split_entry(&body.moved_entry, &mut buf);
897    buf
898}
899
900fn decode_leaf_redistribute(bytes: &[u8]) -> QuillSQLResult<IndexLeafRedistributePayload> {
901    let (relation, mut offset) = decode_relation(bytes)?;
902    if bytes.len() < offset + 4 + 4 + 1 {
903        return Err(QuillSQLError::Internal(
904            "Index leaf redistribute payload too short".to_string(),
905        ));
906    }
907    let from_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
908    offset += 4;
909    let to_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
910    offset += 4;
911    let from_is_left = bytes[offset] != 0;
912    offset += 1;
913    let (moved_entry, _) = decode_leaf_split_entry(&bytes[offset..])?;
914    Ok(IndexLeafRedistributePayload {
915        relation,
916        from_page_id,
917        to_page_id,
918        from_is_left,
919        moved_entry,
920    })
921}
922
923fn encode_internal_redistribute(body: &IndexInternalRedistributePayload) -> Vec<u8> {
924    let mut buf = Vec::new();
925    encode_relation(&body.relation, &mut buf);
926    buf.extend_from_slice(&body.from_page_id.to_le_bytes());
927    buf.extend_from_slice(&body.to_page_id.to_le_bytes());
928    buf.extend_from_slice(&body.parent_page_id.to_le_bytes());
929    buf.push(if body.from_is_left { 1 } else { 0 });
930    buf.extend_from_slice(&body.from_old_sentinel.to_le_bytes());
931    buf.extend_from_slice(&body.to_old_sentinel.to_le_bytes());
932    encode_internal_entry(&body.moved_entry, &mut buf);
933    encode_bytes(&body.separator_key_data, &mut buf);
934    match &body.to_new_high_key {
935        Some(bytes) => {
936            buf.push(1);
937            encode_bytes(bytes, &mut buf);
938        }
939        None => buf.push(0),
940    }
941    buf.extend_from_slice(&body.to_new_next_page_id.to_le_bytes());
942    match &body.from_new_high_key {
943        Some(bytes) => {
944            buf.push(1);
945            encode_bytes(bytes, &mut buf);
946        }
947        None => buf.push(0),
948    }
949    buf.extend_from_slice(&body.from_new_next_page_id.to_le_bytes());
950    buf
951}
952
953fn decode_internal_redistribute(bytes: &[u8]) -> QuillSQLResult<IndexInternalRedistributePayload> {
954    let (relation, mut offset) = decode_relation(bytes)?;
955    if bytes.len() < offset + 4 + 4 + 4 + 1 + 4 + 4 {
956        return Err(QuillSQLError::Internal(
957            "Index internal redistribute payload too short".to_string(),
958        ));
959    }
960    let from_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
961    offset += 4;
962    let to_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
963    offset += 4;
964    let parent_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
965    offset += 4;
966    let from_is_left = bytes[offset] != 0;
967    offset += 1;
968    let from_old_sentinel = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
969    offset += 4;
970    let to_old_sentinel = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
971    offset += 4;
972    let (moved_entry, consumed_entry) = decode_internal_entry(&bytes[offset..])?;
973    offset += consumed_entry;
974    let (separator_key_data, consumed_sep) = decode_bytes(&bytes[offset..])?;
975    offset += consumed_sep;
976    if bytes.len() <= offset {
977        return Err(QuillSQLError::Internal(
978            "Index internal redistribute missing to high key flag".to_string(),
979        ));
980    }
981    let has_to_high = bytes[offset] != 0;
982    offset += 1;
983    let to_new_high_key = if has_to_high {
984        let (data, consumed) = decode_bytes(&bytes[offset..])?;
985        offset += consumed;
986        Some(data)
987    } else {
988        None
989    };
990    if bytes.len() < offset + 4 {
991        return Err(QuillSQLError::Internal(
992            "Index internal redistribute missing to next page".to_string(),
993        ));
994    }
995    let to_new_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
996    offset += 4;
997    if bytes.len() <= offset {
998        return Err(QuillSQLError::Internal(
999            "Index internal redistribute missing from high key flag".to_string(),
1000        ));
1001    }
1002    let has_from_high = bytes[offset] != 0;
1003    offset += 1;
1004    let from_new_high_key = if has_from_high {
1005        let (data, consumed) = decode_bytes(&bytes[offset..])?;
1006        offset += consumed;
1007        Some(data)
1008    } else {
1009        None
1010    };
1011    if bytes.len() < offset + 4 {
1012        return Err(QuillSQLError::Internal(
1013            "Index internal redistribute missing from next page".to_string(),
1014        ));
1015    }
1016    let from_new_next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1017    Ok(IndexInternalRedistributePayload {
1018        relation,
1019        from_page_id,
1020        to_page_id,
1021        parent_page_id,
1022        from_is_left,
1023        from_old_sentinel,
1024        to_old_sentinel,
1025        moved_entry,
1026        separator_key_data,
1027        to_new_high_key,
1028        to_new_next_page_id,
1029        from_new_high_key,
1030        from_new_next_page_id,
1031    })
1032}
1033
1034fn encode_root_install_leaf(body: &IndexRootInstallLeafPayload) -> Vec<u8> {
1035    let mut buf = Vec::new();
1036    encode_relation(&body.relation, &mut buf);
1037    buf.extend_from_slice(&body.page_id.to_le_bytes());
1038    buf.extend_from_slice(&body.leaf_max_size.to_le_bytes());
1039    buf.extend_from_slice(&body.next_page_id.to_le_bytes());
1040    buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
1041    for entry in &body.entries {
1042        encode_leaf_split_entry(entry, &mut buf);
1043    }
1044    buf
1045}
1046
1047fn decode_root_install_leaf(bytes: &[u8]) -> QuillSQLResult<IndexRootInstallLeafPayload> {
1048    let (relation, mut offset) = decode_relation(bytes)?;
1049    if bytes.len() < offset + 4 + 4 + 4 {
1050        return Err(QuillSQLError::Internal(
1051            "Index root install leaf payload too short".to_string(),
1052        ));
1053    }
1054    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1055    offset += 4;
1056    let leaf_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1057    offset += 4;
1058    let next_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1059    offset += 4;
1060    if bytes.len() < offset + 4 {
1061        return Err(QuillSQLError::Internal(
1062            "Index root install leaf entries length missing".to_string(),
1063        ));
1064    }
1065    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
1066    offset += 4;
1067    let mut entries = Vec::with_capacity(entry_count);
1068    let mut cursor = offset;
1069    for _ in 0..entry_count {
1070        if cursor >= bytes.len() {
1071            return Err(QuillSQLError::Internal(
1072                "Index root install leaf entries truncated".to_string(),
1073            ));
1074        }
1075        let (entry, consumed) = decode_leaf_split_entry(&bytes[cursor..])?;
1076        entries.push(entry);
1077        cursor += consumed;
1078    }
1079    Ok(IndexRootInstallLeafPayload {
1080        relation,
1081        page_id,
1082        leaf_max_size,
1083        next_page_id,
1084        entries,
1085    })
1086}
1087
1088fn encode_root_install_internal(body: &IndexRootInstallInternalPayload) -> Vec<u8> {
1089    let mut buf = Vec::new();
1090    encode_relation(&body.relation, &mut buf);
1091    buf.extend_from_slice(&body.page_id.to_le_bytes());
1092    buf.extend_from_slice(&body.internal_max_size.to_le_bytes());
1093    buf.extend_from_slice(&(body.entries.len() as u32).to_le_bytes());
1094    for entry in &body.entries {
1095        encode_internal_entry(entry, &mut buf);
1096    }
1097    match &body.high_key {
1098        Some(bytes) => {
1099            buf.push(1);
1100            encode_bytes(bytes, &mut buf);
1101        }
1102        None => buf.push(0),
1103    }
1104    buf.extend_from_slice(&body.next_page_id.to_le_bytes());
1105    buf
1106}
1107
1108fn decode_root_install_internal(bytes: &[u8]) -> QuillSQLResult<IndexRootInstallInternalPayload> {
1109    let (relation, mut offset) = decode_relation(bytes)?;
1110    if bytes.len() < offset + 4 + 4 {
1111        return Err(QuillSQLError::Internal(
1112            "Index root install internal payload too short".to_string(),
1113        ));
1114    }
1115    let page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1116    offset += 4;
1117    let internal_max_size = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1118    offset += 4;
1119    if bytes.len() < offset + 4 {
1120        return Err(QuillSQLError::Internal(
1121            "Index root install internal entries length missing".to_string(),
1122        ));
1123    }
1124    let entry_count = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
1125    offset += 4;
1126    let mut entries = Vec::with_capacity(entry_count);
1127    let mut cursor = offset;
1128    for _ in 0..entry_count {
1129        if cursor >= bytes.len() {
1130            return Err(QuillSQLError::Internal(
1131                "Index root install internal entries truncated".to_string(),
1132            ));
1133        }
1134        let (entry, consumed) = decode_internal_entry(&bytes[cursor..])?;
1135        entries.push(entry);
1136        cursor += consumed;
1137    }
1138    if bytes.len() <= cursor {
1139        return Err(QuillSQLError::Internal(
1140            "Index root install internal missing high key flag".to_string(),
1141        ));
1142    }
1143    let has_high_key = bytes[cursor] != 0;
1144    cursor += 1;
1145    let high_key = if has_high_key {
1146        let (data, consumed) = decode_bytes(&bytes[cursor..])?;
1147        cursor += consumed;
1148        Some(data)
1149    } else {
1150        None
1151    };
1152    if bytes.len() < cursor + 4 {
1153        return Err(QuillSQLError::Internal(
1154            "Index root install internal missing next_page_id".to_string(),
1155        ));
1156    }
1157    let next_page_id = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap());
1158    Ok(IndexRootInstallInternalPayload {
1159        relation,
1160        page_id,
1161        internal_max_size,
1162        entries,
1163        high_key,
1164        next_page_id,
1165    })
1166}
1167
1168fn encode_root_adopt(body: &IndexRootAdoptPayload) -> Vec<u8> {
1169    let mut buf = Vec::new();
1170    encode_relation(&body.relation, &mut buf);
1171    buf.extend_from_slice(&body.new_root_page_id.to_le_bytes());
1172    buf
1173}
1174
1175fn decode_root_adopt(bytes: &[u8]) -> QuillSQLResult<IndexRootAdoptPayload> {
1176    let (relation, offset) = decode_relation(bytes)?;
1177    if bytes.len() < offset + 4 {
1178        return Err(QuillSQLError::Internal(
1179            "Index root adopt payload too short".to_string(),
1180        ));
1181    }
1182    let new_root_page_id = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
1183    Ok(IndexRootAdoptPayload {
1184        relation,
1185        new_root_page_id,
1186    })
1187}
1188
1189fn encode_root_reset(body: &IndexRootResetPayload) -> Vec<u8> {
1190    let mut buf = Vec::new();
1191    encode_relation(&body.relation, &mut buf);
1192    buf
1193}
1194
1195fn decode_root_reset(bytes: &[u8]) -> QuillSQLResult<IndexRootResetPayload> {
1196    let (relation, _) = decode_relation(bytes)?;
1197    Ok(IndexRootResetPayload { relation })
1198}
1199
1200fn encode_relation(relation: &IndexRelationIdent, buf: &mut Vec<u8>) {
1201    buf.extend_from_slice(&relation.header_page_id.to_le_bytes());
1202    encode_schema(&relation.schema, buf);
1203}
1204
1205fn decode_relation(bytes: &[u8]) -> QuillSQLResult<(IndexRelationIdent, usize)> {
1206    if bytes.len() < 4 {
1207        return Err(QuillSQLError::Internal(
1208            "Index relation ident too short".to_string(),
1209        ));
1210    }
1211    let header_page_id = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
1212    let (schema, consumed) = decode_schema(&bytes[4..])?;
1213    Ok((
1214        IndexRelationIdent {
1215            header_page_id,
1216            schema,
1217        },
1218        4 + consumed,
1219    ))
1220}
1221
1222fn encode_schema(schema: &SchemaRepr, buf: &mut Vec<u8>) {
1223    buf.extend(CommonCodec::encode_u16(schema.columns.len() as u16));
1224    for column in &schema.columns {
1225        encode_string(&column.name, buf);
1226        encode_data_type(column.data_type, buf);
1227        buf.push(column.nullable as u8);
1228    }
1229}
1230
1231fn encode_data_type(data_type: DataType, buf: &mut Vec<u8>) {
1232    match data_type {
1233        DataType::Boolean => buf.push(0),
1234        DataType::Int8 => buf.push(1),
1235        DataType::Int16 => buf.push(2),
1236        DataType::Int32 => buf.push(3),
1237        DataType::Int64 => buf.push(4),
1238        DataType::UInt8 => buf.push(5),
1239        DataType::UInt16 => buf.push(6),
1240        DataType::UInt32 => buf.push(7),
1241        DataType::UInt64 => buf.push(8),
1242        DataType::Float32 => buf.push(9),
1243        DataType::Float64 => buf.push(10),
1244        DataType::Varchar(len) => {
1245            buf.push(11);
1246            match len {
1247                Some(n) => {
1248                    buf.push(1);
1249                    buf.extend(CommonCodec::encode_u32(n as u32));
1250                }
1251                None => buf.push(0),
1252            }
1253        }
1254    }
1255}
1256
1257fn decode_schema(bytes: &[u8]) -> QuillSQLResult<(SchemaRepr, usize)> {
1258    let (col_count, mut offset) = CommonCodec::decode_u16(bytes)?;
1259    let mut columns = Vec::with_capacity(col_count as usize);
1260    for _ in 0..col_count {
1261        let (name, consumed_name) = decode_string(&bytes[offset..])?;
1262        offset += consumed_name;
1263        let (data_type, consumed_dt) = decode_data_type(&bytes[offset..])?;
1264        offset += consumed_dt;
1265        if offset >= bytes.len() {
1266            return Err(QuillSQLError::Internal(
1267                "Schema payload truncated (nullable)".to_string(),
1268            ));
1269        }
1270        let nullable = bytes[offset] != 0;
1271        offset += 1;
1272        columns.push(ColumnRepr {
1273            name,
1274            data_type,
1275            nullable,
1276        });
1277    }
1278    Ok((SchemaRepr { columns }, offset))
1279}
1280
1281fn decode_data_type(bytes: &[u8]) -> QuillSQLResult<(DataType, usize)> {
1282    if bytes.is_empty() {
1283        return Err(QuillSQLError::Internal(
1284            "Schema payload truncated (datatype tag)".to_string(),
1285        ));
1286    }
1287    let tag = bytes[0];
1288    let mut consumed = 1;
1289    let data_type = match tag {
1290        0 => DataType::Boolean,
1291        1 => DataType::Int8,
1292        2 => DataType::Int16,
1293        3 => DataType::Int32,
1294        4 => DataType::Int64,
1295        5 => DataType::UInt8,
1296        6 => DataType::UInt16,
1297        7 => DataType::UInt32,
1298        8 => DataType::UInt64,
1299        9 => DataType::Float32,
1300        10 => DataType::Float64,
1301        11 => {
1302            if bytes.len() < consumed + 1 {
1303                return Err(QuillSQLError::Internal(
1304                    "Schema payload truncated (varchar flag)".to_string(),
1305                ));
1306            }
1307            let has_len = bytes[consumed] != 0;
1308            consumed += 1;
1309            if has_len {
1310                let (len, len_consumed) = CommonCodec::decode_u32(&bytes[consumed..])?;
1311                consumed += len_consumed;
1312                DataType::Varchar(Some(len as usize))
1313            } else {
1314                DataType::Varchar(None)
1315            }
1316        }
1317        other => {
1318            return Err(QuillSQLError::Internal(format!(
1319                "Unknown datatype tag {} in schema repr",
1320                other
1321            )))
1322        }
1323    };
1324    Ok((data_type, consumed))
1325}
1326
1327fn encode_bytes(data: &[u8], buf: &mut Vec<u8>) {
1328    buf.extend(CommonCodec::encode_u32(data.len() as u32));
1329    buf.extend_from_slice(data);
1330}
1331
1332fn decode_bytes(bytes: &[u8]) -> QuillSQLResult<(Vec<u8>, usize)> {
1333    let (len, mut offset) = CommonCodec::decode_u32(bytes)?;
1334    let len = len as usize;
1335    if bytes.len() < offset + len {
1336        return Err(QuillSQLError::Internal(
1337            "Index WAL payload truncated while decoding bytes".to_string(),
1338        ));
1339    }
1340    let data = bytes[offset..offset + len].to_vec();
1341    offset += len;
1342    Ok((data, offset))
1343}
1344
1345fn encode_string(value: &str, buf: &mut Vec<u8>) {
1346    let bytes = value.as_bytes();
1347    buf.extend(CommonCodec::encode_u16(bytes.len() as u16));
1348    buf.extend_from_slice(bytes);
1349}
1350
1351fn decode_string(bytes: &[u8]) -> QuillSQLResult<(String, usize)> {
1352    let (len, mut offset) = CommonCodec::decode_u16(bytes)?;
1353    let len = len as usize;
1354    if bytes.len() < offset + len {
1355        return Err(QuillSQLError::Internal(
1356            "Index WAL payload truncated while decoding string".to_string(),
1357        ));
1358    }
1359    let value = std::str::from_utf8(&bytes[offset..offset + len])
1360        .map_err(|e| QuillSQLError::Internal(format!("Invalid utf8 in schema repr: {}", e)))?
1361        .to_string();
1362    offset += len;
1363    Ok((value, offset))
1364}
1365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;
    use std::sync::Arc;

    /// Single non-nullable `Int32` column named `k`, shared by every test.
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Column::new("k", DataType::Int32, false)]))
    }

    /// Encodes `payload`, checks the record kind byte, decodes it again and
    /// compares the decoded payload with the original field by field.
    ///
    /// Panics on variants it has no comparison arm for, so a new test for
    /// another variant must extend the `match` below.
    fn roundtrip(payload: IndexRecordPayload, kind: IndexRecordKind) {
        let (info, bytes) = encode_index_record(&payload);
        assert_eq!(info, kind as u8);
        let decoded = decode_index_record(&bytes, info).unwrap();
        match (payload, decoded) {
            (IndexRecordPayload::LeafInsert(a), IndexRecordPayload::LeafInsert(b)) => {
                // Compare the whole relation so the schema codec is covered too.
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.key_data, b.key_data);
                assert_eq!(a.rid, b.rid);
            }
            (IndexRecordPayload::LeafDelete(a), IndexRecordPayload::LeafDelete(b)) => {
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.op_txn_id, b.op_txn_id);
                assert_eq!(a.key_data, b.key_data);
                assert_eq!(a.old_rid, b.old_rid);
            }
            (
                IndexRecordPayload::RootInstallInternal(a),
                IndexRecordPayload::RootInstallInternal(b),
            ) => {
                assert_eq!(a.relation, b.relation);
                assert_eq!(a.page_id, b.page_id);
                assert_eq!(a.internal_max_size, b.internal_max_size);
                assert_eq!(a.entries.len(), b.entries.len());
                // Matching lengths alone would miss corrupted entry contents.
                for (expected, actual) in a.entries.iter().zip(b.entries.iter()) {
                    assert_eq!(expected.key_data, actual.key_data);
                    assert_eq!(expected.child_page_id, actual.child_page_id);
                }
                assert_eq!(a.high_key, b.high_key);
                assert_eq!(a.next_page_id, b.next_page_id);
            }
            (lhs, rhs) => panic!("payload variant mismatch: {:?} vs {:?}", lhs, rhs),
        }
    }

    #[test]
    fn leaf_insert_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
            relation: IndexRelationIdent {
                header_page_id: 99,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 7,
            op_txn_id: 1,
            key_data: TupleCodec::encode(&Tuple::new(schema, vec![ScalarValue::Int32(Some(3))])),
            rid: RecordId::new(4, 2),
        });
        roundtrip(payload, IndexRecordKind::LeafInsert);
    }

    #[test]
    fn leaf_delete_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
            relation: IndexRelationIdent {
                header_page_id: 3,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 2,
            op_txn_id: 4,
            key_data: TupleCodec::encode(&Tuple::new(schema, vec![ScalarValue::Int32(Some(5))])),
            old_rid: RecordId::new(9, 1),
        });
        roundtrip(payload, IndexRecordKind::LeafDelete);
    }

    #[test]
    fn root_install_internal_roundtrip() {
        let schema = schema();
        let payload = IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
            relation: IndexRelationIdent {
                header_page_id: 5,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 6,
            internal_max_size: 8,
            entries: vec![IndexInternalEntryPayload {
                key_data: TupleCodec::encode(&Tuple::new(
                    schema.clone(),
                    vec![ScalarValue::Int32(Some(10))],
                )),
                child_page_id: 11,
            }],
            high_key: None,
            next_page_id: 12,
        });
        roundtrip(payload, IndexRecordKind::RootInstallInternal);
    }

    /// Covers the `Some` high-key branch and more than one entry.
    #[test]
    fn root_install_internal_with_high_key_roundtrip() {
        let schema = schema();
        let key = |value: i32| {
            TupleCodec::encode(&Tuple::new(
                schema.clone(),
                vec![ScalarValue::Int32(Some(value))],
            ))
        };
        let payload = IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
            relation: IndexRelationIdent {
                header_page_id: 5,
                schema: SchemaRepr::from(schema.clone()),
            },
            page_id: 6,
            internal_max_size: 8,
            entries: vec![
                IndexInternalEntryPayload {
                    key_data: key(10),
                    child_page_id: 11,
                },
                IndexInternalEntryPayload {
                    key_data: key(20),
                    child_page_id: 13,
                },
            ],
            high_key: Some(key(30)),
            next_page_id: 12,
        });
        roundtrip(payload, IndexRecordKind::RootInstallInternal);
    }
}