1use crate::error::{Result, SQLRiteError};
46use crate::sql::db::table::Value;
47use crate::sql::pager::varint;
48
49pub const KIND_LOCAL: u8 = 0x01;
57pub const KIND_OVERFLOW: u8 = 0x02;
58pub const KIND_INTERIOR: u8 = 0x03;
59pub const KIND_INDEX: u8 = 0x04;
60
61pub mod tag {
63 pub const INTEGER: u8 = 0;
64 pub const REAL: u8 = 1;
65 pub const TEXT: u8 = 2;
66 pub const BOOL: u8 = 3;
67}
68
69#[derive(Debug, Clone, PartialEq)]
74pub struct Cell {
75 pub rowid: i64,
76 pub values: Vec<Option<Value>>,
77}
78
79impl Cell {
80 pub fn new(rowid: i64, values: Vec<Option<Value>>) -> Self {
81 Self { rowid, values }
82 }
83
84 pub fn encode(&self) -> Result<Vec<u8>> {
89 let mut body = Vec::new();
92 body.push(KIND_LOCAL);
93 varint::write_i64(&mut body, self.rowid);
94 varint::write_u64(&mut body, self.values.len() as u64);
95 encode_null_bitmap(&mut body, &self.values);
96 for v in self.values.iter().flatten() {
97 encode_value(&mut body, v)?;
98 }
99
100 let mut out = Vec::with_capacity(body.len() + varint::MAX_VARINT_BYTES);
101 varint::write_u64(&mut out, body.len() as u64);
102 out.extend_from_slice(&body);
103 Ok(out)
104 }
105
106 pub fn encoded_len(&self) -> Result<usize> {
109 Ok(self.encode()?.len())
113 }
114
115 pub fn peek_rowid(buf: &[u8], pos: usize) -> Result<i64> {
120 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
121 let body_start = pos + len_bytes;
122 if body_start >= buf.len() {
124 return Err(SQLRiteError::Internal(
125 "paged cell truncated before kind tag".to_string(),
126 ));
127 }
128 let (rowid, _) = varint::read_i64(buf, body_start + 1)?;
129 Ok(rowid)
130 }
131
132 pub fn encoded_size_at(buf: &[u8], pos: usize) -> Result<usize> {
136 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
137 Ok(len_bytes + body_len as usize)
138 }
139
140 pub fn peek_kind(buf: &[u8], pos: usize) -> Result<u8> {
143 let (_body_len, len_bytes) = varint::read_u64(buf, pos)?;
144 let kind_pos = pos + len_bytes;
145 buf.get(kind_pos).copied().ok_or_else(|| {
146 SQLRiteError::Internal("paged cell truncated before kind tag".to_string())
147 })
148 }
149
150 pub fn decode(buf: &[u8], pos: usize) -> Result<(Cell, usize)> {
155 let (body_len, len_bytes) = varint::read_u64(buf, pos)?;
156 let body_start = pos + len_bytes;
157 let body_end = body_start
158 .checked_add(body_len as usize)
159 .ok_or_else(|| SQLRiteError::Internal("cell length overflow".to_string()))?;
160 if body_end > buf.len() {
161 return Err(SQLRiteError::Internal(format!(
162 "cell extends past buffer: needs bytes {body_start}..{body_end}, have {}",
163 buf.len()
164 )));
165 }
166
167 let body = &buf[body_start..body_end];
168 if body.is_empty() {
169 return Err(SQLRiteError::Internal(
170 "paged cell body is empty (no kind tag)".to_string(),
171 ));
172 }
173 let kind_tag = body[0];
174 if kind_tag != KIND_LOCAL {
175 return Err(SQLRiteError::Internal(format!(
176 "Cell::decode called on non-local entry (kind_tag = {kind_tag:#x})"
177 )));
178 }
179 let mut cur = 1usize;
180
181 let (rowid, n) = varint::read_i64(body, cur)?;
182 cur += n;
183 let (col_count_u, n) = varint::read_u64(body, cur)?;
184 cur += n;
185 let col_count = col_count_u as usize;
186
187 let bitmap_bytes = col_count.div_ceil(8);
188 if cur + bitmap_bytes > body.len() {
189 return Err(SQLRiteError::Internal(
190 "cell body truncated before null bitmap ends".to_string(),
191 ));
192 }
193 let bitmap = &body[cur..cur + bitmap_bytes];
194 cur += bitmap_bytes;
195
196 let mut values = Vec::with_capacity(col_count);
197 for col in 0..col_count {
198 if is_null(bitmap, col) {
199 values.push(None);
200 } else {
201 let (v, n) = decode_value(body, cur)?;
202 cur += n;
203 values.push(Some(v));
204 }
205 }
206
207 if cur != body.len() {
208 return Err(SQLRiteError::Internal(format!(
209 "cell body had {} trailing bytes after last value",
210 body.len() - cur
211 )));
212 }
213
214 Ok((Cell { rowid, values }, body_end - pos))
215 }
216}
217
218fn encode_null_bitmap(out: &mut Vec<u8>, values: &[Option<Value>]) {
219 let n = values.len().div_ceil(8);
220 let start = out.len();
221 out.resize(start + n, 0);
222 for (i, v) in values.iter().enumerate() {
223 if v.is_none() {
224 let byte_idx = start + (i / 8);
225 let bit = i % 8;
226 out[byte_idx] |= 1 << bit;
227 }
228 }
229}
230
231fn is_null(bitmap: &[u8], col: usize) -> bool {
232 let byte = col / 8;
233 let bit = col % 8;
234 bitmap.get(byte).is_some_and(|b| (b >> bit) & 1 == 1)
235}
236
237pub(super) fn encode_value(out: &mut Vec<u8>, value: &Value) -> Result<()> {
238 match value {
239 Value::Integer(i) => {
240 out.push(tag::INTEGER);
241 varint::write_i64(out, *i);
242 }
243 Value::Real(f) => {
244 out.push(tag::REAL);
245 out.extend_from_slice(&f.to_le_bytes());
246 }
247 Value::Text(s) => {
248 out.push(tag::TEXT);
249 let bytes = s.as_bytes();
250 varint::write_u64(out, bytes.len() as u64);
251 out.extend_from_slice(bytes);
252 }
253 Value::Bool(b) => {
254 out.push(tag::BOOL);
255 out.push(if *b { 1 } else { 0 });
256 }
257 Value::Null => {
258 return Err(SQLRiteError::Internal(
259 "Null values are encoded via the null bitmap, not a value block".to_string(),
260 ));
261 }
262 }
263 Ok(())
264}
265
266pub(super) fn decode_value(buf: &[u8], pos: usize) -> Result<(Value, usize)> {
267 let tag = *buf
268 .get(pos)
269 .ok_or_else(|| SQLRiteError::Internal(format!("value block truncated at offset {pos}")))?;
270 let body_start = pos + 1;
271 match tag {
272 tag::INTEGER => {
273 let (v, n) = varint::read_i64(buf, body_start)?;
274 Ok((Value::Integer(v), 1 + n))
275 }
276 tag::REAL => {
277 let end = body_start + 8;
278 if end > buf.len() {
279 return Err(SQLRiteError::Internal(
280 "Real value truncated: needs 8 bytes".to_string(),
281 ));
282 }
283 let arr: [u8; 8] = buf[body_start..end].try_into().unwrap();
284 Ok((Value::Real(f64::from_le_bytes(arr)), 1 + 8))
285 }
286 tag::TEXT => {
287 let (len, n) = varint::read_u64(buf, body_start)?;
288 let text_start = body_start + n;
289 let text_end = text_start + (len as usize);
290 if text_end > buf.len() {
291 return Err(SQLRiteError::Internal("Text value truncated".to_string()));
292 }
293 let s = std::str::from_utf8(&buf[text_start..text_end])
294 .map_err(|e| SQLRiteError::Internal(format!("Text value is not valid UTF-8: {e}")))?
295 .to_string();
296 Ok((Value::Text(s), 1 + n + (len as usize)))
297 }
298 tag::BOOL => {
299 let byte = *buf
300 .get(body_start)
301 .ok_or_else(|| SQLRiteError::Internal("Bool value truncated".to_string()))?;
302 Ok((Value::Bool(byte != 0), 1 + 1))
303 }
304 other => Err(SQLRiteError::Internal(format!(
305 "unknown value tag {other:#x} at offset {pos}"
306 ))),
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313
314 fn round_trip(cell: &Cell) {
315 let bytes = cell.encode().unwrap();
316 let (back, consumed) = Cell::decode(&bytes, 0).unwrap();
317 assert_eq!(&back, cell);
318 assert_eq!(consumed, bytes.len());
319 }
320
321 #[test]
322 fn empty_cell_no_columns() {
323 round_trip(&Cell::new(1, vec![]));
324 }
325
326 #[test]
327 fn integer_only_cell() {
328 round_trip(&Cell::new(
329 42,
330 vec![Some(Value::Integer(1)), Some(Value::Integer(-1000))],
331 ));
332 }
333
334 #[test]
335 fn mixed_types_cell() {
336 round_trip(&Cell::new(
337 100,
338 vec![
339 Some(Value::Integer(7)),
340 Some(Value::Text("hello".to_string())),
341 Some(Value::Real(2.5)),
345 Some(Value::Bool(true)),
346 ],
347 ));
348 }
349
350 #[test]
351 fn nulls_interspersed() {
352 round_trip(&Cell::new(
353 5,
354 vec![
355 Some(Value::Integer(1)),
356 None,
357 Some(Value::Text("middle".to_string())),
358 None,
359 None,
360 Some(Value::Bool(false)),
361 ],
362 ));
363 }
364
365 #[test]
366 fn all_null_cell() {
367 round_trip(&Cell::new(
368 9,
369 vec![None, None, None, None, None, None, None, None, None],
370 ));
371 }
372
373 #[test]
374 fn large_text_cell() {
375 let big = "abc".repeat(10_000);
376 round_trip(&Cell::new(1, vec![Some(Value::Text(big))]));
377 }
378
379 #[test]
380 fn utf8_text_cell() {
381 round_trip(&Cell::new(
382 1,
383 vec![Some(Value::Text("héllo 🦀 世界".to_string()))],
384 ));
385 }
386
387 #[test]
388 fn negative_and_large_rowids() {
389 round_trip(&Cell::new(i64::MIN, vec![Some(Value::Integer(1))]));
390 round_trip(&Cell::new(i64::MAX, vec![Some(Value::Integer(1))]));
391 round_trip(&Cell::new(-1, vec![Some(Value::Integer(1))]));
392 }
393
394 #[test]
395 fn bool_edges() {
396 round_trip(&Cell::new(
397 1,
398 vec![Some(Value::Bool(true)), Some(Value::Bool(false))],
399 ));
400 }
401
402 #[test]
403 fn real_edges() {
404 for v in [
406 0.0f64,
407 1.0,
408 -1.0,
409 f64::MIN,
410 f64::MAX,
411 f64::INFINITY,
412 f64::NEG_INFINITY,
413 ] {
414 round_trip(&Cell::new(1, vec![Some(Value::Real(v))]));
415 }
416 }
417
418 #[test]
419 fn encoding_null_directly_is_rejected() {
420 let bad = Cell::new(1, vec![Some(Value::Null)]);
421 let err = bad.encode().unwrap_err();
422 assert!(format!("{err}").contains("Null values are encoded"));
423 }
424
425 #[test]
426 fn decode_rejects_truncated_buffer() {
427 let cell = Cell::new(1, vec![Some(Value::Text("some text here".to_string()))]);
428 let bytes = cell.encode().unwrap();
429 let truncated = &bytes[..bytes.len() - 5];
430 assert!(Cell::decode(truncated, 0).is_err());
431 }
432
433 #[test]
434 fn decode_rejects_unknown_value_tag() {
435 let mut buf = Vec::new();
444 buf.push(5); buf.push(KIND_LOCAL); buf.push(0); buf.push(1); buf.push(0); buf.push(0xFE); let err = Cell::decode(&buf, 0).unwrap_err();
451 assert!(format!("{err}").contains("unknown value tag"));
452 }
453
454 #[test]
455 fn decode_rejects_wrong_kind_tag() {
456 let mut buf = Vec::new();
459 buf.push(1); buf.push(KIND_OVERFLOW);
461 let err = Cell::decode(&buf, 0).unwrap_err();
462 assert!(format!("{err}").contains("non-local"));
463 }
464
465 #[test]
466 fn concatenated_cells_read_sequentially() {
467 let c1 = Cell::new(1, vec![Some(Value::Integer(100))]);
468 let c2 = Cell::new(2, vec![Some(Value::Text("two".to_string()))]);
469 let c3 = Cell::new(3, vec![None]);
470
471 let mut buf = Vec::new();
472 buf.extend_from_slice(&c1.encode().unwrap());
473 buf.extend_from_slice(&c2.encode().unwrap());
474 buf.extend_from_slice(&c3.encode().unwrap());
475
476 let (d1, n1) = Cell::decode(&buf, 0).unwrap();
477 let (d2, n2) = Cell::decode(&buf, n1).unwrap();
478 let (d3, n3) = Cell::decode(&buf, n1 + n2).unwrap();
479 assert_eq!(d1, c1);
480 assert_eq!(d2, c2);
481 assert_eq!(d3, c3);
482 assert_eq!(n1 + n2 + n3, buf.len());
483 }
484
485 #[test]
486 fn null_bitmap_byte_boundary() {
487 let values: Vec<Option<Value>> = (0..8)
489 .map(|i| {
490 if i % 2 == 0 {
491 Some(Value::Integer(i))
492 } else {
493 None
494 }
495 })
496 .collect();
497 round_trip(&Cell::new(1, values));
498
499 let values: Vec<Option<Value>> = (0..9)
501 .map(|i| {
502 if i % 3 == 0 {
503 Some(Value::Integer(i))
504 } else {
505 None
506 }
507 })
508 .collect();
509 round_trip(&Cell::new(1, values));
510 }
511}