Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// The tag is the first byte of every encoded key value, so under byte-wise
/// comparison values of different types are ordered by their tag.
const TAG_NULL: u8 = 0x00;
/// Null-escaped byte string.
const TAG_BLOB: u8 = 0x01;
/// Null-escaped UTF-8 text.
const TAG_TEXT: u8 = 0x02;
/// Single payload byte: 0x00 for false, 0x01 for true.
const TAG_BOOLEAN: u8 = 0x03;
/// Signed varint payload.
const TAG_INTEGER: u8 = 0x04;
/// Eight-byte order-preserving float payload.
const TAG_REAL: u8 = 0x05;
/// Signed varint payload.
const TAG_TIME: u8 = 0x06;
/// Signed varint payload.
const TAG_DATE: u8 = 0x07;
/// Signed varint payload.
const TAG_TIMESTAMP: u8 = 0x08;
/// Sixteen-byte payload: months, days, micros.
const TAG_INTERVAL: u8 = 0x09;
/// Null-escaped UTF-8 JSON text.
const TAG_JSON: u8 = 0x0A;
/// Null-escaped binary JSON bytes.
const TAG_JSONB: u8 = 0x0B;
19
20/// Encode a single value into an order-preserving byte sequence.
21pub fn encode_key_value(value: &Value) -> Vec<u8> {
22    let mut buf = Vec::with_capacity(16);
23    encode_key_value_into(value, &mut buf);
24    buf
25}
26
27/// Encode a composite key (multiple values concatenated).
28pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
29    let mut buf = Vec::new();
30    for v in values {
31        buf.extend_from_slice(&encode_key_value(v));
32    }
33    buf
34}
35
36pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
37    buf.clear();
38    for v in values {
39        encode_key_value_into(v, buf);
40    }
41}
42
43pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
44    buf.clear();
45    for &i in indices {
46        encode_key_value_into(&row[i as usize], buf);
47    }
48}
49
50#[inline]
51pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
52    buf.clear();
53    encode_signed_varint(TAG_INTEGER, val, buf);
54}
55
56pub(crate) fn encode_key_value_collated_into(
57    value: &Value,
58    coll: crate::types::Collation,
59    buf: &mut Vec<u8>,
60) {
61    match (value, coll) {
62        (Value::Text(s), crate::types::Collation::NoCase) => {
63            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
64        }
65        (Value::Text(s), crate::types::Collation::Rtrim) => {
66            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
67        }
68        _ => encode_key_value_into(value, buf),
69    }
70}
71
72pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
73    match value {
74        Value::Null => buf.push(TAG_NULL),
75        Value::Boolean(b) => {
76            buf.push(TAG_BOOLEAN);
77            buf.push(if *b { 0x01 } else { 0x00 });
78        }
79        Value::Integer(i) => encode_integer_into(*i, buf),
80        Value::Real(r) => encode_real_into(*r, buf),
81        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
82        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
83        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
84        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
85        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
86        Value::Interval {
87            months,
88            days,
89            micros,
90        } => {
91            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
92            buf.push(TAG_INTERVAL);
93            let mut mb = months.to_be_bytes();
94            mb[0] ^= 0x80;
95            buf.extend_from_slice(&mb);
96            let mut db = days.to_be_bytes();
97            db[0] ^= 0x80;
98            buf.extend_from_slice(&db);
99            let mut ub = micros.to_be_bytes();
100            ub[0] ^= 0x80;
101            buf.extend_from_slice(&ub);
102        }
103        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
104        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
105    }
106}
107
108fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
109    encode_signed_varint(TAG_INTEGER, val, buf);
110}
111
/// Order-preserving variable-width encoding of a signed 64-bit integer.
///
/// Output layout: `[tag] [marker] [payload]`.
/// * zero: marker `0x80`, no payload;
/// * positive: marker `0x80 + n`, followed by the `n` significant big-endian bytes;
/// * negative: marker `0x80 - n`, followed by the `n` significant big-endian
///   bytes of `|val|`, each bit-inverted.
///
/// Comparing encodings byte-wise gives the same order as comparing the integers.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    use std::cmp::Ordering;

    buf.push(tag);

    // `unsigned_abs` is well-defined for `i64::MIN` (yields 2^63).
    let magnitude = val.unsigned_abs();
    // Count of significant bytes in the magnitude; 0 only when `val == 0`.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be = magnitude.to_be_bytes();
    let payload = &be[8 - width..];

    match val.cmp(&0) {
        Ordering::Equal => buf.push(0x80),
        Ordering::Greater => {
            buf.push(0x80 + width as u8);
            buf.extend_from_slice(payload);
        }
        Ordering::Less => {
            buf.push(0x80 - width as u8);
            buf.extend(payload.iter().map(|byte| !byte));
        }
    }
}
144
145fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
146    buf.push(TAG_REAL);
147    let bits = val.to_bits();
148    let encoded = if val.is_sign_negative() {
149        !bits
150    } else {
151        bits ^ (1u64 << 63)
152    };
153    buf.extend_from_slice(&encoded.to_be_bytes());
154}
155
/// Appends `tag`, then `data` with every `0x00` byte escaped as `0x00 0xFF`,
/// then a single `0x00` terminator.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    // Splitting on NUL yields the runs between NUL bytes; each boundary between
    // two runs stands for exactly one NUL in the input and is written escaped.
    for (index, run) in data.split(|&byte| byte == 0x00).enumerate() {
        if index > 0 {
            buf.extend_from_slice(&[0x00, 0xFF]);
        }
        buf.extend_from_slice(run);
    }
    buf.push(0x00);
}
168
169/// Decode a single key value, returning the value and the number of bytes consumed.
170pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
171    if data.is_empty() {
172        return Err(SqlError::InvalidValue("empty key data".into()));
173    }
174    match data[0] {
175        TAG_NULL => Ok((Value::Null, 1)),
176        TAG_BOOLEAN => {
177            if data.len() < 2 {
178                return Err(SqlError::InvalidValue("truncated boolean".into()));
179            }
180            Ok((Value::Boolean(data[1] != 0), 2))
181        }
182        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
183        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
184        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
185        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
186            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
187            (Value::Date(d), n + 1)
188        }),
189        TAG_TIMESTAMP => {
190            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
191        }
192        TAG_INTERVAL => {
193            if data.len() < 1 + 16 {
194                return Err(SqlError::InvalidValue("truncated interval".into()));
195            }
196            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
197            mb[0] ^= 0x80;
198            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
199            db[0] ^= 0x80;
200            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
201            ub[0] ^= 0x80;
202            Ok((
203                Value::Interval {
204                    months: i32::from_be_bytes(mb),
205                    days: i32::from_be_bytes(db),
206                    micros: i64::from_be_bytes(ub),
207                },
208                17,
209            ))
210        }
211        TAG_TEXT => {
212            let (bytes, n) = decode_null_escaped(&data[1..])?;
213            let s = String::from_utf8(bytes)
214                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
215            Ok((Value::Text(CompactString::from(s)), n + 1))
216        }
217        TAG_BLOB => {
218            let (bytes, n) = decode_null_escaped(&data[1..])?;
219            Ok((Value::Blob(bytes), n + 1))
220        }
221        TAG_JSON => {
222            let (bytes, n) = decode_null_escaped(&data[1..])?;
223            let s = String::from_utf8(bytes)
224                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
225            Ok((Value::Json(CompactString::from(s)), n + 1))
226        }
227        TAG_JSONB => {
228            let (bytes, n) = decode_null_escaped(&data[1..])?;
229            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
230        }
231        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
232    }
233}
234
235/// Decode a composite key into multiple values.
236pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
237    let mut values = Vec::with_capacity(count);
238    let mut pos = 0;
239    for _ in 0..count {
240        let (v, n) = decode_key_value(&data[pos..])?;
241        values.push(v);
242        pos += n;
243    }
244    Ok(values)
245}
246
247fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
248    let (v, n) = decode_signed_varint(data)?;
249    Ok((Value::Integer(v), n))
250}
251
252/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
253pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
254    if data.is_empty() {
255        return Err(SqlError::InvalidValue("truncated integer".into()));
256    }
257    let marker = data[0];
258    if marker == 0x80 {
259        return Ok((0, 1));
260    }
261    if marker > 0x80 {
262        let byte_count = (marker - 0x80) as usize;
263        if data.len() < 1 + byte_count {
264            return Err(SqlError::InvalidValue("truncated positive integer".into()));
265        }
266        let mut bytes = [0u8; 8];
267        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
268        let val = i64::from_be_bytes(bytes);
269        Ok((val, 1 + byte_count))
270    } else {
271        let byte_count = (0x80 - marker) as usize;
272        if data.len() < 1 + byte_count {
273            return Err(SqlError::InvalidValue("truncated negative integer".into()));
274        }
275        let mut bytes = [0u8; 8];
276        for i in 0..byte_count {
277            bytes[8 - byte_count + i] = !data[1 + i];
278        }
279        let abs_val = u64::from_be_bytes(bytes);
280        let val = (-(abs_val as i128)) as i64;
281        Ok((val, 1 + byte_count))
282    }
283}
284
285fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
286    if data.len() < 8 {
287        return Err(SqlError::InvalidValue("truncated real".into()));
288    }
289    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
290    let bits = if encoded & (1u64 << 63) != 0 {
291        // Was positive: undo sign bit flip
292        encoded ^ (1u64 << 63)
293    } else {
294        // Was negative: undo full inversion
295        !encoded
296    };
297    let val = f64::from_bits(bits);
298    Ok((Value::Real(val), 8))
299}
300
301/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
302fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
303    let mut result = Vec::new();
304    let mut i = 0;
305    while i < data.len() {
306        if data[i] == 0x00 {
307            if i + 1 < data.len() && data[i + 1] == 0xFF {
308                result.push(0x00);
309                i += 2;
310            } else {
311                return Ok((result, i + 1)); // terminator consumed
312            }
313        } else {
314            result.push(data[i]);
315            i += 1;
316        }
317    }
318    Err(SqlError::InvalidValue(
319        "unterminated null-escaped string".into(),
320    ))
321}
322
323fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
324    match v {
325        Value::Integer(val) => {
326            buf.push(DataType::Integer.type_tag());
327            buf.extend_from_slice(&val.to_le_bytes());
328        }
329        Value::Real(r) => {
330            buf.push(DataType::Real.type_tag());
331            buf.extend_from_slice(&r.to_le_bytes());
332        }
333        Value::Boolean(b) => {
334            buf.push(DataType::Boolean.type_tag());
335            buf.push(if *b { 1 } else { 0 });
336        }
337        Value::Text(s) => {
338            let bytes = s.as_bytes();
339            buf.push(DataType::Text.type_tag());
340            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
341            buf.extend_from_slice(bytes);
342        }
343        Value::Blob(data) => {
344            buf.push(DataType::Blob.type_tag());
345            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
346            buf.extend_from_slice(data);
347        }
348        Value::Time(t) => {
349            buf.push(DataType::Time.type_tag());
350            buf.extend_from_slice(&t.to_le_bytes());
351        }
352        Value::Date(d) => {
353            buf.push(DataType::Date.type_tag());
354            buf.extend_from_slice(&d.to_le_bytes());
355        }
356        Value::Timestamp(t) => {
357            buf.push(DataType::Timestamp.type_tag());
358            buf.extend_from_slice(&t.to_le_bytes());
359        }
360        Value::Interval {
361            months,
362            days,
363            micros,
364        } => {
365            buf.push(DataType::Interval.type_tag());
366            buf.extend_from_slice(&months.to_le_bytes());
367            buf.extend_from_slice(&days.to_le_bytes());
368            buf.extend_from_slice(&micros.to_le_bytes());
369        }
370        Value::Json(s) => {
371            let bytes = s.as_bytes();
372            buf.push(DataType::Json.type_tag());
373            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
374            buf.extend_from_slice(bytes);
375        }
376        Value::Jsonb(b) => {
377            buf.push(DataType::Jsonb.type_tag());
378            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
379            buf.extend_from_slice(b);
380        }
381        Value::Null => unreachable!(),
382    }
383}
384
385pub fn encode_row(values: &[Value]) -> Vec<u8> {
386    let mut buf = Vec::new();
387    encode_row_into(values, &mut buf);
388    buf
389}
390
391pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
392    buf.clear();
393    let col_count = values.len();
394    let bitmap_bytes = col_count.div_ceil(8);
395
396    let header = (col_count as u16) | V2_FLAG;
397    buf.extend_from_slice(&header.to_le_bytes());
398
399    let bitmap_start = buf.len();
400    buf.resize(buf.len() + bitmap_bytes, 0);
401
402    for (i, v) in values.iter().enumerate() {
403        if v.is_null() {
404            buf[bitmap_start + i / 8] |= 1 << (i % 8);
405            continue;
406        }
407        encode_cell_v2(v, buf);
408    }
409}
410
/// Pre-built V2 row bytes for rows whose non-NULL columns are all integers.
pub struct IntRowTemplate {
    /// Row header, null bitmap, and one integer cell (tag plus eight zero
    /// bytes) per non-NULL slot.
    pub template: Vec<u8>,
    /// For each non-NULL slot: `(slot index, byte offset of its 8-byte payload
    /// within `template`)`.
    pub slot_offsets: Vec<(usize, usize)>,
}
415
416pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
417    let bitmap_bytes = phys_count.div_ceil(8);
418    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
419    let header = (phys_count as u16) | V2_FLAG;
420    template.extend_from_slice(&header.to_le_bytes());
421    let bitmap_start = template.len();
422    template.resize(bitmap_start + bitmap_bytes, 0);
423    for &i in null_slots {
424        template[bitmap_start + i / 8] |= 1 << (i % 8);
425    }
426    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
427    for slot in 0..phys_count {
428        if null_slots.contains(&slot) {
429            continue;
430        }
431        template.push(DataType::Integer.type_tag());
432        let value_offset = template.len();
433        template.extend_from_slice(&[0u8; 8]);
434        slot_offsets.push((slot, value_offset));
435    }
436    IntRowTemplate {
437        template,
438        slot_offsets,
439    }
440}
441
442/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
443#[inline]
444pub fn encode_int_row_with_template(
445    tmpl: &IntRowTemplate,
446    values: &[Value],
447    buf: &mut Vec<u8>,
448) -> Result<()> {
449    buf.clear();
450    buf.extend_from_slice(&tmpl.template);
451    for &(slot, off) in &tmpl.slot_offsets {
452        match &values[slot] {
453            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
454            other => {
455                return Err(SqlError::TypeMismatch {
456                    expected: "Integer".into(),
457                    got: other.data_type().to_string(),
458                });
459            }
460        }
461    }
462    Ok(())
463}
464
465fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
466    match DataType::from_tag(type_tag) {
467        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
468            data[..8].try_into().unwrap(),
469        ))),
470        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
471            data[..8].try_into().unwrap(),
472        ))),
473        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
474        Some(DataType::Text) => {
475            let s = std::str::from_utf8(data)
476                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
477            Ok(Value::Text(CompactString::from(s)))
478        }
479        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
480        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
481            data[..8].try_into().unwrap(),
482        ))),
483        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
484            data[..4].try_into().unwrap(),
485        ))),
486        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
487            data[..8].try_into().unwrap(),
488        ))),
489        Some(DataType::Interval) => {
490            if data.len() < 16 {
491                return Err(SqlError::InvalidValue("truncated interval".into()));
492            }
493            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
494            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
495            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
496            Ok(Value::Interval {
497                months,
498                days,
499                micros,
500            })
501        }
502        Some(DataType::Json) => {
503            let s = std::str::from_utf8(data)
504                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
505            Ok(Value::Json(CompactString::from(s)))
506        }
507        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
508        _ => Err(SqlError::InvalidValue(format!(
509            "unknown column type tag: {type_tag}"
510        ))),
511    }
512}
513
/// On-disk row cell layout.
///
/// V1 cells are always `[tag:u8][len:u32][data]`. V2 cells omit `len` for
/// fixed-width types. The high bit of the row's `col_count:u16` header marks V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries an explicit length prefix.
    V1,
    /// Fixed-width cells have no length prefix.
    V2,
}
521
/// Header bit that marks a row as using the V2 cell layout.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask that extracts the column count from the row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
524
525#[inline]
526pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
527    match DataType::from_tag(type_tag)? {
528        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
529        DataType::Date => Some(4),
530        DataType::Boolean => Some(1),
531        DataType::Interval => Some(16),
532        DataType::Text | DataType::Blob | DataType::Json | DataType::Jsonb | DataType::Null => None,
533    }
534}
535
536#[inline]
537fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
538    if pos >= data.len() {
539        return Err(SqlError::InvalidValue("truncated column data".into()));
540    }
541    let type_tag = data[pos];
542    let after_tag = pos + 1;
543    let (data_len, body_pos) = match version {
544        RowVersion::V2 => match fixed_width_size(type_tag) {
545            Some(n) => (n, after_tag),
546            None => {
547                if after_tag + 4 > data.len() {
548                    return Err(SqlError::InvalidValue("truncated column data".into()));
549                }
550                let len = u32::from_le_bytes([
551                    data[after_tag],
552                    data[after_tag + 1],
553                    data[after_tag + 2],
554                    data[after_tag + 3],
555                ]) as usize;
556                (len, after_tag + 4)
557            }
558        },
559        RowVersion::V1 => {
560            if after_tag + 4 > data.len() {
561                return Err(SqlError::InvalidValue("truncated column data".into()));
562            }
563            let len = u32::from_le_bytes([
564                data[after_tag],
565                data[after_tag + 1],
566                data[after_tag + 2],
567                data[after_tag + 3],
568            ]) as usize;
569            (len, after_tag + 4)
570        }
571    };
572    if body_pos + data_len > data.len() {
573        return Err(SqlError::InvalidValue("truncated column value".into()));
574    }
575    Ok((
576        type_tag,
577        &data[body_pos..body_pos + data_len],
578        body_pos + data_len,
579    ))
580}
581
582#[inline]
583fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
584    let (_, _, next) = read_cell(data, pos, version)?;
585    Ok(next)
586}
587
588fn copy_cell_to_v2(
589    data: &[u8],
590    pos: usize,
591    version: RowVersion,
592    out: &mut Vec<u8>,
593) -> Result<usize> {
594    let (tag, body, next) = read_cell(data, pos, version)?;
595    out.push(tag);
596    if fixed_width_size(tag).is_none() {
597        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
598    }
599    out.extend_from_slice(body);
600    Ok(next)
601}
602
603fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
604    if data.len() < 2 {
605        return Err(SqlError::InvalidValue("row data too short".into()));
606    }
607    let raw = u16::from_le_bytes([data[0], data[1]]);
608    let version = if raw & V2_FLAG != 0 {
609        RowVersion::V2
610    } else {
611        RowVersion::V1
612    };
613    let col_count = (raw & COL_COUNT_MASK) as usize;
614    let bitmap_bytes = col_count.div_ceil(8);
615    let pos = 2;
616    if data.len() < pos + bitmap_bytes {
617        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
618    }
619    Ok((
620        version,
621        col_count,
622        &data[pos..pos + bitmap_bytes],
623        pos + bitmap_bytes,
624    ))
625}
626
627pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
628    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
629
630    let mut values = Vec::with_capacity(col_count);
631    for i in 0..col_count {
632        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
633            values.push(Value::Null);
634            continue;
635        }
636        let (type_tag, body, next) = read_cell(data, pos, version)?;
637        values.push(decode_value(type_tag, body)?);
638        pos = next;
639    }
640
641    Ok(values)
642}
643
644/// Returns the number of non-PK columns stored in a row value blob.
645#[inline]
646pub fn row_non_pk_count(data: &[u8]) -> usize {
647    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
648}
649
650pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
651    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
652
653    for i in 0..col_count {
654        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
655            continue;
656        }
657        let (type_tag, body, next) = read_cell(data, pos, version)?;
658        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
659            out[col_mapping[i]] = decode_value(type_tag, body)?;
660        }
661        pos = next;
662    }
663
664    Ok(())
665}
666
667pub fn decode_pk_into(
668    key: &[u8],
669    count: usize,
670    out: &mut [Value],
671    pk_mapping: &[usize],
672) -> Result<()> {
673    let mut pos = 0;
674    for i in 0..count {
675        let (v, n) = decode_key_value(&key[pos..])?;
676        if i < pk_mapping.len() {
677            out[pk_mapping[i]] = v;
678        }
679        pos += n;
680    }
681    Ok(())
682}
683
684pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
685    if targets.is_empty() {
686        return Ok(Vec::new());
687    }
688    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
689
690    let mut results = Vec::with_capacity(targets.len());
691    let mut ti = 0;
692
693    for col in 0..col_count {
694        if ti >= targets.len() {
695            break;
696        }
697        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
698
699        if col == targets[ti] {
700            if is_null {
701                results.push(Value::Null);
702            } else {
703                let (type_tag, body, next) = read_cell(data, pos, version)?;
704                results.push(decode_value(type_tag, body)?);
705                pos = next;
706            }
707            ti += 1;
708        } else if !is_null {
709            pos = skip_cell(data, pos, version)?;
710        }
711    }
712
713    while ti < targets.len() {
714        results.push(Value::Null);
715        ti += 1;
716    }
717
718    Ok(results)
719}
720
721pub fn decode_columns_into(
722    data: &[u8],
723    targets: &[usize],
724    schema_cols: &[usize],
725    row: &mut [Value],
726) -> Result<()> {
727    if targets.is_empty() {
728        return Ok(());
729    }
730    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
731
732    let mut ti = 0;
733    for col in 0..col_count {
734        if ti >= targets.len() {
735            break;
736        }
737        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
738
739        if col == targets[ti] {
740            if is_null {
741                row[schema_cols[ti]] = Value::Null;
742            } else {
743                let (type_tag, body, next) = read_cell(data, pos, version)?;
744                row[schema_cols[ti]] = decode_value(type_tag, body)?;
745                pos = next;
746            }
747            ti += 1;
748        } else if !is_null {
749            pos = skip_cell(data, pos, version)?;
750        }
751    }
752
753    Ok(())
754}
755
/// A decoded column that borrows variable-width payloads from the row buffer
/// instead of copying them.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    /// UTF-8 text borrowed from the row buffer.
    Text(&'a str),
    /// Raw bytes borrowed from the row buffer.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval {
        months: i32,
        days: i32,
        micros: i64,
    },
    /// JSON text borrowed from the row buffer.
    Json(&'a str),
    /// Binary JSON bytes borrowed from the row buffer.
    Jsonb(&'a [u8]),
}
771
772impl<'a> RawColumn<'a> {
773    pub fn to_value(self) -> Value {
774        match self {
775            RawColumn::Null => Value::Null,
776            RawColumn::Integer(i) => Value::Integer(i),
777            RawColumn::Real(r) => Value::Real(r),
778            RawColumn::Boolean(b) => Value::Boolean(b),
779            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
780            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
781            RawColumn::Time(t) => Value::Time(t),
782            RawColumn::Date(d) => Value::Date(d),
783            RawColumn::Timestamp(t) => Value::Timestamp(t),
784            RawColumn::Interval {
785                months,
786                days,
787                micros,
788            } => Value::Interval {
789                months,
790                days,
791                micros,
792            },
793            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
794            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
795        }
796    }
797
798    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
799        use std::cmp::Ordering;
800        match (self, other) {
801            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
802            (RawColumn::Null, _) | (_, Value::Null) => None,
803            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
804            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
805            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
806            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
807            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
808            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
809            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
810            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
811            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
812            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
813            (
814                RawColumn::Interval {
815                    months: am,
816                    days: ad,
817                    micros: au,
818                },
819                Value::Interval {
820                    months: bm,
821                    days: bd,
822                    micros: bu,
823                },
824            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
825            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
826            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
827            _ => None,
828        }
829    }
830
831    pub fn eq_value(&self, other: &Value) -> bool {
832        match (self, other) {
833            (RawColumn::Null, Value::Null) => true,
834            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
835            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
836            (RawColumn::Real(a), Value::Real(b)) => a == b,
837            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
838            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
839            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
840            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
841            (RawColumn::Time(a), Value::Time(b)) => a == b,
842            (RawColumn::Date(a), Value::Date(b)) => a == b,
843            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
844            (
845                RawColumn::Interval {
846                    months: am,
847                    days: ad,
848                    micros: au,
849                },
850                Value::Interval {
851                    months: bm,
852                    days: bd,
853                    micros: bu,
854                },
855            ) => am == bm && ad == bd && au == bu,
856            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
857            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
858            _ => false,
859        }
860    }
861
862    pub fn as_f64(&self) -> Option<f64> {
863        match self {
864            RawColumn::Integer(i) => Some(*i as f64),
865            RawColumn::Real(r) => Some(*r),
866            _ => None,
867        }
868    }
869
870    pub fn as_i64(&self) -> Option<i64> {
871        match self {
872            RawColumn::Integer(i) => Some(*i),
873            RawColumn::Time(t) => Some(*t),
874            RawColumn::Date(d) => Some(*d as i64),
875            RawColumn::Timestamp(t) => Some(*t),
876            _ => None,
877        }
878    }
879}
880
881fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
882    match DataType::from_tag(type_tag) {
883        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
884            data[..8].try_into().unwrap(),
885        ))),
886        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
887            data[..8].try_into().unwrap(),
888        ))),
889        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
890        Some(DataType::Text) => {
891            let s = std::str::from_utf8(data)
892                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
893            Ok(RawColumn::Text(s))
894        }
895        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
896        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
897            data[..8].try_into().unwrap(),
898        ))),
899        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
900            data[..4].try_into().unwrap(),
901        ))),
902        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
903            data[..8].try_into().unwrap(),
904        ))),
905        Some(DataType::Interval) => {
906            if data.len() < 16 {
907                return Err(SqlError::InvalidValue("truncated interval".into()));
908            }
909            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
910            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
911            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
912            Ok(RawColumn::Interval {
913                months,
914                days,
915                micros,
916            })
917        }
918        Some(DataType::Json) => {
919            let s = std::str::from_utf8(data)
920                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
921            Ok(RawColumn::Json(s))
922        }
923        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
924        _ => Err(SqlError::InvalidValue(format!(
925            "unknown column type tag: {type_tag}"
926        ))),
927    }
928}
929
930/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
931pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
932    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
933    if target >= col_count || new_val.is_null() {
934        return Ok(false);
935    }
936    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
937    if was_null {
938        return Ok(false);
939    }
940    for col in 0..target {
941        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
942        if !is_null {
943            pos = skip_cell(data, pos, version)?;
944        }
945    }
946    let type_tag = data[pos];
947    let (old_data_len, val_start) = match version {
948        RowVersion::V2 => match fixed_width_size(type_tag) {
949            Some(n) => (n, pos + 1),
950            None => {
951                if pos + 5 > data.len() {
952                    return Err(SqlError::InvalidValue("truncated column data".into()));
953                }
954                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
955                (len, pos + 5)
956            }
957        },
958        RowVersion::V1 => {
959            if pos + 5 > data.len() {
960                return Err(SqlError::InvalidValue("truncated column data".into()));
961            }
962            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
963            (len, pos + 5)
964        }
965    };
966    let new_data_len = match new_val {
967        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
968        Value::Date(_) => 4,
969        Value::Interval { .. } => 16,
970        Value::Boolean(_) => 1,
971        Value::Text(s) => s.len(),
972        Value::Blob(b) => b.len(),
973        Value::Json(s) => s.len(),
974        Value::Jsonb(b) => b.len(),
975        Value::Null => return Ok(false),
976    };
977    if new_data_len != old_data_len {
978        return Ok(false);
979    }
980    data[pos] = new_val.data_type().type_tag();
981    match new_val {
982        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
983        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
984        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
985        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
986        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
987        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
988        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
989        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
990        Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
991        Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
992        Value::Interval {
993            months,
994            days,
995            micros,
996        } => {
997            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
998            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
999            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1000        }
1001        Value::Null => unreachable!(),
1002    }
1003    Ok(true)
1004}
1005
1006/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1007pub fn patch_row_column(
1008    data: &[u8],
1009    target: usize,
1010    new_val: &Value,
1011    out: &mut Vec<u8>,
1012) -> Result<()> {
1013    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1014
1015    let new_col_count = if target >= col_count {
1016        target + 1
1017    } else {
1018        col_count
1019    };
1020    let new_bitmap_bytes = new_col_count.div_ceil(8);
1021    let bitmap_bytes = col_count.div_ceil(8);
1022    out.clear();
1023
1024    let header = (new_col_count as u16) | V2_FLAG;
1025    out.extend_from_slice(&header.to_le_bytes());
1026    let bitmap_start = out.len();
1027    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1028    for _ in bitmap_bytes..new_bitmap_bytes {
1029        out.push(0xFF);
1030    }
1031    if new_val.is_null() {
1032        out[bitmap_start + target / 8] |= 1 << (target % 8);
1033    } else {
1034        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1035    }
1036
1037    let mut pos = header_end;
1038    for col in 0..new_col_count {
1039        let was_null = if col < col_count {
1040            bitmap[col / 8] & (1 << (col % 8)) != 0
1041        } else {
1042            true
1043        };
1044
1045        if col == target {
1046            if !was_null {
1047                pos = skip_cell(data, pos, version)?;
1048            }
1049            if !new_val.is_null() {
1050                encode_cell_v2(new_val, out);
1051            }
1052        } else if !was_null {
1053            pos = copy_cell_to_v2(data, pos, version, out)?;
1054        }
1055    }
1056    Ok(())
1057}
1058
1059pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1060    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1061    if target >= col_count {
1062        return Ok(RawColumn::Null);
1063    }
1064
1065    for col in 0..=target {
1066        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1067
1068        if col == target {
1069            if is_null {
1070                return Ok(RawColumn::Null);
1071            }
1072            let (type_tag, body, _) = read_cell(data, pos, version)?;
1073            return decode_value_raw(type_tag, body);
1074        } else if !is_null {
1075            pos = skip_cell(data, pos, version)?;
1076        }
1077    }
1078
1079    unreachable!()
1080}
1081
1082/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1083pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1084    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1085    if target >= col_count {
1086        return Ok((RawColumn::Null, usize::MAX));
1087    }
1088
1089    for col in 0..=target {
1090        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1091
1092        if col == target {
1093            if is_null {
1094                return Ok((RawColumn::Null, usize::MAX));
1095            }
1096            let tag_offset = pos;
1097            let (type_tag, body, _) = read_cell(data, pos, version)?;
1098            let raw = decode_value_raw(type_tag, body)?;
1099            return Ok((raw, tag_offset));
1100        } else if !is_null {
1101            pos = skip_cell(data, pos, version)?;
1102        }
1103    }
1104
1105    unreachable!()
1106}
1107
1108/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1109pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1110    if offset == usize::MAX || new_val.is_null() {
1111        return Ok(false);
1112    }
1113    if data.len() < 2 || offset >= data.len() {
1114        return Err(SqlError::InvalidValue("truncated column data".into()));
1115    }
1116    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1117        RowVersion::V2
1118    } else {
1119        RowVersion::V1
1120    };
1121    let type_tag = data[offset];
1122    let (old_data_len, val_start) = match version {
1123        RowVersion::V2 => match fixed_width_size(type_tag) {
1124            Some(n) => (n, offset + 1),
1125            None => {
1126                if offset + 5 > data.len() {
1127                    return Err(SqlError::InvalidValue("truncated column data".into()));
1128                }
1129                let len =
1130                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1131                (len, offset + 5)
1132            }
1133        },
1134        RowVersion::V1 => {
1135            if offset + 5 > data.len() {
1136                return Err(SqlError::InvalidValue("truncated column data".into()));
1137            }
1138            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1139            (len, offset + 5)
1140        }
1141    };
1142    let new_data_len = match new_val {
1143        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1144        Value::Date(_) => 4,
1145        Value::Interval { .. } => 16,
1146        Value::Boolean(_) => 1,
1147        Value::Text(s) => s.len(),
1148        Value::Blob(b) => b.len(),
1149        Value::Json(s) => s.len(),
1150        Value::Jsonb(b) => b.len(),
1151        Value::Null => return Ok(false),
1152    };
1153    if new_data_len != old_data_len {
1154        return Ok(false);
1155    }
1156    data[offset] = new_val.data_type().type_tag();
1157    match new_val {
1158        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1159        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1160        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1161        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1162        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1163        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1164        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1165        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1166        Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1167        Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1168        Value::Interval {
1169            months,
1170            days,
1171            micros,
1172        } => {
1173            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1174            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1175            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1176        }
1177        Value::Null => unreachable!(),
1178    }
1179    Ok(true)
1180}
1181
1182pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1183    if key.is_empty() || key[0] != TAG_INTEGER {
1184        return Err(SqlError::InvalidValue("not an integer key".into()));
1185    }
1186    let (val, _) = decode_integer(&key[1..])?;
1187    match val {
1188        Value::Integer(i) => Ok(i),
1189        _ => unreachable!(),
1190    }
1191}
1192
1193#[cfg(test)]
1194#[path = "encoding_tests.rs"]
1195mod tests;