Skip to main content

citadel_sql/
encoding.rs

//! Order-preserving key encoding (tuple layer)
//! and row encoding for non-PK column storage.
3
4use crate::error::{Result, SqlError};
5use crate::types::{CompactString, DataType, Value};
6
// ── Key encoding (order-preserving) ─────────────────────────────────
8
// Type tags written as the first byte of every encoded key value. Encoded keys
// are compared bytewise, so the numeric order of these tags fixes the
// cross-type order: NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL.

/// Tag for `NULL`; the lowest tag.
const TAG_NULL: u8 = 0x00;
/// Tag for null-escaped binary data.
const TAG_BLOB: u8 = 0x01;
/// Tag for null-escaped UTF-8 text.
const TAG_TEXT: u8 = 0x02;
/// Tag for a boolean, followed by a single `0x00`/`0x01` byte.
const TAG_BOOLEAN: u8 = 0x03;
/// Tag for a variable-width signed integer.
const TAG_INTEGER: u8 = 0x04;
/// Tag for an 8-byte order-preserving float; the highest tag.
const TAG_REAL: u8 = 0x05;
16
17/// Encode a single value into an order-preserving byte sequence.
18pub fn encode_key_value(value: &Value) -> Vec<u8> {
19    match value {
20        Value::Null => vec![TAG_NULL],
21        Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
22        Value::Integer(i) => encode_integer(*i),
23        Value::Real(r) => encode_real(*r),
24        Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
25        Value::Blob(b) => encode_bytes(TAG_BLOB, b),
26    }
27}
28
29/// Encode a composite key (multiple values concatenated).
30pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31    let mut buf = Vec::new();
32    for v in values {
33        buf.extend_from_slice(&encode_key_value(v));
34    }
35    buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39    buf.clear();
40    for v in values {
41        encode_key_value_into(v, buf);
42    }
43}
44
45fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
46    match value {
47        Value::Null => buf.push(TAG_NULL),
48        Value::Boolean(b) => {
49            buf.push(TAG_BOOLEAN);
50            buf.push(if *b { 0x01 } else { 0x00 });
51        }
52        Value::Integer(i) => encode_integer_into(*i, buf),
53        Value::Real(r) => encode_real_into(*r, buf),
54        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
55        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
56    }
57}
58
59fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
60    buf.push(TAG_INTEGER);
61    if val == 0 {
62        buf.push(0x80);
63        return;
64    }
65    if val > 0 {
66        let bytes = val.to_be_bytes();
67        let start = bytes.iter().position(|&b| b != 0).unwrap();
68        let byte_count = (8 - start) as u8;
69        buf.push(0x80 + byte_count);
70        buf.extend_from_slice(&bytes[start..]);
71    } else {
72        let abs_val = if val == i64::MIN {
73            u64::MAX / 2 + 1
74        } else {
75            (-val) as u64
76        };
77        let bytes = abs_val.to_be_bytes();
78        let start = bytes.iter().position(|&b| b != 0).unwrap();
79        let byte_count = (8 - start) as u8;
80        buf.push(0x80 - byte_count);
81        for &b in &bytes[start..] {
82            buf.push(!b);
83        }
84    }
85}
86
87fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
88    buf.push(TAG_REAL);
89    let bits = val.to_bits();
90    let encoded = if val.is_sign_negative() {
91        !bits
92    } else {
93        bits ^ (1u64 << 63)
94    };
95    buf.extend_from_slice(&encoded.to_be_bytes());
96}
97
/// Appends `tag` and the null-escaped form of `data` to `buf`.
///
/// Every `0x00` in `data` is written as `0x00 0xFF`, and the value is closed
/// by a bare `0x00` terminator.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    // Each chunk ends right after a 0x00 (except possibly the last one), so
    // the escape byte only has to follow chunks that end in 0x00.
    for chunk in data.split_inclusive(|&b| b == 0x00) {
        buf.extend_from_slice(chunk);
        if chunk.last() == Some(&0x00) {
            buf.push(0xFF);
        }
    }
    buf.push(0x00);
}
110
111/// Decode a single key value, returning the value and the number of bytes consumed.
112pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
113    if data.is_empty() {
114        return Err(SqlError::InvalidValue("empty key data".into()));
115    }
116    match data[0] {
117        TAG_NULL => Ok((Value::Null, 1)),
118        TAG_BOOLEAN => {
119            if data.len() < 2 {
120                return Err(SqlError::InvalidValue("truncated boolean".into()));
121            }
122            Ok((Value::Boolean(data[1] != 0), 2))
123        }
124        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
125        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
126        TAG_TEXT => {
127            let (bytes, n) = decode_null_escaped(&data[1..])?;
128            let s = String::from_utf8(bytes)
129                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
130            Ok((Value::Text(CompactString::from(s)), n + 1))
131        }
132        TAG_BLOB => {
133            let (bytes, n) = decode_null_escaped(&data[1..])?;
134            Ok((Value::Blob(bytes), n + 1))
135        }
136        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
137    }
138}
139
140/// Decode a composite key into multiple values.
141pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
142    let mut values = Vec::with_capacity(count);
143    let mut pos = 0;
144    for _ in 0..count {
145        let (v, n) = decode_key_value(&data[pos..])?;
146        values.push(v);
147        pos += n;
148    }
149    Ok(values)
150}
151
// ── Integer encoding (variable-width) ───────────────────────────────
153
154fn encode_integer(val: i64) -> Vec<u8> {
155    let mut buf = vec![TAG_INTEGER];
156    if val == 0 {
157        buf.push(0x80);
158        return buf;
159    }
160    if val > 0 {
161        let bytes = val.to_be_bytes();
162        // Find first non-zero byte
163        let start = bytes.iter().position(|&b| b != 0).unwrap();
164        let byte_count = (8 - start) as u8;
165        buf.push(0x80 + byte_count);
166        buf.extend_from_slice(&bytes[start..]);
167    } else {
168        // Negative: one's complement of absolute value
169        let abs_val = if val == i64::MIN {
170            // Special case: |i64::MIN| doesn't fit in i64
171            u64::MAX / 2 + 1
172        } else {
173            (-val) as u64
174        };
175        let bytes = abs_val.to_be_bytes();
176        let start = bytes.iter().position(|&b| b != 0).unwrap();
177        let byte_count = (8 - start) as u8;
178        buf.push(0x80 - byte_count);
179        // One's complement: invert all bits
180        for &b in &bytes[start..] {
181            buf.push(!b);
182        }
183    }
184    buf
185}
186
187fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
188    if data.is_empty() {
189        return Err(SqlError::InvalidValue("truncated integer".into()));
190    }
191    let marker = data[0];
192    if marker == 0x80 {
193        return Ok((Value::Integer(0), 1));
194    }
195    if marker > 0x80 {
196        // Positive
197        let byte_count = (marker - 0x80) as usize;
198        if data.len() < 1 + byte_count {
199            return Err(SqlError::InvalidValue("truncated positive integer".into()));
200        }
201        let mut bytes = [0u8; 8];
202        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
203        let val = i64::from_be_bytes(bytes);
204        Ok((Value::Integer(val), 1 + byte_count))
205    } else {
206        // Negative
207        let byte_count = (0x80 - marker) as usize;
208        if data.len() < 1 + byte_count {
209            return Err(SqlError::InvalidValue("truncated negative integer".into()));
210        }
211        let mut bytes = [0u8; 8];
212        for i in 0..byte_count {
213            bytes[8 - byte_count + i] = !data[1 + i];
214        }
215        let abs_val = u64::from_be_bytes(bytes);
216        // Use wrapping negation to handle i64::MIN correctly
217        let val = (-(abs_val as i128)) as i64;
218        Ok((Value::Integer(val), 1 + byte_count))
219    }
220}
221
// ── Real encoding (IEEE 754 sign-bit manipulation) ──────────────────
223
224fn encode_real(val: f64) -> Vec<u8> {
225    let mut buf = vec![TAG_REAL];
226    let bits = val.to_bits();
227    let encoded = if val.is_sign_negative() {
228        // Negative (including -0.0): flip ALL bits
229        !bits
230    } else {
231        // Positive (including +0.0): flip sign bit only
232        bits ^ (1u64 << 63)
233    };
234    buf.extend_from_slice(&encoded.to_be_bytes());
235    buf
236}
237
238fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
239    if data.len() < 8 {
240        return Err(SqlError::InvalidValue("truncated real".into()));
241    }
242    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
243    let bits = if encoded & (1u64 << 63) != 0 {
244        // Was positive: undo sign bit flip
245        encoded ^ (1u64 << 63)
246    } else {
247        // Was negative: undo full inversion
248        !encoded
249    };
250    let val = f64::from_bits(bits);
251    Ok((Value::Real(val), 8))
252}
253
// ── Null-escaped byte encoding ──────────────────────────────────────
255
/// Builds a tagged, null-escaped encoding of `data`.
///
/// Layout: `tag`, then `data` with each `0x00` expanded to `0x00 0xFF`, then a
/// bare `0x00` terminator.
fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
    // Tag + payload + terminator; escapes may grow the buffer further.
    let mut out = Vec::with_capacity(data.len() + 2);
    out.push(tag);
    for &byte in data {
        match byte {
            0x00 => out.extend_from_slice(&[0x00, 0xFF]),
            other => out.push(other),
        }
    }
    out.push(0x00);
    out
}
271
272/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
273fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
274    let mut result = Vec::new();
275    let mut i = 0;
276    while i < data.len() {
277        if data[i] == 0x00 {
278            if i + 1 < data.len() && data[i + 1] == 0xFF {
279                result.push(0x00);
280                i += 2;
281            } else {
282                return Ok((result, i + 1)); // terminator consumed
283            }
284        } else {
285            result.push(data[i]);
286            i += 1;
287        }
288    }
289    Err(SqlError::InvalidValue(
290        "unterminated null-escaped string".into(),
291    ))
292}
293
// ── Row encoding (for B+ tree values — non-PK columns) ─────────────
295
296/// Encode non-PK column values into a row.
297/// Format: [col_count: u16][null_bitmap][per-column: data_type(u8) + data_len(u32) + data]
298pub fn encode_row(values: &[Value]) -> Vec<u8> {
299    let col_count = values.len();
300    let bitmap_bytes = col_count.div_ceil(8);
301    let mut buf = Vec::new();
302
303    // Column count
304    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
305
306    // Null bitmap
307    let mut bitmap = vec![0u8; bitmap_bytes];
308    for (i, v) in values.iter().enumerate() {
309        if v.is_null() {
310            bitmap[i / 8] |= 1 << (i % 8);
311        }
312    }
313    buf.extend_from_slice(&bitmap);
314
315    // Column data
316    for v in values {
317        if v.is_null() {
318            continue;
319        }
320        match v {
321            Value::Integer(i) => {
322                buf.push(DataType::Integer.type_tag());
323                buf.extend_from_slice(&8u32.to_le_bytes());
324                buf.extend_from_slice(&i.to_le_bytes());
325            }
326            Value::Real(r) => {
327                buf.push(DataType::Real.type_tag());
328                buf.extend_from_slice(&8u32.to_le_bytes());
329                buf.extend_from_slice(&r.to_le_bytes());
330            }
331            Value::Boolean(b) => {
332                buf.push(DataType::Boolean.type_tag());
333                buf.extend_from_slice(&1u32.to_le_bytes());
334                buf.push(if *b { 1 } else { 0 });
335            }
336            Value::Text(s) => {
337                let bytes = s.as_bytes();
338                buf.push(DataType::Text.type_tag());
339                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
340                buf.extend_from_slice(bytes);
341            }
342            Value::Blob(data) => {
343                buf.push(DataType::Blob.type_tag());
344                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
345                buf.extend_from_slice(data);
346            }
347            Value::Null => unreachable!(),
348        }
349    }
350
351    buf
352}
353
354pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
355    buf.clear();
356    let col_count = values.len();
357    let bitmap_bytes = col_count.div_ceil(8);
358
359    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
360
361    let bitmap_start = buf.len();
362    buf.resize(buf.len() + bitmap_bytes, 0);
363
364    for (i, v) in values.iter().enumerate() {
365        if v.is_null() {
366            buf[bitmap_start + i / 8] |= 1 << (i % 8);
367            continue;
368        }
369        match v {
370            Value::Integer(val) => {
371                buf.push(DataType::Integer.type_tag());
372                buf.extend_from_slice(&8u32.to_le_bytes());
373                buf.extend_from_slice(&val.to_le_bytes());
374            }
375            Value::Real(r) => {
376                buf.push(DataType::Real.type_tag());
377                buf.extend_from_slice(&8u32.to_le_bytes());
378                buf.extend_from_slice(&r.to_le_bytes());
379            }
380            Value::Boolean(b) => {
381                buf.push(DataType::Boolean.type_tag());
382                buf.extend_from_slice(&1u32.to_le_bytes());
383                buf.push(if *b { 1 } else { 0 });
384            }
385            Value::Text(s) => {
386                let bytes = s.as_bytes();
387                buf.push(DataType::Text.type_tag());
388                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
389                buf.extend_from_slice(bytes);
390            }
391            Value::Blob(data) => {
392                buf.push(DataType::Blob.type_tag());
393                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
394                buf.extend_from_slice(data);
395            }
396            Value::Null => unreachable!(),
397        }
398    }
399}
400
401fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
402    match DataType::from_tag(type_tag) {
403        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
404            data[..8].try_into().unwrap(),
405        ))),
406        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
407            data[..8].try_into().unwrap(),
408        ))),
409        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
410        Some(DataType::Text) => {
411            let s = std::str::from_utf8(data)
412                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
413            Ok(Value::Text(CompactString::from(s)))
414        }
415        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
416        _ => Err(SqlError::InvalidValue(format!(
417            "unknown column type tag: {type_tag}"
418        ))),
419    }
420}
421
422fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
423    if data.len() < 2 {
424        return Err(SqlError::InvalidValue("row data too short".into()));
425    }
426    let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
427    let bitmap_bytes = col_count.div_ceil(8);
428    let pos = 2;
429    if data.len() < pos + bitmap_bytes {
430        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
431    }
432    Ok((
433        col_count,
434        &data[pos..pos + bitmap_bytes],
435        pos + bitmap_bytes,
436    ))
437}
438
439pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
440    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
441
442    let mut values = Vec::with_capacity(col_count);
443    for i in 0..col_count {
444        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
445            values.push(Value::Null);
446            continue;
447        }
448
449        if pos + 5 > data.len() {
450            return Err(SqlError::InvalidValue("truncated column data".into()));
451        }
452        let type_tag = data[pos];
453        pos += 1;
454        let data_len =
455            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
456        pos += 4;
457
458        if pos + data_len > data.len() {
459            return Err(SqlError::InvalidValue("truncated column value".into()));
460        }
461
462        values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
463        pos += data_len;
464    }
465
466    Ok(values)
467}
468
469pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
470    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
471
472    for i in 0..col_count {
473        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
474            continue;
475        }
476
477        if pos + 5 > data.len() {
478            return Err(SqlError::InvalidValue("truncated column data".into()));
479        }
480        let type_tag = data[pos];
481        pos += 1;
482        let data_len =
483            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
484        pos += 4;
485
486        if pos + data_len > data.len() {
487            return Err(SqlError::InvalidValue("truncated column value".into()));
488        }
489
490        if i < col_mapping.len() {
491            out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
492        }
493        pos += data_len;
494    }
495
496    Ok(())
497}
498
499pub fn decode_pk_into(
500    key: &[u8],
501    count: usize,
502    out: &mut [Value],
503    pk_mapping: &[usize],
504) -> Result<()> {
505    let mut pos = 0;
506    for i in 0..count {
507        let (v, n) = decode_key_value(&key[pos..])?;
508        if i < pk_mapping.len() {
509            out[pk_mapping[i]] = v;
510        }
511        pos += n;
512    }
513    Ok(())
514}
515
516pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
517    if targets.is_empty() {
518        return Ok(Vec::new());
519    }
520    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
521    if *targets.last().unwrap() >= col_count {
522        return Err(SqlError::InvalidValue("column index out of bounds".into()));
523    }
524
525    let mut results = Vec::with_capacity(targets.len());
526    let mut ti = 0;
527
528    for col in 0..col_count {
529        if ti >= targets.len() {
530            break;
531        }
532        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
533
534        if col == targets[ti] {
535            if is_null {
536                results.push(Value::Null);
537            } else {
538                if pos + 5 > data.len() {
539                    return Err(SqlError::InvalidValue("truncated column data".into()));
540                }
541                let type_tag = data[pos];
542                pos += 1;
543                let data_len =
544                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
545                        as usize;
546                pos += 4;
547                if pos + data_len > data.len() {
548                    return Err(SqlError::InvalidValue("truncated column value".into()));
549                }
550                results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
551                pos += data_len;
552            }
553            ti += 1;
554        } else if !is_null {
555            if pos + 5 > data.len() {
556                return Err(SqlError::InvalidValue("truncated column data".into()));
557            }
558            let data_len =
559                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
560                    as usize;
561            pos += 5 + data_len;
562        }
563    }
564
565    Ok(results)
566}
567
568pub fn decode_columns_into(
569    data: &[u8],
570    targets: &[usize],
571    schema_cols: &[usize],
572    row: &mut [Value],
573) -> Result<()> {
574    if targets.is_empty() {
575        return Ok(());
576    }
577    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
578    if *targets.last().unwrap() >= col_count {
579        return Err(SqlError::InvalidValue("column index out of bounds".into()));
580    }
581
582    let mut ti = 0;
583    for col in 0..col_count {
584        if ti >= targets.len() {
585            break;
586        }
587        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
588
589        if col == targets[ti] {
590            if is_null {
591                row[schema_cols[ti]] = Value::Null;
592            } else {
593                if pos + 5 > data.len() {
594                    return Err(SqlError::InvalidValue("truncated column data".into()));
595                }
596                let type_tag = data[pos];
597                pos += 1;
598                let data_len =
599                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
600                        as usize;
601                pos += 4;
602                if pos + data_len > data.len() {
603                    return Err(SqlError::InvalidValue("truncated column value".into()));
604                }
605                row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
606                pos += data_len;
607            }
608            ti += 1;
609        } else if !is_null {
610            if pos + 5 > data.len() {
611                return Err(SqlError::InvalidValue("truncated column data".into()));
612            }
613            let data_len =
614                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
615                    as usize;
616            pos += 5 + data_len;
617        }
618    }
619
620    Ok(())
621}
622
623#[derive(Debug, Clone, Copy)]
624pub enum RawColumn<'a> {
625    Null,
626    Integer(i64),
627    Real(f64),
628    Boolean(bool),
629    Text(&'a str),
630    Blob(&'a [u8]),
631}
632
633impl<'a> RawColumn<'a> {
634    pub fn to_value(self) -> Value {
635        match self {
636            RawColumn::Null => Value::Null,
637            RawColumn::Integer(i) => Value::Integer(i),
638            RawColumn::Real(r) => Value::Real(r),
639            RawColumn::Boolean(b) => Value::Boolean(b),
640            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
641            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
642        }
643    }
644
645    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
646        use std::cmp::Ordering;
647        match (self, other) {
648            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
649            (RawColumn::Null, _) | (_, Value::Null) => None,
650            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
651            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
652            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
653            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
654            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
655            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
656            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
657            _ => None,
658        }
659    }
660
661    pub fn eq_value(&self, other: &Value) -> bool {
662        match (self, other) {
663            (RawColumn::Null, Value::Null) => true,
664            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
665            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
666            (RawColumn::Real(a), Value::Real(b)) => a == b,
667            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
668            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
669            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
670            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
671            _ => false,
672        }
673    }
674
675    pub fn as_f64(&self) -> Option<f64> {
676        match self {
677            RawColumn::Integer(i) => Some(*i as f64),
678            RawColumn::Real(r) => Some(*r),
679            _ => None,
680        }
681    }
682
683    pub fn as_i64(&self) -> Option<i64> {
684        match self {
685            RawColumn::Integer(i) => Some(*i),
686            _ => None,
687        }
688    }
689}
690
691fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
692    match DataType::from_tag(type_tag) {
693        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
694            data[..8].try_into().unwrap(),
695        ))),
696        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
697            data[..8].try_into().unwrap(),
698        ))),
699        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
700        Some(DataType::Text) => {
701            let s = std::str::from_utf8(data)
702                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
703            Ok(RawColumn::Text(s))
704        }
705        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
706        _ => Err(SqlError::InvalidValue(format!(
707            "unknown column type tag: {type_tag}"
708        ))),
709    }
710}
711
712pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
713    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
714    if target >= col_count {
715        return Err(SqlError::InvalidValue("column index out of bounds".into()));
716    }
717
718    for col in 0..=target {
719        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
720
721        if col == target {
722            if is_null {
723                return Ok(RawColumn::Null);
724            }
725            if pos + 5 > data.len() {
726                return Err(SqlError::InvalidValue("truncated column data".into()));
727            }
728            let type_tag = data[pos];
729            pos += 1;
730            let data_len =
731                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
732                    as usize;
733            pos += 4;
734            if pos + data_len > data.len() {
735                return Err(SqlError::InvalidValue("truncated column value".into()));
736            }
737            return decode_value_raw(type_tag, &data[pos..pos + data_len]);
738        } else if !is_null {
739            if pos + 5 > data.len() {
740                return Err(SqlError::InvalidValue("truncated column data".into()));
741            }
742            let data_len =
743                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
744                    as usize;
745            pos += 5 + data_len;
746        }
747    }
748
749    unreachable!()
750}
751
752pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
753    if key.is_empty() || key[0] != TAG_INTEGER {
754        return Err(SqlError::InvalidValue("not an integer key".into()));
755    }
756    let (val, _) = decode_integer(&key[1..])?;
757    match val {
758        Value::Integer(i) => Ok(i),
759        _ => unreachable!(),
760    }
761}
762
763#[cfg(test)]
764mod tests {
765    use super::*;
766
767    // ── Key encoding tests ──────────────────────────────────────────
768
769    #[test]
770    fn key_null() {
771        let encoded = encode_key_value(&Value::Null);
772        let (decoded, n) = decode_key_value(&encoded).unwrap();
773        assert_eq!(n, 1);
774        assert_eq!(decoded, Value::Null);
775    }
776
777    #[test]
778    fn key_boolean() {
779        let f_enc = encode_key_value(&Value::Boolean(false));
780        let t_enc = encode_key_value(&Value::Boolean(true));
781        assert!(f_enc < t_enc);
782
783        let (f_dec, _) = decode_key_value(&f_enc).unwrap();
784        let (t_dec, _) = decode_key_value(&t_enc).unwrap();
785        assert_eq!(f_dec, Value::Boolean(false));
786        assert_eq!(t_dec, Value::Boolean(true));
787    }
788
789    #[test]
790    fn key_integer_roundtrip() {
791        let test_values = [
792            i64::MIN,
793            -1_000_000,
794            -256,
795            -1,
796            0,
797            1,
798            127,
799            128,
800            255,
801            256,
802            65535,
803            1_000_000,
804            i64::MAX,
805        ];
806        for &v in &test_values {
807            let encoded = encode_key_value(&Value::Integer(v));
808            let (decoded, _) = decode_key_value(&encoded).unwrap();
809            assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
810        }
811    }
812
813    #[test]
814    fn key_integer_sort_order() {
815        let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
816        let encoded: Vec<Vec<u8>> = values
817            .iter()
818            .map(|&v| encode_key_value(&Value::Integer(v)))
819            .collect();
820
821        for i in 0..encoded.len() - 1 {
822            assert!(
823                encoded[i] < encoded[i + 1],
824                "sort order broken: {} vs {}",
825                values[i],
826                values[i + 1]
827            );
828        }
829    }
830
831    #[test]
832    fn key_real_roundtrip() {
833        let test_values = [
834            f64::NEG_INFINITY,
835            -1e100,
836            -1.0,
837            -f64::MIN_POSITIVE,
838            -0.0,
839            0.0,
840            f64::MIN_POSITIVE,
841            0.5,
842            1.0,
843            1e100,
844            f64::INFINITY,
845        ];
846        for &v in &test_values {
847            let encoded = encode_key_value(&Value::Real(v));
848            let (decoded, _) = decode_key_value(&encoded).unwrap();
849            match decoded {
850                Value::Real(r) => {
851                    assert!(
852                        v.to_bits() == r.to_bits(),
853                        "roundtrip failed for {v}: got {r}"
854                    );
855                }
856                _ => panic!("expected Real"),
857            }
858        }
859    }
860
861    #[test]
862    fn key_real_sort_order() {
863        let values = [
864            f64::NEG_INFINITY,
865            -100.0,
866            -1.0,
867            -0.0,
868            0.0,
869            1.0,
870            100.0,
871            f64::INFINITY,
872        ];
873        let encoded: Vec<Vec<u8>> = values
874            .iter()
875            .map(|&v| encode_key_value(&Value::Real(v)))
876            .collect();
877
878        for i in 0..encoded.len() - 1 {
879            assert!(
880                encoded[i] <= encoded[i + 1],
881                "sort order broken: {} vs {}",
882                values[i],
883                values[i + 1]
884            );
885        }
886    }
887
888    #[test]
889    fn key_text_roundtrip() {
890        let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
891        for &v in &test_values {
892            let encoded = encode_key_value(&Value::Text(v.into()));
893            let (decoded, _) = decode_key_value(&encoded).unwrap();
894            assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
895        }
896    }
897
898    #[test]
899    fn key_text_sort_order() {
900        let values = ["", "a", "ab", "b", "ba", "z"];
901        let encoded: Vec<Vec<u8>> = values
902            .iter()
903            .map(|&v| encode_key_value(&Value::Text(v.into())))
904            .collect();
905
906        for i in 0..encoded.len() - 1 {
907            assert!(
908                encoded[i] < encoded[i + 1],
909                "sort order broken: {:?} vs {:?}",
910                values[i],
911                values[i + 1]
912            );
913        }
914    }
915
916    #[test]
917    fn key_blob_roundtrip() {
918        let test_values: Vec<Vec<u8>> = vec![
919            vec![],
920            vec![0x00],
921            vec![0x00, 0xFF],
922            vec![0xFF, 0x00],
923            vec![0x00, 0x00, 0x00],
924        ];
925        for v in &test_values {
926            let encoded = encode_key_value(&Value::Blob(v.clone()));
927            let (decoded, _) = decode_key_value(&encoded).unwrap();
928            assert_eq!(decoded, Value::Blob(v.clone()));
929        }
930    }
931
932    #[test]
933    fn key_composite_roundtrip() {
934        let values = vec![
935            Value::Integer(42),
936            Value::Text("hello".into()),
937            Value::Boolean(true),
938        ];
939        let encoded = encode_composite_key(&values);
940        let decoded = decode_composite_key(&encoded, 3).unwrap();
941        assert_eq!(decoded[0], Value::Integer(42));
942        assert_eq!(decoded[1], Value::Text("hello".into()));
943        assert_eq!(decoded[2], Value::Boolean(true));
944    }
945
946    #[test]
947    fn key_composite_sort_order() {
948        // Composite keys: (1, "b") < (1, "c") < (2, "a")
949        let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
950        let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
951        let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
952        assert!(k1 < k2);
953        assert!(k2 < k3);
954    }
955
956    #[test]
957    fn key_cross_type_ordering() {
958        let null = encode_key_value(&Value::Null);
959        let bool_val = encode_key_value(&Value::Boolean(false));
960        let int = encode_key_value(&Value::Integer(0));
961        let text = encode_key_value(&Value::Text("".into()));
962        let blob = encode_key_value(&Value::Blob(vec![]));
963
964        assert!(null < blob);
965        assert!(blob < text);
966        assert!(text < bool_val);
967        assert!(bool_val < int);
968    }
969
970    // ── Row encoding tests ──────────────────────────────────────────
971
972    #[test]
973    fn row_roundtrip_simple() {
974        let values = vec![
975            Value::Integer(42),
976            Value::Text("hello".into()),
977            Value::Boolean(true),
978        ];
979        let encoded = encode_row(&values);
980        let decoded = decode_row(&encoded).unwrap();
981        assert_eq!(decoded.len(), 3);
982        assert_eq!(decoded[0], Value::Integer(42));
983        assert_eq!(decoded[1], Value::Text("hello".into()));
984        assert_eq!(decoded[2], Value::Boolean(true));
985    }
986
987    #[test]
988    fn row_roundtrip_with_nulls() {
989        let values = vec![
990            Value::Integer(1),
991            Value::Null,
992            Value::Text("test".into()),
993            Value::Null,
994        ];
995        let encoded = encode_row(&values);
996        let decoded = decode_row(&encoded).unwrap();
997        assert_eq!(decoded.len(), 4);
998        assert_eq!(decoded[0], Value::Integer(1));
999        assert!(decoded[1].is_null());
1000        assert_eq!(decoded[2], Value::Text("test".into()));
1001        assert!(decoded[3].is_null());
1002    }
1003
1004    #[test]
1005    fn row_roundtrip_empty() {
1006        let values: Vec<Value> = vec![];
1007        let encoded = encode_row(&values);
1008        let decoded = decode_row(&encoded).unwrap();
1009        assert!(decoded.is_empty());
1010    }
1011
1012    #[test]
1013    fn row_roundtrip_all_types() {
1014        let values = vec![
1015            Value::Integer(-100),
1016            Value::Real(3.15),
1017            Value::Text("hello world".into()),
1018            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1019            Value::Boolean(false),
1020            Value::Null,
1021        ];
1022        let encoded = encode_row(&values);
1023        let decoded = decode_row(&encoded).unwrap();
1024        assert_eq!(decoded.len(), 6);
1025        assert_eq!(decoded[0], Value::Integer(-100));
1026        assert_eq!(decoded[1], Value::Real(3.15));
1027        assert_eq!(decoded[2], Value::Text("hello world".into()));
1028        assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
1029        assert_eq!(decoded[4], Value::Boolean(false));
1030        assert!(decoded[5].is_null());
1031    }
1032
1033    #[test]
1034    fn null_escaped_with_embedded_nulls() {
1035        let text = "before\0after";
1036        let encoded = encode_key_value(&Value::Text(text.into()));
1037        let (decoded, _) = decode_key_value(&encoded).unwrap();
1038        assert_eq!(decoded, Value::Text(text.into()));
1039    }
1040
1041    #[test]
1042    fn key_integer_edge_cases() {
1043        for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
1044            let encoded = encode_key_value(&Value::Integer(v));
1045            let (decoded, n) = decode_key_value(&encoded).unwrap();
1046            assert_eq!(n, encoded.len());
1047            assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
1048        }
1049    }
1050
1051    #[test]
1052    fn decode_columns_single() {
1053        let values = vec![
1054            Value::Integer(42),
1055            Value::Text("hello".into()),
1056            Value::Boolean(true),
1057        ];
1058        let encoded = encode_row(&values);
1059        let cols = decode_columns(&encoded, &[1]).unwrap();
1060        assert_eq!(cols.len(), 1);
1061        assert_eq!(cols[0], Value::Text("hello".into()));
1062    }
1063
1064    #[test]
1065    fn decode_columns_multiple() {
1066        let values = vec![
1067            Value::Integer(1),
1068            Value::Real(2.5),
1069            Value::Text("skip".into()),
1070            Value::Boolean(false),
1071            Value::Blob(vec![0xAB]),
1072        ];
1073        let encoded = encode_row(&values);
1074        let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
1075        assert_eq!(cols.len(), 3);
1076        assert_eq!(cols[0], Value::Integer(1));
1077        assert_eq!(cols[1], Value::Boolean(false));
1078        assert_eq!(cols[2], Value::Blob(vec![0xAB]));
1079    }
1080
1081    #[test]
1082    fn decode_columns_with_nulls() {
1083        let values = vec![
1084            Value::Integer(10),
1085            Value::Null,
1086            Value::Text("after_null".into()),
1087            Value::Null,
1088            Value::Boolean(true),
1089        ];
1090        let encoded = encode_row(&values);
1091        let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
1092        assert_eq!(cols.len(), 3);
1093        assert!(cols[0].is_null());
1094        assert_eq!(cols[1], Value::Text("after_null".into()));
1095        assert_eq!(cols[2], Value::Boolean(true));
1096    }
1097
1098    #[test]
1099    fn decode_columns_first_and_last() {
1100        let values = vec![
1101            Value::Text("first".into()),
1102            Value::Integer(99),
1103            Value::Boolean(false),
1104            Value::Real(3.125),
1105        ];
1106        let encoded = encode_row(&values);
1107        let cols = decode_columns(&encoded, &[0, 3]).unwrap();
1108        assert_eq!(cols.len(), 2);
1109        assert_eq!(cols[0], Value::Text("first".into()));
1110        assert_eq!(cols[1], Value::Real(3.125));
1111    }
1112
1113    #[test]
1114    fn decode_columns_empty_targets() {
1115        let values = vec![Value::Integer(1)];
1116        let encoded = encode_row(&values);
1117        let cols = decode_columns(&encoded, &[]).unwrap();
1118        assert!(cols.is_empty());
1119    }
1120
1121    #[test]
1122    fn decode_columns_all_matches_full_decode() {
1123        let values = vec![
1124            Value::Integer(-100),
1125            Value::Real(3.15),
1126            Value::Text("hello world".into()),
1127            Value::Blob(vec![0xDE, 0xAD]),
1128            Value::Boolean(false),
1129            Value::Null,
1130        ];
1131        let encoded = encode_row(&values);
1132        let full = decode_row(&encoded).unwrap();
1133        let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
1134        assert_eq!(full, selective);
1135    }
1136
1137    #[test]
1138    fn raw_column_integer() {
1139        let values = vec![Value::Integer(42), Value::Text("hello".into())];
1140        let encoded = encode_row(&values);
1141        let raw = decode_column_raw(&encoded, 0).unwrap();
1142        assert!(matches!(raw, RawColumn::Integer(42)));
1143        assert_eq!(raw.to_value(), Value::Integer(42));
1144    }
1145
1146    #[test]
1147    fn raw_column_text_borrows() {
1148        let values = vec![Value::Integer(1), Value::Text("hello".into())];
1149        let encoded = encode_row(&values);
1150        let raw = decode_column_raw(&encoded, 1).unwrap();
1151        match raw {
1152            RawColumn::Text(s) => assert_eq!(s, "hello"),
1153            other => panic!("expected Text, got {other:?}"),
1154        }
1155    }
1156
1157    #[test]
1158    fn raw_column_null() {
1159        let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
1160        let encoded = encode_row(&values);
1161        let raw = decode_column_raw(&encoded, 1).unwrap();
1162        assert!(matches!(raw, RawColumn::Null));
1163    }
1164
1165    #[test]
1166    fn raw_column_last() {
1167        let values = vec![
1168            Value::Integer(1),
1169            Value::Text("skip".into()),
1170            Value::Real(3.15),
1171        ];
1172        let encoded = encode_row(&values);
1173        let raw = decode_column_raw(&encoded, 2).unwrap();
1174        match raw {
1175            RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
1176            other => panic!("expected Real, got {other:?}"),
1177        }
1178    }
1179
1180    #[test]
1181    fn raw_column_out_of_bounds() {
1182        let values = vec![Value::Integer(1)];
1183        let encoded = encode_row(&values);
1184        assert!(decode_column_raw(&encoded, 1).is_err());
1185    }
1186
1187    #[test]
1188    fn raw_column_eq_value() {
1189        let raw_int = RawColumn::Integer(42);
1190        assert!(raw_int.eq_value(&Value::Integer(42)));
1191        assert!(!raw_int.eq_value(&Value::Integer(43)));
1192        assert!(raw_int.eq_value(&Value::Real(42.0)));
1193
1194        let raw_text = RawColumn::Text("hello");
1195        assert!(raw_text.eq_value(&Value::Text("hello".into())));
1196        assert!(!raw_text.eq_value(&Value::Text("world".into())));
1197    }
1198
1199    #[test]
1200    fn raw_column_cmp_value() {
1201        use std::cmp::Ordering;
1202        let raw = RawColumn::Integer(42);
1203        assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
1204        assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
1205        assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
1206        assert_eq!(raw.cmp_value(&Value::Null), None);
1207    }
1208
1209    #[test]
1210    fn raw_column_as_numeric() {
1211        assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
1212        assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
1213        assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
1214        assert_eq!(RawColumn::Real(3.15).as_i64(), None);
1215        assert_eq!(RawColumn::Text("x").as_f64(), None);
1216        assert_eq!(RawColumn::Null.as_i64(), None);
1217    }
1218
1219    #[test]
1220    fn decode_pk_integer_roundtrip() {
1221        for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
1222            let encoded = encode_key_value(&Value::Integer(v));
1223            let decoded = decode_pk_integer(&encoded).unwrap();
1224            assert_eq!(decoded, v);
1225        }
1226    }
1227
1228    #[test]
1229    fn decode_pk_integer_rejects_non_integer() {
1230        let encoded = encode_key_value(&Value::Text("hello".into()));
1231        assert!(decode_pk_integer(&encoded).is_err());
1232    }
1233
1234    #[test]
1235    fn raw_column_blob() {
1236        let values = vec![Value::Blob(vec![0xDE, 0xAD])];
1237        let encoded = encode_row(&values);
1238        let raw = decode_column_raw(&encoded, 0).unwrap();
1239        match raw {
1240            RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
1241            other => panic!("expected Blob, got {other:?}"),
1242        }
1243    }
1244
1245    #[test]
1246    fn raw_column_matches_full_decode() {
1247        let values = vec![
1248            Value::Integer(-100),
1249            Value::Real(3.15),
1250            Value::Text("hello world".into()),
1251            Value::Blob(vec![0xDE, 0xAD]),
1252            Value::Boolean(false),
1253            Value::Null,
1254        ];
1255        let encoded = encode_row(&values);
1256        let full = decode_row(&encoded).unwrap();
1257        for (i, expected) in full.iter().enumerate() {
1258            let raw = decode_column_raw(&encoded, i).unwrap();
1259            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
1260        }
1261    }
1262}