Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// `encode_key_value_into` writes one of these as the first byte of every
/// encoded key component, so a byte-wise comparison of two components of
/// different types is decided by the tag values below.
// NOTE(review): these values presumably end up in persisted keys — confirm
// before renumbering or inserting a tag.
pub(crate) const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
const TAG_ARRAY: u8 = 0x0E;
const TAG_VECTOR: u8 = 0x0F;
23
24/// Encode a single value into an order-preserving byte sequence.
25pub fn encode_key_value(value: &Value) -> Vec<u8> {
26    let mut buf = Vec::with_capacity(16);
27    encode_key_value_into(value, &mut buf);
28    buf
29}
30
31/// Encode a composite key (multiple values concatenated).
32pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33    let mut buf = Vec::new();
34    for v in values {
35        buf.extend_from_slice(&encode_key_value(v));
36    }
37    buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41    buf.clear();
42    for v in values {
43        encode_key_value_into(v, buf);
44    }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48    buf.clear();
49    for &i in indices {
50        encode_key_value_into(&row[i as usize], buf);
51    }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56    buf.clear();
57    encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61    value: &Value,
62    coll: crate::types::Collation,
63    buf: &mut Vec<u8>,
64) {
65    match (value, coll) {
66        (Value::Text(s), crate::types::Collation::NoCase) => {
67            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68        }
69        (Value::Text(s), crate::types::Collation::Rtrim) => {
70            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71        }
72        _ => encode_key_value_into(value, buf),
73    }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77    match value {
78        Value::Null => buf.push(TAG_NULL),
79        Value::Boolean(b) => {
80            buf.push(TAG_BOOLEAN);
81            buf.push(if *b { 0x01 } else { 0x00 });
82        }
83        Value::Integer(i) => encode_integer_into(*i, buf),
84        Value::Real(r) => encode_real_into(*r, buf),
85        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90        Value::Interval {
91            months,
92            days,
93            micros,
94        } => {
95            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
96            buf.push(TAG_INTERVAL);
97            let mut mb = months.to_be_bytes();
98            mb[0] ^= 0x80;
99            buf.extend_from_slice(&mb);
100            let mut db = days.to_be_bytes();
101            db[0] ^= 0x80;
102            buf.extend_from_slice(&db);
103            let mut ub = micros.to_be_bytes();
104            ub[0] ^= 0x80;
105            buf.extend_from_slice(&ub);
106        }
107        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109        Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110        Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111        Value::Array(a) => encode_array_into(a, buf),
112        Value::Vector(v) => encode_vector_into(v, buf),
113    }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117    buf.push(TAG_VECTOR);
118    let mut inner = Vec::with_capacity(2 + v.len() * 4);
119    inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120    for &x in v {
121        inner.extend_from_slice(&x.to_le_bytes());
122    }
123    encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127    buf.push(TAG_ARRAY);
128    let mut inner = Vec::new();
129    for v in elems {
130        encode_key_value_into(v, &mut inner);
131    }
132    encode_bytes_into_no_tag(&inner, buf);
133}
134
/// Append `data` to `buf` in null-escaped form, without a leading tag.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`; a single `0x00`
/// terminator follows the payload.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    // Splitting on zero bytes yields the runs between them; each boundary
    // between two runs is exactly one zero byte that needs its escape pair.
    for (idx, run) in data.split(|&b| b == 0x00).enumerate() {
        if idx > 0 {
            buf.extend_from_slice(&[0x00, 0xFF]);
        }
        buf.extend_from_slice(run);
    }
    buf.push(0x00);
}
146
147fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
148    encode_signed_varint(TAG_INTEGER, val, buf);
149}
150
/// Order-preserving, variable-width encoding of a signed 64-bit integer,
/// prefixed with a caller-chosen tag byte.
///
/// Output appended to `buf`: `[tag] [marker] [magnitude bytes]`.
/// - zero: marker `0x80`, no magnitude bytes;
/// - positive: marker `0x80 + n`, then the `n` significant big-endian bytes;
/// - negative: marker `0x80 - n`, then the `n` significant big-endian bytes of
///   the absolute value, each bitwise-complemented.
///
/// Comparing two encodings with the same tag byte-by-byte gives the same
/// result as comparing the integers.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` is well-defined for i64::MIN (yields 2^63).
    let magnitude = val.unsigned_abs();
    // Count of significant bytes; zero has 64 leading zero bits, so width 0.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be = magnitude.to_be_bytes();
    let significant = &be[8 - width..];

    match val.signum() {
        0 => buf.push(0x80),
        1 => {
            buf.push(0x80 + width as u8);
            buf.extend_from_slice(significant);
        }
        _ => {
            buf.push(0x80 - width as u8);
            buf.extend(significant.iter().map(|b| !b));
        }
    }
}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185    buf.push(TAG_REAL);
186    let bits = val.to_bits();
187    let encoded = if val.is_sign_negative() {
188        !bits
189    } else {
190        bits ^ (1u64 << 63)
191    };
192    buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
/// Append `tag`, then `data` in null-escaped form, then a `0x00` terminator.
///
/// Every `0x00` byte inside `data` is written as the pair `0x00 0xFF`.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Runs between zero bytes are copied verbatim; each zero byte that
    // separated two runs becomes the escape pair.
    let mut runs = data.split(|&b| b == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
207
208/// Decode a single key value, returning the value and the number of bytes consumed.
209pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210    if data.is_empty() {
211        return Err(SqlError::InvalidValue("empty key data".into()));
212    }
213    match data[0] {
214        TAG_NULL => Ok((Value::Null, 1)),
215        TAG_BOOLEAN => {
216            if data.len() < 2 {
217                return Err(SqlError::InvalidValue("truncated boolean".into()));
218            }
219            Ok((Value::Boolean(data[1] != 0), 2))
220        }
221        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226            (Value::Date(d), n + 1)
227        }),
228        TAG_TIMESTAMP => {
229            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230        }
231        TAG_INTERVAL => {
232            if data.len() < 1 + 16 {
233                return Err(SqlError::InvalidValue("truncated interval".into()));
234            }
235            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236            mb[0] ^= 0x80;
237            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238            db[0] ^= 0x80;
239            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240            ub[0] ^= 0x80;
241            Ok((
242                Value::Interval {
243                    months: i32::from_be_bytes(mb),
244                    days: i32::from_be_bytes(db),
245                    micros: i64::from_be_bytes(ub),
246                },
247                17,
248            ))
249        }
250        TAG_TEXT => {
251            let (bytes, n) = decode_null_escaped(&data[1..])?;
252            let s = String::from_utf8(bytes)
253                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254            Ok((Value::Text(CompactString::from(s)), n + 1))
255        }
256        TAG_BLOB => {
257            let (bytes, n) = decode_null_escaped(&data[1..])?;
258            Ok((Value::Blob(bytes), n + 1))
259        }
260        TAG_JSON => {
261            let (bytes, n) = decode_null_escaped(&data[1..])?;
262            let s = String::from_utf8(bytes)
263                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264            Ok((Value::Json(CompactString::from(s)), n + 1))
265        }
266        TAG_JSONB => {
267            let (bytes, n) = decode_null_escaped(&data[1..])?;
268            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269        }
270        TAG_TSVECTOR => {
271            let (bytes, n) = decode_null_escaped(&data[1..])?;
272            Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273        }
274        TAG_TSQUERY => {
275            let (bytes, n) = decode_null_escaped(&data[1..])?;
276            Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277        }
278        TAG_ARRAY => {
279            let (inner, n) = decode_null_escaped(&data[1..])?;
280            let mut elems = Vec::new();
281            let mut pos = 0;
282            while pos < inner.len() {
283                let (v, vlen) = decode_key_value(&inner[pos..])?;
284                elems.push(v);
285                pos += vlen;
286            }
287            Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288        }
289        TAG_VECTOR => {
290            let (inner, n) = decode_null_escaped(&data[1..])?;
291            if inner.len() < 2 {
292                return Err(SqlError::InvalidValue("truncated vector key".into()));
293            }
294            let dim = u16::from_le_bytes([inner[0], inner[1]]) as usize;
295            if inner.len() != 2 + dim * 4 {
296                return Err(SqlError::InvalidValue("truncated vector key".into()));
297            }
298            let elems: Vec<f32> = inner[2..]
299                .chunks_exact(4)
300                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
301                .collect();
302            Ok((Value::Vector(std::sync::Arc::from(elems)), n + 1))
303        }
304        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
305    }
306}
307
308/// Decode a composite key into multiple values.
309pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
310    let mut values = Vec::with_capacity(count);
311    let mut pos = 0;
312    for _ in 0..count {
313        let (v, n) = decode_key_value(&data[pos..])?;
314        values.push(v);
315        pos += n;
316    }
317    Ok(values)
318}
319
320fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
321    let (v, n) = decode_signed_varint(data)?;
322    Ok((Value::Integer(v), n))
323}
324
325/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
326pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
327    if data.is_empty() {
328        return Err(SqlError::InvalidValue("truncated integer".into()));
329    }
330    let marker = data[0];
331    if marker == 0x80 {
332        return Ok((0, 1));
333    }
334    if marker > 0x80 {
335        let byte_count = (marker - 0x80) as usize;
336        if data.len() < 1 + byte_count {
337            return Err(SqlError::InvalidValue("truncated positive integer".into()));
338        }
339        let mut bytes = [0u8; 8];
340        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
341        let val = i64::from_be_bytes(bytes);
342        Ok((val, 1 + byte_count))
343    } else {
344        let byte_count = (0x80 - marker) as usize;
345        if data.len() < 1 + byte_count {
346            return Err(SqlError::InvalidValue("truncated negative integer".into()));
347        }
348        let mut bytes = [0u8; 8];
349        for i in 0..byte_count {
350            bytes[8 - byte_count + i] = !data[1 + i];
351        }
352        let abs_val = u64::from_be_bytes(bytes);
353        let val = (-(abs_val as i128)) as i64;
354        Ok((val, 1 + byte_count))
355    }
356}
357
358fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
359    if data.len() < 8 {
360        return Err(SqlError::InvalidValue("truncated real".into()));
361    }
362    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
363    let bits = if encoded & (1u64 << 63) != 0 {
364        // Was positive: undo sign bit flip
365        encoded ^ (1u64 << 63)
366    } else {
367        // Was negative: undo full inversion
368        !encoded
369    };
370    let val = f64::from_bits(bits);
371    Ok((Value::Real(val), 8))
372}
373
374/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
375/// Byte length of one encoded key component, decoding nothing.
376pub(crate) fn skip_key_value(data: &[u8]) -> Result<usize> {
377    if data.is_empty() {
378        return Err(SqlError::InvalidValue("empty key data".into()));
379    }
380    match data[0] {
381        TAG_NULL => Ok(1),
382        TAG_BOOLEAN => Ok(2),
383        TAG_INTEGER => decode_integer(&data[1..]).map(|(_, n)| n + 1),
384        TAG_REAL => decode_real(&data[1..]).map(|(_, n)| n + 1),
385        TAG_TIME | TAG_DATE | TAG_TIMESTAMP => decode_signed_varint(&data[1..]).map(|(_, n)| n + 1),
386        TAG_INTERVAL => Ok(17),
387        // Every remaining tag wraps a null-escaped payload.
388        _ => skip_null_escaped(&data[1..]).map(|n| n + 1),
389    }
390}
391
392fn skip_null_escaped(data: &[u8]) -> Result<usize> {
393    let mut i = 0;
394    while i < data.len() {
395        if data[i] == 0x00 {
396            if i + 1 < data.len() && data[i + 1] == 0xFF {
397                i += 2;
398            } else {
399                return Ok(i + 1);
400            }
401        } else {
402            i += 1;
403        }
404    }
405    Err(SqlError::InvalidValue(
406        "unterminated null-escaped string".into(),
407    ))
408}
409
410fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
411    let mut result = Vec::new();
412    let mut i = 0;
413    while i < data.len() {
414        if data[i] == 0x00 {
415            if i + 1 < data.len() && data[i + 1] == 0xFF {
416                result.push(0x00);
417                i += 2;
418            } else {
419                return Ok((result, i + 1)); // terminator consumed
420            }
421        } else {
422            result.push(data[i]);
423            i += 1;
424        }
425    }
426    Err(SqlError::InvalidValue(
427        "unterminated null-escaped string".into(),
428    ))
429}
430
431fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
432    match v {
433        Value::Integer(val) => {
434            buf.push(DataType::Integer.type_tag());
435            buf.extend_from_slice(&val.to_le_bytes());
436        }
437        Value::Real(r) => {
438            buf.push(DataType::Real.type_tag());
439            buf.extend_from_slice(&r.to_le_bytes());
440        }
441        Value::Boolean(b) => {
442            buf.push(DataType::Boolean.type_tag());
443            buf.push(if *b { 1 } else { 0 });
444        }
445        Value::Text(s) => {
446            let bytes = s.as_bytes();
447            buf.push(DataType::Text.type_tag());
448            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
449            buf.extend_from_slice(bytes);
450        }
451        Value::Blob(data) => {
452            buf.push(DataType::Blob.type_tag());
453            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
454            buf.extend_from_slice(data);
455        }
456        Value::Time(t) => {
457            buf.push(DataType::Time.type_tag());
458            buf.extend_from_slice(&t.to_le_bytes());
459        }
460        Value::Date(d) => {
461            buf.push(DataType::Date.type_tag());
462            buf.extend_from_slice(&d.to_le_bytes());
463        }
464        Value::Timestamp(t) => {
465            buf.push(DataType::Timestamp.type_tag());
466            buf.extend_from_slice(&t.to_le_bytes());
467        }
468        Value::Interval {
469            months,
470            days,
471            micros,
472        } => {
473            buf.push(DataType::Interval.type_tag());
474            buf.extend_from_slice(&months.to_le_bytes());
475            buf.extend_from_slice(&days.to_le_bytes());
476            buf.extend_from_slice(&micros.to_le_bytes());
477        }
478        Value::Json(s) => {
479            let bytes = s.as_bytes();
480            buf.push(DataType::Json.type_tag());
481            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
482            buf.extend_from_slice(bytes);
483        }
484        Value::Jsonb(b) => {
485            buf.push(DataType::Jsonb.type_tag());
486            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
487            buf.extend_from_slice(b);
488        }
489        Value::TsVector(b) => {
490            buf.push(DataType::TsVector.type_tag());
491            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
492            buf.extend_from_slice(b);
493        }
494        Value::TsQuery(b) => {
495            buf.push(DataType::TsQuery.type_tag());
496            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
497            buf.extend_from_slice(b);
498        }
499        Value::Array(a) => {
500            buf.push(DataType::Array.type_tag());
501            let len = encoded_array_v2_size(a);
502            buf.extend_from_slice(&(len as u32).to_le_bytes());
503            let start = buf.len();
504            buf.resize(start + len, 0);
505            write_array_v2_into_slice(a, &mut buf[start..start + len]);
506        }
507        Value::Vector(v) => {
508            buf.push(
509                DataType::Vector {
510                    dim: v.len() as u16,
511                }
512                .type_tag(),
513            );
514            let len = 2 + v.len() * 4;
515            buf.extend_from_slice(&(len as u32).to_le_bytes());
516            buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
517            for &x in v.iter() {
518                buf.extend_from_slice(&x.to_le_bytes());
519            }
520        }
521        Value::Null => unreachable!(),
522    }
523}
524
525pub fn encode_row(values: &[Value]) -> Vec<u8> {
526    let mut buf = Vec::new();
527    encode_row_into(values, &mut buf);
528    buf
529}
530
531pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
532    buf.clear();
533    let col_count = values.len();
534    let bitmap_bytes = col_count.div_ceil(8);
535
536    let header = (col_count as u16) | V2_FLAG;
537    buf.extend_from_slice(&header.to_le_bytes());
538
539    let bitmap_start = buf.len();
540    buf.resize(buf.len() + bitmap_bytes, 0);
541
542    for (i, v) in values.iter().enumerate() {
543        if v.is_null() {
544            buf[bitmap_start + i / 8] |= 1 << (i % 8);
545            continue;
546        }
547        encode_cell_v2(v, buf);
548    }
549}
550
/// A physical slot: NULL, a runtime-filled integer hole, or a frozen constant.
///
/// Consumed by `build_row_template`, one entry per slot in order.
pub enum TemplateSlot {
    /// The slot is NULL: only its null-bitmap bit is set, no cell is emitted.
    Null,
    /// An integer cell whose 8 payload bytes are zero in the template and
    /// overwritten per row by `encode_row_with_template`.
    IntHole,
    /// A fixed value baked into the template as an encoded cell; a NULL
    /// constant is treated like `Null`.
    Const(Value),
}
557
/// A pre-encoded V2 row with placeholders for integer columns, produced by
/// `build_row_template` and consumed by `encode_row_with_template`.
pub struct RowTemplate {
    /// Complete row bytes (header, null bitmap, cells) with every integer
    /// hole's 8 payload bytes zeroed.
    pub template: Vec<u8>,
    /// `(slot, byte offset)` of each integer hole the runtime fills.
    pub slot_offsets: Vec<(usize, usize)>,
}
563
564pub fn build_row_template(phys_count: usize, slots: &[TemplateSlot]) -> RowTemplate {
565    let bitmap_bytes = phys_count.div_ceil(8);
566    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
567    let header = (phys_count as u16) | V2_FLAG;
568    template.extend_from_slice(&header.to_le_bytes());
569    let bitmap_start = template.len();
570    template.resize(bitmap_start + bitmap_bytes, 0);
571    let mut slot_offsets = Vec::new();
572    let set_null = |template: &mut [u8], slot: usize| {
573        template[bitmap_start + slot / 8] |= 1 << (slot % 8);
574    };
575    for (slot, kind) in slots.iter().enumerate() {
576        match kind {
577            TemplateSlot::Null => set_null(&mut template, slot),
578            TemplateSlot::IntHole => {
579                template.push(DataType::Integer.type_tag());
580                let value_offset = template.len();
581                template.extend_from_slice(&[0u8; 8]);
582                slot_offsets.push((slot, value_offset));
583            }
584            TemplateSlot::Const(v) if v.is_null() => set_null(&mut template, slot),
585            TemplateSlot::Const(v) => encode_cell_v2(v, &mut template),
586        }
587    }
588    RowTemplate {
589        template,
590        slot_offsets,
591    }
592}
593
594#[inline]
595pub fn encode_row_with_template(
596    tmpl: &RowTemplate,
597    values: &[Value],
598    buf: &mut Vec<u8>,
599) -> Result<()> {
600    // NULL in an int hole removes its cell: take the generic encoder.
601    if tmpl
602        .slot_offsets
603        .iter()
604        .any(|&(slot, _)| values[slot].is_null())
605    {
606        encode_row_into(values, buf);
607        return Ok(());
608    }
609    buf.clear();
610    buf.extend_from_slice(&tmpl.template);
611    for &(slot, off) in &tmpl.slot_offsets {
612        match &values[slot] {
613            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
614            other => {
615                return Err(SqlError::TypeMismatch {
616                    expected: "Integer".into(),
617                    got: other.data_type().to_string(),
618                });
619            }
620        }
621    }
622    Ok(())
623}
624
625fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
626    match DataType::from_tag(type_tag) {
627        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
628            data[..8].try_into().unwrap(),
629        ))),
630        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
631            data[..8].try_into().unwrap(),
632        ))),
633        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
634        Some(DataType::Text) => {
635            let s = std::str::from_utf8(data)
636                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
637            Ok(Value::Text(CompactString::from(s)))
638        }
639        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
640        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
641            data[..8].try_into().unwrap(),
642        ))),
643        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
644            data[..4].try_into().unwrap(),
645        ))),
646        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
647            data[..8].try_into().unwrap(),
648        ))),
649        Some(DataType::Interval) => {
650            if data.len() < 16 {
651                return Err(SqlError::InvalidValue("truncated interval".into()));
652            }
653            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
654            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
655            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
656            Ok(Value::Interval {
657                months,
658                days,
659                micros,
660            })
661        }
662        Some(DataType::Json) => {
663            let s = std::str::from_utf8(data)
664                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
665            Ok(Value::Json(CompactString::from(s)))
666        }
667        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
668        Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
669        Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
670        Some(DataType::Array) => decode_array_v2(data),
671        Some(DataType::Vector { .. }) => decode_vector(data),
672        _ => Err(SqlError::InvalidValue(format!(
673            "unknown column type tag: {type_tag}"
674        ))),
675    }
676}
677
678fn decode_vector(data: &[u8]) -> Result<Value> {
679    if data.len() < 2 {
680        return Err(SqlError::InvalidValue("truncated vector".into()));
681    }
682    let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
683    if data.len() < 2 + dim * 4 {
684        return Err(SqlError::InvalidValue("truncated vector payload".into()));
685    }
686    let mut v = Vec::with_capacity(dim);
687    for i in 0..dim {
688        let off = 2 + i * 4;
689        v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
690    }
691    Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
692}
693
694fn encoded_array_v2_size(elems: &[Value]) -> usize {
695    let mut total = 4;
696    for elem in elems {
697        if elem.is_null() {
698            total += 1;
699            continue;
700        }
701        total += 1 + 1;
702        let tag = elem.data_type().type_tag();
703        match fixed_width_size(tag) {
704            Some(n) => total += n,
705            None => total += 4 + variable_cell_payload_size(elem),
706        }
707    }
708    total
709}
710
711fn variable_cell_payload_size(v: &Value) -> usize {
712    match v {
713        Value::Text(s) => s.len(),
714        Value::Blob(b) => b.len(),
715        Value::Json(s) => s.len(),
716        Value::Jsonb(b) => b.len(),
717        Value::TsVector(b) => b.len(),
718        Value::TsQuery(b) => b.len(),
719        Value::Array(a) => encoded_array_v2_size(a),
720        Value::Vector(v) => 2 + v.len() * 4,
721        _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
722    }
723}
724
725fn value_encoded_size_v2(v: &Value) -> Option<usize> {
726    if v.is_null() {
727        return None;
728    }
729    Some(match fixed_width_size(v.data_type().type_tag()) {
730        Some(n) => n,
731        None => variable_cell_payload_size(v),
732    })
733}
734
735fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
736    match v {
737        Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
738        Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
739        Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
740        Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
741        Value::Blob(b) => out[..b.len()].copy_from_slice(b),
742        Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
743        Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
744        Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
745        Value::Interval {
746            months,
747            days,
748            micros,
749        } => {
750            out[..4].copy_from_slice(&months.to_le_bytes());
751            out[4..8].copy_from_slice(&days.to_le_bytes());
752            out[8..16].copy_from_slice(&micros.to_le_bytes());
753        }
754        Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
755        Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
756        Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
757        Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
758        Value::Array(a) => write_array_v2_into_slice(a, out),
759        Value::Vector(v) => {
760            out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
761            let mut pos = 2;
762            for &x in v.iter() {
763                out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
764                pos += 4;
765            }
766        }
767        Value::Null => unreachable!(),
768    }
769}
770
771fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
772    out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
773    let mut pos = 4;
774    for elem in elems {
775        if elem.is_null() {
776            out[pos] = 0xFF;
777            pos += 1;
778            continue;
779        }
780        out[pos] = 0x00;
781        pos += 1;
782        let tag = elem.data_type().type_tag();
783        out[pos] = tag;
784        pos += 1;
785        match fixed_width_size(tag) {
786            Some(n) => {
787                write_value_payload_v2(elem, &mut out[pos..pos + n]);
788                pos += n;
789            }
790            None => {
791                let payload_len = variable_cell_payload_size(elem);
792                out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
793                pos += 4;
794                write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
795                pos += payload_len;
796            }
797        }
798    }
799}
800
801fn decode_array_v2(data: &[u8]) -> Result<Value> {
802    if data.len() < 4 {
803        return Err(SqlError::InvalidValue("truncated array length".into()));
804    }
805    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
806    let mut pos = 4;
807    let mut elems = Vec::with_capacity(count);
808    for _ in 0..count {
809        if pos >= data.len() {
810            return Err(SqlError::InvalidValue("truncated array elements".into()));
811        }
812        if data[pos] == 0xFF {
813            elems.push(Value::Null);
814            pos += 1;
815            continue;
816        }
817        if data[pos] != 0x00 {
818            return Err(SqlError::InvalidValue(
819                "invalid array element marker".into(),
820            ));
821        }
822        pos += 1;
823        if pos >= data.len() {
824            return Err(SqlError::InvalidValue("truncated array element".into()));
825        }
826        let type_tag = data[pos];
827        pos += 1;
828        let (val, advance) = match fixed_width_size(type_tag) {
829            Some(n) => {
830                if pos + n > data.len() {
831                    return Err(SqlError::InvalidValue(
832                        "truncated fixed-width array element".into(),
833                    ));
834                }
835                let v = decode_value(type_tag, &data[pos..pos + n])?;
836                (v, n)
837            }
838            None => {
839                if pos + 4 > data.len() {
840                    return Err(SqlError::InvalidValue(
841                        "truncated array element length".into(),
842                    ));
843                }
844                let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
845                pos += 4;
846                if pos + len > data.len() {
847                    return Err(SqlError::InvalidValue(
848                        "truncated variable-width array element".into(),
849                    ));
850                }
851                let v = decode_value(type_tag, &data[pos..pos + len])?;
852                (v, len)
853            }
854        };
855        pos += advance;
856        elems.push(val);
857    }
858    Ok(Value::Array(std::sync::Arc::new(elems)))
859}
860
/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
/// High bit of `col_count:u16` flags V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries a u32 length prefix after its tag.
    V1,
    /// Cells whose tag has a fixed width (see `fixed_width_size`) omit the length prefix.
    V2,
}

/// Bit set in the little-endian u16 row header when the row uses the V2 cell layout.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask extracting the stored column count from the u16 row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
871
872#[inline]
873pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
874    match DataType::from_tag(type_tag)? {
875        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
876        DataType::Date => Some(4),
877        DataType::Boolean => Some(1),
878        DataType::Interval => Some(16),
879        DataType::Text
880        | DataType::Blob
881        | DataType::Json
882        | DataType::Jsonb
883        | DataType::TsVector
884        | DataType::TsQuery
885        | DataType::Array
886        | DataType::Vector { .. }
887        | DataType::Null => None,
888    }
889}
890
891/// Resolve a cell's `(data_len, body_pos)` from its tag. Variable-width cells carry a
892/// u32 length prefix; V2 fixed-width cells omit it.
893#[inline]
894fn cell_extent(
895    data: &[u8],
896    type_tag: u8,
897    after_tag: usize,
898    version: RowVersion,
899) -> Result<(usize, usize)> {
900    let fixed = match version {
901        RowVersion::V2 => fixed_width_size(type_tag),
902        RowVersion::V1 => None,
903    };
904    if let Some(n) = fixed {
905        return Ok((n, after_tag));
906    }
907    if after_tag + 4 > data.len() {
908        return Err(SqlError::InvalidValue("truncated column data".into()));
909    }
910    let len = u32::from_le_bytes([
911        data[after_tag],
912        data[after_tag + 1],
913        data[after_tag + 2],
914        data[after_tag + 3],
915    ]) as usize;
916    Ok((len, after_tag + 4))
917}
918
919#[inline]
920fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
921    if pos >= data.len() {
922        return Err(SqlError::InvalidValue("truncated column data".into()));
923    }
924    let type_tag = data[pos];
925    let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
926    if body_pos + data_len > data.len() {
927        return Err(SqlError::InvalidValue("truncated column value".into()));
928    }
929    Ok((
930        type_tag,
931        &data[body_pos..body_pos + data_len],
932        body_pos + data_len,
933    ))
934}
935
936/// Next cell position by offset; the body is left unsliced (the next read validates it).
937#[inline]
938fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
939    if pos >= data.len() {
940        return Err(SqlError::InvalidValue("truncated column data".into()));
941    }
942    let type_tag = data[pos];
943    let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
944    Ok(body_pos + data_len)
945}
946
947fn copy_cell_to_v2(
948    data: &[u8],
949    pos: usize,
950    version: RowVersion,
951    out: &mut Vec<u8>,
952) -> Result<usize> {
953    let (tag, body, next) = read_cell(data, pos, version)?;
954    out.push(tag);
955    if fixed_width_size(tag).is_none() {
956        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
957    }
958    out.extend_from_slice(body);
959    Ok(next)
960}
961
962fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
963    if data.len() < 2 {
964        return Err(SqlError::InvalidValue("row data too short".into()));
965    }
966    let raw = u16::from_le_bytes([data[0], data[1]]);
967    let version = if raw & V2_FLAG != 0 {
968        RowVersion::V2
969    } else {
970        RowVersion::V1
971    };
972    let col_count = (raw & COL_COUNT_MASK) as usize;
973    let bitmap_bytes = col_count.div_ceil(8);
974    let pos = 2;
975    if data.len() < pos + bitmap_bytes {
976        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
977    }
978    Ok((
979        version,
980        col_count,
981        &data[pos..pos + bitmap_bytes],
982        pos + bitmap_bytes,
983    ))
984}
985
986pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
987    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
988
989    let mut values = Vec::with_capacity(col_count);
990    for i in 0..col_count {
991        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
992            values.push(Value::Null);
993            continue;
994        }
995        let (type_tag, body, next) = read_cell(data, pos, version)?;
996        values.push(decode_value(type_tag, body)?);
997        pos = next;
998    }
999
1000    Ok(values)
1001}
1002
1003/// Push non-PK cells onto `out` in physical order. `Ok(false)` if stored count != `expected`.
1004/// Sound only when physical order == logical order (no dropped slots).
1005pub(crate) fn decode_row_push(data: &[u8], expected: usize, out: &mut Vec<Value>) -> Result<bool> {
1006    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1007    if col_count != expected {
1008        return Ok(false);
1009    }
1010    for col in 0..col_count {
1011        if bitmap[col / 8] & (1 << (col % 8)) != 0 {
1012            out.push(Value::Null);
1013        } else {
1014            let (type_tag, body, next) = read_cell(data, pos, version)?;
1015            out.push(decode_value(type_tag, body)?);
1016            pos = next;
1017        }
1018    }
1019    Ok(true)
1020}
1021
1022/// Returns the number of non-PK columns stored in a row value blob.
1023#[inline]
1024pub fn row_non_pk_count(data: &[u8]) -> usize {
1025    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
1026}
1027
1028pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
1029    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1030
1031    for i in 0..col_count {
1032        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
1033            continue;
1034        }
1035        let (type_tag, body, next) = read_cell(data, pos, version)?;
1036        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
1037            out[col_mapping[i]] = decode_value(type_tag, body)?;
1038        }
1039        pos = next;
1040    }
1041
1042    Ok(())
1043}
1044
1045pub fn decode_pk_into(
1046    key: &[u8],
1047    count: usize,
1048    out: &mut [Value],
1049    pk_mapping: &[usize],
1050) -> Result<()> {
1051    let mut pos = 0;
1052    for i in 0..count {
1053        let (v, n) = decode_key_value(&key[pos..])?;
1054        if i < pk_mapping.len() {
1055            out[pk_mapping[i]] = v;
1056        }
1057        pos += n;
1058    }
1059    Ok(())
1060}
1061
1062pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
1063    if targets.is_empty() {
1064        return Ok(Vec::new());
1065    }
1066    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1067
1068    let mut results = Vec::with_capacity(targets.len());
1069    let mut ti = 0;
1070
1071    for col in 0..col_count {
1072        if ti >= targets.len() {
1073            break;
1074        }
1075        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1076
1077        if col == targets[ti] {
1078            if is_null {
1079                results.push(Value::Null);
1080            } else {
1081                let (type_tag, body, next) = read_cell(data, pos, version)?;
1082                results.push(decode_value(type_tag, body)?);
1083                pos = next;
1084            }
1085            ti += 1;
1086        } else if !is_null {
1087            pos = skip_cell(data, pos, version)?;
1088        }
1089    }
1090
1091    while ti < targets.len() {
1092        results.push(Value::Null);
1093        ti += 1;
1094    }
1095
1096    Ok(results)
1097}
1098
1099pub fn decode_columns_into(
1100    data: &[u8],
1101    targets: &[usize],
1102    schema_cols: &[usize],
1103    row: &mut [Value],
1104) -> Result<()> {
1105    if targets.is_empty() {
1106        return Ok(());
1107    }
1108    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1109
1110    let mut ti = 0;
1111    for col in 0..col_count {
1112        if ti >= targets.len() {
1113            break;
1114        }
1115        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1116
1117        if col == targets[ti] {
1118            if is_null {
1119                row[schema_cols[ti]] = Value::Null;
1120            } else {
1121                let (type_tag, body, next) = read_cell(data, pos, version)?;
1122                row[schema_cols[ti]] = decode_value(type_tag, body)?;
1123                pos = next;
1124            }
1125            ti += 1;
1126        } else if !is_null {
1127            pos = skip_cell(data, pos, version)?;
1128        }
1129    }
1130
1131    Ok(())
1132}
1133
/// One projected column inside a `ProjectedOffsetPlan`.
struct OffsetTarget {
    /// Byte offset of the cell's tag, relative to the plan's `body_start`.
    cell_pos: usize,
    /// Type tag the cell must carry for the static offset to be trusted.
    tag: u8,
    /// `fixed_width_size(tag)`: `Some(n)` for fixed-width cells, `None` when the cell
    /// carries a u32 length prefix.
    fixed_width: Option<usize>,
    /// Index in the output row written by `decode_into`.
    out_pos: usize,
}
1140
/// Reads projected non-PK columns from a V2 row by static byte offset. Built only when
/// every column before the last target is fixed-width, so offsets are constant.
pub(crate) struct ProjectedOffsetPlan {
    /// Exact u16 header (`V2_FLAG | column count`) a row must have for the plan to apply.
    expected_header: u16,
    /// Offset of the first cell: the 2-byte header plus the null bitmap.
    body_start: usize,
    /// Bitmap-shaped mask with a bit set for every column up to and including the last
    /// target; a row with any of those null bits set does not match the plan.
    nonnull_mask: Vec<u8>,
    /// Columns to decode, in the order supplied to `build`.
    targets: Vec<OffsetTarget>,
}
1149
1150impl ProjectedOffsetPlan {
1151    /// `targets` = `(physical_index, out_position)`. `None` if a variable-width column
1152    /// precedes the last target.
1153    pub(crate) fn build(phys_tags: &[u8], targets: &[(usize, usize)]) -> Option<Self> {
1154        if targets.is_empty() {
1155            return None;
1156        }
1157        let max_t = targets.iter().map(|&(p, _)| p).max()?;
1158        let mut offsets = Vec::with_capacity(max_t + 1);
1159        let mut acc = 0usize;
1160        for (i, &tag) in phys_tags.iter().enumerate().take(max_t + 1) {
1161            offsets.push(acc);
1162            if i < max_t {
1163                acc += 1 + fixed_width_size(tag)?;
1164            }
1165        }
1166        let mut out_targets = Vec::with_capacity(targets.len());
1167        for &(p, out_pos) in targets {
1168            let tag = phys_tags[p];
1169            out_targets.push(OffsetTarget {
1170                cell_pos: offsets[p],
1171                tag,
1172                fixed_width: fixed_width_size(tag),
1173                out_pos,
1174            });
1175        }
1176        let phys_count = phys_tags.len();
1177        let bitmap_bytes = phys_count.div_ceil(8);
1178        let mut nonnull_mask = vec![0u8; bitmap_bytes];
1179        for bit in 0..=max_t {
1180            nonnull_mask[bit / 8] |= 1 << (bit % 8);
1181        }
1182        Some(Self {
1183            expected_header: V2_FLAG | (phys_count as u16),
1184            body_start: 2 + bitmap_bytes,
1185            nonnull_mask,
1186            targets: out_targets,
1187        })
1188    }
1189
1190    /// True when `data` matches the plan's V2 header and has no NULL in the static prefix.
1191    #[inline]
1192    fn layout_ok(&self, data: &[u8]) -> bool {
1193        if data.len() < self.body_start
1194            || u16::from_le_bytes([data[0], data[1]]) != self.expected_header
1195        {
1196            return false;
1197        }
1198        self.nonnull_mask
1199            .iter()
1200            .enumerate()
1201            .all(|(i, &m)| data[2 + i] & m == 0)
1202    }
1203
1204    /// Decode one target by static offset. `Ok(None)` = tag/bounds mismatch (fall back).
1205    #[inline]
1206    fn read_target(&self, data: &[u8], t: &OffsetTarget) -> Result<Option<Value>> {
1207        let pos = self.body_start + t.cell_pos;
1208        if data.get(pos) != Some(&t.tag) {
1209            return Ok(None);
1210        }
1211        let after_tag = pos + 1;
1212        let (len, body_pos) = match t.fixed_width {
1213            Some(n) => (n, after_tag),
1214            None => match data.get(after_tag..after_tag + 4) {
1215                Some(lb) => (
1216                    u32::from_le_bytes(lb.try_into().unwrap()) as usize,
1217                    after_tag + 4,
1218                ),
1219                None => return Ok(None),
1220            },
1221        };
1222        match data.get(body_pos..body_pos + len) {
1223            Some(body) => Ok(Some(decode_value(t.tag, body)?)),
1224            None => Ok(None),
1225        }
1226    }
1227
1228    /// Decode planned columns by index into `row`. `Ok(false)` = layout mismatch (fall back).
1229    pub(crate) fn decode_into(&self, data: &[u8], row: &mut [Value]) -> Result<bool> {
1230        if !self.layout_ok(data) {
1231            return Ok(false);
1232        }
1233        for t in &self.targets {
1234            match self.read_target(data, t)? {
1235                Some(v) => row[t.out_pos] = v,
1236                None => return Ok(false),
1237            }
1238        }
1239        Ok(true)
1240    }
1241
1242    /// Push planned columns onto `out` (monotonic projection only). `Ok(false)` = layout
1243    /// mismatch; `out` may be left partially pushed and must be discarded by the caller.
1244    pub(crate) fn decode_push(&self, data: &[u8], out: &mut Vec<Value>) -> Result<bool> {
1245        if !self.layout_ok(data) {
1246            return Ok(false);
1247        }
1248        for t in &self.targets {
1249            match self.read_target(data, t)? {
1250                Some(v) => out.push(v),
1251                None => return Ok(false),
1252            }
1253        }
1254        Ok(true)
1255    }
1256}
1257
/// A column value that borrows its variable-width payload from the encoded row
/// instead of copying it. Convert to an owned `Value` with `to_value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    Text(&'a str),
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    Json(&'a str),
    Jsonb(&'a [u8]),
    TsVector(&'a [u8]),
    TsQuery(&'a [u8]),
    /// Still-encoded array payload; decoded on demand via `decode_array_v2`.
    Array(&'a [u8]),
    /// Still-encoded vector payload; decoded on demand via `decode_vector`.
    Vector(&'a [u8]),
}
1277
1278impl<'a> RawColumn<'a> {
1279    pub fn to_value(self) -> Value {
1280        match self {
1281            RawColumn::Null => Value::Null,
1282            RawColumn::Integer(i) => Value::Integer(i),
1283            RawColumn::Real(r) => Value::Real(r),
1284            RawColumn::Boolean(b) => Value::Boolean(b),
1285            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1286            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1287            RawColumn::Time(t) => Value::Time(t),
1288            RawColumn::Date(d) => Value::Date(d),
1289            RawColumn::Timestamp(t) => Value::Timestamp(t),
1290            RawColumn::Interval {
1291                months,
1292                days,
1293                micros,
1294            } => Value::Interval {
1295                months,
1296                days,
1297                micros,
1298            },
1299            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1300            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1301            RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1302            RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1303            RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1304            RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1305        }
1306    }
1307
1308    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1309        use std::cmp::Ordering;
1310        match (self, other) {
1311            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1312            (RawColumn::Null, _) | (_, Value::Null) => None,
1313            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1314            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1315            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1316            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1317            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1318            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1319            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1320            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1321            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1322            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1323            (
1324                RawColumn::Interval {
1325                    months: am,
1326                    days: ad,
1327                    micros: au,
1328                },
1329                Value::Interval {
1330                    months: bm,
1331                    days: bd,
1332                    micros: bu,
1333                },
1334            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1335            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1336            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1337            (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1338            (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1339            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1340                Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1341                _ => None,
1342            },
1343            _ => None,
1344        }
1345    }
1346
1347    pub fn eq_value(&self, other: &Value) -> bool {
1348        match (self, other) {
1349            (RawColumn::Null, Value::Null) => true,
1350            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1351            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1352            (RawColumn::Real(a), Value::Real(b)) => a == b,
1353            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1354            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1355            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1356            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1357            (RawColumn::Time(a), Value::Time(b)) => a == b,
1358            (RawColumn::Date(a), Value::Date(b)) => a == b,
1359            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1360            (
1361                RawColumn::Interval {
1362                    months: am,
1363                    days: ad,
1364                    micros: au,
1365                },
1366                Value::Interval {
1367                    months: bm,
1368                    days: bd,
1369                    micros: bu,
1370                },
1371            ) => am == bm && ad == bd && au == bu,
1372            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1373            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1374            (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1375            (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1376            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1377                Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1378                _ => false,
1379            },
1380            _ => false,
1381        }
1382    }
1383
1384    pub fn as_f64(&self) -> Option<f64> {
1385        match self {
1386            RawColumn::Integer(i) => Some(*i as f64),
1387            RawColumn::Real(r) => Some(*r),
1388            _ => None,
1389        }
1390    }
1391
1392    pub fn as_i64(&self) -> Option<i64> {
1393        match self {
1394            RawColumn::Integer(i) => Some(*i),
1395            RawColumn::Time(t) => Some(*t),
1396            RawColumn::Date(d) => Some(*d as i64),
1397            RawColumn::Timestamp(t) => Some(*t),
1398            _ => None,
1399        }
1400    }
1401}
1402
1403fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1404    match DataType::from_tag(type_tag) {
1405        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1406            data[..8].try_into().unwrap(),
1407        ))),
1408        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1409            data[..8].try_into().unwrap(),
1410        ))),
1411        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1412        Some(DataType::Text) => {
1413            let s = std::str::from_utf8(data)
1414                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1415            Ok(RawColumn::Text(s))
1416        }
1417        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1418        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1419            data[..8].try_into().unwrap(),
1420        ))),
1421        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1422            data[..4].try_into().unwrap(),
1423        ))),
1424        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1425            data[..8].try_into().unwrap(),
1426        ))),
1427        Some(DataType::Interval) => {
1428            if data.len() < 16 {
1429                return Err(SqlError::InvalidValue("truncated interval".into()));
1430            }
1431            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1432            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1433            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1434            Ok(RawColumn::Interval {
1435                months,
1436                days,
1437                micros,
1438            })
1439        }
1440        Some(DataType::Json) => {
1441            let s = std::str::from_utf8(data)
1442                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1443            Ok(RawColumn::Json(s))
1444        }
1445        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1446        Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1447        Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1448        Some(DataType::Array) => Ok(RawColumn::Array(data)),
1449        Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1450        _ => Err(SqlError::InvalidValue(format!(
1451            "unknown column type tag: {type_tag}"
1452        ))),
1453    }
1454}
1455
1456/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
1457pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1458    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1459    if target >= col_count || new_val.is_null() {
1460        return Ok(false);
1461    }
1462    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1463    if was_null {
1464        return Ok(false);
1465    }
1466    for col in 0..target {
1467        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1468        if !is_null {
1469            pos = skip_cell(data, pos, version)?;
1470        }
1471    }
1472    if pos >= data.len() {
1473        return Err(SqlError::InvalidValue("truncated column data".into()));
1474    }
1475    let type_tag = data[pos];
1476    let (old_data_len, val_start) = match version {
1477        RowVersion::V2 => match fixed_width_size(type_tag) {
1478            Some(n) => (n, pos + 1),
1479            None => {
1480                if pos + 5 > data.len() {
1481                    return Err(SqlError::InvalidValue("truncated column data".into()));
1482                }
1483                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1484                (len, pos + 5)
1485            }
1486        },
1487        RowVersion::V1 => {
1488            if pos + 5 > data.len() {
1489                return Err(SqlError::InvalidValue("truncated column data".into()));
1490            }
1491            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1492            (len, pos + 5)
1493        }
1494    };
1495    let new_data_len = match value_encoded_size_v2(new_val) {
1496        Some(n) => n,
1497        None => return Ok(false),
1498    };
1499    if new_data_len != old_data_len {
1500        return Ok(false);
1501    }
1502    data[pos] = new_val.data_type().type_tag();
1503    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1504    Ok(true)
1505}
1506
1507/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1508pub fn patch_row_column(
1509    data: &[u8],
1510    target: usize,
1511    new_val: &Value,
1512    out: &mut Vec<u8>,
1513) -> Result<()> {
1514    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1515
1516    let new_col_count = if target >= col_count {
1517        target + 1
1518    } else {
1519        col_count
1520    };
1521    let new_bitmap_bytes = new_col_count.div_ceil(8);
1522    let bitmap_bytes = col_count.div_ceil(8);
1523    out.clear();
1524
1525    let header = (new_col_count as u16) | V2_FLAG;
1526    out.extend_from_slice(&header.to_le_bytes());
1527    let bitmap_start = out.len();
1528    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1529    for _ in bitmap_bytes..new_bitmap_bytes {
1530        out.push(0xFF);
1531    }
1532    if new_val.is_null() {
1533        out[bitmap_start + target / 8] |= 1 << (target % 8);
1534    } else {
1535        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1536    }
1537
1538    let mut pos = header_end;
1539    for col in 0..new_col_count {
1540        let was_null = if col < col_count {
1541            bitmap[col / 8] & (1 << (col % 8)) != 0
1542        } else {
1543            true
1544        };
1545
1546        if col == target {
1547            if !was_null {
1548                pos = skip_cell(data, pos, version)?;
1549            }
1550            if !new_val.is_null() {
1551                encode_cell_v2(new_val, out);
1552            }
1553        } else if !was_null {
1554            pos = copy_cell_to_v2(data, pos, version, out)?;
1555        }
1556    }
1557    Ok(())
1558}
1559
1560pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1561    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1562    if target >= col_count {
1563        return Ok(RawColumn::Null);
1564    }
1565
1566    for col in 0..=target {
1567        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1568
1569        if col == target {
1570            if is_null {
1571                return Ok(RawColumn::Null);
1572            }
1573            let (type_tag, body, _) = read_cell(data, pos, version)?;
1574            return decode_value_raw(type_tag, body);
1575        } else if !is_null {
1576            pos = skip_cell(data, pos, version)?;
1577        }
1578    }
1579
1580    unreachable!()
1581}
1582
1583/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1584pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1585    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1586    if target >= col_count {
1587        return Ok((RawColumn::Null, usize::MAX));
1588    }
1589
1590    for col in 0..=target {
1591        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1592
1593        if col == target {
1594            if is_null {
1595                return Ok((RawColumn::Null, usize::MAX));
1596            }
1597            let tag_offset = pos;
1598            let (type_tag, body, _) = read_cell(data, pos, version)?;
1599            let raw = decode_value_raw(type_tag, body)?;
1600            return Ok((raw, tag_offset));
1601        } else if !is_null {
1602            pos = skip_cell(data, pos, version)?;
1603        }
1604    }
1605
1606    unreachable!()
1607}
1608
1609/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1610pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1611    if offset == usize::MAX || new_val.is_null() {
1612        return Ok(false);
1613    }
1614    if data.len() < 2 || offset >= data.len() {
1615        return Err(SqlError::InvalidValue("truncated column data".into()));
1616    }
1617    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1618        RowVersion::V2
1619    } else {
1620        RowVersion::V1
1621    };
1622    let type_tag = data[offset];
1623    let (old_data_len, val_start) = match version {
1624        RowVersion::V2 => match fixed_width_size(type_tag) {
1625            Some(n) => (n, offset + 1),
1626            None => {
1627                if offset + 5 > data.len() {
1628                    return Err(SqlError::InvalidValue("truncated column data".into()));
1629                }
1630                let len =
1631                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1632                (len, offset + 5)
1633            }
1634        },
1635        RowVersion::V1 => {
1636            if offset + 5 > data.len() {
1637                return Err(SqlError::InvalidValue("truncated column data".into()));
1638            }
1639            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1640            (len, offset + 5)
1641        }
1642    };
1643    let new_data_len = match value_encoded_size_v2(new_val) {
1644        Some(n) => n,
1645        None => return Ok(false),
1646    };
1647    if new_data_len != old_data_len {
1648        return Ok(false);
1649    }
1650    data[offset] = new_val.data_type().type_tag();
1651    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1652    Ok(true)
1653}
1654
1655pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1656    if key.is_empty() || key[0] != TAG_INTEGER {
1657        return Err(SqlError::InvalidValue("not an integer key".into()));
1658    }
1659    let (val, _) = decode_signed_varint(&key[1..])?;
1660    Ok(val)
1661}
1662
1663#[cfg(test)]
1664#[path = "encoding_tests.rs"]
1665mod tests;