1use crate::PageType;
2use crate::reader::{NativeReader, Row};
3use crate::utils::{parse_record, parse_varint};
4use std::sync::Arc;
5use yykv_types::DsError;
6
7type Result<T> = std::result::Result<T, DsError>;
8
9#[derive(Debug, Clone)]
11pub struct BTreePageHeader {
12 pub page_type: PageType,
13 pub first_freeblock: u16,
14 pub cell_count: u16,
15 pub cell_content_offset: u16,
16 pub fragmented_free_bytes: u8,
17 pub right_most_pointer: Option<u32>,
18}
19
20#[derive(Clone)]
22pub struct BTreePage {
23 pub id: u32,
24 pub header: BTreePageHeader,
25 pub data: Arc<Vec<u8>>,
26}
27
28#[derive(Debug)]
30pub enum BTreeCell {
31 TableLeaf {
32 payload_size: u64,
33 row_id: i64,
34 payload_offset: usize,
35 local_payload_size: usize,
36 overflow_page: Option<u32>,
37 },
38 TableInterior {
39 left_child_pointer: u32,
40 key: i64,
41 },
42 IndexLeaf {
43 payload_size: u64,
44 payload_offset: usize,
45 local_payload_size: usize,
46 overflow_page: Option<u32>,
47 },
48 IndexInterior {
49 left_child_pointer: u32,
50 payload_size: u64,
51 payload_offset: usize,
52 local_payload_size: usize,
53 overflow_page: Option<u32>,
54 },
55}
56
57impl BTreePage {
58 pub fn get_cell(&self, index: u16) -> Result<BTreeCell> {
60 let offsets = self.get_cell_offsets();
61 if index as usize >= offsets.len() {
62 return Err(DsError::internal(format!(
63 "Cell index out of bounds: {}",
64 index
65 )));
66 }
67
68 let mut current_offset = offsets[index as usize] as usize;
69 let page_size = self.data.len();
70
71 match self.header.page_type {
72 PageType::LeafTable => {
73 let (payload_size, consumed1) = parse_varint(&self.data[current_offset..]);
74 current_offset += consumed1;
75 let (row_id, consumed2) = parse_varint(&self.data[current_offset..]);
76 current_offset += consumed2;
77
78 let u = page_size as u64;
79 let x = u - 35;
80 let m = ((u - 12) * 32 / 255) - 23;
81
82 let (local_payload_size, overflow_page) = if payload_size <= x {
83 (payload_size as usize, None)
84 } else {
85 let k = m + ((payload_size - m) % (u - 4));
86 let local = if k <= x { k } else { m };
87
88 let overflow_ptr_offset = current_offset + local as usize;
89 let overflow_page = u32::from_be_bytes([
90 self.data[overflow_ptr_offset],
91 self.data[overflow_ptr_offset + 1],
92 self.data[overflow_ptr_offset + 2],
93 self.data[overflow_ptr_offset + 3],
94 ]);
95
96 (
97 local as usize,
98 if overflow_page > 0 {
99 Some(overflow_page)
100 } else {
101 None
102 },
103 )
104 };
105
106 Ok(BTreeCell::TableLeaf {
107 payload_size,
108 row_id: row_id as i64,
109 payload_offset: current_offset,
110 local_payload_size,
111 overflow_page,
112 })
113 }
114 PageType::InteriorTable => {
115 let left_child_pointer = u32::from_be_bytes([
116 self.data[current_offset],
117 self.data[current_offset + 1],
118 self.data[current_offset + 2],
119 self.data[current_offset + 3],
120 ]);
121 let (key, _) = parse_varint(&self.data[current_offset + 4..]);
122 Ok(BTreeCell::TableInterior {
123 left_child_pointer,
124 key: key as i64,
125 })
126 }
127 _ => Err(DsError::storage(format!(
128 "Unsupported page type for get_cell: {:?}",
129 self.header.page_type
130 ))),
131 }
132 }
133
134 pub async fn get_rows(&self, reader: &NativeReader) -> Result<Vec<Row>> {
136 let mut rows = Vec::new();
137 match self.header.page_type {
138 PageType::LeafTable => {
139 for i in 0..self.header.cell_count {
140 let cell = self.get_cell(i)?;
141 if let BTreeCell::TableLeaf {
142 row_id,
143 payload_offset,
144 local_payload_size,
145 ..
146 } = cell
147 {
148 let payload =
149 &self.data[payload_offset..payload_offset + local_payload_size];
150 let values = parse_record(payload)?;
151 rows.push(Row { row_id, values });
152 }
153 }
154 }
155 PageType::InteriorTable => {
156 for i in 0..self.header.cell_count {
157 let cell = self.get_cell(i)?;
158 if let BTreeCell::TableInterior {
159 left_child_pointer, ..
160 } = cell
161 {
162 let mut child_rows =
163 Box::pin(reader.scan_table(left_child_pointer)).await?;
164 rows.append(&mut child_rows);
165 }
166 }
167 if let Some(right_ptr) = self.header.right_most_pointer {
168 let mut child_rows = Box::pin(reader.scan_table(right_ptr)).await?;
169 rows.append(&mut child_rows);
170 }
171 }
172 _ => {
173 return Err(DsError::storage(format!(
174 "Get rows not supported on page type: {:?}",
175 self.header.page_type
176 )));
177 }
178 }
179 Ok(rows)
180 }
181
182 pub fn get_cell_offsets(&self) -> Vec<u16> {
184 let actual_header_start = if self.id == 1 { 100 } else { 0 };
186 let cell_ptr_array_start = actual_header_start + self.header_size();
187
188 let mut offsets = Vec::new();
189 for i in 0..self.header.cell_count {
190 let ptr_offset = cell_ptr_array_start + (i * 2) as usize;
191 let offset = u16::from_be_bytes([self.data[ptr_offset], self.data[ptr_offset + 1]]);
192 offsets.push(offset);
193 }
194 offsets
195 }
196
197 fn header_size(&self) -> usize {
198 match self.header.page_type {
199 PageType::LeafTable | PageType::LeafIndex => 8,
200 PageType::InteriorTable | PageType::InteriorIndex => 12,
201 }
202 }
203
204 pub fn is_interior(&self) -> bool {
205 match self.header.page_type {
206 PageType::InteriorTable | PageType::InteriorIndex => true,
207 _ => false,
208 }
209 }
210
211 pub fn decode(id: u32, data: &[u8]) -> Result<Self> {
212 let header_offset = if id == 1 { 100 } else { 0 };
213 let page_type_byte = data[header_offset];
214 let page_type = match page_type_byte {
215 0x0D => PageType::LeafTable,
216 0x05 => PageType::InteriorTable,
217 0x0A => PageType::LeafIndex,
218 0x02 => PageType::InteriorIndex,
219 _ => {
220 return Err(DsError::storage(format!(
221 "Invalid page type: 0x{:02X}",
222 page_type_byte
223 )));
224 }
225 };
226
227 let first_freeblock =
228 u16::from_be_bytes([data[header_offset + 1], data[header_offset + 2]]);
229 let cell_count = u16::from_be_bytes([data[header_offset + 3], data[header_offset + 4]]);
230 let cell_content_offset =
231 u16::from_be_bytes([data[header_offset + 5], data[header_offset + 6]]);
232 let fragmented_free_bytes = data[header_offset + 7];
233
234 let right_most_pointer =
235 if page_type == PageType::InteriorTable || page_type == PageType::InteriorIndex {
236 Some(u32::from_be_bytes([
237 data[header_offset + 8],
238 data[header_offset + 9],
239 data[header_offset + 10],
240 data[header_offset + 11],
241 ]))
242 } else {
243 None
244 };
245
246 Ok(Self {
247 id,
248 header: BTreePageHeader {
249 page_type,
250 first_freeblock,
251 cell_count,
252 cell_content_offset,
253 fragmented_free_bytes,
254 right_most_pointer,
255 },
256 data: Arc::new(data.to_vec()),
257 })
258 }
259}