1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17
18pub fn encode_key_value(value: &Value) -> Vec<u8> {
20 let mut buf = Vec::with_capacity(16);
21 encode_key_value_into(value, &mut buf);
22 buf
23}
24
25pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27 let mut buf = Vec::new();
28 for v in values {
29 buf.extend_from_slice(&encode_key_value(v));
30 }
31 buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35 buf.clear();
36 for v in values {
37 encode_key_value_into(v, buf);
38 }
39}
40
41fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
42 match value {
43 Value::Null => buf.push(TAG_NULL),
44 Value::Boolean(b) => {
45 buf.push(TAG_BOOLEAN);
46 buf.push(if *b { 0x01 } else { 0x00 });
47 }
48 Value::Integer(i) => encode_integer_into(*i, buf),
49 Value::Real(r) => encode_real_into(*r, buf),
50 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
51 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
52 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
53 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
54 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
55 Value::Interval {
56 months,
57 days,
58 micros,
59 } => {
60 buf.push(TAG_INTERVAL);
62 let mut mb = months.to_be_bytes();
63 mb[0] ^= 0x80;
64 buf.extend_from_slice(&mb);
65 let mut db = days.to_be_bytes();
66 db[0] ^= 0x80;
67 buf.extend_from_slice(&db);
68 let mut ub = micros.to_be_bytes();
69 ub[0] ^= 0x80;
70 buf.extend_from_slice(&ub);
71 }
72 }
73}
74
75fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
76 encode_signed_varint(TAG_INTEGER, val, buf);
77}
78
79pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
85 buf.push(tag);
86 if val == 0 {
87 buf.push(0x80);
88 return;
89 }
90 if val > 0 {
91 let bytes = val.to_be_bytes();
92 let start = bytes.iter().position(|&b| b != 0).unwrap();
93 let byte_count = (8 - start) as u8;
94 buf.push(0x80 + byte_count);
95 buf.extend_from_slice(&bytes[start..]);
96 } else {
97 let abs_val = if val == i64::MIN {
98 u64::MAX / 2 + 1
99 } else {
100 (-val) as u64
101 };
102 let bytes = abs_val.to_be_bytes();
103 let start = bytes.iter().position(|&b| b != 0).unwrap();
104 let byte_count = (8 - start) as u8;
105 buf.push(0x80 - byte_count);
106 for &b in &bytes[start..] {
107 buf.push(!b);
108 }
109 }
110}
111
112fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
113 buf.push(TAG_REAL);
114 let bits = val.to_bits();
115 let encoded = if val.is_sign_negative() {
116 !bits
117 } else {
118 bits ^ (1u64 << 63)
119 };
120 buf.extend_from_slice(&encoded.to_be_bytes());
121}
122
123fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
124 buf.push(tag);
125 for &b in data {
126 if b == 0x00 {
127 buf.push(0x00);
128 buf.push(0xFF);
129 } else {
130 buf.push(b);
131 }
132 }
133 buf.push(0x00);
134}
135
136pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
138 if data.is_empty() {
139 return Err(SqlError::InvalidValue("empty key data".into()));
140 }
141 match data[0] {
142 TAG_NULL => Ok((Value::Null, 1)),
143 TAG_BOOLEAN => {
144 if data.len() < 2 {
145 return Err(SqlError::InvalidValue("truncated boolean".into()));
146 }
147 Ok((Value::Boolean(data[1] != 0), 2))
148 }
149 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
150 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
151 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
152 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
153 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
154 (Value::Date(d), n + 1)
155 }),
156 TAG_TIMESTAMP => {
157 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
158 }
159 TAG_INTERVAL => {
160 if data.len() < 1 + 16 {
161 return Err(SqlError::InvalidValue("truncated interval".into()));
162 }
163 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
164 mb[0] ^= 0x80;
165 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
166 db[0] ^= 0x80;
167 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
168 ub[0] ^= 0x80;
169 Ok((
170 Value::Interval {
171 months: i32::from_be_bytes(mb),
172 days: i32::from_be_bytes(db),
173 micros: i64::from_be_bytes(ub),
174 },
175 17,
176 ))
177 }
178 TAG_TEXT => {
179 let (bytes, n) = decode_null_escaped(&data[1..])?;
180 let s = String::from_utf8(bytes)
181 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
182 Ok((Value::Text(CompactString::from(s)), n + 1))
183 }
184 TAG_BLOB => {
185 let (bytes, n) = decode_null_escaped(&data[1..])?;
186 Ok((Value::Blob(bytes), n + 1))
187 }
188 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
189 }
190}
191
192pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
194 let mut values = Vec::with_capacity(count);
195 let mut pos = 0;
196 for _ in 0..count {
197 let (v, n) = decode_key_value(&data[pos..])?;
198 values.push(v);
199 pos += n;
200 }
201 Ok(values)
202}
203
204fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
205 let (v, n) = decode_signed_varint(data)?;
206 Ok((Value::Integer(v), n))
207}
208
209pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
211 if data.is_empty() {
212 return Err(SqlError::InvalidValue("truncated integer".into()));
213 }
214 let marker = data[0];
215 if marker == 0x80 {
216 return Ok((0, 1));
217 }
218 if marker > 0x80 {
219 let byte_count = (marker - 0x80) as usize;
220 if data.len() < 1 + byte_count {
221 return Err(SqlError::InvalidValue("truncated positive integer".into()));
222 }
223 let mut bytes = [0u8; 8];
224 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
225 let val = i64::from_be_bytes(bytes);
226 Ok((val, 1 + byte_count))
227 } else {
228 let byte_count = (0x80 - marker) as usize;
229 if data.len() < 1 + byte_count {
230 return Err(SqlError::InvalidValue("truncated negative integer".into()));
231 }
232 let mut bytes = [0u8; 8];
233 for i in 0..byte_count {
234 bytes[8 - byte_count + i] = !data[1 + i];
235 }
236 let abs_val = u64::from_be_bytes(bytes);
237 let val = (-(abs_val as i128)) as i64;
238 Ok((val, 1 + byte_count))
239 }
240}
241
242fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
243 if data.len() < 8 {
244 return Err(SqlError::InvalidValue("truncated real".into()));
245 }
246 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
247 let bits = if encoded & (1u64 << 63) != 0 {
248 encoded ^ (1u64 << 63)
250 } else {
251 !encoded
253 };
254 let val = f64::from_bits(bits);
255 Ok((Value::Real(val), 8))
256}
257
258fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
260 let mut result = Vec::new();
261 let mut i = 0;
262 while i < data.len() {
263 if data[i] == 0x00 {
264 if i + 1 < data.len() && data[i + 1] == 0xFF {
265 result.push(0x00);
266 i += 2;
267 } else {
268 return Ok((result, i + 1)); }
270 } else {
271 result.push(data[i]);
272 i += 1;
273 }
274 }
275 Err(SqlError::InvalidValue(
276 "unterminated null-escaped string".into(),
277 ))
278}
279
280pub fn encode_row(values: &[Value]) -> Vec<u8> {
282 let col_count = values.len();
283 let bitmap_bytes = col_count.div_ceil(8);
284 let mut buf = Vec::new();
285
286 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
287 let mut bitmap = vec![0u8; bitmap_bytes];
288 for (i, v) in values.iter().enumerate() {
289 if v.is_null() {
290 bitmap[i / 8] |= 1 << (i % 8);
291 }
292 }
293 buf.extend_from_slice(&bitmap);
294
295 for v in values {
296 if v.is_null() {
297 continue;
298 }
299 match v {
300 Value::Integer(i) => {
301 buf.push(DataType::Integer.type_tag());
302 buf.extend_from_slice(&8u32.to_le_bytes());
303 buf.extend_from_slice(&i.to_le_bytes());
304 }
305 Value::Real(r) => {
306 buf.push(DataType::Real.type_tag());
307 buf.extend_from_slice(&8u32.to_le_bytes());
308 buf.extend_from_slice(&r.to_le_bytes());
309 }
310 Value::Boolean(b) => {
311 buf.push(DataType::Boolean.type_tag());
312 buf.extend_from_slice(&1u32.to_le_bytes());
313 buf.push(if *b { 1 } else { 0 });
314 }
315 Value::Text(s) => {
316 let bytes = s.as_bytes();
317 buf.push(DataType::Text.type_tag());
318 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
319 buf.extend_from_slice(bytes);
320 }
321 Value::Blob(data) => {
322 buf.push(DataType::Blob.type_tag());
323 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
324 buf.extend_from_slice(data);
325 }
326 Value::Time(t) => {
327 buf.push(DataType::Time.type_tag());
328 buf.extend_from_slice(&8u32.to_le_bytes());
329 buf.extend_from_slice(&t.to_le_bytes());
330 }
331 Value::Date(d) => {
332 buf.push(DataType::Date.type_tag());
333 buf.extend_from_slice(&4u32.to_le_bytes());
334 buf.extend_from_slice(&d.to_le_bytes());
335 }
336 Value::Timestamp(t) => {
337 buf.push(DataType::Timestamp.type_tag());
338 buf.extend_from_slice(&8u32.to_le_bytes());
339 buf.extend_from_slice(&t.to_le_bytes());
340 }
341 Value::Interval {
342 months,
343 days,
344 micros,
345 } => {
346 buf.push(DataType::Interval.type_tag());
347 buf.extend_from_slice(&16u32.to_le_bytes());
348 buf.extend_from_slice(&months.to_le_bytes());
349 buf.extend_from_slice(&days.to_le_bytes());
350 buf.extend_from_slice(µs.to_le_bytes());
351 }
352 Value::Null => unreachable!(),
353 }
354 }
355
356 buf
357}
358
359pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
360 buf.clear();
361 let col_count = values.len();
362 let bitmap_bytes = col_count.div_ceil(8);
363
364 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
365
366 let bitmap_start = buf.len();
367 buf.resize(buf.len() + bitmap_bytes, 0);
368
369 for (i, v) in values.iter().enumerate() {
370 if v.is_null() {
371 buf[bitmap_start + i / 8] |= 1 << (i % 8);
372 continue;
373 }
374 match v {
375 Value::Integer(val) => {
376 buf.push(DataType::Integer.type_tag());
377 buf.extend_from_slice(&8u32.to_le_bytes());
378 buf.extend_from_slice(&val.to_le_bytes());
379 }
380 Value::Real(r) => {
381 buf.push(DataType::Real.type_tag());
382 buf.extend_from_slice(&8u32.to_le_bytes());
383 buf.extend_from_slice(&r.to_le_bytes());
384 }
385 Value::Boolean(b) => {
386 buf.push(DataType::Boolean.type_tag());
387 buf.extend_from_slice(&1u32.to_le_bytes());
388 buf.push(if *b { 1 } else { 0 });
389 }
390 Value::Text(s) => {
391 let bytes = s.as_bytes();
392 buf.push(DataType::Text.type_tag());
393 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
394 buf.extend_from_slice(bytes);
395 }
396 Value::Blob(data) => {
397 buf.push(DataType::Blob.type_tag());
398 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
399 buf.extend_from_slice(data);
400 }
401 Value::Time(t) => {
402 buf.push(DataType::Time.type_tag());
403 buf.extend_from_slice(&8u32.to_le_bytes());
404 buf.extend_from_slice(&t.to_le_bytes());
405 }
406 Value::Date(d) => {
407 buf.push(DataType::Date.type_tag());
408 buf.extend_from_slice(&4u32.to_le_bytes());
409 buf.extend_from_slice(&d.to_le_bytes());
410 }
411 Value::Timestamp(t) => {
412 buf.push(DataType::Timestamp.type_tag());
413 buf.extend_from_slice(&8u32.to_le_bytes());
414 buf.extend_from_slice(&t.to_le_bytes());
415 }
416 Value::Interval {
417 months,
418 days,
419 micros,
420 } => {
421 buf.push(DataType::Interval.type_tag());
422 buf.extend_from_slice(&16u32.to_le_bytes());
423 buf.extend_from_slice(&months.to_le_bytes());
424 buf.extend_from_slice(&days.to_le_bytes());
425 buf.extend_from_slice(µs.to_le_bytes());
426 }
427 Value::Null => unreachable!(),
428 }
429 }
430}
431
432fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
433 match DataType::from_tag(type_tag) {
434 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
435 data[..8].try_into().unwrap(),
436 ))),
437 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
438 data[..8].try_into().unwrap(),
439 ))),
440 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
441 Some(DataType::Text) => {
442 let s = std::str::from_utf8(data)
443 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
444 Ok(Value::Text(CompactString::from(s)))
445 }
446 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
447 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
448 data[..8].try_into().unwrap(),
449 ))),
450 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
451 data[..4].try_into().unwrap(),
452 ))),
453 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
454 data[..8].try_into().unwrap(),
455 ))),
456 Some(DataType::Interval) => {
457 if data.len() < 16 {
458 return Err(SqlError::InvalidValue("truncated interval".into()));
459 }
460 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
461 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
462 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
463 Ok(Value::Interval {
464 months,
465 days,
466 micros,
467 })
468 }
469 _ => Err(SqlError::InvalidValue(format!(
470 "unknown column type tag: {type_tag}"
471 ))),
472 }
473}
474
475fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
476 if data.len() < 2 {
477 return Err(SqlError::InvalidValue("row data too short".into()));
478 }
479 let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
480 let bitmap_bytes = col_count.div_ceil(8);
481 let pos = 2;
482 if data.len() < pos + bitmap_bytes {
483 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
484 }
485 Ok((
486 col_count,
487 &data[pos..pos + bitmap_bytes],
488 pos + bitmap_bytes,
489 ))
490}
491
492pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
493 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
494
495 let mut values = Vec::with_capacity(col_count);
496 for i in 0..col_count {
497 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
498 values.push(Value::Null);
499 continue;
500 }
501
502 if pos + 5 > data.len() {
503 return Err(SqlError::InvalidValue("truncated column data".into()));
504 }
505 let type_tag = data[pos];
506 pos += 1;
507 let data_len =
508 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
509 pos += 4;
510
511 if pos + data_len > data.len() {
512 return Err(SqlError::InvalidValue("truncated column value".into()));
513 }
514
515 values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
516 pos += data_len;
517 }
518
519 Ok(values)
520}
521
522#[inline]
524pub fn row_non_pk_count(data: &[u8]) -> usize {
525 u16::from_le_bytes([data[0], data[1]]) as usize
526}
527
528pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
529 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
530
531 for i in 0..col_count {
532 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
533 continue;
534 }
535
536 if pos + 5 > data.len() {
537 return Err(SqlError::InvalidValue("truncated column data".into()));
538 }
539 let type_tag = data[pos];
540 pos += 1;
541 let data_len =
542 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
543 pos += 4;
544
545 if pos + data_len > data.len() {
546 return Err(SqlError::InvalidValue("truncated column value".into()));
547 }
548
549 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
550 out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
551 }
552 pos += data_len;
553 }
554
555 Ok(())
556}
557
558pub fn decode_pk_into(
559 key: &[u8],
560 count: usize,
561 out: &mut [Value],
562 pk_mapping: &[usize],
563) -> Result<()> {
564 let mut pos = 0;
565 for i in 0..count {
566 let (v, n) = decode_key_value(&key[pos..])?;
567 if i < pk_mapping.len() {
568 out[pk_mapping[i]] = v;
569 }
570 pos += n;
571 }
572 Ok(())
573}
574
575pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
576 if targets.is_empty() {
577 return Ok(Vec::new());
578 }
579 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
580
581 let mut results = Vec::with_capacity(targets.len());
582 let mut ti = 0;
583
584 for col in 0..col_count {
585 if ti >= targets.len() {
586 break;
587 }
588 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
589
590 if col == targets[ti] {
591 if is_null {
592 results.push(Value::Null);
593 } else {
594 if pos + 5 > data.len() {
595 return Err(SqlError::InvalidValue("truncated column data".into()));
596 }
597 let type_tag = data[pos];
598 pos += 1;
599 let data_len =
600 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
601 as usize;
602 pos += 4;
603 if pos + data_len > data.len() {
604 return Err(SqlError::InvalidValue("truncated column value".into()));
605 }
606 results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
607 pos += data_len;
608 }
609 ti += 1;
610 } else if !is_null {
611 if pos + 5 > data.len() {
612 return Err(SqlError::InvalidValue("truncated column data".into()));
613 }
614 let data_len =
615 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
616 as usize;
617 pos += 5 + data_len;
618 }
619 }
620
621 while ti < targets.len() {
623 results.push(Value::Null);
624 ti += 1;
625 }
626
627 Ok(results)
628}
629
630pub fn decode_columns_into(
631 data: &[u8],
632 targets: &[usize],
633 schema_cols: &[usize],
634 row: &mut [Value],
635) -> Result<()> {
636 if targets.is_empty() {
637 return Ok(());
638 }
639 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
640
641 let mut ti = 0;
642 for col in 0..col_count {
643 if ti >= targets.len() {
644 break;
645 }
646 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
647
648 if col == targets[ti] {
649 if is_null {
650 row[schema_cols[ti]] = Value::Null;
651 } else {
652 if pos + 5 > data.len() {
653 return Err(SqlError::InvalidValue("truncated column data".into()));
654 }
655 let type_tag = data[pos];
656 pos += 1;
657 let data_len =
658 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
659 as usize;
660 pos += 4;
661 if pos + data_len > data.len() {
662 return Err(SqlError::InvalidValue("truncated column value".into()));
663 }
664 row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
665 pos += data_len;
666 }
667 ti += 1;
668 } else if !is_null {
669 if pos + 5 > data.len() {
670 return Err(SqlError::InvalidValue("truncated column data".into()));
671 }
672 let data_len =
673 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
674 as usize;
675 pos += 5 + data_len;
676 }
677 }
678
679 Ok(())
680}
681
682#[derive(Debug, Clone, Copy)]
683pub enum RawColumn<'a> {
684 Null,
685 Integer(i64),
686 Real(f64),
687 Boolean(bool),
688 Text(&'a str),
689 Blob(&'a [u8]),
690 Time(i64),
691 Date(i32),
692 Timestamp(i64),
693 Interval { months: i32, days: i32, micros: i64 },
694}
695
696impl<'a> RawColumn<'a> {
697 pub fn to_value(self) -> Value {
698 match self {
699 RawColumn::Null => Value::Null,
700 RawColumn::Integer(i) => Value::Integer(i),
701 RawColumn::Real(r) => Value::Real(r),
702 RawColumn::Boolean(b) => Value::Boolean(b),
703 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
704 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
705 RawColumn::Time(t) => Value::Time(t),
706 RawColumn::Date(d) => Value::Date(d),
707 RawColumn::Timestamp(t) => Value::Timestamp(t),
708 RawColumn::Interval {
709 months,
710 days,
711 micros,
712 } => Value::Interval {
713 months,
714 days,
715 micros,
716 },
717 }
718 }
719
720 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
721 use std::cmp::Ordering;
722 match (self, other) {
723 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
724 (RawColumn::Null, _) | (_, Value::Null) => None,
725 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
726 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
727 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
728 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
729 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
730 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
731 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
732 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
733 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
734 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
735 (
736 RawColumn::Interval {
737 months: am,
738 days: ad,
739 micros: au,
740 },
741 Value::Interval {
742 months: bm,
743 days: bd,
744 micros: bu,
745 },
746 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
747 _ => None,
748 }
749 }
750
751 pub fn eq_value(&self, other: &Value) -> bool {
752 match (self, other) {
753 (RawColumn::Null, Value::Null) => true,
754 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
755 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
756 (RawColumn::Real(a), Value::Real(b)) => a == b,
757 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
758 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
759 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
760 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
761 (RawColumn::Time(a), Value::Time(b)) => a == b,
762 (RawColumn::Date(a), Value::Date(b)) => a == b,
763 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
764 (
765 RawColumn::Interval {
766 months: am,
767 days: ad,
768 micros: au,
769 },
770 Value::Interval {
771 months: bm,
772 days: bd,
773 micros: bu,
774 },
775 ) => am == bm && ad == bd && au == bu,
776 _ => false,
777 }
778 }
779
780 pub fn as_f64(&self) -> Option<f64> {
781 match self {
782 RawColumn::Integer(i) => Some(*i as f64),
783 RawColumn::Real(r) => Some(*r),
784 _ => None,
785 }
786 }
787
788 pub fn as_i64(&self) -> Option<i64> {
789 match self {
790 RawColumn::Integer(i) => Some(*i),
791 RawColumn::Time(t) => Some(*t),
792 RawColumn::Date(d) => Some(*d as i64),
793 RawColumn::Timestamp(t) => Some(*t),
794 _ => None,
795 }
796 }
797}
798
799fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
800 match DataType::from_tag(type_tag) {
801 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
802 data[..8].try_into().unwrap(),
803 ))),
804 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
805 data[..8].try_into().unwrap(),
806 ))),
807 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
808 Some(DataType::Text) => {
809 let s = std::str::from_utf8(data)
810 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
811 Ok(RawColumn::Text(s))
812 }
813 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
814 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
815 data[..8].try_into().unwrap(),
816 ))),
817 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
818 data[..4].try_into().unwrap(),
819 ))),
820 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
821 data[..8].try_into().unwrap(),
822 ))),
823 Some(DataType::Interval) => {
824 if data.len() < 16 {
825 return Err(SqlError::InvalidValue("truncated interval".into()));
826 }
827 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
828 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
829 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
830 Ok(RawColumn::Interval {
831 months,
832 days,
833 micros,
834 })
835 }
836 _ => Err(SqlError::InvalidValue(format!(
837 "unknown column type tag: {type_tag}"
838 ))),
839 }
840}
841
842pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
844 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
845 if target >= col_count || new_val.is_null() {
846 return Ok(false);
847 }
848 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
849 if was_null {
850 return Ok(false);
851 }
852 for col in 0..target {
853 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
854 if !is_null {
855 if pos + 5 > data.len() {
856 return Err(SqlError::InvalidValue("truncated column data".into()));
857 }
858 let data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
859 pos += 5 + data_len;
860 }
861 }
862 if pos + 5 > data.len() {
863 return Err(SqlError::InvalidValue("truncated column data".into()));
864 }
865 let old_data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
866 let new_data_len = match new_val {
867 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
868 Value::Date(_) => 4,
869 Value::Interval { .. } => 16,
870 Value::Boolean(_) => 1,
871 Value::Text(s) => s.len(),
872 Value::Blob(b) => b.len(),
873 Value::Null => return Ok(false),
874 };
875 if new_data_len != old_data_len {
876 return Ok(false);
877 }
878 data[pos] = new_val.data_type().type_tag();
879 let val_start = pos + 5;
880 match new_val {
881 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
882 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
883 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
884 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
885 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
886 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
887 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
888 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
889 Value::Interval {
890 months,
891 days,
892 micros,
893 } => {
894 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
895 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
896 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
897 }
898 Value::Null => unreachable!(),
899 }
900 Ok(true)
901}
902
903pub fn patch_row_column(
905 data: &[u8],
906 target: usize,
907 new_val: &Value,
908 out: &mut Vec<u8>,
909) -> Result<()> {
910 let (col_count, bitmap, header_end) = parse_row_header(data)?;
911
912 let new_col_count = if target >= col_count {
914 target + 1
915 } else {
916 col_count
917 };
918 let new_bitmap_bytes = new_col_count.div_ceil(8);
919 let bitmap_bytes = col_count.div_ceil(8);
920 out.clear();
921
922 out.extend_from_slice(&(new_col_count as u16).to_le_bytes());
923 let bitmap_start = out.len();
924 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
925 for _ in bitmap_bytes..new_bitmap_bytes {
926 out.push(0xFF); }
928 if new_val.is_null() {
929 out[bitmap_start + target / 8] |= 1 << (target % 8);
930 } else {
931 out[bitmap_start + target / 8] &= !(1 << (target % 8));
932 }
933
934 let mut pos = header_end;
935 for col in 0..new_col_count {
936 let was_null = if col < col_count {
937 bitmap[col / 8] & (1 << (col % 8)) != 0
938 } else {
939 true };
941
942 if col == target {
943 if !was_null && pos + 5 <= data.len() {
944 let data_len = u32::from_le_bytes([
945 data[pos + 1],
946 data[pos + 2],
947 data[pos + 3],
948 data[pos + 4],
949 ]) as usize;
950 pos += 5 + data_len;
951 }
952 if !new_val.is_null() {
953 match new_val {
954 Value::Integer(v) => {
955 out.push(DataType::Integer.type_tag());
956 out.extend_from_slice(&8u32.to_le_bytes());
957 out.extend_from_slice(&v.to_le_bytes());
958 }
959 Value::Real(r) => {
960 out.push(DataType::Real.type_tag());
961 out.extend_from_slice(&8u32.to_le_bytes());
962 out.extend_from_slice(&r.to_le_bytes());
963 }
964 Value::Boolean(b) => {
965 out.push(DataType::Boolean.type_tag());
966 out.extend_from_slice(&1u32.to_le_bytes());
967 out.push(if *b { 1 } else { 0 });
968 }
969 Value::Text(s) => {
970 let bytes = s.as_bytes();
971 out.push(DataType::Text.type_tag());
972 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
973 out.extend_from_slice(bytes);
974 }
975 Value::Blob(d) => {
976 out.push(DataType::Blob.type_tag());
977 out.extend_from_slice(&(d.len() as u32).to_le_bytes());
978 out.extend_from_slice(d);
979 }
980 Value::Time(t) => {
981 out.push(DataType::Time.type_tag());
982 out.extend_from_slice(&8u32.to_le_bytes());
983 out.extend_from_slice(&t.to_le_bytes());
984 }
985 Value::Date(d) => {
986 out.push(DataType::Date.type_tag());
987 out.extend_from_slice(&4u32.to_le_bytes());
988 out.extend_from_slice(&d.to_le_bytes());
989 }
990 Value::Timestamp(t) => {
991 out.push(DataType::Timestamp.type_tag());
992 out.extend_from_slice(&8u32.to_le_bytes());
993 out.extend_from_slice(&t.to_le_bytes());
994 }
995 Value::Interval {
996 months,
997 days,
998 micros,
999 } => {
1000 out.push(DataType::Interval.type_tag());
1001 out.extend_from_slice(&16u32.to_le_bytes());
1002 out.extend_from_slice(&months.to_le_bytes());
1003 out.extend_from_slice(&days.to_le_bytes());
1004 out.extend_from_slice(µs.to_le_bytes());
1005 }
1006 Value::Null => unreachable!(),
1007 }
1008 }
1009 } else if was_null {
1010 } else {
1011 if pos + 5 > data.len() {
1012 return Err(SqlError::InvalidValue("truncated column data".into()));
1013 }
1014 let data_len =
1015 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1016 as usize;
1017 let end = pos + 5 + data_len;
1018 if end > data.len() {
1019 return Err(SqlError::InvalidValue("truncated column value".into()));
1020 }
1021 out.extend_from_slice(&data[pos..end]);
1022 pos = end;
1023 }
1024 }
1025 Ok(())
1026}
1027
1028pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1029 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
1030 if target >= col_count {
1031 return Ok(RawColumn::Null);
1032 }
1033
1034 for col in 0..=target {
1035 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1036
1037 if col == target {
1038 if is_null {
1039 return Ok(RawColumn::Null);
1040 }
1041 if pos + 5 > data.len() {
1042 return Err(SqlError::InvalidValue("truncated column data".into()));
1043 }
1044 let type_tag = data[pos];
1045 pos += 1;
1046 let data_len =
1047 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
1048 as usize;
1049 pos += 4;
1050 if pos + data_len > data.len() {
1051 return Err(SqlError::InvalidValue("truncated column value".into()));
1052 }
1053 return decode_value_raw(type_tag, &data[pos..pos + data_len]);
1054 } else if !is_null {
1055 if pos + 5 > data.len() {
1056 return Err(SqlError::InvalidValue("truncated column data".into()));
1057 }
1058 let data_len =
1059 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1060 as usize;
1061 pos += 5 + data_len;
1062 }
1063 }
1064
1065 unreachable!()
1066}
1067
1068pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1070 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
1071 if target >= col_count {
1072 return Ok((RawColumn::Null, usize::MAX));
1073 }
1074
1075 for col in 0..=target {
1076 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1077
1078 if col == target {
1079 if is_null {
1080 return Ok((RawColumn::Null, usize::MAX));
1081 }
1082 if pos + 5 > data.len() {
1083 return Err(SqlError::InvalidValue("truncated column data".into()));
1084 }
1085 let tag_offset = pos;
1086 let type_tag = data[pos];
1087 pos += 1;
1088 let data_len =
1089 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
1090 as usize;
1091 pos += 4;
1092 if pos + data_len > data.len() {
1093 return Err(SqlError::InvalidValue("truncated column value".into()));
1094 }
1095 let raw = decode_value_raw(type_tag, &data[pos..pos + data_len])?;
1096 return Ok((raw, tag_offset));
1097 } else if !is_null {
1098 if pos + 5 > data.len() {
1099 return Err(SqlError::InvalidValue("truncated column data".into()));
1100 }
1101 let data_len =
1102 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1103 as usize;
1104 pos += 5 + data_len;
1105 }
1106 }
1107
1108 unreachable!()
1109}
1110
1111pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1113 if offset == usize::MAX || new_val.is_null() {
1114 return Ok(false);
1115 }
1116 if offset + 5 > data.len() {
1117 return Err(SqlError::InvalidValue("truncated column data".into()));
1118 }
1119 let old_data_len =
1120 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1121 let new_data_len = match new_val {
1122 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1123 Value::Date(_) => 4,
1124 Value::Interval { .. } => 16,
1125 Value::Boolean(_) => 1,
1126 Value::Text(s) => s.len(),
1127 Value::Blob(b) => b.len(),
1128 Value::Null => return Ok(false),
1129 };
1130 if new_data_len != old_data_len {
1131 return Ok(false);
1132 }
1133 data[offset] = new_val.data_type().type_tag();
1134 let val_start = offset + 5;
1135 match new_val {
1136 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1137 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1138 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1139 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1140 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1141 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1142 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1143 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1144 Value::Interval {
1145 months,
1146 days,
1147 micros,
1148 } => {
1149 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1150 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1151 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1152 }
1153 Value::Null => unreachable!(),
1154 }
1155 Ok(true)
1156}
1157
1158pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1159 if key.is_empty() || key[0] != TAG_INTEGER {
1160 return Err(SqlError::InvalidValue("not an integer key".into()));
1161 }
1162 let (val, _) = decode_integer(&key[1..])?;
1163 match val {
1164 Value::Integer(i) => Ok(i),
1165 _ => unreachable!(),
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172
1173 #[test]
1174 fn key_null() {
1175 let encoded = encode_key_value(&Value::Null);
1176 let (decoded, n) = decode_key_value(&encoded).unwrap();
1177 assert_eq!(n, 1);
1178 assert_eq!(decoded, Value::Null);
1179 }
1180
1181 #[test]
1182 fn key_boolean() {
1183 let f_enc = encode_key_value(&Value::Boolean(false));
1184 let t_enc = encode_key_value(&Value::Boolean(true));
1185 assert!(f_enc < t_enc);
1186
1187 let (f_dec, _) = decode_key_value(&f_enc).unwrap();
1188 let (t_dec, _) = decode_key_value(&t_enc).unwrap();
1189 assert_eq!(f_dec, Value::Boolean(false));
1190 assert_eq!(t_dec, Value::Boolean(true));
1191 }
1192
1193 #[test]
1194 fn key_integer_roundtrip() {
1195 let test_values = [
1196 i64::MIN,
1197 -1_000_000,
1198 -256,
1199 -1,
1200 0,
1201 1,
1202 127,
1203 128,
1204 255,
1205 256,
1206 65535,
1207 1_000_000,
1208 i64::MAX,
1209 ];
1210 for &v in &test_values {
1211 let encoded = encode_key_value(&Value::Integer(v));
1212 let (decoded, _) = decode_key_value(&encoded).unwrap();
1213 assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
1214 }
1215 }
1216
1217 #[test]
1218 fn key_integer_sort_order() {
1219 let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
1220 let encoded: Vec<Vec<u8>> = values
1221 .iter()
1222 .map(|&v| encode_key_value(&Value::Integer(v)))
1223 .collect();
1224
1225 for i in 0..encoded.len() - 1 {
1226 assert!(
1227 encoded[i] < encoded[i + 1],
1228 "sort order broken: {} vs {}",
1229 values[i],
1230 values[i + 1]
1231 );
1232 }
1233 }
1234
1235 #[test]
1236 fn key_real_roundtrip() {
1237 let test_values = [
1238 f64::NEG_INFINITY,
1239 -1e100,
1240 -1.0,
1241 -f64::MIN_POSITIVE,
1242 -0.0,
1243 0.0,
1244 f64::MIN_POSITIVE,
1245 0.5,
1246 1.0,
1247 1e100,
1248 f64::INFINITY,
1249 ];
1250 for &v in &test_values {
1251 let encoded = encode_key_value(&Value::Real(v));
1252 let (decoded, _) = decode_key_value(&encoded).unwrap();
1253 match decoded {
1254 Value::Real(r) => {
1255 assert!(
1256 v.to_bits() == r.to_bits(),
1257 "roundtrip failed for {v}: got {r}"
1258 );
1259 }
1260 _ => panic!("expected Real"),
1261 }
1262 }
1263 }
1264
1265 #[test]
1266 fn key_real_sort_order() {
1267 let values = [
1268 f64::NEG_INFINITY,
1269 -100.0,
1270 -1.0,
1271 -0.0,
1272 0.0,
1273 1.0,
1274 100.0,
1275 f64::INFINITY,
1276 ];
1277 let encoded: Vec<Vec<u8>> = values
1278 .iter()
1279 .map(|&v| encode_key_value(&Value::Real(v)))
1280 .collect();
1281
1282 for i in 0..encoded.len() - 1 {
1283 assert!(
1284 encoded[i] <= encoded[i + 1],
1285 "sort order broken: {} vs {}",
1286 values[i],
1287 values[i + 1]
1288 );
1289 }
1290 }
1291
1292 #[test]
1293 fn key_text_roundtrip() {
1294 let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
1295 for &v in &test_values {
1296 let encoded = encode_key_value(&Value::Text(v.into()));
1297 let (decoded, _) = decode_key_value(&encoded).unwrap();
1298 assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
1299 }
1300 }
1301
1302 #[test]
1303 fn key_text_sort_order() {
1304 let values = ["", "a", "ab", "b", "ba", "z"];
1305 let encoded: Vec<Vec<u8>> = values
1306 .iter()
1307 .map(|&v| encode_key_value(&Value::Text(v.into())))
1308 .collect();
1309
1310 for i in 0..encoded.len() - 1 {
1311 assert!(
1312 encoded[i] < encoded[i + 1],
1313 "sort order broken: {:?} vs {:?}",
1314 values[i],
1315 values[i + 1]
1316 );
1317 }
1318 }
1319
1320 #[test]
1321 fn key_blob_roundtrip() {
1322 let test_values: Vec<Vec<u8>> = vec![
1323 vec![],
1324 vec![0x00],
1325 vec![0x00, 0xFF],
1326 vec![0xFF, 0x00],
1327 vec![0x00, 0x00, 0x00],
1328 ];
1329 for v in &test_values {
1330 let encoded = encode_key_value(&Value::Blob(v.clone()));
1331 let (decoded, _) = decode_key_value(&encoded).unwrap();
1332 assert_eq!(decoded, Value::Blob(v.clone()));
1333 }
1334 }
1335
1336 #[test]
1337 fn key_composite_roundtrip() {
1338 let values = vec![
1339 Value::Integer(42),
1340 Value::Text("hello".into()),
1341 Value::Boolean(true),
1342 ];
1343 let encoded = encode_composite_key(&values);
1344 let decoded = decode_composite_key(&encoded, 3).unwrap();
1345 assert_eq!(decoded[0], Value::Integer(42));
1346 assert_eq!(decoded[1], Value::Text("hello".into()));
1347 assert_eq!(decoded[2], Value::Boolean(true));
1348 }
1349
1350 #[test]
1351 fn key_composite_sort_order() {
1352 let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
1354 let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
1355 let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
1356 assert!(k1 < k2);
1357 assert!(k2 < k3);
1358 }
1359
1360 #[test]
1361 fn key_cross_type_ordering() {
1362 let null = encode_key_value(&Value::Null);
1363 let bool_val = encode_key_value(&Value::Boolean(false));
1364 let int = encode_key_value(&Value::Integer(0));
1365 let text = encode_key_value(&Value::Text("".into()));
1366 let blob = encode_key_value(&Value::Blob(vec![]));
1367
1368 assert!(null < blob);
1369 assert!(blob < text);
1370 assert!(text < bool_val);
1371 assert!(bool_val < int);
1372 }
1373
1374 #[test]
1375 fn row_roundtrip_simple() {
1376 let values = vec![
1377 Value::Integer(42),
1378 Value::Text("hello".into()),
1379 Value::Boolean(true),
1380 ];
1381 let encoded = encode_row(&values);
1382 let decoded = decode_row(&encoded).unwrap();
1383 assert_eq!(decoded.len(), 3);
1384 assert_eq!(decoded[0], Value::Integer(42));
1385 assert_eq!(decoded[1], Value::Text("hello".into()));
1386 assert_eq!(decoded[2], Value::Boolean(true));
1387 }
1388
1389 #[test]
1390 fn row_roundtrip_with_nulls() {
1391 let values = vec![
1392 Value::Integer(1),
1393 Value::Null,
1394 Value::Text("test".into()),
1395 Value::Null,
1396 ];
1397 let encoded = encode_row(&values);
1398 let decoded = decode_row(&encoded).unwrap();
1399 assert_eq!(decoded.len(), 4);
1400 assert_eq!(decoded[0], Value::Integer(1));
1401 assert!(decoded[1].is_null());
1402 assert_eq!(decoded[2], Value::Text("test".into()));
1403 assert!(decoded[3].is_null());
1404 }
1405
1406 #[test]
1407 fn row_roundtrip_empty() {
1408 let values: Vec<Value> = vec![];
1409 let encoded = encode_row(&values);
1410 let decoded = decode_row(&encoded).unwrap();
1411 assert!(decoded.is_empty());
1412 }
1413
1414 #[test]
1415 fn row_roundtrip_all_types() {
1416 let values = vec![
1417 Value::Integer(-100),
1418 Value::Real(3.15),
1419 Value::Text("hello world".into()),
1420 Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1421 Value::Boolean(false),
1422 Value::Null,
1423 ];
1424 let encoded = encode_row(&values);
1425 let decoded = decode_row(&encoded).unwrap();
1426 assert_eq!(decoded.len(), 6);
1427 assert_eq!(decoded[0], Value::Integer(-100));
1428 assert_eq!(decoded[1], Value::Real(3.15));
1429 assert_eq!(decoded[2], Value::Text("hello world".into()));
1430 assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
1431 assert_eq!(decoded[4], Value::Boolean(false));
1432 assert!(decoded[5].is_null());
1433 }
1434
1435 #[test]
1436 fn null_escaped_with_embedded_nulls() {
1437 let text = "before\0after";
1438 let encoded = encode_key_value(&Value::Text(text.into()));
1439 let (decoded, _) = decode_key_value(&encoded).unwrap();
1440 assert_eq!(decoded, Value::Text(text.into()));
1441 }
1442
1443 #[test]
1444 fn key_integer_edge_cases() {
1445 for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
1446 let encoded = encode_key_value(&Value::Integer(v));
1447 let (decoded, n) = decode_key_value(&encoded).unwrap();
1448 assert_eq!(n, encoded.len());
1449 assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
1450 }
1451 }
1452
1453 #[test]
1454 fn decode_columns_single() {
1455 let values = vec![
1456 Value::Integer(42),
1457 Value::Text("hello".into()),
1458 Value::Boolean(true),
1459 ];
1460 let encoded = encode_row(&values);
1461 let cols = decode_columns(&encoded, &[1]).unwrap();
1462 assert_eq!(cols.len(), 1);
1463 assert_eq!(cols[0], Value::Text("hello".into()));
1464 }
1465
1466 #[test]
1467 fn decode_columns_multiple() {
1468 let values = vec![
1469 Value::Integer(1),
1470 Value::Real(2.5),
1471 Value::Text("skip".into()),
1472 Value::Boolean(false),
1473 Value::Blob(vec![0xAB]),
1474 ];
1475 let encoded = encode_row(&values);
1476 let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
1477 assert_eq!(cols.len(), 3);
1478 assert_eq!(cols[0], Value::Integer(1));
1479 assert_eq!(cols[1], Value::Boolean(false));
1480 assert_eq!(cols[2], Value::Blob(vec![0xAB]));
1481 }
1482
1483 #[test]
1484 fn decode_columns_with_nulls() {
1485 let values = vec![
1486 Value::Integer(10),
1487 Value::Null,
1488 Value::Text("after_null".into()),
1489 Value::Null,
1490 Value::Boolean(true),
1491 ];
1492 let encoded = encode_row(&values);
1493 let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
1494 assert_eq!(cols.len(), 3);
1495 assert!(cols[0].is_null());
1496 assert_eq!(cols[1], Value::Text("after_null".into()));
1497 assert_eq!(cols[2], Value::Boolean(true));
1498 }
1499
1500 #[test]
1501 fn decode_columns_first_and_last() {
1502 let values = vec![
1503 Value::Text("first".into()),
1504 Value::Integer(99),
1505 Value::Boolean(false),
1506 Value::Real(3.125),
1507 ];
1508 let encoded = encode_row(&values);
1509 let cols = decode_columns(&encoded, &[0, 3]).unwrap();
1510 assert_eq!(cols.len(), 2);
1511 assert_eq!(cols[0], Value::Text("first".into()));
1512 assert_eq!(cols[1], Value::Real(3.125));
1513 }
1514
1515 #[test]
1516 fn decode_columns_empty_targets() {
1517 let values = vec![Value::Integer(1)];
1518 let encoded = encode_row(&values);
1519 let cols = decode_columns(&encoded, &[]).unwrap();
1520 assert!(cols.is_empty());
1521 }
1522
1523 #[test]
1524 fn decode_columns_all_matches_full_decode() {
1525 let values = vec![
1526 Value::Integer(-100),
1527 Value::Real(3.15),
1528 Value::Text("hello world".into()),
1529 Value::Blob(vec![0xDE, 0xAD]),
1530 Value::Boolean(false),
1531 Value::Null,
1532 ];
1533 let encoded = encode_row(&values);
1534 let full = decode_row(&encoded).unwrap();
1535 let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
1536 assert_eq!(full, selective);
1537 }
1538
1539 #[test]
1540 fn raw_column_integer() {
1541 let values = vec![Value::Integer(42), Value::Text("hello".into())];
1542 let encoded = encode_row(&values);
1543 let raw = decode_column_raw(&encoded, 0).unwrap();
1544 assert!(matches!(raw, RawColumn::Integer(42)));
1545 assert_eq!(raw.to_value(), Value::Integer(42));
1546 }
1547
1548 #[test]
1549 fn raw_column_text_borrows() {
1550 let values = vec![Value::Integer(1), Value::Text("hello".into())];
1551 let encoded = encode_row(&values);
1552 let raw = decode_column_raw(&encoded, 1).unwrap();
1553 match raw {
1554 RawColumn::Text(s) => assert_eq!(s, "hello"),
1555 other => panic!("expected Text, got {other:?}"),
1556 }
1557 }
1558
1559 #[test]
1560 fn raw_column_null() {
1561 let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
1562 let encoded = encode_row(&values);
1563 let raw = decode_column_raw(&encoded, 1).unwrap();
1564 assert!(matches!(raw, RawColumn::Null));
1565 }
1566
1567 #[test]
1568 fn raw_column_last() {
1569 let values = vec![
1570 Value::Integer(1),
1571 Value::Text("skip".into()),
1572 Value::Real(3.15),
1573 ];
1574 let encoded = encode_row(&values);
1575 let raw = decode_column_raw(&encoded, 2).unwrap();
1576 match raw {
1577 RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
1578 other => panic!("expected Real, got {other:?}"),
1579 }
1580 }
1581
1582 #[test]
1583 fn raw_column_out_of_bounds_returns_null() {
1584 let values = vec![Value::Integer(1)];
1585 let encoded = encode_row(&values);
1586 assert!(matches!(
1587 decode_column_raw(&encoded, 1).unwrap(),
1588 RawColumn::Null
1589 ));
1590 }
1591
1592 #[test]
1593 fn raw_column_eq_value() {
1594 let raw_int = RawColumn::Integer(42);
1595 assert!(raw_int.eq_value(&Value::Integer(42)));
1596 assert!(!raw_int.eq_value(&Value::Integer(43)));
1597 assert!(raw_int.eq_value(&Value::Real(42.0)));
1598
1599 let raw_text = RawColumn::Text("hello");
1600 assert!(raw_text.eq_value(&Value::Text("hello".into())));
1601 assert!(!raw_text.eq_value(&Value::Text("world".into())));
1602 }
1603
1604 #[test]
1605 fn raw_column_cmp_value() {
1606 use std::cmp::Ordering;
1607 let raw = RawColumn::Integer(42);
1608 assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
1609 assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
1610 assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
1611 assert_eq!(raw.cmp_value(&Value::Null), None);
1612 }
1613
1614 #[test]
1615 fn raw_column_as_numeric() {
1616 assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
1617 assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
1618 assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
1619 assert_eq!(RawColumn::Real(3.15).as_i64(), None);
1620 assert_eq!(RawColumn::Text("x").as_f64(), None);
1621 assert_eq!(RawColumn::Null.as_i64(), None);
1622 }
1623
1624 #[test]
1625 fn decode_pk_integer_roundtrip() {
1626 for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
1627 let encoded = encode_key_value(&Value::Integer(v));
1628 let decoded = decode_pk_integer(&encoded).unwrap();
1629 assert_eq!(decoded, v);
1630 }
1631 }
1632
1633 #[test]
1634 fn decode_pk_integer_rejects_non_integer() {
1635 let encoded = encode_key_value(&Value::Text("hello".into()));
1636 assert!(decode_pk_integer(&encoded).is_err());
1637 }
1638
1639 #[test]
1640 fn raw_column_blob() {
1641 let values = vec![Value::Blob(vec![0xDE, 0xAD])];
1642 let encoded = encode_row(&values);
1643 let raw = decode_column_raw(&encoded, 0).unwrap();
1644 match raw {
1645 RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
1646 other => panic!("expected Blob, got {other:?}"),
1647 }
1648 }
1649
1650 #[test]
1651 fn raw_column_matches_full_decode() {
1652 let values = vec![
1653 Value::Integer(-100),
1654 Value::Real(3.15),
1655 Value::Text("hello world".into()),
1656 Value::Blob(vec![0xDE, 0xAD]),
1657 Value::Boolean(false),
1658 Value::Null,
1659 ];
1660 let encoded = encode_row(&values);
1661 let full = decode_row(&encoded).unwrap();
1662 for (i, expected) in full.iter().enumerate() {
1663 let raw = decode_column_raw(&encoded, i).unwrap();
1664 assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
1665 }
1666 }
1667}