1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
// Type tags written as the first byte of every encoded key value.
// Encoded keys are compared bytewise, so the numeric order of these tags fixes the
// ordering between values of different types:
// NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL.
const TAG_NULL: u8 = 0;
const TAG_BLOB: u8 = 1;
const TAG_TEXT: u8 = 2;
const TAG_BOOLEAN: u8 = 3;
const TAG_INTEGER: u8 = 4;
const TAG_REAL: u8 = 5;
15
16pub fn encode_key_value(value: &Value) -> Vec<u8> {
18 match value {
19 Value::Null => vec![TAG_NULL],
20 Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
21 Value::Integer(i) => encode_integer(*i),
22 Value::Real(r) => encode_real(*r),
23 Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
24 Value::Blob(b) => encode_bytes(TAG_BLOB, b),
25 }
26}
27
28pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
30 let mut buf = Vec::new();
31 for v in values {
32 buf.extend_from_slice(&encode_key_value(v));
33 }
34 buf
35}
36
37pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
38 buf.clear();
39 for v in values {
40 encode_key_value_into(v, buf);
41 }
42}
43
44fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
45 match value {
46 Value::Null => buf.push(TAG_NULL),
47 Value::Boolean(b) => {
48 buf.push(TAG_BOOLEAN);
49 buf.push(if *b { 0x01 } else { 0x00 });
50 }
51 Value::Integer(i) => encode_integer_into(*i, buf),
52 Value::Real(r) => encode_real_into(*r, buf),
53 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
54 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
55 }
56}
57
58fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
59 buf.push(TAG_INTEGER);
60 if val == 0 {
61 buf.push(0x80);
62 return;
63 }
64 if val > 0 {
65 let bytes = val.to_be_bytes();
66 let start = bytes.iter().position(|&b| b != 0).unwrap();
67 let byte_count = (8 - start) as u8;
68 buf.push(0x80 + byte_count);
69 buf.extend_from_slice(&bytes[start..]);
70 } else {
71 let abs_val = if val == i64::MIN {
72 u64::MAX / 2 + 1
73 } else {
74 (-val) as u64
75 };
76 let bytes = abs_val.to_be_bytes();
77 let start = bytes.iter().position(|&b| b != 0).unwrap();
78 let byte_count = (8 - start) as u8;
79 buf.push(0x80 - byte_count);
80 for &b in &bytes[start..] {
81 buf.push(!b);
82 }
83 }
84}
85
86fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
87 buf.push(TAG_REAL);
88 let bits = val.to_bits();
89 let encoded = if val.is_sign_negative() {
90 !bits
91 } else {
92 bits ^ (1u64 << 63)
93 };
94 buf.extend_from_slice(&encoded.to_be_bytes());
95}
96
/// Appends `tag` and a null-escaped copy of `data` to `buf`.
///
/// Every `0x00` byte inside `data` is written as the pair `0x00 0xFF`, and a single
/// `0x00` terminator is appended after the payload.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on zero bytes yields the runs between them; each boundary between two
    // runs is exactly one zero byte of the input and is re-emitted as the escape pair.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
109
110pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
112 if data.is_empty() {
113 return Err(SqlError::InvalidValue("empty key data".into()));
114 }
115 match data[0] {
116 TAG_NULL => Ok((Value::Null, 1)),
117 TAG_BOOLEAN => {
118 if data.len() < 2 {
119 return Err(SqlError::InvalidValue("truncated boolean".into()));
120 }
121 Ok((Value::Boolean(data[1] != 0), 2))
122 }
123 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
124 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
125 TAG_TEXT => {
126 let (bytes, n) = decode_null_escaped(&data[1..])?;
127 let s = String::from_utf8(bytes)
128 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
129 Ok((Value::Text(CompactString::from(s)), n + 1))
130 }
131 TAG_BLOB => {
132 let (bytes, n) = decode_null_escaped(&data[1..])?;
133 Ok((Value::Blob(bytes), n + 1))
134 }
135 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
136 }
137}
138
139pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
141 let mut values = Vec::with_capacity(count);
142 let mut pos = 0;
143 for _ in 0..count {
144 let (v, n) = decode_key_value(&data[pos..])?;
145 values.push(v);
146 pos += n;
147 }
148 Ok(values)
149}
150
151fn encode_integer(val: i64) -> Vec<u8> {
154 let mut buf = vec![TAG_INTEGER];
155 if val == 0 {
156 buf.push(0x80);
157 return buf;
158 }
159 if val > 0 {
160 let bytes = val.to_be_bytes();
161 let start = bytes.iter().position(|&b| b != 0).unwrap();
163 let byte_count = (8 - start) as u8;
164 buf.push(0x80 + byte_count);
165 buf.extend_from_slice(&bytes[start..]);
166 } else {
167 let abs_val = if val == i64::MIN {
169 u64::MAX / 2 + 1
171 } else {
172 (-val) as u64
173 };
174 let bytes = abs_val.to_be_bytes();
175 let start = bytes.iter().position(|&b| b != 0).unwrap();
176 let byte_count = (8 - start) as u8;
177 buf.push(0x80 - byte_count);
178 for &b in &bytes[start..] {
180 buf.push(!b);
181 }
182 }
183 buf
184}
185
186fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
187 if data.is_empty() {
188 return Err(SqlError::InvalidValue("truncated integer".into()));
189 }
190 let marker = data[0];
191 if marker == 0x80 {
192 return Ok((Value::Integer(0), 1));
193 }
194 if marker > 0x80 {
195 let byte_count = (marker - 0x80) as usize;
197 if data.len() < 1 + byte_count {
198 return Err(SqlError::InvalidValue("truncated positive integer".into()));
199 }
200 let mut bytes = [0u8; 8];
201 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
202 let val = i64::from_be_bytes(bytes);
203 Ok((Value::Integer(val), 1 + byte_count))
204 } else {
205 let byte_count = (0x80 - marker) as usize;
207 if data.len() < 1 + byte_count {
208 return Err(SqlError::InvalidValue("truncated negative integer".into()));
209 }
210 let mut bytes = [0u8; 8];
211 for i in 0..byte_count {
212 bytes[8 - byte_count + i] = !data[1 + i];
213 }
214 let abs_val = u64::from_be_bytes(bytes);
215 let val = (-(abs_val as i128)) as i64;
217 Ok((Value::Integer(val), 1 + byte_count))
218 }
219}
220
221fn encode_real(val: f64) -> Vec<u8> {
224 let mut buf = vec![TAG_REAL];
225 let bits = val.to_bits();
226 let encoded = if val.is_sign_negative() {
227 !bits
229 } else {
230 bits ^ (1u64 << 63)
232 };
233 buf.extend_from_slice(&encoded.to_be_bytes());
234 buf
235}
236
237fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
238 if data.len() < 8 {
239 return Err(SqlError::InvalidValue("truncated real".into()));
240 }
241 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
242 let bits = if encoded & (1u64 << 63) != 0 {
243 encoded ^ (1u64 << 63)
245 } else {
246 !encoded
248 };
249 let val = f64::from_bits(bits);
250 Ok((Value::Real(val), 8))
251}
252
253fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
257 let mut buf = Vec::with_capacity(data.len() + 2);
258 buf.push(tag);
259 for &b in data {
260 if b == 0x00 {
261 buf.push(0x00);
262 buf.push(0xFF);
263 } else {
264 buf.push(b);
265 }
266 }
267 buf.push(0x00); buf
269}
270
271fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
273 let mut result = Vec::new();
274 let mut i = 0;
275 while i < data.len() {
276 if data[i] == 0x00 {
277 if i + 1 < data.len() && data[i + 1] == 0xFF {
278 result.push(0x00);
279 i += 2;
280 } else {
281 return Ok((result, i + 1)); }
283 } else {
284 result.push(data[i]);
285 i += 1;
286 }
287 }
288 Err(SqlError::InvalidValue(
289 "unterminated null-escaped string".into(),
290 ))
291}
292
293pub fn encode_row(values: &[Value]) -> Vec<u8> {
297 let col_count = values.len();
298 let bitmap_bytes = col_count.div_ceil(8);
299 let mut buf = Vec::new();
300
301 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
302 let mut bitmap = vec![0u8; bitmap_bytes];
303 for (i, v) in values.iter().enumerate() {
304 if v.is_null() {
305 bitmap[i / 8] |= 1 << (i % 8);
306 }
307 }
308 buf.extend_from_slice(&bitmap);
309
310 for v in values {
311 if v.is_null() {
312 continue;
313 }
314 match v {
315 Value::Integer(i) => {
316 buf.push(DataType::Integer.type_tag());
317 buf.extend_from_slice(&8u32.to_le_bytes());
318 buf.extend_from_slice(&i.to_le_bytes());
319 }
320 Value::Real(r) => {
321 buf.push(DataType::Real.type_tag());
322 buf.extend_from_slice(&8u32.to_le_bytes());
323 buf.extend_from_slice(&r.to_le_bytes());
324 }
325 Value::Boolean(b) => {
326 buf.push(DataType::Boolean.type_tag());
327 buf.extend_from_slice(&1u32.to_le_bytes());
328 buf.push(if *b { 1 } else { 0 });
329 }
330 Value::Text(s) => {
331 let bytes = s.as_bytes();
332 buf.push(DataType::Text.type_tag());
333 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
334 buf.extend_from_slice(bytes);
335 }
336 Value::Blob(data) => {
337 buf.push(DataType::Blob.type_tag());
338 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
339 buf.extend_from_slice(data);
340 }
341 Value::Null => unreachable!(),
342 }
343 }
344
345 buf
346}
347
348pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
349 buf.clear();
350 let col_count = values.len();
351 let bitmap_bytes = col_count.div_ceil(8);
352
353 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
354
355 let bitmap_start = buf.len();
356 buf.resize(buf.len() + bitmap_bytes, 0);
357
358 for (i, v) in values.iter().enumerate() {
359 if v.is_null() {
360 buf[bitmap_start + i / 8] |= 1 << (i % 8);
361 continue;
362 }
363 match v {
364 Value::Integer(val) => {
365 buf.push(DataType::Integer.type_tag());
366 buf.extend_from_slice(&8u32.to_le_bytes());
367 buf.extend_from_slice(&val.to_le_bytes());
368 }
369 Value::Real(r) => {
370 buf.push(DataType::Real.type_tag());
371 buf.extend_from_slice(&8u32.to_le_bytes());
372 buf.extend_from_slice(&r.to_le_bytes());
373 }
374 Value::Boolean(b) => {
375 buf.push(DataType::Boolean.type_tag());
376 buf.extend_from_slice(&1u32.to_le_bytes());
377 buf.push(if *b { 1 } else { 0 });
378 }
379 Value::Text(s) => {
380 let bytes = s.as_bytes();
381 buf.push(DataType::Text.type_tag());
382 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
383 buf.extend_from_slice(bytes);
384 }
385 Value::Blob(data) => {
386 buf.push(DataType::Blob.type_tag());
387 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
388 buf.extend_from_slice(data);
389 }
390 Value::Null => unreachable!(),
391 }
392 }
393}
394
395fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
396 match DataType::from_tag(type_tag) {
397 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
398 data[..8].try_into().unwrap(),
399 ))),
400 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
401 data[..8].try_into().unwrap(),
402 ))),
403 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
404 Some(DataType::Text) => {
405 let s = std::str::from_utf8(data)
406 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
407 Ok(Value::Text(CompactString::from(s)))
408 }
409 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
410 _ => Err(SqlError::InvalidValue(format!(
411 "unknown column type tag: {type_tag}"
412 ))),
413 }
414}
415
416fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
417 if data.len() < 2 {
418 return Err(SqlError::InvalidValue("row data too short".into()));
419 }
420 let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
421 let bitmap_bytes = col_count.div_ceil(8);
422 let pos = 2;
423 if data.len() < pos + bitmap_bytes {
424 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
425 }
426 Ok((
427 col_count,
428 &data[pos..pos + bitmap_bytes],
429 pos + bitmap_bytes,
430 ))
431}
432
433pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
434 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
435
436 let mut values = Vec::with_capacity(col_count);
437 for i in 0..col_count {
438 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
439 values.push(Value::Null);
440 continue;
441 }
442
443 if pos + 5 > data.len() {
444 return Err(SqlError::InvalidValue("truncated column data".into()));
445 }
446 let type_tag = data[pos];
447 pos += 1;
448 let data_len =
449 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
450 pos += 4;
451
452 if pos + data_len > data.len() {
453 return Err(SqlError::InvalidValue("truncated column value".into()));
454 }
455
456 values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
457 pos += data_len;
458 }
459
460 Ok(values)
461}
462
/// Returns the column count stored in the first two bytes of an encoded row
/// (little-endian `u16`).
///
/// # Panics
/// Panics when `data` holds fewer than two bytes.
#[inline]
pub fn row_non_pk_count(data: &[u8]) -> usize {
    let low = u16::from(data[0]);
    let high = u16::from(data[1]);
    usize::from(low | (high << 8))
}
468
469pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
470 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
471
472 for i in 0..col_count {
473 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
474 continue;
475 }
476
477 if pos + 5 > data.len() {
478 return Err(SqlError::InvalidValue("truncated column data".into()));
479 }
480 let type_tag = data[pos];
481 pos += 1;
482 let data_len =
483 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
484 pos += 4;
485
486 if pos + data_len > data.len() {
487 return Err(SqlError::InvalidValue("truncated column value".into()));
488 }
489
490 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
491 out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
492 }
493 pos += data_len;
494 }
495
496 Ok(())
497}
498
499pub fn decode_pk_into(
500 key: &[u8],
501 count: usize,
502 out: &mut [Value],
503 pk_mapping: &[usize],
504) -> Result<()> {
505 let mut pos = 0;
506 for i in 0..count {
507 let (v, n) = decode_key_value(&key[pos..])?;
508 if i < pk_mapping.len() {
509 out[pk_mapping[i]] = v;
510 }
511 pos += n;
512 }
513 Ok(())
514}
515
516pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
517 if targets.is_empty() {
518 return Ok(Vec::new());
519 }
520 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
521
522 let mut results = Vec::with_capacity(targets.len());
523 let mut ti = 0;
524
525 for col in 0..col_count {
526 if ti >= targets.len() {
527 break;
528 }
529 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
530
531 if col == targets[ti] {
532 if is_null {
533 results.push(Value::Null);
534 } else {
535 if pos + 5 > data.len() {
536 return Err(SqlError::InvalidValue("truncated column data".into()));
537 }
538 let type_tag = data[pos];
539 pos += 1;
540 let data_len =
541 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
542 as usize;
543 pos += 4;
544 if pos + data_len > data.len() {
545 return Err(SqlError::InvalidValue("truncated column value".into()));
546 }
547 results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
548 pos += data_len;
549 }
550 ti += 1;
551 } else if !is_null {
552 if pos + 5 > data.len() {
553 return Err(SqlError::InvalidValue("truncated column data".into()));
554 }
555 let data_len =
556 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
557 as usize;
558 pos += 5 + data_len;
559 }
560 }
561
562 while ti < targets.len() {
564 results.push(Value::Null);
565 ti += 1;
566 }
567
568 Ok(results)
569}
570
571pub fn decode_columns_into(
572 data: &[u8],
573 targets: &[usize],
574 schema_cols: &[usize],
575 row: &mut [Value],
576) -> Result<()> {
577 if targets.is_empty() {
578 return Ok(());
579 }
580 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
581
582 let mut ti = 0;
583 for col in 0..col_count {
584 if ti >= targets.len() {
585 break;
586 }
587 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
588
589 if col == targets[ti] {
590 if is_null {
591 row[schema_cols[ti]] = Value::Null;
592 } else {
593 if pos + 5 > data.len() {
594 return Err(SqlError::InvalidValue("truncated column data".into()));
595 }
596 let type_tag = data[pos];
597 pos += 1;
598 let data_len =
599 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
600 as usize;
601 pos += 4;
602 if pos + data_len > data.len() {
603 return Err(SqlError::InvalidValue("truncated column value".into()));
604 }
605 row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
606 pos += data_len;
607 }
608 ti += 1;
609 } else if !is_null {
610 if pos + 5 > data.len() {
611 return Err(SqlError::InvalidValue("truncated column data".into()));
612 }
613 let data_len =
614 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
615 as usize;
616 pos += 5 + data_len;
617 }
618 }
619
620 Ok(())
621}
622
/// A column value decoded without copying its payload.
///
/// `Text` and `Blob` borrow their bytes from the buffer they were decoded from; the
/// remaining variants hold plain values. The type is `Copy`, so it can be passed around
/// freely while the source buffer is alive.
#[derive(Clone, Copy, Debug)]
pub enum RawColumn<'row> {
    /// SQL NULL.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit float.
    Real(f64),
    /// Boolean.
    Boolean(bool),
    /// UTF-8 text borrowed from the source buffer.
    Text(&'row str),
    /// Raw bytes borrowed from the source buffer.
    Blob(&'row [u8]),
}
632
633impl<'a> RawColumn<'a> {
634 pub fn to_value(self) -> Value {
635 match self {
636 RawColumn::Null => Value::Null,
637 RawColumn::Integer(i) => Value::Integer(i),
638 RawColumn::Real(r) => Value::Real(r),
639 RawColumn::Boolean(b) => Value::Boolean(b),
640 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
641 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
642 }
643 }
644
645 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
646 use std::cmp::Ordering;
647 match (self, other) {
648 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
649 (RawColumn::Null, _) | (_, Value::Null) => None,
650 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
651 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
652 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
653 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
654 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
655 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
656 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
657 _ => None,
658 }
659 }
660
661 pub fn eq_value(&self, other: &Value) -> bool {
662 match (self, other) {
663 (RawColumn::Null, Value::Null) => true,
664 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
665 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
666 (RawColumn::Real(a), Value::Real(b)) => a == b,
667 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
668 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
669 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
670 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
671 _ => false,
672 }
673 }
674
675 pub fn as_f64(&self) -> Option<f64> {
676 match self {
677 RawColumn::Integer(i) => Some(*i as f64),
678 RawColumn::Real(r) => Some(*r),
679 _ => None,
680 }
681 }
682
683 pub fn as_i64(&self) -> Option<i64> {
684 match self {
685 RawColumn::Integer(i) => Some(*i),
686 _ => None,
687 }
688 }
689}
690
691fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
692 match DataType::from_tag(type_tag) {
693 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
694 data[..8].try_into().unwrap(),
695 ))),
696 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
697 data[..8].try_into().unwrap(),
698 ))),
699 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
700 Some(DataType::Text) => {
701 let s = std::str::from_utf8(data)
702 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
703 Ok(RawColumn::Text(s))
704 }
705 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
706 _ => Err(SqlError::InvalidValue(format!(
707 "unknown column type tag: {type_tag}"
708 ))),
709 }
710}
711
712pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
714 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
715 if target >= col_count || new_val.is_null() {
716 return Ok(false);
717 }
718 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
719 if was_null {
720 return Ok(false);
721 }
722 for col in 0..target {
723 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
724 if !is_null {
725 if pos + 5 > data.len() {
726 return Err(SqlError::InvalidValue("truncated column data".into()));
727 }
728 let data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
729 pos += 5 + data_len;
730 }
731 }
732 if pos + 5 > data.len() {
733 return Err(SqlError::InvalidValue("truncated column data".into()));
734 }
735 let old_data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
736 let new_data_len = match new_val {
737 Value::Integer(_) | Value::Real(_) => 8,
738 Value::Boolean(_) => 1,
739 Value::Text(s) => s.len(),
740 Value::Blob(b) => b.len(),
741 Value::Null => return Ok(false),
742 };
743 if new_data_len != old_data_len {
744 return Ok(false);
745 }
746 data[pos] = new_val.data_type().type_tag();
747 let val_start = pos + 5;
748 match new_val {
749 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
750 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
751 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
752 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
753 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
754 Value::Null => unreachable!(),
755 }
756 Ok(true)
757}
758
759pub fn patch_row_column(
761 data: &[u8],
762 target: usize,
763 new_val: &Value,
764 out: &mut Vec<u8>,
765) -> Result<()> {
766 let (col_count, bitmap, header_end) = parse_row_header(data)?;
767
768 let new_col_count = if target >= col_count {
770 target + 1
771 } else {
772 col_count
773 };
774 let new_bitmap_bytes = new_col_count.div_ceil(8);
775 let bitmap_bytes = col_count.div_ceil(8);
776 out.clear();
777
778 out.extend_from_slice(&(new_col_count as u16).to_le_bytes());
779 let bitmap_start = out.len();
780 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
781 for _ in bitmap_bytes..new_bitmap_bytes {
782 out.push(0xFF); }
784 if new_val.is_null() {
785 out[bitmap_start + target / 8] |= 1 << (target % 8);
786 } else {
787 out[bitmap_start + target / 8] &= !(1 << (target % 8));
788 }
789
790 let mut pos = header_end;
791 for col in 0..new_col_count {
792 let was_null = if col < col_count {
793 bitmap[col / 8] & (1 << (col % 8)) != 0
794 } else {
795 true };
797
798 if col == target {
799 if !was_null && pos + 5 <= data.len() {
800 let data_len = u32::from_le_bytes([
801 data[pos + 1],
802 data[pos + 2],
803 data[pos + 3],
804 data[pos + 4],
805 ]) as usize;
806 pos += 5 + data_len;
807 }
808 if !new_val.is_null() {
809 match new_val {
810 Value::Integer(v) => {
811 out.push(DataType::Integer.type_tag());
812 out.extend_from_slice(&8u32.to_le_bytes());
813 out.extend_from_slice(&v.to_le_bytes());
814 }
815 Value::Real(r) => {
816 out.push(DataType::Real.type_tag());
817 out.extend_from_slice(&8u32.to_le_bytes());
818 out.extend_from_slice(&r.to_le_bytes());
819 }
820 Value::Boolean(b) => {
821 out.push(DataType::Boolean.type_tag());
822 out.extend_from_slice(&1u32.to_le_bytes());
823 out.push(if *b { 1 } else { 0 });
824 }
825 Value::Text(s) => {
826 let bytes = s.as_bytes();
827 out.push(DataType::Text.type_tag());
828 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
829 out.extend_from_slice(bytes);
830 }
831 Value::Blob(d) => {
832 out.push(DataType::Blob.type_tag());
833 out.extend_from_slice(&(d.len() as u32).to_le_bytes());
834 out.extend_from_slice(d);
835 }
836 Value::Null => unreachable!(),
837 }
838 }
839 } else if was_null {
840 } else {
841 if pos + 5 > data.len() {
842 return Err(SqlError::InvalidValue("truncated column data".into()));
843 }
844 let data_len =
845 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
846 as usize;
847 let end = pos + 5 + data_len;
848 if end > data.len() {
849 return Err(SqlError::InvalidValue("truncated column value".into()));
850 }
851 out.extend_from_slice(&data[pos..end]);
852 pos = end;
853 }
854 }
855 Ok(())
856}
857
858pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
859 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
860 if target >= col_count {
861 return Ok(RawColumn::Null);
862 }
863
864 for col in 0..=target {
865 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
866
867 if col == target {
868 if is_null {
869 return Ok(RawColumn::Null);
870 }
871 if pos + 5 > data.len() {
872 return Err(SqlError::InvalidValue("truncated column data".into()));
873 }
874 let type_tag = data[pos];
875 pos += 1;
876 let data_len =
877 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
878 as usize;
879 pos += 4;
880 if pos + data_len > data.len() {
881 return Err(SqlError::InvalidValue("truncated column value".into()));
882 }
883 return decode_value_raw(type_tag, &data[pos..pos + data_len]);
884 } else if !is_null {
885 if pos + 5 > data.len() {
886 return Err(SqlError::InvalidValue("truncated column data".into()));
887 }
888 let data_len =
889 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
890 as usize;
891 pos += 5 + data_len;
892 }
893 }
894
895 unreachable!()
896}
897
898pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
900 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
901 if target >= col_count {
902 return Ok((RawColumn::Null, usize::MAX));
903 }
904
905 for col in 0..=target {
906 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
907
908 if col == target {
909 if is_null {
910 return Ok((RawColumn::Null, usize::MAX));
911 }
912 if pos + 5 > data.len() {
913 return Err(SqlError::InvalidValue("truncated column data".into()));
914 }
915 let tag_offset = pos;
916 let type_tag = data[pos];
917 pos += 1;
918 let data_len =
919 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
920 as usize;
921 pos += 4;
922 if pos + data_len > data.len() {
923 return Err(SqlError::InvalidValue("truncated column value".into()));
924 }
925 let raw = decode_value_raw(type_tag, &data[pos..pos + data_len])?;
926 return Ok((raw, tag_offset));
927 } else if !is_null {
928 if pos + 5 > data.len() {
929 return Err(SqlError::InvalidValue("truncated column data".into()));
930 }
931 let data_len =
932 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
933 as usize;
934 pos += 5 + data_len;
935 }
936 }
937
938 unreachable!()
939}
940
941pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
943 if offset == usize::MAX || new_val.is_null() {
944 return Ok(false);
945 }
946 if offset + 5 > data.len() {
947 return Err(SqlError::InvalidValue("truncated column data".into()));
948 }
949 let old_data_len =
950 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
951 let new_data_len = match new_val {
952 Value::Integer(_) | Value::Real(_) => 8,
953 Value::Boolean(_) => 1,
954 Value::Text(s) => s.len(),
955 Value::Blob(b) => b.len(),
956 Value::Null => return Ok(false),
957 };
958 if new_data_len != old_data_len {
959 return Ok(false);
960 }
961 data[offset] = new_val.data_type().type_tag();
962 let val_start = offset + 5;
963 match new_val {
964 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
965 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
966 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
967 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
968 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
969 Value::Null => unreachable!(),
970 }
971 Ok(true)
972}
973
974pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
975 if key.is_empty() || key[0] != TAG_INTEGER {
976 return Err(SqlError::InvalidValue("not an integer key".into()));
977 }
978 let (val, _) = decode_integer(&key[1..])?;
979 match val {
980 Value::Integer(i) => Ok(i),
981 _ => unreachable!(),
982 }
983}
984
985#[cfg(test)]
986mod tests {
987 use super::*;
988
989 #[test]
992 fn key_null() {
993 let encoded = encode_key_value(&Value::Null);
994 let (decoded, n) = decode_key_value(&encoded).unwrap();
995 assert_eq!(n, 1);
996 assert_eq!(decoded, Value::Null);
997 }
998
999 #[test]
1000 fn key_boolean() {
1001 let f_enc = encode_key_value(&Value::Boolean(false));
1002 let t_enc = encode_key_value(&Value::Boolean(true));
1003 assert!(f_enc < t_enc);
1004
1005 let (f_dec, _) = decode_key_value(&f_enc).unwrap();
1006 let (t_dec, _) = decode_key_value(&t_enc).unwrap();
1007 assert_eq!(f_dec, Value::Boolean(false));
1008 assert_eq!(t_dec, Value::Boolean(true));
1009 }
1010
1011 #[test]
1012 fn key_integer_roundtrip() {
1013 let test_values = [
1014 i64::MIN,
1015 -1_000_000,
1016 -256,
1017 -1,
1018 0,
1019 1,
1020 127,
1021 128,
1022 255,
1023 256,
1024 65535,
1025 1_000_000,
1026 i64::MAX,
1027 ];
1028 for &v in &test_values {
1029 let encoded = encode_key_value(&Value::Integer(v));
1030 let (decoded, _) = decode_key_value(&encoded).unwrap();
1031 assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
1032 }
1033 }
1034
1035 #[test]
1036 fn key_integer_sort_order() {
1037 let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
1038 let encoded: Vec<Vec<u8>> = values
1039 .iter()
1040 .map(|&v| encode_key_value(&Value::Integer(v)))
1041 .collect();
1042
1043 for i in 0..encoded.len() - 1 {
1044 assert!(
1045 encoded[i] < encoded[i + 1],
1046 "sort order broken: {} vs {}",
1047 values[i],
1048 values[i + 1]
1049 );
1050 }
1051 }
1052
1053 #[test]
1054 fn key_real_roundtrip() {
1055 let test_values = [
1056 f64::NEG_INFINITY,
1057 -1e100,
1058 -1.0,
1059 -f64::MIN_POSITIVE,
1060 -0.0,
1061 0.0,
1062 f64::MIN_POSITIVE,
1063 0.5,
1064 1.0,
1065 1e100,
1066 f64::INFINITY,
1067 ];
1068 for &v in &test_values {
1069 let encoded = encode_key_value(&Value::Real(v));
1070 let (decoded, _) = decode_key_value(&encoded).unwrap();
1071 match decoded {
1072 Value::Real(r) => {
1073 assert!(
1074 v.to_bits() == r.to_bits(),
1075 "roundtrip failed for {v}: got {r}"
1076 );
1077 }
1078 _ => panic!("expected Real"),
1079 }
1080 }
1081 }
1082
1083 #[test]
1084 fn key_real_sort_order() {
1085 let values = [
1086 f64::NEG_INFINITY,
1087 -100.0,
1088 -1.0,
1089 -0.0,
1090 0.0,
1091 1.0,
1092 100.0,
1093 f64::INFINITY,
1094 ];
1095 let encoded: Vec<Vec<u8>> = values
1096 .iter()
1097 .map(|&v| encode_key_value(&Value::Real(v)))
1098 .collect();
1099
1100 for i in 0..encoded.len() - 1 {
1101 assert!(
1102 encoded[i] <= encoded[i + 1],
1103 "sort order broken: {} vs {}",
1104 values[i],
1105 values[i + 1]
1106 );
1107 }
1108 }
1109
1110 #[test]
1111 fn key_text_roundtrip() {
1112 let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
1113 for &v in &test_values {
1114 let encoded = encode_key_value(&Value::Text(v.into()));
1115 let (decoded, _) = decode_key_value(&encoded).unwrap();
1116 assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
1117 }
1118 }
1119
1120 #[test]
1121 fn key_text_sort_order() {
1122 let values = ["", "a", "ab", "b", "ba", "z"];
1123 let encoded: Vec<Vec<u8>> = values
1124 .iter()
1125 .map(|&v| encode_key_value(&Value::Text(v.into())))
1126 .collect();
1127
1128 for i in 0..encoded.len() - 1 {
1129 assert!(
1130 encoded[i] < encoded[i + 1],
1131 "sort order broken: {:?} vs {:?}",
1132 values[i],
1133 values[i + 1]
1134 );
1135 }
1136 }
1137
1138 #[test]
1139 fn key_blob_roundtrip() {
1140 let test_values: Vec<Vec<u8>> = vec![
1141 vec![],
1142 vec![0x00],
1143 vec![0x00, 0xFF],
1144 vec![0xFF, 0x00],
1145 vec![0x00, 0x00, 0x00],
1146 ];
1147 for v in &test_values {
1148 let encoded = encode_key_value(&Value::Blob(v.clone()));
1149 let (decoded, _) = decode_key_value(&encoded).unwrap();
1150 assert_eq!(decoded, Value::Blob(v.clone()));
1151 }
1152 }
1153
1154 #[test]
1155 fn key_composite_roundtrip() {
1156 let values = vec![
1157 Value::Integer(42),
1158 Value::Text("hello".into()),
1159 Value::Boolean(true),
1160 ];
1161 let encoded = encode_composite_key(&values);
1162 let decoded = decode_composite_key(&encoded, 3).unwrap();
1163 assert_eq!(decoded[0], Value::Integer(42));
1164 assert_eq!(decoded[1], Value::Text("hello".into()));
1165 assert_eq!(decoded[2], Value::Boolean(true));
1166 }
1167
/// Composite keys must order by their first component, then by the second:
/// (1, "b") < (1, "c") < (2, "a").
#[test]
fn key_composite_sort_order() {
    let make_key =
        |n: i64, s: &str| encode_composite_key(&[Value::Integer(n), Value::Text(s.into())]);

    let one_b = make_key(1, "b");
    let one_c = make_key(1, "c");
    let two_a = make_key(2, "a");

    assert!(one_b < one_c);
    assert!(one_c < two_a);
}
1177
/// Keys of different types must sort as NULL < BLOB < TEXT < BOOLEAN < INTEGER,
/// mirroring the ascending order of the tag constants.
#[test]
fn key_cross_type_ordering() {
    // Listed in the order they are expected to sort.
    let ascending = [
        encode_key_value(&Value::Null),
        encode_key_value(&Value::Blob(vec![])),
        encode_key_value(&Value::Text("".into())),
        encode_key_value(&Value::Boolean(false)),
        encode_key_value(&Value::Integer(0)),
    ];

    for pair in ascending.windows(2) {
        assert!(pair[0] < pair[1]);
    }
}
1191
/// A three-column row (integer, text, boolean) must decode to the same
/// three values it was encoded from.
#[test]
fn row_roundtrip_simple() {
    let row = vec![
        Value::Integer(42),
        Value::Text("hello".into()),
        Value::Boolean(true),
    ];
    let bytes = encode_row(&row);
    let decoded = decode_row(&bytes).unwrap();

    assert_eq!(decoded.len(), 3);
    for (got, want) in decoded.iter().zip(row.iter()) {
        assert_eq!(got, want);
    }
}
1208
/// NULL columns interleaved with non-NULL ones must keep their positions
/// through a row encode/decode round trip.
#[test]
fn row_roundtrip_with_nulls() {
    let row = vec![
        Value::Integer(1),
        Value::Null,
        Value::Text("test".into()),
        Value::Null,
    ];
    let bytes = encode_row(&row);
    let decoded = decode_row(&bytes).unwrap();
    assert_eq!(decoded.len(), 4);

    // Columns that carried data.
    assert_eq!(decoded[0], Value::Integer(1));
    assert_eq!(decoded[2], Value::Text("test".into()));

    // Columns that were NULL.
    for idx in [1usize, 3].iter() {
        assert!(decoded[*idx].is_null());
    }
}
1225
/// A row with no columns must decode to an empty result.
#[test]
fn row_roundtrip_empty() {
    let no_columns: Vec<Value> = Vec::new();
    let bytes = encode_row(&no_columns);
    let decoded = decode_row(&bytes).unwrap();
    assert!(decoded.is_empty());
}
1233
/// A row holding one value of every type (integer, real, text, blob,
/// boolean, NULL) must round-trip through the row encoding.
#[test]
fn row_roundtrip_all_types() {
    let row = vec![
        Value::Integer(-100),
        Value::Real(3.15),
        Value::Text("hello world".into()),
        Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        Value::Boolean(false),
        Value::Null,
    ];
    let bytes = encode_row(&row);
    let decoded = decode_row(&bytes).unwrap();
    assert_eq!(decoded.len(), 6);

    // The first five columns are compared by value; the trailing NULL is
    // checked through `is_null`.
    for (got, want) in decoded.iter().zip(row.iter()).take(5) {
        assert_eq!(got, want);
    }
    assert!(decoded[5].is_null());
}
1254
/// A text key with a NUL byte in the middle must decode to the full
/// original string.
#[test]
fn null_escaped_with_embedded_nulls() {
    let original = Value::Text("before\0after".into());
    let key = encode_key_value(&original);
    let (roundtripped, _) = decode_key_value(&key).unwrap();
    assert_eq!(roundtripped, original);
}
1262
/// Integer keys at and around the `i64` limits and zero must round-trip,
/// and decoding must report that it consumed the whole encoding.
#[test]
fn key_integer_edge_cases() {
    let edges = [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX];
    for &v in edges.iter() {
        let key = encode_key_value(&Value::Integer(v));
        let (roundtripped, consumed) = decode_key_value(&key).unwrap();
        assert_eq!(consumed, key.len());
        assert_eq!(roundtripped, Value::Integer(v), "edge case failed for {v}");
    }
}
1272
/// Requesting only column 1 of a three-column row must return just that
/// column's value.
#[test]
fn decode_columns_single() {
    let row = vec![
        Value::Integer(42),
        Value::Text("hello".into()),
        Value::Boolean(true),
    ];
    let bytes = encode_row(&row);
    let picked = decode_columns(&bytes, &[1]).unwrap();

    assert_eq!(picked.len(), 1);
    assert_eq!(picked[0], row[1]);
}
1285
/// Requesting columns 0, 3 and 4 of a five-column row must return those
/// three values in request order, skipping the columns in between.
#[test]
fn decode_columns_multiple() {
    let row = vec![
        Value::Integer(1),
        Value::Real(2.5),
        Value::Text("skip".into()),
        Value::Boolean(false),
        Value::Blob(vec![0xAB]),
    ];
    let bytes = encode_row(&row);
    let picked = decode_columns(&bytes, &[0, 3, 4]).unwrap();
    assert_eq!(picked.len(), 3);

    // Output slot k must hold the value of the k-th requested source column.
    for (slot, &source) in [0usize, 3, 4].iter().enumerate() {
        assert_eq!(picked[slot], row[source]);
    }
}
1302
/// Selective decoding must handle NULL columns both as requested targets
/// and as columns skipped over on the way to later targets.
#[test]
fn decode_columns_with_nulls() {
    let row = vec![
        Value::Integer(10),
        Value::Null,
        Value::Text("after_null".into()),
        Value::Null,
        Value::Boolean(true),
    ];
    let bytes = encode_row(&row);
    let picked = decode_columns(&bytes, &[1, 2, 4]).unwrap();

    assert_eq!(picked.len(), 3);
    assert!(picked[0].is_null());
    assert_eq!(picked[1], row[2]);
    assert_eq!(picked[2], row[4]);
}
1319
/// Requesting only the first and last columns of a four-column row must
/// return exactly those two values.
#[test]
fn decode_columns_first_and_last() {
    let row = vec![
        Value::Text("first".into()),
        Value::Integer(99),
        Value::Boolean(false),
        Value::Real(3.125),
    ];
    let bytes = encode_row(&row);
    let picked = decode_columns(&bytes, &[0, 3]).unwrap();

    assert_eq!(picked.len(), 2);
    assert_eq!(picked[0], row[0]);
    assert_eq!(picked[1], row[3]);
}
1334
/// An empty target list must succeed and produce no columns.
#[test]
fn decode_columns_empty_targets() {
    let single_column_row = vec![Value::Integer(1)];
    let bytes = encode_row(&single_column_row);
    let picked = decode_columns(&bytes, &[]).unwrap();
    assert!(picked.is_empty());
}
1342
/// Selecting every column index through `decode_columns` must give the
/// same result as a plain `decode_row`.
#[test]
fn decode_columns_all_matches_full_decode() {
    let row = vec![
        Value::Integer(-100),
        Value::Real(3.15),
        Value::Text("hello world".into()),
        Value::Blob(vec![0xDE, 0xAD]),
        Value::Boolean(false),
        Value::Null,
    ];
    let bytes = encode_row(&row);

    let selective = decode_columns(&bytes, &[0, 1, 2, 3, 4, 5]).unwrap();
    let full = decode_row(&bytes).unwrap();

    assert_eq!(full, selective);
}
1358
/// Raw-decoding an integer column must yield `RawColumn::Integer` with the
/// stored number, and convert to the matching `Value`.
#[test]
fn raw_column_integer() {
    let row = vec![Value::Integer(42), Value::Text("hello".into())];
    let bytes = encode_row(&row);
    let raw = decode_column_raw(&bytes, 0).unwrap();

    let is_forty_two = matches!(raw, RawColumn::Integer(42));
    assert!(is_forty_two);
    assert_eq!(raw.to_value(), Value::Integer(42));
}
1367
/// Raw-decoding a text column must yield `RawColumn::Text` carrying the
/// stored string.
#[test]
fn raw_column_text_borrows() {
    let row = vec![Value::Integer(1), Value::Text("hello".into())];
    let bytes = encode_row(&row);

    // Pull the string out first, then compare; any other variant is a failure.
    let s = match decode_column_raw(&bytes, 1).unwrap() {
        RawColumn::Text(s) => s,
        other => panic!("expected Text, got {other:?}"),
    };
    assert_eq!(s, "hello");
}
1378
/// Raw-decoding a NULL column that sits between two non-NULL columns must
/// yield `RawColumn::Null`.
#[test]
fn raw_column_null() {
    let row = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
    let bytes = encode_row(&row);
    let middle = decode_column_raw(&bytes, 1).unwrap();
    assert!(matches!(middle, RawColumn::Null));
}
1386
/// Raw-decoding the final column of a row must skip the preceding columns
/// and yield the stored real number (compared within a 1e-10 tolerance).
#[test]
fn raw_column_last() {
    let row = vec![
        Value::Integer(1),
        Value::Text("skip".into()),
        Value::Real(3.15),
    ];
    let bytes = encode_row(&row);

    let r = match decode_column_raw(&bytes, 2).unwrap() {
        RawColumn::Real(r) => r,
        other => panic!("expected Real, got {other:?}"),
    };
    assert!((r - 3.15).abs() < 1e-10);
}
1401
/// Asking for column index 1 of a single-column row must succeed and
/// yield `RawColumn::Null` rather than an error.
#[test]
fn raw_column_out_of_bounds_returns_null() {
    let row = vec![Value::Integer(1)];
    let bytes = encode_row(&row);
    let past_the_end = decode_column_raw(&bytes, 1).unwrap();
    assert!(matches!(past_the_end, RawColumn::Null));
}
1411
/// `RawColumn::eq_value` must match equal values, reject different ones,
/// and treat `Integer(42)` as equal to `Real(42.0)`.
#[test]
fn raw_column_eq_value() {
    let forty_two = RawColumn::Integer(42);
    let integer_cases = [
        (Value::Integer(42), true),
        (Value::Integer(43), false),
        (Value::Real(42.0), true),
    ];
    for (candidate, expected) in integer_cases.iter() {
        assert_eq!(forty_two.eq_value(candidate), *expected);
    }

    let hello = RawColumn::Text("hello");
    let text_cases = [
        (Value::Text("hello".into()), true),
        (Value::Text("world".into()), false),
    ];
    for (candidate, expected) in text_cases.iter() {
        assert_eq!(hello.eq_value(candidate), *expected);
    }
}
1423
/// `RawColumn::cmp_value` must report Equal / Less / Greater against
/// integers and return `None` when compared with NULL.
#[test]
fn raw_column_cmp_value() {
    use std::cmp::Ordering;

    let forty_two = RawColumn::Integer(42);
    let cases = [
        (Value::Integer(42), Some(Ordering::Equal)),
        (Value::Integer(50), Some(Ordering::Less)),
        (Value::Integer(10), Some(Ordering::Greater)),
        (Value::Null, None),
    ];
    for (rhs, expected) in cases.iter() {
        assert_eq!(forty_two.cmp_value(rhs), *expected);
    }
}
1433
/// Numeric accessors: integers are readable as both `i64` and `f64`, reals
/// only as `f64`, and text / NULL yield `None`.
#[test]
fn raw_column_as_numeric() {
    // `as_i64`
    assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
    assert_eq!(RawColumn::Real(3.15).as_i64(), None);
    assert_eq!(RawColumn::Null.as_i64(), None);

    // `as_f64`
    assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
    assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
    assert_eq!(RawColumn::Text("x").as_f64(), None);
}
1443
/// `decode_pk_integer` must recover the original integer from an encoded
/// integer key, including zero, negatives and the `i64` limits.
#[test]
fn decode_pk_integer_roundtrip() {
    let samples = [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX];
    for &expected in samples.iter() {
        let key = encode_key_value(&Value::Integer(expected));
        assert_eq!(decode_pk_integer(&key).unwrap(), expected);
    }
}
1452
/// `decode_pk_integer` must return an error when handed an encoded text key.
#[test]
fn decode_pk_integer_rejects_non_integer() {
    let text_key = encode_key_value(&Value::Text("hello".into()));
    let outcome = decode_pk_integer(&text_key);
    assert!(outcome.is_err());
}
1458
/// Raw-decoding a blob column must yield `RawColumn::Blob` carrying the
/// stored bytes.
#[test]
fn raw_column_blob() {
    let row = vec![Value::Blob(vec![0xDE, 0xAD])];
    let bytes = encode_row(&row);

    let b = match decode_column_raw(&bytes, 0).unwrap() {
        RawColumn::Blob(b) => b,
        other => panic!("expected Blob, got {other:?}"),
    };
    assert_eq!(b, &[0xDE, 0xAD]);
}
1469
/// For every column of a mixed-type row, `decode_column_raw(..).to_value()`
/// must equal the value produced by `decode_row` at the same index.
#[test]
fn raw_column_matches_full_decode() {
    let row = vec![
        Value::Integer(-100),
        Value::Real(3.15),
        Value::Text("hello world".into()),
        Value::Blob(vec![0xDE, 0xAD]),
        Value::Boolean(false),
        Value::Null,
    ];
    let bytes = encode_row(&row);
    let full = decode_row(&bytes).unwrap();

    for i in 0..full.len() {
        let raw = decode_column_raw(&bytes, i).unwrap();
        assert_eq!(raw.to_value(), full[i], "mismatch at column {i}");
    }
}
1487}