Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// Every encoded key value starts with one of these bytes, so under byte-wise
/// comparison values of different types sort by tag:
/// NULL < BLOB < TEXT < BOOLEAN < INTEGER < REAL < TIME < DATE < TIMESTAMP < INTERVAL.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
17
18/// Encode a single value into an order-preserving byte sequence.
19pub fn encode_key_value(value: &Value) -> Vec<u8> {
20    let mut buf = Vec::with_capacity(16);
21    encode_key_value_into(value, &mut buf);
22    buf
23}
24
25/// Encode a composite key (multiple values concatenated).
26pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27    let mut buf = Vec::new();
28    for v in values {
29        buf.extend_from_slice(&encode_key_value(v));
30    }
31    buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35    buf.clear();
36    for v in values {
37        encode_key_value_into(v, buf);
38    }
39}
40
41#[inline]
42pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
43    buf.clear();
44    encode_signed_varint(TAG_INTEGER, val, buf);
45}
46
47fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
48    match value {
49        Value::Null => buf.push(TAG_NULL),
50        Value::Boolean(b) => {
51            buf.push(TAG_BOOLEAN);
52            buf.push(if *b { 0x01 } else { 0x00 });
53        }
54        Value::Integer(i) => encode_integer_into(*i, buf),
55        Value::Real(r) => encode_real_into(*r, buf),
56        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
57        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
58        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
59        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
60        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
61        Value::Interval {
62            months,
63            days,
64            micros,
65        } => {
66            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
67            buf.push(TAG_INTERVAL);
68            let mut mb = months.to_be_bytes();
69            mb[0] ^= 0x80;
70            buf.extend_from_slice(&mb);
71            let mut db = days.to_be_bytes();
72            db[0] ^= 0x80;
73            buf.extend_from_slice(&db);
74            let mut ub = micros.to_be_bytes();
75            ub[0] ^= 0x80;
76            buf.extend_from_slice(&ub);
77        }
78    }
79}
80
81fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
82    encode_signed_varint(TAG_INTEGER, val, buf);
83}
84
/// Appends an order-preserving, variable-width encoding of `val` to `buf`,
/// prefixed with the caller's `tag` byte.
///
/// Layout: `[tag] [marker] [magnitude bytes]`.
/// * zero: marker `0x80`, no magnitude bytes;
/// * positive: marker `0x80 + n`, then the `n` significant big-endian bytes;
/// * negative: marker `0x80 - n`, then the `n` significant big-endian bytes of
///   the absolute value, each bitwise-inverted.
///
/// Comparing two encodings (same tag) byte by byte gives the same result as
/// comparing the integers.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` also covers i64::MIN, whose magnitude does not fit in i64.
    let magnitude = val.unsigned_abs();
    if magnitude == 0 {
        buf.push(0x80);
        return;
    }

    let be = magnitude.to_be_bytes();
    let leading_zero_bytes = (magnitude.leading_zeros() / 8) as usize;
    let significant = &be[leading_zero_bytes..];
    let width = significant.len() as u8;

    if val > 0 {
        buf.push(0x80 + width);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width);
        buf.extend(significant.iter().map(|&byte| !byte));
    }
}
117
118fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
119    buf.push(TAG_REAL);
120    let bits = val.to_bits();
121    let encoded = if val.is_sign_negative() {
122        !bits
123    } else {
124        bits ^ (1u64 << 63)
125    };
126    buf.extend_from_slice(&encoded.to_be_bytes());
127}
128
/// Appends `tag` followed by `data` in null-escaped, null-terminated form.
///
/// Each `0x00` byte inside `data` is written as the pair `0x00 0xFF`; a single
/// `0x00` closes the value.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on NUL yields the runs between NUL bytes; every gap between
    // two runs stands for exactly one NUL in the input.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
141
142/// Decode a single key value, returning the value and the number of bytes consumed.
143pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
144    if data.is_empty() {
145        return Err(SqlError::InvalidValue("empty key data".into()));
146    }
147    match data[0] {
148        TAG_NULL => Ok((Value::Null, 1)),
149        TAG_BOOLEAN => {
150            if data.len() < 2 {
151                return Err(SqlError::InvalidValue("truncated boolean".into()));
152            }
153            Ok((Value::Boolean(data[1] != 0), 2))
154        }
155        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
156        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
157        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
158        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
159            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
160            (Value::Date(d), n + 1)
161        }),
162        TAG_TIMESTAMP => {
163            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
164        }
165        TAG_INTERVAL => {
166            if data.len() < 1 + 16 {
167                return Err(SqlError::InvalidValue("truncated interval".into()));
168            }
169            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
170            mb[0] ^= 0x80;
171            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
172            db[0] ^= 0x80;
173            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
174            ub[0] ^= 0x80;
175            Ok((
176                Value::Interval {
177                    months: i32::from_be_bytes(mb),
178                    days: i32::from_be_bytes(db),
179                    micros: i64::from_be_bytes(ub),
180                },
181                17,
182            ))
183        }
184        TAG_TEXT => {
185            let (bytes, n) = decode_null_escaped(&data[1..])?;
186            let s = String::from_utf8(bytes)
187                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
188            Ok((Value::Text(CompactString::from(s)), n + 1))
189        }
190        TAG_BLOB => {
191            let (bytes, n) = decode_null_escaped(&data[1..])?;
192            Ok((Value::Blob(bytes), n + 1))
193        }
194        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
195    }
196}
197
198/// Decode a composite key into multiple values.
199pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
200    let mut values = Vec::with_capacity(count);
201    let mut pos = 0;
202    for _ in 0..count {
203        let (v, n) = decode_key_value(&data[pos..])?;
204        values.push(v);
205        pos += n;
206    }
207    Ok(values)
208}
209
210fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
211    let (v, n) = decode_signed_varint(data)?;
212    Ok((Value::Integer(v), n))
213}
214
215/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
216pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
217    if data.is_empty() {
218        return Err(SqlError::InvalidValue("truncated integer".into()));
219    }
220    let marker = data[0];
221    if marker == 0x80 {
222        return Ok((0, 1));
223    }
224    if marker > 0x80 {
225        let byte_count = (marker - 0x80) as usize;
226        if data.len() < 1 + byte_count {
227            return Err(SqlError::InvalidValue("truncated positive integer".into()));
228        }
229        let mut bytes = [0u8; 8];
230        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
231        let val = i64::from_be_bytes(bytes);
232        Ok((val, 1 + byte_count))
233    } else {
234        let byte_count = (0x80 - marker) as usize;
235        if data.len() < 1 + byte_count {
236            return Err(SqlError::InvalidValue("truncated negative integer".into()));
237        }
238        let mut bytes = [0u8; 8];
239        for i in 0..byte_count {
240            bytes[8 - byte_count + i] = !data[1 + i];
241        }
242        let abs_val = u64::from_be_bytes(bytes);
243        let val = (-(abs_val as i128)) as i64;
244        Ok((val, 1 + byte_count))
245    }
246}
247
248fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
249    if data.len() < 8 {
250        return Err(SqlError::InvalidValue("truncated real".into()));
251    }
252    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
253    let bits = if encoded & (1u64 << 63) != 0 {
254        // Was positive: undo sign bit flip
255        encoded ^ (1u64 << 63)
256    } else {
257        // Was negative: undo full inversion
258        !encoded
259    };
260    let val = f64::from_bits(bits);
261    Ok((Value::Real(val), 8))
262}
263
264/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
265fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
266    let mut result = Vec::new();
267    let mut i = 0;
268    while i < data.len() {
269        if data[i] == 0x00 {
270            if i + 1 < data.len() && data[i + 1] == 0xFF {
271                result.push(0x00);
272                i += 2;
273            } else {
274                return Ok((result, i + 1)); // terminator consumed
275            }
276        } else {
277            result.push(data[i]);
278            i += 1;
279        }
280    }
281    Err(SqlError::InvalidValue(
282        "unterminated null-escaped string".into(),
283    ))
284}
285
286fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
287    match v {
288        Value::Integer(val) => {
289            buf.push(DataType::Integer.type_tag());
290            buf.extend_from_slice(&val.to_le_bytes());
291        }
292        Value::Real(r) => {
293            buf.push(DataType::Real.type_tag());
294            buf.extend_from_slice(&r.to_le_bytes());
295        }
296        Value::Boolean(b) => {
297            buf.push(DataType::Boolean.type_tag());
298            buf.push(if *b { 1 } else { 0 });
299        }
300        Value::Text(s) => {
301            let bytes = s.as_bytes();
302            buf.push(DataType::Text.type_tag());
303            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
304            buf.extend_from_slice(bytes);
305        }
306        Value::Blob(data) => {
307            buf.push(DataType::Blob.type_tag());
308            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
309            buf.extend_from_slice(data);
310        }
311        Value::Time(t) => {
312            buf.push(DataType::Time.type_tag());
313            buf.extend_from_slice(&t.to_le_bytes());
314        }
315        Value::Date(d) => {
316            buf.push(DataType::Date.type_tag());
317            buf.extend_from_slice(&d.to_le_bytes());
318        }
319        Value::Timestamp(t) => {
320            buf.push(DataType::Timestamp.type_tag());
321            buf.extend_from_slice(&t.to_le_bytes());
322        }
323        Value::Interval {
324            months,
325            days,
326            micros,
327        } => {
328            buf.push(DataType::Interval.type_tag());
329            buf.extend_from_slice(&months.to_le_bytes());
330            buf.extend_from_slice(&days.to_le_bytes());
331            buf.extend_from_slice(&micros.to_le_bytes());
332        }
333        Value::Null => unreachable!(),
334    }
335}
336
337pub fn encode_row(values: &[Value]) -> Vec<u8> {
338    let mut buf = Vec::new();
339    encode_row_into(values, &mut buf);
340    buf
341}
342
343pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
344    buf.clear();
345    let col_count = values.len();
346    let bitmap_bytes = col_count.div_ceil(8);
347
348    let header = (col_count as u16) | V2_FLAG;
349    buf.extend_from_slice(&header.to_le_bytes());
350
351    let bitmap_start = buf.len();
352    buf.resize(buf.len() + bitmap_bytes, 0);
353
354    for (i, v) in values.iter().enumerate() {
355        if v.is_null() {
356            buf[bitmap_start + i / 8] |= 1 << (i % 8);
357            continue;
358        }
359        encode_cell_v2(v, buf);
360    }
361}
362
/// Pre-built V2 row image for rows whose non-NULL columns are all integers,
/// as produced by `build_int_row_template`.
pub struct IntRowTemplate {
    /// Row bytes: header, null bitmap, then one integer tag plus an 8-byte
    /// zero placeholder for every non-NULL slot.
    pub template: Vec<u8>,
    /// `(slot, offset)` pairs: `offset` is where the slot's 8 value bytes
    /// start inside `template`.
    pub slot_offsets: Vec<(usize, usize)>,
}
367
368pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
369    let bitmap_bytes = phys_count.div_ceil(8);
370    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
371    let header = (phys_count as u16) | V2_FLAG;
372    template.extend_from_slice(&header.to_le_bytes());
373    let bitmap_start = template.len();
374    template.resize(bitmap_start + bitmap_bytes, 0);
375    for &i in null_slots {
376        template[bitmap_start + i / 8] |= 1 << (i % 8);
377    }
378    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
379    for slot in 0..phys_count {
380        if null_slots.contains(&slot) {
381            continue;
382        }
383        template.push(DataType::Integer.type_tag());
384        let value_offset = template.len();
385        template.extend_from_slice(&[0u8; 8]);
386        slot_offsets.push((slot, value_offset));
387    }
388    IntRowTemplate {
389        template,
390        slot_offsets,
391    }
392}
393
394/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
395#[inline]
396pub fn encode_int_row_with_template(
397    tmpl: &IntRowTemplate,
398    values: &[Value],
399    buf: &mut Vec<u8>,
400) -> Result<()> {
401    buf.clear();
402    buf.extend_from_slice(&tmpl.template);
403    for &(slot, off) in &tmpl.slot_offsets {
404        match &values[slot] {
405            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
406            other => {
407                return Err(SqlError::TypeMismatch {
408                    expected: "Integer".into(),
409                    got: other.data_type().to_string(),
410                });
411            }
412        }
413    }
414    Ok(())
415}
416
417fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
418    match DataType::from_tag(type_tag) {
419        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
420            data[..8].try_into().unwrap(),
421        ))),
422        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
423            data[..8].try_into().unwrap(),
424        ))),
425        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
426        Some(DataType::Text) => {
427            let s = std::str::from_utf8(data)
428                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
429            Ok(Value::Text(CompactString::from(s)))
430        }
431        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
432        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
433            data[..8].try_into().unwrap(),
434        ))),
435        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
436            data[..4].try_into().unwrap(),
437        ))),
438        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
439            data[..8].try_into().unwrap(),
440        ))),
441        Some(DataType::Interval) => {
442            if data.len() < 16 {
443                return Err(SqlError::InvalidValue("truncated interval".into()));
444            }
445            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
446            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
447            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
448            Ok(Value::Interval {
449                months,
450                days,
451                micros,
452            })
453        }
454        _ => Err(SqlError::InvalidValue(format!(
455            "unknown column type tag: {type_tag}"
456        ))),
457    }
458}
459
/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
/// High bit of `col_count:u16` flags V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries an explicit 32-bit length prefix.
    V1,
    /// Only variable-width cells (those for which `fixed_width_size` returns
    /// `None`) carry a length prefix.
    V2,
}

/// Bit set in the 16-bit row header to mark a V2 row.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the column count (low 15 bits) from the row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
470
471#[inline]
472pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
473    match DataType::from_tag(type_tag)? {
474        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
475        DataType::Date => Some(4),
476        DataType::Boolean => Some(1),
477        DataType::Interval => Some(16),
478        DataType::Text | DataType::Blob | DataType::Null => None,
479    }
480}
481
482#[inline]
483fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
484    if pos >= data.len() {
485        return Err(SqlError::InvalidValue("truncated column data".into()));
486    }
487    let type_tag = data[pos];
488    let after_tag = pos + 1;
489    let (data_len, body_pos) = match version {
490        RowVersion::V2 => match fixed_width_size(type_tag) {
491            Some(n) => (n, after_tag),
492            None => {
493                if after_tag + 4 > data.len() {
494                    return Err(SqlError::InvalidValue("truncated column data".into()));
495                }
496                let len = u32::from_le_bytes([
497                    data[after_tag],
498                    data[after_tag + 1],
499                    data[after_tag + 2],
500                    data[after_tag + 3],
501                ]) as usize;
502                (len, after_tag + 4)
503            }
504        },
505        RowVersion::V1 => {
506            if after_tag + 4 > data.len() {
507                return Err(SqlError::InvalidValue("truncated column data".into()));
508            }
509            let len = u32::from_le_bytes([
510                data[after_tag],
511                data[after_tag + 1],
512                data[after_tag + 2],
513                data[after_tag + 3],
514            ]) as usize;
515            (len, after_tag + 4)
516        }
517    };
518    if body_pos + data_len > data.len() {
519        return Err(SqlError::InvalidValue("truncated column value".into()));
520    }
521    Ok((
522        type_tag,
523        &data[body_pos..body_pos + data_len],
524        body_pos + data_len,
525    ))
526}
527
528#[inline]
529fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
530    let (_, _, next) = read_cell(data, pos, version)?;
531    Ok(next)
532}
533
534fn copy_cell_to_v2(
535    data: &[u8],
536    pos: usize,
537    version: RowVersion,
538    out: &mut Vec<u8>,
539) -> Result<usize> {
540    let (tag, body, next) = read_cell(data, pos, version)?;
541    out.push(tag);
542    if fixed_width_size(tag).is_none() {
543        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
544    }
545    out.extend_from_slice(body);
546    Ok(next)
547}
548
549fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
550    if data.len() < 2 {
551        return Err(SqlError::InvalidValue("row data too short".into()));
552    }
553    let raw = u16::from_le_bytes([data[0], data[1]]);
554    let version = if raw & V2_FLAG != 0 {
555        RowVersion::V2
556    } else {
557        RowVersion::V1
558    };
559    let col_count = (raw & COL_COUNT_MASK) as usize;
560    let bitmap_bytes = col_count.div_ceil(8);
561    let pos = 2;
562    if data.len() < pos + bitmap_bytes {
563        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
564    }
565    Ok((
566        version,
567        col_count,
568        &data[pos..pos + bitmap_bytes],
569        pos + bitmap_bytes,
570    ))
571}
572
573pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
574    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
575
576    let mut values = Vec::with_capacity(col_count);
577    for i in 0..col_count {
578        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
579            values.push(Value::Null);
580            continue;
581        }
582        let (type_tag, body, next) = read_cell(data, pos, version)?;
583        values.push(decode_value(type_tag, body)?);
584        pos = next;
585    }
586
587    Ok(values)
588}
589
590/// Returns the number of non-PK columns stored in a row value blob.
591#[inline]
592pub fn row_non_pk_count(data: &[u8]) -> usize {
593    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
594}
595
596pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
597    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
598
599    for i in 0..col_count {
600        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
601            continue;
602        }
603        let (type_tag, body, next) = read_cell(data, pos, version)?;
604        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
605            out[col_mapping[i]] = decode_value(type_tag, body)?;
606        }
607        pos = next;
608    }
609
610    Ok(())
611}
612
613pub fn decode_pk_into(
614    key: &[u8],
615    count: usize,
616    out: &mut [Value],
617    pk_mapping: &[usize],
618) -> Result<()> {
619    let mut pos = 0;
620    for i in 0..count {
621        let (v, n) = decode_key_value(&key[pos..])?;
622        if i < pk_mapping.len() {
623            out[pk_mapping[i]] = v;
624        }
625        pos += n;
626    }
627    Ok(())
628}
629
630pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
631    if targets.is_empty() {
632        return Ok(Vec::new());
633    }
634    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
635
636    let mut results = Vec::with_capacity(targets.len());
637    let mut ti = 0;
638
639    for col in 0..col_count {
640        if ti >= targets.len() {
641            break;
642        }
643        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
644
645        if col == targets[ti] {
646            if is_null {
647                results.push(Value::Null);
648            } else {
649                let (type_tag, body, next) = read_cell(data, pos, version)?;
650                results.push(decode_value(type_tag, body)?);
651                pos = next;
652            }
653            ti += 1;
654        } else if !is_null {
655            pos = skip_cell(data, pos, version)?;
656        }
657    }
658
659    while ti < targets.len() {
660        results.push(Value::Null);
661        ti += 1;
662    }
663
664    Ok(results)
665}
666
667pub fn decode_columns_into(
668    data: &[u8],
669    targets: &[usize],
670    schema_cols: &[usize],
671    row: &mut [Value],
672) -> Result<()> {
673    if targets.is_empty() {
674        return Ok(());
675    }
676    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
677
678    let mut ti = 0;
679    for col in 0..col_count {
680        if ti >= targets.len() {
681            break;
682        }
683        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
684
685        if col == targets[ti] {
686            if is_null {
687                row[schema_cols[ti]] = Value::Null;
688            } else {
689                let (type_tag, body, next) = read_cell(data, pos, version)?;
690                row[schema_cols[ti]] = decode_value(type_tag, body)?;
691                pos = next;
692            }
693            ti += 1;
694        } else if !is_null {
695            pos = skip_cell(data, pos, version)?;
696        }
697    }
698
699    Ok(())
700}
701
/// A column value mirroring the variants of `Value`, but with text and blob
/// payloads held as borrowed slices (lifetime `'a`) instead of owned buffers.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    /// Borrowed UTF-8 text.
    Text(&'a str),
    /// Borrowed raw bytes.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
}
715
716impl<'a> RawColumn<'a> {
717    pub fn to_value(self) -> Value {
718        match self {
719            RawColumn::Null => Value::Null,
720            RawColumn::Integer(i) => Value::Integer(i),
721            RawColumn::Real(r) => Value::Real(r),
722            RawColumn::Boolean(b) => Value::Boolean(b),
723            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
724            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
725            RawColumn::Time(t) => Value::Time(t),
726            RawColumn::Date(d) => Value::Date(d),
727            RawColumn::Timestamp(t) => Value::Timestamp(t),
728            RawColumn::Interval {
729                months,
730                days,
731                micros,
732            } => Value::Interval {
733                months,
734                days,
735                micros,
736            },
737        }
738    }
739
740    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
741        use std::cmp::Ordering;
742        match (self, other) {
743            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
744            (RawColumn::Null, _) | (_, Value::Null) => None,
745            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
746            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
747            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
748            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
749            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
750            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
751            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
752            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
753            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
754            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
755            (
756                RawColumn::Interval {
757                    months: am,
758                    days: ad,
759                    micros: au,
760                },
761                Value::Interval {
762                    months: bm,
763                    days: bd,
764                    micros: bu,
765                },
766            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
767            _ => None,
768        }
769    }
770
771    pub fn eq_value(&self, other: &Value) -> bool {
772        match (self, other) {
773            (RawColumn::Null, Value::Null) => true,
774            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
775            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
776            (RawColumn::Real(a), Value::Real(b)) => a == b,
777            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
778            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
779            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
780            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
781            (RawColumn::Time(a), Value::Time(b)) => a == b,
782            (RawColumn::Date(a), Value::Date(b)) => a == b,
783            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
784            (
785                RawColumn::Interval {
786                    months: am,
787                    days: ad,
788                    micros: au,
789                },
790                Value::Interval {
791                    months: bm,
792                    days: bd,
793                    micros: bu,
794                },
795            ) => am == bm && ad == bd && au == bu,
796            _ => false,
797        }
798    }
799
800    pub fn as_f64(&self) -> Option<f64> {
801        match self {
802            RawColumn::Integer(i) => Some(*i as f64),
803            RawColumn::Real(r) => Some(*r),
804            _ => None,
805        }
806    }
807
808    pub fn as_i64(&self) -> Option<i64> {
809        match self {
810            RawColumn::Integer(i) => Some(*i),
811            RawColumn::Time(t) => Some(*t),
812            RawColumn::Date(d) => Some(*d as i64),
813            RawColumn::Timestamp(t) => Some(*t),
814            _ => None,
815        }
816    }
817}
818
819fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
820    match DataType::from_tag(type_tag) {
821        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
822            data[..8].try_into().unwrap(),
823        ))),
824        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
825            data[..8].try_into().unwrap(),
826        ))),
827        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
828        Some(DataType::Text) => {
829            let s = std::str::from_utf8(data)
830                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
831            Ok(RawColumn::Text(s))
832        }
833        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
834        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
835            data[..8].try_into().unwrap(),
836        ))),
837        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
838            data[..4].try_into().unwrap(),
839        ))),
840        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
841            data[..8].try_into().unwrap(),
842        ))),
843        Some(DataType::Interval) => {
844            if data.len() < 16 {
845                return Err(SqlError::InvalidValue("truncated interval".into()));
846            }
847            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
848            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
849            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
850            Ok(RawColumn::Interval {
851                months,
852                days,
853                micros,
854            })
855        }
856        _ => Err(SqlError::InvalidValue(format!(
857            "unknown column type tag: {type_tag}"
858        ))),
859    }
860}
861
862/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
863pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
864    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
865    if target >= col_count || new_val.is_null() {
866        return Ok(false);
867    }
868    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
869    if was_null {
870        return Ok(false);
871    }
872    for col in 0..target {
873        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
874        if !is_null {
875            pos = skip_cell(data, pos, version)?;
876        }
877    }
878    let type_tag = data[pos];
879    let (old_data_len, val_start) = match version {
880        RowVersion::V2 => match fixed_width_size(type_tag) {
881            Some(n) => (n, pos + 1),
882            None => {
883                if pos + 5 > data.len() {
884                    return Err(SqlError::InvalidValue("truncated column data".into()));
885                }
886                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
887                (len, pos + 5)
888            }
889        },
890        RowVersion::V1 => {
891            if pos + 5 > data.len() {
892                return Err(SqlError::InvalidValue("truncated column data".into()));
893            }
894            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
895            (len, pos + 5)
896        }
897    };
898    let new_data_len = match new_val {
899        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
900        Value::Date(_) => 4,
901        Value::Interval { .. } => 16,
902        Value::Boolean(_) => 1,
903        Value::Text(s) => s.len(),
904        Value::Blob(b) => b.len(),
905        Value::Null => return Ok(false),
906    };
907    if new_data_len != old_data_len {
908        return Ok(false);
909    }
910    data[pos] = new_val.data_type().type_tag();
911    match new_val {
912        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
913        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
914        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
915        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
916        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
917        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
918        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
919        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
920        Value::Interval {
921            months,
922            days,
923            micros,
924        } => {
925            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
926            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
927            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
928        }
929        Value::Null => unreachable!(),
930    }
931    Ok(true)
932}
933
934/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
935pub fn patch_row_column(
936    data: &[u8],
937    target: usize,
938    new_val: &Value,
939    out: &mut Vec<u8>,
940) -> Result<()> {
941    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
942
943    let new_col_count = if target >= col_count {
944        target + 1
945    } else {
946        col_count
947    };
948    let new_bitmap_bytes = new_col_count.div_ceil(8);
949    let bitmap_bytes = col_count.div_ceil(8);
950    out.clear();
951
952    let header = (new_col_count as u16) | V2_FLAG;
953    out.extend_from_slice(&header.to_le_bytes());
954    let bitmap_start = out.len();
955    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
956    for _ in bitmap_bytes..new_bitmap_bytes {
957        out.push(0xFF);
958    }
959    if new_val.is_null() {
960        out[bitmap_start + target / 8] |= 1 << (target % 8);
961    } else {
962        out[bitmap_start + target / 8] &= !(1 << (target % 8));
963    }
964
965    let mut pos = header_end;
966    for col in 0..new_col_count {
967        let was_null = if col < col_count {
968            bitmap[col / 8] & (1 << (col % 8)) != 0
969        } else {
970            true
971        };
972
973        if col == target {
974            if !was_null {
975                pos = skip_cell(data, pos, version)?;
976            }
977            if !new_val.is_null() {
978                encode_cell_v2(new_val, out);
979            }
980        } else if !was_null {
981            pos = copy_cell_to_v2(data, pos, version, out)?;
982        }
983    }
984    Ok(())
985}
986
987pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
988    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
989    if target >= col_count {
990        return Ok(RawColumn::Null);
991    }
992
993    for col in 0..=target {
994        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
995
996        if col == target {
997            if is_null {
998                return Ok(RawColumn::Null);
999            }
1000            let (type_tag, body, _) = read_cell(data, pos, version)?;
1001            return decode_value_raw(type_tag, body);
1002        } else if !is_null {
1003            pos = skip_cell(data, pos, version)?;
1004        }
1005    }
1006
1007    unreachable!()
1008}
1009
1010/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1011pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1012    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1013    if target >= col_count {
1014        return Ok((RawColumn::Null, usize::MAX));
1015    }
1016
1017    for col in 0..=target {
1018        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1019
1020        if col == target {
1021            if is_null {
1022                return Ok((RawColumn::Null, usize::MAX));
1023            }
1024            let tag_offset = pos;
1025            let (type_tag, body, _) = read_cell(data, pos, version)?;
1026            let raw = decode_value_raw(type_tag, body)?;
1027            return Ok((raw, tag_offset));
1028        } else if !is_null {
1029            pos = skip_cell(data, pos, version)?;
1030        }
1031    }
1032
1033    unreachable!()
1034}
1035
1036/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1037pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1038    if offset == usize::MAX || new_val.is_null() {
1039        return Ok(false);
1040    }
1041    if data.len() < 2 || offset >= data.len() {
1042        return Err(SqlError::InvalidValue("truncated column data".into()));
1043    }
1044    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1045        RowVersion::V2
1046    } else {
1047        RowVersion::V1
1048    };
1049    let type_tag = data[offset];
1050    let (old_data_len, val_start) = match version {
1051        RowVersion::V2 => match fixed_width_size(type_tag) {
1052            Some(n) => (n, offset + 1),
1053            None => {
1054                if offset + 5 > data.len() {
1055                    return Err(SqlError::InvalidValue("truncated column data".into()));
1056                }
1057                let len =
1058                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1059                (len, offset + 5)
1060            }
1061        },
1062        RowVersion::V1 => {
1063            if offset + 5 > data.len() {
1064                return Err(SqlError::InvalidValue("truncated column data".into()));
1065            }
1066            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1067            (len, offset + 5)
1068        }
1069    };
1070    let new_data_len = match new_val {
1071        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1072        Value::Date(_) => 4,
1073        Value::Interval { .. } => 16,
1074        Value::Boolean(_) => 1,
1075        Value::Text(s) => s.len(),
1076        Value::Blob(b) => b.len(),
1077        Value::Null => return Ok(false),
1078    };
1079    if new_data_len != old_data_len {
1080        return Ok(false);
1081    }
1082    data[offset] = new_val.data_type().type_tag();
1083    match new_val {
1084        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1085        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1086        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1087        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1088        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1089        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1090        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1091        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1092        Value::Interval {
1093            months,
1094            days,
1095            micros,
1096        } => {
1097            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1098            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1099            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1100        }
1101        Value::Null => unreachable!(),
1102    }
1103    Ok(true)
1104}
1105
1106pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1107    if key.is_empty() || key[0] != TAG_INTEGER {
1108        return Err(SqlError::InvalidValue("not an integer key".into()));
1109    }
1110    let (val, _) = decode_integer(&key[1..])?;
1111    match val {
1112        Value::Integer(i) => Ok(i),
1113        _ => unreachable!(),
1114    }
1115}
1116
1117#[cfg(test)]
1118#[path = "encoding_tests.rs"]
1119mod tests;