1use crate::error::{Result, SQLRiteError};
46use crate::sql::db::table::Value;
47use crate::sql::pager::varint;
48
// Kind tags: the first byte of an entry's body (right after the length
// prefix) identifies what sort of entry it is.

/// Kind tag for a locally stored row cell. `Cell::encode` writes this byte
/// and `Cell::decode` rejects any entry whose kind tag differs.
pub const KIND_LOCAL: u8 = 0x01;
/// Kind tag for an overflow entry.
/// NOTE(review): not produced or parsed in this file — confirm the layout
/// against the code that handles overflow entries.
pub const KIND_OVERFLOW: u8 = 0x02;
/// Kind tag for an interior entry.
/// NOTE(review): presumably B-tree interior nodes; not used in this file — confirm.
pub const KIND_INTERIOR: u8 = 0x03;
/// Kind tag for an index entry.
/// NOTE(review): not used in this file — confirm against the index code.
pub const KIND_INDEX: u8 = 0x04;
/// Kind tag for an HNSW entry.
/// NOTE(review): presumably vector-index nodes; not used in this file — confirm.
pub const KIND_HNSW: u8 = 0x05;
75
/// One-byte type tags that prefix every non-NULL value block.
///
/// `encode_value` writes the tag followed by the payload; `decode_value`
/// dispatches on it and rejects any byte not listed here.
pub mod tag {
    /// Payload: signed varint.
    pub const INTEGER: u8 = 0;
    /// Payload: 8 little-endian bytes of an `f64`.
    pub const REAL: u8 = 1;
    /// Payload: varint byte length followed by that many UTF-8 bytes.
    pub const TEXT: u8 = 2;
    /// Payload: a single byte; zero decodes to `false`, anything else to `true`.
    pub const BOOL: u8 = 3;
    /// Payload: varint element count followed by that many little-endian `f32`s.
    pub const VECTOR: u8 = 4;
}
88
/// A single row in its in-memory form: a rowid plus one optional value per
/// column. Serialized entries of this type carry the `KIND_LOCAL` kind tag.
#[derive(Debug, Clone, PartialEq)]
pub struct Cell {
    /// Row identifier; serialized as a signed varint right after the kind tag.
    pub rowid: i64,
    /// Column values in column order. `None` is a NULL column, recorded in the
    /// null bitmap instead of as a value block.
    pub values: Vec<Option<Value>>,
}
98
99impl Cell {
100 pub fn new(rowid: i64, values: Vec<Option<Value>>) -> Self {
101 Self { rowid, values }
102 }
103
104 pub fn encode(&self) -> Result<Vec<u8>> {
109 let mut body = Vec::new();
112 body.push(KIND_LOCAL);
113 varint::write_i64(&mut body, self.rowid);
114 varint::write_u64(&mut body, self.values.len() as u64);
115 encode_null_bitmap(&mut body, &self.values);
116 for v in self.values.iter().flatten() {
117 encode_value(&mut body, v)?;
118 }
119
120 let mut out = Vec::with_capacity(body.len() + varint::MAX_VARINT_BYTES);
121 varint::write_u64(&mut out, body.len() as u64);
122 out.extend_from_slice(&body);
123 Ok(out)
124 }
125
126 pub fn encoded_len(&self) -> Result<usize> {
129 Ok(self.encode()?.len())
133 }
134
135 pub fn peek_rowid(buf: &[u8], pos: usize) -> Result<i64> {
140 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
141 let body_start = pos + len_bytes;
142 if body_start >= buf.len() {
144 return Err(SQLRiteError::Internal(
145 "paged cell truncated before kind tag".to_string(),
146 ));
147 }
148 let (rowid, _) = varint::read_i64(buf, body_start + 1)?;
149 Ok(rowid)
150 }
151
152 pub fn encoded_size_at(buf: &[u8], pos: usize) -> Result<usize> {
156 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
157 Ok(len_bytes + body_len as usize)
158 }
159
160 pub fn peek_kind(buf: &[u8], pos: usize) -> Result<u8> {
163 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
164 let kind_pos = pos + len_bytes;
165 buf.get(kind_pos).copied().ok_or_else(|| {
166 SQLRiteError::Internal("paged cell truncated before kind tag".to_string())
167 })
168 }
169
170 pub fn decode(buf: &[u8], pos: usize) -> Result<(Cell, usize)> {
175 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
176 let body_start = pos + len_bytes;
177 let body_end = body_start
178 .checked_add(body_len as usize)
179 .ok_or_else(|| SQLRiteError::Internal("cell length overflow".to_string()))?;
180 if body_end > buf.len() {
181 return Err(SQLRiteError::Internal(format!(
182 "cell extends past buffer: needs bytes {body_start}..{body_end}, have {}",
183 buf.len()
184 )));
185 }
186
187 let body = &buf[body_start..body_end];
188 if body.is_empty() {
189 return Err(SQLRiteError::Internal(
190 "paged cell body is empty (no kind tag)".to_string(),
191 ));
192 }
193 let kind_tag = body[0];
194 if kind_tag != KIND_LOCAL {
195 return Err(SQLRiteError::Internal(format!(
196 "Cell::decode called on non-local entry (kind_tag = {kind_tag:#x})"
197 )));
198 }
199 let mut cur = 1usize;
200
201 let (rowid, n) = varint::read_i64(body, cur)?;
202 cur += n;
203 let (col_count_u, n) = varint::read_u64(body, cur)?;
204 cur += n;
205 let col_count = col_count_u as usize;
206
207 let bitmap_bytes = col_count.div_ceil(8);
208 if cur + bitmap_bytes > body.len() {
209 return Err(SQLRiteError::Internal(
210 "cell body truncated before null bitmap ends".to_string(),
211 ));
212 }
213 let bitmap = &body[cur..cur + bitmap_bytes];
214 cur += bitmap_bytes;
215
216 let mut values = Vec::with_capacity(col_count);
217 for col in 0..col_count {
218 if is_null(bitmap, col) {
219 values.push(None);
220 } else {
221 let (v, n) = decode_value(body, cur)?;
222 cur += n;
223 values.push(Some(v));
224 }
225 }
226
227 if cur != body.len() {
228 return Err(SQLRiteError::Internal(format!(
229 "cell body had {} trailing bytes after last value",
230 body.len() - cur
231 )));
232 }
233
234 Ok((Cell { rowid, values }, body_end - pos))
235 }
236}
237
238fn encode_null_bitmap(out: &mut Vec<u8>, values: &[Option<Value>]) {
239 let n = values.len().div_ceil(8);
240 let start = out.len();
241 out.resize(start + n, 0);
242 for (i, v) in values.iter().enumerate() {
243 if v.is_none() {
244 let byte_idx = start + (i / 8);
245 let bit = i % 8;
246 out[byte_idx] |= 1 << bit;
247 }
248 }
249}
250
/// Reports whether column `col` is flagged NULL in `bitmap`.
///
/// Column `col` lives in byte `col / 8` at bit `col % 8`. A column whose byte
/// lies beyond the bitmap is reported as not NULL.
fn is_null(bitmap: &[u8], col: usize) -> bool {
    match bitmap.get(col / 8) {
        Some(&byte) => byte & (1u8 << (col % 8)) != 0,
        None => false,
    }
}
256
257pub(super) fn encode_value(out: &mut Vec<u8>, value: &Value) -> Result<()> {
258 match value {
259 Value::Integer(i) => {
260 out.push(tag::INTEGER);
261 varint::write_i64(out, *i);
262 }
263 Value::Real(f) => {
264 out.push(tag::REAL);
265 out.extend_from_slice(&f.to_le_bytes());
266 }
267 Value::Text(s) => {
268 out.push(tag::TEXT);
269 let bytes = s.as_bytes();
270 varint::write_u64(out, bytes.len() as u64);
271 out.extend_from_slice(bytes);
272 }
273 Value::Bool(b) => {
274 out.push(tag::BOOL);
275 out.push(if *b { 1 } else { 0 });
276 }
277 Value::Vector(v) => {
278 out.push(tag::VECTOR);
279 varint::write_u64(out, v.len() as u64);
281 for x in v {
283 out.extend_from_slice(&x.to_le_bytes());
284 }
285 }
286 Value::Null => {
287 return Err(SQLRiteError::Internal(
288 "Null values are encoded via the null bitmap, not a value block".to_string(),
289 ));
290 }
291 }
292 Ok(())
293}
294
295pub(super) fn decode_value(buf: &[u8], pos: usize) -> Result<(Value, usize)> {
296 let tag = *buf
297 .get(pos)
298 .ok_or_else(|| SQLRiteError::Internal(format!("value block truncated at offset {pos}")))?;
299 let body_start = pos + 1;
300 match tag {
301 tag::INTEGER => {
302 let (v, n) = varint::read_i64(buf, body_start)?;
303 Ok((Value::Integer(v), 1 + n))
304 }
305 tag::REAL => {
306 let end = body_start + 8;
307 if end > buf.len() {
308 return Err(SQLRiteError::Internal(
309 "Real value truncated: needs 8 bytes".to_string(),
310 ));
311 }
312 let arr: [u8; 8] = buf[body_start..end].try_into().unwrap();
313 Ok((Value::Real(f64::from_le_bytes(arr)), 1 + 8))
314 }
315 tag::TEXT => {
316 let (len, n) = varint::read_u64(buf, body_start)?;
317 let text_start = body_start + n;
318 let text_end = text_start + (len as usize);
319 if text_end > buf.len() {
320 return Err(SQLRiteError::Internal("Text value truncated".to_string()));
321 }
322 let s = std::str::from_utf8(&buf[text_start..text_end])
323 .map_err(|e| SQLRiteError::Internal(format!("Text value is not valid UTF-8: {e}")))?
324 .to_string();
325 Ok((Value::Text(s), 1 + n + (len as usize)))
326 }
327 tag::BOOL => {
328 let byte = *buf
329 .get(body_start)
330 .ok_or_else(|| SQLRiteError::Internal("Bool value truncated".to_string()))?;
331 Ok((Value::Bool(byte != 0), 1 + 1))
332 }
333 tag::VECTOR => {
334 let (dim, n) = varint::read_u64(buf, body_start)?;
337 let dim = dim as usize;
338 let elements_start = body_start + n;
339 let elements_end = elements_start + dim * 4;
340 if elements_end > buf.len() {
341 return Err(SQLRiteError::Internal(format!(
342 "Vector value truncated: needs {dim} × 4 = {} bytes",
343 dim * 4
344 )));
345 }
346 let mut out = Vec::with_capacity(dim);
347 for i in 0..dim {
348 let off = elements_start + i * 4;
349 let arr: [u8; 4] = buf[off..off + 4].try_into().unwrap();
350 out.push(f32::from_le_bytes(arr));
351 }
352 Ok((Value::Vector(out), 1 + n + dim * 4))
353 }
354 other => Err(SQLRiteError::Internal(format!(
355 "unknown value tag {other:#x} at offset {pos}"
356 ))),
357 }
358}
359
/// Unit tests for the cell codec: round-trips for every value type, plus
/// rejection of truncated or malformed input.
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `cell`, decodes it from offset 0, and asserts that the decoded
    /// cell equals the input and that decoding consumed every encoded byte.
    fn round_trip(cell: &Cell) {
        let bytes = cell.encode().unwrap();
        let (back, consumed) = Cell::decode(&bytes, 0).unwrap();
        assert_eq!(&back, cell);
        assert_eq!(consumed, bytes.len());
    }

    // Zero columns: the bitmap is empty and no value blocks follow.
    #[test]
    fn empty_cell_no_columns() {
        round_trip(&Cell::new(1, vec![]));
    }

    // Positive and negative integers go through the signed varint path.
    #[test]
    fn integer_only_cell() {
        round_trip(&Cell::new(
            42,
            vec![Some(Value::Integer(1)), Some(Value::Integer(-1000))],
        ));
    }

    // One column each of integer, text, real and bool in a single cell.
    #[test]
    fn mixed_types_cell() {
        round_trip(&Cell::new(
            100,
            vec![
                Some(Value::Integer(7)),
                Some(Value::Text("hello".to_string())),
                Some(Value::Real(2.5)),
                Some(Value::Bool(true)),
            ],
        ));
    }

    // NULL columns between values: value blocks must line up with the
    // non-NULL positions recorded in the bitmap.
    #[test]
    fn nulls_interspersed() {
        round_trip(&Cell::new(
            5,
            vec![
                Some(Value::Integer(1)),
                None,
                Some(Value::Text("middle".to_string())),
                None,
                None,
                Some(Value::Bool(false)),
            ],
        ));
    }

    // Nine NULL columns: a two-byte bitmap and no value blocks at all.
    #[test]
    fn all_null_cell() {
        round_trip(&Cell::new(
            9,
            vec![None, None, None, None, None, None, None, None, None],
        ));
    }

    // 30,000 bytes of text, so the text length no longer fits in one byte.
    #[test]
    fn large_text_cell() {
        let big = "abc".repeat(10_000);
        round_trip(&Cell::new(1, vec![Some(Value::Text(big))]));
    }

    // Multi-byte UTF-8: the stored length is a byte count, not a char count.
    #[test]
    fn utf8_text_cell() {
        round_trip(&Cell::new(
            1,
            vec![Some(Value::Text("héllo 🦀 世界".to_string()))],
        ));
    }

    // Extremes of the rowid range plus a small negative value.
    #[test]
    fn negative_and_large_rowids() {
        round_trip(&Cell::new(i64::MIN, vec![Some(Value::Integer(1))]));
        round_trip(&Cell::new(i64::MAX, vec![Some(Value::Integer(1))]));
        round_trip(&Cell::new(-1, vec![Some(Value::Integer(1))]));
    }

    // Both bool values.
    #[test]
    fn bool_edges() {
        round_trip(&Cell::new(
            1,
            vec![Some(Value::Bool(true)), Some(Value::Bool(false))],
        ));
    }

    // Finite extremes and both infinities for f64.
    #[test]
    fn real_edges() {
        for v in [
            0.0f64,
            1.0,
            -1.0,
            f64::MIN,
            f64::MAX,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ] {
            round_trip(&Cell::new(1, vec![Some(Value::Real(v))]));
        }
    }

    // A small three-element vector.
    #[test]
    fn vector_round_trip_small() {
        let v = vec![0.1f32, 0.2, 0.3];
        round_trip(&Cell::new(1, vec![Some(Value::Vector(v))]));
    }

    // 384 elements: the body is large enough to need a multi-byte length prefix.
    #[test]
    fn vector_round_trip_high_dim() {
        let v: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
        round_trip(&Cell::new(7, vec![Some(Value::Vector(v))]));
    }

    // Edge f32 values, compared bit-for-bit: `0.0 == -0.0` under float
    // equality, so only a `to_bits` comparison proves the sign bit survived.
    #[test]
    fn vector_round_trip_edge_values() {
        let v = vec![
            0.0f32,
            -0.0,
            1.0,
            -1.0,
            f32::MIN,
            f32::MAX,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let cell = Cell::new(2, vec![Some(Value::Vector(v.clone()))]);
        let bytes = cell.encode().expect("encode");
        let (decoded, _) = Cell::decode(&bytes, 0).expect("decode");
        match &decoded.values[0] {
            Some(Value::Vector(out)) => {
                assert_eq!(out.len(), v.len());
                for (i, (a, b)) in out.iter().zip(v.iter()).enumerate() {
                    assert_eq!(
                        a.to_bits(),
                        b.to_bits(),
                        "element {i} bits mismatch: out {a:?}, expected {b:?}"
                    );
                }
            }
            other => panic!("decoded into wrong variant: {other:?}"),
        }
    }

    // A vector column sitting between other column types.
    #[test]
    fn vector_round_trip_mixed_with_other_columns() {
        let cell = Cell::new(
            42,
            vec![
                Some(Value::Integer(7)),
                Some(Value::Text("alpha".to_string())),
                Some(Value::Vector(vec![1.0, 2.0, 3.0, 4.0])),
                Some(Value::Bool(true)),
            ],
        );
        round_trip(&cell);
    }

    // Dropping 1 to 4 trailing bytes cuts into the last f32 while the length
    // prefix still declares the full body, so decoding must fail.
    #[test]
    fn vector_decode_truncated_buffer_errors() {
        let cell = Cell::new(1, vec![Some(Value::Vector(vec![1.0, 2.0, 3.0]))]);
        let bytes = cell.encode().expect("encode");
        for chop in 1..=4 {
            let truncated = &bytes[..bytes.len() - chop];
            assert!(
                Cell::decode(truncated, 0).is_err(),
                "expected error decoding {} bytes short of full {}",
                chop,
                bytes.len()
            );
        }
    }

    // `Some(Value::Null)` is not a valid way to express NULL; `None` is.
    #[test]
    fn encoding_null_directly_is_rejected() {
        let bad = Cell::new(1, vec![Some(Value::Null)]);
        let err = bad.encode().unwrap_err();
        assert!(format!("{err}").contains("Null values are encoded"));
    }

    // Five bytes missing from the end of a text cell.
    #[test]
    fn decode_rejects_truncated_buffer() {
        let cell = Cell::new(1, vec![Some(Value::Text("some text here".to_string()))]);
        let bytes = cell.encode().unwrap();
        let truncated = &bytes[..bytes.len() - 5];
        assert!(Cell::decode(truncated, 0).is_err());
    }

    // Hand-built cell whose single value block carries an undefined type tag.
    #[test]
    fn decode_rejects_unknown_value_tag() {
        let mut buf = Vec::new();
        // Length prefix: the body below is 5 bytes.
        buf.push(5);
        // Kind tag.
        buf.push(KIND_LOCAL);
        // Rowid varint (one byte).
        buf.push(0);
        // Column count: one column.
        buf.push(1);
        // Null bitmap: the column is not NULL, so a value block follows.
        buf.push(0);
        // Value tag that matches no constant in `tag`.
        buf.push(0xFE);
        let err = Cell::decode(&buf, 0).unwrap_err();
        assert!(format!("{err}").contains("unknown value tag"));
    }

    // A one-byte body holding only a non-local kind tag.
    #[test]
    fn decode_rejects_wrong_kind_tag() {
        let mut buf = Vec::new();
        // Length prefix of 1, then the kind tag as the whole body.
        buf.push(1);
        buf.push(KIND_OVERFLOW);
        let err = Cell::decode(&buf, 0).unwrap_err();
        assert!(format!("{err}").contains("non-local"));
    }

    // Back-to-back cells in one buffer: each decode's consumed count must be
    // the exact offset delta to the next cell.
    #[test]
    fn concatenated_cells_read_sequentially() {
        let c1 = Cell::new(1, vec![Some(Value::Integer(100))]);
        let c2 = Cell::new(2, vec![Some(Value::Text("two".to_string()))]);
        let c3 = Cell::new(3, vec![None]);

        let mut buf = Vec::new();
        buf.extend_from_slice(&c1.encode().unwrap());
        buf.extend_from_slice(&c2.encode().unwrap());
        buf.extend_from_slice(&c3.encode().unwrap());

        let (d1, n1) = Cell::decode(&buf, 0).unwrap();
        let (d2, n2) = Cell::decode(&buf, n1).unwrap();
        let (d3, n3) = Cell::decode(&buf, n1 + n2).unwrap();
        assert_eq!(d1, c1);
        assert_eq!(d2, c2);
        assert_eq!(d3, c3);
        assert_eq!(n1 + n2 + n3, buf.len());
    }

    // Column counts on either side of the one-byte bitmap limit: 8 columns
    // fit in a single bitmap byte, 9 columns need a second one.
    #[test]
    fn null_bitmap_byte_boundary() {
        let values: Vec<Option<Value>> = (0..8)
            .map(|i| {
                if i % 2 == 0 {
                    Some(Value::Integer(i))
                } else {
                    None
                }
            })
            .collect();
        round_trip(&Cell::new(1, values));

        let values: Vec<Option<Value>> = (0..9)
            .map(|i| {
                if i % 3 == 0 {
                    Some(Value::Integer(i))
                } else {
                    None
                }
            })
            .collect();
        round_trip(&Cell::new(1, values));
    }
}