Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding (tuple layer)
2//! and row encoding for non-PK column storage.
3
4use crate::error::{Result, SqlError};
5use crate::types::{CompactString, DataType, Value};
6
7// ── Key encoding (order-preserving) ─────────────────────────────────
8
/// Type tag bytes for key encoding. Ordering: NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL
///
/// Every encoded key value starts with one of these tags, so encoded values of
/// different types compare by tag before any payload byte is looked at.
const TAG_NULL: u8 = 0x00;
/// Tag for blob keys; the payload is null-escaped and 0x00-terminated.
const TAG_BLOB: u8 = 0x01;
/// Tag for text keys; the payload is null-escaped and 0x00-terminated.
const TAG_TEXT: u8 = 0x02;
/// Tag for boolean keys; followed by a single 0x00 / 0x01 byte.
const TAG_BOOLEAN: u8 = 0x03;
/// Tag for integer keys; followed by a length marker and magnitude bytes.
const TAG_INTEGER: u8 = 0x04;
/// Tag for real keys; followed by 8 big-endian bytes.
const TAG_REAL: u8 = 0x05;
16
17/// Encode a single value into an order-preserving byte sequence.
18pub fn encode_key_value(value: &Value) -> Vec<u8> {
19    match value {
20        Value::Null => vec![TAG_NULL],
21        Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
22        Value::Integer(i) => encode_integer(*i),
23        Value::Real(r) => encode_real(*r),
24        Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
25        Value::Blob(b) => encode_bytes(TAG_BLOB, b),
26    }
27}
28
29/// Encode a composite key (multiple values concatenated).
30pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31    let mut buf = Vec::new();
32    for v in values {
33        buf.extend_from_slice(&encode_key_value(v));
34    }
35    buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39    buf.clear();
40    for v in values {
41        encode_key_value_into(v, buf);
42    }
43}
44
45fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
46    match value {
47        Value::Null => buf.push(TAG_NULL),
48        Value::Boolean(b) => {
49            buf.push(TAG_BOOLEAN);
50            buf.push(if *b { 0x01 } else { 0x00 });
51        }
52        Value::Integer(i) => encode_integer_into(*i, buf),
53        Value::Real(r) => encode_real_into(*r, buf),
54        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
55        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
56    }
57}
58
59fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
60    buf.push(TAG_INTEGER);
61    if val == 0 {
62        buf.push(0x80);
63        return;
64    }
65    if val > 0 {
66        let bytes = val.to_be_bytes();
67        let start = bytes.iter().position(|&b| b != 0).unwrap();
68        let byte_count = (8 - start) as u8;
69        buf.push(0x80 + byte_count);
70        buf.extend_from_slice(&bytes[start..]);
71    } else {
72        let abs_val = if val == i64::MIN {
73            u64::MAX / 2 + 1
74        } else {
75            (-val) as u64
76        };
77        let bytes = abs_val.to_be_bytes();
78        let start = bytes.iter().position(|&b| b != 0).unwrap();
79        let byte_count = (8 - start) as u8;
80        buf.push(0x80 - byte_count);
81        for &b in &bytes[start..] {
82            buf.push(!b);
83        }
84    }
85}
86
87fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
88    buf.push(TAG_REAL);
89    let bits = val.to_bits();
90    let encoded = if val.is_sign_negative() {
91        !bits
92    } else {
93        bits ^ (1u64 << 63)
94    };
95    buf.extend_from_slice(&encoded.to_be_bytes());
96}
97
/// Append `tag` and the null-escaped form of `data` to `buf`.
///
/// Every 0x00 byte in `data` is written as `0x00 0xFF`, and the value is
/// terminated by a bare 0x00.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on 0x00 yields the runs between NUL bytes; each boundary
    // between two runs stands for exactly one NUL that must be escaped.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
110
111/// Decode a single key value, returning the value and the number of bytes consumed.
112pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
113    if data.is_empty() {
114        return Err(SqlError::InvalidValue("empty key data".into()));
115    }
116    match data[0] {
117        TAG_NULL => Ok((Value::Null, 1)),
118        TAG_BOOLEAN => {
119            if data.len() < 2 {
120                return Err(SqlError::InvalidValue("truncated boolean".into()));
121            }
122            Ok((Value::Boolean(data[1] != 0), 2))
123        }
124        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
125        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
126        TAG_TEXT => {
127            let (bytes, n) = decode_null_escaped(&data[1..])?;
128            let s = String::from_utf8(bytes)
129                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
130            Ok((Value::Text(CompactString::from(s)), n + 1))
131        }
132        TAG_BLOB => {
133            let (bytes, n) = decode_null_escaped(&data[1..])?;
134            Ok((Value::Blob(bytes), n + 1))
135        }
136        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
137    }
138}
139
140/// Decode a composite key into multiple values.
141pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
142    let mut values = Vec::with_capacity(count);
143    let mut pos = 0;
144    for _ in 0..count {
145        let (v, n) = decode_key_value(&data[pos..])?;
146        values.push(v);
147        pos += n;
148    }
149    Ok(values)
150}
151
152// ── Integer encoding (variable-width) ───────────────────────────────
153
154fn encode_integer(val: i64) -> Vec<u8> {
155    let mut buf = vec![TAG_INTEGER];
156    if val == 0 {
157        buf.push(0x80);
158        return buf;
159    }
160    if val > 0 {
161        let bytes = val.to_be_bytes();
162        // Find first non-zero byte
163        let start = bytes.iter().position(|&b| b != 0).unwrap();
164        let byte_count = (8 - start) as u8;
165        buf.push(0x80 + byte_count);
166        buf.extend_from_slice(&bytes[start..]);
167    } else {
168        // Negative: one's complement of absolute value
169        let abs_val = if val == i64::MIN {
170            // Special case: |i64::MIN| doesn't fit in i64
171            u64::MAX / 2 + 1
172        } else {
173            (-val) as u64
174        };
175        let bytes = abs_val.to_be_bytes();
176        let start = bytes.iter().position(|&b| b != 0).unwrap();
177        let byte_count = (8 - start) as u8;
178        buf.push(0x80 - byte_count);
179        // One's complement: invert all bits
180        for &b in &bytes[start..] {
181            buf.push(!b);
182        }
183    }
184    buf
185}
186
187fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
188    if data.is_empty() {
189        return Err(SqlError::InvalidValue("truncated integer".into()));
190    }
191    let marker = data[0];
192    if marker == 0x80 {
193        return Ok((Value::Integer(0), 1));
194    }
195    if marker > 0x80 {
196        // Positive
197        let byte_count = (marker - 0x80) as usize;
198        if data.len() < 1 + byte_count {
199            return Err(SqlError::InvalidValue("truncated positive integer".into()));
200        }
201        let mut bytes = [0u8; 8];
202        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
203        let val = i64::from_be_bytes(bytes);
204        Ok((Value::Integer(val), 1 + byte_count))
205    } else {
206        // Negative
207        let byte_count = (0x80 - marker) as usize;
208        if data.len() < 1 + byte_count {
209            return Err(SqlError::InvalidValue("truncated negative integer".into()));
210        }
211        let mut bytes = [0u8; 8];
212        for i in 0..byte_count {
213            bytes[8 - byte_count + i] = !data[1 + i];
214        }
215        let abs_val = u64::from_be_bytes(bytes);
216        // Use wrapping negation to handle i64::MIN correctly
217        let val = (-(abs_val as i128)) as i64;
218        Ok((Value::Integer(val), 1 + byte_count))
219    }
220}
221
222// ── Real encoding (IEEE 754 sign-bit manipulation) ──────────────────
223
224fn encode_real(val: f64) -> Vec<u8> {
225    let mut buf = vec![TAG_REAL];
226    let bits = val.to_bits();
227    let encoded = if val.is_sign_negative() {
228        // Negative (including -0.0): flip ALL bits
229        !bits
230    } else {
231        // Positive (including +0.0): flip sign bit only
232        bits ^ (1u64 << 63)
233    };
234    buf.extend_from_slice(&encoded.to_be_bytes());
235    buf
236}
237
238fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
239    if data.len() < 8 {
240        return Err(SqlError::InvalidValue("truncated real".into()));
241    }
242    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
243    let bits = if encoded & (1u64 << 63) != 0 {
244        // Was positive: undo sign bit flip
245        encoded ^ (1u64 << 63)
246    } else {
247        // Was negative: undo full inversion
248        !encoded
249    };
250    let val = f64::from_bits(bits);
251    Ok((Value::Real(val), 8))
252}
253
254// ── Null-escaped byte encoding ──────────────────────────────────────
255
/// Build `tag` followed by the null-escaped form of `data`.
///
/// Each 0x00 in `data` becomes `0x00 0xFF`; a bare 0x00 terminates the value.
fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(data.len() + 2);
    encoded.push(tag);
    encoded.extend(data.iter().flat_map(|&byte| {
        // A NUL byte is followed by the 0xFF escape marker; other bytes pass through.
        let escape = (byte == 0x00).then_some(0xFF);
        std::iter::once(byte).chain(escape)
    }));
    encoded.push(0x00); // terminator
    encoded
}
271
272/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
273fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
274    let mut result = Vec::new();
275    let mut i = 0;
276    while i < data.len() {
277        if data[i] == 0x00 {
278            if i + 1 < data.len() && data[i + 1] == 0xFF {
279                result.push(0x00);
280                i += 2;
281            } else {
282                return Ok((result, i + 1)); // terminator consumed
283            }
284        } else {
285            result.push(data[i]);
286            i += 1;
287        }
288    }
289    Err(SqlError::InvalidValue(
290        "unterminated null-escaped string".into(),
291    ))
292}
293
294// ── Row encoding (for B+ tree values - non-PK columns) ─────────────
295
296/// Encode non-PK column values into a row.
297/// Format: [col_count: u16][null_bitmap][per-column: data_type(u8) + data_len(u32) + data]
298pub fn encode_row(values: &[Value]) -> Vec<u8> {
299    let col_count = values.len();
300    let bitmap_bytes = col_count.div_ceil(8);
301    let mut buf = Vec::new();
302
303    // Column count
304    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
305
306    // Null bitmap
307    let mut bitmap = vec![0u8; bitmap_bytes];
308    for (i, v) in values.iter().enumerate() {
309        if v.is_null() {
310            bitmap[i / 8] |= 1 << (i % 8);
311        }
312    }
313    buf.extend_from_slice(&bitmap);
314
315    // Column data
316    for v in values {
317        if v.is_null() {
318            continue;
319        }
320        match v {
321            Value::Integer(i) => {
322                buf.push(DataType::Integer.type_tag());
323                buf.extend_from_slice(&8u32.to_le_bytes());
324                buf.extend_from_slice(&i.to_le_bytes());
325            }
326            Value::Real(r) => {
327                buf.push(DataType::Real.type_tag());
328                buf.extend_from_slice(&8u32.to_le_bytes());
329                buf.extend_from_slice(&r.to_le_bytes());
330            }
331            Value::Boolean(b) => {
332                buf.push(DataType::Boolean.type_tag());
333                buf.extend_from_slice(&1u32.to_le_bytes());
334                buf.push(if *b { 1 } else { 0 });
335            }
336            Value::Text(s) => {
337                let bytes = s.as_bytes();
338                buf.push(DataType::Text.type_tag());
339                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
340                buf.extend_from_slice(bytes);
341            }
342            Value::Blob(data) => {
343                buf.push(DataType::Blob.type_tag());
344                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
345                buf.extend_from_slice(data);
346            }
347            Value::Null => unreachable!(),
348        }
349    }
350
351    buf
352}
353
354pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
355    buf.clear();
356    let col_count = values.len();
357    let bitmap_bytes = col_count.div_ceil(8);
358
359    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
360
361    let bitmap_start = buf.len();
362    buf.resize(buf.len() + bitmap_bytes, 0);
363
364    for (i, v) in values.iter().enumerate() {
365        if v.is_null() {
366            buf[bitmap_start + i / 8] |= 1 << (i % 8);
367            continue;
368        }
369        match v {
370            Value::Integer(val) => {
371                buf.push(DataType::Integer.type_tag());
372                buf.extend_from_slice(&8u32.to_le_bytes());
373                buf.extend_from_slice(&val.to_le_bytes());
374            }
375            Value::Real(r) => {
376                buf.push(DataType::Real.type_tag());
377                buf.extend_from_slice(&8u32.to_le_bytes());
378                buf.extend_from_slice(&r.to_le_bytes());
379            }
380            Value::Boolean(b) => {
381                buf.push(DataType::Boolean.type_tag());
382                buf.extend_from_slice(&1u32.to_le_bytes());
383                buf.push(if *b { 1 } else { 0 });
384            }
385            Value::Text(s) => {
386                let bytes = s.as_bytes();
387                buf.push(DataType::Text.type_tag());
388                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
389                buf.extend_from_slice(bytes);
390            }
391            Value::Blob(data) => {
392                buf.push(DataType::Blob.type_tag());
393                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
394                buf.extend_from_slice(data);
395            }
396            Value::Null => unreachable!(),
397        }
398    }
399}
400
401fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
402    match DataType::from_tag(type_tag) {
403        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
404            data[..8].try_into().unwrap(),
405        ))),
406        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
407            data[..8].try_into().unwrap(),
408        ))),
409        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
410        Some(DataType::Text) => {
411            let s = std::str::from_utf8(data)
412                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
413            Ok(Value::Text(CompactString::from(s)))
414        }
415        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
416        _ => Err(SqlError::InvalidValue(format!(
417            "unknown column type tag: {type_tag}"
418        ))),
419    }
420}
421
422fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
423    if data.len() < 2 {
424        return Err(SqlError::InvalidValue("row data too short".into()));
425    }
426    let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
427    let bitmap_bytes = col_count.div_ceil(8);
428    let pos = 2;
429    if data.len() < pos + bitmap_bytes {
430        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
431    }
432    Ok((
433        col_count,
434        &data[pos..pos + bitmap_bytes],
435        pos + bitmap_bytes,
436    ))
437}
438
439pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
440    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
441
442    let mut values = Vec::with_capacity(col_count);
443    for i in 0..col_count {
444        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
445            values.push(Value::Null);
446            continue;
447        }
448
449        if pos + 5 > data.len() {
450            return Err(SqlError::InvalidValue("truncated column data".into()));
451        }
452        let type_tag = data[pos];
453        pos += 1;
454        let data_len =
455            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
456        pos += 4;
457
458        if pos + data_len > data.len() {
459            return Err(SqlError::InvalidValue("truncated column value".into()));
460        }
461
462        values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
463        pos += data_len;
464    }
465
466    Ok(values)
467}
468
/// Returns the column count stored in the first two bytes (little-endian u16)
/// of a row value blob.
///
/// # Panics
///
/// Panics if `data` holds fewer than two bytes.
#[inline]
pub fn row_non_pk_count(data: &[u8]) -> usize {
    let low = usize::from(data[0]);
    let high = usize::from(data[1]);
    (high << 8) | low
}
474
475pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
476    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
477
478    for i in 0..col_count {
479        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
480            continue;
481        }
482
483        if pos + 5 > data.len() {
484            return Err(SqlError::InvalidValue("truncated column data".into()));
485        }
486        let type_tag = data[pos];
487        pos += 1;
488        let data_len =
489            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
490        pos += 4;
491
492        if pos + data_len > data.len() {
493            return Err(SqlError::InvalidValue("truncated column value".into()));
494        }
495
496        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
497            out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
498        }
499        pos += data_len;
500    }
501
502    Ok(())
503}
504
505pub fn decode_pk_into(
506    key: &[u8],
507    count: usize,
508    out: &mut [Value],
509    pk_mapping: &[usize],
510) -> Result<()> {
511    let mut pos = 0;
512    for i in 0..count {
513        let (v, n) = decode_key_value(&key[pos..])?;
514        if i < pk_mapping.len() {
515            out[pk_mapping[i]] = v;
516        }
517        pos += n;
518    }
519    Ok(())
520}
521
522pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
523    if targets.is_empty() {
524        return Ok(Vec::new());
525    }
526    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
527
528    let mut results = Vec::with_capacity(targets.len());
529    let mut ti = 0;
530
531    for col in 0..col_count {
532        if ti >= targets.len() {
533            break;
534        }
535        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
536
537        if col == targets[ti] {
538            if is_null {
539                results.push(Value::Null);
540            } else {
541                if pos + 5 > data.len() {
542                    return Err(SqlError::InvalidValue("truncated column data".into()));
543                }
544                let type_tag = data[pos];
545                pos += 1;
546                let data_len =
547                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
548                        as usize;
549                pos += 4;
550                if pos + data_len > data.len() {
551                    return Err(SqlError::InvalidValue("truncated column value".into()));
552                }
553                results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
554                pos += data_len;
555            }
556            ti += 1;
557        } else if !is_null {
558            if pos + 5 > data.len() {
559                return Err(SqlError::InvalidValue("truncated column data".into()));
560            }
561            let data_len =
562                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
563                    as usize;
564            pos += 5 + data_len;
565        }
566    }
567
568    // Targets beyond stored column count get Null
569    while ti < targets.len() {
570        results.push(Value::Null);
571        ti += 1;
572    }
573
574    Ok(results)
575}
576
577pub fn decode_columns_into(
578    data: &[u8],
579    targets: &[usize],
580    schema_cols: &[usize],
581    row: &mut [Value],
582) -> Result<()> {
583    if targets.is_empty() {
584        return Ok(());
585    }
586    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
587
588    let mut ti = 0;
589    for col in 0..col_count {
590        if ti >= targets.len() {
591            break;
592        }
593        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
594
595        if col == targets[ti] {
596            if is_null {
597                row[schema_cols[ti]] = Value::Null;
598            } else {
599                if pos + 5 > data.len() {
600                    return Err(SqlError::InvalidValue("truncated column data".into()));
601                }
602                let type_tag = data[pos];
603                pos += 1;
604                let data_len =
605                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
606                        as usize;
607                pos += 4;
608                if pos + data_len > data.len() {
609                    return Err(SqlError::InvalidValue("truncated column value".into()));
610                }
611                row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
612                pos += data_len;
613            }
614            ti += 1;
615        } else if !is_null {
616            if pos + 5 > data.len() {
617                return Err(SqlError::InvalidValue("truncated column data".into()));
618            }
619            let data_len =
620                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
621                    as usize;
622            pos += 5 + data_len;
623        }
624    }
625
626    Ok(())
627}
628
/// A decoded column value whose text and blob payloads are borrowed (lifetime
/// `'a`) rather than owned.
///
/// The variants mirror those of `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    /// SQL NULL.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating point number.
    Real(f64),
    /// Boolean.
    Boolean(bool),
    /// Borrowed UTF-8 text.
    Text(&'a str),
    /// Borrowed raw bytes.
    Blob(&'a [u8]),
}
638
639impl<'a> RawColumn<'a> {
640    pub fn to_value(self) -> Value {
641        match self {
642            RawColumn::Null => Value::Null,
643            RawColumn::Integer(i) => Value::Integer(i),
644            RawColumn::Real(r) => Value::Real(r),
645            RawColumn::Boolean(b) => Value::Boolean(b),
646            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
647            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
648        }
649    }
650
651    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
652        use std::cmp::Ordering;
653        match (self, other) {
654            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
655            (RawColumn::Null, _) | (_, Value::Null) => None,
656            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
657            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
658            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
659            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
660            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
661            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
662            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
663            _ => None,
664        }
665    }
666
667    pub fn eq_value(&self, other: &Value) -> bool {
668        match (self, other) {
669            (RawColumn::Null, Value::Null) => true,
670            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
671            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
672            (RawColumn::Real(a), Value::Real(b)) => a == b,
673            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
674            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
675            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
676            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
677            _ => false,
678        }
679    }
680
681    pub fn as_f64(&self) -> Option<f64> {
682        match self {
683            RawColumn::Integer(i) => Some(*i as f64),
684            RawColumn::Real(r) => Some(*r),
685            _ => None,
686        }
687    }
688
689    pub fn as_i64(&self) -> Option<i64> {
690        match self {
691            RawColumn::Integer(i) => Some(*i),
692            _ => None,
693        }
694    }
695}
696
697fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
698    match DataType::from_tag(type_tag) {
699        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
700            data[..8].try_into().unwrap(),
701        ))),
702        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
703            data[..8].try_into().unwrap(),
704        ))),
705        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
706        Some(DataType::Text) => {
707            let s = std::str::from_utf8(data)
708                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
709            Ok(RawColumn::Text(s))
710        }
711        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
712        _ => Err(SqlError::InvalidValue(format!(
713            "unknown column type tag: {type_tag}"
714        ))),
715    }
716}
717
718pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
719    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
720    if target >= col_count {
721        return Ok(RawColumn::Null);
722    }
723
724    for col in 0..=target {
725        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
726
727        if col == target {
728            if is_null {
729                return Ok(RawColumn::Null);
730            }
731            if pos + 5 > data.len() {
732                return Err(SqlError::InvalidValue("truncated column data".into()));
733            }
734            let type_tag = data[pos];
735            pos += 1;
736            let data_len =
737                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
738                    as usize;
739            pos += 4;
740            if pos + data_len > data.len() {
741                return Err(SqlError::InvalidValue("truncated column value".into()));
742            }
743            return decode_value_raw(type_tag, &data[pos..pos + data_len]);
744        } else if !is_null {
745            if pos + 5 > data.len() {
746                return Err(SqlError::InvalidValue("truncated column data".into()));
747            }
748            let data_len =
749                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
750                    as usize;
751            pos += 5 + data_len;
752        }
753    }
754
755    unreachable!()
756}
757
758pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
759    if key.is_empty() || key[0] != TAG_INTEGER {
760        return Err(SqlError::InvalidValue("not an integer key".into()));
761    }
762    let (val, _) = decode_integer(&key[1..])?;
763    match val {
764        Value::Integer(i) => Ok(i),
765        _ => unreachable!(),
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772
773    // ── Key encoding tests ──────────────────────────────────────────
774
775    #[test]
776    fn key_null() {
777        let encoded = encode_key_value(&Value::Null);
778        let (decoded, n) = decode_key_value(&encoded).unwrap();
779        assert_eq!(n, 1);
780        assert_eq!(decoded, Value::Null);
781    }
782
783    #[test]
784    fn key_boolean() {
785        let f_enc = encode_key_value(&Value::Boolean(false));
786        let t_enc = encode_key_value(&Value::Boolean(true));
787        assert!(f_enc < t_enc);
788
789        let (f_dec, _) = decode_key_value(&f_enc).unwrap();
790        let (t_dec, _) = decode_key_value(&t_enc).unwrap();
791        assert_eq!(f_dec, Value::Boolean(false));
792        assert_eq!(t_dec, Value::Boolean(true));
793    }
794
795    #[test]
796    fn key_integer_roundtrip() {
797        let test_values = [
798            i64::MIN,
799            -1_000_000,
800            -256,
801            -1,
802            0,
803            1,
804            127,
805            128,
806            255,
807            256,
808            65535,
809            1_000_000,
810            i64::MAX,
811        ];
812        for &v in &test_values {
813            let encoded = encode_key_value(&Value::Integer(v));
814            let (decoded, _) = decode_key_value(&encoded).unwrap();
815            assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
816        }
817    }
818
819    #[test]
820    fn key_integer_sort_order() {
821        let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
822        let encoded: Vec<Vec<u8>> = values
823            .iter()
824            .map(|&v| encode_key_value(&Value::Integer(v)))
825            .collect();
826
827        for i in 0..encoded.len() - 1 {
828            assert!(
829                encoded[i] < encoded[i + 1],
830                "sort order broken: {} vs {}",
831                values[i],
832                values[i + 1]
833            );
834        }
835    }
836
837    #[test]
838    fn key_real_roundtrip() {
839        let test_values = [
840            f64::NEG_INFINITY,
841            -1e100,
842            -1.0,
843            -f64::MIN_POSITIVE,
844            -0.0,
845            0.0,
846            f64::MIN_POSITIVE,
847            0.5,
848            1.0,
849            1e100,
850            f64::INFINITY,
851        ];
852        for &v in &test_values {
853            let encoded = encode_key_value(&Value::Real(v));
854            let (decoded, _) = decode_key_value(&encoded).unwrap();
855            match decoded {
856                Value::Real(r) => {
857                    assert!(
858                        v.to_bits() == r.to_bits(),
859                        "roundtrip failed for {v}: got {r}"
860                    );
861                }
862                _ => panic!("expected Real"),
863            }
864        }
865    }
866
867    #[test]
868    fn key_real_sort_order() {
869        let values = [
870            f64::NEG_INFINITY,
871            -100.0,
872            -1.0,
873            -0.0,
874            0.0,
875            1.0,
876            100.0,
877            f64::INFINITY,
878        ];
879        let encoded: Vec<Vec<u8>> = values
880            .iter()
881            .map(|&v| encode_key_value(&Value::Real(v)))
882            .collect();
883
884        for i in 0..encoded.len() - 1 {
885            assert!(
886                encoded[i] <= encoded[i + 1],
887                "sort order broken: {} vs {}",
888                values[i],
889                values[i + 1]
890            );
891        }
892    }
893
894    #[test]
895    fn key_text_roundtrip() {
896        let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
897        for &v in &test_values {
898            let encoded = encode_key_value(&Value::Text(v.into()));
899            let (decoded, _) = decode_key_value(&encoded).unwrap();
900            assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
901        }
902    }
903
904    #[test]
905    fn key_text_sort_order() {
906        let values = ["", "a", "ab", "b", "ba", "z"];
907        let encoded: Vec<Vec<u8>> = values
908            .iter()
909            .map(|&v| encode_key_value(&Value::Text(v.into())))
910            .collect();
911
912        for i in 0..encoded.len() - 1 {
913            assert!(
914                encoded[i] < encoded[i + 1],
915                "sort order broken: {:?} vs {:?}",
916                values[i],
917                values[i + 1]
918            );
919        }
920    }
921
922    #[test]
923    fn key_blob_roundtrip() {
924        let test_values: Vec<Vec<u8>> = vec![
925            vec![],
926            vec![0x00],
927            vec![0x00, 0xFF],
928            vec![0xFF, 0x00],
929            vec![0x00, 0x00, 0x00],
930        ];
931        for v in &test_values {
932            let encoded = encode_key_value(&Value::Blob(v.clone()));
933            let (decoded, _) = decode_key_value(&encoded).unwrap();
934            assert_eq!(decoded, Value::Blob(v.clone()));
935        }
936    }
937
938    #[test]
939    fn key_composite_roundtrip() {
940        let values = vec![
941            Value::Integer(42),
942            Value::Text("hello".into()),
943            Value::Boolean(true),
944        ];
945        let encoded = encode_composite_key(&values);
946        let decoded = decode_composite_key(&encoded, 3).unwrap();
947        assert_eq!(decoded[0], Value::Integer(42));
948        assert_eq!(decoded[1], Value::Text("hello".into()));
949        assert_eq!(decoded[2], Value::Boolean(true));
950    }
951
952    #[test]
953    fn key_composite_sort_order() {
954        // Composite keys: (1, "b") < (1, "c") < (2, "a")
955        let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
956        let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
957        let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
958        assert!(k1 < k2);
959        assert!(k2 < k3);
960    }
961
962    #[test]
963    fn key_cross_type_ordering() {
964        let null = encode_key_value(&Value::Null);
965        let bool_val = encode_key_value(&Value::Boolean(false));
966        let int = encode_key_value(&Value::Integer(0));
967        let text = encode_key_value(&Value::Text("".into()));
968        let blob = encode_key_value(&Value::Blob(vec![]));
969
970        assert!(null < blob);
971        assert!(blob < text);
972        assert!(text < bool_val);
973        assert!(bool_val < int);
974    }
975
976    // ── Row encoding tests ──────────────────────────────────────────
977
978    #[test]
979    fn row_roundtrip_simple() {
980        let values = vec![
981            Value::Integer(42),
982            Value::Text("hello".into()),
983            Value::Boolean(true),
984        ];
985        let encoded = encode_row(&values);
986        let decoded = decode_row(&encoded).unwrap();
987        assert_eq!(decoded.len(), 3);
988        assert_eq!(decoded[0], Value::Integer(42));
989        assert_eq!(decoded[1], Value::Text("hello".into()));
990        assert_eq!(decoded[2], Value::Boolean(true));
991    }
992
993    #[test]
994    fn row_roundtrip_with_nulls() {
995        let values = vec![
996            Value::Integer(1),
997            Value::Null,
998            Value::Text("test".into()),
999            Value::Null,
1000        ];
1001        let encoded = encode_row(&values);
1002        let decoded = decode_row(&encoded).unwrap();
1003        assert_eq!(decoded.len(), 4);
1004        assert_eq!(decoded[0], Value::Integer(1));
1005        assert!(decoded[1].is_null());
1006        assert_eq!(decoded[2], Value::Text("test".into()));
1007        assert!(decoded[3].is_null());
1008    }
1009
1010    #[test]
1011    fn row_roundtrip_empty() {
1012        let values: Vec<Value> = vec![];
1013        let encoded = encode_row(&values);
1014        let decoded = decode_row(&encoded).unwrap();
1015        assert!(decoded.is_empty());
1016    }
1017
1018    #[test]
1019    fn row_roundtrip_all_types() {
1020        let values = vec![
1021            Value::Integer(-100),
1022            Value::Real(3.15),
1023            Value::Text("hello world".into()),
1024            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1025            Value::Boolean(false),
1026            Value::Null,
1027        ];
1028        let encoded = encode_row(&values);
1029        let decoded = decode_row(&encoded).unwrap();
1030        assert_eq!(decoded.len(), 6);
1031        assert_eq!(decoded[0], Value::Integer(-100));
1032        assert_eq!(decoded[1], Value::Real(3.15));
1033        assert_eq!(decoded[2], Value::Text("hello world".into()));
1034        assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
1035        assert_eq!(decoded[4], Value::Boolean(false));
1036        assert!(decoded[5].is_null());
1037    }
1038
1039    #[test]
1040    fn null_escaped_with_embedded_nulls() {
1041        let text = "before\0after";
1042        let encoded = encode_key_value(&Value::Text(text.into()));
1043        let (decoded, _) = decode_key_value(&encoded).unwrap();
1044        assert_eq!(decoded, Value::Text(text.into()));
1045    }
1046
1047    #[test]
1048    fn key_integer_edge_cases() {
1049        for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
1050            let encoded = encode_key_value(&Value::Integer(v));
1051            let (decoded, n) = decode_key_value(&encoded).unwrap();
1052            assert_eq!(n, encoded.len());
1053            assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
1054        }
1055    }
1056
1057    #[test]
1058    fn decode_columns_single() {
1059        let values = vec![
1060            Value::Integer(42),
1061            Value::Text("hello".into()),
1062            Value::Boolean(true),
1063        ];
1064        let encoded = encode_row(&values);
1065        let cols = decode_columns(&encoded, &[1]).unwrap();
1066        assert_eq!(cols.len(), 1);
1067        assert_eq!(cols[0], Value::Text("hello".into()));
1068    }
1069
1070    #[test]
1071    fn decode_columns_multiple() {
1072        let values = vec![
1073            Value::Integer(1),
1074            Value::Real(2.5),
1075            Value::Text("skip".into()),
1076            Value::Boolean(false),
1077            Value::Blob(vec![0xAB]),
1078        ];
1079        let encoded = encode_row(&values);
1080        let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
1081        assert_eq!(cols.len(), 3);
1082        assert_eq!(cols[0], Value::Integer(1));
1083        assert_eq!(cols[1], Value::Boolean(false));
1084        assert_eq!(cols[2], Value::Blob(vec![0xAB]));
1085    }
1086
1087    #[test]
1088    fn decode_columns_with_nulls() {
1089        let values = vec![
1090            Value::Integer(10),
1091            Value::Null,
1092            Value::Text("after_null".into()),
1093            Value::Null,
1094            Value::Boolean(true),
1095        ];
1096        let encoded = encode_row(&values);
1097        let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
1098        assert_eq!(cols.len(), 3);
1099        assert!(cols[0].is_null());
1100        assert_eq!(cols[1], Value::Text("after_null".into()));
1101        assert_eq!(cols[2], Value::Boolean(true));
1102    }
1103
1104    #[test]
1105    fn decode_columns_first_and_last() {
1106        let values = vec![
1107            Value::Text("first".into()),
1108            Value::Integer(99),
1109            Value::Boolean(false),
1110            Value::Real(3.125),
1111        ];
1112        let encoded = encode_row(&values);
1113        let cols = decode_columns(&encoded, &[0, 3]).unwrap();
1114        assert_eq!(cols.len(), 2);
1115        assert_eq!(cols[0], Value::Text("first".into()));
1116        assert_eq!(cols[1], Value::Real(3.125));
1117    }
1118
1119    #[test]
1120    fn decode_columns_empty_targets() {
1121        let values = vec![Value::Integer(1)];
1122        let encoded = encode_row(&values);
1123        let cols = decode_columns(&encoded, &[]).unwrap();
1124        assert!(cols.is_empty());
1125    }
1126
1127    #[test]
1128    fn decode_columns_all_matches_full_decode() {
1129        let values = vec![
1130            Value::Integer(-100),
1131            Value::Real(3.15),
1132            Value::Text("hello world".into()),
1133            Value::Blob(vec![0xDE, 0xAD]),
1134            Value::Boolean(false),
1135            Value::Null,
1136        ];
1137        let encoded = encode_row(&values);
1138        let full = decode_row(&encoded).unwrap();
1139        let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
1140        assert_eq!(full, selective);
1141    }
1142
1143    #[test]
1144    fn raw_column_integer() {
1145        let values = vec![Value::Integer(42), Value::Text("hello".into())];
1146        let encoded = encode_row(&values);
1147        let raw = decode_column_raw(&encoded, 0).unwrap();
1148        assert!(matches!(raw, RawColumn::Integer(42)));
1149        assert_eq!(raw.to_value(), Value::Integer(42));
1150    }
1151
1152    #[test]
1153    fn raw_column_text_borrows() {
1154        let values = vec![Value::Integer(1), Value::Text("hello".into())];
1155        let encoded = encode_row(&values);
1156        let raw = decode_column_raw(&encoded, 1).unwrap();
1157        match raw {
1158            RawColumn::Text(s) => assert_eq!(s, "hello"),
1159            other => panic!("expected Text, got {other:?}"),
1160        }
1161    }
1162
1163    #[test]
1164    fn raw_column_null() {
1165        let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
1166        let encoded = encode_row(&values);
1167        let raw = decode_column_raw(&encoded, 1).unwrap();
1168        assert!(matches!(raw, RawColumn::Null));
1169    }
1170
1171    #[test]
1172    fn raw_column_last() {
1173        let values = vec![
1174            Value::Integer(1),
1175            Value::Text("skip".into()),
1176            Value::Real(3.15),
1177        ];
1178        let encoded = encode_row(&values);
1179        let raw = decode_column_raw(&encoded, 2).unwrap();
1180        match raw {
1181            RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
1182            other => panic!("expected Real, got {other:?}"),
1183        }
1184    }
1185
1186    #[test]
1187    fn raw_column_out_of_bounds_returns_null() {
1188        let values = vec![Value::Integer(1)];
1189        let encoded = encode_row(&values);
1190        assert!(matches!(
1191            decode_column_raw(&encoded, 1).unwrap(),
1192            RawColumn::Null
1193        ));
1194    }
1195
1196    #[test]
1197    fn raw_column_eq_value() {
1198        let raw_int = RawColumn::Integer(42);
1199        assert!(raw_int.eq_value(&Value::Integer(42)));
1200        assert!(!raw_int.eq_value(&Value::Integer(43)));
1201        assert!(raw_int.eq_value(&Value::Real(42.0)));
1202
1203        let raw_text = RawColumn::Text("hello");
1204        assert!(raw_text.eq_value(&Value::Text("hello".into())));
1205        assert!(!raw_text.eq_value(&Value::Text("world".into())));
1206    }
1207
1208    #[test]
1209    fn raw_column_cmp_value() {
1210        use std::cmp::Ordering;
1211        let raw = RawColumn::Integer(42);
1212        assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
1213        assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
1214        assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
1215        assert_eq!(raw.cmp_value(&Value::Null), None);
1216    }
1217
1218    #[test]
1219    fn raw_column_as_numeric() {
1220        assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
1221        assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
1222        assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
1223        assert_eq!(RawColumn::Real(3.15).as_i64(), None);
1224        assert_eq!(RawColumn::Text("x").as_f64(), None);
1225        assert_eq!(RawColumn::Null.as_i64(), None);
1226    }
1227
1228    #[test]
1229    fn decode_pk_integer_roundtrip() {
1230        for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
1231            let encoded = encode_key_value(&Value::Integer(v));
1232            let decoded = decode_pk_integer(&encoded).unwrap();
1233            assert_eq!(decoded, v);
1234        }
1235    }
1236
1237    #[test]
1238    fn decode_pk_integer_rejects_non_integer() {
1239        let encoded = encode_key_value(&Value::Text("hello".into()));
1240        assert!(decode_pk_integer(&encoded).is_err());
1241    }
1242
1243    #[test]
1244    fn raw_column_blob() {
1245        let values = vec![Value::Blob(vec![0xDE, 0xAD])];
1246        let encoded = encode_row(&values);
1247        let raw = decode_column_raw(&encoded, 0).unwrap();
1248        match raw {
1249            RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
1250            other => panic!("expected Blob, got {other:?}"),
1251        }
1252    }
1253
1254    #[test]
1255    fn raw_column_matches_full_decode() {
1256        let values = vec![
1257            Value::Integer(-100),
1258            Value::Real(3.15),
1259            Value::Text("hello world".into()),
1260            Value::Blob(vec![0xDE, 0xAD]),
1261            Value::Boolean(false),
1262            Value::Null,
1263        ];
1264        let encoded = encode_row(&values);
1265        let full = decode_row(&encoded).unwrap();
1266        for (i, expected) in full.iter().enumerate() {
1267            let raw = decode_column_raw(&encoded, i).unwrap();
1268            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
1269        }
1270    }
1271}