1use crate::error::{Result, SQLRiteError};
46use crate::sql::db::table::Value;
47use crate::sql::pager::varint;
48
/// Kind tag of a local cell: the first body byte written by [`Cell::encode`]
/// and the only kind [`Cell::decode`] accepts.
pub const KIND_LOCAL: u8 = 0x01;
/// Kind tag `0x02`. Not produced or parsed in this file; presumably marks an
/// overflow entry — confirm against the pager code that consumes it.
pub const KIND_OVERFLOW: u8 = 0x02;
/// Kind tag `0x03`. Not produced or parsed in this file; presumably marks an
/// interior (non-leaf) tree entry — confirm against the pager code.
pub const KIND_INTERIOR: u8 = 0x03;
/// Kind tag `0x04`. Not produced or parsed in this file; presumably marks a
/// secondary-index entry — confirm against the pager code.
pub const KIND_INDEX: u8 = 0x04;
/// Kind tag `0x05`. Not produced or parsed in this file; presumably marks an
/// HNSW vector-index entry — confirm against the pager code.
pub const KIND_HNSW: u8 = 0x05;

/// Kind tag `0x06`. Not produced or parsed in this file; presumably marks a
/// full-text-search posting entry — confirm against the pager code.
pub const KIND_FTS_POSTING: u8 = 0x06;
99
/// One-byte type tags that open every non-NULL value block.
///
/// `encode_value` writes the tag ahead of the payload and `decode_value`
/// dispatches on it. NULL has no tag: it is recorded in the cell's null
/// bitmap instead.
pub mod tag {
    /// `Value::Integer`; payload is written with `varint::write_i64`.
    pub const INTEGER: u8 = 0;
    /// `Value::Real`; payload is the 8 little-endian bytes of the `f64`.
    pub const REAL: u8 = 1;
    /// `Value::Text`; payload is a `varint::write_u64` byte length followed
    /// by the UTF-8 bytes.
    pub const TEXT: u8 = 2;
    /// `Value::Bool`; payload is a single byte, `1` for true and `0` for false.
    pub const BOOL: u8 = 3;
    /// `Value::Vector`; payload is a `varint::write_u64` element count
    /// followed by that many little-endian `f32`s.
    pub const VECTOR: u8 = 4;
}
112
113#[derive(Debug, Clone, PartialEq)]
118pub struct Cell {
119 pub rowid: i64,
120 pub values: Vec<Option<Value>>,
121}
122
123impl Cell {
124 pub fn new(rowid: i64, values: Vec<Option<Value>>) -> Self {
125 Self { rowid, values }
126 }
127
128 pub fn encode(&self) -> Result<Vec<u8>> {
133 let mut body = Vec::new();
136 body.push(KIND_LOCAL);
137 varint::write_i64(&mut body, self.rowid);
138 varint::write_u64(&mut body, self.values.len() as u64);
139 encode_null_bitmap(&mut body, &self.values);
140 for v in self.values.iter().flatten() {
141 encode_value(&mut body, v)?;
142 }
143
144 let mut out = Vec::with_capacity(body.len() + varint::MAX_VARINT_BYTES);
145 varint::write_u64(&mut out, body.len() as u64);
146 out.extend_from_slice(&body);
147 Ok(out)
148 }
149
150 pub fn encoded_len(&self) -> Result<usize> {
153 Ok(self.encode()?.len())
157 }
158
159 pub fn peek_rowid(buf: &[u8], pos: usize) -> Result<i64> {
164 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
165 let body_start = pos + len_bytes;
166 if body_start >= buf.len() {
168 return Err(SQLRiteError::Internal(
169 "paged cell truncated before kind tag".to_string(),
170 ));
171 }
172 let (rowid, _) = varint::read_i64(buf, body_start + 1)?;
173 Ok(rowid)
174 }
175
176 pub fn encoded_size_at(buf: &[u8], pos: usize) -> Result<usize> {
180 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
181 Ok(len_bytes + body_len as usize)
182 }
183
184 pub fn peek_kind(buf: &[u8], pos: usize) -> Result<u8> {
187 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
188 let kind_pos = pos + len_bytes;
189 buf.get(kind_pos).copied().ok_or_else(|| {
190 SQLRiteError::Internal("paged cell truncated before kind tag".to_string())
191 })
192 }
193
194 pub fn decode(buf: &[u8], pos: usize) -> Result<(Cell, usize)> {
199 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
200 let body_start = pos + len_bytes;
201 let body_end = body_start
202 .checked_add(body_len as usize)
203 .ok_or_else(|| SQLRiteError::Internal("cell length overflow".to_string()))?;
204 if body_end > buf.len() {
205 return Err(SQLRiteError::Internal(format!(
206 "cell extends past buffer: needs bytes {body_start}..{body_end}, have {}",
207 buf.len()
208 )));
209 }
210
211 let body = &buf[body_start..body_end];
212 if body.is_empty() {
213 return Err(SQLRiteError::Internal(
214 "paged cell body is empty (no kind tag)".to_string(),
215 ));
216 }
217 let kind_tag = body[0];
218 if kind_tag != KIND_LOCAL {
219 return Err(SQLRiteError::Internal(format!(
220 "Cell::decode called on non-local entry (kind_tag = {kind_tag:#x})"
221 )));
222 }
223 let mut cur = 1usize;
224
225 let (rowid, n) = varint::read_i64(body, cur)?;
226 cur += n;
227 let (col_count_u, n) = varint::read_u64(body, cur)?;
228 cur += n;
229 let col_count = col_count_u as usize;
230
231 let bitmap_bytes = col_count.div_ceil(8);
232 if cur + bitmap_bytes > body.len() {
233 return Err(SQLRiteError::Internal(
234 "cell body truncated before null bitmap ends".to_string(),
235 ));
236 }
237 let bitmap = &body[cur..cur + bitmap_bytes];
238 cur += bitmap_bytes;
239
240 let mut values = Vec::with_capacity(col_count);
241 for col in 0..col_count {
242 if is_null(bitmap, col) {
243 values.push(None);
244 } else {
245 let (v, n) = decode_value(body, cur)?;
246 cur += n;
247 values.push(Some(v));
248 }
249 }
250
251 if cur != body.len() {
252 return Err(SQLRiteError::Internal(format!(
253 "cell body had {} trailing bytes after last value",
254 body.len() - cur
255 )));
256 }
257
258 Ok((Cell { rowid, values }, body_end - pos))
259 }
260}
261
262fn encode_null_bitmap(out: &mut Vec<u8>, values: &[Option<Value>]) {
263 let n = values.len().div_ceil(8);
264 let start = out.len();
265 out.resize(start + n, 0);
266 for (i, v) in values.iter().enumerate() {
267 if v.is_none() {
268 let byte_idx = start + (i / 8);
269 let bit = i % 8;
270 out[byte_idx] |= 1 << bit;
271 }
272 }
273}
274
/// Reports whether column `col` is flagged NULL in `bitmap`.
///
/// Column `col` lives in bit `col % 8` (least-significant first) of byte
/// `col / 8`. A column beyond the end of the bitmap is reported as not NULL.
fn is_null(bitmap: &[u8], col: usize) -> bool {
    let mask = 1u8 << (col % 8);
    match bitmap.get(col / 8) {
        Some(byte) => (byte & mask) != 0,
        None => false,
    }
}
280
281pub(super) fn encode_value(out: &mut Vec<u8>, value: &Value) -> Result<()> {
282 match value {
283 Value::Integer(i) => {
284 out.push(tag::INTEGER);
285 varint::write_i64(out, *i);
286 }
287 Value::Real(f) => {
288 out.push(tag::REAL);
289 out.extend_from_slice(&f.to_le_bytes());
290 }
291 Value::Text(s) => {
292 out.push(tag::TEXT);
293 let bytes = s.as_bytes();
294 varint::write_u64(out, bytes.len() as u64);
295 out.extend_from_slice(bytes);
296 }
297 Value::Bool(b) => {
298 out.push(tag::BOOL);
299 out.push(if *b { 1 } else { 0 });
300 }
301 Value::Vector(v) => {
302 out.push(tag::VECTOR);
303 varint::write_u64(out, v.len() as u64);
305 for x in v {
307 out.extend_from_slice(&x.to_le_bytes());
308 }
309 }
310 Value::Null => {
311 return Err(SQLRiteError::Internal(
312 "Null values are encoded via the null bitmap, not a value block".to_string(),
313 ));
314 }
315 }
316 Ok(())
317}
318
319pub(super) fn decode_value(buf: &[u8], pos: usize) -> Result<(Value, usize)> {
320 let tag = *buf
321 .get(pos)
322 .ok_or_else(|| SQLRiteError::Internal(format!("value block truncated at offset {pos}")))?;
323 let body_start = pos + 1;
324 match tag {
325 tag::INTEGER => {
326 let (v, n) = varint::read_i64(buf, body_start)?;
327 Ok((Value::Integer(v), 1 + n))
328 }
329 tag::REAL => {
330 let end = body_start + 8;
331 if end > buf.len() {
332 return Err(SQLRiteError::Internal(
333 "Real value truncated: needs 8 bytes".to_string(),
334 ));
335 }
336 let arr: [u8; 8] = buf[body_start..end].try_into().unwrap();
337 Ok((Value::Real(f64::from_le_bytes(arr)), 1 + 8))
338 }
339 tag::TEXT => {
340 let (len, n) = varint::read_u64(buf, body_start)?;
341 let text_start = body_start + n;
342 let text_end = text_start + (len as usize);
343 if text_end > buf.len() {
344 return Err(SQLRiteError::Internal("Text value truncated".to_string()));
345 }
346 let s = std::str::from_utf8(&buf[text_start..text_end])
347 .map_err(|e| SQLRiteError::Internal(format!("Text value is not valid UTF-8: {e}")))?
348 .to_string();
349 Ok((Value::Text(s), 1 + n + (len as usize)))
350 }
351 tag::BOOL => {
352 let byte = *buf
353 .get(body_start)
354 .ok_or_else(|| SQLRiteError::Internal("Bool value truncated".to_string()))?;
355 Ok((Value::Bool(byte != 0), 1 + 1))
356 }
357 tag::VECTOR => {
358 let (dim, n) = varint::read_u64(buf, body_start)?;
361 let dim = dim as usize;
362 let elements_start = body_start + n;
363 let elements_end = elements_start + dim * 4;
364 if elements_end > buf.len() {
365 return Err(SQLRiteError::Internal(format!(
366 "Vector value truncated: needs {dim} × 4 = {} bytes",
367 dim * 4
368 )));
369 }
370 let mut out = Vec::with_capacity(dim);
371 for i in 0..dim {
372 let off = elements_start + i * 4;
373 let arr: [u8; 4] = buf[off..off + 4].try_into().unwrap();
374 out.push(f32::from_le_bytes(arr));
375 }
376 Ok((Value::Vector(out), 1 + n + dim * 4))
377 }
378 other => Err(SQLRiteError::Internal(format!(
379 "unknown value tag {other:#x} at offset {pos}"
380 ))),
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `cell`, decodes the bytes from offset 0, and checks that both
    /// the cell and the consumed byte count survive the trip.
    fn round_trip(cell: &Cell) {
        let encoded = cell.encode().unwrap();
        let (decoded, used) = Cell::decode(&encoded, 0).unwrap();
        assert_eq!(&decoded, cell);
        assert_eq!(used, encoded.len());
    }

    #[test]
    fn empty_cell_no_columns() {
        round_trip(&Cell::new(1, Vec::new()));
    }

    #[test]
    fn integer_only_cell() {
        let values = vec![Some(Value::Integer(1)), Some(Value::Integer(-1000))];
        round_trip(&Cell::new(42, values));
    }

    #[test]
    fn mixed_types_cell() {
        let values = vec![
            Some(Value::Integer(7)),
            Some(Value::Text("hello".to_string())),
            Some(Value::Real(2.5)),
            Some(Value::Bool(true)),
        ];
        round_trip(&Cell::new(100, values));
    }

    #[test]
    fn nulls_interspersed() {
        let values = vec![
            Some(Value::Integer(1)),
            None,
            Some(Value::Text("middle".to_string())),
            None,
            None,
            Some(Value::Bool(false)),
        ];
        round_trip(&Cell::new(5, values));
    }

    #[test]
    fn all_null_cell() {
        // Nine columns: one more than fits in a single bitmap byte.
        round_trip(&Cell::new(9, vec![None; 9]));
    }

    #[test]
    fn large_text_cell() {
        let text = "abc".repeat(10_000);
        round_trip(&Cell::new(1, vec![Some(Value::Text(text))]));
    }

    #[test]
    fn utf8_text_cell() {
        let text = "héllo 🦀 世界".to_string();
        round_trip(&Cell::new(1, vec![Some(Value::Text(text))]));
    }

    #[test]
    fn negative_and_large_rowids() {
        for rowid in [i64::MIN, i64::MAX, -1] {
            round_trip(&Cell::new(rowid, vec![Some(Value::Integer(1))]));
        }
    }

    #[test]
    fn bool_edges() {
        let values = vec![Some(Value::Bool(true)), Some(Value::Bool(false))];
        round_trip(&Cell::new(1, values));
    }

    #[test]
    fn real_edges() {
        let samples = [
            0.0f64,
            1.0,
            -1.0,
            f64::MIN,
            f64::MAX,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        for sample in samples {
            round_trip(&Cell::new(1, vec![Some(Value::Real(sample))]));
        }
    }

    #[test]
    fn vector_round_trip_small() {
        let components = vec![0.1f32, 0.2, 0.3];
        round_trip(&Cell::new(1, vec![Some(Value::Vector(components))]));
    }

    #[test]
    fn vector_round_trip_high_dim() {
        let components: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
        round_trip(&Cell::new(7, vec![Some(Value::Vector(components))]));
    }

    #[test]
    fn vector_round_trip_edge_values() {
        let v = vec![
            0.0f32,
            -0.0,
            1.0,
            -1.0,
            f32::MIN,
            f32::MAX,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let cell = Cell::new(2, vec![Some(Value::Vector(v.clone()))]);
        let bytes = cell.encode().expect("encode");
        let (decoded, _) = Cell::decode(&bytes, 0).expect("decode");
        let out = match &decoded.values[0] {
            Some(Value::Vector(out)) => out,
            other => panic!("decoded into wrong variant: {other:?}"),
        };
        assert_eq!(out.len(), v.len());
        // Compare bit patterns so that 0.0 and -0.0 are told apart.
        for (i, (a, b)) in out.iter().zip(v.iter()).enumerate() {
            assert_eq!(
                a.to_bits(),
                b.to_bits(),
                "element {i} bits mismatch: out {a:?}, expected {b:?}"
            );
        }
    }

    #[test]
    fn vector_round_trip_mixed_with_other_columns() {
        let values = vec![
            Some(Value::Integer(7)),
            Some(Value::Text("alpha".to_string())),
            Some(Value::Vector(vec![1.0, 2.0, 3.0, 4.0])),
            Some(Value::Bool(true)),
        ];
        round_trip(&Cell::new(42, values));
    }

    #[test]
    fn vector_decode_truncated_buffer_errors() {
        let cell = Cell::new(1, vec![Some(Value::Vector(vec![1.0, 2.0, 3.0]))]);
        let bytes = cell.encode().expect("encode");
        for chop in 1..=4 {
            let kept = bytes.len() - chop;
            assert!(
                Cell::decode(&bytes[..kept], 0).is_err(),
                "expected error decoding {} bytes short of full {}",
                chop,
                bytes.len()
            );
        }
    }

    #[test]
    fn encoding_null_directly_is_rejected() {
        let cell = Cell::new(1, vec![Some(Value::Null)]);
        let message = cell.encode().unwrap_err().to_string();
        assert!(message.contains("Null values are encoded"));
    }

    #[test]
    fn decode_rejects_truncated_buffer() {
        let cell = Cell::new(1, vec![Some(Value::Text("some text here".to_string()))]);
        let bytes = cell.encode().unwrap();
        let kept = bytes.len() - 5;
        assert!(Cell::decode(&bytes[..kept], 0).is_err());
    }

    #[test]
    fn decode_rejects_unknown_value_tag() {
        let buf = vec![
            5,          // body length
            KIND_LOCAL, // kind tag
            0,          // rowid
            1,          // column count
            0,          // null bitmap: the single column is not NULL
            0xFE,       // value tag that no variant uses
        ];
        let message = Cell::decode(&buf, 0).unwrap_err().to_string();
        assert!(message.contains("unknown value tag"));
    }

    #[test]
    fn decode_rejects_wrong_kind_tag() {
        // Body length 1, holding nothing but a non-local kind tag.
        let buf = vec![1, KIND_OVERFLOW];
        let message = Cell::decode(&buf, 0).unwrap_err().to_string();
        assert!(message.contains("non-local"));
    }

    #[test]
    fn concatenated_cells_read_sequentially() {
        let cells = [
            Cell::new(1, vec![Some(Value::Integer(100))]),
            Cell::new(2, vec![Some(Value::Text("two".to_string()))]),
            Cell::new(3, vec![None]),
        ];

        let mut buf = Vec::new();
        for cell in &cells {
            buf.extend_from_slice(&cell.encode().unwrap());
        }

        // Each decode reports how far to advance to reach the next cell.
        let mut offset = 0;
        for expected in &cells {
            let (decoded, used) = Cell::decode(&buf, offset).unwrap();
            assert_eq!(&decoded, expected);
            offset += used;
        }
        assert_eq!(offset, buf.len());
    }

    #[test]
    fn null_bitmap_byte_boundary() {
        // `count` columns where every `stride`-th one holds its own index and
        // the rest are NULL.
        let sparse = |count: i64, stride: i64| -> Vec<Option<Value>> {
            (0..count)
                .map(|i| (i % stride == 0).then_some(Value::Integer(i)))
                .collect()
        };

        // Exactly one full bitmap byte.
        round_trip(&Cell::new(1, sparse(8, 2)));
        // One column past the byte boundary.
        round_trip(&Cell::new(1, sparse(9, 3)));
    }
}