Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// Every encoded key value begins with one of these bytes, so a byte-wise
/// comparison of two encoded values of different types is decided by the tag.
const TAG_NULL: u8 = 0x00;
// Blob and text payloads are null-escaped and terminated by a 0x00 byte.
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
// Followed by a single 0x00 / 0x01 byte.
const TAG_BOOLEAN: u8 = 0x03;
// Followed by the signed varint layout produced by `encode_signed_varint`.
const TAG_INTEGER: u8 = 0x04;
// Followed by 8 big-endian bytes of the transformed IEEE-754 bit pattern.
const TAG_REAL: u8 = 0x05;
// Time, date and timestamp reuse the signed varint layout.
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
// Followed by 16 bytes: months (i32), days (i32), micros (i64), sign-flipped big-endian.
const TAG_INTERVAL: u8 = 0x09;
17
18/// Encode a single value into an order-preserving byte sequence.
19pub fn encode_key_value(value: &Value) -> Vec<u8> {
20    let mut buf = Vec::with_capacity(16);
21    encode_key_value_into(value, &mut buf);
22    buf
23}
24
25/// Encode a composite key (multiple values concatenated).
26pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27    let mut buf = Vec::new();
28    for v in values {
29        buf.extend_from_slice(&encode_key_value(v));
30    }
31    buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35    buf.clear();
36    for v in values {
37        encode_key_value_into(v, buf);
38    }
39}
40
41fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
42    match value {
43        Value::Null => buf.push(TAG_NULL),
44        Value::Boolean(b) => {
45            buf.push(TAG_BOOLEAN);
46            buf.push(if *b { 0x01 } else { 0x00 });
47        }
48        Value::Integer(i) => encode_integer_into(*i, buf),
49        Value::Real(r) => encode_real_into(*r, buf),
50        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
51        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
52        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
53        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
54        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
55        Value::Interval {
56            months,
57            days,
58            micros,
59        } => {
60            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
61            buf.push(TAG_INTERVAL);
62            let mut mb = months.to_be_bytes();
63            mb[0] ^= 0x80;
64            buf.extend_from_slice(&mb);
65            let mut db = days.to_be_bytes();
66            db[0] ^= 0x80;
67            buf.extend_from_slice(&db);
68            let mut ub = micros.to_be_bytes();
69            ub[0] ^= 0x80;
70            buf.extend_from_slice(&ub);
71        }
72    }
73}
74
/// Appends `val` to `buf` as a `TAG_INTEGER`-tagged signed varint
/// (see `encode_signed_varint` for the byte layout).
fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
    encode_signed_varint(TAG_INTEGER, val, buf);
}
78
/// Variable-width, order-preserving encoding of a signed 64-bit integer.
///
/// Appends `[tag] [marker] [data bytes]` to `buf`:
/// - zero: marker `0x80`, no data bytes;
/// - positive: marker `0x80 + n`, followed by the `n` significant big-endian
///   bytes of the value;
/// - negative: marker `0x80 - n`, followed by the `n` significant big-endian
///   bytes of the absolute value, each bitwise inverted.
///
/// Comparing two encodings byte by byte (with the same tag) gives the same
/// result as comparing the integers themselves.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` also covers `i64::MIN`, whose magnitude does not fit in an i64.
    let magnitude = val.unsigned_abs();
    // Count of significant bytes: 0 for zero, otherwise 1..=8.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be_bytes = magnitude.to_be_bytes();
    let significant = &be_bytes[8 - width..];

    if val == 0 {
        buf.push(0x80);
    } else if val > 0 {
        buf.push(0x80 + width as u8);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width as u8);
        // Inverting the bytes makes larger magnitudes sort first.
        buf.extend(significant.iter().map(|&byte| !byte));
    }
}
111
112fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
113    buf.push(TAG_REAL);
114    let bits = val.to_bits();
115    let encoded = if val.is_sign_negative() {
116        !bits
117    } else {
118        bits ^ (1u64 << 63)
119    };
120    buf.extend_from_slice(&encoded.to_be_bytes());
121}
122
/// Appends `tag`, then `data` with null-escaping, then a `0x00` terminator.
///
/// Every `0x00` byte inside `data` is written as the pair `0x00 0xFF`; all
/// other bytes are copied unchanged.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on 0x00 yields the runs of ordinary bytes; each gap between
    // two runs stands for exactly one embedded null byte.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first_run) = runs.next() {
        buf.extend_from_slice(first_run);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
135
136/// Decode a single key value, returning the value and the number of bytes consumed.
137pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
138    if data.is_empty() {
139        return Err(SqlError::InvalidValue("empty key data".into()));
140    }
141    match data[0] {
142        TAG_NULL => Ok((Value::Null, 1)),
143        TAG_BOOLEAN => {
144            if data.len() < 2 {
145                return Err(SqlError::InvalidValue("truncated boolean".into()));
146            }
147            Ok((Value::Boolean(data[1] != 0), 2))
148        }
149        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
150        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
151        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
152        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
153            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
154            (Value::Date(d), n + 1)
155        }),
156        TAG_TIMESTAMP => {
157            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
158        }
159        TAG_INTERVAL => {
160            if data.len() < 1 + 16 {
161                return Err(SqlError::InvalidValue("truncated interval".into()));
162            }
163            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
164            mb[0] ^= 0x80;
165            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
166            db[0] ^= 0x80;
167            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
168            ub[0] ^= 0x80;
169            Ok((
170                Value::Interval {
171                    months: i32::from_be_bytes(mb),
172                    days: i32::from_be_bytes(db),
173                    micros: i64::from_be_bytes(ub),
174                },
175                17,
176            ))
177        }
178        TAG_TEXT => {
179            let (bytes, n) = decode_null_escaped(&data[1..])?;
180            let s = String::from_utf8(bytes)
181                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
182            Ok((Value::Text(CompactString::from(s)), n + 1))
183        }
184        TAG_BLOB => {
185            let (bytes, n) = decode_null_escaped(&data[1..])?;
186            Ok((Value::Blob(bytes), n + 1))
187        }
188        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
189    }
190}
191
192/// Decode a composite key into multiple values.
193pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
194    let mut values = Vec::with_capacity(count);
195    let mut pos = 0;
196    for _ in 0..count {
197        let (v, n) = decode_key_value(&data[pos..])?;
198        values.push(v);
199        pos += n;
200    }
201    Ok(values)
202}
203
204fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
205    let (v, n) = decode_signed_varint(data)?;
206    Ok((Value::Integer(v), n))
207}
208
209/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
210pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
211    if data.is_empty() {
212        return Err(SqlError::InvalidValue("truncated integer".into()));
213    }
214    let marker = data[0];
215    if marker == 0x80 {
216        return Ok((0, 1));
217    }
218    if marker > 0x80 {
219        let byte_count = (marker - 0x80) as usize;
220        if data.len() < 1 + byte_count {
221            return Err(SqlError::InvalidValue("truncated positive integer".into()));
222        }
223        let mut bytes = [0u8; 8];
224        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
225        let val = i64::from_be_bytes(bytes);
226        Ok((val, 1 + byte_count))
227    } else {
228        let byte_count = (0x80 - marker) as usize;
229        if data.len() < 1 + byte_count {
230            return Err(SqlError::InvalidValue("truncated negative integer".into()));
231        }
232        let mut bytes = [0u8; 8];
233        for i in 0..byte_count {
234            bytes[8 - byte_count + i] = !data[1 + i];
235        }
236        let abs_val = u64::from_be_bytes(bytes);
237        let val = (-(abs_val as i128)) as i64;
238        Ok((val, 1 + byte_count))
239    }
240}
241
242fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
243    if data.len() < 8 {
244        return Err(SqlError::InvalidValue("truncated real".into()));
245    }
246    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
247    let bits = if encoded & (1u64 << 63) != 0 {
248        // Was positive: undo sign bit flip
249        encoded ^ (1u64 << 63)
250    } else {
251        // Was negative: undo full inversion
252        !encoded
253    };
254    let val = f64::from_bits(bits);
255    Ok((Value::Real(val), 8))
256}
257
258/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
259fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
260    let mut result = Vec::new();
261    let mut i = 0;
262    while i < data.len() {
263        if data[i] == 0x00 {
264            if i + 1 < data.len() && data[i + 1] == 0xFF {
265                result.push(0x00);
266                i += 2;
267            } else {
268                return Ok((result, i + 1)); // terminator consumed
269            }
270        } else {
271            result.push(data[i]);
272            i += 1;
273        }
274    }
275    Err(SqlError::InvalidValue(
276        "unterminated null-escaped string".into(),
277    ))
278}
279
280/// Encode non-PK columns. Format: [col_count:u16][null_bitmap][type(u8)+len(u32)+data per col]
281pub fn encode_row(values: &[Value]) -> Vec<u8> {
282    let col_count = values.len();
283    let bitmap_bytes = col_count.div_ceil(8);
284    let mut buf = Vec::new();
285
286    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
287    let mut bitmap = vec![0u8; bitmap_bytes];
288    for (i, v) in values.iter().enumerate() {
289        if v.is_null() {
290            bitmap[i / 8] |= 1 << (i % 8);
291        }
292    }
293    buf.extend_from_slice(&bitmap);
294
295    for v in values {
296        if v.is_null() {
297            continue;
298        }
299        match v {
300            Value::Integer(i) => {
301                buf.push(DataType::Integer.type_tag());
302                buf.extend_from_slice(&8u32.to_le_bytes());
303                buf.extend_from_slice(&i.to_le_bytes());
304            }
305            Value::Real(r) => {
306                buf.push(DataType::Real.type_tag());
307                buf.extend_from_slice(&8u32.to_le_bytes());
308                buf.extend_from_slice(&r.to_le_bytes());
309            }
310            Value::Boolean(b) => {
311                buf.push(DataType::Boolean.type_tag());
312                buf.extend_from_slice(&1u32.to_le_bytes());
313                buf.push(if *b { 1 } else { 0 });
314            }
315            Value::Text(s) => {
316                let bytes = s.as_bytes();
317                buf.push(DataType::Text.type_tag());
318                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
319                buf.extend_from_slice(bytes);
320            }
321            Value::Blob(data) => {
322                buf.push(DataType::Blob.type_tag());
323                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
324                buf.extend_from_slice(data);
325            }
326            Value::Time(t) => {
327                buf.push(DataType::Time.type_tag());
328                buf.extend_from_slice(&8u32.to_le_bytes());
329                buf.extend_from_slice(&t.to_le_bytes());
330            }
331            Value::Date(d) => {
332                buf.push(DataType::Date.type_tag());
333                buf.extend_from_slice(&4u32.to_le_bytes());
334                buf.extend_from_slice(&d.to_le_bytes());
335            }
336            Value::Timestamp(t) => {
337                buf.push(DataType::Timestamp.type_tag());
338                buf.extend_from_slice(&8u32.to_le_bytes());
339                buf.extend_from_slice(&t.to_le_bytes());
340            }
341            Value::Interval {
342                months,
343                days,
344                micros,
345            } => {
346                buf.push(DataType::Interval.type_tag());
347                buf.extend_from_slice(&16u32.to_le_bytes());
348                buf.extend_from_slice(&months.to_le_bytes());
349                buf.extend_from_slice(&days.to_le_bytes());
350                buf.extend_from_slice(&micros.to_le_bytes());
351            }
352            Value::Null => unreachable!(),
353        }
354    }
355
356    buf
357}
358
359pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
360    buf.clear();
361    let col_count = values.len();
362    let bitmap_bytes = col_count.div_ceil(8);
363
364    buf.extend_from_slice(&(col_count as u16).to_le_bytes());
365
366    let bitmap_start = buf.len();
367    buf.resize(buf.len() + bitmap_bytes, 0);
368
369    for (i, v) in values.iter().enumerate() {
370        if v.is_null() {
371            buf[bitmap_start + i / 8] |= 1 << (i % 8);
372            continue;
373        }
374        match v {
375            Value::Integer(val) => {
376                buf.push(DataType::Integer.type_tag());
377                buf.extend_from_slice(&8u32.to_le_bytes());
378                buf.extend_from_slice(&val.to_le_bytes());
379            }
380            Value::Real(r) => {
381                buf.push(DataType::Real.type_tag());
382                buf.extend_from_slice(&8u32.to_le_bytes());
383                buf.extend_from_slice(&r.to_le_bytes());
384            }
385            Value::Boolean(b) => {
386                buf.push(DataType::Boolean.type_tag());
387                buf.extend_from_slice(&1u32.to_le_bytes());
388                buf.push(if *b { 1 } else { 0 });
389            }
390            Value::Text(s) => {
391                let bytes = s.as_bytes();
392                buf.push(DataType::Text.type_tag());
393                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
394                buf.extend_from_slice(bytes);
395            }
396            Value::Blob(data) => {
397                buf.push(DataType::Blob.type_tag());
398                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
399                buf.extend_from_slice(data);
400            }
401            Value::Time(t) => {
402                buf.push(DataType::Time.type_tag());
403                buf.extend_from_slice(&8u32.to_le_bytes());
404                buf.extend_from_slice(&t.to_le_bytes());
405            }
406            Value::Date(d) => {
407                buf.push(DataType::Date.type_tag());
408                buf.extend_from_slice(&4u32.to_le_bytes());
409                buf.extend_from_slice(&d.to_le_bytes());
410            }
411            Value::Timestamp(t) => {
412                buf.push(DataType::Timestamp.type_tag());
413                buf.extend_from_slice(&8u32.to_le_bytes());
414                buf.extend_from_slice(&t.to_le_bytes());
415            }
416            Value::Interval {
417                months,
418                days,
419                micros,
420            } => {
421                buf.push(DataType::Interval.type_tag());
422                buf.extend_from_slice(&16u32.to_le_bytes());
423                buf.extend_from_slice(&months.to_le_bytes());
424                buf.extend_from_slice(&days.to_le_bytes());
425                buf.extend_from_slice(&micros.to_le_bytes());
426            }
427            Value::Null => unreachable!(),
428        }
429    }
430}
431
432fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
433    match DataType::from_tag(type_tag) {
434        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
435            data[..8].try_into().unwrap(),
436        ))),
437        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
438            data[..8].try_into().unwrap(),
439        ))),
440        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
441        Some(DataType::Text) => {
442            let s = std::str::from_utf8(data)
443                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
444            Ok(Value::Text(CompactString::from(s)))
445        }
446        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
447        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
448            data[..8].try_into().unwrap(),
449        ))),
450        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
451            data[..4].try_into().unwrap(),
452        ))),
453        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
454            data[..8].try_into().unwrap(),
455        ))),
456        Some(DataType::Interval) => {
457            if data.len() < 16 {
458                return Err(SqlError::InvalidValue("truncated interval".into()));
459            }
460            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
461            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
462            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
463            Ok(Value::Interval {
464                months,
465                days,
466                micros,
467            })
468        }
469        _ => Err(SqlError::InvalidValue(format!(
470            "unknown column type tag: {type_tag}"
471        ))),
472    }
473}
474
475fn parse_row_header(data: &[u8]) -> Result<(usize, &[u8], usize)> {
476    if data.len() < 2 {
477        return Err(SqlError::InvalidValue("row data too short".into()));
478    }
479    let col_count = u16::from_le_bytes([data[0], data[1]]) as usize;
480    let bitmap_bytes = col_count.div_ceil(8);
481    let pos = 2;
482    if data.len() < pos + bitmap_bytes {
483        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
484    }
485    Ok((
486        col_count,
487        &data[pos..pos + bitmap_bytes],
488        pos + bitmap_bytes,
489    ))
490}
491
492pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
493    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
494
495    let mut values = Vec::with_capacity(col_count);
496    for i in 0..col_count {
497        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
498            values.push(Value::Null);
499            continue;
500        }
501
502        if pos + 5 > data.len() {
503            return Err(SqlError::InvalidValue("truncated column data".into()));
504        }
505        let type_tag = data[pos];
506        pos += 1;
507        let data_len =
508            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
509        pos += 4;
510
511        if pos + data_len > data.len() {
512            return Err(SqlError::InvalidValue("truncated column value".into()));
513        }
514
515        values.push(decode_value(type_tag, &data[pos..pos + data_len])?);
516        pos += data_len;
517    }
518
519    Ok(values)
520}
521
/// Reads the column count stored in the first two bytes (little-endian u16)
/// of a row value blob.
///
/// # Panics
///
/// Panics if `data` holds fewer than two bytes.
#[inline]
pub fn row_non_pk_count(data: &[u8]) -> usize {
    let count_bytes = [data[0], data[1]];
    usize::from(u16::from_le_bytes(count_bytes))
}
527
528pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
529    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
530
531    for i in 0..col_count {
532        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
533            continue;
534        }
535
536        if pos + 5 > data.len() {
537            return Err(SqlError::InvalidValue("truncated column data".into()));
538        }
539        let type_tag = data[pos];
540        pos += 1;
541        let data_len =
542            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
543        pos += 4;
544
545        if pos + data_len > data.len() {
546            return Err(SqlError::InvalidValue("truncated column value".into()));
547        }
548
549        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
550            out[col_mapping[i]] = decode_value(type_tag, &data[pos..pos + data_len])?;
551        }
552        pos += data_len;
553    }
554
555    Ok(())
556}
557
558pub fn decode_pk_into(
559    key: &[u8],
560    count: usize,
561    out: &mut [Value],
562    pk_mapping: &[usize],
563) -> Result<()> {
564    let mut pos = 0;
565    for i in 0..count {
566        let (v, n) = decode_key_value(&key[pos..])?;
567        if i < pk_mapping.len() {
568            out[pk_mapping[i]] = v;
569        }
570        pos += n;
571    }
572    Ok(())
573}
574
575pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
576    if targets.is_empty() {
577        return Ok(Vec::new());
578    }
579    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
580
581    let mut results = Vec::with_capacity(targets.len());
582    let mut ti = 0;
583
584    for col in 0..col_count {
585        if ti >= targets.len() {
586            break;
587        }
588        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
589
590        if col == targets[ti] {
591            if is_null {
592                results.push(Value::Null);
593            } else {
594                if pos + 5 > data.len() {
595                    return Err(SqlError::InvalidValue("truncated column data".into()));
596                }
597                let type_tag = data[pos];
598                pos += 1;
599                let data_len =
600                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
601                        as usize;
602                pos += 4;
603                if pos + data_len > data.len() {
604                    return Err(SqlError::InvalidValue("truncated column value".into()));
605                }
606                results.push(decode_value(type_tag, &data[pos..pos + data_len])?);
607                pos += data_len;
608            }
609            ti += 1;
610        } else if !is_null {
611            if pos + 5 > data.len() {
612                return Err(SqlError::InvalidValue("truncated column data".into()));
613            }
614            let data_len =
615                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
616                    as usize;
617            pos += 5 + data_len;
618        }
619    }
620
621    // Targets beyond stored column count get Null
622    while ti < targets.len() {
623        results.push(Value::Null);
624        ti += 1;
625    }
626
627    Ok(results)
628}
629
630pub fn decode_columns_into(
631    data: &[u8],
632    targets: &[usize],
633    schema_cols: &[usize],
634    row: &mut [Value],
635) -> Result<()> {
636    if targets.is_empty() {
637        return Ok(());
638    }
639    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
640
641    let mut ti = 0;
642    for col in 0..col_count {
643        if ti >= targets.len() {
644            break;
645        }
646        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
647
648        if col == targets[ti] {
649            if is_null {
650                row[schema_cols[ti]] = Value::Null;
651            } else {
652                if pos + 5 > data.len() {
653                    return Err(SqlError::InvalidValue("truncated column data".into()));
654                }
655                let type_tag = data[pos];
656                pos += 1;
657                let data_len =
658                    u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
659                        as usize;
660                pos += 4;
661                if pos + data_len > data.len() {
662                    return Err(SqlError::InvalidValue("truncated column value".into()));
663                }
664                row[schema_cols[ti]] = decode_value(type_tag, &data[pos..pos + data_len])?;
665                pos += data_len;
666            }
667            ti += 1;
668        } else if !is_null {
669            if pos + 5 > data.len() {
670                return Err(SqlError::InvalidValue("truncated column data".into()));
671            }
672            let data_len =
673                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
674                    as usize;
675            pos += 5 + data_len;
676        }
677    }
678
679    Ok(())
680}
681
/// A decoded column value that borrows its variable-length data.
///
/// Mirrors the variants of `Value`, except that `Text` and `Blob` hold
/// references with lifetime `'a` instead of owned data, which keeps the type
/// `Copy`. Use `to_value` to obtain an owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    // Borrowed string slice; copied only when converted with `to_value`.
    Text(&'a str),
    // Borrowed byte slice; copied only when converted with `to_value`.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
}
695
696impl<'a> RawColumn<'a> {
697    pub fn to_value(self) -> Value {
698        match self {
699            RawColumn::Null => Value::Null,
700            RawColumn::Integer(i) => Value::Integer(i),
701            RawColumn::Real(r) => Value::Real(r),
702            RawColumn::Boolean(b) => Value::Boolean(b),
703            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
704            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
705            RawColumn::Time(t) => Value::Time(t),
706            RawColumn::Date(d) => Value::Date(d),
707            RawColumn::Timestamp(t) => Value::Timestamp(t),
708            RawColumn::Interval {
709                months,
710                days,
711                micros,
712            } => Value::Interval {
713                months,
714                days,
715                micros,
716            },
717        }
718    }
719
720    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
721        use std::cmp::Ordering;
722        match (self, other) {
723            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
724            (RawColumn::Null, _) | (_, Value::Null) => None,
725            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
726            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
727            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
728            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
729            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
730            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
731            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
732            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
733            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
734            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
735            (
736                RawColumn::Interval {
737                    months: am,
738                    days: ad,
739                    micros: au,
740                },
741                Value::Interval {
742                    months: bm,
743                    days: bd,
744                    micros: bu,
745                },
746            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
747            _ => None,
748        }
749    }
750
751    pub fn eq_value(&self, other: &Value) -> bool {
752        match (self, other) {
753            (RawColumn::Null, Value::Null) => true,
754            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
755            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
756            (RawColumn::Real(a), Value::Real(b)) => a == b,
757            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
758            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
759            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
760            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
761            (RawColumn::Time(a), Value::Time(b)) => a == b,
762            (RawColumn::Date(a), Value::Date(b)) => a == b,
763            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
764            (
765                RawColumn::Interval {
766                    months: am,
767                    days: ad,
768                    micros: au,
769                },
770                Value::Interval {
771                    months: bm,
772                    days: bd,
773                    micros: bu,
774                },
775            ) => am == bm && ad == bd && au == bu,
776            _ => false,
777        }
778    }
779
780    pub fn as_f64(&self) -> Option<f64> {
781        match self {
782            RawColumn::Integer(i) => Some(*i as f64),
783            RawColumn::Real(r) => Some(*r),
784            _ => None,
785        }
786    }
787
788    pub fn as_i64(&self) -> Option<i64> {
789        match self {
790            RawColumn::Integer(i) => Some(*i),
791            RawColumn::Time(t) => Some(*t),
792            RawColumn::Date(d) => Some(*d as i64),
793            RawColumn::Timestamp(t) => Some(*t),
794            _ => None,
795        }
796    }
797}
798
799fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
800    match DataType::from_tag(type_tag) {
801        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
802            data[..8].try_into().unwrap(),
803        ))),
804        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
805            data[..8].try_into().unwrap(),
806        ))),
807        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
808        Some(DataType::Text) => {
809            let s = std::str::from_utf8(data)
810                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
811            Ok(RawColumn::Text(s))
812        }
813        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
814        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
815            data[..8].try_into().unwrap(),
816        ))),
817        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
818            data[..4].try_into().unwrap(),
819        ))),
820        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
821            data[..8].try_into().unwrap(),
822        ))),
823        Some(DataType::Interval) => {
824            if data.len() < 16 {
825                return Err(SqlError::InvalidValue("truncated interval".into()));
826            }
827            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
828            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
829            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
830            Ok(RawColumn::Interval {
831                months,
832                days,
833                micros,
834            })
835        }
836        _ => Err(SqlError::InvalidValue(format!(
837            "unknown column type tag: {type_tag}"
838        ))),
839    }
840}
841
842/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
843pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
844    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
845    if target >= col_count || new_val.is_null() {
846        return Ok(false);
847    }
848    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
849    if was_null {
850        return Ok(false);
851    }
852    for col in 0..target {
853        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
854        if !is_null {
855            if pos + 5 > data.len() {
856                return Err(SqlError::InvalidValue("truncated column data".into()));
857            }
858            let data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
859            pos += 5 + data_len;
860        }
861    }
862    if pos + 5 > data.len() {
863        return Err(SqlError::InvalidValue("truncated column data".into()));
864    }
865    let old_data_len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
866    let new_data_len = match new_val {
867        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
868        Value::Date(_) => 4,
869        Value::Interval { .. } => 16,
870        Value::Boolean(_) => 1,
871        Value::Text(s) => s.len(),
872        Value::Blob(b) => b.len(),
873        Value::Null => return Ok(false),
874    };
875    if new_data_len != old_data_len {
876        return Ok(false);
877    }
878    data[pos] = new_val.data_type().type_tag();
879    let val_start = pos + 5;
880    match new_val {
881        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
882        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
883        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
884        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
885        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
886        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
887        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
888        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
889        Value::Interval {
890            months,
891            days,
892            micros,
893        } => {
894            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
895            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
896            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
897        }
898        Value::Null => unreachable!(),
899    }
900    Ok(true)
901}
902
903/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
904pub fn patch_row_column(
905    data: &[u8],
906    target: usize,
907    new_val: &Value,
908    out: &mut Vec<u8>,
909) -> Result<()> {
910    let (col_count, bitmap, header_end) = parse_row_header(data)?;
911
912    // Expand row if target is beyond stored col count (ALTER TABLE ADD COLUMN)
913    let new_col_count = if target >= col_count {
914        target + 1
915    } else {
916        col_count
917    };
918    let new_bitmap_bytes = new_col_count.div_ceil(8);
919    let bitmap_bytes = col_count.div_ceil(8);
920    out.clear();
921
922    out.extend_from_slice(&(new_col_count as u16).to_le_bytes());
923    let bitmap_start = out.len();
924    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
925    for _ in bitmap_bytes..new_bitmap_bytes {
926        out.push(0xFF); // new columns default to NULL
927    }
928    if new_val.is_null() {
929        out[bitmap_start + target / 8] |= 1 << (target % 8);
930    } else {
931        out[bitmap_start + target / 8] &= !(1 << (target % 8));
932    }
933
934    let mut pos = header_end;
935    for col in 0..new_col_count {
936        let was_null = if col < col_count {
937            bitmap[col / 8] & (1 << (col % 8)) != 0
938        } else {
939            true // columns beyond old col_count are NULL on disk
940        };
941
942        if col == target {
943            if !was_null && pos + 5 <= data.len() {
944                let data_len = u32::from_le_bytes([
945                    data[pos + 1],
946                    data[pos + 2],
947                    data[pos + 3],
948                    data[pos + 4],
949                ]) as usize;
950                pos += 5 + data_len;
951            }
952            if !new_val.is_null() {
953                match new_val {
954                    Value::Integer(v) => {
955                        out.push(DataType::Integer.type_tag());
956                        out.extend_from_slice(&8u32.to_le_bytes());
957                        out.extend_from_slice(&v.to_le_bytes());
958                    }
959                    Value::Real(r) => {
960                        out.push(DataType::Real.type_tag());
961                        out.extend_from_slice(&8u32.to_le_bytes());
962                        out.extend_from_slice(&r.to_le_bytes());
963                    }
964                    Value::Boolean(b) => {
965                        out.push(DataType::Boolean.type_tag());
966                        out.extend_from_slice(&1u32.to_le_bytes());
967                        out.push(if *b { 1 } else { 0 });
968                    }
969                    Value::Text(s) => {
970                        let bytes = s.as_bytes();
971                        out.push(DataType::Text.type_tag());
972                        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
973                        out.extend_from_slice(bytes);
974                    }
975                    Value::Blob(d) => {
976                        out.push(DataType::Blob.type_tag());
977                        out.extend_from_slice(&(d.len() as u32).to_le_bytes());
978                        out.extend_from_slice(d);
979                    }
980                    Value::Time(t) => {
981                        out.push(DataType::Time.type_tag());
982                        out.extend_from_slice(&8u32.to_le_bytes());
983                        out.extend_from_slice(&t.to_le_bytes());
984                    }
985                    Value::Date(d) => {
986                        out.push(DataType::Date.type_tag());
987                        out.extend_from_slice(&4u32.to_le_bytes());
988                        out.extend_from_slice(&d.to_le_bytes());
989                    }
990                    Value::Timestamp(t) => {
991                        out.push(DataType::Timestamp.type_tag());
992                        out.extend_from_slice(&8u32.to_le_bytes());
993                        out.extend_from_slice(&t.to_le_bytes());
994                    }
995                    Value::Interval {
996                        months,
997                        days,
998                        micros,
999                    } => {
1000                        out.push(DataType::Interval.type_tag());
1001                        out.extend_from_slice(&16u32.to_le_bytes());
1002                        out.extend_from_slice(&months.to_le_bytes());
1003                        out.extend_from_slice(&days.to_le_bytes());
1004                        out.extend_from_slice(&micros.to_le_bytes());
1005                    }
1006                    Value::Null => unreachable!(),
1007                }
1008            }
1009        } else if was_null {
1010        } else {
1011            if pos + 5 > data.len() {
1012                return Err(SqlError::InvalidValue("truncated column data".into()));
1013            }
1014            let data_len =
1015                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1016                    as usize;
1017            let end = pos + 5 + data_len;
1018            if end > data.len() {
1019                return Err(SqlError::InvalidValue("truncated column value".into()));
1020            }
1021            out.extend_from_slice(&data[pos..end]);
1022            pos = end;
1023        }
1024    }
1025    Ok(())
1026}
1027
1028pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1029    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
1030    if target >= col_count {
1031        return Ok(RawColumn::Null);
1032    }
1033
1034    for col in 0..=target {
1035        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1036
1037        if col == target {
1038            if is_null {
1039                return Ok(RawColumn::Null);
1040            }
1041            if pos + 5 > data.len() {
1042                return Err(SqlError::InvalidValue("truncated column data".into()));
1043            }
1044            let type_tag = data[pos];
1045            pos += 1;
1046            let data_len =
1047                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
1048                    as usize;
1049            pos += 4;
1050            if pos + data_len > data.len() {
1051                return Err(SqlError::InvalidValue("truncated column value".into()));
1052            }
1053            return decode_value_raw(type_tag, &data[pos..pos + data_len]);
1054        } else if !is_null {
1055            if pos + 5 > data.len() {
1056                return Err(SqlError::InvalidValue("truncated column data".into()));
1057            }
1058            let data_len =
1059                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1060                    as usize;
1061            pos += 5 + data_len;
1062        }
1063    }
1064
1065    unreachable!()
1066}
1067
1068/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1069pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1070    let (col_count, bitmap, mut pos) = parse_row_header(data)?;
1071    if target >= col_count {
1072        return Ok((RawColumn::Null, usize::MAX));
1073    }
1074
1075    for col in 0..=target {
1076        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1077
1078        if col == target {
1079            if is_null {
1080                return Ok((RawColumn::Null, usize::MAX));
1081            }
1082            if pos + 5 > data.len() {
1083                return Err(SqlError::InvalidValue("truncated column data".into()));
1084            }
1085            let tag_offset = pos;
1086            let type_tag = data[pos];
1087            pos += 1;
1088            let data_len =
1089                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
1090                    as usize;
1091            pos += 4;
1092            if pos + data_len > data.len() {
1093                return Err(SqlError::InvalidValue("truncated column value".into()));
1094            }
1095            let raw = decode_value_raw(type_tag, &data[pos..pos + data_len])?;
1096            return Ok((raw, tag_offset));
1097        } else if !is_null {
1098            if pos + 5 > data.len() {
1099                return Err(SqlError::InvalidValue("truncated column data".into()));
1100            }
1101            let data_len =
1102                u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], data[pos + 4]])
1103                    as usize;
1104            pos += 5 + data_len;
1105        }
1106    }
1107
1108    unreachable!()
1109}
1110
1111/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1112pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1113    if offset == usize::MAX || new_val.is_null() {
1114        return Ok(false);
1115    }
1116    if offset + 5 > data.len() {
1117        return Err(SqlError::InvalidValue("truncated column data".into()));
1118    }
1119    let old_data_len =
1120        u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1121    let new_data_len = match new_val {
1122        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1123        Value::Date(_) => 4,
1124        Value::Interval { .. } => 16,
1125        Value::Boolean(_) => 1,
1126        Value::Text(s) => s.len(),
1127        Value::Blob(b) => b.len(),
1128        Value::Null => return Ok(false),
1129    };
1130    if new_data_len != old_data_len {
1131        return Ok(false);
1132    }
1133    data[offset] = new_val.data_type().type_tag();
1134    let val_start = offset + 5;
1135    match new_val {
1136        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1137        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1138        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1139        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1140        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1141        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1142        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1143        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1144        Value::Interval {
1145            months,
1146            days,
1147            micros,
1148        } => {
1149            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1150            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1151            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1152        }
1153        Value::Null => unreachable!(),
1154    }
1155    Ok(true)
1156}
1157
1158pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1159    if key.is_empty() || key[0] != TAG_INTEGER {
1160        return Err(SqlError::InvalidValue("not an integer key".into()));
1161    }
1162    let (val, _) = decode_integer(&key[1..])?;
1163    match val {
1164        Value::Integer(i) => Ok(i),
1165        _ => unreachable!(),
1166    }
1167}
1168
#[cfg(test)]
mod tests {
    use super::*;

    // ---- order-preserving key encoding ----

    #[test]
    fn key_null() {
        let bytes = encode_key_value(&Value::Null);
        let (value, consumed) = decode_key_value(&bytes).unwrap();
        assert_eq!(consumed, 1);
        assert_eq!(value, Value::Null);
    }

    #[test]
    fn key_boolean() {
        let enc_false = encode_key_value(&Value::Boolean(false));
        let enc_true = encode_key_value(&Value::Boolean(true));
        assert!(enc_false < enc_true);

        let (dec_false, _) = decode_key_value(&enc_false).unwrap();
        let (dec_true, _) = decode_key_value(&enc_true).unwrap();
        for (decoded, expected) in [(dec_false, false), (dec_true, true)] {
            assert_eq!(decoded, Value::Boolean(expected));
        }
    }

    #[test]
    fn key_integer_roundtrip() {
        let samples = [
            i64::MIN,
            -1_000_000,
            -256,
            -1,
            0,
            1,
            127,
            128,
            255,
            256,
            65535,
            1_000_000,
            i64::MAX,
        ];
        for v in samples {
            let bytes = encode_key_value(&Value::Integer(v));
            let (value, _) = decode_key_value(&bytes).unwrap();
            assert_eq!(value, Value::Integer(v), "roundtrip failed for {v}");
        }
    }

    #[test]
    fn key_integer_sort_order() {
        let values: Vec<i64> = vec![i64::MIN, -1_000_000, -1, 0, 1, 1_000_000, i64::MAX];
        let keys: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Integer(v)))
            .collect();

        // Each key must sort strictly before its successor.
        for (i, pair) in keys.windows(2).enumerate() {
            assert!(
                pair[0] < pair[1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    #[test]
    fn key_real_roundtrip() {
        let samples = [
            f64::NEG_INFINITY,
            -1e100,
            -1.0,
            -f64::MIN_POSITIVE,
            -0.0,
            0.0,
            f64::MIN_POSITIVE,
            0.5,
            1.0,
            1e100,
            f64::INFINITY,
        ];
        for v in samples {
            let bytes = encode_key_value(&Value::Real(v));
            let (value, _) = decode_key_value(&bytes).unwrap();
            let Value::Real(r) = value else {
                panic!("expected Real")
            };
            // Compare bit patterns so that -0.0 and 0.0 are told apart.
            assert!(
                v.to_bits() == r.to_bits(),
                "roundtrip failed for {v}: got {r}"
            );
        }
    }

    #[test]
    fn key_real_sort_order() {
        let values = [
            f64::NEG_INFINITY,
            -100.0,
            -1.0,
            -0.0,
            0.0,
            1.0,
            100.0,
            f64::INFINITY,
        ];
        let keys: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Real(v)))
            .collect();

        for (i, pair) in keys.windows(2).enumerate() {
            assert!(
                pair[0] <= pair[1],
                "sort order broken: {} vs {}",
                values[i],
                values[i + 1]
            );
        }
    }

    #[test]
    fn key_text_roundtrip() {
        for v in ["", "hello", "world", "hello\0world", "\0\0\0"] {
            let bytes = encode_key_value(&Value::Text(v.into()));
            let (value, _) = decode_key_value(&bytes).unwrap();
            assert_eq!(value, Value::Text(v.into()), "roundtrip failed for {v:?}");
        }
    }

    #[test]
    fn key_text_sort_order() {
        let values = ["", "a", "ab", "b", "ba", "z"];
        let keys: Vec<Vec<u8>> = values
            .iter()
            .map(|&v| encode_key_value(&Value::Text(v.into())))
            .collect();

        for (i, pair) in keys.windows(2).enumerate() {
            assert!(
                pair[0] < pair[1],
                "sort order broken: {:?} vs {:?}",
                values[i],
                values[i + 1]
            );
        }
    }

    #[test]
    fn key_blob_roundtrip() {
        let samples: [Vec<u8>; 5] = [
            vec![],
            vec![0x00],
            vec![0x00, 0xFF],
            vec![0xFF, 0x00],
            vec![0x00, 0x00, 0x00],
        ];
        for blob in &samples {
            let bytes = encode_key_value(&Value::Blob(blob.clone()));
            let (value, _) = decode_key_value(&bytes).unwrap();
            assert_eq!(value, Value::Blob(blob.clone()));
        }
    }

    #[test]
    fn key_composite_roundtrip() {
        let parts = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let key = encode_composite_key(&parts);
        let decoded = decode_composite_key(&key, 3).unwrap();
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    #[test]
    fn key_composite_sort_order() {
        // Composite keys: (1, "b") < (1, "c") < (2, "a")
        let key = |n: i64, s: &'static str| {
            encode_composite_key(&[Value::Integer(n), Value::Text(s.into())])
        };
        let (k1, k2, k3) = (key(1, "b"), key(1, "c"), key(2, "a"));
        assert!(k1 < k2);
        assert!(k2 < k3);
    }

    #[test]
    fn key_cross_type_ordering() {
        // Listed in the order the type tags are expected to sort.
        let ascending = [
            encode_key_value(&Value::Null),
            encode_key_value(&Value::Blob(vec![])),
            encode_key_value(&Value::Text("".into())),
            encode_key_value(&Value::Boolean(false)),
            encode_key_value(&Value::Integer(0)),
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    // ---- row encoding ----

    #[test]
    fn row_roundtrip_simple() {
        let row = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&row);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Text("hello".into()));
        assert_eq!(decoded[2], Value::Boolean(true));
    }

    #[test]
    fn row_roundtrip_with_nulls() {
        let row = vec![
            Value::Integer(1),
            Value::Null,
            Value::Text("test".into()),
            Value::Null,
        ];
        let encoded = encode_row(&row);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0], Value::Integer(1));
        assert!(decoded[1].is_null());
        assert_eq!(decoded[2], Value::Text("test".into()));
        assert!(decoded[3].is_null());
    }

    #[test]
    fn row_roundtrip_empty() {
        let row: Vec<Value> = Vec::new();
        let encoded = encode_row(&row);
        let decoded = decode_row(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn row_roundtrip_all_types() {
        let row = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&row);
        let decoded = decode_row(&encoded).unwrap();
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded[0], Value::Integer(-100));
        assert_eq!(decoded[1], Value::Real(3.15));
        assert_eq!(decoded[2], Value::Text("hello world".into()));
        assert_eq!(decoded[3], Value::Blob(vec![0xDE, 0xAD, 0xBE, 0xEF]));
        assert_eq!(decoded[4], Value::Boolean(false));
        assert!(decoded[5].is_null());
    }

    #[test]
    fn null_escaped_with_embedded_nulls() {
        let original = Value::Text("before\0after".into());
        let bytes = encode_key_value(&original);
        let (value, _) = decode_key_value(&bytes).unwrap();
        assert_eq!(value, Value::Text("before\0after".into()));
    }

    #[test]
    fn key_integer_edge_cases() {
        let edges = [i64::MIN, i64::MIN + 1, -1, 0, 1, i64::MAX - 1, i64::MAX];
        for v in edges {
            let bytes = encode_key_value(&Value::Integer(v));
            let (value, consumed) = decode_key_value(&bytes).unwrap();
            // The decoder must consume the whole key.
            assert_eq!(consumed, bytes.len());
            assert_eq!(value, Value::Integer(v), "edge case failed for {v}");
        }
    }

    // ---- selective column decoding ----

    #[test]
    fn decode_columns_single() {
        let row = vec![
            Value::Integer(42),
            Value::Text("hello".into()),
            Value::Boolean(true),
        ];
        let encoded = encode_row(&row);
        let picked = decode_columns(&encoded, &[1]).unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked[0], Value::Text("hello".into()));
    }

    #[test]
    fn decode_columns_multiple() {
        let row = vec![
            Value::Integer(1),
            Value::Real(2.5),
            Value::Text("skip".into()),
            Value::Boolean(false),
            Value::Blob(vec![0xAB]),
        ];
        let encoded = encode_row(&row);
        let picked = decode_columns(&encoded, &[0, 3, 4]).unwrap();
        assert_eq!(picked.len(), 3);
        assert_eq!(picked[0], Value::Integer(1));
        assert_eq!(picked[1], Value::Boolean(false));
        assert_eq!(picked[2], Value::Blob(vec![0xAB]));
    }

    #[test]
    fn decode_columns_with_nulls() {
        let row = vec![
            Value::Integer(10),
            Value::Null,
            Value::Text("after_null".into()),
            Value::Null,
            Value::Boolean(true),
        ];
        let encoded = encode_row(&row);
        let picked = decode_columns(&encoded, &[1, 2, 4]).unwrap();
        assert_eq!(picked.len(), 3);
        assert!(picked[0].is_null());
        assert_eq!(picked[1], Value::Text("after_null".into()));
        assert_eq!(picked[2], Value::Boolean(true));
    }

    #[test]
    fn decode_columns_first_and_last() {
        let row = vec![
            Value::Text("first".into()),
            Value::Integer(99),
            Value::Boolean(false),
            Value::Real(3.125),
        ];
        let encoded = encode_row(&row);
        let picked = decode_columns(&encoded, &[0, 3]).unwrap();
        assert_eq!(picked.len(), 2);
        assert_eq!(picked[0], Value::Text("first".into()));
        assert_eq!(picked[1], Value::Real(3.125));
    }

    #[test]
    fn decode_columns_empty_targets() {
        let row = vec![Value::Integer(1)];
        let encoded = encode_row(&row);
        let picked = decode_columns(&encoded, &[]).unwrap();
        assert!(picked.is_empty());
    }

    #[test]
    fn decode_columns_all_matches_full_decode() {
        let row = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&row);
        let full = decode_row(&encoded).unwrap();
        let selective = decode_columns(&encoded, &[0, 1, 2, 3, 4, 5]).unwrap();
        assert_eq!(full, selective);
    }

    // ---- borrowed (raw) column decoding ----

    #[test]
    fn raw_column_integer() {
        let row = vec![Value::Integer(42), Value::Text("hello".into())];
        let encoded = encode_row(&row);
        let raw = decode_column_raw(&encoded, 0).unwrap();
        assert!(matches!(raw, RawColumn::Integer(42)));
        assert_eq!(raw.to_value(), Value::Integer(42));
    }

    #[test]
    fn raw_column_text_borrows() {
        let row = vec![Value::Integer(1), Value::Text("hello".into())];
        let encoded = encode_row(&row);
        match decode_column_raw(&encoded, 1).unwrap() {
            RawColumn::Text(s) => assert_eq!(s, "hello"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[test]
    fn raw_column_null() {
        let row = vec![Value::Integer(1), Value::Null, Value::Boolean(true)];
        let encoded = encode_row(&row);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        assert!(matches!(raw, RawColumn::Null));
    }

    #[test]
    fn raw_column_last() {
        let row = vec![
            Value::Integer(1),
            Value::Text("skip".into()),
            Value::Real(3.15),
        ];
        let encoded = encode_row(&row);
        match decode_column_raw(&encoded, 2).unwrap() {
            RawColumn::Real(r) => assert!((r - 3.15).abs() < 1e-10),
            other => panic!("expected Real, got {other:?}"),
        }
    }

    #[test]
    fn raw_column_out_of_bounds_returns_null() {
        let row = vec![Value::Integer(1)];
        let encoded = encode_row(&row);
        let raw = decode_column_raw(&encoded, 1).unwrap();
        assert!(matches!(raw, RawColumn::Null));
    }

    #[test]
    fn raw_column_eq_value() {
        let raw_int = RawColumn::Integer(42);
        assert!(raw_int.eq_value(&Value::Integer(42)));
        assert!(!raw_int.eq_value(&Value::Integer(43)));
        // Integer vs. Real comparison is numeric.
        assert!(raw_int.eq_value(&Value::Real(42.0)));

        let raw_text = RawColumn::Text("hello");
        assert!(raw_text.eq_value(&Value::Text("hello".into())));
        assert!(!raw_text.eq_value(&Value::Text("world".into())));
    }

    #[test]
    fn raw_column_cmp_value() {
        use std::cmp::Ordering;
        let raw = RawColumn::Integer(42);
        let cases = [
            (42, Ordering::Equal),
            (50, Ordering::Less),
            (10, Ordering::Greater),
        ];
        for (probe, expected) in cases {
            assert_eq!(raw.cmp_value(&Value::Integer(probe)), Some(expected));
        }
        assert_eq!(raw.cmp_value(&Value::Null), None);
    }

    #[test]
    fn raw_column_as_numeric() {
        let int = RawColumn::Integer(42);
        assert_eq!(int.as_i64(), Some(42));
        assert_eq!(int.as_f64(), Some(42.0));

        let real = RawColumn::Real(3.15);
        assert_eq!(real.as_f64(), Some(3.15));
        assert_eq!(real.as_i64(), None);

        assert_eq!(RawColumn::Text("x").as_f64(), None);
        assert_eq!(RawColumn::Null.as_i64(), None);
    }

    // ---- integer primary-key decoding ----

    #[test]
    fn decode_pk_integer_roundtrip() {
        let samples = [0i64, 1, -1, 42, -1000, i64::MIN, i64::MAX];
        for v in samples {
            let key = encode_key_value(&Value::Integer(v));
            assert_eq!(decode_pk_integer(&key).unwrap(), v);
        }
    }

    #[test]
    fn decode_pk_integer_rejects_non_integer() {
        let key = encode_key_value(&Value::Text("hello".into()));
        assert!(decode_pk_integer(&key).is_err());
    }

    #[test]
    fn raw_column_blob() {
        let row = vec![Value::Blob(vec![0xDE, 0xAD])];
        let encoded = encode_row(&row);
        match decode_column_raw(&encoded, 0).unwrap() {
            RawColumn::Blob(b) => assert_eq!(b, &[0xDE, 0xAD]),
            other => panic!("expected Blob, got {other:?}"),
        }
    }

    #[test]
    fn raw_column_matches_full_decode() {
        let row = vec![
            Value::Integer(-100),
            Value::Real(3.15),
            Value::Text("hello world".into()),
            Value::Blob(vec![0xDE, 0xAD]),
            Value::Boolean(false),
            Value::Null,
        ];
        let encoded = encode_row(&row);
        let full = decode_row(&encoded).unwrap();
        // Every column decoded individually must agree with the full-row decode.
        for (i, expected) in full.iter().enumerate() {
            let raw = decode_column_raw(&encoded, i).unwrap();
            assert_eq!(raw.to_value(), *expected, "mismatch at column {i}");
        }
    }
}