Skip to main content

sqlrite/sql/pager/
table_page.rs

1//! Table-leaf page layout: a slot directory + cell content.
2//!
3//! A `TablePage` operates on the **payload portion** of a page (the 4089
4//! bytes that follow the 7-byte per-page header). The 7-byte header itself
5//! (page type, next-page, legacy payload-length) is written by the caller
6//! when the page is flushed to disk — this module doesn't touch it.
7//!
8//! Layout inside the payload area (`PAYLOAD_PER_PAGE` = 4089 bytes):
9//!
10//! ```text
11//!   offset 0..2    slot_count                 u16 LE
12//!   offset 2..4    cells_top                  u16 LE  (offset where cell
13//!                                                      content begins)
14//!   offset 4..     slot[0]..slot[n-1]         each u16 LE
15//!                                             points at the start of a
16//!                                             cell, ordered by rowid
17//!   [free space]
18//!   offset cells_top..PAYLOAD_PER_PAGE         cell content (unordered
19//!                                             in physical position,
20//!                                             ordered by slot)
21//! ```
22//!
23//! `cells_top` = PAYLOAD_PER_PAGE on an empty page. Every insert shifts it
24//! *down* (toward the slot directory) by the new cell's byte size; every
25//! insert also expands the slot directory *up* (away from the header) by
26//! 2 bytes.
27//!
28//! **Deletion leaves holes.** Removing a cell only strips its slot; the cell
29//! bytes stay in place. This keeps deletion O(n) in slots-to-shift, not
30//! O(page_size) in bytes-to-compact. The hole is reclaimed by a future
31//! `vacuum` pass (not yet implemented). `free_space` therefore underreports
32//! available space in fragmented pages — caller should treat its answer as
33//! "contiguous free bytes", which is what a cell write actually needs.
34
35use crate::error::{Result, SQLRiteError};
36use crate::sql::pager::cell::Cell;
37use crate::sql::pager::overflow::PagedEntry;
38use crate::sql::pager::page::PAYLOAD_PER_PAGE;
39
/// Byte offsets of the two header fields inside the payload area.
const OFFSET_SLOT_COUNT: usize = 0; // u16 LE: number of live slots
const OFFSET_CELLS_TOP: usize = 2; // u16 LE: offset where cell content begins
/// Total size of the payload-local header (the two u16 fields above).
const PAGE_PAYLOAD_HEADER: usize = 4;

/// Size of one slot entry (pointer to a cell's start).
const SLOT_SIZE: usize = 2;
47
/// Result of searching for a rowid in the slot directory.
///
/// Produced by [`TablePage::find`]. The payload of `NotFound` is the slot
/// index at which an insert would keep the directory in rowid order, so
/// callers can feed it straight back into an insertion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Find {
    /// A cell with this rowid lives at `slot`.
    Found(usize),
    /// No cell has this rowid; inserting would place it at `slot`.
    NotFound(usize),
}
56
57/// A table-leaf page. Owns a heap-allocated 4089-byte payload buffer.
58pub struct TablePage {
59    buf: Box<[u8; PAYLOAD_PER_PAGE]>,
60}
61
62impl TablePage {
63    /// Creates a fresh, empty page.
64    pub fn empty() -> Self {
65        let mut buf = Box::new([0u8; PAYLOAD_PER_PAGE]);
66        write_u16(&mut buf[..], OFFSET_SLOT_COUNT, 0);
67        write_u16(&mut buf[..], OFFSET_CELLS_TOP, PAYLOAD_PER_PAGE as u16);
68        Self { buf }
69    }
70
71    /// Rehydrates a page from its on-disk payload bytes.
72    pub fn from_bytes(bytes: &[u8; PAYLOAD_PER_PAGE]) -> Self {
73        Self {
74            buf: Box::new(*bytes),
75        }
76    }
77
78    /// Exposes the raw payload bytes for writing to the pager.
79    pub fn as_bytes(&self) -> &[u8; PAYLOAD_PER_PAGE] {
80        &self.buf
81    }
82
83    pub fn slot_count(&self) -> usize {
84        read_u16(&self.buf[..], OFFSET_SLOT_COUNT) as usize
85    }
86
87    fn set_slot_count(&mut self, n: usize) {
88        write_u16(&mut self.buf[..], OFFSET_SLOT_COUNT, n as u16);
89    }
90
91    pub fn cells_top(&self) -> usize {
92        read_u16(&self.buf[..], OFFSET_CELLS_TOP) as usize
93    }
94
95    fn set_cells_top(&mut self, v: usize) {
96        write_u16(&mut self.buf[..], OFFSET_CELLS_TOP, v as u16);
97    }
98
99    /// Start of the slot-directory region inside the payload.
100    const fn slots_start() -> usize {
101        PAGE_PAYLOAD_HEADER
102    }
103
104    fn slots_end(&self) -> usize {
105        Self::slots_start() + self.slot_count() * SLOT_SIZE
106    }
107
108    /// Contiguous free bytes between the end of the slot directory and
109    /// the top of the cell-content area.
110    pub fn free_space(&self) -> usize {
111        // cells_top is always >= slots_end in a well-formed page; clamp the
112        // subtraction to avoid panics if the page buffer is corrupt.
113        self.cells_top().saturating_sub(self.slots_end())
114    }
115
116    /// Returns true if a cell of the given encoded size fits, accounting
117    /// for the extra 2 bytes needed for a new slot entry.
118    pub fn would_fit(&self, cell_encoded_size: usize) -> bool {
119        cell_encoded_size.saturating_add(SLOT_SIZE) <= self.free_space()
120    }
121
122    /// Raw byte offset, within the payload, where the cell for `slot`
123    /// begins. Used by readers that decode the cell body with a type
124    /// other than `PagedEntry` — e.g., index-cell leaves carry
125    /// `IndexCell`s instead of row cells.
126    pub fn slot_offset_raw(&self, slot: usize) -> Result<usize> {
127        self.slot_offset(slot)
128    }
129
130    fn slot_offset(&self, slot: usize) -> Result<usize> {
131        if slot >= self.slot_count() {
132            return Err(SQLRiteError::Internal(format!(
133                "slot {slot} out of bounds (count = {})",
134                self.slot_count()
135            )));
136        }
137        let at = Self::slots_start() + slot * SLOT_SIZE;
138        Ok(read_u16(&self.buf[..], at) as usize)
139    }
140
141    fn set_slot_offset(&mut self, slot: usize, offset: usize) {
142        let at = Self::slots_start() + slot * SLOT_SIZE;
143        write_u16(&mut self.buf[..], at, offset as u16);
144    }
145
146    /// Reads the rowid of the cell at a given slot without decoding the
147    /// full cell body. Used by `find` to binary-search the directory.
148    pub fn rowid_at(&self, slot: usize) -> Result<i64> {
149        let offset = self.slot_offset(slot)?;
150        Cell::peek_rowid(&self.buf[..], offset)
151    }
152
153    /// Decodes the full cell stored at `slot`.
154    pub fn cell_at(&self, slot: usize) -> Result<Cell> {
155        let offset = self.slot_offset(slot)?;
156        let (cell, _) = Cell::decode(&self.buf[..], offset)?;
157        Ok(cell)
158    }
159
160    /// Binary-search for `rowid` in the slot directory.
161    pub fn find(&self, rowid: i64) -> Result<Find> {
162        let mut lo = 0usize;
163        let mut hi = self.slot_count();
164        while lo < hi {
165            let mid = lo + (hi - lo) / 2;
166            let mid_rowid = self.rowid_at(mid)?;
167            match mid_rowid.cmp(&rowid) {
168                std::cmp::Ordering::Equal => return Ok(Find::Found(mid)),
169                std::cmp::Ordering::Less => lo = mid + 1,
170                std::cmp::Ordering::Greater => hi = mid,
171            }
172        }
173        Ok(Find::NotFound(lo))
174    }
175
176    /// Returns the cell with this rowid, or `None` if it's not on the page.
177    pub fn lookup(&self, rowid: i64) -> Result<Option<Cell>> {
178        match self.find(rowid)? {
179            Find::Found(slot) => Ok(Some(self.cell_at(slot)?)),
180            Find::NotFound(_) => Ok(None),
181        }
182    }
183
184    /// Iterates `(rowid, Cell)` pairs in ascending rowid order. This is
185    /// O(N × cell-decode) — only use it when you actually need the bodies.
186    pub fn iter(&self) -> TablePageIter<'_> {
187        TablePageIter { page: self, pos: 0 }
188    }
189
190    /// Inserts `cell` as a local entry in rowid order. See
191    /// [`TablePage::insert_entry`] for the lower-level primitive that also
192    /// accepts overflow pointers.
193    pub fn insert(&mut self, cell: &Cell) -> Result<()> {
194        let encoded = cell.encode()?;
195        self.insert_entry(cell.rowid, &encoded)
196    }
197
198    /// Inserts either kind of paged entry. Delegates to
199    /// [`TablePage::insert_entry`] after encoding.
200    pub fn insert_paged_entry(&mut self, entry: &PagedEntry) -> Result<()> {
201        let encoded = entry.encode()?;
202        self.insert_entry(entry.rowid(), &encoded)
203    }
204
205    /// Inserts pre-encoded bytes at the slot that keeps the directory in
206    /// rowid order. Fails with `Internal("page full")` if the bytes plus a
207    /// new slot wouldn't fit, and with `Internal("duplicate rowid")` if a
208    /// cell or overflow pointer with the same rowid already lives on the
209    /// page. Callers should check `would_fit(encoded.len())` first — this
210    /// method asserts the fit.
211    pub fn insert_entry(&mut self, rowid: i64, encoded: &[u8]) -> Result<()> {
212        match self.find(rowid)? {
213            Find::Found(_) => Err(SQLRiteError::Internal(format!(
214                "duplicate rowid {rowid} — caller must delete before re-inserting"
215            ))),
216            Find::NotFound(insert_at) => {
217                if !self.would_fit(encoded.len()) {
218                    return Err(SQLRiteError::Internal(format!(
219                        "page full: entry of {} bytes + slot wouldn't fit in {} bytes of free space",
220                        encoded.len(),
221                        self.free_space()
222                    )));
223                }
224
225                // Write entry content at the new cells_top.
226                let new_cells_top = self.cells_top() - encoded.len();
227                self.buf[new_cells_top..new_cells_top + encoded.len()].copy_from_slice(encoded);
228                self.set_cells_top(new_cells_top);
229
230                // Shift slot entries [insert_at..n) up by one to make room,
231                // then write the new slot and bump the count.
232                let old_count = self.slot_count();
233                let shift_start = Self::slots_start() + insert_at * SLOT_SIZE;
234                let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
235                self.buf
236                    .copy_within(shift_start..shift_end, shift_start + SLOT_SIZE);
237                self.set_slot_count(old_count + 1);
238                self.set_slot_offset(insert_at, new_cells_top);
239                Ok(())
240            }
241        }
242    }
243
244    /// Decodes the paged entry at `slot`. Either a local cell or an
245    /// overflow pointer.
246    pub fn entry_at(&self, slot: usize) -> Result<PagedEntry> {
247        let offset = self.slot_offset(slot)?;
248        let (entry, _) = PagedEntry::decode(&self.buf[..], offset)?;
249        Ok(entry)
250    }
251
252    /// Removes the cell with `rowid`. Returns `Ok(true)` if it was found
253    /// and removed, `Ok(false)` if the page didn't contain it. The cell's
254    /// bytes stay in place — only its slot is dropped.
255    pub fn delete(&mut self, rowid: i64) -> Result<bool> {
256        let slot = match self.find(rowid)? {
257            Find::Found(s) => s,
258            Find::NotFound(_) => return Ok(false),
259        };
260        let old_count = self.slot_count();
261        // Shift slots [slot+1..n) down by one.
262        let shift_start = Self::slots_start() + (slot + 1) * SLOT_SIZE;
263        let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
264        self.buf
265            .copy_within(shift_start..shift_end, shift_start - SLOT_SIZE);
266        self.set_slot_count(old_count - 1);
267        Ok(true)
268    }
269}
270
/// Iterator over a [`TablePage`]'s cells in slot (= ascending rowid)
/// order. Yields `Result<Cell>` because each step decodes a cell body,
/// which can fail on a corrupt page.
pub struct TablePageIter<'a> {
    page: &'a TablePage,
    pos: usize, // next slot index to decode
}
275
276impl<'a> Iterator for TablePageIter<'a> {
277    type Item = Result<Cell>;
278
279    fn next(&mut self) -> Option<Self::Item> {
280        if self.pos >= self.page.slot_count() {
281            return None;
282        }
283        let cell = self.page.cell_at(self.pos);
284        self.pos += 1;
285        Some(cell)
286    }
287}
288
/// Reads a little-endian u16 from `buf` at `offset`.
/// Panics if fewer than two bytes are available there.
fn read_u16(buf: &[u8], offset: usize) -> u16 {
    let pair: [u8; 2] = buf[offset..offset + 2]
        .try_into()
        .expect("slice of length 2");
    u16::from_le_bytes(pair)
}
292
/// Writes `value` as little-endian u16 into `buf` at `offset`.
/// Panics if fewer than two bytes are available there.
fn write_u16(buf: &mut [u8], offset: usize, value: u16) {
    // copy_from_slice compiles to a memcpy and, unlike the byte-by-byte
    // form, checks the full range up front (no partial write on panic).
    buf[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::db::table::Value;

    // Helper: single-column integer cell.
    fn int_cell(rowid: i64, v: i64) -> Cell {
        Cell::new(rowid, vec![Some(Value::Integer(v))])
    }

    // Helper: single-column text cell.
    fn text_cell(rowid: i64, s: &str) -> Cell {
        Cell::new(rowid, vec![Some(Value::Text(s.to_string()))])
    }

    #[test]
    fn empty_page_metadata_matches_spec() {
        let p = TablePage::empty();
        assert_eq!(p.slot_count(), 0);
        assert_eq!(p.cells_top(), PAYLOAD_PER_PAGE);
        // Free = PAYLOAD_PER_PAGE - 4 (payload header, no slots yet).
        assert_eq!(p.free_space(), PAYLOAD_PER_PAGE - PAGE_PAYLOAD_HEADER);
    }

    #[test]
    fn insert_lookup_iterate() {
        let mut p = TablePage::empty();
        // Deliberately out of rowid order to exercise slot sorting.
        p.insert(&int_cell(2, 20)).unwrap();
        p.insert(&int_cell(1, 10)).unwrap();
        p.insert(&int_cell(3, 30)).unwrap();
        assert_eq!(p.slot_count(), 3);

        // Lookups return the right cells regardless of insertion order.
        assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
        assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 20)));
        assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));
        assert_eq!(p.lookup(4).unwrap(), None);

        // Iteration yields cells in rowid-ascending order.
        let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
        assert_eq!(got, vec![int_cell(1, 10), int_cell(2, 20), int_cell(3, 30)]);
    }

    #[test]
    fn duplicate_rowid_is_rejected() {
        let mut p = TablePage::empty();
        p.insert(&int_cell(1, 100)).unwrap();
        let err = p.insert(&int_cell(1, 200)).unwrap_err();
        assert!(format!("{err}").contains("duplicate rowid"));
    }

    #[test]
    fn delete_then_reinsert() {
        let mut p = TablePage::empty();
        p.insert(&int_cell(1, 10)).unwrap();
        p.insert(&int_cell(2, 20)).unwrap();
        p.insert(&int_cell(3, 30)).unwrap();

        assert!(p.delete(2).unwrap());
        assert_eq!(p.slot_count(), 2);
        assert_eq!(p.lookup(2).unwrap(), None);

        // Lookups on the survivors still work.
        assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
        assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));

        // Rowid 2 can be re-inserted (with different body, even).
        p.insert(&int_cell(2, 999)).unwrap();
        assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 999)));
    }

    #[test]
    fn delete_missing_rowid_reports_false() {
        let mut p = TablePage::empty();
        p.insert(&int_cell(1, 10)).unwrap();
        assert!(!p.delete(999).unwrap());
        // The page is untouched by a failed delete.
        assert_eq!(p.slot_count(), 1);
    }

    #[test]
    fn bytes_round_trip_through_from_bytes() {
        let mut p = TablePage::empty();
        p.insert(&text_cell(1, "alpha")).unwrap();
        p.insert(&text_cell(2, "beta")).unwrap();
        p.insert(&text_cell(3, "gamma")).unwrap();

        // Serialize, rehydrate, and verify the copy is equivalent.
        let bytes = *p.as_bytes();
        let p2 = TablePage::from_bytes(&bytes);
        assert_eq!(p2.slot_count(), 3);
        assert_eq!(p2.lookup(1).unwrap(), Some(text_cell(1, "alpha")));
        assert_eq!(p2.lookup(2).unwrap(), Some(text_cell(2, "beta")));
        assert_eq!(p2.lookup(3).unwrap(), Some(text_cell(3, "gamma")));
    }

    #[test]
    fn find_returns_insertion_slot() {
        let mut p = TablePage::empty();
        p.insert(&int_cell(10, 0)).unwrap();
        p.insert(&int_cell(20, 0)).unwrap();
        p.insert(&int_cell(30, 0)).unwrap();

        // Before the first element.
        assert_eq!(p.find(5).unwrap(), Find::NotFound(0));
        // Between two.
        assert_eq!(p.find(15).unwrap(), Find::NotFound(1));
        // After the last.
        assert_eq!(p.find(999).unwrap(), Find::NotFound(3));
        // Exact hit.
        assert_eq!(p.find(20).unwrap(), Find::Found(1));
    }

    #[test]
    fn would_fit_gates_insert() {
        let mut p = TablePage::empty();
        // Fill the page with ~200-byte cells until it says no.
        let body = "x".repeat(200);
        let mut rid = 0i64;
        let mut inserted = 0usize;
        loop {
            rid += 1;
            let c = text_cell(rid, &body);
            let size = c.encoded_len().unwrap();
            if !p.would_fit(size) {
                break;
            }
            p.insert(&c).unwrap();
            inserted += 1;
        }
        // Sanity: a 4089-byte page minus overhead should hold 15-20 such cells.
        assert!(inserted > 10 && inserted < 50);

        // After rejecting, the next insert *must* fail (free_space too small).
        let overflow = text_cell(rid, &body);
        let err = p.insert(&overflow).unwrap_err();
        assert!(format!("{err}").contains("page full"));
    }

    #[test]
    fn free_space_tracks_inserts_and_deletes_by_slot_only() {
        // Deletion leaves holes — free_space drops after insert but doesn't
        // fully recover after delete. Document the behavior explicitly.
        let mut p = TablePage::empty();
        let initial = p.free_space();

        let c = int_cell(1, 42);
        let cell_size = c.encoded_len().unwrap();
        p.insert(&c).unwrap();
        let after_insert = p.free_space();
        assert_eq!(after_insert, initial - cell_size - SLOT_SIZE);

        p.delete(1).unwrap();
        let after_delete = p.free_space();
        // We recovered the slot (2 bytes) but the cell content is still there.
        assert_eq!(after_delete, initial - cell_size);
    }

    #[test]
    fn mixed_types_and_nulls_round_trip_on_a_page() {
        let mut p = TablePage::empty();
        p.insert(&Cell::new(
            1,
            vec![
                Some(Value::Integer(10)),
                Some(Value::Text("hi".to_string())),
                None,
            ],
        ))
        .unwrap();
        p.insert(&Cell::new(
            2,
            vec![None, Some(Value::Real(2.5)), Some(Value::Bool(true))],
        ))
        .unwrap();

        // NULLs (None) survive the encode/decode round trip in place.
        let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].rowid, 1);
        assert_eq!(got[0].values[2], None);
        assert_eq!(got[1].rowid, 2);
        assert_eq!(got[1].values[0], None);
    }

    #[test]
    fn peek_rowid_matches_cell_at() {
        let mut p = TablePage::empty();
        p.insert(&int_cell(42, 0)).unwrap();
        p.insert(&int_cell(7, 0)).unwrap();
        p.insert(&int_cell(100, 0)).unwrap();
        // Slot order = rowid order: 7, 42, 100.
        assert_eq!(p.rowid_at(0).unwrap(), 7);
        assert_eq!(p.rowid_at(1).unwrap(), 42);
        assert_eq!(p.rowid_at(2).unwrap(), 100);
    }
}