1use crate::error::{Result, SQLRiteError};
29use crate::sql::pager::cell::{Cell, KIND_LOCAL, KIND_OVERFLOW};
30use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
31use crate::sql::pager::pager::Pager;
32use crate::sql::pager::varint;
33
/// Size threshold in bytes, fixed at one quarter of a page's payload capacity.
///
/// NOTE(review): nothing in this file reads the constant outside the tests;
/// presumably callers spill a cell body into an overflow chain once it grows
/// past this size — confirm against the call sites.
pub const OVERFLOW_THRESHOLD: usize = PAYLOAD_PER_PAGE / 4;
38
/// Reference to a body that is stored in a chain of overflow pages instead of
/// inline.
///
/// [`OverflowRef::encode`] writes the three fields in declaration order after a
/// `KIND_OVERFLOW` tag byte.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OverflowRef {
    /// Rowid of the entry (encoded as a signed varint).
    pub rowid: i64,
    /// Byte length recorded for the chained body (encoded as an unsigned
    /// varint). `read_overflow_chain` takes a length of this kind and rejects
    /// a chain whose reassembled size differs from it.
    pub total_body_len: u64,
    /// Page number of the first page of the chain (encoded as 4 little-endian
    /// bytes).
    pub first_overflow_page: u32,
}
51
52impl OverflowRef {
53 pub fn encode(&self) -> Vec<u8> {
57 let mut body = Vec::with_capacity(1 + varint::MAX_VARINT_BYTES * 2 + 4);
58 body.push(KIND_OVERFLOW);
59 varint::write_i64(&mut body, self.rowid);
60 varint::write_u64(&mut body, self.total_body_len);
61 body.extend_from_slice(&self.first_overflow_page.to_le_bytes());
62
63 let mut out = Vec::with_capacity(body.len() + varint::MAX_VARINT_BYTES);
64 varint::write_u64(&mut out, body.len() as u64);
65 out.extend_from_slice(&body);
66 out
67 }
68
69 pub fn decode(buf: &[u8], pos: usize) -> Result<(OverflowRef, usize)> {
70 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
71 let body_start = pos + len_bytes;
72 let body_end = body_start
73 .checked_add(body_len as usize)
74 .ok_or_else(|| SQLRiteError::Internal("overflow ref length overflow".to_string()))?;
75 if body_end > buf.len() {
76 return Err(SQLRiteError::Internal(format!(
77 "overflow ref extends past buffer: needs {body_start}..{body_end}, have {}",
78 buf.len()
79 )));
80 }
81
82 let body = &buf[body_start..body_end];
83 if body.first().copied() != Some(KIND_OVERFLOW) {
84 return Err(SQLRiteError::Internal(format!(
85 "OverflowRef::decode called on non-overflow entry (kind_tag = {:#x})",
86 body.first().copied().unwrap_or(0)
87 )));
88 }
89 let mut cur = 1usize;
90 let (rowid, n) = varint::read_i64(body, cur)?;
91 cur += n;
92 let (total_body_len, n) = varint::read_u64(body, cur)?;
93 cur += n;
94 if cur + 4 > body.len() {
95 return Err(SQLRiteError::Internal(
96 "overflow ref truncated before first_overflow_page".to_string(),
97 ));
98 }
99 let first_overflow_page = u32::from_le_bytes(body[cur..cur + 4].try_into().unwrap());
100 cur += 4;
101 if cur != body.len() {
102 return Err(SQLRiteError::Internal(format!(
103 "overflow ref had {} trailing bytes",
104 body.len() - cur
105 )));
106 }
107 Ok((
108 OverflowRef {
109 rowid,
110 total_body_len,
111 first_overflow_page,
112 },
113 body_end - pos,
114 ))
115 }
116}
117
/// A decoded entry: either a whole cell stored in place or a reference to a
/// body that lives in an overflow chain.
#[derive(Debug, Clone, PartialEq)]
pub enum PagedEntry {
    /// The entry is a complete [`Cell`] (kind tag `KIND_LOCAL`).
    Local(Cell),
    /// The entry is an [`OverflowRef`] (kind tag `KIND_OVERFLOW`).
    Overflow(OverflowRef),
}
125
126impl PagedEntry {
127 pub fn rowid(&self) -> i64 {
128 match self {
129 PagedEntry::Local(c) => c.rowid,
130 PagedEntry::Overflow(r) => r.rowid,
131 }
132 }
133
134 pub fn encode(&self) -> Result<Vec<u8>> {
135 match self {
136 PagedEntry::Local(c) => c.encode(),
137 PagedEntry::Overflow(r) => Ok(r.encode()),
138 }
139 }
140
141 pub fn decode(buf: &[u8], pos: usize) -> Result<(PagedEntry, usize)> {
151 let kind = Cell::peek_kind(buf, pos)?;
152 match kind {
153 KIND_LOCAL => {
154 let (c, n) = Cell::decode(buf, pos)?;
155 Ok((PagedEntry::Local(c), n))
156 }
157 KIND_OVERFLOW => {
158 let (r, n) = OverflowRef::decode(buf, pos)?;
159 Ok((PagedEntry::Overflow(r), n))
160 }
161 other => Err(SQLRiteError::Internal(format!(
162 "PagedEntry::decode at offset {pos} got kind tag {other:#x} ({}); \
163 expected KIND_LOCAL (0x01) or KIND_OVERFLOW (0x02). \
164 The caller is reading a {} page with the table-leaf decoder.",
165 kind_name(other),
166 kind_btree_hint(other),
167 ))),
168 }
169 }
170}
171
172fn kind_name(tag: u8) -> &'static str {
175 match tag {
176 crate::sql::pager::cell::KIND_LOCAL => "KIND_LOCAL",
177 crate::sql::pager::cell::KIND_OVERFLOW => "KIND_OVERFLOW",
178 crate::sql::pager::cell::KIND_INTERIOR => "KIND_INTERIOR",
179 crate::sql::pager::cell::KIND_INDEX => "KIND_INDEX",
180 crate::sql::pager::cell::KIND_HNSW => "KIND_HNSW",
181 crate::sql::pager::cell::KIND_FTS_POSTING => "KIND_FTS_POSTING",
182 _ => "unknown kind",
183 }
184}
185
186fn kind_btree_hint(tag: u8) -> &'static str {
188 match tag {
189 crate::sql::pager::cell::KIND_INTERIOR => "B-Tree interior",
190 crate::sql::pager::cell::KIND_INDEX => "secondary-index",
191 crate::sql::pager::cell::KIND_HNSW => "HNSW",
192 crate::sql::pager::cell::KIND_FTS_POSTING => "FTS",
193 _ => "non-table",
194 }
195}
196
197pub fn write_overflow_chain(
206 pager: &mut Pager,
207 bytes: &[u8],
208 alloc: &mut crate::sql::pager::allocator::PageAllocator,
209) -> Result<u32> {
210 if bytes.is_empty() {
211 return Err(SQLRiteError::Internal(
212 "refusing to write an empty overflow chain — caller should inline instead".to_string(),
213 ));
214 }
215 let chunks: Vec<&[u8]> = bytes.chunks(PAYLOAD_PER_PAGE).collect();
218 let pages: Vec<u32> = (0..chunks.len()).map(|_| alloc.allocate()).collect();
219 for (i, chunk) in chunks.iter().enumerate() {
220 let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
221 pager.stage_page(pages[i], encode_overflow_page(next, chunk)?);
222 }
223 Ok(pages[0])
224}
225
226pub fn read_overflow_chain(pager: &Pager, first_page: u32, total_body_len: u64) -> Result<Vec<u8>> {
231 let mut out = Vec::with_capacity(total_body_len as usize);
232 let mut current = first_page;
233 while current != 0 {
234 let raw = pager.read_page(current).ok_or_else(|| {
235 SQLRiteError::Internal(format!("overflow chain references missing page {current}"))
236 })?;
237 let ty_byte = raw[0];
238 if ty_byte != PageType::Overflow as u8 {
239 return Err(SQLRiteError::Internal(format!(
240 "page {current} was supposed to be Overflow but is type {ty_byte}"
241 )));
242 }
243 let next = u32::from_le_bytes(raw[1..5].try_into().unwrap());
244 let payload_len = u16::from_le_bytes(raw[5..7].try_into().unwrap()) as usize;
245 if payload_len > PAYLOAD_PER_PAGE {
246 return Err(SQLRiteError::Internal(format!(
247 "overflow page {current} reports payload_len {payload_len} > max"
248 )));
249 }
250 out.extend_from_slice(&raw[PAGE_HEADER_SIZE..PAGE_HEADER_SIZE + payload_len]);
251 current = next;
252 }
253 if out.len() as u64 != total_body_len {
254 return Err(SQLRiteError::Internal(format!(
255 "overflow chain produced {} bytes, OverflowRef claimed {total_body_len}",
256 out.len()
257 )));
258 }
259 Ok(out)
260}
261
262fn encode_overflow_page(next: u32, payload: &[u8]) -> Result<[u8; PAGE_SIZE]> {
265 if payload.len() > PAYLOAD_PER_PAGE {
266 return Err(SQLRiteError::Internal(format!(
267 "overflow page payload {} exceeds max {PAYLOAD_PER_PAGE}",
268 payload.len()
269 )));
270 }
271 let mut buf = [0u8; PAGE_SIZE];
272 buf[0] = PageType::Overflow as u8;
273 buf[1..5].copy_from_slice(&next.to_le_bytes());
274 buf[5..7].copy_from_slice(&(payload.len() as u16).to_le_bytes());
275 buf[PAGE_HEADER_SIZE..PAGE_HEADER_SIZE + payload.len()].copy_from_slice(payload);
276 Ok(buf)
277}
278
// Unit tests for the overflow reference codec, the kind-tag dispatch in
// `PagedEntry::decode`, and the overflow chain writer/reader.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::db::table::Value;

    /// Builds a path in the system temp directory whose file name includes the
    /// process id, the current time in nanoseconds, and `name`, so separate
    /// tests and runs do not share a file.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let mut p = std::env::temp_dir();
        let pid = std::process::id();
        // Falls back to 0 if the clock reports a time before the Unix epoch.
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        p.push(format!("sqlrite-overflow-{pid}-{nanos}-{name}.sqlrite"));
        p
    }

    /// Encoding then decoding yields the same reference, and decode reports
    /// having consumed exactly the encoded bytes.
    #[test]
    fn overflow_ref_round_trip() {
        let r = OverflowRef {
            rowid: 42,
            total_body_len: 123_456,
            first_overflow_page: 7,
        };
        let bytes = r.encode();
        let (back, consumed) = OverflowRef::decode(&bytes, 0).unwrap();
        assert_eq!(back, r);
        assert_eq!(consumed, bytes.len());
    }

    /// `PagedEntry::decode` returns the variant that matches what was encoded.
    #[test]
    fn paged_entry_dispatches_on_kind() {
        // An encoded local cell decodes to `PagedEntry::Local`.
        let local = Cell::new(1, vec![Some(Value::Integer(10))]);
        let local_bytes = local.encode().unwrap();
        let (decoded, _) = PagedEntry::decode(&local_bytes, 0).unwrap();
        assert_eq!(decoded, PagedEntry::Local(local));

        // An encoded overflow reference decodes to `PagedEntry::Overflow`.
        let overflow = OverflowRef {
            rowid: 2,
            total_body_len: 5000,
            first_overflow_page: 13,
        };
        let overflow_bytes = overflow.encode();
        let (decoded, _) = PagedEntry::decode(&overflow_bytes, 0).unwrap();
        assert_eq!(decoded, PagedEntry::Overflow(overflow));
    }

    /// Feeding an encoded index cell to the table-leaf decoder fails with a
    /// message that names the kind tag and the page type it hints at.
    #[test]
    fn paged_entry_decode_rejects_index_kind_with_clear_error() {
        use crate::sql::pager::index_cell::IndexCell;
        let ic = IndexCell::new(42, Value::Text("alice".into()));
        let bytes = ic.encode().unwrap();
        let err = PagedEntry::decode(&bytes, 0).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("KIND_INDEX"),
            "expected error to name KIND_INDEX, got: {msg}",
        );
        assert!(
            msg.contains("secondary-index"),
            "expected error to identify the secondary-index B-Tree, got: {msg}",
        );
    }

    /// HNSW and FTS kind tags are rejected with their respective hints.
    #[test]
    fn paged_entry_decode_rejects_hnsw_and_fts_kinds() {
        use crate::sql::pager::cell::{KIND_FTS_POSTING, KIND_HNSW};
        for (tag, hint) in [(KIND_HNSW, "HNSW"), (KIND_FTS_POSTING, "FTS")] {
            // Hand-built two-byte entry: a leading 1 followed by the kind tag.
            // NOTE(review): presumably the 1 is a one-byte length prefix, as in
            // the `OverflowRef` layout — confirm against `Cell::peek_kind`.
            let bytes = vec![1u8, tag];
            let err = PagedEntry::decode(&bytes, 0).unwrap_err();
            let msg = format!("{err}");
            assert!(
                msg.contains(hint),
                "expected error to identify the {hint} B-Tree, got: {msg}",
            );
        }
    }

    /// `Cell::peek_rowid` reads the rowid from an encoded local cell and from
    /// an encoded overflow reference, including a negative rowid.
    #[test]
    fn peek_rowid_works_for_both_kinds() {
        let local = Cell::new(99, vec![Some(Value::Integer(1))]);
        let local_bytes = local.encode().unwrap();
        assert_eq!(Cell::peek_rowid(&local_bytes, 0).unwrap(), 99);

        let overflow = OverflowRef {
            rowid: -7,
            total_body_len: 100,
            first_overflow_page: 42,
        };
        let overflow_bytes = overflow.encode();
        assert_eq!(Cell::peek_rowid(&overflow_bytes, 0).unwrap(), -7);
    }

    /// A 10,000-byte blob written as an overflow chain is read back unchanged
    /// after a commit and a reopen of the database file.
    #[test]
    fn write_then_read_overflow_chain() {
        let path = tmp_path("chain");
        let mut pager = Pager::create(&path).unwrap();

        // Patterned data, so misplaced or reordered pages would show up.
        let blob: Vec<u8> = (0..10_000).map(|i| (i % 251) as u8).collect();
        let pages_needed = blob.len().div_ceil(PAYLOAD_PER_PAGE) as u32;
        // The assertions below expect allocation to start at page 10 and the
        // high-water mark to advance by one per page used.
        let mut alloc =
            crate::sql::pager::allocator::PageAllocator::new(std::collections::VecDeque::new(), 10);
        let start = write_overflow_chain(&mut pager, &blob, &mut alloc).unwrap();
        assert_eq!(start, 10);
        assert_eq!(alloc.high_water(), 10 + pages_needed);

        pager
            .commit(crate::sql::pager::header::DbHeader {
                page_count: alloc.high_water(),
                schema_root_page: 1,
                format_version: crate::sql::pager::header::FORMAT_VERSION_BASELINE,
                freelist_head: 0,
            })
            .unwrap();

        // Drop the writing pager and read through a freshly opened one.
        drop(pager);
        let pager = Pager::open(&path).unwrap();
        let back = read_overflow_chain(&pager, start, blob.len() as u64).unwrap();
        assert_eq!(back, blob);

        // Best-effort cleanup; a failure to remove the file is ignored.
        let _ = std::fs::remove_file(&path);
    }

    /// Reading a chain while claiming a different total length is an error.
    #[test]
    fn read_overflow_chain_rejects_length_mismatch() {
        let path = tmp_path("mismatch");
        let mut pager = Pager::create(&path).unwrap();
        let blob = vec![1u8; 500];
        let mut alloc =
            crate::sql::pager::allocator::PageAllocator::new(std::collections::VecDeque::new(), 10);
        let start = write_overflow_chain(&mut pager, &blob, &mut alloc).unwrap();
        assert_eq!(start, 10);
        pager
            .commit(crate::sql::pager::header::DbHeader {
                page_count: alloc.high_water(),
                schema_root_page: 1,
                format_version: crate::sql::pager::header::FORMAT_VERSION_BASELINE,
                freelist_head: 0,
            })
            .unwrap();

        // The chain holds 500 bytes; claim 999.
        let err = read_overflow_chain(&pager, start, 999).unwrap_err();
        assert!(format!("{err}").contains("overflow chain produced"));

        // Best-effort cleanup; a failure to remove the file is ignored.
        let _ = std::fs::remove_file(&path);
    }

    /// Writing an empty body as an overflow chain is refused.
    #[test]
    fn empty_chain_is_rejected() {
        let path = tmp_path("empty");
        let mut pager = Pager::create(&path).unwrap();
        let mut alloc =
            crate::sql::pager::allocator::PageAllocator::new(std::collections::VecDeque::new(), 10);
        let err = write_overflow_chain(&mut pager, &[], &mut alloc).unwrap_err();
        assert!(format!("{err}").contains("empty overflow chain"));
        let _ = std::fs::remove_file(&path);
    }

    /// Sanity bounds on the threshold: at most a quarter of a page's payload
    /// and more than 200 bytes.
    #[test]
    fn overflow_threshold_is_reasonable() {
        assert!(OVERFLOW_THRESHOLD <= PAYLOAD_PER_PAGE / 4);
        assert!(OVERFLOW_THRESHOLD > 200);
    }
}