1use crate::error::{Result, SqlError};
5use crate::types::{CompactString, DataType, Value};
6
// Type tags: the first byte of every encoded key value. Because each encoded
// value starts with its tag, byte-wise comparison of encoded keys orders
// values of different types by tag:
// NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
16
17pub fn encode_key_value(value: &Value) -> Vec<u8> {
19 match value {
20 Value::Null => vec![TAG_NULL],
21 Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
22 Value::Integer(i) => encode_integer(*i),
23 Value::Real(r) => encode_real(*r),
24 Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
25 Value::Blob(b) => encode_bytes(TAG_BLOB, b),
26 }
27}
28
29pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31 let mut buf = Vec::new();
32 for v in values {
33 buf.extend_from_slice(&encode_key_value(v));
34 }
35 buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39 buf.clear();
40 for v in values {
41 encode_key_value_into(v, buf);
42 }
43}
44
45fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
46 match value {
47 Value::Null => buf.push(TAG_NULL),
48 Value::Boolean(b) => {
49 buf.push(TAG_BOOLEAN);
50 buf.push(if *b { 0x01 } else { 0x00 });
51 }
52 Value::Integer(i) => encode_integer_into(*i, buf),
53 Value::Real(r) => encode_real_into(*r, buf),
54 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
55 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
56 }
57}
58
59fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
60 buf.push(TAG_INTEGER);
61 if val == 0 {
62 buf.push(0x80);
63 return;
64 }
65 if val > 0 {
66 let bytes = val.to_be_bytes();
67 let start = bytes.iter().position(|&b| b != 0).unwrap();
68 let byte_count = (8 - start) as u8;
69 buf.push(0x80 + byte_count);
70 buf.extend_from_slice(&bytes[start..]);
71 } else {
72 let abs_val = if val == i64::MIN {
73 u64::MAX / 2 + 1
74 } else {
75 (-val) as u64
76 };
77 let bytes = abs_val.to_be_bytes();
78 let start = bytes.iter().position(|&b| b != 0).unwrap();
79 let byte_count = (8 - start) as u8;
80 buf.push(0x80 - byte_count);
81 for &b in &bytes[start..] {
82 buf.push(!b);
83 }
84 }
85}
86
87fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
88 buf.push(TAG_REAL);
89 let bits = val.to_bits();
90 let encoded = if val.is_sign_negative() {
91 !bits
92 } else {
93 bits ^ (1u64 << 63)
94 };
95 buf.extend_from_slice(&encoded.to_be_bytes());
96}
97
/// Appends `tag` and a null-escaped copy of `data` to `buf`.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`, and a single
/// unescaped `0x00` terminates the payload.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on 0x00 leaves the zero-free runs; each split point is one
    // embedded zero that must be emitted in escaped form.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
110
111pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
113 if data.is_empty() {
114 return Err(SqlError::InvalidValue("empty key data".into()));
115 }
116 match data[0] {
117 TAG_NULL => Ok((Value::Null, 1)),
118 TAG_BOOLEAN => {
119 if data.len() < 2 {
120 return Err(SqlError::InvalidValue("truncated boolean".into()));
121 }
122 Ok((Value::Boolean(data[1] != 0), 2))
123 }
124 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
125 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
126 TAG_TEXT => {
127 let (bytes, n) = decode_null_escaped(&data[1..])?;
128 let s = String::from_utf8(bytes)
129 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
130 Ok((Value::Text(CompactString::from(s)), n + 1))
131 }
132 TAG_BLOB => {
133 let (bytes, n) = decode_null_escaped(&data[1..])?;
134 Ok((Value::Blob(bytes), n + 1))
135 }
136 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
137 }
138}
139
140pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
142 let mut values = Vec::with_capacity(count);
143 let mut pos = 0;
144 for _ in 0..count {
145 let (v, n) = decode_key_value(&data[pos..])?;
146 values.push(v);
147 pos += n;
148 }
149 Ok(values)
150}
151
152fn encode_integer(val: i64) -> Vec<u8> {
155 let mut buf = vec![TAG_INTEGER];
156 if val == 0 {
157 buf.push(0x80);
158 return buf;
159 }
160 if val > 0 {
161 let bytes = val.to_be_bytes();
162 let start = bytes.iter().position(|&b| b != 0).unwrap();
164 let byte_count = (8 - start) as u8;
165 buf.push(0x80 + byte_count);
166 buf.extend_from_slice(&bytes[start..]);
167 } else {
168 let abs_val = if val == i64::MIN {
170 u64::MAX / 2 + 1
172 } else {
173 (-val) as u64
174 };
175 let bytes = abs_val.to_be_bytes();
176 let start = bytes.iter().position(|&b| b != 0).unwrap();
177 let byte_count = (8 - start) as u8;
178 buf.push(0x80 - byte_count);
179 for &b in &bytes[start..] {
181 buf.push(!b);
182 }
183 }
184 buf
185}
186
187fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
188 if data.is_empty() {
189 return Err(SqlError::InvalidValue("truncated integer".into()));
190 }
191 let marker = data[0];
192 if marker == 0x80 {
193 return Ok((Value::Integer(0), 1));
194 }
195 if marker > 0x80 {
196 let byte_count = (marker - 0x80) as usize;
198 if data.len() < 1 + byte_count {
199 return Err(SqlError::InvalidValue("truncated positive integer".into()));
200 }
201 let mut bytes = [0u8; 8];
202 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
203 let val = i64::from_be_bytes(bytes);
204 Ok((Value::Integer(val), 1 + byte_count))
205 } else {
206 let byte_count = (0x80 - marker) as usize;
208 if data.len() < 1 + byte_count {
209 return Err(SqlError::InvalidValue("truncated negative integer".into()));
210 }
211 let mut bytes = [0u8; 8];
212 for i in 0..byte_count {
213 bytes[8 - byte_count + i] = !data[1 + i];
214 }
215 let abs_val = u64::from_be_bytes(bytes);
216 let val = (-(abs_val as i128)) as i64;
218 Ok((Value::Integer(val), 1 + byte_count))
219 }
220}
221
222fn encode_real(val: f64) -> Vec<u8> {
225 let mut buf = vec![TAG_REAL];
226 let bits = val.to_bits();
227 let encoded = if val.is_sign_negative() {
228 !bits
230 } else {
231 bits ^ (1u64 << 63)
233 };
234 buf.extend_from_slice(&encoded.to_be_bytes());
235 buf
236}
237
238fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
239 if data.len() < 8 {
240 return Err(SqlError::InvalidValue("truncated real".into()));
241 }
242 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
243 let bits = if encoded & (1u64 << 63) != 0 {
244 encoded ^ (1u64 << 63)
246 } else {
247 !encoded
249 };
250 let val = f64::from_bits(bits);
251 Ok((Value::Real(val), 8))
252}
253
/// Returns `tag` followed by a null-escaped copy of `data`.
///
/// Each `0x00` byte in `data` becomes `0x00 0xFF`, and a lone `0x00`
/// terminates the payload.
fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
    // Tag + terminator; embedded zeroes grow the buffer further.
    let mut buf = Vec::with_capacity(data.len() + 2);
    buf.push(tag);
    for &byte in data {
        buf.push(byte);
        if byte == 0x00 {
            // Escape marker distinguishing an embedded zero from the terminator.
            buf.push(0xFF);
        }
    }
    buf.push(0x00);
    buf
}
271
272fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
274 let mut result = Vec::new();
275 let mut i = 0;
276 while i < data.len() {
277 if data[i] == 0x00 {
278 if i + 1 < data.len() && data[i + 1] == 0xFF {
279 result.push(0x00);
280 i += 2;
281 } else {
282 return Ok((result, i + 1)); }
284 } else {
285 result.push(data[i]);
286 i += 1;
287 }
288 }
289 Err(SqlError::InvalidValue(
290 "unterminated null-escaped string".into(),
291 ))
292}
293
294pub fn encode_row(values: &[Value]) -> Vec<u8> {
299 let col_count = values.len();
300 let bitmap_bytes = col_count.div_ceil(8);
301 let mut buf = Vec::new();
302
303 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
305
306 let mut bitmap = vec![0u8; bitmap_bytes];
308 for (i, v) in values.iter().enumerate() {
309 if v.is_null() {
310 bitmap[i / 8] |= 1 << (i % 8);
311 }
312 }
313 buf.extend_from_slice(&bitmap);
314
315 for v in values {
317 if v.is_null() {
318 continue;
319 }
320 match v {
321 Value::Integer(i) => {
322 buf.push(DataType::Integer.type_tag());
323 buf.extend_from_slice(&8u32.to_le_bytes());
324 buf.extend_from_slice(&i.to_le_bytes());
325 }
326 Value::Real(r) => {
327 buf.push(DataType::Real.type_tag());
328 buf.extend_from_slice(&8u32.to_le_bytes());
329 buf.extend_from_slice(&r.to_le_bytes());
330 }
331 Value::Boolean(b) => {
332 buf.push(DataType::Boolean.type_tag());
333 buf.extend_from_slice(&1u32.to_le_bytes());
334 buf.push(if *b { 1 } else { 0 });
335 }
336 Value::Text(s) => {
337 let bytes = s.as_bytes();
338 buf.push(DataType::Text.type_tag());
339 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
340 buf.extend_from_slice(bytes);
341 }
342 Value::Blob(data) => {
343 buf.push(DataType::Blob.type_tag());
344 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
345 buf.extend_from_slice(data);
346 }
347 Value::Null => unreachable!(),
348 }
349 }
350
351 buf
352}
353
354pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
355 buf.clear();
356 let col_count = values.len();
357 let bitmap_bytes = col_count.div_ceil(8);
358
359 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
360
361 let bitmap_start = buf.len();
362 buf.resize(buf.len() + bitmap_bytes, 0);
363
364 for (i, v) in values.iter().enumerate() {
365 if v.is_null() {
366 buf[bitmap_start + i / 8] |= 1 << (i % 8);
367 continue;
368 }
369 match v {
370 Value::Integer(val) => {
371 buf.push(DataType::Integer.type_tag());
372 buf.extend_from_slice(&8u32.to_le_bytes());
373 buf.extend_from_slice(&val.to_le_bytes());
374 }
375 Value::Real(r) => {
376 buf.push(DataType::Real.type_tag());
377 buf.extend_from_slice(&8u32.to_le_bytes());
378 buf.extend_from_slice(&r.to_le_bytes());
379 }
380 Value::Boolean(b) => {
381 buf.push(DataType::Boolean.type_tag());
382 buf.extend_from_slice(&1u32.to_le_bytes());
383 buf.push(if *b { 1 } else { 0 });
384 }
385 Value::Text(s) => {
386 let bytes = s.as_bytes();
387 buf.push(DataType::Text.type_tag());
388 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
389 buf.extend_from_slice(bytes);
390 }
391 Value::Blob(data) => {
392 buf.push(DataType::Blob.type_tag());
393 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
394 buf.extend_from_slice(data);
395 }
396 Value::Null => unreachable!(),
397 }
398 }
399}
400
401fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
402 match DataType::from_tag(type_tag) {
403 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
404 data[..8].try_into().unwrap(),
405 ))),
406 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
407 data[..8].try_into().unwrap(),
408 ))),
409 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
410 Some(DataType::Text) => {
411 let s = std::str::from_utf8(data)
412 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
413 Ok(Value::Text(CompactString::from(s)))
414 }
415 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
416 _ => Err(SqlError::InvalidValue(format!(
417 "unknown column type tag: {type_tag}"
418 ))),
419 }
420}
421
422fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
423 if data.len() < 2 {
424 return Err(SqlError::InvalidValue("row data too short".into()));
425 }
426 let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
427 let bitmap_bytes = col_count.div_ceil(8);
428 let pos = 2;
429 if data.len() < pos + bitmap_bytes {
430 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
431 }
432 Ok((
433 col_count,
434 &data[pos..pos + bitmap_bytes],
435 pos + bitmap_bytes,
436 ))
437}
438
439pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
440 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
441
442 let mut values = Vec::with_capacity(col_count);
443 for i in 0..col_count {
444 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
445 values.push(Value::Null);
446 continue;
447 }
448
449 if pos + 5 > data.len() {
450 return Err(SqlError::InvalidValue("truncated column data".into()));
451 }
452 let type_tag = data[pos];
453 pos += 1;
454 let data_len =
455 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
456 pos += 4;
457
458 if pos + data_len > data.len() {
459 return Err(SqlError::InvalidValue("truncated column value".into()));
460 }
461
462 values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
463 pos += data_len;
464 }
465
466 Ok(values)
467}
468
469pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
470 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
471
472 for i in 0..col_count {
473 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
474 continue;
475 }
476
477 if pos + 5 > data.len() {
478 return Err(SqlError::InvalidValue("truncated column data".into()));
479 }
480 let type_tag = data[pos];
481 pos += 1;
482 let data_len =
483 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
484 pos += 4;
485
486 if pos + data_len > data.len() {
487 return Err(SqlError::InvalidValue("truncated column value".into()));
488 }
489
490 if i < col_mapping.len() {
491 out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
492 }
493 pos += data_len;
494 }
495
496 Ok(())
497}
498
499pub fn decode_pk_into(
500 key: &[u8],
501 count: usize,
502 out: &mut [Value],
503 pk_mapping: &[usize],
504) -> Result<()> {
505 let mut pos = 0;
506 for i in 0..count {
507 let (v, n) = decode_key_value(&key[pos..])?;
508 if i < pk_mapping.len() {
509 out[pk_mapping[i]] = v;
510 }
511 pos += n;
512 }
513 Ok(())
514}
515
516pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
517 if targets.is_empty() {
518 return Ok(Vec::new());
519 }
520 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
521 if *targets.last().unwrap() >= col_count {
522 return Err(SqlError::InvalidValue("column index out of bounds".into()));
523 }
524
525 let mut results = Vec::with_capacity(targets.len());
526 let mut ti = 0;
527
528 for col in 0..col_count {
529 if ti >= targets.len() {
530 break;
531 }
532 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
533
534 if col == targets[ti] {
535 if is_null {
536 results.push(Value::Null);
537 } else {
538 if pos + 5 > data.len() {
539 return Err(SqlError::InvalidValue("truncated column data".into()));
540 }
541 let type_tag = data[pos];
542 pos += 1;
543 let data_len =
544 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
545 as usize;
546 pos += 4;
547 if pos + data_len > data.len() {
548 return Err(SqlError::InvalidValue("truncated column value".into()));
549 }
550 results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
551 pos += data_len;
552 }
553 ti += 1;
554 } else if !is_null {
555 if pos + 5 > data.len() {
556 return Err(SqlError::InvalidValue("truncated column data".into()));
557 }
558 let data_len =
559 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
560 as usize;
561 pos += 5 + data_len;
562 }
563 }
564
565 Ok(results)
566}
567
568pub fn decode_columns_into(
569 data: &[u8],
570 targets: &[usize],
571 schema_cols: &[usize],
572 row: &mut [Value],
573) -> Result<()> {
574 if targets.is_empty() {
575 return Ok(());
576 }
577 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
578 if *targets.last().unwrap() >= col_count {
579 return Err(SqlError::InvalidValue("column index out of bounds".into()));
580 }
581
582 let mut ti = 0;
583 for col in 0..col_count {
584 if ti >= targets.len() {
585 break;
586 }
587 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
588
589 if col == targets[ti] {
590 if is_null {
591 row[schema_cols[ti]] = Value::Null;
592 } else {
593 if pos + 5 > data.len() {
594 return Err(SqlError::InvalidValue("truncated column data".into()));
595 }
596 let type_tag = data[pos];
597 pos += 1;
598 let data_len =
599 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
600 as usize;
601 pos += 4;
602 if pos + data_len > data.len() {
603 return Err(SqlError::InvalidValue("truncated column value".into()));
604 }
605 row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
606 pos += data_len;
607 }
608 ti += 1;
609 } else if !is_null {
610 if pos + 5 > data.len() {
611 return Err(SqlError::InvalidValue("truncated column data".into()));
612 }
613 let data_len =
614 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
615 as usize;
616 pos += 5 + data_len;
617 }
618 }
619
620 Ok(())
621}
622
/// A decoded column value that borrows text and blob payloads from the
/// encoded row instead of copying them.
///
/// The lifetime `'a` is that of the row bytes the column was decoded from.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    /// SQL NULL.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating-point number.
    Real(f64),
    /// Boolean.
    Boolean(bool),
    /// UTF-8 text borrowed from the row buffer.
    Text(&'a str),
    /// Raw bytes borrowed from the row buffer.
    Blob(&'a [u8]),
}
632
633impl<'a> RawColumn<'a> {
634 pub fn to_value(self) -> Value {
635 match self {
636 RawColumn::Null => Value::Null,
637 RawColumn::Integer(i) => Value::Integer(i),
638 RawColumn::Real(r) => Value::Real(r),
639 RawColumn::Boolean(b) => Value::Boolean(b),
640 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
641 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
642 }
643 }
644
645 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
646 use std::cmp::Ordering;
647 match (self, other) {
648 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
649 (RawColumn::Null, _) | (_, Value::Null) => None,
650 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
651 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
652 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
653 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
654 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
655 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
656 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
657 _ => None,
658 }
659 }
660
661 pub fn eq_value(&self, other: &Value) -> bool {
662 match (self, other) {
663 (RawColumn::Null, Value::Null) => true,
664 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
665 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
666 (RawColumn::Real(a), Value::Real(b)) => a == b,
667 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
668 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
669 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
670 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
671 _ => false,
672 }
673 }
674
675 pub fn as_f64(&self) -> Option<f64> {
676 match self {
677 RawColumn::Integer(i) => Some(*i as f64),
678 RawColumn::Real(r) => Some(*r),
679 _ => None,
680 }
681 }
682
683 pub fn as_i64(&self) -> Option<i64> {
684 match self {
685 RawColumn::Integer(i) => Some(*i),
686 _ => None,
687 }
688 }
689}
690
691fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
692 match DataType::from_tag(type_tag) {
693 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
694 data[..8].try_into().unwrap(),
695 ))),
696 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
697 data[..8].try_into().unwrap(),
698 ))),
699 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
700 Some(DataType::Text) => {
701 let s = std::str::from_utf8(data)
702 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
703 Ok(RawColumn::Text(s))
704 }
705 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
706 _ => Err(SqlError::InvalidValue(format!(
707 "unknown column type tag: {type_tag}"
708 ))),
709 }
710}
711
712pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
713 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
714 if target >= col_count {
715 return Err(SqlError::InvalidValue("column index out of bounds".into()));
716 }
717
718 for col in 0..=target {
719 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
720
721 if col == target {
722 if is_null {
723 return Ok(RawColumn::Null);
724 }
725 if pos + 5 > data.len() {
726 return Err(SqlError::InvalidValue("truncated column data".into()));
727 }
728 let type_tag = data[pos];
729 pos += 1;
730 let data_len =
731 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
732 as usize;
733 pos += 4;
734 if pos + data_len > data.len() {
735 return Err(SqlError::InvalidValue("truncated column value".into()));
736 }
737 return decode_value_raw(type_tag, &data[pos..pos + data_len]);
738 } else if !is_null {
739 if pos + 5 > data.len() {
740 return Err(SqlError::InvalidValue("truncated column data".into()));
741 }
742 let data_len =
743 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
744 as usize;
745 pos += 5 + data_len;
746 }
747 }
748
749 unreachable!()
750}
751
752pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
753 if key.is_empty() || key[0] != TAG_INTEGER {
754 return Err(SqlError::InvalidValue("not an integer key".into()));
755 }
756 let (val, _) = decode_integer(&key[1..])?;
757 match val {
758 Value::Integer(i) => Ok(i),
759 _ => unreachable!(),
760 }
761}
762
// Unit tests for the key codec (roundtrips and sort order) and the row codec
// (full decode, selective decode, and borrowed `RawColumn` decode).
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Key encoding ----

    // NULL encodes to a single tag byte and decodes back to NULL.
    #[test]
    fn key_null() {
        let encoded = encode_key_value(&Value::Null);
        let (decoded, n) = decode_key_value(&encoded).unwrap();
        assert_eq!(n, 1);
        assert_eq!(decoded, Value::Null);
    }

    // `false` sorts before `true`, and both roundtrip.
    #[test]
    fn key_boolean() {
        let f_enc = encode_key_value(&Value::Boolean(false));
        let t_enc = encode_key_value(&Value::Boolean(true));
        assert!(f_enc < t_enc);

        let (f_dec, _) = decode_key_value(&f_enc).unwrap();
        let (t_dec, _) = decode_key_value(&t_enc).unwrap();
        assert_eq!(f_dec, Value::Boolean(false));
        assert_eq!(t_dec, Value::Boolean(true));
    }

    // Integers roundtrip across byte-width boundaries and both extremes.
    #[test]
    fn key_integer_roundtrip() {
        let test_values = [
            i64::MIN,
            -1_000_000,
            -256,
            -1,
            0,
            1,
            127,
            128,
            255,
            256,
            65535,
            1_000_000,
            i64::MAX,
        ];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Integer(v));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
        }
    }

    // Byte-wise order of encoded integers matches numeric order.
    #[test]
    fn key_integer_sort_order() {
        let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Integer(v)))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] < encoded[i + 1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Reals roundtrip bit-for-bit, including signed zeros and infinities.
    #[test]
    fn key_real_roundtrip() {
        let test_values = [
            f64::NEG_INFINITY,
            -1e100,
            -1.0,
            -f64::MIN_POSITIVE,
            -0.0,
            0.0,
            f64::MIN_POSITIVE,
            0.5,
            1.0,
            1e100,
            f64::INFINITY,
        ];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Real(v));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            match decoded {
                Value::Real(r) => {
                    // Compare bit patterns so that -0.0 and 0.0 are told apart.
                    assert!(
                        v.to_bits() == r.to_bits(),
                        "roundtrip failed for {v}: got {r}"
                    );
                }
                _ => panic!("expected Real"),
            }
        }
    }

    // Byte-wise order of encoded reals is non-decreasing in numeric order.
    #[test]
    fn key_real_sort_order() {
        let values = [
            f64::NEG_INFINITY,
            -100.0,
            -1.0,
            -0.0,
            0.0,
            1.0,
            100.0,
            f64::INFINITY,
        ];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Real(v)))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] <= encoded[i + 1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Text roundtrips, including empty strings and embedded NUL bytes.
    #[test]
    fn key_text_roundtrip() {
        let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Text(v.into()));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
        }
    }

    // Byte-wise order of encoded text matches lexicographic order.
    #[test]
    fn key_text_sort_order() {
        let values = ["", "a", "ab", "b", "ba", "z"];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Text(v.into())))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] < encoded[i + 1],
                "sort order broken: {:?} vs {:?}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Blobs roundtrip, including the 0x00 / 0xFF bytes used by the escaping.
    #[test]
    fn key_blob_roundtrip() {
        let test_values: Vec<Vec<u8>> = vec![
            vec![],
            vec![0x00],
            vec![0x00, 0xFF],
            vec![0xFF, 0x00],
            vec![0x00, 0x00, 0x00],
        ];
        for v in &test_values {
            let encoded = encode_key_value(&Value::Blob(v.clone()));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Blob(v.clone()));
        }
    }

    // A mixed-type composite key decodes back to its components.
    #[test]
    fn key_composite_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_composite_key(&values);
        let decoded = decode_composite_key(&encoded, 3).unwrap();
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    // Composite keys order by the first component, then by the second.
    #[test]
    fn key_composite_sort_order() {
        let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
        let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
        let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
        assert!(k1 < k2);
        assert!(k2 < k3);
    }

    // Values of different types order by their tag byte.
    #[test]
    fn key_cross_type_ordering() {
        let null = encode_key_value(&Value::Null);
        let bool_val = encode_key_value(&Value::Boolean(false));
        let int = encode_key_value(&Value::Integer(0));
        let text = encode_key_value(&Value::Text("".into()));
        let blob = encode_key_value(&Value::Blob(vec![]));

        assert!(null < blob);
        assert!(blob < text);
        assert!(text < bool_val);
        assert!(bool_val < int);
    }

    // ---- Row encoding ----

    // A row without NULLs roundtrips.
    #[test]
    fn row_roundtrip_simple() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    // NULL columns are restored at their original positions.
    #[test]
    fn row_roundtrip_with_nulls() {
        let values = vec![
            Value::Integer(1),
            Value::Null,
            Value::Text("test".into()),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0], Value::Integer(1));
        assert!(decoded[1].is_null());
        assert_eq!(decoded[2], Value::Text("test".into()));
        assert!(decoded[3].is_null());
    }

    // A zero-column row roundtrips to an empty vector.
    #[test]
    fn row_roundtrip_empty() {
        let values: Vec<Value> = vec![];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    // Every value type roundtrips through the row format.
    #[test]
    fn row_roundtrip_all_types() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded[0], Value::Integer(-100));
        assert_eq!(decoded[1], Value::Real(3.15));
        assert_eq!(decoded[2], Value::Text("hello world".into()));
        assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
        assert_eq!(decoded[4], Value::Boolean(false));
        assert!(decoded[5].is_null());
    }

    // An embedded NUL in key text survives the null-escaping.
    #[test]
    fn null_escaped_with_embedded_nulls() {
        let text = "before\0after";
        let encoded = encode_key_value(&Value::Text(text.into()));
        let (decoded, _) = decode_key_value(&encoded).unwrap();
        assert_eq!(decoded, Value::Text(text.into()));
    }

    // Extreme integers roundtrip and the decoder consumes the whole encoding.
    #[test]
    fn key_integer_edge_cases() {
        for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
            let encoded = encode_key_value(&Value::Integer(v));
            let (decoded, n) = decode_key_value(&encoded).unwrap();
            assert_eq!(n, encoded.len());
            assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
        }
    }

    // ---- Selective column decoding ----

    // A single middle column can be extracted.
    #[test]
    fn decode_columns_single() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[1]).unwrap();
        assert_eq!(cols.len(), 1);
        assert_eq!(cols[0], Value::Text("hello".into()));
    }

    // Several columns are extracted while the ones in between are skipped.
    #[test]
    fn decode_columns_multiple() {
        let values = vec![
            Value::Integer(1),
            Value::Real(2.5),
            Value::Text("skip".into()),
            Value::Boolean(false),
            Value::Blob(vec![0xAB]),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
        assert_eq!(cols.len(), 3);
        assert_eq!(cols[0], Value::Integer(1));
        assert_eq!(cols[1], Value::Boolean(false));
        assert_eq!(cols[2], Value::Blob(vec![0xAB]));
    }

    // NULL columns are handled both as targets and as skipped columns.
    #[test]
    fn decode_columns_with_nulls() {
        let values = vec![
            Value::Integer(10),
            Value::Null,
            Value::Text("after_null".into()),
            Value::Null,
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
        assert_eq!(cols.len(), 3);
        assert!(cols[0].is_null());
        assert_eq!(cols[1], Value::Text("after_null".into()));
        assert_eq!(cols[2], Value::Boolean(true));
    }

    // The first and last columns can be requested together.
    #[test]
    fn decode_columns_first_and_last() {
        let values = vec![
            Value::Text("first".into()),
            Value::Integer(99),
            Value::Boolean(false),
            Value::Real(3.125),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[0, 3]).unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0], Value::Text("first".into()));
        assert_eq!(cols[1], Value::Real(3.125));
    }

    // No targets yields an empty result.
    #[test]
    fn decode_columns_empty_targets() {
        let values = vec![Value::Integer(1)];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[]).unwrap();
        assert!(cols.is_empty());
    }

    // Requesting every column matches a full row decode.
    #[test]
    fn decode_columns_all_matches_full_decode() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let full = decode_row(&encoded).unwrap();
        let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
        assert_eq!(full, selective);
    }

    // ---- RawColumn decoding ----

    // An integer column decodes to `RawColumn::Integer` and converts to a Value.
    #[test]
    fn raw_column_integer() {
        let values = vec![Value::Integer(42), Value::Text("hello".into())];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 0).unwrap();
        assert!(matches!(raw, RawColumn::Integer(42)));
        assert_eq!(raw.to_value(), Value::Integer(42));
    }

    // A text column decodes to a borrowed string slice.
    #[test]
    fn raw_column_text_borrows() {
        let values = vec![Value::Integer(1), Value::Text("hello".into())];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        match raw {
            RawColumn::Text(s) => assert_eq!(s, "hello"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    // A NULL column decodes to `RawColumn::Null`.
    #[test]
    fn raw_column_null() {
        let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        assert!(matches!(raw, RawColumn::Null));
    }

    // The last column is reachable after skipping the preceding records.
    #[test]
    fn raw_column_last() {
        let values = vec![
            Value::Integer(1),
            Value::Text("skip".into()),
            Value::Real(3.15),
        ];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 2).unwrap();
        match raw {
            RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
            other => panic!("expected Real, got {other:?}"),
        }
    }

    // A column index past the end of the row is an error.
    #[test]
    fn raw_column_out_of_bounds() {
        let values = vec![Value::Integer(1)];
        let encoded = encode_row(&values);
        assert!(decode_column_raw(&encoded, 1).is_err());
    }

    // Equality works for same-type values and across integer/real.
    #[test]
    fn raw_column_eq_value() {
        let raw_int = RawColumn::Integer(42);
        assert!(raw_int.eq_value(&Value::Integer(42)));
        assert!(!raw_int.eq_value(&Value::Integer(43)));
        assert!(raw_int.eq_value(&Value::Real(42.0)));

        let raw_text = RawColumn::Text("hello");
        assert!(raw_text.eq_value(&Value::Text("hello".into())));
        assert!(!raw_text.eq_value(&Value::Text("world".into())));
    }

    // Ordering against integers, and `None` when compared with NULL.
    #[test]
    fn raw_column_cmp_value() {
        use std::cmp::Ordering;
        let raw = RawColumn::Integer(42);
        assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
        assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
        assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
        assert_eq!(raw.cmp_value(&Value::Null), None);
    }

    // Numeric accessors return `None` for non-matching variants.
    #[test]
    fn raw_column_as_numeric() {
        assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
        assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
        assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
        assert_eq!(RawColumn::Real(3.15).as_i64(), None);
        assert_eq!(RawColumn::Text("x").as_f64(), None);
        assert_eq!(RawColumn::Null.as_i64(), None);
    }

    // Integer primary keys roundtrip through the dedicated decoder.
    #[test]
    fn decode_pk_integer_roundtrip() {
        for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
            let encoded = encode_key_value(&Value::Integer(v));
            let decoded = decode_pk_integer(&encoded).unwrap();
            assert_eq!(decoded, v);
        }
    }

    // A key that does not start with the integer tag is rejected.
    #[test]
    fn decode_pk_integer_rejects_non_integer() {
        let encoded = encode_key_value(&Value::Text("hello".into()));
        assert!(decode_pk_integer(&encoded).is_err());
    }

    // A blob column decodes to a borrowed byte slice.
    #[test]
    fn raw_column_blob() {
        let values = vec![Value::Blob(vec![0xDE, 0xAD])];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 0).unwrap();
        match raw {
            RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
            other => panic!("expected Blob, got {other:?}"),
        }
    }

    // Raw decoding of each column agrees with the full row decode.
    #[test]
    fn raw_column_matches_full_decode() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let full = decode_row(&encoded).unwrap();
        for (i, expected) in full.iter().enumerate() {
            let raw = decode_column_raw(&encoded, i).unwrap();
            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
        }
    }
}