Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6// ── Key encoding (order-preserving) ─────────────────────────────────
7
/// Type tag bytes for key encoding. Ordering: NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL
///
/// Every encoded key value starts with one of these tags, so when encoded
/// keys are compared bytewise, values of different types order by tag first.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
15
16/// Encode a single value into an order-preserving byte sequence.
17pub fn encode_key_value(value: &Value) -> Vec<u8> {
18    match value {
19        Value::Null => vec![TAG_NULL],
20        Value::Boolean(b) => vec![TAG_BOOLEAN, if *b { 0x01 } else { 0x00 }],
21        Value::Integer(i) => encode_integer(*i),
22        Value::Real(r) => encode_real(*r),
23        Value::Text(s) => encode_bytes(TAG_TEXT, s.as_bytes()),
24        Value::Blob(b) => encode_bytes(TAG_BLOB, b),
25    }
26}
27
28/// Encode a composite key (multiple values concatenated).
29pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
30    let mut buf = Vec::new();
31    for v in values {
32        buf.extend_from_slice(&encode_key_value(v));
33    }
34    buf
35}
36
37pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
38    buf.clear();
39    for v in values {
40        encode_key_value_into(v, buf);
41    }
42}
43
44fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
45    match value {
46        Value::Null => buf.push(TAG_NULL),
47        Value::Boolean(b) => {
48            buf.push(TAG_BOOLEAN);
49            buf.push(if *b { 0x01 } else { 0x00 });
50        }
51        Value::Integer(i) => encode_integer_into(*i, buf),
52        Value::Real(r) => encode_real_into(*r, buf),
53        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
54        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
55    }
56}
57
58fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
59    buf.push(TAG_INTEGER);
60    if val == 0 {
61        buf.push(0x80);
62        return;
63    }
64    if val > 0 {
65        let bytes = val.to_be_bytes();
66        let start = bytes.iter().position(|&b| b != 0).unwrap();
67        let byte_count = (8 - start) as u8;
68        buf.push(0x80 + byte_count);
69        buf.extend_from_slice(&bytes[start..]);
70    } else {
71        let abs_val = if val == i64::MIN {
72            u64::MAX / 2 + 1
73        } else {
74            (-val) as u64
75        };
76        let bytes = abs_val.to_be_bytes();
77        let start = bytes.iter().position(|&b| b != 0).unwrap();
78        let byte_count = (8 - start) as u8;
79        buf.push(0x80 - byte_count);
80        for &b in &bytes[start..] {
81            buf.push(!b);
82        }
83    }
84}
85
86fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
87    buf.push(TAG_REAL);
88    let bits = val.to_bits();
89    let encoded = if val.is_sign_negative() {
90        !bits
91    } else {
92        bits ^ (1u64 << 63)
93    };
94    buf.extend_from_slice(&encoded.to_be_bytes());
95}
96
/// Append `tag` and the null-escaped form of `data` to `buf`.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`, and the sequence is
/// closed with a bare `0x00` terminator.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on zero bytes yields the runs between them; each boundary is
    // exactly one original 0x00 that must be emitted in escaped form.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
109
110/// Decode a single key value, returning the value and the number of bytes consumed.
111pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
112    if data.is_empty() {
113        return Err(SqlError::InvalidValue("empty key data".into()));
114    }
115    match data[0] {
116        TAG_NULL => Ok((Value::Null, 1)),
117        TAG_BOOLEAN => {
118            if data.len() < 2 {
119                return Err(SqlError::InvalidValue("truncated boolean".into()));
120            }
121            Ok((Value::Boolean(data[1] != 0), 2))
122        }
123        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
124        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
125        TAG_TEXT => {
126            let (bytes, n) = decode_null_escaped(&data[1..])?;
127            let s = String::from_utf8(bytes)
128                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
129            Ok((Value::Text(CompactString::from(s)), n + 1))
130        }
131        TAG_BLOB => {
132            let (bytes, n) = decode_null_escaped(&data[1..])?;
133            Ok((Value::Blob(bytes), n + 1))
134        }
135        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
136    }
137}
138
139/// Decode a composite key into multiple values.
140pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
141    let mut values = Vec::with_capacity(count);
142    let mut pos = 0;
143    for _ in 0..count {
144        let (v, n) = decode_key_value(&data[pos..])?;
145        values.push(v);
146        pos += n;
147    }
148    Ok(values)
149}
150
151// ── Integer encoding (variable-width) ───────────────────────────────
152
153fn encode_integer(val: i64) -> Vec<u8> {
154    let mut buf = vec![TAG_INTEGER];
155    if val == 0 {
156        buf.push(0x80);
157        return buf;
158    }
159    if val > 0 {
160        let bytes = val.to_be_bytes();
161        // Find first non-zero byte
162        let start = bytes.iter().position(|&b| b != 0).unwrap();
163        let byte_count = (8 - start) as u8;
164        buf.push(0x80 + byte_count);
165        buf.extend_from_slice(&bytes[start..]);
166    } else {
167        // Negative: one's complement of absolute value
168        let abs_val = if val == i64::MIN {
169            // Special case: |i64::MIN| doesn't fit in i64
170            u64::MAX / 2 + 1
171        } else {
172            (-val) as u64
173        };
174        let bytes = abs_val.to_be_bytes();
175        let start = bytes.iter().position(|&b| b != 0).unwrap();
176        let byte_count = (8 - start) as u8;
177        buf.push(0x80 - byte_count);
178        // One's complement: invert all bits
179        for &b in &bytes[start..] {
180            buf.push(!b);
181        }
182    }
183    buf
184}
185
186fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
187    if data.is_empty() {
188        return Err(SqlError::InvalidValue("truncated integer".into()));
189    }
190    let marker = data[0];
191    if marker == 0x80 {
192        return Ok((Value::Integer(0), 1));
193    }
194    if marker > 0x80 {
195        // Positive
196        let byte_count = (marker - 0x80) as usize;
197        if data.len() < 1 + byte_count {
198            return Err(SqlError::InvalidValue("truncated positive integer".into()));
199        }
200        let mut bytes = [0u8; 8];
201        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
202        let val = i64::from_be_bytes(bytes);
203        Ok((Value::Integer(val), 1 + byte_count))
204    } else {
205        // Negative
206        let byte_count = (0x80 - marker) as usize;
207        if data.len() < 1 + byte_count {
208            return Err(SqlError::InvalidValue("truncated negative integer".into()));
209        }
210        let mut bytes = [0u8; 8];
211        for i in 0..byte_count {
212            bytes[8 - byte_count + i] = !data[1 + i];
213        }
214        let abs_val = u64::from_be_bytes(bytes);
215        // Use wrapping negation to handle i64::MIN correctly
216        let val = (-(abs_val as i128)) as i64;
217        Ok((Value::Integer(val), 1 + byte_count))
218    }
219}
220
221// ── Real encoding (IEEE 754 sign-bit manipulation) ──────────────────
222
223fn encode_real(val: f64) -> Vec<u8> {
224    let mut buf = vec![TAG_REAL];
225    let bits = val.to_bits();
226    let encoded = if val.is_sign_negative() {
227        // Negative (including -0.0): flip ALL bits
228        !bits
229    } else {
230        // Positive (including +0.0): flip sign bit only
231        bits ^ (1u64 << 63)
232    };
233    buf.extend_from_slice(&encoded.to_be_bytes());
234    buf
235}
236
237fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
238    if data.len() < 8 {
239        return Err(SqlError::InvalidValue("truncated real".into()));
240    }
241    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
242    let bits = if encoded & (1u64 << 63) != 0 {
243        // Was positive: undo sign bit flip
244        encoded ^ (1u64 << 63)
245    } else {
246        // Was negative: undo full inversion
247        !encoded
248    };
249    let val = f64::from_bits(bits);
250    Ok((Value::Real(val), 8))
251}
252
253// ── Null-escaped byte encoding ──────────────────────────────────────
254
/// Encode `data` behind `tag` using null-escaping.
///
/// Each `0x00` in `data` becomes `0x00 0xFF`; a bare `0x00` terminates the
/// sequence.
fn encode_bytes(tag: u8, data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(data.len() + 2);
    out.push(tag);

    // Copy zero-free runs wholesale, escaping each zero byte between them.
    let mut rest = data;
    while let Some(zero_at) = rest.iter().position(|&byte| byte == 0x00) {
        out.extend_from_slice(&rest[..zero_at]);
        out.extend_from_slice(&[0x00, 0xFF]);
        rest = &rest[zero_at + 1..];
    }
    out.extend_from_slice(rest);

    out.push(0x00); // terminator
    out
}
270
271/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
272fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
273    let mut result = Vec::new();
274    let mut i = 0;
275    while i < data.len() {
276        if data[i] == 0x00 {
277            if i + 1 < data.len() && data[i + 1] == 0xFF {
278                result.push(0x00);
279                i += 2;
280            } else {
281                return Ok((result, i + 1)); // terminator consumed
282            }
283        } else {
284            result.push(data[i]);
285            i += 1;
286        }
287    }
288    Err(SqlError::InvalidValue(
289        "unterminated null-escaped string".into(),
290    ))
291}
292
293// ── Row encoding (for B+ tree values - non-PK columns) ─────────────
294
295/// Encode non-PK columns. Format: [col_count:u16][null_bitmap][type(u8)+len(u32)+data per col]
296pub fn encode_row(values: &[Value]) -> Vec<u8> {
297    let col_count = values.len();
298    let bitmap_bytes = col_count.div_ceil(8);
299    let mut buf = Vec::new();
300
301    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
302    let mut bitmap = vec![0u8; bitmap_bytes];
303    for (i, v) in values.iter().enumerate() {
304        if v.is_null() {
305            bitmap[i / 8] |= 1 << (i % 8);
306        }
307    }
308    buf.extend_from_slice(&bitmap);
309
310    for v in values {
311        if v.is_null() {
312            continue;
313        }
314        match v {
315            Value::Integer(i) => {
316                buf.push(DataType::Integer.type_tag());
317                buf.extend_from_slice(&8u32.to_le_bytes());
318                buf.extend_from_slice(&i.to_le_bytes());
319            }
320            Value::Real(r) => {
321                buf.push(DataType::Real.type_tag());
322                buf.extend_from_slice(&8u32.to_le_bytes());
323                buf.extend_from_slice(&r.to_le_bytes());
324            }
325            Value::Boolean(b) => {
326                buf.push(DataType::Boolean.type_tag());
327                buf.extend_from_slice(&1u32.to_le_bytes());
328                buf.push(if *b { 1 } else { 0 });
329            }
330            Value::Text(s) => {
331                let bytes = s.as_bytes();
332                buf.push(DataType::Text.type_tag());
333                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
334                buf.extend_from_slice(bytes);
335            }
336            Value::Blob(data) => {
337                buf.push(DataType::Blob.type_tag());
338                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
339                buf.extend_from_slice(data);
340            }
341            Value::Null => unreachable!(),
342        }
343    }
344
345    buf
346}
347
348pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
349    buf.clear();
350    let col_count = values.len();
351    let bitmap_bytes = col_count.div_ceil(8);
352
353    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
354
355    let bitmap_start = buf.len();
356    buf.resize(buf.len() + bitmap_bytes, 0);
357
358    for (i, v) in values.iter().enumerate() {
359        if v.is_null() {
360            buf[bitmap_start + i / 8] |= 1 << (i % 8);
361            continue;
362        }
363        match v {
364            Value::Integer(val) => {
365                buf.push(DataType::Integer.type_tag());
366                buf.extend_from_slice(&8u32.to_le_bytes());
367                buf.extend_from_slice(&val.to_le_bytes());
368            }
369            Value::Real(r) => {
370                buf.push(DataType::Real.type_tag());
371                buf.extend_from_slice(&8u32.to_le_bytes());
372                buf.extend_from_slice(&r.to_le_bytes());
373            }
374            Value::Boolean(b) => {
375                buf.push(DataType::Boolean.type_tag());
376                buf.extend_from_slice(&1u32.to_le_bytes());
377                buf.push(if *b { 1 } else { 0 });
378            }
379            Value::Text(s) => {
380                let bytes = s.as_bytes();
381                buf.push(DataType::Text.type_tag());
382                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
383                buf.extend_from_slice(bytes);
384            }
385            Value::Blob(data) => {
386                buf.push(DataType::Blob.type_tag());
387                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
388                buf.extend_from_slice(data);
389            }
390            Value::Null => unreachable!(),
391        }
392    }
393}
394
395fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
396    match DataType::from_tag(type_tag) {
397        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
398            data[..8].try_into().unwrap(),
399        ))),
400        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
401            data[..8].try_into().unwrap(),
402        ))),
403        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
404        Some(DataType::Text) => {
405            let s = std::str::from_utf8(data)
406                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
407            Ok(Value::Text(CompactString::from(s)))
408        }
409        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
410        _ => Err(SqlError::InvalidValue(format!(
411            "unknown column type tag: {type_tag}"
412        ))),
413    }
414}
415
416fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
417    if data.len() < 2 {
418        return Err(SqlError::InvalidValue("row data too short".into()));
419    }
420    let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
421    let bitmap_bytes = col_count.div_ceil(8);
422    let pos = 2;
423    if data.len() < pos + bitmap_bytes {
424        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
425    }
426    Ok((
427        col_count,
428        &data[pos..pos + bitmap_bytes],
429        pos + bitmap_bytes,
430    ))
431}
432
433pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
434    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
435
436    let mut values = Vec::with_capacity(col_count);
437    for i in 0..col_count {
438        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
439            values.push(Value::Null);
440            continue;
441        }
442
443        if pos + 5 > data.len() {
444            return Err(SqlError::InvalidValue("truncated column data".into()));
445        }
446        let type_tag = data[pos];
447        pos += 1;
448        let data_len =
449            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
450        pos += 4;
451
452        if pos + data_len > data.len() {
453            return Err(SqlError::InvalidValue("truncated column value".into()));
454        }
455
456        values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
457        pos += data_len;
458    }
459
460    Ok(values)
461}
462
/// Returns the number of non-PK columns stored in a row value blob.
///
/// The count is the little-endian `u16` held in the first two bytes.
///
/// # Panics
///
/// Panics if `data` holds fewer than two bytes.
#[inline]
pub fn row_non_pk_count(data: &[u8]) -> usize {
    let header = [data[0], data[1]];
    usize::from(u16::from_le_bytes(header))
}
468
469pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
470    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
471
472    for i in 0..col_count {
473        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
474            continue;
475        }
476
477        if pos + 5 > data.len() {
478            return Err(SqlError::InvalidValue("truncated column data".into()));
479        }
480        let type_tag = data[pos];
481        pos += 1;
482        let data_len =
483            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
484        pos += 4;
485
486        if pos + data_len > data.len() {
487            return Err(SqlError::InvalidValue("truncated column value".into()));
488        }
489
490        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
491            out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
492        }
493        pos += data_len;
494    }
495
496    Ok(())
497}
498
499pub fn decode_pk_into(
500    key: &[u8],
501    count: usize,
502    out: &mut [Value],
503    pk_mapping: &[usize],
504) -> Result<()> {
505    let mut pos = 0;
506    for i in 0..count {
507        let (v, n) = decode_key_value(&key[pos..])?;
508        if i < pk_mapping.len() {
509            out[pk_mapping[i]] = v;
510        }
511        pos += n;
512    }
513    Ok(())
514}
515
516pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
517    if targets.is_empty() {
518        return Ok(Vec::new());
519    }
520    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
521
522    let mut results = Vec::with_capacity(targets.len());
523    let mut ti = 0;
524
525    for col in 0..col_count {
526        if ti >= targets.len() {
527            break;
528        }
529        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
530
531        if col == targets[ti] {
532            if is_null {
533                results.push(Value::Null);
534            } else {
535                if pos + 5 > data.len() {
536                    return Err(SqlError::InvalidValue("truncated column data".into()));
537                }
538                let type_tag = data[pos];
539                pos += 1;
540                let data_len =
541                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
542                        as usize;
543                pos += 4;
544                if pos + data_len > data.len() {
545                    return Err(SqlError::InvalidValue("truncated column value".into()));
546                }
547                results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
548                pos += data_len;
549            }
550            ti += 1;
551        } else if !is_null {
552            if pos + 5 > data.len() {
553                return Err(SqlError::InvalidValue("truncated column data".into()));
554            }
555            let data_len =
556                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
557                    as usize;
558            pos += 5 + data_len;
559        }
560    }
561
562    // Targets beyond stored column count get Null
563    while ti < targets.len() {
564        results.push(Value::Null);
565        ti += 1;
566    }
567
568    Ok(results)
569}
570
571pub fn decode_columns_into(
572    data: &[u8],
573    targets: &[usize],
574    schema_cols: &[usize],
575    row: &mut [Value],
576) -> Result<()> {
577    if targets.is_empty() {
578        return Ok(());
579    }
580    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
581
582    let mut ti = 0;
583    for col in 0..col_count {
584        if ti >= targets.len() {
585            break;
586        }
587        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
588
589        if col == targets[ti] {
590            if is_null {
591                row[schema_cols[ti]] = Value::Null;
592            } else {
593                if pos + 5 > data.len() {
594                    return Err(SqlError::InvalidValue("truncated column data".into()));
595                }
596                let type_tag = data[pos];
597                pos += 1;
598                let data_len =
599                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
600                        as usize;
601                pos += 4;
602                if pos + data_len > data.len() {
603                    return Err(SqlError::InvalidValue("truncated column value".into()));
604                }
605                row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
606                pos += data_len;
607            }
608            ti += 1;
609        } else if !is_null {
610            if pos + 5 > data.len() {
611                return Err(SqlError::InvalidValue("truncated column data".into()));
612            }
613            let data_len =
614                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
615                    as usize;
616            pos += 5 + data_len;
617        }
618    }
619
620    Ok(())
621}
622
/// A decoded column value that borrows text and blob payloads from the
/// buffer it was decoded from instead of copying them.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    /// SQL NULL.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating point number.
    Real(f64),
    /// Boolean.
    Boolean(bool),
    /// UTF-8 text borrowed from the source buffer.
    Text(&'a str),
    /// Raw bytes borrowed from the source buffer.
    Blob(&'a [u8]),
}
632
633impl<'a> RawColumn<'a> {
634    pub fn to_value(self) -> Value {
635        match self {
636            RawColumn::Null => Value::Null,
637            RawColumn::Integer(i) => Value::Integer(i),
638            RawColumn::Real(r) => Value::Real(r),
639            RawColumn::Boolean(b) => Value::Boolean(b),
640            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
641            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
642        }
643    }
644
645    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
646        use std::cmp::Ordering;
647        match (self, other) {
648            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
649            (RawColumn::Null, _) | (_, Value::Null) => None,
650            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
651            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
652            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
653            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
654            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
655            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
656            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
657            _ => None,
658        }
659    }
660
661    pub fn eq_value(&self, other: &Value) -> bool {
662        match (self, other) {
663            (RawColumn::Null, Value::Null) => true,
664            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
665            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
666            (RawColumn::Real(a), Value::Real(b)) => a == b,
667            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
668            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
669            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
670            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
671            _ => false,
672        }
673    }
674
675    pub fn as_f64(&self) -> Option<f64> {
676        match self {
677            RawColumn::Integer(i) => Some(*i as f64),
678            RawColumn::Real(r) => Some(*r),
679            _ => None,
680        }
681    }
682
683    pub fn as_i64(&self) -> Option<i64> {
684        match self {
685            RawColumn::Integer(i) => Some(*i),
686            _ => None,
687        }
688    }
689}
690
691fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
692    match DataType::from_tag(type_tag) {
693        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
694            data[..8].try_into().unwrap(),
695        ))),
696        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
697            data[..8].try_into().unwrap(),
698        ))),
699        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
700        Some(DataType::Text) => {
701            let s = std::str::from_utf8(data)
702                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
703            Ok(RawColumn::Text(s))
704        }
705        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
706        _ => Err(SqlError::InvalidValue(format!(
707            "unknown column type tag: {type_tag}"
708        ))),
709    }
710}
711
712/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
713pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
714    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
715    if target >= col_count || new_val.is_null() {
716        return Ok(false);
717    }
718    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
719    if was_null {
720        return Ok(false);
721    }
722    for col in 0..target {
723        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
724        if !is_null {
725            if pos + 5 > data.len() {
726                return Err(SqlError::InvalidValue("truncated column data".into()));
727            }
728            let data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
729            pos += 5 + data_len;
730        }
731    }
732    if pos + 5 > data.len() {
733        return Err(SqlError::InvalidValue("truncated column data".into()));
734    }
735    let old_data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
736    let new_data_len = match new_val {
737        Value::Integer(_) | Value::Real(_) => 8,
738        Value::Boolean(_) => 1,
739        Value::Text(s) => s.len(),
740        Value::Blob(b) => b.len(),
741        Value::Null => return Ok(false),
742    };
743    if new_data_len != old_data_len {
744        return Ok(false);
745    }
746    data[pos] = new_val.data_type().type_tag();
747    let val_start = pos + 5;
748    match new_val {
749        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
750        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
751        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
752        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
753        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
754        Value::Null => unreachable!(),
755    }
756    Ok(true)
757}
758
759/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
760pub fn patch_row_column(
761    data: &[u8],
762    target: usize,
763    new_val: &Value,
764    out: &mut Vec<u8>,
765) -> Result<()> {
766    let (col_count, bitmap, header_end) = parse_row_header(data)?;
767
768    // Expand row if target is beyond stored col count (ALTER TABLE ADD COLUMN)
769    let new_col_count = if target >= col_count {
770        target + 1
771    } else {
772        col_count
773    };
774    let new_bitmap_bytes = new_col_count.div_ceil(8);
775    let bitmap_bytes = col_count.div_ceil(8);
776    out.clear();
777
778    out.extend_from_slice(&(new_col_count as u16).to_le_bytes());
779    let bitmap_start = out.len();
780    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
781    for _ in bitmap_bytes..new_bitmap_bytes {
782        out.push(0xFF); // new columns default to NULL
783    }
784    if new_val.is_null() {
785        out[bitmap_start + target / 8] |= 1 << (target % 8);
786    } else {
787        out[bitmap_start + target / 8] &= !(1 << (target % 8));
788    }
789
790    let mut pos = header_end;
791    for col in 0..new_col_count {
792        let was_null = if col < col_count {
793            bitmap[col / 8] & (1 << (col % 8)) != 0
794        } else {
795            true // columns beyond old col_count are NULL on disk
796        };
797
798        if col == target {
799            if !was_null && pos + 5 <= data.len() {
800                let data_len = u32::from_le_bytes([
801                    data[pos + 1],
802                    data[pos + 2],
803                    data[pos + 3],
804                    data[pos + 4],
805                ]) as usize;
806                pos += 5 + data_len;
807            }
808            if !new_val.is_null() {
809                match new_val {
810                    Value::Integer(v) => {
811                        out.push(DataType::Integer.type_tag());
812                        out.extend_from_slice(&8u32.to_le_bytes());
813                        out.extend_from_slice(&v.to_le_bytes());
814                    }
815                    Value::Real(r) => {
816                        out.push(DataType::Real.type_tag());
817                        out.extend_from_slice(&8u32.to_le_bytes());
818                        out.extend_from_slice(&r.to_le_bytes());
819                    }
820                    Value::Boolean(b) => {
821                        out.push(DataType::Boolean.type_tag());
822                        out.extend_from_slice(&1u32.to_le_bytes());
823                        out.push(if *b { 1 } else { 0 });
824                    }
825                    Value::Text(s) => {
826                        let bytes = s.as_bytes();
827                        out.push(DataType::Text.type_tag());
828                        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
829                        out.extend_from_slice(bytes);
830                    }
831                    Value::Blob(d) => {
832                        out.push(DataType::Blob.type_tag());
833                        out.extend_from_slice(&(d.len() as u32).to_le_bytes());
834                        out.extend_from_slice(d);
835                    }
836                    Value::Null => unreachable!(),
837                }
838            }
839        } else if was_null {
840        } else {
841            if pos + 5 > data.len() {
842                return Err(SqlError::InvalidValue("truncated column data".into()));
843            }
844            let data_len =
845                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
846                    as usize;
847            let end = pos + 5 + data_len;
848            if end > data.len() {
849                return Err(SqlError::InvalidValue("truncated column value".into()));
850            }
851            out.extend_from_slice(&data[pos..end]);
852            pos = end;
853        }
854    }
855    Ok(())
856}
857
858pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
859    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
860    if target >= col_count {
861        return Ok(RawColumn::Null);
862    }
863
864    for col in 0..=target {
865        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
866
867        if col == target {
868            if is_null {
869                return Ok(RawColumn::Null);
870            }
871            if pos + 5 > data.len() {
872                return Err(SqlError::InvalidValue("truncated column data".into()));
873            }
874            let type_tag = data[pos];
875            pos += 1;
876            let data_len =
877                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
878                    as usize;
879            pos += 4;
880            if pos + data_len > data.len() {
881                return Err(SqlError::InvalidValue("truncated column value".into()));
882            }
883            return decode_value_raw(type_tag, &data[pos..pos + data_len]);
884        } else if !is_null {
885            if pos + 5 > data.len() {
886                return Err(SqlError::InvalidValue("truncated column data".into()));
887            }
888            let data_len =
889                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
890                    as usize;
891            pos += 5 + data_len;
892        }
893    }
894
895    unreachable!()
896}
897
898/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
899pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
900    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
901    if target >= col_count {
902        return Ok((RawColumn::Null, usize::MAX));
903    }
904
905    for col in 0..=target {
906        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
907
908        if col == target {
909            if is_null {
910                return Ok((RawColumn::Null, usize::MAX));
911            }
912            if pos + 5 > data.len() {
913                return Err(SqlError::InvalidValue("truncated column data".into()));
914            }
915            let tag_offset = pos;
916            let type_tag = data[pos];
917            pos += 1;
918            let data_len =
919                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
920                    as usize;
921            pos += 4;
922            if pos + data_len > data.len() {
923                return Err(SqlError::InvalidValue("truncated column value".into()));
924            }
925            let raw = decode_value_raw(type_tag, &data[pos..pos + data_len])?;
926            return Ok((raw, tag_offset));
927        } else if !is_null {
928            if pos + 5 > data.len() {
929                return Err(SqlError::InvalidValue("truncated column data".into()));
930            }
931            let data_len =
932                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
933                    as usize;
934            pos += 5 + data_len;
935        }
936    }
937
938    unreachable!()
939}
940
941/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
942pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
943    if offset == usize::MAX || new_val.is_null() {
944        return Ok(false);
945    }
946    if offset + 5 > data.len() {
947        return Err(SqlError::InvalidValue("truncated column data".into()));
948    }
949    let old_data_len =
950        u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
951    let new_data_len = match new_val {
952        Value::Integer(_) | Value::Real(_) => 8,
953        Value::Boolean(_) => 1,
954        Value::Text(s) => s.len(),
955        Value::Blob(b) => b.len(),
956        Value::Null => return Ok(false),
957    };
958    if new_data_len != old_data_len {
959        return Ok(false);
960    }
961    data[offset] = new_val.data_type().type_tag();
962    let val_start = offset + 5;
963    match new_val {
964        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
965        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
966        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
967        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
968        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
969        Value::Null => unreachable!(),
970    }
971    Ok(true)
972}
973
974pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
975    if key.is_empty() || key[0] != TAG_INTEGER {
976        return Err(SqlError::InvalidValue("not an integer key".into()));
977    }
978    let (val, _) = decode_integer(&key[1..])?;
979    match val {
980        Value::Integer(i) => Ok(i),
981        _ => unreachable!(),
982    }
983}
984
/// Unit tests for key encoding, row encoding, selective/raw column decoding,
/// offset-based decoding and in-place patching.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Key encoding tests ──────────────────────────────────────────

    // NULL encodes to a single byte and decodes back to NULL.
    #[test]
    fn key_null() {
        let encoded = encode_key_value(&Value::Null);
        let (decoded, n) = decode_key_value(&encoded).unwrap();
        assert_eq!(n, 1);
        assert_eq!(decoded, Value::Null);
    }

    // false sorts before true and both round-trip.
    #[test]
    fn key_boolean() {
        let f_enc = encode_key_value(&Value::Boolean(false));
        let t_enc = encode_key_value(&Value::Boolean(true));
        assert!(f_enc < t_enc);

        let (f_dec, _) = decode_key_value(&f_enc).unwrap();
        let (t_dec, _) = decode_key_value(&t_enc).unwrap();
        assert_eq!(f_dec, Value::Boolean(false));
        assert_eq!(t_dec, Value::Boolean(true));
    }

    #[test]
    fn key_integer_roundtrip() {
        let test_values = [
            i64::MIN,
            -1_000_000,
            -256,
            -1,
            0,
            1,
            127,
            128,
            255,
            256,
            65535,
            1_000_000,
            i64::MAX,
        ];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Integer(v));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Integer(v), "roundtrip failed for {v}");
        }
    }

    // Byte-wise order of the encodings must follow numeric order.
    #[test]
    fn key_integer_sort_order() {
        let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Integer(v)))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] < encoded[i + 1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Round-trips are compared bit-for-bit so that -0.0 and 0.0 stay distinct.
    #[test]
    fn key_real_roundtrip() {
        let test_values = [
            f64::NEG_INFINITY,
            -1e100,
            -1.0,
            -f64::MIN_POSITIVE,
            -0.0,
            0.0,
            f64::MIN_POSITIVE,
            0.5,
            1.0,
            1e100,
            f64::INFINITY,
        ];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Real(v));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            match decoded {
                Value::Real(r) => {
                    assert!(
                        v.to_bits() == r.to_bits(),
                        "roundtrip failed for {v}: got {r}"
                    );
                }
                _ => panic!("expected Real"),
            }
        }
    }

    // Neighbouring encodings are compared with `<=` (non-strict).
    #[test]
    fn key_real_sort_order() {
        let values = [
            f64::NEG_INFINITY,
            -100.0,
            -1.0,
            -0.0,
            0.0,
            1.0,
            100.0,
            f64::INFINITY,
        ];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Real(v)))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] <= encoded[i + 1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Includes strings with embedded NUL bytes.
    #[test]
    fn key_text_roundtrip() {
        let test_values = ["", "hello", "world", "hello\0world", "\0\0\0"];
        for &v in &test_values {
            let encoded = encode_key_value(&Value::Text(v.into()));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Text(v.into()), "roundtrip failed for {v:?}");
        }
    }

    #[test]
    fn key_text_sort_order() {
        let values = ["", "a", "ab", "b", "ba", "z"];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Text(v.into())))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] < encoded[i + 1],
                "sort order broken: {:?} vs {:?}",
                values[i],
                values[i + 1]
            );
        }
    }

    // Includes blobs containing 0x00 and 0xFF bytes.
    #[test]
    fn key_blob_roundtrip() {
        let test_values: Vec<Vec<u8>> = vec![
            vec![],
            vec![0x00],
            vec![0x00, 0xFF],
            vec![0xFF, 0x00],
            vec![0x00, 0x00, 0x00],
        ];
        for v in &test_values {
            let encoded = encode_key_value(&Value::Blob(v.clone()));
            let (decoded, _) = decode_key_value(&encoded).unwrap();
            assert_eq!(decoded, Value::Blob(v.clone()));
        }
    }

    #[test]
    fn key_composite_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_composite_key(&values);
        let decoded = decode_composite_key(&encoded, 3).unwrap();
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    #[test]
    fn key_composite_sort_order() {
        // Composite keys: (1, "b") < (1, "c") < (2, "a")
        let k1 = encode_composite_key(&[Value::Integer(1), Value::Text("b".into())]);
        let k2 = encode_composite_key(&[Value::Integer(1), Value::Text("c".into())]);
        let k3 = encode_composite_key(&[Value::Integer(2), Value::Text("a".into())]);
        assert!(k1 < k2);
        assert!(k2 < k3);
    }

    // Checks NULL < BLOB < TEXT < BOOLEAN < INTEGER across types.
    #[test]
    fn key_cross_type_ordering() {
        let null = encode_key_value(&Value::Null);
        let bool_val = encode_key_value(&Value::Boolean(false));
        let int = encode_key_value(&Value::Integer(0));
        let text = encode_key_value(&Value::Text("".into()));
        let blob = encode_key_value(&Value::Blob(vec![]));

        assert!(null < blob);
        assert!(blob < text);
        assert!(text < bool_val);
        assert!(bool_val < int);
    }

    // ── Row encoding tests ──────────────────────────────────────────

    #[test]
    fn row_roundtrip_simple() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    #[test]
    fn row_roundtrip_with_nulls() {
        let values = vec![
            Value::Integer(1),
            Value::Null,
            Value::Text("test".into()),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0], Value::Integer(1));
        assert!(decoded[1].is_null());
        assert_eq!(decoded[2], Value::Text("test".into()));
        assert!(decoded[3].is_null());
    }

    #[test]
    fn row_roundtrip_empty() {
        let values: Vec<Value> = vec![];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn row_roundtrip_all_types() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded[0], Value::Integer(-100));
        assert_eq!(decoded[1], Value::Real(3.15));
        assert_eq!(decoded[2], Value::Text("hello world".into()));
        assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
        assert_eq!(decoded[4], Value::Boolean(false));
        assert!(decoded[5].is_null());
    }

    #[test]
    fn null_escaped_with_embedded_nulls() {
        let text = "before\0after";
        let encoded = encode_key_value(&Value::Text(text.into()));
        let (decoded, _) = decode_key_value(&encoded).unwrap();
        assert_eq!(decoded, Value::Text(text.into()));
    }

    // The decoder must consume exactly the bytes the encoder produced.
    #[test]
    fn key_integer_edge_cases() {
        for v in [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX] {
            let encoded = encode_key_value(&Value::Integer(v));
            let (decoded, n) = decode_key_value(&encoded).unwrap();
            assert_eq!(n, encoded.len());
            assert_eq!(decoded, Value::Integer(v), "edge case failed for {v}");
        }
    }

    // ── Selective column decoding tests ─────────────────────────────

    #[test]
    fn decode_columns_single() {
        let values = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[1]).unwrap();
        assert_eq!(cols.len(), 1);
        assert_eq!(cols[0], Value::Text("hello".into()));
    }

    #[test]
    fn decode_columns_multiple() {
        let values = vec![
            Value::Integer(1),
            Value::Real(2.5),
            Value::Text("skip".into()),
            Value::Boolean(false),
            Value::Blob(vec![0xAB]),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[0, 3, 4]).unwrap();
        assert_eq!(cols.len(), 3);
        assert_eq!(cols[0], Value::Integer(1));
        assert_eq!(cols[1], Value::Boolean(false));
        assert_eq!(cols[2], Value::Blob(vec![0xAB]));
    }

    #[test]
    fn decode_columns_with_nulls() {
        let values = vec![
            Value::Integer(10),
            Value::Null,
            Value::Text("after_null".into()),
            Value::Null,
            Value::Boolean(true),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[1, 2, 4]).unwrap();
        assert_eq!(cols.len(), 3);
        assert!(cols[0].is_null());
        assert_eq!(cols[1], Value::Text("after_null".into()));
        assert_eq!(cols[2], Value::Boolean(true));
    }

    #[test]
    fn decode_columns_first_and_last() {
        let values = vec![
            Value::Text("first".into()),
            Value::Integer(99),
            Value::Boolean(false),
            Value::Real(3.125),
        ];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[0, 3]).unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0], Value::Text("first".into()));
        assert_eq!(cols[1], Value::Real(3.125));
    }

    #[test]
    fn decode_columns_empty_targets() {
        let values = vec![Value::Integer(1)];
        let encoded = encode_row(&values);
        let cols = decode_columns(&encoded, &[]).unwrap();
        assert!(cols.is_empty());
    }

    #[test]
    fn decode_columns_all_matches_full_decode() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let full = decode_row(&encoded).unwrap();
        let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
        assert_eq!(full, selective);
    }

    // ── Raw column tests ────────────────────────────────────────────

    #[test]
    fn raw_column_integer() {
        let values = vec![Value::Integer(42), Value::Text("hello".into())];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 0).unwrap();
        assert!(matches!(raw, RawColumn::Integer(42)));
        assert_eq!(raw.to_value(), Value::Integer(42));
    }

    #[test]
    fn raw_column_text_borrows() {
        let values = vec![Value::Integer(1), Value::Text("hello".into())];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        match raw {
            RawColumn::Text(s) => assert_eq!(s, "hello"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[test]
    fn raw_column_null() {
        let values = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        assert!(matches!(raw, RawColumn::Null));
    }

    #[test]
    fn raw_column_last() {
        let values = vec![
            Value::Integer(1),
            Value::Text("skip".into()),
            Value::Real(3.15),
        ];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 2).unwrap();
        match raw {
            RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
            other => panic!("expected Real, got {other:?}"),
        }
    }

    // A column index past the end of the row reads as NULL, not as an error.
    #[test]
    fn raw_column_out_of_bounds_returns_null() {
        let values = vec![Value::Integer(1)];
        let encoded = encode_row(&values);
        assert!(matches!(
            decode_column_raw(&encoded, 1).unwrap(),
            RawColumn::Null
        ));
    }

    #[test]
    fn raw_column_eq_value() {
        let raw_int = RawColumn::Integer(42);
        assert!(raw_int.eq_value(&Value::Integer(42)));
        assert!(!raw_int.eq_value(&Value::Integer(43)));
        assert!(raw_int.eq_value(&Value::Real(42.0)));

        let raw_text = RawColumn::Text("hello");
        assert!(raw_text.eq_value(&Value::Text("hello".into())));
        assert!(!raw_text.eq_value(&Value::Text("world".into())));
    }

    #[test]
    fn raw_column_cmp_value() {
        use std::cmp::Ordering;
        let raw = RawColumn::Integer(42);
        assert_eq!(raw.cmp_value(&Value::Integer(42)), Some(Ordering::Equal));
        assert_eq!(raw.cmp_value(&Value::Integer(50)), Some(Ordering::Less));
        assert_eq!(raw.cmp_value(&Value::Integer(10)), Some(Ordering::Greater));
        assert_eq!(raw.cmp_value(&Value::Null), None);
    }

    #[test]
    fn raw_column_as_numeric() {
        assert_eq!(RawColumn::Integer(42).as_i64(), Some(42));
        assert_eq!(RawColumn::Integer(42).as_f64(), Some(42.0));
        assert_eq!(RawColumn::Real(3.15).as_f64(), Some(3.15));
        assert_eq!(RawColumn::Real(3.15).as_i64(), None);
        assert_eq!(RawColumn::Text("x").as_f64(), None);
        assert_eq!(RawColumn::Null.as_i64(), None);
    }

    #[test]
    fn decode_pk_integer_roundtrip() {
        for v in [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX] {
            let encoded = encode_key_value(&Value::Integer(v));
            let decoded = decode_pk_integer(&encoded).unwrap();
            assert_eq!(decoded, v);
        }
    }

    #[test]
    fn decode_pk_integer_rejects_non_integer() {
        let encoded = encode_key_value(&Value::Text("hello".into()));
        assert!(decode_pk_integer(&encoded).is_err());
    }

    #[test]
    fn raw_column_blob() {
        let values = vec![Value::Blob(vec![0xDE, 0xAD])];
        let encoded = encode_row(&values);
        let raw = decode_column_raw(&encoded, 0).unwrap();
        match raw {
            RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
            other => panic!("expected Blob, got {other:?}"),
        }
    }

    #[test]
    fn raw_column_matches_full_decode() {
        let values = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&values);
        let full = decode_row(&encoded).unwrap();
        for (i, expected) in full.iter().enumerate() {
            let raw = decode_column_raw(&encoded, i).unwrap();
            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
        }
    }

    // ── Offset decoding / in-place patch tests ──────────────────────

    // The offset variant must agree with the plain raw decode, and report the
    // `usize::MAX` sentinel exactly for NULL columns.
    #[test]
    fn column_with_offset_matches_raw_decode() {
        let values = vec![
            Value::Integer(7),
            Value::Null,
            Value::Text("abc".into()),
        ];
        let encoded = encode_row(&values);
        for (i, expected) in values.iter().enumerate() {
            let (raw, offset) = decode_column_with_offset(&encoded, i).unwrap();
            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
            assert_eq!(
                offset == usize::MAX,
                expected.is_null(),
                "unexpected offset for column {i}"
            );
        }
    }

    #[test]
    fn column_with_offset_out_of_bounds_returns_null() {
        let values = vec![Value::Integer(1)];
        let encoded = encode_row(&values);
        let (raw, offset) = decode_column_with_offset(&encoded, 1).unwrap();
        assert!(matches!(raw, RawColumn::Null));
        assert_eq!(offset, usize::MAX);
    }

    // Same-sized replacements are written in place and visible to a full decode.
    #[test]
    fn patch_at_offset_same_size() {
        let values = vec![Value::Integer(1), Value::Text("abc".into())];
        let mut encoded = encode_row(&values);

        let (_, int_offset) = decode_column_with_offset(&encoded, 0).unwrap();
        assert!(patch_at_offset(&mut encoded[..], int_offset, &Value::Integer(99)).unwrap());

        let (_, text_offset) = decode_column_with_offset(&encoded, 1).unwrap();
        let replacement = Value::Text("xyz".into());
        assert!(patch_at_offset(&mut encoded[..], text_offset, &replacement).unwrap());

        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded[0], Value::Integer(99));
        assert_eq!(decoded[1], Value::Text("xyz".into()));
    }

    // Refused patches return Ok(false) and must not modify the row.
    #[test]
    fn patch_at_offset_rejects_size_mismatch_and_null() {
        let values = vec![Value::Text("abc".into()), Value::Null];
        let mut encoded = encode_row(&values);
        let before = encoded.to_vec();

        let (_, offset) = decode_column_with_offset(&encoded, 0).unwrap();
        // Different payload length.
        let longer = Value::Text("abcd".into());
        assert!(!patch_at_offset(&mut encoded[..], offset, &longer).unwrap());
        // NULL replacement value.
        assert!(!patch_at_offset(&mut encoded[..], offset, &Value::Null).unwrap());

        // The NULL column reports the sentinel offset, which is refused too.
        let (_, null_offset) = decode_column_with_offset(&encoded, 1).unwrap();
        assert_eq!(null_offset, usize::MAX);
        assert!(!patch_at_offset(&mut encoded[..], null_offset, &Value::Integer(1)).unwrap());

        assert_eq!(&encoded[..], &before[..]);
    }
}