Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
// Every encoded key value starts with one of these bytes, so when encoded keys
// are compared byte-wise, values of different types sort by tag value first
// (NULL lowest, INTERVAL highest).
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
17
18/// Encode a single value into an order-preserving byte sequence.
19pub fn encode_key_value(value: &Value) -> Vec<u8> {
20    let mut buf = Vec::with_capacity(16);
21    encode_key_value_into(value, &mut buf);
22    buf
23}
24
25/// Encode a composite key (multiple values concatenated).
26pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27    let mut buf = Vec::new();
28    for v in values {
29        buf.extend_from_slice(&encode_key_value(v));
30    }
31    buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35    buf.clear();
36    for v in values {
37        encode_key_value_into(v, buf);
38    }
39}
40
41pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
42    buf.clear();
43    for &i in indices {
44        encode_key_value_into(&row[i as usize], buf);
45    }
46}
47
48#[inline]
49pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
50    buf.clear();
51    encode_signed_varint(TAG_INTEGER, val, buf);
52}
53
54pub(crate) fn encode_key_value_collated_into(
55    value: &Value,
56    coll: crate::types::Collation,
57    buf: &mut Vec<u8>,
58) {
59    match (value, coll) {
60        (Value::Text(s), crate::types::Collation::NoCase) => {
61            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
62        }
63        (Value::Text(s), crate::types::Collation::Rtrim) => {
64            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
65        }
66        _ => encode_key_value_into(value, buf),
67    }
68}
69
70pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
71    match value {
72        Value::Null => buf.push(TAG_NULL),
73        Value::Boolean(b) => {
74            buf.push(TAG_BOOLEAN);
75            buf.push(if *b { 0x01 } else { 0x00 });
76        }
77        Value::Integer(i) => encode_integer_into(*i, buf),
78        Value::Real(r) => encode_real_into(*r, buf),
79        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
80        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
81        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
82        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
83        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
84        Value::Interval {
85            months,
86            days,
87            micros,
88        } => {
89            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
90            buf.push(TAG_INTERVAL);
91            let mut mb = months.to_be_bytes();
92            mb[0] ^= 0x80;
93            buf.extend_from_slice(&mb);
94            let mut db = days.to_be_bytes();
95            db[0] ^= 0x80;
96            buf.extend_from_slice(&db);
97            let mut ub = micros.to_be_bytes();
98            ub[0] ^= 0x80;
99            buf.extend_from_slice(&ub);
100        }
101    }
102}
103
/// Append the key encoding of an integer value: the signed-varint layout
/// written by `encode_signed_varint`, tagged with `TAG_INTEGER`.
fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
    encode_signed_varint(TAG_INTEGER, val, buf);
}
107
/// Order-preserving variable-width codec for signed `i64`, prefixed by a
/// caller-supplied tag byte.
///
/// Layout: `[tag] [marker] [data bytes]`.
/// * zero: marker `0x80`, no data bytes;
/// * positive: marker `0x80 + n`, followed by the `n` significant big-endian
///   bytes of the value;
/// * negative: marker `0x80 - n`, followed by the `n` significant big-endian
///   bytes of the magnitude, each one's-complemented.
///
/// Byte-wise lexicographic comparison of the output matches signed integer
/// order. Bytes are appended; `buf` is not cleared.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` is well-defined for `i64::MIN` (yields 2^63).
    let magnitude = val.unsigned_abs();
    // Significant byte count: 0 for zero, otherwise 1..=8.
    let width = (8 - magnitude.leading_zeros() / 8) as u8;
    let be = magnitude.to_be_bytes();
    let significant = &be[8 - usize::from(width)..];

    if val >= 0 {
        // Zero falls out naturally: marker 0x80 and an empty payload.
        buf.push(0x80 + width);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width);
        buf.extend(significant.iter().map(|&byte| !byte));
    }
}
140
141fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
142    buf.push(TAG_REAL);
143    let bits = val.to_bits();
144    let encoded = if val.is_sign_negative() {
145        !bits
146    } else {
147        bits ^ (1u64 << 63)
148    };
149    buf.extend_from_slice(&encoded.to_be_bytes());
150}
151
/// Append `tag` followed by `data` in null-escaped form.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`, and the sequence is
/// terminated by a single `0x00`.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    // Splitting on 0x00 yields the runs between zero bytes; each boundary
    // between two runs stands for one zero byte that must be escaped.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }
    buf.push(0x00);
}
164
165/// Decode a single key value, returning the value and the number of bytes consumed.
166pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
167    if data.is_empty() {
168        return Err(SqlError::InvalidValue("empty key data".into()));
169    }
170    match data[0] {
171        TAG_NULL => Ok((Value::Null, 1)),
172        TAG_BOOLEAN => {
173            if data.len() < 2 {
174                return Err(SqlError::InvalidValue("truncated boolean".into()));
175            }
176            Ok((Value::Boolean(data[1] != 0), 2))
177        }
178        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
179        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
180        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
181        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
182            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
183            (Value::Date(d), n + 1)
184        }),
185        TAG_TIMESTAMP => {
186            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
187        }
188        TAG_INTERVAL => {
189            if data.len() < 1 + 16 {
190                return Err(SqlError::InvalidValue("truncated interval".into()));
191            }
192            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
193            mb[0] ^= 0x80;
194            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
195            db[0] ^= 0x80;
196            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
197            ub[0] ^= 0x80;
198            Ok((
199                Value::Interval {
200                    months: i32::from_be_bytes(mb),
201                    days: i32::from_be_bytes(db),
202                    micros: i64::from_be_bytes(ub),
203                },
204                17,
205            ))
206        }
207        TAG_TEXT => {
208            let (bytes, n) = decode_null_escaped(&data[1..])?;
209            let s = String::from_utf8(bytes)
210                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
211            Ok((Value::Text(CompactString::from(s)), n + 1))
212        }
213        TAG_BLOB => {
214            let (bytes, n) = decode_null_escaped(&data[1..])?;
215            Ok((Value::Blob(bytes), n + 1))
216        }
217        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
218    }
219}
220
221/// Decode a composite key into multiple values.
222pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
223    let mut values = Vec::with_capacity(count);
224    let mut pos = 0;
225    for _ in 0..count {
226        let (v, n) = decode_key_value(&data[pos..])?;
227        values.push(v);
228        pos += n;
229    }
230    Ok(values)
231}
232
233fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
234    let (v, n) = decode_signed_varint(data)?;
235    Ok((Value::Integer(v), n))
236}
237
238/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
239pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
240    if data.is_empty() {
241        return Err(SqlError::InvalidValue("truncated integer".into()));
242    }
243    let marker = data[0];
244    if marker == 0x80 {
245        return Ok((0, 1));
246    }
247    if marker > 0x80 {
248        let byte_count = (marker - 0x80) as usize;
249        if data.len() < 1 + byte_count {
250            return Err(SqlError::InvalidValue("truncated positive integer".into()));
251        }
252        let mut bytes = [0u8; 8];
253        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
254        let val = i64::from_be_bytes(bytes);
255        Ok((val, 1 + byte_count))
256    } else {
257        let byte_count = (0x80 - marker) as usize;
258        if data.len() < 1 + byte_count {
259            return Err(SqlError::InvalidValue("truncated negative integer".into()));
260        }
261        let mut bytes = [0u8; 8];
262        for i in 0..byte_count {
263            bytes[8 - byte_count + i] = !data[1 + i];
264        }
265        let abs_val = u64::from_be_bytes(bytes);
266        let val = (-(abs_val as i128)) as i64;
267        Ok((val, 1 + byte_count))
268    }
269}
270
271fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
272    if data.len() < 8 {
273        return Err(SqlError::InvalidValue("truncated real".into()));
274    }
275    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
276    let bits = if encoded & (1u64 << 63) != 0 {
277        // Was positive: undo sign bit flip
278        encoded ^ (1u64 << 63)
279    } else {
280        // Was negative: undo full inversion
281        !encoded
282    };
283    let val = f64::from_bits(bits);
284    Ok((Value::Real(val), 8))
285}
286
287/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
288fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
289    let mut result = Vec::new();
290    let mut i = 0;
291    while i < data.len() {
292        if data[i] == 0x00 {
293            if i + 1 < data.len() && data[i + 1] == 0xFF {
294                result.push(0x00);
295                i += 2;
296            } else {
297                return Ok((result, i + 1)); // terminator consumed
298            }
299        } else {
300            result.push(data[i]);
301            i += 1;
302        }
303    }
304    Err(SqlError::InvalidValue(
305        "unterminated null-escaped string".into(),
306    ))
307}
308
309fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
310    match v {
311        Value::Integer(val) => {
312            buf.push(DataType::Integer.type_tag());
313            buf.extend_from_slice(&val.to_le_bytes());
314        }
315        Value::Real(r) => {
316            buf.push(DataType::Real.type_tag());
317            buf.extend_from_slice(&r.to_le_bytes());
318        }
319        Value::Boolean(b) => {
320            buf.push(DataType::Boolean.type_tag());
321            buf.push(if *b { 1 } else { 0 });
322        }
323        Value::Text(s) => {
324            let bytes = s.as_bytes();
325            buf.push(DataType::Text.type_tag());
326            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
327            buf.extend_from_slice(bytes);
328        }
329        Value::Blob(data) => {
330            buf.push(DataType::Blob.type_tag());
331            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
332            buf.extend_from_slice(data);
333        }
334        Value::Time(t) => {
335            buf.push(DataType::Time.type_tag());
336            buf.extend_from_slice(&t.to_le_bytes());
337        }
338        Value::Date(d) => {
339            buf.push(DataType::Date.type_tag());
340            buf.extend_from_slice(&d.to_le_bytes());
341        }
342        Value::Timestamp(t) => {
343            buf.push(DataType::Timestamp.type_tag());
344            buf.extend_from_slice(&t.to_le_bytes());
345        }
346        Value::Interval {
347            months,
348            days,
349            micros,
350        } => {
351            buf.push(DataType::Interval.type_tag());
352            buf.extend_from_slice(&months.to_le_bytes());
353            buf.extend_from_slice(&days.to_le_bytes());
354            buf.extend_from_slice(&micros.to_le_bytes());
355        }
356        Value::Null => unreachable!(),
357    }
358}
359
360pub fn encode_row(values: &[Value]) -> Vec<u8> {
361    let mut buf = Vec::new();
362    encode_row_into(values, &mut buf);
363    buf
364}
365
366pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
367    buf.clear();
368    let col_count = values.len();
369    let bitmap_bytes = col_count.div_ceil(8);
370
371    let header = (col_count as u16) | V2_FLAG;
372    buf.extend_from_slice(&header.to_le_bytes());
373
374    let bitmap_start = buf.len();
375    buf.resize(buf.len() + bitmap_bytes, 0);
376
377    for (i, v) in values.iter().enumerate() {
378        if v.is_null() {
379            buf[bitmap_start + i / 8] |= 1 << (i % 8);
380            continue;
381        }
382        encode_cell_v2(v, buf);
383    }
384}
385
/// Pre-built V2 row image for rows whose non-NULL columns are all integers.
/// Built by `build_int_row_template` and filled in by
/// `encode_int_row_with_template`.
pub struct IntRowTemplate {
    /// Row bytes: header, NULL bitmap, then for every non-NULL slot an integer
    /// type tag followed by 8 zero bytes reserved for the value.
    pub template: Vec<u8>,
    /// `(slot, offset)` pairs: for each non-NULL slot, the byte offset in
    /// `template` where its 8 value bytes start.
    pub slot_offsets: Vec<(usize, usize)>,
}
390
391pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
392    let bitmap_bytes = phys_count.div_ceil(8);
393    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
394    let header = (phys_count as u16) | V2_FLAG;
395    template.extend_from_slice(&header.to_le_bytes());
396    let bitmap_start = template.len();
397    template.resize(bitmap_start + bitmap_bytes, 0);
398    for &i in null_slots {
399        template[bitmap_start + i / 8] |= 1 << (i % 8);
400    }
401    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
402    for slot in 0..phys_count {
403        if null_slots.contains(&slot) {
404            continue;
405        }
406        template.push(DataType::Integer.type_tag());
407        let value_offset = template.len();
408        template.extend_from_slice(&[0u8; 8]);
409        slot_offsets.push((slot, value_offset));
410    }
411    IntRowTemplate {
412        template,
413        slot_offsets,
414    }
415}
416
417/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
418#[inline]
419pub fn encode_int_row_with_template(
420    tmpl: &IntRowTemplate,
421    values: &[Value],
422    buf: &mut Vec<u8>,
423) -> Result<()> {
424    buf.clear();
425    buf.extend_from_slice(&tmpl.template);
426    for &(slot, off) in &tmpl.slot_offsets {
427        match &values[slot] {
428            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
429            other => {
430                return Err(SqlError::TypeMismatch {
431                    expected: "Integer".into(),
432                    got: other.data_type().to_string(),
433                });
434            }
435        }
436    }
437    Ok(())
438}
439
440fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
441    match DataType::from_tag(type_tag) {
442        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
443            data[..8].try_into().unwrap(),
444        ))),
445        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
446            data[..8].try_into().unwrap(),
447        ))),
448        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
449        Some(DataType::Text) => {
450            let s = std::str::from_utf8(data)
451                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
452            Ok(Value::Text(CompactString::from(s)))
453        }
454        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
455        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
456            data[..8].try_into().unwrap(),
457        ))),
458        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
459            data[..4].try_into().unwrap(),
460        ))),
461        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
462            data[..8].try_into().unwrap(),
463        ))),
464        Some(DataType::Interval) => {
465            if data.len() < 16 {
466                return Err(SqlError::InvalidValue("truncated interval".into()));
467            }
468            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
469            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
470            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
471            Ok(Value::Interval {
472                months,
473                days,
474                micros,
475            })
476        }
477        _ => Err(SqlError::InvalidValue(format!(
478            "unknown column type tag: {type_tag}"
479        ))),
480    }
481}
482
/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
/// High bit of `col_count:u16` flags V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    // Every cell carries an explicit `u32` length.
    V1,
    // Only variable-length cells (text, blob) carry a length.
    V2,
}

// Bit set in the row header's `u16` when the row uses the V2 cell layout.
pub(crate) const V2_FLAG: u16 = 0x8000;
// Mask that extracts the column count from the row header's `u16`.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
493
494#[inline]
495pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
496    match DataType::from_tag(type_tag)? {
497        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
498        DataType::Date => Some(4),
499        DataType::Boolean => Some(1),
500        DataType::Interval => Some(16),
501        DataType::Text | DataType::Blob | DataType::Null => None,
502    }
503}
504
505#[inline]
506fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
507    if pos >= data.len() {
508        return Err(SqlError::InvalidValue("truncated column data".into()));
509    }
510    let type_tag = data[pos];
511    let after_tag = pos + 1;
512    let (data_len, body_pos) = match version {
513        RowVersion::V2 => match fixed_width_size(type_tag) {
514            Some(n) => (n, after_tag),
515            None => {
516                if after_tag + 4 > data.len() {
517                    return Err(SqlError::InvalidValue("truncated column data".into()));
518                }
519                let len = u32::from_le_bytes([
520                    data[after_tag],
521                    data[after_tag + 1],
522                    data[after_tag + 2],
523                    data[after_tag + 3],
524                ]) as usize;
525                (len, after_tag + 4)
526            }
527        },
528        RowVersion::V1 => {
529            if after_tag + 4 > data.len() {
530                return Err(SqlError::InvalidValue("truncated column data".into()));
531            }
532            let len = u32::from_le_bytes([
533                data[after_tag],
534                data[after_tag + 1],
535                data[after_tag + 2],
536                data[after_tag + 3],
537            ]) as usize;
538            (len, after_tag + 4)
539        }
540    };
541    if body_pos + data_len > data.len() {
542        return Err(SqlError::InvalidValue("truncated column value".into()));
543    }
544    Ok((
545        type_tag,
546        &data[body_pos..body_pos + data_len],
547        body_pos + data_len,
548    ))
549}
550
551#[inline]
552fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
553    let (_, _, next) = read_cell(data, pos, version)?;
554    Ok(next)
555}
556
557fn copy_cell_to_v2(
558    data: &[u8],
559    pos: usize,
560    version: RowVersion,
561    out: &mut Vec<u8>,
562) -> Result<usize> {
563    let (tag, body, next) = read_cell(data, pos, version)?;
564    out.push(tag);
565    if fixed_width_size(tag).is_none() {
566        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
567    }
568    out.extend_from_slice(body);
569    Ok(next)
570}
571
572fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
573    if data.len() < 2 {
574        return Err(SqlError::InvalidValue("row data too short".into()));
575    }
576    let raw = u16::from_le_bytes([data[0], data[1]]);
577    let version = if raw & V2_FLAG != 0 {
578        RowVersion::V2
579    } else {
580        RowVersion::V1
581    };
582    let col_count = (raw & COL_COUNT_MASK) as usize;
583    let bitmap_bytes = col_count.div_ceil(8);
584    let pos = 2;
585    if data.len() < pos + bitmap_bytes {
586        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
587    }
588    Ok((
589        version,
590        col_count,
591        &data[pos..pos + bitmap_bytes],
592        pos + bitmap_bytes,
593    ))
594}
595
596pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
597    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
598
599    let mut values = Vec::with_capacity(col_count);
600    for i in 0..col_count {
601        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
602            values.push(Value::Null);
603            continue;
604        }
605        let (type_tag, body, next) = read_cell(data, pos, version)?;
606        values.push(decode_value(type_tag, body)?);
607        pos = next;
608    }
609
610    Ok(values)
611}
612
613/// Returns the number of non-PK columns stored in a row value blob.
614#[inline]
615pub fn row_non_pk_count(data: &[u8]) -> usize {
616    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
617}
618
619pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
620    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
621
622    for i in 0..col_count {
623        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
624            continue;
625        }
626        let (type_tag, body, next) = read_cell(data, pos, version)?;
627        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
628            out[col_mapping[i]] = decode_value(type_tag, body)?;
629        }
630        pos = next;
631    }
632
633    Ok(())
634}
635
636pub fn decode_pk_into(
637    key: &[u8],
638    count: usize,
639    out: &mut [Value],
640    pk_mapping: &[usize],
641) -> Result<()> {
642    let mut pos = 0;
643    for i in 0..count {
644        let (v, n) = decode_key_value(&key[pos..])?;
645        if i < pk_mapping.len() {
646            out[pk_mapping[i]] = v;
647        }
648        pos += n;
649    }
650    Ok(())
651}
652
653pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
654    if targets.is_empty() {
655        return Ok(Vec::new());
656    }
657    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
658
659    let mut results = Vec::with_capacity(targets.len());
660    let mut ti = 0;
661
662    for col in 0..col_count {
663        if ti >= targets.len() {
664            break;
665        }
666        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
667
668        if col == targets[ti] {
669            if is_null {
670                results.push(Value::Null);
671            } else {
672                let (type_tag, body, next) = read_cell(data, pos, version)?;
673                results.push(decode_value(type_tag, body)?);
674                pos = next;
675            }
676            ti += 1;
677        } else if !is_null {
678            pos = skip_cell(data, pos, version)?;
679        }
680    }
681
682    while ti < targets.len() {
683        results.push(Value::Null);
684        ti += 1;
685    }
686
687    Ok(results)
688}
689
690pub fn decode_columns_into(
691    data: &[u8],
692    targets: &[usize],
693    schema_cols: &[usize],
694    row: &mut [Value],
695) -> Result<()> {
696    if targets.is_empty() {
697        return Ok(());
698    }
699    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
700
701    let mut ti = 0;
702    for col in 0..col_count {
703        if ti >= targets.len() {
704            break;
705        }
706        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
707
708        if col == targets[ti] {
709            if is_null {
710                row[schema_cols[ti]] = Value::Null;
711            } else {
712                let (type_tag, body, next) = read_cell(data, pos, version)?;
713                row[schema_cols[ti]] = decode_value(type_tag, body)?;
714                pos = next;
715            }
716            ti += 1;
717        } else if !is_null {
718            pos = skip_cell(data, pos, version)?;
719        }
720    }
721
722    Ok(())
723}
724
/// A decoded column value whose `Text` and `Blob` variants borrow their bytes
/// (lifetime `'a`) instead of owning them; every other variant holds its
/// value inline. Use `to_value` to obtain an owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    // Borrowed text.
    Text(&'a str),
    // Borrowed bytes.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
}
738
739impl<'a> RawColumn<'a> {
740    pub fn to_value(self) -> Value {
741        match self {
742            RawColumn::Null => Value::Null,
743            RawColumn::Integer(i) => Value::Integer(i),
744            RawColumn::Real(r) => Value::Real(r),
745            RawColumn::Boolean(b) => Value::Boolean(b),
746            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
747            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
748            RawColumn::Time(t) => Value::Time(t),
749            RawColumn::Date(d) => Value::Date(d),
750            RawColumn::Timestamp(t) => Value::Timestamp(t),
751            RawColumn::Interval {
752                months,
753                days,
754                micros,
755            } => Value::Interval {
756                months,
757                days,
758                micros,
759            },
760        }
761    }
762
763    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
764        use std::cmp::Ordering;
765        match (self, other) {
766            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
767            (RawColumn::Null, _) | (_, Value::Null) => None,
768            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
769            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
770            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
771            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
772            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
773            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
774            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
775            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
776            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
777            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
778            (
779                RawColumn::Interval {
780                    months: am,
781                    days: ad,
782                    micros: au,
783                },
784                Value::Interval {
785                    months: bm,
786                    days: bd,
787                    micros: bu,
788                },
789            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
790            _ => None,
791        }
792    }
793
794    pub fn eq_value(&self, other: &Value) -> bool {
795        match (self, other) {
796            (RawColumn::Null, Value::Null) => true,
797            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
798            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
799            (RawColumn::Real(a), Value::Real(b)) => a == b,
800            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
801            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
802            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
803            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
804            (RawColumn::Time(a), Value::Time(b)) => a == b,
805            (RawColumn::Date(a), Value::Date(b)) => a == b,
806            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
807            (
808                RawColumn::Interval {
809                    months: am,
810                    days: ad,
811                    micros: au,
812                },
813                Value::Interval {
814                    months: bm,
815                    days: bd,
816                    micros: bu,
817                },
818            ) => am == bm && ad == bd && au == bu,
819            _ => false,
820        }
821    }
822
823    pub fn as_f64(&self) -> Option<f64> {
824        match self {
825            RawColumn::Integer(i) => Some(*i as f64),
826            RawColumn::Real(r) => Some(*r),
827            _ => None,
828        }
829    }
830
831    pub fn as_i64(&self) -> Option<i64> {
832        match self {
833            RawColumn::Integer(i) => Some(*i),
834            RawColumn::Time(t) => Some(*t),
835            RawColumn::Date(d) => Some(*d as i64),
836            RawColumn::Timestamp(t) => Some(*t),
837            _ => None,
838        }
839    }
840}
841
842fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
843    match DataType::from_tag(type_tag) {
844        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
845            data[..8].try_into().unwrap(),
846        ))),
847        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
848            data[..8].try_into().unwrap(),
849        ))),
850        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
851        Some(DataType::Text) => {
852            let s = std::str::from_utf8(data)
853                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
854            Ok(RawColumn::Text(s))
855        }
856        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
857        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
858            data[..8].try_into().unwrap(),
859        ))),
860        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
861            data[..4].try_into().unwrap(),
862        ))),
863        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
864            data[..8].try_into().unwrap(),
865        ))),
866        Some(DataType::Interval) => {
867            if data.len() < 16 {
868                return Err(SqlError::InvalidValue("truncated interval".into()));
869            }
870            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
871            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
872            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
873            Ok(RawColumn::Interval {
874                months,
875                days,
876                micros,
877            })
878        }
879        _ => Err(SqlError::InvalidValue(format!(
880            "unknown column type tag: {type_tag}"
881        ))),
882    }
883}
884
885/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
886pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
887    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
888    if target >= col_count || new_val.is_null() {
889        return Ok(false);
890    }
891    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
892    if was_null {
893        return Ok(false);
894    }
895    for col in 0..target {
896        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
897        if !is_null {
898            pos = skip_cell(data, pos, version)?;
899        }
900    }
901    let type_tag = data[pos];
902    let (old_data_len, val_start) = match version {
903        RowVersion::V2 => match fixed_width_size(type_tag) {
904            Some(n) => (n, pos + 1),
905            None => {
906                if pos + 5 > data.len() {
907                    return Err(SqlError::InvalidValue("truncated column data".into()));
908                }
909                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
910                (len, pos + 5)
911            }
912        },
913        RowVersion::V1 => {
914            if pos + 5 > data.len() {
915                return Err(SqlError::InvalidValue("truncated column data".into()));
916            }
917            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
918            (len, pos + 5)
919        }
920    };
921    let new_data_len = match new_val {
922        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
923        Value::Date(_) => 4,
924        Value::Interval { .. } => 16,
925        Value::Boolean(_) => 1,
926        Value::Text(s) => s.len(),
927        Value::Blob(b) => b.len(),
928        Value::Null => return Ok(false),
929    };
930    if new_data_len != old_data_len {
931        return Ok(false);
932    }
933    data[pos] = new_val.data_type().type_tag();
934    match new_val {
935        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
936        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
937        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
938        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
939        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
940        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
941        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
942        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
943        Value::Interval {
944            months,
945            days,
946            micros,
947        } => {
948            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
949            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
950            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
951        }
952        Value::Null => unreachable!(),
953    }
954    Ok(true)
955}
956
957/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
958pub fn patch_row_column(
959    data: &[u8],
960    target: usize,
961    new_val: &Value,
962    out: &mut Vec<u8>,
963) -> Result<()> {
964    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
965
966    let new_col_count = if target >= col_count {
967        target + 1
968    } else {
969        col_count
970    };
971    let new_bitmap_bytes = new_col_count.div_ceil(8);
972    let bitmap_bytes = col_count.div_ceil(8);
973    out.clear();
974
975    let header = (new_col_count as u16) | V2_FLAG;
976    out.extend_from_slice(&header.to_le_bytes());
977    let bitmap_start = out.len();
978    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
979    for _ in bitmap_bytes..new_bitmap_bytes {
980        out.push(0xFF);
981    }
982    if new_val.is_null() {
983        out[bitmap_start + target / 8] |= 1 << (target % 8);
984    } else {
985        out[bitmap_start + target / 8] &= !(1 << (target % 8));
986    }
987
988    let mut pos = header_end;
989    for col in 0..new_col_count {
990        let was_null = if col < col_count {
991            bitmap[col / 8] & (1 << (col % 8)) != 0
992        } else {
993            true
994        };
995
996        if col == target {
997            if !was_null {
998                pos = skip_cell(data, pos, version)?;
999            }
1000            if !new_val.is_null() {
1001                encode_cell_v2(new_val, out);
1002            }
1003        } else if !was_null {
1004            pos = copy_cell_to_v2(data, pos, version, out)?;
1005        }
1006    }
1007    Ok(())
1008}
1009
1010pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1011    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1012    if target >= col_count {
1013        return Ok(RawColumn::Null);
1014    }
1015
1016    for col in 0..=target {
1017        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1018
1019        if col == target {
1020            if is_null {
1021                return Ok(RawColumn::Null);
1022            }
1023            let (type_tag, body, _) = read_cell(data, pos, version)?;
1024            return decode_value_raw(type_tag, body);
1025        } else if !is_null {
1026            pos = skip_cell(data, pos, version)?;
1027        }
1028    }
1029
1030    unreachable!()
1031}
1032
1033/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1034pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1035    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1036    if target >= col_count {
1037        return Ok((RawColumn::Null, usize::MAX));
1038    }
1039
1040    for col in 0..=target {
1041        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1042
1043        if col == target {
1044            if is_null {
1045                return Ok((RawColumn::Null, usize::MAX));
1046            }
1047            let tag_offset = pos;
1048            let (type_tag, body, _) = read_cell(data, pos, version)?;
1049            let raw = decode_value_raw(type_tag, body)?;
1050            return Ok((raw, tag_offset));
1051        } else if !is_null {
1052            pos = skip_cell(data, pos, version)?;
1053        }
1054    }
1055
1056    unreachable!()
1057}
1058
1059/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1060pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1061    if offset == usize::MAX || new_val.is_null() {
1062        return Ok(false);
1063    }
1064    if data.len() < 2 || offset >= data.len() {
1065        return Err(SqlError::InvalidValue("truncated column data".into()));
1066    }
1067    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1068        RowVersion::V2
1069    } else {
1070        RowVersion::V1
1071    };
1072    let type_tag = data[offset];
1073    let (old_data_len, val_start) = match version {
1074        RowVersion::V2 => match fixed_width_size(type_tag) {
1075            Some(n) => (n, offset + 1),
1076            None => {
1077                if offset + 5 > data.len() {
1078                    return Err(SqlError::InvalidValue("truncated column data".into()));
1079                }
1080                let len =
1081                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1082                (len, offset + 5)
1083            }
1084        },
1085        RowVersion::V1 => {
1086            if offset + 5 > data.len() {
1087                return Err(SqlError::InvalidValue("truncated column data".into()));
1088            }
1089            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1090            (len, offset + 5)
1091        }
1092    };
1093    let new_data_len = match new_val {
1094        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1095        Value::Date(_) => 4,
1096        Value::Interval { .. } => 16,
1097        Value::Boolean(_) => 1,
1098        Value::Text(s) => s.len(),
1099        Value::Blob(b) => b.len(),
1100        Value::Null => return Ok(false),
1101    };
1102    if new_data_len != old_data_len {
1103        return Ok(false);
1104    }
1105    data[offset] = new_val.data_type().type_tag();
1106    match new_val {
1107        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1108        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1109        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1110        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1111        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1112        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1113        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1114        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1115        Value::Interval {
1116            months,
1117            days,
1118            micros,
1119        } => {
1120            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1121            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1122            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1123        }
1124        Value::Null => unreachable!(),
1125    }
1126    Ok(true)
1127}
1128
1129pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1130    if key.is_empty() || key[0] != TAG_INTEGER {
1131        return Err(SqlError::InvalidValue("not an integer key".into()));
1132    }
1133    let (val, _) = decode_integer(&key[1..])?;
1134    match val {
1135        Value::Integer(i) => Ok(i),
1136        _ => unreachable!(),
1137    }
1138}
1139
1140#[cfg(test)]
1141#[path = "encoding_tests.rs"]
1142mod tests;