Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// The tag is the first byte written for every encoded key value, so a byte-wise
/// comparison of two keys compares the tags before any payload byte.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
const TAG_ARRAY: u8 = 0x0E;
const TAG_VECTOR: u8 = 0x0F;
23
24/// Encode a single value into an order-preserving byte sequence.
25pub fn encode_key_value(value: &Value) -> Vec<u8> {
26    let mut buf = Vec::with_capacity(16);
27    encode_key_value_into(value, &mut buf);
28    buf
29}
30
31/// Encode a composite key (multiple values concatenated).
32pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33    let mut buf = Vec::new();
34    for v in values {
35        buf.extend_from_slice(&encode_key_value(v));
36    }
37    buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41    buf.clear();
42    for v in values {
43        encode_key_value_into(v, buf);
44    }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48    buf.clear();
49    for &i in indices {
50        encode_key_value_into(&row[i as usize], buf);
51    }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56    buf.clear();
57    encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61    value: &Value,
62    coll: crate::types::Collation,
63    buf: &mut Vec<u8>,
64) {
65    match (value, coll) {
66        (Value::Text(s), crate::types::Collation::NoCase) => {
67            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68        }
69        (Value::Text(s), crate::types::Collation::Rtrim) => {
70            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71        }
72        _ => encode_key_value_into(value, buf),
73    }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77    match value {
78        Value::Null => buf.push(TAG_NULL),
79        Value::Boolean(b) => {
80            buf.push(TAG_BOOLEAN);
81            buf.push(if *b { 0x01 } else { 0x00 });
82        }
83        Value::Integer(i) => encode_integer_into(*i, buf),
84        Value::Real(r) => encode_real_into(*r, buf),
85        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90        Value::Interval {
91            months,
92            days,
93            micros,
94        } => {
95            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
96            buf.push(TAG_INTERVAL);
97            let mut mb = months.to_be_bytes();
98            mb[0] ^= 0x80;
99            buf.extend_from_slice(&mb);
100            let mut db = days.to_be_bytes();
101            db[0] ^= 0x80;
102            buf.extend_from_slice(&db);
103            let mut ub = micros.to_be_bytes();
104            ub[0] ^= 0x80;
105            buf.extend_from_slice(&ub);
106        }
107        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109        Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110        Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111        Value::Array(a) => encode_array_into(a, buf),
112        Value::Vector(v) => encode_vector_into(v, buf),
113    }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117    buf.push(TAG_VECTOR);
118    let mut inner = Vec::with_capacity(2 + v.len() * 4);
119    inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120    for &x in v {
121        inner.extend_from_slice(&x.to_le_bytes());
122    }
123    encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127    buf.push(TAG_ARRAY);
128    let mut inner = Vec::new();
129    for v in elems {
130        encode_key_value_into(v, &mut inner);
131    }
132    encode_bytes_into_no_tag(&inner, buf);
133}
134
/// Appends `data` to `buf` in null-escaped form without a leading tag.
///
/// Each `0x00` byte is written as `0x00 0xFF`; a single `0x00` terminates the sequence.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    for byte in data.iter().copied() {
        match byte {
            0x00 => buf.extend_from_slice(&[0x00, 0xFF]),
            other => buf.push(other),
        }
    }
    buf.push(0x00);
}
146
/// Appends the key encoding of an integer: `TAG_INTEGER` followed by the
/// variable-width signed codec produced by `encode_signed_varint`.
fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
    encode_signed_varint(TAG_INTEGER, val, buf);
}
150
/// Variable-width, order-preserving codec for a signed `i64`, prefixed by `tag`.
///
/// Output layout: `[tag] [marker] [magnitude bytes]`, where `n` is the number of
/// significant big-endian bytes of `|val|`:
/// * zero      — marker `0x80`, no magnitude bytes;
/// * positive  — marker `0x80 + n`, then the `n` magnitude bytes;
/// * negative  — marker `0x80 - n`, then the `n` magnitude bytes bit-inverted.
///
/// Comparing two encodings byte by byte gives the same order as comparing the integers.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` is well-defined for `i64::MIN` (yields 2^63).
    let magnitude = val.unsigned_abs();
    // Number of significant bytes; zero for a zero magnitude.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be = magnitude.to_be_bytes();
    let significant = &be[8 - width..];

    if val >= 0 {
        buf.push(0x80 + width as u8);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width as u8);
        buf.extend(significant.iter().map(|b| !b));
    }
}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185    buf.push(TAG_REAL);
186    let bits = val.to_bits();
187    let encoded = if val.is_sign_negative() {
188        !bits
189    } else {
190        bits ^ (1u64 << 63)
191    };
192    buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
/// Appends `tag` followed by `data` in null-escaped form.
///
/// Each `0x00` byte in `data` is written as `0x00 0xFF`; a single `0x00` terminates
/// the sequence.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on 0x00 yields the runs between nulls; each boundary is one escaped null.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
207
208/// Decode a single key value, returning the value and the number of bytes consumed.
209pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210    if data.is_empty() {
211        return Err(SqlError::InvalidValue("empty key data".into()));
212    }
213    match data[0] {
214        TAG_NULL => Ok((Value::Null, 1)),
215        TAG_BOOLEAN => {
216            if data.len() < 2 {
217                return Err(SqlError::InvalidValue("truncated boolean".into()));
218            }
219            Ok((Value::Boolean(data[1] != 0), 2))
220        }
221        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226            (Value::Date(d), n + 1)
227        }),
228        TAG_TIMESTAMP => {
229            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230        }
231        TAG_INTERVAL => {
232            if data.len() < 1 + 16 {
233                return Err(SqlError::InvalidValue("truncated interval".into()));
234            }
235            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236            mb[0] ^= 0x80;
237            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238            db[0] ^= 0x80;
239            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240            ub[0] ^= 0x80;
241            Ok((
242                Value::Interval {
243                    months: i32::from_be_bytes(mb),
244                    days: i32::from_be_bytes(db),
245                    micros: i64::from_be_bytes(ub),
246                },
247                17,
248            ))
249        }
250        TAG_TEXT => {
251            let (bytes, n) = decode_null_escaped(&data[1..])?;
252            let s = String::from_utf8(bytes)
253                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254            Ok((Value::Text(CompactString::from(s)), n + 1))
255        }
256        TAG_BLOB => {
257            let (bytes, n) = decode_null_escaped(&data[1..])?;
258            Ok((Value::Blob(bytes), n + 1))
259        }
260        TAG_JSON => {
261            let (bytes, n) = decode_null_escaped(&data[1..])?;
262            let s = String::from_utf8(bytes)
263                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264            Ok((Value::Json(CompactString::from(s)), n + 1))
265        }
266        TAG_JSONB => {
267            let (bytes, n) = decode_null_escaped(&data[1..])?;
268            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269        }
270        TAG_TSVECTOR => {
271            let (bytes, n) = decode_null_escaped(&data[1..])?;
272            Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273        }
274        TAG_TSQUERY => {
275            let (bytes, n) = decode_null_escaped(&data[1..])?;
276            Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277        }
278        TAG_ARRAY => {
279            let (inner, n) = decode_null_escaped(&data[1..])?;
280            let mut elems = Vec::new();
281            let mut pos = 0;
282            while pos < inner.len() {
283                let (v, vlen) = decode_key_value(&inner[pos..])?;
284                elems.push(v);
285                pos += vlen;
286            }
287            Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288        }
289        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
290    }
291}
292
293/// Decode a composite key into multiple values.
294pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
295    let mut values = Vec::with_capacity(count);
296    let mut pos = 0;
297    for _ in 0..count {
298        let (v, n) = decode_key_value(&data[pos..])?;
299        values.push(v);
300        pos += n;
301    }
302    Ok(values)
303}
304
305fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
306    let (v, n) = decode_signed_varint(data)?;
307    Ok((Value::Integer(v), n))
308}
309
310/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
311pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
312    if data.is_empty() {
313        return Err(SqlError::InvalidValue("truncated integer".into()));
314    }
315    let marker = data[0];
316    if marker == 0x80 {
317        return Ok((0, 1));
318    }
319    if marker > 0x80 {
320        let byte_count = (marker - 0x80) as usize;
321        if data.len() < 1 + byte_count {
322            return Err(SqlError::InvalidValue("truncated positive integer".into()));
323        }
324        let mut bytes = [0u8; 8];
325        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
326        let val = i64::from_be_bytes(bytes);
327        Ok((val, 1 + byte_count))
328    } else {
329        let byte_count = (0x80 - marker) as usize;
330        if data.len() < 1 + byte_count {
331            return Err(SqlError::InvalidValue("truncated negative integer".into()));
332        }
333        let mut bytes = [0u8; 8];
334        for i in 0..byte_count {
335            bytes[8 - byte_count + i] = !data[1 + i];
336        }
337        let abs_val = u64::from_be_bytes(bytes);
338        let val = (-(abs_val as i128)) as i64;
339        Ok((val, 1 + byte_count))
340    }
341}
342
343fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
344    if data.len() < 8 {
345        return Err(SqlError::InvalidValue("truncated real".into()));
346    }
347    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
348    let bits = if encoded & (1u64 << 63) != 0 {
349        // Was positive: undo sign bit flip
350        encoded ^ (1u64 << 63)
351    } else {
352        // Was negative: undo full inversion
353        !encoded
354    };
355    let val = f64::from_bits(bits);
356    Ok((Value::Real(val), 8))
357}
358
359/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
360fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
361    let mut result = Vec::new();
362    let mut i = 0;
363    while i < data.len() {
364        if data[i] == 0x00 {
365            if i + 1 < data.len() && data[i + 1] == 0xFF {
366                result.push(0x00);
367                i += 2;
368            } else {
369                return Ok((result, i + 1)); // terminator consumed
370            }
371        } else {
372            result.push(data[i]);
373            i += 1;
374        }
375    }
376    Err(SqlError::InvalidValue(
377        "unterminated null-escaped string".into(),
378    ))
379}
380
381fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
382    match v {
383        Value::Integer(val) => {
384            buf.push(DataType::Integer.type_tag());
385            buf.extend_from_slice(&val.to_le_bytes());
386        }
387        Value::Real(r) => {
388            buf.push(DataType::Real.type_tag());
389            buf.extend_from_slice(&r.to_le_bytes());
390        }
391        Value::Boolean(b) => {
392            buf.push(DataType::Boolean.type_tag());
393            buf.push(if *b { 1 } else { 0 });
394        }
395        Value::Text(s) => {
396            let bytes = s.as_bytes();
397            buf.push(DataType::Text.type_tag());
398            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
399            buf.extend_from_slice(bytes);
400        }
401        Value::Blob(data) => {
402            buf.push(DataType::Blob.type_tag());
403            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
404            buf.extend_from_slice(data);
405        }
406        Value::Time(t) => {
407            buf.push(DataType::Time.type_tag());
408            buf.extend_from_slice(&t.to_le_bytes());
409        }
410        Value::Date(d) => {
411            buf.push(DataType::Date.type_tag());
412            buf.extend_from_slice(&d.to_le_bytes());
413        }
414        Value::Timestamp(t) => {
415            buf.push(DataType::Timestamp.type_tag());
416            buf.extend_from_slice(&t.to_le_bytes());
417        }
418        Value::Interval {
419            months,
420            days,
421            micros,
422        } => {
423            buf.push(DataType::Interval.type_tag());
424            buf.extend_from_slice(&months.to_le_bytes());
425            buf.extend_from_slice(&days.to_le_bytes());
426            buf.extend_from_slice(&micros.to_le_bytes());
427        }
428        Value::Json(s) => {
429            let bytes = s.as_bytes();
430            buf.push(DataType::Json.type_tag());
431            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
432            buf.extend_from_slice(bytes);
433        }
434        Value::Jsonb(b) => {
435            buf.push(DataType::Jsonb.type_tag());
436            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
437            buf.extend_from_slice(b);
438        }
439        Value::TsVector(b) => {
440            buf.push(DataType::TsVector.type_tag());
441            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
442            buf.extend_from_slice(b);
443        }
444        Value::TsQuery(b) => {
445            buf.push(DataType::TsQuery.type_tag());
446            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
447            buf.extend_from_slice(b);
448        }
449        Value::Array(a) => {
450            buf.push(DataType::Array.type_tag());
451            let len = encoded_array_v2_size(a);
452            buf.extend_from_slice(&(len as u32).to_le_bytes());
453            let start = buf.len();
454            buf.resize(start + len, 0);
455            write_array_v2_into_slice(a, &mut buf[start..start + len]);
456        }
457        Value::Vector(v) => {
458            buf.push(
459                DataType::Vector {
460                    dim: v.len() as u16,
461                }
462                .type_tag(),
463            );
464            let len = 2 + v.len() * 4;
465            buf.extend_from_slice(&(len as u32).to_le_bytes());
466            buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
467            for &x in v.iter() {
468                buf.extend_from_slice(&x.to_le_bytes());
469            }
470        }
471        Value::Null => unreachable!(),
472    }
473}
474
475pub fn encode_row(values: &[Value]) -> Vec<u8> {
476    let mut buf = Vec::new();
477    encode_row_into(values, &mut buf);
478    buf
479}
480
481pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
482    buf.clear();
483    let col_count = values.len();
484    let bitmap_bytes = col_count.div_ceil(8);
485
486    let header = (col_count as u16) | V2_FLAG;
487    buf.extend_from_slice(&header.to_le_bytes());
488
489    let bitmap_start = buf.len();
490    buf.resize(buf.len() + bitmap_bytes, 0);
491
492    for (i, v) in values.iter().enumerate() {
493        if v.is_null() {
494            buf[bitmap_start + i / 8] |= 1 << (i % 8);
495            continue;
496        }
497        encode_cell_v2(v, buf);
498    }
499}
500
/// A physical slot: NULL, a runtime-filled integer hole, or a frozen constant.
///
/// Consumed by `build_row_template`, one entry per physical column.
pub enum TemplateSlot {
    /// The column is NULL: only its null-bitmap bit is set, no cell is emitted.
    Null,
    /// An integer cell whose 8 value bytes are zero in the template and patched per row.
    IntHole,
    /// A constant encoded once into the template (a NULL constant is treated like `Null`).
    Const(Value),
}
507
/// A pre-encoded V2 row produced by `build_row_template`, with the positions of the
/// integer cells that still have to be filled in for each concrete row.
pub struct RowTemplate {
    /// Encoded row bytes (header, null bitmap, cells) with zeroed integer holes.
    pub template: Vec<u8>,
    /// `(slot, byte offset)` of each integer hole the runtime fills.
    pub slot_offsets: Vec<(usize, usize)>,
}
513
514pub fn build_row_template(phys_count: usize, slots: &[TemplateSlot]) -> RowTemplate {
515    let bitmap_bytes = phys_count.div_ceil(8);
516    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
517    let header = (phys_count as u16) | V2_FLAG;
518    template.extend_from_slice(&header.to_le_bytes());
519    let bitmap_start = template.len();
520    template.resize(bitmap_start + bitmap_bytes, 0);
521    let mut slot_offsets = Vec::new();
522    let set_null = |template: &mut [u8], slot: usize| {
523        template[bitmap_start + slot / 8] |= 1 << (slot % 8);
524    };
525    for (slot, kind) in slots.iter().enumerate() {
526        match kind {
527            TemplateSlot::Null => set_null(&mut template, slot),
528            TemplateSlot::IntHole => {
529                template.push(DataType::Integer.type_tag());
530                let value_offset = template.len();
531                template.extend_from_slice(&[0u8; 8]);
532                slot_offsets.push((slot, value_offset));
533            }
534            TemplateSlot::Const(v) if v.is_null() => set_null(&mut template, slot),
535            TemplateSlot::Const(v) => encode_cell_v2(v, &mut template),
536        }
537    }
538    RowTemplate {
539        template,
540        slot_offsets,
541    }
542}
543
544/// Caller must guarantee every `values[slot]` for an integer hole is `Value::Integer`.
545#[inline]
546pub fn encode_row_with_template(
547    tmpl: &RowTemplate,
548    values: &[Value],
549    buf: &mut Vec<u8>,
550) -> Result<()> {
551    buf.clear();
552    buf.extend_from_slice(&tmpl.template);
553    for &(slot, off) in &tmpl.slot_offsets {
554        match &values[slot] {
555            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
556            other => {
557                return Err(SqlError::TypeMismatch {
558                    expected: "Integer".into(),
559                    got: other.data_type().to_string(),
560                });
561            }
562        }
563    }
564    Ok(())
565}
566
567fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
568    match DataType::from_tag(type_tag) {
569        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
570            data[..8].try_into().unwrap(),
571        ))),
572        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
573            data[..8].try_into().unwrap(),
574        ))),
575        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
576        Some(DataType::Text) => {
577            let s = std::str::from_utf8(data)
578                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
579            Ok(Value::Text(CompactString::from(s)))
580        }
581        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
582        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
583            data[..8].try_into().unwrap(),
584        ))),
585        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
586            data[..4].try_into().unwrap(),
587        ))),
588        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
589            data[..8].try_into().unwrap(),
590        ))),
591        Some(DataType::Interval) => {
592            if data.len() < 16 {
593                return Err(SqlError::InvalidValue("truncated interval".into()));
594            }
595            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
596            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
597            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
598            Ok(Value::Interval {
599                months,
600                days,
601                micros,
602            })
603        }
604        Some(DataType::Json) => {
605            let s = std::str::from_utf8(data)
606                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
607            Ok(Value::Json(CompactString::from(s)))
608        }
609        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
610        Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
611        Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
612        Some(DataType::Array) => decode_array_v2(data),
613        Some(DataType::Vector { .. }) => decode_vector(data),
614        _ => Err(SqlError::InvalidValue(format!(
615            "unknown column type tag: {type_tag}"
616        ))),
617    }
618}
619
620fn decode_vector(data: &[u8]) -> Result<Value> {
621    if data.len() < 2 {
622        return Err(SqlError::InvalidValue("truncated vector".into()));
623    }
624    let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
625    if data.len() < 2 + dim * 4 {
626        return Err(SqlError::InvalidValue("truncated vector payload".into()));
627    }
628    let mut v = Vec::with_capacity(dim);
629    for i in 0..dim {
630        let off = 2 + i * 4;
631        v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
632    }
633    Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
634}
635
636fn encoded_array_v2_size(elems: &[Value]) -> usize {
637    let mut total = 4;
638    for elem in elems {
639        if elem.is_null() {
640            total += 1;
641            continue;
642        }
643        total += 1 + 1;
644        let tag = elem.data_type().type_tag();
645        match fixed_width_size(tag) {
646            Some(n) => total += n,
647            None => total += 4 + variable_cell_payload_size(elem),
648        }
649    }
650    total
651}
652
653fn variable_cell_payload_size(v: &Value) -> usize {
654    match v {
655        Value::Text(s) => s.len(),
656        Value::Blob(b) => b.len(),
657        Value::Json(s) => s.len(),
658        Value::Jsonb(b) => b.len(),
659        Value::TsVector(b) => b.len(),
660        Value::TsQuery(b) => b.len(),
661        Value::Array(a) => encoded_array_v2_size(a),
662        Value::Vector(v) => 2 + v.len() * 4,
663        _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
664    }
665}
666
667fn value_encoded_size_v2(v: &Value) -> Option<usize> {
668    if v.is_null() {
669        return None;
670    }
671    Some(match fixed_width_size(v.data_type().type_tag()) {
672        Some(n) => n,
673        None => variable_cell_payload_size(v),
674    })
675}
676
677fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
678    match v {
679        Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
680        Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
681        Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
682        Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
683        Value::Blob(b) => out[..b.len()].copy_from_slice(b),
684        Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
685        Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
686        Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
687        Value::Interval {
688            months,
689            days,
690            micros,
691        } => {
692            out[..4].copy_from_slice(&months.to_le_bytes());
693            out[4..8].copy_from_slice(&days.to_le_bytes());
694            out[8..16].copy_from_slice(&micros.to_le_bytes());
695        }
696        Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
697        Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
698        Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
699        Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
700        Value::Array(a) => write_array_v2_into_slice(a, out),
701        Value::Vector(v) => {
702            out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
703            let mut pos = 2;
704            for &x in v.iter() {
705                out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
706                pos += 4;
707            }
708        }
709        Value::Null => unreachable!(),
710    }
711}
712
713fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
714    out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
715    let mut pos = 4;
716    for elem in elems {
717        if elem.is_null() {
718            out[pos] = 0xFF;
719            pos += 1;
720            continue;
721        }
722        out[pos] = 0x00;
723        pos += 1;
724        let tag = elem.data_type().type_tag();
725        out[pos] = tag;
726        pos += 1;
727        match fixed_width_size(tag) {
728            Some(n) => {
729                write_value_payload_v2(elem, &mut out[pos..pos + n]);
730                pos += n;
731            }
732            None => {
733                let payload_len = variable_cell_payload_size(elem);
734                out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
735                pos += 4;
736                write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
737                pos += payload_len;
738            }
739        }
740    }
741}
742
743fn decode_array_v2(data: &[u8]) -> Result<Value> {
744    if data.len() < 4 {
745        return Err(SqlError::InvalidValue("truncated array length".into()));
746    }
747    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
748    let mut pos = 4;
749    let mut elems = Vec::with_capacity(count);
750    for _ in 0..count {
751        if pos >= data.len() {
752            return Err(SqlError::InvalidValue("truncated array elements".into()));
753        }
754        if data[pos] == 0xFF {
755            elems.push(Value::Null);
756            pos += 1;
757            continue;
758        }
759        if data[pos] != 0x00 {
760            return Err(SqlError::InvalidValue(
761                "invalid array element marker".into(),
762            ));
763        }
764        pos += 1;
765        if pos >= data.len() {
766            return Err(SqlError::InvalidValue("truncated array element".into()));
767        }
768        let type_tag = data[pos];
769        pos += 1;
770        let (val, advance) = match fixed_width_size(type_tag) {
771            Some(n) => {
772                if pos + n > data.len() {
773                    return Err(SqlError::InvalidValue(
774                        "truncated fixed-width array element".into(),
775                    ));
776                }
777                let v = decode_value(type_tag, &data[pos..pos + n])?;
778                (v, n)
779            }
780            None => {
781                if pos + 4 > data.len() {
782                    return Err(SqlError::InvalidValue(
783                        "truncated array element length".into(),
784                    ));
785                }
786                let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
787                pos += 4;
788                if pos + len > data.len() {
789                    return Err(SqlError::InvalidValue(
790                        "truncated variable-width array element".into(),
791                    ));
792                }
793                let v = decode_value(type_tag, &data[pos..pos + len])?;
794                (v, len)
795            }
796        };
797        pos += advance;
798        elems.push(val);
799    }
800    Ok(Value::Array(std::sync::Arc::new(elems)))
801}
802
/// On-disk row format version.
///
/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
/// High bit of `col_count:u16` flags V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries a `u32` length prefix.
    V1,
    /// Fixed-width cells omit the length prefix.
    V2,
}
810
/// High bit of the row header's `u16`; set when the row uses the V2 cell format.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the column count (the low 15 bits) from the row header's `u16`.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
813
814#[inline]
815pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
816    match DataType::from_tag(type_tag)? {
817        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
818        DataType::Date => Some(4),
819        DataType::Boolean => Some(1),
820        DataType::Interval => Some(16),
821        DataType::Text
822        | DataType::Blob
823        | DataType::Json
824        | DataType::Jsonb
825        | DataType::TsVector
826        | DataType::TsQuery
827        | DataType::Array
828        | DataType::Vector { .. }
829        | DataType::Null => None,
830    }
831}
832
833/// Resolve a cell's `(data_len, body_pos)` from its tag. Variable-width cells carry a
834/// u32 length prefix; V2 fixed-width cells omit it.
835#[inline]
836fn cell_extent(
837    data: &[u8],
838    type_tag: u8,
839    after_tag: usize,
840    version: RowVersion,
841) -> Result<(usize, usize)> {
842    let fixed = match version {
843        RowVersion::V2 => fixed_width_size(type_tag),
844        RowVersion::V1 => None,
845    };
846    if let Some(n) = fixed {
847        return Ok((n, after_tag));
848    }
849    if after_tag + 4 > data.len() {
850        return Err(SqlError::InvalidValue("truncated column data".into()));
851    }
852    let len = u32::from_le_bytes([
853        data[after_tag],
854        data[after_tag + 1],
855        data[after_tag + 2],
856        data[after_tag + 3],
857    ]) as usize;
858    Ok((len, after_tag + 4))
859}
860
861#[inline]
862fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
863    if pos >= data.len() {
864        return Err(SqlError::InvalidValue("truncated column data".into()));
865    }
866    let type_tag = data[pos];
867    let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
868    if body_pos + data_len > data.len() {
869        return Err(SqlError::InvalidValue("truncated column value".into()));
870    }
871    Ok((
872        type_tag,
873        &data[body_pos..body_pos + data_len],
874        body_pos + data_len,
875    ))
876}
877
878/// Next cell position by offset; the body is left unsliced (the next read validates it).
879#[inline]
880fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
881    if pos >= data.len() {
882        return Err(SqlError::InvalidValue("truncated column data".into()));
883    }
884    let type_tag = data[pos];
885    let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
886    Ok(body_pos + data_len)
887}
888
889fn copy_cell_to_v2(
890    data: &[u8],
891    pos: usize,
892    version: RowVersion,
893    out: &mut Vec<u8>,
894) -> Result<usize> {
895    let (tag, body, next) = read_cell(data, pos, version)?;
896    out.push(tag);
897    if fixed_width_size(tag).is_none() {
898        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
899    }
900    out.extend_from_slice(body);
901    Ok(next)
902}
903
904fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
905    if data.len() < 2 {
906        return Err(SqlError::InvalidValue("row data too short".into()));
907    }
908    let raw = u16::from_le_bytes([data[0], data[1]]);
909    let version = if raw & V2_FLAG != 0 {
910        RowVersion::V2
911    } else {
912        RowVersion::V1
913    };
914    let col_count = (raw & COL_COUNT_MASK) as usize;
915    let bitmap_bytes = col_count.div_ceil(8);
916    let pos = 2;
917    if data.len() < pos + bitmap_bytes {
918        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
919    }
920    Ok((
921        version,
922        col_count,
923        &data[pos..pos + bitmap_bytes],
924        pos + bitmap_bytes,
925    ))
926}
927
928pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
929    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
930
931    let mut values = Vec::with_capacity(col_count);
932    for i in 0..col_count {
933        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
934            values.push(Value::Null);
935            continue;
936        }
937        let (type_tag, body, next) = read_cell(data, pos, version)?;
938        values.push(decode_value(type_tag, body)?);
939        pos = next;
940    }
941
942    Ok(values)
943}
944
945/// Push non-PK cells onto `out` in physical order. `Ok(false)` if stored count != `expected`.
946/// Sound only when physical order == logical order (no dropped slots).
947pub(crate) fn decode_row_push(data: &[u8], expected: usize, out: &mut Vec<Value>) -> Result<bool> {
948    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
949    if col_count != expected {
950        return Ok(false);
951    }
952    for col in 0..col_count {
953        if bitmap[col / 8] & (1 << (col % 8)) != 0 {
954            out.push(Value::Null);
955        } else {
956            let (type_tag, body, next) = read_cell(data, pos, version)?;
957            out.push(decode_value(type_tag, body)?);
958            pos = next;
959        }
960    }
961    Ok(true)
962}
963
964/// Returns the number of non-PK columns stored in a row value blob.
965#[inline]
966pub fn row_non_pk_count(data: &[u8]) -> usize {
967    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
968}
969
970pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
971    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
972
973    for i in 0..col_count {
974        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
975            continue;
976        }
977        let (type_tag, body, next) = read_cell(data, pos, version)?;
978        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
979            out[col_mapping[i]] = decode_value(type_tag, body)?;
980        }
981        pos = next;
982    }
983
984    Ok(())
985}
986
987pub fn decode_pk_into(
988    key: &[u8],
989    count: usize,
990    out: &mut [Value],
991    pk_mapping: &[usize],
992) -> Result<()> {
993    let mut pos = 0;
994    for i in 0..count {
995        let (v, n) = decode_key_value(&key[pos..])?;
996        if i < pk_mapping.len() {
997            out[pk_mapping[i]] = v;
998        }
999        pos += n;
1000    }
1001    Ok(())
1002}
1003
1004pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
1005    if targets.is_empty() {
1006        return Ok(Vec::new());
1007    }
1008    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1009
1010    let mut results = Vec::with_capacity(targets.len());
1011    let mut ti = 0;
1012
1013    for col in 0..col_count {
1014        if ti >= targets.len() {
1015            break;
1016        }
1017        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1018
1019        if col == targets[ti] {
1020            if is_null {
1021                results.push(Value::Null);
1022            } else {
1023                let (type_tag, body, next) = read_cell(data, pos, version)?;
1024                results.push(decode_value(type_tag, body)?);
1025                pos = next;
1026            }
1027            ti += 1;
1028        } else if !is_null {
1029            pos = skip_cell(data, pos, version)?;
1030        }
1031    }
1032
1033    while ti < targets.len() {
1034        results.push(Value::Null);
1035        ti += 1;
1036    }
1037
1038    Ok(results)
1039}
1040
1041pub fn decode_columns_into(
1042    data: &[u8],
1043    targets: &[usize],
1044    schema_cols: &[usize],
1045    row: &mut [Value],
1046) -> Result<()> {
1047    if targets.is_empty() {
1048        return Ok(());
1049    }
1050    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1051
1052    let mut ti = 0;
1053    for col in 0..col_count {
1054        if ti >= targets.len() {
1055            break;
1056        }
1057        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1058
1059        if col == targets[ti] {
1060            if is_null {
1061                row[schema_cols[ti]] = Value::Null;
1062            } else {
1063                let (type_tag, body, next) = read_cell(data, pos, version)?;
1064                row[schema_cols[ti]] = decode_value(type_tag, body)?;
1065                pos = next;
1066            }
1067            ti += 1;
1068        } else if !is_null {
1069            pos = skip_cell(data, pos, version)?;
1070        }
1071    }
1072
1073    Ok(())
1074}
1075
1076struct OffsetTarget {
1077    cell_pos: usize,
1078    tag: u8,
1079    fixed_width: Option<usize>,
1080    out_pos: usize,
1081}
1082
1083/// Reads projected non-PK columns from a V2 row by static byte offset. Built only when
1084/// every column before the last target is fixed-width, so offsets are constant.
1085pub(crate) struct ProjectedOffsetPlan {
1086    expected_header: u16,
1087    body_start: usize,
1088    nonnull_mask: Vec<u8>,
1089    targets: Vec<OffsetTarget>,
1090}
1091
1092impl ProjectedOffsetPlan {
1093    /// `targets` = `(physical_index, out_position)`. `None` if a variable-width column
1094    /// precedes the last target.
1095    pub(crate) fn build(phys_tags: &[u8], targets: &[(usize, usize)]) -> Option<Self> {
1096        if targets.is_empty() {
1097            return None;
1098        }
1099        let max_t = targets.iter().map(|&(p, _)| p).max()?;
1100        let mut offsets = Vec::with_capacity(max_t + 1);
1101        let mut acc = 0usize;
1102        for (i, &tag) in phys_tags.iter().enumerate().take(max_t + 1) {
1103            offsets.push(acc);
1104            if i < max_t {
1105                acc += 1 + fixed_width_size(tag)?;
1106            }
1107        }
1108        let mut out_targets = Vec::with_capacity(targets.len());
1109        for &(p, out_pos) in targets {
1110            let tag = phys_tags[p];
1111            out_targets.push(OffsetTarget {
1112                cell_pos: offsets[p],
1113                tag,
1114                fixed_width: fixed_width_size(tag),
1115                out_pos,
1116            });
1117        }
1118        let phys_count = phys_tags.len();
1119        let bitmap_bytes = phys_count.div_ceil(8);
1120        let mut nonnull_mask = vec![0u8; bitmap_bytes];
1121        for bit in 0..=max_t {
1122            nonnull_mask[bit / 8] |= 1 << (bit % 8);
1123        }
1124        Some(Self {
1125            expected_header: V2_FLAG | (phys_count as u16),
1126            body_start: 2 + bitmap_bytes,
1127            nonnull_mask,
1128            targets: out_targets,
1129        })
1130    }
1131
1132    /// True when `data` matches the plan's V2 header and has no NULL in the static prefix.
1133    #[inline]
1134    fn layout_ok(&self, data: &[u8]) -> bool {
1135        if data.len() < self.body_start
1136            || u16::from_le_bytes([data[0], data[1]]) != self.expected_header
1137        {
1138            return false;
1139        }
1140        self.nonnull_mask
1141            .iter()
1142            .enumerate()
1143            .all(|(i, &m)| data[2 + i] & m == 0)
1144    }
1145
1146    /// Decode one target by static offset. `Ok(None)` = tag/bounds mismatch (fall back).
1147    #[inline]
1148    fn read_target(&self, data: &[u8], t: &OffsetTarget) -> Result<Option<Value>> {
1149        let pos = self.body_start + t.cell_pos;
1150        if data.get(pos) != Some(&t.tag) {
1151            return Ok(None);
1152        }
1153        let after_tag = pos + 1;
1154        let (len, body_pos) = match t.fixed_width {
1155            Some(n) => (n, after_tag),
1156            None => match data.get(after_tag..after_tag + 4) {
1157                Some(lb) => (
1158                    u32::from_le_bytes(lb.try_into().unwrap()) as usize,
1159                    after_tag + 4,
1160                ),
1161                None => return Ok(None),
1162            },
1163        };
1164        match data.get(body_pos..body_pos + len) {
1165            Some(body) => Ok(Some(decode_value(t.tag, body)?)),
1166            None => Ok(None),
1167        }
1168    }
1169
1170    /// Decode planned columns by index into `row`. `Ok(false)` = layout mismatch (fall back).
1171    pub(crate) fn decode_into(&self, data: &[u8], row: &mut [Value]) -> Result<bool> {
1172        if !self.layout_ok(data) {
1173            return Ok(false);
1174        }
1175        for t in &self.targets {
1176            match self.read_target(data, t)? {
1177                Some(v) => row[t.out_pos] = v,
1178                None => return Ok(false),
1179            }
1180        }
1181        Ok(true)
1182    }
1183
1184    /// Push planned columns onto `out` (monotonic projection only). `Ok(false)` = layout
1185    /// mismatch; `out` may be left partially pushed and must be discarded by the caller.
1186    pub(crate) fn decode_push(&self, data: &[u8], out: &mut Vec<Value>) -> Result<bool> {
1187        if !self.layout_ok(data) {
1188            return Ok(false);
1189        }
1190        for t in &self.targets {
1191            match self.read_target(data, t)? {
1192                Some(v) => out.push(v),
1193                None => return Ok(false),
1194            }
1195        }
1196        Ok(true)
1197    }
1198}
1199
1200#[derive(Debug, Clone, Copy)]
1201pub enum RawColumn<'a> {
1202    Null,
1203    Integer(i64),
1204    Real(f64),
1205    Boolean(bool),
1206    Text(&'a str),
1207    Blob(&'a [u8]),
1208    Time(i64),
1209    Date(i32),
1210    Timestamp(i64),
1211    Interval { months: i32, days: i32, micros: i64 },
1212    Json(&'a str),
1213    Jsonb(&'a [u8]),
1214    TsVector(&'a [u8]),
1215    TsQuery(&'a [u8]),
1216    Array(&'a [u8]),
1217    Vector(&'a [u8]),
1218}
1219
1220impl<'a> RawColumn<'a> {
1221    pub fn to_value(self) -> Value {
1222        match self {
1223            RawColumn::Null => Value::Null,
1224            RawColumn::Integer(i) => Value::Integer(i),
1225            RawColumn::Real(r) => Value::Real(r),
1226            RawColumn::Boolean(b) => Value::Boolean(b),
1227            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1228            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1229            RawColumn::Time(t) => Value::Time(t),
1230            RawColumn::Date(d) => Value::Date(d),
1231            RawColumn::Timestamp(t) => Value::Timestamp(t),
1232            RawColumn::Interval {
1233                months,
1234                days,
1235                micros,
1236            } => Value::Interval {
1237                months,
1238                days,
1239                micros,
1240            },
1241            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1242            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1243            RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1244            RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1245            RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1246            RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1247        }
1248    }
1249
1250    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1251        use std::cmp::Ordering;
1252        match (self, other) {
1253            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1254            (RawColumn::Null, _) | (_, Value::Null) => None,
1255            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1256            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1257            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1258            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1259            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1260            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1261            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1262            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1263            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1264            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1265            (
1266                RawColumn::Interval {
1267                    months: am,
1268                    days: ad,
1269                    micros: au,
1270                },
1271                Value::Interval {
1272                    months: bm,
1273                    days: bd,
1274                    micros: bu,
1275                },
1276            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1277            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1278            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1279            (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1280            (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1281            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1282                Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1283                _ => None,
1284            },
1285            _ => None,
1286        }
1287    }
1288
1289    pub fn eq_value(&self, other: &Value) -> bool {
1290        match (self, other) {
1291            (RawColumn::Null, Value::Null) => true,
1292            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1293            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1294            (RawColumn::Real(a), Value::Real(b)) => a == b,
1295            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1296            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1297            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1298            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1299            (RawColumn::Time(a), Value::Time(b)) => a == b,
1300            (RawColumn::Date(a), Value::Date(b)) => a == b,
1301            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1302            (
1303                RawColumn::Interval {
1304                    months: am,
1305                    days: ad,
1306                    micros: au,
1307                },
1308                Value::Interval {
1309                    months: bm,
1310                    days: bd,
1311                    micros: bu,
1312                },
1313            ) => am == bm && ad == bd && au == bu,
1314            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1315            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1316            (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1317            (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1318            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1319                Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1320                _ => false,
1321            },
1322            _ => false,
1323        }
1324    }
1325
1326    pub fn as_f64(&self) -> Option<f64> {
1327        match self {
1328            RawColumn::Integer(i) => Some(*i as f64),
1329            RawColumn::Real(r) => Some(*r),
1330            _ => None,
1331        }
1332    }
1333
1334    pub fn as_i64(&self) -> Option<i64> {
1335        match self {
1336            RawColumn::Integer(i) => Some(*i),
1337            RawColumn::Time(t) => Some(*t),
1338            RawColumn::Date(d) => Some(*d as i64),
1339            RawColumn::Timestamp(t) => Some(*t),
1340            _ => None,
1341        }
1342    }
1343}
1344
1345fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1346    match DataType::from_tag(type_tag) {
1347        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1348            data[..8].try_into().unwrap(),
1349        ))),
1350        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1351            data[..8].try_into().unwrap(),
1352        ))),
1353        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1354        Some(DataType::Text) => {
1355            let s = std::str::from_utf8(data)
1356                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1357            Ok(RawColumn::Text(s))
1358        }
1359        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1360        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1361            data[..8].try_into().unwrap(),
1362        ))),
1363        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1364            data[..4].try_into().unwrap(),
1365        ))),
1366        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1367            data[..8].try_into().unwrap(),
1368        ))),
1369        Some(DataType::Interval) => {
1370            if data.len() < 16 {
1371                return Err(SqlError::InvalidValue("truncated interval".into()));
1372            }
1373            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1374            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1375            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1376            Ok(RawColumn::Interval {
1377                months,
1378                days,
1379                micros,
1380            })
1381        }
1382        Some(DataType::Json) => {
1383            let s = std::str::from_utf8(data)
1384                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1385            Ok(RawColumn::Json(s))
1386        }
1387        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1388        Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1389        Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1390        Some(DataType::Array) => Ok(RawColumn::Array(data)),
1391        Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1392        _ => Err(SqlError::InvalidValue(format!(
1393            "unknown column type tag: {type_tag}"
1394        ))),
1395    }
1396}
1397
1398/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
1399pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1400    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1401    if target >= col_count || new_val.is_null() {
1402        return Ok(false);
1403    }
1404    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1405    if was_null {
1406        return Ok(false);
1407    }
1408    for col in 0..target {
1409        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1410        if !is_null {
1411            pos = skip_cell(data, pos, version)?;
1412        }
1413    }
1414    if pos >= data.len() {
1415        return Err(SqlError::InvalidValue("truncated column data".into()));
1416    }
1417    let type_tag = data[pos];
1418    let (old_data_len, val_start) = match version {
1419        RowVersion::V2 => match fixed_width_size(type_tag) {
1420            Some(n) => (n, pos + 1),
1421            None => {
1422                if pos + 5 > data.len() {
1423                    return Err(SqlError::InvalidValue("truncated column data".into()));
1424                }
1425                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1426                (len, pos + 5)
1427            }
1428        },
1429        RowVersion::V1 => {
1430            if pos + 5 > data.len() {
1431                return Err(SqlError::InvalidValue("truncated column data".into()));
1432            }
1433            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1434            (len, pos + 5)
1435        }
1436    };
1437    let new_data_len = match value_encoded_size_v2(new_val) {
1438        Some(n) => n,
1439        None => return Ok(false),
1440    };
1441    if new_data_len != old_data_len {
1442        return Ok(false);
1443    }
1444    data[pos] = new_val.data_type().type_tag();
1445    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1446    Ok(true)
1447}
1448
1449/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1450pub fn patch_row_column(
1451    data: &[u8],
1452    target: usize,
1453    new_val: &Value,
1454    out: &mut Vec<u8>,
1455) -> Result<()> {
1456    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1457
1458    let new_col_count = if target >= col_count {
1459        target + 1
1460    } else {
1461        col_count
1462    };
1463    let new_bitmap_bytes = new_col_count.div_ceil(8);
1464    let bitmap_bytes = col_count.div_ceil(8);
1465    out.clear();
1466
1467    let header = (new_col_count as u16) | V2_FLAG;
1468    out.extend_from_slice(&header.to_le_bytes());
1469    let bitmap_start = out.len();
1470    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1471    for _ in bitmap_bytes..new_bitmap_bytes {
1472        out.push(0xFF);
1473    }
1474    if new_val.is_null() {
1475        out[bitmap_start + target / 8] |= 1 << (target % 8);
1476    } else {
1477        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1478    }
1479
1480    let mut pos = header_end;
1481    for col in 0..new_col_count {
1482        let was_null = if col < col_count {
1483            bitmap[col / 8] & (1 << (col % 8)) != 0
1484        } else {
1485            true
1486        };
1487
1488        if col == target {
1489            if !was_null {
1490                pos = skip_cell(data, pos, version)?;
1491            }
1492            if !new_val.is_null() {
1493                encode_cell_v2(new_val, out);
1494            }
1495        } else if !was_null {
1496            pos = copy_cell_to_v2(data, pos, version, out)?;
1497        }
1498    }
1499    Ok(())
1500}
1501
1502pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1503    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1504    if target >= col_count {
1505        return Ok(RawColumn::Null);
1506    }
1507
1508    for col in 0..=target {
1509        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1510
1511        if col == target {
1512            if is_null {
1513                return Ok(RawColumn::Null);
1514            }
1515            let (type_tag, body, _) = read_cell(data, pos, version)?;
1516            return decode_value_raw(type_tag, body);
1517        } else if !is_null {
1518            pos = skip_cell(data, pos, version)?;
1519        }
1520    }
1521
1522    unreachable!()
1523}
1524
1525/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1526pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1527    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1528    if target >= col_count {
1529        return Ok((RawColumn::Null, usize::MAX));
1530    }
1531
1532    for col in 0..=target {
1533        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1534
1535        if col == target {
1536            if is_null {
1537                return Ok((RawColumn::Null, usize::MAX));
1538            }
1539            let tag_offset = pos;
1540            let (type_tag, body, _) = read_cell(data, pos, version)?;
1541            let raw = decode_value_raw(type_tag, body)?;
1542            return Ok((raw, tag_offset));
1543        } else if !is_null {
1544            pos = skip_cell(data, pos, version)?;
1545        }
1546    }
1547
1548    unreachable!()
1549}
1550
1551/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1552pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1553    if offset == usize::MAX || new_val.is_null() {
1554        return Ok(false);
1555    }
1556    if data.len() < 2 || offset >= data.len() {
1557        return Err(SqlError::InvalidValue("truncated column data".into()));
1558    }
1559    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1560        RowVersion::V2
1561    } else {
1562        RowVersion::V1
1563    };
1564    let type_tag = data[offset];
1565    let (old_data_len, val_start) = match version {
1566        RowVersion::V2 => match fixed_width_size(type_tag) {
1567            Some(n) => (n, offset + 1),
1568            None => {
1569                if offset + 5 > data.len() {
1570                    return Err(SqlError::InvalidValue("truncated column data".into()));
1571                }
1572                let len =
1573                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1574                (len, offset + 5)
1575            }
1576        },
1577        RowVersion::V1 => {
1578            if offset + 5 > data.len() {
1579                return Err(SqlError::InvalidValue("truncated column data".into()));
1580            }
1581            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1582            (len, offset + 5)
1583        }
1584    };
1585    let new_data_len = match value_encoded_size_v2(new_val) {
1586        Some(n) => n,
1587        None => return Ok(false),
1588    };
1589    if new_data_len != old_data_len {
1590        return Ok(false);
1591    }
1592    data[offset] = new_val.data_type().type_tag();
1593    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1594    Ok(true)
1595}
1596
1597pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1598    if key.is_empty() || key[0] != TAG_INTEGER {
1599        return Err(SqlError::InvalidValue("not an integer key".into()));
1600    }
1601    let (val, _) = decode_signed_varint(&key[1..])?;
1602    Ok(val)
1603}
1604
1605#[cfg(test)]
1606#[path = "encoding_tests.rs"]
1607mod tests;