1use crate::error::{Result, SQLRiteError};
36use crate::sql::pager::cell::Cell;
37use crate::sql::pager::overflow::PagedEntry;
38use crate::sql::pager::page::PAYLOAD_PER_PAGE;
39
40const OFFSET_SLOT_COUNT: usize = 0;
42const OFFSET_CELLS_TOP: usize = 2;
43const PAGE_PAYLOAD_HEADER: usize = 4;
44
45const SLOT_SIZE: usize = 2;
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum Find {
51 Found(usize),
53 NotFound(usize),
55}
56
57pub struct TablePage {
59 buf: Box<[u8; PAYLOAD_PER_PAGE]>,
60}
61
62impl TablePage {
63 pub fn empty() -> Self {
65 let mut buf = Box::new([0u8; PAYLOAD_PER_PAGE]);
66 write_u16(&mut buf[..], OFFSET_SLOT_COUNT, 0);
67 write_u16(&mut buf[..], OFFSET_CELLS_TOP, PAYLOAD_PER_PAGE as u16);
68 Self { buf }
69 }
70
71 pub fn from_bytes(bytes: &[u8; PAYLOAD_PER_PAGE]) -> Self {
73 Self {
74 buf: Box::new(*bytes),
75 }
76 }
77
78 pub fn as_bytes(&self) -> &[u8; PAYLOAD_PER_PAGE] {
80 &self.buf
81 }
82
83 pub fn slot_count(&self) -> usize {
84 read_u16(&self.buf[..], OFFSET_SLOT_COUNT) as usize
85 }
86
87 fn set_slot_count(&mut self, n: usize) {
88 write_u16(&mut self.buf[..], OFFSET_SLOT_COUNT, n as u16);
89 }
90
91 pub fn cells_top(&self) -> usize {
92 read_u16(&self.buf[..], OFFSET_CELLS_TOP) as usize
93 }
94
95 fn set_cells_top(&mut self, v: usize) {
96 write_u16(&mut self.buf[..], OFFSET_CELLS_TOP, v as u16);
97 }
98
99 const fn slots_start() -> usize {
101 PAGE_PAYLOAD_HEADER
102 }
103
104 fn slots_end(&self) -> usize {
105 Self::slots_start() + self.slot_count() * SLOT_SIZE
106 }
107
108 pub fn free_space(&self) -> usize {
111 self.cells_top().saturating_sub(self.slots_end())
114 }
115
116 pub fn would_fit(&self, cell_encoded_size: usize) -> bool {
119 cell_encoded_size.saturating_add(SLOT_SIZE) <= self.free_space()
120 }
121
122 pub fn slot_offset_raw(&self, slot: usize) -> Result<usize> {
127 self.slot_offset(slot)
128 }
129
130 fn slot_offset(&self, slot: usize) -> Result<usize> {
131 if slot >= self.slot_count() {
132 return Err(SQLRiteError::Internal(format!(
133 "slot {slot} out of bounds (count = {})",
134 self.slot_count()
135 )));
136 }
137 let at = Self::slots_start() + slot * SLOT_SIZE;
138 Ok(read_u16(&self.buf[..], at) as usize)
139 }
140
141 fn set_slot_offset(&mut self, slot: usize, offset: usize) {
142 let at = Self::slots_start() + slot * SLOT_SIZE;
143 write_u16(&mut self.buf[..], at, offset as u16);
144 }
145
146 pub fn rowid_at(&self, slot: usize) -> Result<i64> {
149 let offset = self.slot_offset(slot)?;
150 Cell::peek_rowid(&self.buf[..], offset)
151 }
152
153 pub fn cell_at(&self, slot: usize) -> Result<Cell> {
155 let offset = self.slot_offset(slot)?;
156 let (cell, _) = Cell::decode(&self.buf[..], offset)?;
157 Ok(cell)
158 }
159
160 pub fn find(&self, rowid: i64) -> Result<Find> {
162 let mut lo = 0usize;
163 let mut hi = self.slot_count();
164 while lo < hi {
165 let mid = lo + (hi - lo) / 2;
166 let mid_rowid = self.rowid_at(mid)?;
167 match mid_rowid.cmp(&rowid) {
168 std::cmp::Ordering::Equal => return Ok(Find::Found(mid)),
169 std::cmp::Ordering::Less => lo = mid + 1,
170 std::cmp::Ordering::Greater => hi = mid,
171 }
172 }
173 Ok(Find::NotFound(lo))
174 }
175
176 pub fn lookup(&self, rowid: i64) -> Result<Option<Cell>> {
178 match self.find(rowid)? {
179 Find::Found(slot) => Ok(Some(self.cell_at(slot)?)),
180 Find::NotFound(_) => Ok(None),
181 }
182 }
183
184 pub fn iter(&self) -> TablePageIter<'_> {
187 TablePageIter { page: self, pos: 0 }
188 }
189
190 pub fn insert(&mut self, cell: &Cell) -> Result<()> {
194 let encoded = cell.encode()?;
195 self.insert_entry(cell.rowid, &encoded)
196 }
197
198 pub fn insert_paged_entry(&mut self, entry: &PagedEntry) -> Result<()> {
201 let encoded = entry.encode()?;
202 self.insert_entry(entry.rowid(), &encoded)
203 }
204
205 pub fn insert_entry(&mut self, rowid: i64, encoded: &[u8]) -> Result<()> {
212 match self.find(rowid)? {
213 Find::Found(_) => Err(SQLRiteError::Internal(format!(
214 "duplicate rowid {rowid} — caller must delete before re-inserting"
215 ))),
216 Find::NotFound(insert_at) => {
217 if !self.would_fit(encoded.len()) {
218 return Err(SQLRiteError::Internal(format!(
219 "page full: entry of {} bytes + slot wouldn't fit in {} bytes of free space",
220 encoded.len(),
221 self.free_space()
222 )));
223 }
224
225 let new_cells_top = self.cells_top() - encoded.len();
227 self.buf[new_cells_top..new_cells_top + encoded.len()].copy_from_slice(encoded);
228 self.set_cells_top(new_cells_top);
229
230 let old_count = self.slot_count();
233 let shift_start = Self::slots_start() + insert_at * SLOT_SIZE;
234 let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
235 self.buf
236 .copy_within(shift_start..shift_end, shift_start + SLOT_SIZE);
237 self.set_slot_count(old_count + 1);
238 self.set_slot_offset(insert_at, new_cells_top);
239 Ok(())
240 }
241 }
242 }
243
244 pub fn entry_at(&self, slot: usize) -> Result<PagedEntry> {
247 let offset = self.slot_offset(slot)?;
248 let (entry, _) = PagedEntry::decode(&self.buf[..], offset)?;
249 Ok(entry)
250 }
251
252 pub fn delete(&mut self, rowid: i64) -> Result<bool> {
256 let slot = match self.find(rowid)? {
257 Find::Found(s) => s,
258 Find::NotFound(_) => return Ok(false),
259 };
260 let old_count = self.slot_count();
261 let shift_start = Self::slots_start() + (slot + 1) * SLOT_SIZE;
263 let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
264 self.buf
265 .copy_within(shift_start..shift_end, shift_start - SLOT_SIZE);
266 self.set_slot_count(old_count - 1);
267 Ok(true)
268 }
269}
270
271pub struct TablePageIter<'a> {
272 page: &'a TablePage,
273 pos: usize,
274}
275
276impl<'a> Iterator for TablePageIter<'a> {
277 type Item = Result<Cell>;
278
279 fn next(&mut self) -> Option<Self::Item> {
280 if self.pos >= self.page.slot_count() {
281 return None;
282 }
283 let cell = self.page.cell_at(self.pos);
284 self.pos += 1;
285 Some(cell)
286 }
287}
288
289fn read_u16(buf: &[u8], offset: usize) -> u16 {
290 u16::from_le_bytes([buf[offset], buf[offset + 1]])
291}
292
293fn write_u16(buf: &mut [u8], offset: usize, value: u16) {
294 let bytes = value.to_le_bytes();
295 buf[offset] = bytes[0];
296 buf[offset + 1] = bytes[1];
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302 use crate::sql::db::table::Value;
303
304 fn int_cell(rowid: i64, v: i64) -> Cell {
305 Cell::new(rowid, vec![Some(Value::Integer(v))])
306 }
307
308 fn text_cell(rowid: i64, s: &str) -> Cell {
309 Cell::new(rowid, vec![Some(Value::Text(s.to_string()))])
310 }
311
312 #[test]
313 fn empty_page_metadata_matches_spec() {
314 let p = TablePage::empty();
315 assert_eq!(p.slot_count(), 0);
316 assert_eq!(p.cells_top(), PAYLOAD_PER_PAGE);
317 assert_eq!(p.free_space(), PAYLOAD_PER_PAGE - PAGE_PAYLOAD_HEADER);
319 }
320
321 #[test]
322 fn insert_lookup_iterate() {
323 let mut p = TablePage::empty();
324 p.insert(&int_cell(2, 20)).unwrap();
325 p.insert(&int_cell(1, 10)).unwrap();
326 p.insert(&int_cell(3, 30)).unwrap();
327 assert_eq!(p.slot_count(), 3);
328
329 assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
331 assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 20)));
332 assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));
333 assert_eq!(p.lookup(4).unwrap(), None);
334
335 let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
337 assert_eq!(got, vec![int_cell(1, 10), int_cell(2, 20), int_cell(3, 30)]);
338 }
339
340 #[test]
341 fn duplicate_rowid_is_rejected() {
342 let mut p = TablePage::empty();
343 p.insert(&int_cell(1, 100)).unwrap();
344 let err = p.insert(&int_cell(1, 200)).unwrap_err();
345 assert!(format!("{err}").contains("duplicate rowid"));
346 }
347
348 #[test]
349 fn delete_then_reinsert() {
350 let mut p = TablePage::empty();
351 p.insert(&int_cell(1, 10)).unwrap();
352 p.insert(&int_cell(2, 20)).unwrap();
353 p.insert(&int_cell(3, 30)).unwrap();
354
355 assert!(p.delete(2).unwrap());
356 assert_eq!(p.slot_count(), 2);
357 assert_eq!(p.lookup(2).unwrap(), None);
358
359 assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
361 assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));
362
363 p.insert(&int_cell(2, 999)).unwrap();
365 assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 999)));
366 }
367
368 #[test]
369 fn delete_missing_rowid_reports_false() {
370 let mut p = TablePage::empty();
371 p.insert(&int_cell(1, 10)).unwrap();
372 assert!(!p.delete(999).unwrap());
373 assert_eq!(p.slot_count(), 1);
374 }
375
376 #[test]
377 fn bytes_round_trip_through_from_bytes() {
378 let mut p = TablePage::empty();
379 p.insert(&text_cell(1, "alpha")).unwrap();
380 p.insert(&text_cell(2, "beta")).unwrap();
381 p.insert(&text_cell(3, "gamma")).unwrap();
382
383 let bytes = *p.as_bytes();
384 let p2 = TablePage::from_bytes(&bytes);
385 assert_eq!(p2.slot_count(), 3);
386 assert_eq!(p2.lookup(1).unwrap(), Some(text_cell(1, "alpha")));
387 assert_eq!(p2.lookup(2).unwrap(), Some(text_cell(2, "beta")));
388 assert_eq!(p2.lookup(3).unwrap(), Some(text_cell(3, "gamma")));
389 }
390
391 #[test]
392 fn find_returns_insertion_slot() {
393 let mut p = TablePage::empty();
394 p.insert(&int_cell(10, 0)).unwrap();
395 p.insert(&int_cell(20, 0)).unwrap();
396 p.insert(&int_cell(30, 0)).unwrap();
397
398 assert_eq!(p.find(5).unwrap(), Find::NotFound(0));
400 assert_eq!(p.find(15).unwrap(), Find::NotFound(1));
402 assert_eq!(p.find(999).unwrap(), Find::NotFound(3));
404 assert_eq!(p.find(20).unwrap(), Find::Found(1));
406 }
407
408 #[test]
409 fn would_fit_gates_insert() {
410 let mut p = TablePage::empty();
411 let body = "x".repeat(200);
413 let mut rid = 0i64;
414 let mut inserted = 0usize;
415 loop {
416 rid += 1;
417 let c = text_cell(rid, &body);
418 let size = c.encoded_len().unwrap();
419 if !p.would_fit(size) {
420 break;
421 }
422 p.insert(&c).unwrap();
423 inserted += 1;
424 }
425 assert!(inserted > 10 && inserted < 50);
427
428 let overflow = text_cell(rid, &body);
430 let err = p.insert(&overflow).unwrap_err();
431 assert!(format!("{err}").contains("page full"));
432 }
433
434 #[test]
435 fn free_space_tracks_inserts_and_deletes_by_slot_only() {
436 let mut p = TablePage::empty();
439 let initial = p.free_space();
440
441 let c = int_cell(1, 42);
442 let cell_size = c.encoded_len().unwrap();
443 p.insert(&c).unwrap();
444 let after_insert = p.free_space();
445 assert_eq!(after_insert, initial - cell_size - SLOT_SIZE);
446
447 p.delete(1).unwrap();
448 let after_delete = p.free_space();
449 assert_eq!(after_delete, initial - cell_size);
451 }
452
453 #[test]
454 fn mixed_types_and_nulls_round_trip_on_a_page() {
455 let mut p = TablePage::empty();
456 p.insert(&Cell::new(
457 1,
458 vec![
459 Some(Value::Integer(10)),
460 Some(Value::Text("hi".to_string())),
461 None,
462 ],
463 ))
464 .unwrap();
465 p.insert(&Cell::new(
466 2,
467 vec![None, Some(Value::Real(2.5)), Some(Value::Bool(true))],
468 ))
469 .unwrap();
470
471 let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
472 assert_eq!(got.len(), 2);
473 assert_eq!(got[0].rowid, 1);
474 assert_eq!(got[0].values[2], None);
475 assert_eq!(got[1].rowid, 2);
476 assert_eq!(got[1].values[0], None);
477 }
478
479 #[test]
480 fn peek_rowid_matches_cell_at() {
481 let mut p = TablePage::empty();
482 p.insert(&int_cell(42, 0)).unwrap();
483 p.insert(&int_cell(7, 0)).unwrap();
484 p.insert(&int_cell(100, 0)).unwrap();
485 assert_eq!(p.rowid_at(0).unwrap(), 7);
487 assert_eq!(p.rowid_at(1).unwrap(), 42);
488 assert_eq!(p.rowid_at(2).unwrap(), 100);
489 }
490}