quill_sql/storage/codec/
btree_page.rs

1use crate::buffer::PAGE_SIZE;
2use crate::catalog::SchemaRef;
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, DecodedData, RidCodec, TupleCodec};
5use crate::storage::page::RecordId;
6use crate::storage::page::{
7    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeInternalPageHeader, BPlusTreeLeafPage,
8    BPlusTreeLeafPageHeader, BPlusTreePage, BPlusTreePageType,
9};
10use crate::storage::tuple::Tuple;
11
12pub struct BPlusTreeHeaderPageCodec;
13
14impl BPlusTreeHeaderPageCodec {
15    pub fn encode(page: &BPlusTreeHeaderPage) -> Vec<u8> {
16        let mut bytes = vec![];
17        bytes.extend(CommonCodec::encode_u32(page.root_page_id));
18        bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
19        bytes
20    }
21
22    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeHeaderPage>> {
23        if bytes.len() != PAGE_SIZE {
24            return Err(QuillSQLError::Storage(format!(
25                "Header page size is not {} instead of {}",
26                PAGE_SIZE,
27                bytes.len()
28            )));
29        }
30
31        let (root_page_id, offset) = CommonCodec::decode_u32(bytes)?;
32        let page = BPlusTreeHeaderPage { root_page_id };
33
34        Ok((page, offset))
35    }
36}
37
38pub struct BPlusTreePageCodec;
39
40impl BPlusTreePageCodec {
41    pub fn encode(page: &BPlusTreePage) -> Vec<u8> {
42        match page {
43            BPlusTreePage::Leaf(page) => BPlusTreeLeafPageCodec::encode(page),
44            BPlusTreePage::Internal(page) => BPlusTreeInternalPageCodec::encode(page),
45        }
46    }
47
48    pub fn decode(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<DecodedData<BPlusTreePage>> {
49        if bytes.len() != PAGE_SIZE {
50            return Err(QuillSQLError::Storage(format!(
51                "Index page size is not {} instead of {}",
52                PAGE_SIZE,
53                bytes.len()
54            )));
55        }
56
57        // not consume left_bytes
58        let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
59
60        match page_type {
61            BPlusTreePageType::LeafPage => {
62                let (page, offset) = BPlusTreeLeafPageCodec::decode(bytes, schema.clone())?;
63                Ok((BPlusTreePage::Leaf(page), offset))
64            }
65            BPlusTreePageType::InternalPage => {
66                let (page, offset) = BPlusTreeInternalPageCodec::decode(bytes, schema.clone())?;
67                Ok((BPlusTreePage::Internal(page), offset))
68            }
69        }
70    }
71
72    /// Decode only the page type header without materializing the entire page.
73    pub fn decode_header(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreePageType>> {
74        BPlusTreePageTypeCodec::decode(bytes)
75    }
76}
77
78pub struct BPlusTreeLeafPageCodec;
79
80impl BPlusTreeLeafPageCodec {
81    pub fn encode(page: &BPlusTreeLeafPage) -> Vec<u8> {
82        let mut bytes = vec![];
83        bytes.extend(BPlusTreeLeafPageHeaderCodec::encode(&page.header));
84        for (tuple, rid) in page.array.iter() {
85            bytes.extend(TupleCodec::encode(tuple));
86            bytes.extend(RidCodec::encode(rid));
87        }
88        // make sure length of bytes is PAGE_SIZE
89        assert!(bytes.len() <= PAGE_SIZE);
90        bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
91        bytes
92    }
93
94    pub fn decode(
95        bytes: &[u8],
96        schema: SchemaRef,
97    ) -> QuillSQLResult<DecodedData<BPlusTreeLeafPage>> {
98        if bytes.len() != PAGE_SIZE {
99            return Err(QuillSQLError::Storage(format!(
100                "Index page size is not {} instead of {}",
101                PAGE_SIZE,
102                bytes.len()
103            )));
104        }
105        let mut left_bytes = bytes;
106
107        // not consume left_bytes
108        let (page_type, _) = BPlusTreePageTypeCodec::decode(left_bytes)?;
109
110        if matches!(page_type, BPlusTreePageType::LeafPage) {
111            let (header, offset) = BPlusTreeLeafPageHeaderCodec::decode(left_bytes)?;
112            left_bytes = &left_bytes[offset..];
113
114            let mut array = vec![];
115            for _ in 0..header.current_size {
116                let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
117                left_bytes = &left_bytes[offset..];
118                let (rid, offset) = RidCodec::decode(left_bytes)?;
119                left_bytes = &left_bytes[offset..];
120                array.push((tuple, rid));
121            }
122
123            Ok((
124                BPlusTreeLeafPage {
125                    schema,
126                    header,
127                    array,
128                },
129                PAGE_SIZE,
130            ))
131        } else {
132            Err(QuillSQLError::Storage(
133                "Index page type must be leaf page".to_string(),
134            ))
135        }
136    }
137
138    /// Decode only the leaf page header and return (header, bytes_consumed)
139    pub fn decode_header_only(
140        bytes: &[u8],
141    ) -> QuillSQLResult<DecodedData<BPlusTreeLeafPageHeader>> {
142        if bytes.len() != PAGE_SIZE {
143            return Err(QuillSQLError::Storage(format!(
144                "Index page size is not {} instead of {}",
145                PAGE_SIZE,
146                bytes.len()
147            )));
148        }
149        let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
150        if !matches!(page_type, BPlusTreePageType::LeafPage) {
151            return Err(QuillSQLError::Storage(
152                "Index page type must be leaf page".to_string(),
153            ));
154        }
155        BPlusTreeLeafPageHeaderCodec::decode(bytes)
156    }
157
158    /// Decode one (Tuple, RecordId) starting at `offset` within the page bytes.
159    /// Returns ((tuple, rid), new_offset).
160    pub fn decode_kv_at_offset(
161        bytes: &[u8],
162        schema: SchemaRef,
163        offset: usize,
164    ) -> QuillSQLResult<DecodedData<(Tuple, RecordId)>> {
165        if bytes.len() != PAGE_SIZE {
166            return Err(QuillSQLError::Storage(format!(
167                "Index page size is not {} instead of {}",
168                PAGE_SIZE,
169                bytes.len()
170            )));
171        }
172        let mut left_bytes = &bytes[offset..];
173        let (tuple, t_off) = TupleCodec::decode(left_bytes, schema.clone())?;
174        left_bytes = &left_bytes[t_off..];
175        let (rid, r_off) = RidCodec::decode(left_bytes)?;
176        let consumed = t_off + r_off;
177        Ok(((tuple, rid), offset + consumed))
178    }
179}
180
181pub struct BPlusTreeInternalPageCodec;
182
183impl BPlusTreeInternalPageCodec {
184    pub fn encode(page: &BPlusTreeInternalPage) -> Vec<u8> {
185        let mut bytes = vec![];
186        bytes.extend(BPlusTreeInternalPageHeaderCodec::encode(&page.header));
187        // encode optional high_key presence
188        if let Some(hk) = &page.high_key {
189            bytes.extend(CommonCodec::encode_u8(1));
190            bytes.extend(TupleCodec::encode(hk));
191        } else {
192            bytes.extend(CommonCodec::encode_u8(0));
193        }
194        // enable prefix-compress for internal keys
195        bytes.extend(CommonCodec::encode_u8(1)); // compression flag = 1 (prefix-compressed)
196        let mut prev_key_bytes: Vec<u8> = Vec::new();
197        for (tuple, page_id) in page.array.iter() {
198            let full = TupleCodec::encode(tuple);
199            let mut lcp = 0usize;
200            let max_lcp = prev_key_bytes.len().min(full.len());
201            while lcp < max_lcp && prev_key_bytes[lcp] == full[lcp] {
202                lcp += 1;
203            }
204            let suffix = &full[lcp..];
205            bytes.extend(CommonCodec::encode_u16(lcp as u16));
206            bytes.extend(CommonCodec::encode_u32(suffix.len() as u32));
207            bytes.extend(suffix);
208            bytes.extend(CommonCodec::encode_u32(*page_id));
209            prev_key_bytes = full;
210        }
211        // make sure length of bytes is PAGE_SIZE
212        assert!(bytes.len() <= PAGE_SIZE);
213        bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
214        bytes
215    }
216
217    pub fn decode(
218        bytes: &[u8],
219        schema: SchemaRef,
220    ) -> QuillSQLResult<DecodedData<BPlusTreeInternalPage>> {
221        if bytes.len() != PAGE_SIZE {
222            return Err(QuillSQLError::Storage(format!(
223                "Index page size is not {} instead of {}",
224                PAGE_SIZE,
225                bytes.len()
226            )));
227        }
228        let mut left_bytes = bytes;
229
230        // not consume left_bytes
231        let (page_type, _) = BPlusTreePageTypeCodec::decode(left_bytes)?;
232
233        if matches!(page_type, BPlusTreePageType::InternalPage) {
234            let (header, offset) = BPlusTreeInternalPageHeaderCodec::decode(left_bytes)?;
235            left_bytes = &left_bytes[offset..];
236
237            // decode optional high_key
238            let (has_high_key, offset) = CommonCodec::decode_u8(left_bytes)?;
239            left_bytes = &left_bytes[offset..];
240            let high_key = if has_high_key == 1 {
241                let (hk, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
242                left_bytes = &left_bytes[offset..];
243                Some(hk)
244            } else {
245                None
246            };
247
248            // compression flag
249            let (compress_flag, offset) = CommonCodec::decode_u8(left_bytes)?;
250            left_bytes = &left_bytes[offset..];
251
252            let mut array = vec![];
253            if compress_flag == 1 {
254                // prefix-compressed decoding
255                let mut prev_key_bytes: Vec<u8> = Vec::new();
256                for _ in 0..header.current_size {
257                    let (lcp_u16, o1) = CommonCodec::decode_u16(left_bytes)?;
258                    left_bytes = &left_bytes[o1..];
259                    let (suf_len_u32, o2) = CommonCodec::decode_u32(left_bytes)?;
260                    left_bytes = &left_bytes[o2..];
261                    let suf_len = suf_len_u32 as usize;
262                    let suffix = &left_bytes[..suf_len];
263                    left_bytes = &left_bytes[suf_len..];
264                    // reconstruct full key bytes
265                    let mut full = Vec::with_capacity(lcp_u16 as usize + suf_len);
266                    full.extend_from_slice(
267                        &prev_key_bytes[..(lcp_u16 as usize).min(prev_key_bytes.len())],
268                    );
269                    full.extend_from_slice(suffix);
270                    let (tuple, _off) = TupleCodec::decode(&full, schema.clone())?;
271                    prev_key_bytes = full;
272                    let (page_id, o3) = CommonCodec::decode_u32(left_bytes)?;
273                    left_bytes = &left_bytes[o3..];
274                    array.push((tuple, page_id));
275                }
276            } else {
277                // legacy raw format
278                for _ in 0..header.current_size {
279                    let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
280                    left_bytes = &left_bytes[offset..];
281                    let (page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
282                    left_bytes = &left_bytes[offset..];
283                    array.push((tuple, page_id));
284                }
285            }
286
287            Ok((
288                BPlusTreeInternalPage {
289                    schema,
290                    header,
291                    array,
292                    high_key,
293                },
294                PAGE_SIZE,
295            ))
296        } else {
297            Err(QuillSQLError::Storage(
298                "Index page type must be internal page".to_string(),
299            ))
300        }
301    }
302
303    /// Decode only the internal page header without materializing payload.
304    pub fn decode_header_only(
305        bytes: &[u8],
306    ) -> QuillSQLResult<DecodedData<BPlusTreeInternalPageHeader>> {
307        if bytes.len() != PAGE_SIZE {
308            return Err(QuillSQLError::Storage(format!(
309                "Index page size is not {} instead of {}",
310                PAGE_SIZE,
311                bytes.len()
312            )));
313        }
314        let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
315        if !matches!(page_type, BPlusTreePageType::InternalPage) {
316            return Err(QuillSQLError::Storage(
317                "Index page type must be internal page".to_string(),
318            ));
319        }
320        BPlusTreeInternalPageHeaderCodec::decode(bytes)
321    }
322}
323
324impl BPlusTreeInternalPageCodec {
325    /// Lookup child page id from an internal page byte slice without fully decoding the page.
326    pub fn lookup_child_from_bytes(
327        bytes: &[u8],
328        schema: SchemaRef,
329        search_key: &Tuple,
330    ) -> QuillSQLResult<u32> {
331        if bytes.len() != PAGE_SIZE {
332            return Err(QuillSQLError::Storage(format!(
333                "Index page size is not {} instead of {}",
334                PAGE_SIZE,
335                bytes.len()
336            )));
337        }
338        // Decode type and header
339        let (page_type, _t_off) = BPlusTreePageTypeCodec::decode(bytes)?;
340        if !matches!(page_type, BPlusTreePageType::InternalPage) {
341            return Err(QuillSQLError::Storage(
342                "Index page type must be internal page".to_string(),
343            ));
344        }
345        let (header, h_off) = BPlusTreeInternalPageHeaderCodec::decode(bytes)?;
346        let mut left_bytes = &bytes[h_off..];
347        // Optional high_key
348        let (has_high_key, o1) = CommonCodec::decode_u8(left_bytes)?;
349        left_bytes = &left_bytes[o1..];
350        if has_high_key == 1 {
351            let (_hk, o2) = TupleCodec::decode(left_bytes, schema.clone())?;
352            left_bytes = &left_bytes[o2..];
353        }
354        // Compression flag
355        let (compress_flag, o3) = CommonCodec::decode_u8(left_bytes)?;
356        left_bytes = &left_bytes[o3..];
357
358        let mut result_pid: u32;
359        if compress_flag == 1 {
360            // sentinel
361            let (_lcp, o4) = CommonCodec::decode_u16(left_bytes)?;
362            left_bytes = &left_bytes[o4..];
363            let (suf_len_u32, o5) = CommonCodec::decode_u32(left_bytes)?;
364            left_bytes = &left_bytes[o5..];
365            let suf_len = suf_len_u32 as usize;
366            let suffix = &left_bytes[..suf_len];
367            left_bytes = &left_bytes[suf_len..];
368            let (pid0, o6) = CommonCodec::decode_u32(left_bytes)?;
369            left_bytes = &left_bytes[o6..];
370            result_pid = pid0;
371            let mut prev_key_bytes: Vec<u8> = suffix.to_vec();
372            for _ in 1..header.current_size {
373                let (lcp_u16, o7) = CommonCodec::decode_u16(left_bytes)?;
374                left_bytes = &left_bytes[o7..];
375                let (suf_len_u32, o8) = CommonCodec::decode_u32(left_bytes)?;
376                left_bytes = &left_bytes[o8..];
377                let suf_len = suf_len_u32 as usize;
378                let suffix = &left_bytes[..suf_len];
379                left_bytes = &left_bytes[suf_len..];
380                let keep = (lcp_u16 as usize).min(prev_key_bytes.len());
381                let mut full = Vec::with_capacity(keep + suf_len);
382                full.extend_from_slice(&prev_key_bytes[..keep]);
383                full.extend_from_slice(suffix);
384                let (tuple_i, _off_i) = TupleCodec::decode(&full, schema.clone())?;
385                let (pid_i, o9) = CommonCodec::decode_u32(left_bytes)?;
386                left_bytes = &left_bytes[o9..];
387                if tuple_i <= *search_key {
388                    result_pid = pid_i;
389                }
390                prev_key_bytes = full;
391            }
392        } else {
393            // raw format
394            let (_sentinel, o4) = TupleCodec::decode(left_bytes, schema.clone())?;
395            left_bytes = &left_bytes[o4..];
396            let (pid0, o5) = CommonCodec::decode_u32(left_bytes)?;
397            left_bytes = &left_bytes[o5..];
398            result_pid = pid0;
399            for _ in 1..header.current_size {
400                let (tuple_i, o6) = TupleCodec::decode(left_bytes, schema.clone())?;
401                left_bytes = &left_bytes[o6..];
402                let (pid_i, o7) = CommonCodec::decode_u32(left_bytes)?;
403                left_bytes = &left_bytes[o7..];
404                if tuple_i <= *search_key {
405                    result_pid = pid_i;
406                }
407            }
408        }
409        Ok(result_pid)
410    }
411}
412
413pub struct BPlusTreePageTypeCodec;
414
415impl BPlusTreePageTypeCodec {
416    pub fn encode(page_type: &BPlusTreePageType) -> Vec<u8> {
417        match page_type {
418            BPlusTreePageType::LeafPage => CommonCodec::encode_u8(1),
419            BPlusTreePageType::InternalPage => CommonCodec::encode_u8(2),
420        }
421    }
422
423    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreePageType>> {
424        let (flag, offset) = CommonCodec::decode_u8(bytes)?;
425        match flag {
426            1 => Ok((BPlusTreePageType::LeafPage, offset)),
427            2 => Ok((BPlusTreePageType::InternalPage, offset)),
428            _ => Err(QuillSQLError::Storage(format!(
429                "Invalid page type {}",
430                flag
431            ))),
432        }
433    }
434}
435
436pub struct BPlusTreeLeafPageHeaderCodec;
437
438impl BPlusTreeLeafPageHeaderCodec {
439    pub fn encode(header: &BPlusTreeLeafPageHeader) -> Vec<u8> {
440        let mut bytes = Vec::new();
441        bytes.extend(BPlusTreePageTypeCodec::encode(&header.page_type));
442        bytes.extend(CommonCodec::encode_u32(header.current_size));
443        bytes.extend(CommonCodec::encode_u32(header.max_size));
444        bytes.extend(CommonCodec::encode_u32(header.next_page_id));
445        bytes.extend(CommonCodec::encode_u32(header.version));
446        bytes
447    }
448
449    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeLeafPageHeader>> {
450        let mut left_bytes = bytes;
451
452        let (page_type, offset) = BPlusTreePageTypeCodec::decode(left_bytes)?;
453        left_bytes = &left_bytes[offset..];
454
455        let (current_size, offset) = CommonCodec::decode_u32(left_bytes)?;
456        left_bytes = &left_bytes[offset..];
457
458        let (max_size, offset) = CommonCodec::decode_u32(left_bytes)?;
459        left_bytes = &left_bytes[offset..];
460
461        let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
462        left_bytes = &left_bytes[offset..];
463
464        let (version, offset) = CommonCodec::decode_u32(left_bytes)?;
465        left_bytes = &left_bytes[offset..];
466
467        Ok((
468            BPlusTreeLeafPageHeader {
469                page_type,
470                current_size,
471                max_size,
472                next_page_id,
473                version,
474            },
475            bytes.len() - left_bytes.len(),
476        ))
477    }
478}
479
480pub struct BPlusTreeInternalPageHeaderCodec;
481
482impl BPlusTreeInternalPageHeaderCodec {
483    pub fn encode(header: &BPlusTreeInternalPageHeader) -> Vec<u8> {
484        let mut bytes = Vec::new();
485        bytes.extend(BPlusTreePageTypeCodec::encode(&header.page_type));
486        bytes.extend(CommonCodec::encode_u32(header.current_size));
487        bytes.extend(CommonCodec::encode_u32(header.max_size));
488        bytes.extend(CommonCodec::encode_u32(header.version));
489        bytes.extend(CommonCodec::encode_u32(header.next_page_id));
490        bytes
491    }
492
493    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeInternalPageHeader>> {
494        let mut left_bytes = bytes;
495
496        let (page_type, offset) = BPlusTreePageTypeCodec::decode(left_bytes)?;
497        left_bytes = &left_bytes[offset..];
498
499        let (current_size, offset) = CommonCodec::decode_u32(left_bytes)?;
500        left_bytes = &left_bytes[offset..];
501
502        let (max_size, offset) = CommonCodec::decode_u32(left_bytes)?;
503        left_bytes = &left_bytes[offset..];
504
505        let (version, offset) = CommonCodec::decode_u32(left_bytes)?;
506        left_bytes = &left_bytes[offset..];
507
508        let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
509        left_bytes = &left_bytes[offset..];
510
511        Ok((
512            BPlusTreeInternalPageHeader {
513                page_type,
514                current_size,
515                max_size,
516                version,
517                next_page_id,
518            },
519            bytes.len() - left_bytes.len(),
520        ))
521    }
522}
523
#[cfg(test)]
mod tests {
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::btree_page::BPlusTreePageCodec;
    use crate::storage::page::RecordId;
    use crate::storage::page::{BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage};
    use crate::storage::tuple::Tuple;
    use std::sync::Arc;

    /// Encoding then decoding a leaf page and an internal page yields equal pages.
    #[test]
    fn index_page_codec() {
        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, true),
            Column::new("b", DataType::Int32, true),
        ]));

        // Encode a page and decode the resulting bytes back into a page.
        let round_trip = |page: &BPlusTreePage| {
            let encoded = BPlusTreePageCodec::encode(page);
            let (decoded, _) = BPlusTreePageCodec::decode(&encoded, schema.clone()).unwrap();
            decoded
        };

        let tuple1 = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);
        let rid1 = RecordId::new(1, 1);
        let tuple2 = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);
        let rid2 = RecordId::new(2, 2);

        // Leaf page with two entries.
        let mut leaf_page = BPlusTreeLeafPage::new(schema.clone(), 100);
        leaf_page.insert(tuple1.clone(), rid1);
        leaf_page.insert(tuple2.clone(), rid2);
        let leaf = BPlusTreePage::Leaf(leaf_page);
        assert_eq!(round_trip(&leaf), leaf);

        // Internal page with an empty first key followed by two real keys.
        let mut internal_page = BPlusTreeInternalPage::new(schema.clone(), 100);
        internal_page.insert(Tuple::empty(schema.clone()), 1);
        internal_page.insert(tuple1, 2);
        internal_page.insert(tuple2, 3);
        let internal = BPlusTreePage::Internal(internal_page);
        assert_eq!(round_trip(&internal), internal);
    }
}