1use crate::buffer::PAGE_SIZE;
2use crate::catalog::SchemaRef;
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, DecodedData, RidCodec, TupleCodec};
5use crate::storage::page::RecordId;
6use crate::storage::page::{
7 BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeInternalPageHeader, BPlusTreeLeafPage,
8 BPlusTreeLeafPageHeader, BPlusTreePage, BPlusTreePageType,
9};
10use crate::storage::tuple::Tuple;
11
12pub struct BPlusTreeHeaderPageCodec;
13
14impl BPlusTreeHeaderPageCodec {
15 pub fn encode(page: &BPlusTreeHeaderPage) -> Vec<u8> {
16 let mut bytes = vec![];
17 bytes.extend(CommonCodec::encode_u32(page.root_page_id));
18 bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
19 bytes
20 }
21
22 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeHeaderPage>> {
23 if bytes.len() != PAGE_SIZE {
24 return Err(QuillSQLError::Storage(format!(
25 "Header page size is not {} instead of {}",
26 PAGE_SIZE,
27 bytes.len()
28 )));
29 }
30
31 let (root_page_id, offset) = CommonCodec::decode_u32(bytes)?;
32 let page = BPlusTreeHeaderPage { root_page_id };
33
34 Ok((page, offset))
35 }
36}
37
38pub struct BPlusTreePageCodec;
39
40impl BPlusTreePageCodec {
41 pub fn encode(page: &BPlusTreePage) -> Vec<u8> {
42 match page {
43 BPlusTreePage::Leaf(page) => BPlusTreeLeafPageCodec::encode(page),
44 BPlusTreePage::Internal(page) => BPlusTreeInternalPageCodec::encode(page),
45 }
46 }
47
48 pub fn decode(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<DecodedData<BPlusTreePage>> {
49 if bytes.len() != PAGE_SIZE {
50 return Err(QuillSQLError::Storage(format!(
51 "Index page size is not {} instead of {}",
52 PAGE_SIZE,
53 bytes.len()
54 )));
55 }
56
57 let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
59
60 match page_type {
61 BPlusTreePageType::LeafPage => {
62 let (page, offset) = BPlusTreeLeafPageCodec::decode(bytes, schema.clone())?;
63 Ok((BPlusTreePage::Leaf(page), offset))
64 }
65 BPlusTreePageType::InternalPage => {
66 let (page, offset) = BPlusTreeInternalPageCodec::decode(bytes, schema.clone())?;
67 Ok((BPlusTreePage::Internal(page), offset))
68 }
69 }
70 }
71
72 pub fn decode_header(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreePageType>> {
74 BPlusTreePageTypeCodec::decode(bytes)
75 }
76}
77
78pub struct BPlusTreeLeafPageCodec;
79
80impl BPlusTreeLeafPageCodec {
81 pub fn encode(page: &BPlusTreeLeafPage) -> Vec<u8> {
82 let mut bytes = vec![];
83 bytes.extend(BPlusTreeLeafPageHeaderCodec::encode(&page.header));
84 for (tuple, rid) in page.array.iter() {
85 bytes.extend(TupleCodec::encode(tuple));
86 bytes.extend(RidCodec::encode(rid));
87 }
88 assert!(bytes.len() <= PAGE_SIZE);
90 bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
91 bytes
92 }
93
94 pub fn decode(
95 bytes: &[u8],
96 schema: SchemaRef,
97 ) -> QuillSQLResult<DecodedData<BPlusTreeLeafPage>> {
98 if bytes.len() != PAGE_SIZE {
99 return Err(QuillSQLError::Storage(format!(
100 "Index page size is not {} instead of {}",
101 PAGE_SIZE,
102 bytes.len()
103 )));
104 }
105 let mut left_bytes = bytes;
106
107 let (page_type, _) = BPlusTreePageTypeCodec::decode(left_bytes)?;
109
110 if matches!(page_type, BPlusTreePageType::LeafPage) {
111 let (header, offset) = BPlusTreeLeafPageHeaderCodec::decode(left_bytes)?;
112 left_bytes = &left_bytes[offset..];
113
114 let mut array = vec![];
115 for _ in 0..header.current_size {
116 let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
117 left_bytes = &left_bytes[offset..];
118 let (rid, offset) = RidCodec::decode(left_bytes)?;
119 left_bytes = &left_bytes[offset..];
120 array.push((tuple, rid));
121 }
122
123 Ok((
124 BPlusTreeLeafPage {
125 schema,
126 header,
127 array,
128 },
129 PAGE_SIZE,
130 ))
131 } else {
132 Err(QuillSQLError::Storage(
133 "Index page type must be leaf page".to_string(),
134 ))
135 }
136 }
137
138 pub fn decode_header_only(
140 bytes: &[u8],
141 ) -> QuillSQLResult<DecodedData<BPlusTreeLeafPageHeader>> {
142 if bytes.len() != PAGE_SIZE {
143 return Err(QuillSQLError::Storage(format!(
144 "Index page size is not {} instead of {}",
145 PAGE_SIZE,
146 bytes.len()
147 )));
148 }
149 let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
150 if !matches!(page_type, BPlusTreePageType::LeafPage) {
151 return Err(QuillSQLError::Storage(
152 "Index page type must be leaf page".to_string(),
153 ));
154 }
155 BPlusTreeLeafPageHeaderCodec::decode(bytes)
156 }
157
158 pub fn decode_kv_at_offset(
161 bytes: &[u8],
162 schema: SchemaRef,
163 offset: usize,
164 ) -> QuillSQLResult<DecodedData<(Tuple, RecordId)>> {
165 if bytes.len() != PAGE_SIZE {
166 return Err(QuillSQLError::Storage(format!(
167 "Index page size is not {} instead of {}",
168 PAGE_SIZE,
169 bytes.len()
170 )));
171 }
172 let mut left_bytes = &bytes[offset..];
173 let (tuple, t_off) = TupleCodec::decode(left_bytes, schema.clone())?;
174 left_bytes = &left_bytes[t_off..];
175 let (rid, r_off) = RidCodec::decode(left_bytes)?;
176 let consumed = t_off + r_off;
177 Ok(((tuple, rid), offset + consumed))
178 }
179}
180
181pub struct BPlusTreeInternalPageCodec;
182
183impl BPlusTreeInternalPageCodec {
184 pub fn encode(page: &BPlusTreeInternalPage) -> Vec<u8> {
185 let mut bytes = vec![];
186 bytes.extend(BPlusTreeInternalPageHeaderCodec::encode(&page.header));
187 if let Some(hk) = &page.high_key {
189 bytes.extend(CommonCodec::encode_u8(1));
190 bytes.extend(TupleCodec::encode(hk));
191 } else {
192 bytes.extend(CommonCodec::encode_u8(0));
193 }
194 bytes.extend(CommonCodec::encode_u8(1)); let mut prev_key_bytes: Vec<u8> = Vec::new();
197 for (tuple, page_id) in page.array.iter() {
198 let full = TupleCodec::encode(tuple);
199 let mut lcp = 0usize;
200 let max_lcp = prev_key_bytes.len().min(full.len());
201 while lcp < max_lcp && prev_key_bytes[lcp] == full[lcp] {
202 lcp += 1;
203 }
204 let suffix = &full[lcp..];
205 bytes.extend(CommonCodec::encode_u16(lcp as u16));
206 bytes.extend(CommonCodec::encode_u32(suffix.len() as u32));
207 bytes.extend(suffix);
208 bytes.extend(CommonCodec::encode_u32(*page_id));
209 prev_key_bytes = full;
210 }
211 assert!(bytes.len() <= PAGE_SIZE);
213 bytes.extend(vec![0; PAGE_SIZE - bytes.len()]);
214 bytes
215 }
216
217 pub fn decode(
218 bytes: &[u8],
219 schema: SchemaRef,
220 ) -> QuillSQLResult<DecodedData<BPlusTreeInternalPage>> {
221 if bytes.len() != PAGE_SIZE {
222 return Err(QuillSQLError::Storage(format!(
223 "Index page size is not {} instead of {}",
224 PAGE_SIZE,
225 bytes.len()
226 )));
227 }
228 let mut left_bytes = bytes;
229
230 let (page_type, _) = BPlusTreePageTypeCodec::decode(left_bytes)?;
232
233 if matches!(page_type, BPlusTreePageType::InternalPage) {
234 let (header, offset) = BPlusTreeInternalPageHeaderCodec::decode(left_bytes)?;
235 left_bytes = &left_bytes[offset..];
236
237 let (has_high_key, offset) = CommonCodec::decode_u8(left_bytes)?;
239 left_bytes = &left_bytes[offset..];
240 let high_key = if has_high_key == 1 {
241 let (hk, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
242 left_bytes = &left_bytes[offset..];
243 Some(hk)
244 } else {
245 None
246 };
247
248 let (compress_flag, offset) = CommonCodec::decode_u8(left_bytes)?;
250 left_bytes = &left_bytes[offset..];
251
252 let mut array = vec![];
253 if compress_flag == 1 {
254 let mut prev_key_bytes: Vec<u8> = Vec::new();
256 for _ in 0..header.current_size {
257 let (lcp_u16, o1) = CommonCodec::decode_u16(left_bytes)?;
258 left_bytes = &left_bytes[o1..];
259 let (suf_len_u32, o2) = CommonCodec::decode_u32(left_bytes)?;
260 left_bytes = &left_bytes[o2..];
261 let suf_len = suf_len_u32 as usize;
262 let suffix = &left_bytes[..suf_len];
263 left_bytes = &left_bytes[suf_len..];
264 let mut full = Vec::with_capacity(lcp_u16 as usize + suf_len);
266 full.extend_from_slice(
267 &prev_key_bytes[..(lcp_u16 as usize).min(prev_key_bytes.len())],
268 );
269 full.extend_from_slice(suffix);
270 let (tuple, _off) = TupleCodec::decode(&full, schema.clone())?;
271 prev_key_bytes = full;
272 let (page_id, o3) = CommonCodec::decode_u32(left_bytes)?;
273 left_bytes = &left_bytes[o3..];
274 array.push((tuple, page_id));
275 }
276 } else {
277 for _ in 0..header.current_size {
279 let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
280 left_bytes = &left_bytes[offset..];
281 let (page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
282 left_bytes = &left_bytes[offset..];
283 array.push((tuple, page_id));
284 }
285 }
286
287 Ok((
288 BPlusTreeInternalPage {
289 schema,
290 header,
291 array,
292 high_key,
293 },
294 PAGE_SIZE,
295 ))
296 } else {
297 Err(QuillSQLError::Storage(
298 "Index page type must be internal page".to_string(),
299 ))
300 }
301 }
302
303 pub fn decode_header_only(
305 bytes: &[u8],
306 ) -> QuillSQLResult<DecodedData<BPlusTreeInternalPageHeader>> {
307 if bytes.len() != PAGE_SIZE {
308 return Err(QuillSQLError::Storage(format!(
309 "Index page size is not {} instead of {}",
310 PAGE_SIZE,
311 bytes.len()
312 )));
313 }
314 let (page_type, _) = BPlusTreePageTypeCodec::decode(bytes)?;
315 if !matches!(page_type, BPlusTreePageType::InternalPage) {
316 return Err(QuillSQLError::Storage(
317 "Index page type must be internal page".to_string(),
318 ));
319 }
320 BPlusTreeInternalPageHeaderCodec::decode(bytes)
321 }
322}
323
324impl BPlusTreeInternalPageCodec {
325 pub fn lookup_child_from_bytes(
327 bytes: &[u8],
328 schema: SchemaRef,
329 search_key: &Tuple,
330 ) -> QuillSQLResult<u32> {
331 if bytes.len() != PAGE_SIZE {
332 return Err(QuillSQLError::Storage(format!(
333 "Index page size is not {} instead of {}",
334 PAGE_SIZE,
335 bytes.len()
336 )));
337 }
338 let (page_type, _t_off) = BPlusTreePageTypeCodec::decode(bytes)?;
340 if !matches!(page_type, BPlusTreePageType::InternalPage) {
341 return Err(QuillSQLError::Storage(
342 "Index page type must be internal page".to_string(),
343 ));
344 }
345 let (header, h_off) = BPlusTreeInternalPageHeaderCodec::decode(bytes)?;
346 let mut left_bytes = &bytes[h_off..];
347 let (has_high_key, o1) = CommonCodec::decode_u8(left_bytes)?;
349 left_bytes = &left_bytes[o1..];
350 if has_high_key == 1 {
351 let (_hk, o2) = TupleCodec::decode(left_bytes, schema.clone())?;
352 left_bytes = &left_bytes[o2..];
353 }
354 let (compress_flag, o3) = CommonCodec::decode_u8(left_bytes)?;
356 left_bytes = &left_bytes[o3..];
357
358 let mut result_pid: u32;
359 if compress_flag == 1 {
360 let (_lcp, o4) = CommonCodec::decode_u16(left_bytes)?;
362 left_bytes = &left_bytes[o4..];
363 let (suf_len_u32, o5) = CommonCodec::decode_u32(left_bytes)?;
364 left_bytes = &left_bytes[o5..];
365 let suf_len = suf_len_u32 as usize;
366 let suffix = &left_bytes[..suf_len];
367 left_bytes = &left_bytes[suf_len..];
368 let (pid0, o6) = CommonCodec::decode_u32(left_bytes)?;
369 left_bytes = &left_bytes[o6..];
370 result_pid = pid0;
371 let mut prev_key_bytes: Vec<u8> = suffix.to_vec();
372 for _ in 1..header.current_size {
373 let (lcp_u16, o7) = CommonCodec::decode_u16(left_bytes)?;
374 left_bytes = &left_bytes[o7..];
375 let (suf_len_u32, o8) = CommonCodec::decode_u32(left_bytes)?;
376 left_bytes = &left_bytes[o8..];
377 let suf_len = suf_len_u32 as usize;
378 let suffix = &left_bytes[..suf_len];
379 left_bytes = &left_bytes[suf_len..];
380 let keep = (lcp_u16 as usize).min(prev_key_bytes.len());
381 let mut full = Vec::with_capacity(keep + suf_len);
382 full.extend_from_slice(&prev_key_bytes[..keep]);
383 full.extend_from_slice(suffix);
384 let (tuple_i, _off_i) = TupleCodec::decode(&full, schema.clone())?;
385 let (pid_i, o9) = CommonCodec::decode_u32(left_bytes)?;
386 left_bytes = &left_bytes[o9..];
387 if tuple_i <= *search_key {
388 result_pid = pid_i;
389 }
390 prev_key_bytes = full;
391 }
392 } else {
393 let (_sentinel, o4) = TupleCodec::decode(left_bytes, schema.clone())?;
395 left_bytes = &left_bytes[o4..];
396 let (pid0, o5) = CommonCodec::decode_u32(left_bytes)?;
397 left_bytes = &left_bytes[o5..];
398 result_pid = pid0;
399 for _ in 1..header.current_size {
400 let (tuple_i, o6) = TupleCodec::decode(left_bytes, schema.clone())?;
401 left_bytes = &left_bytes[o6..];
402 let (pid_i, o7) = CommonCodec::decode_u32(left_bytes)?;
403 left_bytes = &left_bytes[o7..];
404 if tuple_i <= *search_key {
405 result_pid = pid_i;
406 }
407 }
408 }
409 Ok(result_pid)
410 }
411}
412
413pub struct BPlusTreePageTypeCodec;
414
415impl BPlusTreePageTypeCodec {
416 pub fn encode(page_type: &BPlusTreePageType) -> Vec<u8> {
417 match page_type {
418 BPlusTreePageType::LeafPage => CommonCodec::encode_u8(1),
419 BPlusTreePageType::InternalPage => CommonCodec::encode_u8(2),
420 }
421 }
422
423 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreePageType>> {
424 let (flag, offset) = CommonCodec::decode_u8(bytes)?;
425 match flag {
426 1 => Ok((BPlusTreePageType::LeafPage, offset)),
427 2 => Ok((BPlusTreePageType::InternalPage, offset)),
428 _ => Err(QuillSQLError::Storage(format!(
429 "Invalid page type {}",
430 flag
431 ))),
432 }
433 }
434}
435
436pub struct BPlusTreeLeafPageHeaderCodec;
437
438impl BPlusTreeLeafPageHeaderCodec {
439 pub fn encode(header: &BPlusTreeLeafPageHeader) -> Vec<u8> {
440 let mut bytes = Vec::new();
441 bytes.extend(BPlusTreePageTypeCodec::encode(&header.page_type));
442 bytes.extend(CommonCodec::encode_u32(header.current_size));
443 bytes.extend(CommonCodec::encode_u32(header.max_size));
444 bytes.extend(CommonCodec::encode_u32(header.next_page_id));
445 bytes.extend(CommonCodec::encode_u32(header.version));
446 bytes
447 }
448
449 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeLeafPageHeader>> {
450 let mut left_bytes = bytes;
451
452 let (page_type, offset) = BPlusTreePageTypeCodec::decode(left_bytes)?;
453 left_bytes = &left_bytes[offset..];
454
455 let (current_size, offset) = CommonCodec::decode_u32(left_bytes)?;
456 left_bytes = &left_bytes[offset..];
457
458 let (max_size, offset) = CommonCodec::decode_u32(left_bytes)?;
459 left_bytes = &left_bytes[offset..];
460
461 let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
462 left_bytes = &left_bytes[offset..];
463
464 let (version, offset) = CommonCodec::decode_u32(left_bytes)?;
465 left_bytes = &left_bytes[offset..];
466
467 Ok((
468 BPlusTreeLeafPageHeader {
469 page_type,
470 current_size,
471 max_size,
472 next_page_id,
473 version,
474 },
475 bytes.len() - left_bytes.len(),
476 ))
477 }
478}
479
480pub struct BPlusTreeInternalPageHeaderCodec;
481
482impl BPlusTreeInternalPageHeaderCodec {
483 pub fn encode(header: &BPlusTreeInternalPageHeader) -> Vec<u8> {
484 let mut bytes = Vec::new();
485 bytes.extend(BPlusTreePageTypeCodec::encode(&header.page_type));
486 bytes.extend(CommonCodec::encode_u32(header.current_size));
487 bytes.extend(CommonCodec::encode_u32(header.max_size));
488 bytes.extend(CommonCodec::encode_u32(header.version));
489 bytes.extend(CommonCodec::encode_u32(header.next_page_id));
490 bytes
491 }
492
493 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<BPlusTreeInternalPageHeader>> {
494 let mut left_bytes = bytes;
495
496 let (page_type, offset) = BPlusTreePageTypeCodec::decode(left_bytes)?;
497 left_bytes = &left_bytes[offset..];
498
499 let (current_size, offset) = CommonCodec::decode_u32(left_bytes)?;
500 left_bytes = &left_bytes[offset..];
501
502 let (max_size, offset) = CommonCodec::decode_u32(left_bytes)?;
503 left_bytes = &left_bytes[offset..];
504
505 let (version, offset) = CommonCodec::decode_u32(left_bytes)?;
506 left_bytes = &left_bytes[offset..];
507
508 let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
509 left_bytes = &left_bytes[offset..];
510
511 Ok((
512 BPlusTreeInternalPageHeader {
513 page_type,
514 current_size,
515 max_size,
516 version,
517 next_page_id,
518 },
519 bytes.len() - left_bytes.len(),
520 ))
521 }
522}
523
#[cfg(test)]
mod tests {
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::btree_page::BPlusTreePageCodec;
    use crate::storage::page::RecordId;
    use crate::storage::page::{BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage};
    use crate::storage::tuple::Tuple;
    use std::sync::Arc;

    /// Encodes `page`, decodes the bytes again and asserts the result equals the input.
    fn assert_round_trip(page: BPlusTreePage, schema: &Arc<Schema>) {
        let encoded = BPlusTreePageCodec::encode(&page);
        let (decoded, _) = BPlusTreePageCodec::decode(&encoded, schema.clone()).unwrap();
        assert_eq!(decoded, page);
    }

    /// Round-trips one leaf page and one internal page through the page codec.
    #[test]
    fn index_page_codec() {
        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, true),
            Column::new("b", DataType::Int32, true),
        ]));
        let key_one = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);
        let key_two = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);

        // Leaf page holding two (key, record id) entries.
        let mut leaf = BPlusTreeLeafPage::new(schema.clone(), 100);
        leaf.insert(key_one.clone(), RecordId::new(1, 1));
        leaf.insert(key_two.clone(), RecordId::new(2, 2));
        assert_round_trip(BPlusTreePage::Leaf(leaf), &schema);

        // Internal page: an empty tuple in the first slot followed by two real keys.
        let mut internal = BPlusTreeInternalPage::new(schema.clone(), 100);
        internal.insert(Tuple::empty(schema.clone()), 1);
        internal.insert(key_one, 2);
        internal.insert(key_two, 3);
        assert_round_trip(BPlusTreePage::Internal(internal), &schema);
    }
}