1use crate::error::{Result, SqlError};
5use crate::types::{CompactString, DataType, Value};
6
7const TAG_NULL: u8 = 0x00;
11const TAG_BLOB: u8 = 0x01;
12const TAG_TEXT: u8 = 0x02;
13const TAG_BOOLEAN: u8 = 0x03;
14const TAG_INTEGER: u8 = 0x04;
15const TAG_REAL: u8 = 0x05;
16
17pub fn encode_key_value(value: &Value) -> Vec<u8> {
19 match value {
20 Value::Null => vec![TAG_NULL],
21 Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
22 Value::Integer(i) => encode_integer(*i),
23 Value::Real(r) => encode_real(*r),
24 Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
25 Value::Blob(b) => encode_bytes(TAG_BLOB, b),
26 }
27}
28
29pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31 let mut buf = Vec::new();
32 for v in values {
33 buf.extend_from_slice(&encode_key_value(v));
34 }
35 buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39 buf.clear();
40 for v in values {
41 encode_key_value_into(v, buf);
42 }
43}
44
45fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
46 match value {
47 Value::Null => buf.push(TAG_NULL),
48 Value::Boolean(b) => {
49 buf.push(TAG_BOOLEAN);
50 buf.push(if *b { 0x01 } else { 0x00 });
51 }
52 Value::Integer(i) => encode_integer_into(*i, buf),
53 Value::Real(r) => encode_real_into(*r, buf),
54 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
55 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
56 }
57}
58
59fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
60 buf.push(TAG_INTEGER);
61 if val == 0 {
62 buf.push(0x80);
63 return;
64 }
65 if val > 0 {
66 let bytes = val.to_be_bytes();
67 let start = bytes.iter().position(|&b| b != 0).unwrap();
68 let byte_count = (8 - start) as u8;
69 buf.push(0x80 + byte_count);
70 buf.extend_from_slice(&bytes[start..]);
71 } else {
72 let abs_val = if val == i64::MIN {
73 u64::MAX / 2 + 1
74 } else {
75 (-val) as u64
76 };
77 let bytes = abs_val.to_be_bytes();
78 let start = bytes.iter().position(|&b| b != 0).unwrap();
79 let byte_count = (8 - start) as u8;
80 buf.push(0x80 - byte_count);
81 for &b in &bytes[start..] {
82 buf.push(!b);
83 }
84 }
85}
86
87fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
88 buf.push(TAG_REAL);
89 let bits = val.to_bits();
90 let encoded = if val.is_sign_negative() {
91 !bits
92 } else {
93 bits ^ (1u64 << 63)
94 };
95 buf.extend_from_slice(&encoded.to_be_bytes());
96}
97
98fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
99 buf.push(tag);
100 for &b in data {
101 if b == 0x00 {
102 buf.push(0x00);
103 buf.push(0xFF);
104 } else {
105 buf.push(b);
106 }
107 }
108 buf.push(0x00);
109}
110
111pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
113 if data.is_empty() {
114 return Err(SqlError::InvalidValue("empty key data".into()));
115 }
116 match data[0] {
117 TAG_NULL => Ok((Value::Null, 1)),
118 TAG_BOOLEAN => {
119 if data.len() < 2 {
120 return Err(SqlError::InvalidValue("truncated boolean".into()));
121 }
122 Ok((Value::Boolean(data[1] != 0), 2))
123 }
124 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
125 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
126 TAG_TEXT => {
127 let (bytes, n) = decode_null_escaped(&data[1..])?;
128 let s = String::from_utf8(bytes)
129 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
130 Ok((Value::Text(CompactString::from(s)), n + 1))
131 }
132 TAG_BLOB => {
133 let (bytes, n) = decode_null_escaped(&data[1..])?;
134 Ok((Value::Blob(bytes), n + 1))
135 }
136 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
137 }
138}
139
140pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
142 let mut values = Vec::with_capacity(count);
143 let mut pos = 0;
144 for _ in 0..count {
145 let (v, n) = decode_key_value(&data[pos..])?;
146 values.push(v);
147 pos += n;
148 }
149 Ok(values)
150}
151
152fn encode_integer(val: i64) -> Vec<u8> {
155 let mut buf = vec![TAG_INTEGER];
156 if val == 0 {
157 buf.push(0x80);
158 return buf;
159 }
160 if val > 0 {
161 let bytes = val.to_be_bytes();
162 let start = bytes.iter().position(|&b| b != 0).unwrap();
164 let byte_count = (8 - start) as u8;
165 buf.push(0x80 + byte_count);
166 buf.extend_from_slice(&bytes[start..]);
167 } else {
168 let abs_val = if val == i64::MIN {
170 u64::MAX / 2 + 1
172 } else {
173 (-val) as u64
174 };
175 let bytes = abs_val.to_be_bytes();
176 let start = bytes.iter().position(|&b| b != 0).unwrap();
177 let byte_count = (8 - start) as u8;
178 buf.push(0x80 - byte_count);
179 for &b in &bytes[start..] {
181 buf.push(!b);
182 }
183 }
184 buf
185}
186
187fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
188 if data.is_empty() {
189 return Err(SqlError::InvalidValue("truncated integer".into()));
190 }
191 let marker = data[0];
192 if marker == 0x80 {
193 return Ok((Value::Integer(0), 1));
194 }
195 if marker > 0x80 {
196 let byte_count = (marker - 0x80) as usize;
198 if data.len() < 1 + byte_count {
199 return Err(SqlError::InvalidValue("truncated positive integer".into()));
200 }
201 let mut bytes = [0u8; 8];
202 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
203 let val = i64::from_be_bytes(bytes);
204 Ok((Value::Integer(val), 1 + byte_count))
205 } else {
206 let byte_count = (0x80 - marker) as usize;
208 if data.len() < 1 + byte_count {
209 return Err(SqlError::InvalidValue("truncated negative integer".into()));
210 }
211 let mut bytes = [0u8; 8];
212 for i in 0..byte_count {
213 bytes[8 - byte_count + i] = !data[1 + i];
214 }
215 let abs_val = u64::from_be_bytes(bytes);
216 let val = (-(abs_val as i128)) as i64;
218 Ok((Value::Integer(val), 1 + byte_count))
219 }
220}
221
222fn encode_real(val: f64) -> Vec<u8> {
225 let mut buf = vec![TAG_REAL];
226 let bits = val.to_bits();
227 let encoded = if val.is_sign_negative() {
228 !bits
230 } else {
231 bits ^ (1u64 << 63)
233 };
234 buf.extend_from_slice(&encoded.to_be_bytes());
235 buf
236}
237
238fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
239 if data.len() < 8 {
240 return Err(SqlError::InvalidValue("truncated real".into()));
241 }
242 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
243 let bits = if encoded & (1u64 << 63) != 0 {
244 encoded ^ (1u64 << 63)
246 } else {
247 !encoded
249 };
250 let val = f64::from_bits(bits);
251 Ok((Value::Real(val), 8))
252}
253
254fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
258 let mut buf = Vec::with_capacity(data.len() + 2);
259 buf.push(tag);
260 for &b in data {
261 if b == 0x00 {
262 buf.push(0x00);
263 buf.push(0xFF);
264 } else {
265 buf.push(b);
266 }
267 }
268 buf.push(0x00); buf
270}
271
272fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
274 let mut result = Vec::new();
275 let mut i = 0;
276 while i < data.len() {
277 if data[i] == 0x00 {
278 if i + 1 < data.len() && data[i + 1] == 0xFF {
279 result.push(0x00);
280 i += 2;
281 } else {
282 return Ok((result, i + 1)); }
284 } else {
285 result.push(data[i]);
286 i += 1;
287 }
288 }
289 Err(SqlError::InvalidValue(
290 "unterminated null-escaped string".into(),
291 ))
292}
293
294pub fn encode_row(values: &[Value]) -> Vec<u8> {
299 let col_count = values.len();
300 let bitmap_bytes = col_count.div_ceil(8);
301 let mut buf = Vec::new();
302
303 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
305
306 let mut bitmap = vec![0u8; bitmap_bytes];
308 for (i, v) in values.iter().enumerate() {
309 if v.is_null() {
310 bitmap[i / 8] |= 1 << (i % 8);
311 }
312 }
313 buf.extend_from_slice(&bitmap);
314
315 for v in values {
317 if v.is_null() {
318 continue;
319 }
320 match v {
321 Value::Integer(i) => {
322 buf.push(DataType::Integer.type_tag());
323 buf.extend_from_slice(&8u32.to_le_bytes());
324 buf.extend_from_slice(&i.to_le_bytes());
325 }
326 Value::Real(r) => {
327 buf.push(DataType::Real.type_tag());
328 buf.extend_from_slice(&8u32.to_le_bytes());
329 buf.extend_from_slice(&r.to_le_bytes());
330 }
331 Value::Boolean(b) => {
332 buf.push(DataType::Boolean.type_tag());
333 buf.extend_from_slice(&1u32.to_le_bytes());
334 buf.push(if *b { 1 } else { 0 });
335 }
336 Value::Text(s) => {
337 let bytes = s.as_bytes();
338 buf.push(DataType::Text.type_tag());
339 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
340 buf.extend_from_slice(bytes);
341 }
342 Value::Blob(data) => {
343 buf.push(DataType::Blob.type_tag());
344 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
345 buf.extend_from_slice(data);
346 }
347 Value::Null => unreachable!(),
348 }
349 }
350
351 buf
352}
353
354pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
355 buf.clear();
356 let col_count = values.len();
357 let bitmap_bytes = col_count.div_ceil(8);
358
359 buf.extend_from_slice(&(col_count as u16).to_le_bytes());
360
361 let bitmap_start = buf.len();
362 buf.resize(buf.len() + bitmap_bytes, 0);
363
364 for (i, v) in values.iter().enumerate() {
365 if v.is_null() {
366 buf[bitmap_start + i / 8] |= 1 << (i % 8);
367 continue;
368 }
369 match v {
370 Value::Integer(val) => {
371 buf.push(DataType::Integer.type_tag());
372 buf.extend_from_slice(&8u32.to_le_bytes());
373 buf.extend_from_slice(&val.to_le_bytes());
374 }
375 Value::Real(r) => {
376 buf.push(DataType::Real.type_tag());
377 buf.extend_from_slice(&8u32.to_le_bytes());
378 buf.extend_from_slice(&r.to_le_bytes());
379 }
380 Value::Boolean(b) => {
381 buf.push(DataType::Boolean.type_tag());
382 buf.extend_from_slice(&1u32.to_le_bytes());
383 buf.push(if *b { 1 } else { 0 });
384 }
385 Value::Text(s) => {
386 let bytes = s.as_bytes();
387 buf.push(DataType::Text.type_tag());
388 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
389 buf.extend_from_slice(bytes);
390 }
391 Value::Blob(data) => {
392 buf.push(DataType::Blob.type_tag());
393 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
394 buf.extend_from_slice(data);
395 }
396 Value::Null => unreachable!(),
397 }
398 }
399}
400
401fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
402 match DataType::from_tag(type_tag) {
403 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
404 data[..8].try_into().unwrap(),
405 ))),
406 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
407 data[..8].try_into().unwrap(),
408 ))),
409 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
410 Some(DataType::Text) => {
411 let s = std::str::from_utf8(data)
412 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
413 Ok(Value::Text(CompactString::from(s)))
414 }
415 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
416 _ => Err(SqlError::InvalidValue(format!(
417 "unknown column type tag: {type_tag}"
418 ))),
419 }
420}
421
422fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
423 if data.len() < 2 {
424 return Err(SqlError::InvalidValue("row data too short".into()));
425 }
426 let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
427 let bitmap_bytes = col_count.div_ceil(8);
428 let pos = 2;
429 if data.len() < pos + bitmap_bytes {
430 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
431 }
432 Ok((
433 col_count,
434 &data[pos..pos + bitmap_bytes],
435 pos + bitmap_bytes,
436 ))
437}
438
439pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
440 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
441
442 let mut values = Vec::with_capacity(col_count);
443 for i in 0..col_count {
444 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
445 values.push(Value::Null);
446 continue;
447 }
448
449 if pos + 5 > data.len() {
450 return Err(SqlError::InvalidValue("truncated column data".into()));
451 }
452 let type_tag = data[pos];
453 pos += 1;
454 let data_len =
455 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
456 pos += 4;
457
458 if pos + data_len > data.len() {
459 return Err(SqlError::InvalidValue("truncated column value".into()));
460 }
461
462 values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
463 pos += data_len;
464 }
465
466 Ok(values)
467}
468
469#[inline]
471pub fn row_non_pk_count(data: &[u8]) -> usize {
472 u16::from_le_bytes([data[0], data[1]]) as usize
473}
474
475pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
476 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
477
478 for i in 0..col_count {
479 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
480 continue;
481 }
482
483 if pos + 5 > data.len() {
484 return Err(SqlError::InvalidValue("truncated column data".into()));
485 }
486 let type_tag = data[pos];
487 pos += 1;
488 let data_len =
489 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
490 pos += 4;
491
492 if pos + data_len > data.len() {
493 return Err(SqlError::InvalidValue("truncated column value".into()));
494 }
495
496 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
497 out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
498 }
499 pos += data_len;
500 }
501
502 Ok(())
503}
504
505pub fn decode_pk_into(
506 key: &[u8],
507 count: usize,
508 out: &mut [Value],
509 pk_mapping: &[usize],
510) -> Result<()> {
511 let mut pos = 0;
512 for i in 0..count {
513 let (v, n) = decode_key_value(&key[pos..])?;
514 if i < pk_mapping.len() {
515 out[pk_mapping[i]] = v;
516 }
517 pos += n;
518 }
519 Ok(())
520}
521
522pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
523 if targets.is_empty() {
524 return Ok(Vec::new());
525 }
526 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
527
528 let mut results = Vec::with_capacity(targets.len());
529 let mut ti = 0;
530
531 for col in 0..col_count {
532 if ti >= targets.len() {
533 break;
534 }
535 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
536
537 if col == targets[ti] {
538 if is_null {
539 results.push(Value::Null);
540 } else {
541 if pos + 5 > data.len() {
542 return Err(SqlError::InvalidValue("truncated column data".into()));
543 }
544 let type_tag = data[pos];
545 pos += 1;
546 let data_len =
547 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
548 as usize;
549 pos += 4;
550 if pos + data_len > data.len() {
551 return Err(SqlError::InvalidValue("truncated column value".into()));
552 }
553 results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
554 pos += data_len;
555 }
556 ti += 1;
557 } else if !is_null {
558 if pos + 5 > data.len() {
559 return Err(SqlError::InvalidValue("truncated column data".into()));
560 }
561 let data_len =
562 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
563 as usize;
564 pos += 5 + data_len;
565 }
566 }
567
568 while ti < targets.len() {
570 results.push(Value::Null);
571 ti += 1;
572 }
573
574 Ok(results)
575}
576
577pub fn decode_columns_into(
578 data: &[u8],
579 targets: &[usize],
580 schema_cols: &[usize],
581 row: &mut [Value],
582) -> Result<()> {
583 if targets.is_empty() {
584 return Ok(());
585 }
586 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
587
588 let mut ti = 0;
589 for col in 0..col_count {
590 if ti >= targets.len() {
591 break;
592 }
593 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
594
595 if col == targets[ti] {
596 if is_null {
597 row[schema_cols[ti]] = Value::Null;
598 } else {
599 if pos + 5 > data.len() {
600 return Err(SqlError::InvalidValue("truncated column data".into()));
601 }
602 let type_tag = data[pos];
603 pos += 1;
604 let data_len =
605 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
606 as usize;
607 pos += 4;
608 if pos + data_len > data.len() {
609 return Err(SqlError::InvalidValue("truncated column value".into()));
610 }
611 row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
612 pos += data_len;
613 }
614 ti += 1;
615 } else if !is_null {
616 if pos + 5 > data.len() {
617 return Err(SqlError::InvalidValue("truncated column data".into()));
618 }
619 let data_len =
620 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
621 as usize;
622 pos += 5 + data_len;
623 }
624 }
625
626 Ok(())
627}
628
629#[derive(Debug, Clone, Copy)]
630pub enum RawColumn<'a> {
631 Null,
632 Integer(i64),
633 Real(f64),
634 Boolean(bool),
635 Text(&'a str),
636 Blob(&'a [u8]),
637}
638
639impl<'a> RawColumn<'a> {
640 pub fn to_value(self) -> Value {
641 match self {
642 RawColumn::Null => Value::Null,
643 RawColumn::Integer(i) => Value::Integer(i),
644 RawColumn::Real(r) => Value::Real(r),
645 RawColumn::Boolean(b) => Value::Boolean(b),
646 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
647 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
648 }
649 }
650
651 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
652 use std::cmp::Ordering;
653 match (self, other) {
654 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
655 (RawColumn::Null, _) | (_, Value::Null) => None,
656 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
657 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
658 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
659 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
660 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
661 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
662 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
663 _ => None,
664 }
665 }
666
667 pub fn eq_value(&self, other: &Value) -> bool {
668 match (self, other) {
669 (RawColumn::Null, Value::Null) => true,
670 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
671 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
672 (RawColumn::Real(a), Value::Real(b)) => a == b,
673 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
674 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
675 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
676 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
677 _ => false,
678 }
679 }
680
681 pub fn as_f64(&self) -> Option<f64> {
682 match self {
683 RawColumn::Integer(i) => Some(*i as f64),
684 RawColumn::Real(r) => Some(*r),
685 _ => None,
686 }
687 }
688
689 pub fn as_i64(&self) -> Option<i64> {
690 match self {
691 RawColumn::Integer(i) => Some(*i),
692 _ => None,
693 }
694 }
695}
696
697fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
698 match DataType::from_tag(type_tag) {
699 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
700 data[..8].try_into().unwrap(),
701 ))),
702 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
703 data[..8].try_into().unwrap(),
704 ))),
705 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
706 Some(DataType::Text) => {
707 let s = std::str::from_utf8(data)
708 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
709 Ok(RawColumn::Text(s))
710 }
711 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
712 _ => Err(SqlError::InvalidValue(format!(
713 "unknown column type tag: {type_tag}"
714 ))),
715 }
716}
717
718pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
719 let (col_count, bitmap, mut pos) = parse_row_header(data)?;
720 if target >= col_count {
721 return Ok(RawColumn::Null);
722 }
723
724 for col in 0..=target {
725 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
726
727 if col == target {
728 if is_null {
729 return Ok(RawColumn::Null);
730 }
731 if pos + 5 > data.len() {
732 return Err(SqlError::InvalidValue("truncated column data".into()));
733 }
734 let type_tag = data[pos];
735 pos += 1;
736 let data_len =
737 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
738 as usize;
739 pos += 4;
740 if pos + data_len > data.len() {
741 return Err(SqlError::InvalidValue("truncated column value".into()));
742 }
743 return decode_value_raw(type_tag, &data[pos..pos + data_len]);
744 } else if !is_null {
745 if pos + 5 > data.len() {
746 return Err(SqlError::InvalidValue("truncated column data".into()));
747 }
748 let data_len =
749 u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
750 as usize;
751 pos += 5 + data_len;
752 }
753 }
754
755 unreachable!()
756}
757
758pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
759 if key.is_empty() || key[0] != TAG_INTEGER {
760 return Err(SqlError::InvalidValue("not an integer key".into()));
761 }
762 let (val, _) = decode_integer(&key[1..])?;
763 match val {
764 Value::Integer(i) => Ok(i),
765 _ => unreachable!(),
766 }
767}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772
773 #[test]
776 fn key_null() {
777 let encoded = encode_key_value(&Value::Null);
778 let (decoded, n) = decode_key_value(&encoded).unwrap();
779 assert_eq!(n, 1);
780 assert_eq!(decoded, Value::Null);
781 }
782
783 #[test]
784 fn key_boolean() {
785 let f_enc = encode_key_value(&Value::Boolean(false));
786 let t_enc = encode_key_value(&Value::Boolean(true));
787 assert!(f_enc < t_enc);
788
789 let (f_dec, _) = decode_key_value(&f_enc).unwrap();
790 let (t_dec, _) = decode_key_value(&t_enc).unwrap();
791 assert_eq!(f_dec, Value::Boolean(false));
792 assert_eq!(t_dec, Value::Boolean(true));
793 }
794
795 #[test]
796 fn key_integer_roundtrip() {
797 let test_values = [
798 i64::MIN,
799 -1_000_000,
800 -256,
801 -1,
802 0,
803 1,
804 127,
805 128,
806 255,
807 256,
808 65535,
809 1_000_000,
810 i64::MAX,
811 ];
812 for &v in &test_values {
813 let encoded = encode_key_value(&Value::Integer(v));
814 let (decoded, _) = decode_key_value(&encoded).unwrap();
815 assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
816 }
817 }
818
819 #[test]
820 fn key_integer_sort_order() {
821 let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
822 let encoded: Vec<Vec<u8>> = values
823 .iter()
824 .map(|&v| encode_key_value(&Value::Integer(v)))
825 .collect();
826
827 for i in 0..encoded.len() - 1 {
828 assert!(
829 encoded[i] < encoded[i + 1],
830 "sort order broken: {} vs {}",
831 values[i],
832 values[i + 1]
833 );
834 }
835 }
836
837 #[test]
838 fn key_real_roundtrip() {
839 let test_values = [
840 f64::NEG_INFINITY,
841 -1e100,
842 -1.0,
843 -f64::MIN_POSITIVE,
844 -0.0,
845 0.0,
846 f64::MIN_POSITIVE,
847 0.5,
848 1.0,
849 1e100,
850 f64::INFINITY,
851 ];
852 for &v in &test_values {
853 let encoded = encode_key_value(&Value::Real(v));
854 let (decoded, _) = decode_key_value(&encoded).unwrap();
855 match decoded {
856 Value::Real(r) => {
857 assert!(
858 v.to_bits() == r.to_bits(),
859 "roundtrip failed for {v}: got {r}"
860 );
861 }
862 _ => panic!("expected Real"),
863 }
864 }
865 }
866
867 #[test]
868 fn key_real_sort_order() {
869 let values = [
870 f64::NEG_INFINITY,
871 -100.0,
872 -1.0,
873 -0.0,
874 0.0,
875 1.0,
876 100.0,
877 f64::INFINITY,
878 ];
879 let encoded: Vec<Vec<u8>> = values
880 .iter()
881 .map(|&v| encode_key_value(&Value::Real(v)))
882 .collect();
883
884 for i in 0..encoded.len() - 1 {
885 assert!(
886 encoded[i] <= encoded[i + 1],
887 "sort order broken: {} vs {}",
888 values[i],
889 values[i + 1]
890 );
891 }
892 }
893
894 #[test]
895 fn key_text_roundtrip() {
896 let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
897 for &v in &test_values {
898 let encoded = encode_key_value(&Value::Text(v.into()));
899 let (decoded, _) = decode_key_value(&encoded).unwrap();
900 assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
901 }
902 }
903
904 #[test]
905 fn key_text_sort_order() {
906 let values = ["", "a", "ab", "b", "ba", "z"];
907 let encoded: Vec<Vec<u8>> = values
908 .iter()
909 .map(|&v| encode_key_value(&Value::Text(v.into())))
910 .collect();
911
912 for i in 0..encoded.len() - 1 {
913 assert!(
914 encoded[i] < encoded[i + 1],
915 "sort order broken: {:?} vs {:?}",
916 values[i],
917 values[i + 1]
918 );
919 }
920 }
921
922 #[test]
923 fn key_blob_roundtrip() {
924 let test_values: Vec<Vec<u8>> = vec![
925 vec![],
926 vec![0x00],
927 vec![0x00, 0xFF],
928 vec![0xFF, 0x00],
929 vec![0x00, 0x00, 0x00],
930 ];
931 for v in &test_values {
932 let encoded = encode_key_value(&Value::Blob(v.clone()));
933 let (decoded, _) = decode_key_value(&encoded).unwrap();
934 assert_eq!(decoded, Value::Blob(v.clone()));
935 }
936 }
937
938 #[test]
939 fn key_composite_roundtrip() {
940 let values = vec![
941 Value::Integer(42),
942 Value::Text("hello".into()),
943 Value::Boolean(true),
944 ];
945 let encoded = encode_composite_key(&values);
946 let decoded = decode_composite_key(&encoded, 3).unwrap();
947 assert_eq!(decoded[0], Value::Integer(42));
948 assert_eq!(decoded[1], Value::Text("hello".into()));
949 assert_eq!(decoded[2], Value::Boolean(true));
950 }
951
952 #[test]
953 fn key_composite_sort_order() {
954 let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
956 let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
957 let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
958 assert!(k1 < k2);
959 assert!(k2 < k3);
960 }
961
962 #[test]
963 fn key_cross_type_ordering() {
964 let null = encode_key_value(&Value::Null);
965 let bool_val = encode_key_value(&Value::Boolean(false));
966 let int = encode_key_value(&Value::Integer(0));
967 let text = encode_key_value(&Value::Text("".into()));
968 let blob = encode_key_value(&Value::Blob(vec![]));
969
970 assert!(null < blob);
971 assert!(blob < text);
972 assert!(text < bool_val);
973 assert!(bool_val < int);
974 }
975
976 #[test]
979 fn row_roundtrip_simple() {
980 let values = vec![
981 Value::Integer(42),
982 Value::Text("hello".into()),
983 Value::Boolean(true),
984 ];
985 let encoded = encode_row(&values);
986 let decoded = decode_row(&encoded).unwrap();
987 assert_eq!(decoded.len(), 3);
988 assert_eq!(decoded[0], Value::Integer(42));
989 assert_eq!(decoded[1], Value::Text("hello".into()));
990 assert_eq!(decoded[2], Value::Boolean(true));
991 }
992
993 #[test]
994 fn row_roundtrip_with_nulls() {
995 let values = vec![
996 Value::Integer(1),
997 Value::Null,
998 Value::Text("test".into()),
999 Value::Null,
1000 ];
1001 let encoded = encode_row(&values);
1002 let decoded = decode_row(&encoded).unwrap();
1003 assert_eq!(decoded.len(), 4);
1004 assert_eq!(decoded[0], Value::Integer(1));
1005 assert!(decoded[1].is_null());
1006 assert_eq!(decoded[2], Value::Text("test".into()));
1007 assert!(decoded[3].is_null());
1008 }
1009
1010 #[test]
1011 fn row_roundtrip_empty() {
1012 let values: Vec<Value> = vec![];
1013 let encoded = encode_row(&values);
1014 let decoded = decode_row(&encoded).unwrap();
1015 assert!(decoded.is_empty());
1016 }
1017
1018 #[test]
1019 fn row_roundtrip_all_types() {
1020 let values = vec![
1021 Value::Integer(-100),
1022 Value::Real(3.15),
1023 Value::Text("hello world".into()),
1024 Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1025 Value::Boolean(false),
1026 Value::Null,
1027 ];
1028 let encoded = encode_row(&values);
1029 let decoded = decode_row(&encoded).unwrap();
1030 assert_eq!(decoded.len(), 6);
1031 assert_eq!(decoded[0], Value::Integer(-100));
1032 assert_eq!(decoded[1], Value::Real(3.15));
1033 assert_eq!(decoded[2], Value::Text("hello world".into()));
1034 assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
1035 assert_eq!(decoded[4], Value::Boolean(false));
1036 assert!(decoded[5].is_null());
1037 }
1038
1039 #[test]
1040 fn null_escaped_with_embedded_nulls() {
1041 let text = "before\0after";
1042 let encoded = encode_key_value(&Value::Text(text.into()));
1043 let (decoded, _) = decode_key_value(&encoded).unwrap();
1044 assert_eq!(decoded, Value::Text(text.into()));
1045 }
1046
1047 #[test]
1048 fn key_integer_edge_cases() {
1049 for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
1050 let encoded = encode_key_value(&Value::Integer(v));
1051 let (decoded, n) = decode_key_value(&encoded).unwrap();
1052 assert_eq!(n, encoded.len());
1053 assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
1054 }
1055 }
1056
1057 #[test]
1058 fn decode_columns_single() {
1059 let values = vec![
1060 Value::Integer(42),
1061 Value::Text("hello".into()),
1062 Value::Boolean(true),
1063 ];
1064 let encoded = encode_row(&values);
1065 let cols = decode_columns(&encoded, &[1]).unwrap();
1066 assert_eq!(cols.len(), 1);
1067 assert_eq!(cols[0], Value::Text("hello".into()));
1068 }
1069
1070 #[test]
1071 fn decode_columns_multiple() {
1072 let values = vec![
1073 Value::Integer(1),
1074 Value::Real(2.5),
1075 Value::Text("skip".into()),
1076 Value::Boolean(false),
1077 Value::Blob(vec![0xAB]),
1078 ];
1079 let encoded = encode_row(&values);
1080 let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
1081 assert_eq!(cols.len(), 3);
1082 assert_eq!(cols[0], Value::Integer(1));
1083 assert_eq!(cols[1], Value::Boolean(false));
1084 assert_eq!(cols[2], Value::Blob(vec![0xAB]));
1085 }
1086
1087 #[test]
1088 fn decode_columns_with_nulls() {
1089 let values = vec![
1090 Value::Integer(10),
1091 Value::Null,
1092 Value::Text("after_null".into()),
1093 Value::Null,
1094 Value::Boolean(true),
1095 ];
1096 let encoded = encode_row(&values);
1097 let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
1098 assert_eq!(cols.len(), 3);
1099 assert!(cols[0].is_null());
1100 assert_eq!(cols[1], Value::Text("after_null".into()));
1101 assert_eq!(cols[2], Value::Boolean(true));
1102 }
1103
1104 #[test]
1105 fn decode_columns_first_and_last() {
1106 let values = vec![
1107 Value::Text("first".into()),
1108 Value::Integer(99),
1109 Value::Boolean(false),
1110 Value::Real(3.125),
1111 ];
1112 let encoded = encode_row(&values);
1113 let cols = decode_columns(&encoded, &[0, 3]).unwrap();
1114 assert_eq!(cols.len(), 2);
1115 assert_eq!(cols[0], Value::Text("first".into()));
1116 assert_eq!(cols[1], Value::Real(3.125));
1117 }
1118
1119 #[test]
1120 fn decode_columns_empty_targets() {
1121 let values = vec![Value::Integer(1)];
1122 let encoded = encode_row(&values);
1123 let cols = decode_columns(&encoded, &[]).unwrap();
1124 assert!(cols.is_empty());
1125 }
1126
1127 #[test]
1128 fn decode_columns_all_matches_full_decode() {
1129 let values = vec![
1130 Value::Integer(-100),
1131 Value::Real(3.15),
1132 Value::Text("hello world".into()),
1133 Value::Blob(vec![0xDE, 0xAD]),
1134 Value::Boolean(false),
1135 Value::Null,
1136 ];
1137 let encoded = encode_row(&values);
1138 let full = decode_row(&encoded).unwrap();
1139 let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
1140 assert_eq!(full, selective);
1141 }
1142
1143 #[test]
1144 fn raw_column_integer() {
1145 let values = vec![Value::Integer(42), Value::Text("hello".into())];
1146 let encoded = encode_row(&values);
1147 let raw = decode_column_raw(&encoded, 0).unwrap();
1148 assert!(matches!(raw, RawColumn::Integer(42)));
1149 assert_eq!(raw.to_value(), Value::Integer(42));
1150 }
1151
1152 #[test]
1153 fn raw_column_text_borrows() {
1154 let values = vec![Value::Integer(1), Value::Text("hello".into())];
1155 let encoded = encode_row(&values);
1156 let raw = decode_column_raw(&encoded, 1).unwrap();
1157 match raw {
1158 RawColumn::Text(s) => assert_eq!(s, "hello"),
1159 other => panic!("expected Text, got {other:?}"),
1160 }
1161 }
1162
1163 #[test]
1164 fn raw_column_null() {
1165 let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
1166 let encoded = encode_row(&values);
1167 let raw = decode_column_raw(&encoded, 1).unwrap();
1168 assert!(matches!(raw, RawColumn::Null));
1169 }
1170
1171 #[test]
1172 fn raw_column_last() {
1173 let values = vec![
1174 Value::Integer(1),
1175 Value::Text("skip".into()),
1176 Value::Real(3.15),
1177 ];
1178 let encoded = encode_row(&values);
1179 let raw = decode_column_raw(&encoded, 2).unwrap();
1180 match raw {
1181 RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
1182 other => panic!("expected Real, got {other:?}"),
1183 }
1184 }
1185
1186 #[test]
1187 fn raw_column_out_of_bounds_returns_null() {
1188 let values = vec![Value::Integer(1)];
1189 let encoded = encode_row(&values);
1190 assert!(matches!(
1191 decode_column_raw(&encoded, 1).unwrap(),
1192 RawColumn::Null
1193 ));
1194 }
1195
1196 #[test]
1197 fn raw_column_eq_value() {
1198 let raw_int = RawColumn::Integer(42);
1199 assert!(raw_int.eq_value(&Value::Integer(42)));
1200 assert!(!raw_int.eq_value(&Value::Integer(43)));
1201 assert!(raw_int.eq_value(&Value::Real(42.0)));
1202
1203 let raw_text = RawColumn::Text("hello");
1204 assert!(raw_text.eq_value(&Value::Text("hello".into())));
1205 assert!(!raw_text.eq_value(&Value::Text("world".into())));
1206 }
1207
1208 #[test]
1209 fn raw_column_cmp_value() {
1210 use std::cmp::Ordering;
1211 let raw = RawColumn::Integer(42);
1212 assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
1213 assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
1214 assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
1215 assert_eq!(raw.cmp_value(&Value::Null), None);
1216 }
1217
1218 #[test]
1219 fn raw_column_as_numeric() {
1220 assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
1221 assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
1222 assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
1223 assert_eq!(RawColumn::Real(3.15).as_i64(), None);
1224 assert_eq!(RawColumn::Text("x").as_f64(), None);
1225 assert_eq!(RawColumn::Null.as_i64(), None);
1226 }
1227
1228 #[test]
1229 fn decode_pk_integer_roundtrip() {
1230 for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
1231 let encoded = encode_key_value(&Value::Integer(v));
1232 let decoded = decode_pk_integer(&encoded).unwrap();
1233 assert_eq!(decoded, v);
1234 }
1235 }
1236
1237 #[test]
1238 fn decode_pk_integer_rejects_non_integer() {
1239 let encoded = encode_key_value(&Value::Text("hello".into()));
1240 assert!(decode_pk_integer(&encoded).is_err());
1241 }
1242
1243 #[test]
1244 fn raw_column_blob() {
1245 let values = vec![Value::Blob(vec![0xDE, 0xAD])];
1246 let encoded = encode_row(&values);
1247 let raw = decode_column_raw(&encoded, 0).unwrap();
1248 match raw {
1249 RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
1250 other => panic!("expected Blob, got {other:?}"),
1251 }
1252 }
1253
1254 #[test]
1255 fn raw_column_matches_full_decode() {
1256 let values = vec![
1257 Value::Integer(-100),
1258 Value::Real(3.15),
1259 Value::Text("hello world".into()),
1260 Value::Blob(vec![0xDE, 0xAD]),
1261 Value::Boolean(false),
1262 Value::Null,
1263 ];
1264 let encoded = encode_row(&values);
1265 let full = decode_row(&encoded).unwrap();
1266 for (i, expected) in full.iter().enumerate() {
1267 let raw = decode_column_raw(&encoded, i).unwrap();
1268 assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
1269 }
1270 }
1271}