1use crate::error::{Result, SQLRiteError};
47use crate::sql::pager::cell::{Cell, KIND_INTERIOR};
48use crate::sql::pager::page::PAYLOAD_PER_PAGE;
49use crate::sql::pager::varint;
50
51const OFFSET_SLOT_COUNT: usize = 0;
53const OFFSET_CELLS_TOP: usize = 2;
54const OFFSET_RIGHTMOST: usize = 4;
55const PAGE_PAYLOAD_HEADER: usize = 8;
56
57const SLOT_SIZE: usize = 2;
58
59#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct InteriorCell {
66 pub divider_rowid: i64,
67 pub child_page: u32,
68}
69
70impl InteriorCell {
71 pub fn encode(&self) -> Vec<u8> {
72 let mut body = Vec::with_capacity(1 + varint::MAX_VARINT_BYTES + 4);
73 body.push(KIND_INTERIOR);
74 varint::write_i64(&mut body, self.divider_rowid);
75 body.extend_from_slice(&self.child_page.to_le_bytes());
76
77 let mut out = Vec::with_capacity(body.len() + varint::MAX_VARINT_BYTES);
78 varint::write_u64(&mut out, body.len() as u64);
79 out.extend_from_slice(&body);
80 out
81 }
82
83 pub fn decode(buf: &[u8], pos: usize) -> Result<(InteriorCell, usize)> {
84 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
85 let body_start = pos + len_bytes;
86 let body_end = body_start
87 .checked_add(body_len as usize)
88 .ok_or_else(|| SQLRiteError::Internal("interior cell length overflow".to_string()))?;
89 if body_end > buf.len() {
90 return Err(SQLRiteError::Internal(format!(
91 "interior cell extends past buffer: needs {body_start}..{body_end}, have {}",
92 buf.len()
93 )));
94 }
95 let body = &buf[body_start..body_end];
96 if body.first().copied() != Some(KIND_INTERIOR) {
97 return Err(SQLRiteError::Internal(format!(
98 "InteriorCell::decode called on non-interior entry (kind_tag = {:#x})",
99 body.first().copied().unwrap_or(0)
100 )));
101 }
102 let mut cur = 1usize;
103 let (divider_rowid, n) = varint::read_i64(body, cur)?;
104 cur += n;
105 if cur + 4 > body.len() {
106 return Err(SQLRiteError::Internal(
107 "interior cell truncated before child_page".to_string(),
108 ));
109 }
110 let child_page = u32::from_le_bytes(body[cur..cur + 4].try_into().unwrap());
111 cur += 4;
112 if cur != body.len() {
113 return Err(SQLRiteError::Internal(format!(
114 "interior cell had {} trailing bytes",
115 body.len() - cur
116 )));
117 }
118 Ok((
119 InteriorCell {
120 divider_rowid,
121 child_page,
122 },
123 body_end - pos,
124 ))
125 }
126}
127
128pub struct InteriorPage {
133 buf: Box<[u8; PAYLOAD_PER_PAGE]>,
134}
135
136impl InteriorPage {
137 pub fn empty(rightmost_child: u32) -> Self {
142 let mut buf = Box::new([0u8; PAYLOAD_PER_PAGE]);
143 write_u16(&mut buf[..], OFFSET_SLOT_COUNT, 0);
144 write_u16(&mut buf[..], OFFSET_CELLS_TOP, PAYLOAD_PER_PAGE as u16);
145 write_u32(&mut buf[..], OFFSET_RIGHTMOST, rightmost_child);
146 Self { buf }
147 }
148
149 pub fn from_bytes(bytes: &[u8; PAYLOAD_PER_PAGE]) -> Self {
150 Self {
151 buf: Box::new(*bytes),
152 }
153 }
154
155 pub fn as_bytes(&self) -> &[u8; PAYLOAD_PER_PAGE] {
156 &self.buf
157 }
158
159 pub fn slot_count(&self) -> usize {
160 read_u16(&self.buf[..], OFFSET_SLOT_COUNT) as usize
161 }
162
163 fn set_slot_count(&mut self, n: usize) {
164 write_u16(&mut self.buf[..], OFFSET_SLOT_COUNT, n as u16);
165 }
166
167 pub fn cells_top(&self) -> usize {
168 read_u16(&self.buf[..], OFFSET_CELLS_TOP) as usize
169 }
170
171 fn set_cells_top(&mut self, v: usize) {
172 write_u16(&mut self.buf[..], OFFSET_CELLS_TOP, v as u16);
173 }
174
175 pub fn rightmost_child(&self) -> u32 {
176 read_u32(&self.buf[..], OFFSET_RIGHTMOST)
177 }
178
179 #[allow(dead_code)]
180 pub fn set_rightmost_child(&mut self, page: u32) {
181 write_u32(&mut self.buf[..], OFFSET_RIGHTMOST, page);
182 }
183
184 const fn slots_start() -> usize {
185 PAGE_PAYLOAD_HEADER
186 }
187
188 fn slots_end(&self) -> usize {
189 Self::slots_start() + self.slot_count() * SLOT_SIZE
190 }
191
192 pub fn free_space(&self) -> usize {
193 self.cells_top().saturating_sub(self.slots_end())
194 }
195
196 pub fn would_fit(&self, cell_encoded_size: usize) -> bool {
197 cell_encoded_size.saturating_add(SLOT_SIZE) <= self.free_space()
198 }
199
200 fn slot_offset(&self, slot: usize) -> Result<usize> {
201 if slot >= self.slot_count() {
202 return Err(SQLRiteError::Internal(format!(
203 "slot {slot} out of bounds (count = {})",
204 self.slot_count()
205 )));
206 }
207 let at = Self::slots_start() + slot * SLOT_SIZE;
208 Ok(read_u16(&self.buf[..], at) as usize)
209 }
210
211 fn set_slot_offset(&mut self, slot: usize, offset: usize) {
212 let at = Self::slots_start() + slot * SLOT_SIZE;
213 write_u16(&mut self.buf[..], at, offset as u16);
214 }
215
216 pub fn divider_at(&self, slot: usize) -> Result<i64> {
218 let offset = self.slot_offset(slot)?;
219 Cell::peek_rowid(&self.buf[..], offset)
220 }
221
222 pub fn cell_at(&self, slot: usize) -> Result<InteriorCell> {
223 let offset = self.slot_offset(slot)?;
224 let (c, _) = InteriorCell::decode(&self.buf[..], offset)?;
225 Ok(c)
226 }
227
228 pub fn insert_divider(&mut self, divider_rowid: i64, child_page: u32) -> Result<()> {
233 let mut lo = 0usize;
235 let mut hi = self.slot_count();
236 while lo < hi {
237 let mid = lo + (hi - lo) / 2;
238 let mid_rowid = self.divider_at(mid)?;
239 match mid_rowid.cmp(÷r_rowid) {
240 std::cmp::Ordering::Equal => {
241 return Err(SQLRiteError::Internal(format!(
242 "duplicate interior divider {divider_rowid}"
243 )));
244 }
245 std::cmp::Ordering::Less => lo = mid + 1,
246 std::cmp::Ordering::Greater => hi = mid,
247 }
248 }
249 let insert_at = lo;
250
251 let encoded = InteriorCell {
252 divider_rowid,
253 child_page,
254 }
255 .encode();
256
257 if !self.would_fit(encoded.len()) {
258 return Err(SQLRiteError::Internal(format!(
259 "interior page full: cell of {} bytes + slot wouldn't fit in {} bytes",
260 encoded.len(),
261 self.free_space()
262 )));
263 }
264
265 let new_cells_top = self.cells_top() - encoded.len();
266 self.buf[new_cells_top..new_cells_top + encoded.len()].copy_from_slice(&encoded);
267 self.set_cells_top(new_cells_top);
268
269 let old_count = self.slot_count();
270 let shift_start = Self::slots_start() + insert_at * SLOT_SIZE;
271 let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
272 self.buf
273 .copy_within(shift_start..shift_end, shift_start + SLOT_SIZE);
274 self.set_slot_count(old_count + 1);
275 self.set_slot_offset(insert_at, new_cells_top);
276 Ok(())
277 }
278
279 #[allow(dead_code)]
283 pub fn child_for(&self, rowid: i64) -> Result<u32> {
284 let mut lo = 0usize;
286 let mut hi = self.slot_count();
287 while lo < hi {
288 let mid = lo + (hi - lo) / 2;
289 let mid_rowid = self.divider_at(mid)?;
290 if mid_rowid < rowid {
291 lo = mid + 1;
292 } else {
293 hi = mid;
294 }
295 }
296 if lo < self.slot_count() {
297 let c = self.cell_at(lo)?;
298 Ok(c.child_page)
299 } else {
300 Ok(self.rightmost_child())
301 }
302 }
303
304 pub fn leftmost_child(&self) -> Result<u32> {
308 if self.slot_count() == 0 {
309 Ok(self.rightmost_child())
310 } else {
311 Ok(self.cell_at(0)?.child_page)
312 }
313 }
314}
315
316fn read_u16(buf: &[u8], offset: usize) -> u16 {
317 u16::from_le_bytes([buf[offset], buf[offset + 1]])
318}
319
320fn write_u16(buf: &mut [u8], offset: usize, value: u16) {
321 let bytes = value.to_le_bytes();
322 buf[offset] = bytes[0];
323 buf[offset + 1] = bytes[1];
324}
325
326fn read_u32(buf: &[u8], offset: usize) -> u32 {
327 u32::from_le_bytes([
328 buf[offset],
329 buf[offset + 1],
330 buf[offset + 2],
331 buf[offset + 3],
332 ])
333}
334
335fn write_u32(buf: &mut [u8], offset: usize, value: u32) {
336 let bytes = value.to_le_bytes();
337 buf[offset..offset + 4].copy_from_slice(&bytes);
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343
344 #[test]
345 fn interior_cell_round_trip() {
346 let c = InteriorCell {
347 divider_rowid: 42,
348 child_page: 7,
349 };
350 let bytes = c.encode();
351 let (back, n) = InteriorCell::decode(&bytes, 0).unwrap();
352 assert_eq!(back, c);
353 assert_eq!(n, bytes.len());
354 }
355
356 #[test]
357 fn peek_rowid_works_on_interior_cells() {
358 let c = InteriorCell {
359 divider_rowid: -99,
360 child_page: 13,
361 };
362 let bytes = c.encode();
363 assert_eq!(Cell::peek_rowid(&bytes, 0).unwrap(), -99);
364 }
365
366 #[test]
367 fn decode_rejects_wrong_kind_tag() {
368 use crate::sql::pager::cell::KIND_LOCAL;
369 let mut buf = Vec::new();
371 buf.push(1);
372 buf.push(KIND_LOCAL);
373 let err = InteriorCell::decode(&buf, 0).unwrap_err();
374 assert!(format!("{err}").contains("non-interior"));
375 }
376
377 #[test]
378 fn empty_interior_has_rightmost_but_no_dividers() {
379 let p = InteriorPage::empty(99);
380 assert_eq!(p.rightmost_child(), 99);
381 assert_eq!(p.slot_count(), 0);
382 assert_eq!(p.child_for(0).unwrap(), 99);
383 assert_eq!(p.child_for(i64::MAX).unwrap(), 99);
384 }
385
386 #[test]
387 fn insert_dividers_and_route_by_rowid() {
388 let mut p = InteriorPage::empty(100);
389 p.insert_divider(10, 1).unwrap();
390 p.insert_divider(20, 2).unwrap();
391 p.insert_divider(30, 3).unwrap();
392 assert_eq!(p.slot_count(), 3);
393
394 assert_eq!(p.child_for(5).unwrap(), 1);
396 assert_eq!(p.child_for(10).unwrap(), 1);
397 assert_eq!(p.child_for(11).unwrap(), 2);
399 assert_eq!(p.child_for(20).unwrap(), 2);
400 assert_eq!(p.child_for(21).unwrap(), 3);
402 assert_eq!(p.child_for(30).unwrap(), 3);
403 assert_eq!(p.child_for(31).unwrap(), 100);
405 assert_eq!(p.child_for(i64::MAX).unwrap(), 100);
406 }
407
408 #[test]
409 fn inserts_out_of_order_still_route_correctly() {
410 let mut p = InteriorPage::empty(0);
411 p.insert_divider(30, 3).unwrap();
412 p.insert_divider(10, 1).unwrap();
413 p.insert_divider(20, 2).unwrap();
414 assert_eq!(p.divider_at(0).unwrap(), 10);
416 assert_eq!(p.divider_at(1).unwrap(), 20);
417 assert_eq!(p.divider_at(2).unwrap(), 30);
418 }
419
420 #[test]
421 fn duplicate_divider_rejected() {
422 let mut p = InteriorPage::empty(0);
423 p.insert_divider(10, 1).unwrap();
424 let err = p.insert_divider(10, 99).unwrap_err();
425 assert!(format!("{err}").contains("duplicate interior divider"));
426 }
427
428 #[test]
429 fn bytes_round_trip() {
430 let mut p = InteriorPage::empty(99);
431 p.insert_divider(1, 10).unwrap();
432 p.insert_divider(5, 20).unwrap();
433 let bytes = *p.as_bytes();
434
435 let p2 = InteriorPage::from_bytes(&bytes);
436 assert_eq!(p2.rightmost_child(), 99);
437 assert_eq!(p2.slot_count(), 2);
438 assert_eq!(p2.cell_at(0).unwrap().divider_rowid, 1);
439 assert_eq!(p2.cell_at(0).unwrap().child_page, 10);
440 assert_eq!(p2.cell_at(1).unwrap().divider_rowid, 5);
441 assert_eq!(p2.cell_at(1).unwrap().child_page, 20);
442 }
443
444 #[test]
445 fn leftmost_child_picks_first_slot_or_rightmost() {
446 let p = InteriorPage::empty(42);
447 assert_eq!(p.leftmost_child().unwrap(), 42);
449
450 let mut q = InteriorPage::empty(99);
451 q.insert_divider(10, 1).unwrap();
452 q.insert_divider(20, 2).unwrap();
453 assert_eq!(q.leftmost_child().unwrap(), 1);
455 }
456
457 #[test]
458 fn many_dividers_fit_on_one_page() {
459 let mut p = InteriorPage::empty(0);
461 let mut inserted = 0usize;
462 for i in 1..1000 {
463 let cell = InteriorCell {
464 divider_rowid: i,
465 child_page: i as u32,
466 };
467 let size = cell.encode().len();
468 if !p.would_fit(size) {
469 break;
470 }
471 p.insert_divider(i, i as u32).unwrap();
472 inserted += 1;
473 }
474 assert!(inserted > 100);
476 }
477}