Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// Every encoded key value starts with one of these bytes: the encoders push
/// the tag first and `decode_key_value` dispatches on it. Under byte-wise
/// comparison of encoded keys, values of different types therefore order by
/// tag, with NULL (0x00) first.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
const TAG_ARRAY: u8 = 0x0E;
const TAG_VECTOR: u8 = 0x0F;
23
24/// Encode a single value into an order-preserving byte sequence.
25pub fn encode_key_value(value: &Value) -> Vec<u8> {
26    let mut buf = Vec::with_capacity(16);
27    encode_key_value_into(value, &mut buf);
28    buf
29}
30
31/// Encode a composite key (multiple values concatenated).
32pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33    let mut buf = Vec::new();
34    for v in values {
35        buf.extend_from_slice(&encode_key_value(v));
36    }
37    buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41    buf.clear();
42    for v in values {
43        encode_key_value_into(v, buf);
44    }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48    buf.clear();
49    for &i in indices {
50        encode_key_value_into(&row[i as usize], buf);
51    }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56    buf.clear();
57    encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61    value: &Value,
62    coll: crate::types::Collation,
63    buf: &mut Vec<u8>,
64) {
65    match (value, coll) {
66        (Value::Text(s), crate::types::Collation::NoCase) => {
67            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68        }
69        (Value::Text(s), crate::types::Collation::Rtrim) => {
70            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71        }
72        _ => encode_key_value_into(value, buf),
73    }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77    match value {
78        Value::Null => buf.push(TAG_NULL),
79        Value::Boolean(b) => {
80            buf.push(TAG_BOOLEAN);
81            buf.push(if *b { 0x01 } else { 0x00 });
82        }
83        Value::Integer(i) => encode_integer_into(*i, buf),
84        Value::Real(r) => encode_real_into(*r, buf),
85        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90        Value::Interval {
91            months,
92            days,
93            micros,
94        } => {
95            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
96            buf.push(TAG_INTERVAL);
97            let mut mb = months.to_be_bytes();
98            mb[0] ^= 0x80;
99            buf.extend_from_slice(&mb);
100            let mut db = days.to_be_bytes();
101            db[0] ^= 0x80;
102            buf.extend_from_slice(&db);
103            let mut ub = micros.to_be_bytes();
104            ub[0] ^= 0x80;
105            buf.extend_from_slice(&ub);
106        }
107        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109        Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110        Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111        Value::Array(a) => encode_array_into(a, buf),
112        Value::Vector(v) => encode_vector_into(v, buf),
113    }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117    buf.push(TAG_VECTOR);
118    let mut inner = Vec::with_capacity(2 + v.len() * 4);
119    inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120    for &x in v {
121        inner.extend_from_slice(&x.to_le_bytes());
122    }
123    encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127    buf.push(TAG_ARRAY);
128    let mut inner = Vec::new();
129    for v in elems {
130        encode_key_value_into(v, &mut inner);
131    }
132    encode_bytes_into_no_tag(&inner, buf);
133}
134
/// Append `data` to `buf` in null-escaped form, without a leading tag.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`, and a single `0x00`
/// terminator follows the payload.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    // Splitting on zero bytes yields the runs that can be copied verbatim;
    // each boundary between two runs stands for one escaped zero.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }
    buf.push(0x00);
}
146
/// Append `val` to `buf` as an integer key: `TAG_INTEGER` followed by the
/// marker and data bytes written by `encode_signed_varint`.
fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
    encode_signed_varint(TAG_INTEGER, val, buf);
}
150
/// Order-preserving variable-width codec for signed `i64` values.
///
/// Appends `[tag] [marker] [data bytes]` to `buf`:
/// - zero: marker `0x80`, no data bytes;
/// - positive: marker `0x80 + n`, followed by the `n` significant big-endian
///   bytes of the value;
/// - negative: marker `0x80 - n`, followed by the `n` significant big-endian
///   bytes of the magnitude, each one's-complemented.
///
/// Byte-wise lexicographic comparison of the output matches signed integer
/// order for a fixed `tag`.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);
    // `unsigned_abs` also covers `i64::MIN`, whose magnitude does not fit in i64.
    let magnitude = val.unsigned_abs();
    if magnitude == 0 {
        buf.push(0x80);
        return;
    }
    let be = magnitude.to_be_bytes();
    // Whole leading zero bytes are dropped; at least one byte remains.
    let skip = (magnitude.leading_zeros() / 8) as usize;
    let significant = &be[skip..];
    let width = significant.len() as u8;
    if val > 0 {
        buf.push(0x80 + width);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width);
        buf.extend(significant.iter().map(|&byte| !byte));
    }
}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185    buf.push(TAG_REAL);
186    let bits = val.to_bits();
187    let encoded = if val.is_sign_negative() {
188        !bits
189    } else {
190        bits ^ (1u64 << 63)
191    };
192    buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
/// Append `tag` followed by the null-escaped, terminated form of `data`.
///
/// Every `0x00` byte in `data` is written as `0x00 0xFF`; a lone `0x00` marks
/// the end of the value.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    for &byte in data {
        match byte {
            0x00 => buf.extend_from_slice(&[0x00, 0xFF]),
            other => buf.push(other),
        }
    }
    buf.push(0x00);
}
207
208/// Decode a single key value, returning the value and the number of bytes consumed.
209pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210    if data.is_empty() {
211        return Err(SqlError::InvalidValue("empty key data".into()));
212    }
213    match data[0] {
214        TAG_NULL => Ok((Value::Null, 1)),
215        TAG_BOOLEAN => {
216            if data.len() < 2 {
217                return Err(SqlError::InvalidValue("truncated boolean".into()));
218            }
219            Ok((Value::Boolean(data[1] != 0), 2))
220        }
221        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226            (Value::Date(d), n + 1)
227        }),
228        TAG_TIMESTAMP => {
229            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230        }
231        TAG_INTERVAL => {
232            if data.len() < 1 + 16 {
233                return Err(SqlError::InvalidValue("truncated interval".into()));
234            }
235            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236            mb[0] ^= 0x80;
237            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238            db[0] ^= 0x80;
239            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240            ub[0] ^= 0x80;
241            Ok((
242                Value::Interval {
243                    months: i32::from_be_bytes(mb),
244                    days: i32::from_be_bytes(db),
245                    micros: i64::from_be_bytes(ub),
246                },
247                17,
248            ))
249        }
250        TAG_TEXT => {
251            let (bytes, n) = decode_null_escaped(&data[1..])?;
252            let s = String::from_utf8(bytes)
253                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254            Ok((Value::Text(CompactString::from(s)), n + 1))
255        }
256        TAG_BLOB => {
257            let (bytes, n) = decode_null_escaped(&data[1..])?;
258            Ok((Value::Blob(bytes), n + 1))
259        }
260        TAG_JSON => {
261            let (bytes, n) = decode_null_escaped(&data[1..])?;
262            let s = String::from_utf8(bytes)
263                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264            Ok((Value::Json(CompactString::from(s)), n + 1))
265        }
266        TAG_JSONB => {
267            let (bytes, n) = decode_null_escaped(&data[1..])?;
268            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269        }
270        TAG_TSVECTOR => {
271            let (bytes, n) = decode_null_escaped(&data[1..])?;
272            Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273        }
274        TAG_TSQUERY => {
275            let (bytes, n) = decode_null_escaped(&data[1..])?;
276            Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277        }
278        TAG_ARRAY => {
279            let (inner, n) = decode_null_escaped(&data[1..])?;
280            let mut elems = Vec::new();
281            let mut pos = 0;
282            while pos < inner.len() {
283                let (v, vlen) = decode_key_value(&inner[pos..])?;
284                elems.push(v);
285                pos += vlen;
286            }
287            Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288        }
289        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
290    }
291}
292
293/// Decode a composite key into multiple values.
294pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
295    let mut values = Vec::with_capacity(count);
296    let mut pos = 0;
297    for _ in 0..count {
298        let (v, n) = decode_key_value(&data[pos..])?;
299        values.push(v);
300        pos += n;
301    }
302    Ok(values)
303}
304
305fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
306    let (v, n) = decode_signed_varint(data)?;
307    Ok((Value::Integer(v), n))
308}
309
310/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
311pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
312    if data.is_empty() {
313        return Err(SqlError::InvalidValue("truncated integer".into()));
314    }
315    let marker = data[0];
316    if marker == 0x80 {
317        return Ok((0, 1));
318    }
319    if marker > 0x80 {
320        let byte_count = (marker - 0x80) as usize;
321        if data.len() < 1 + byte_count {
322            return Err(SqlError::InvalidValue("truncated positive integer".into()));
323        }
324        let mut bytes = [0u8; 8];
325        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
326        let val = i64::from_be_bytes(bytes);
327        Ok((val, 1 + byte_count))
328    } else {
329        let byte_count = (0x80 - marker) as usize;
330        if data.len() < 1 + byte_count {
331            return Err(SqlError::InvalidValue("truncated negative integer".into()));
332        }
333        let mut bytes = [0u8; 8];
334        for i in 0..byte_count {
335            bytes[8 - byte_count + i] = !data[1 + i];
336        }
337        let abs_val = u64::from_be_bytes(bytes);
338        let val = (-(abs_val as i128)) as i64;
339        Ok((val, 1 + byte_count))
340    }
341}
342
343fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
344    if data.len() < 8 {
345        return Err(SqlError::InvalidValue("truncated real".into()));
346    }
347    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
348    let bits = if encoded & (1u64 << 63) != 0 {
349        // Was positive: undo sign bit flip
350        encoded ^ (1u64 << 63)
351    } else {
352        // Was negative: undo full inversion
353        !encoded
354    };
355    let val = f64::from_bits(bits);
356    Ok((Value::Real(val), 8))
357}
358
359/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
360fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
361    let mut result = Vec::new();
362    let mut i = 0;
363    while i < data.len() {
364        if data[i] == 0x00 {
365            if i + 1 < data.len() && data[i + 1] == 0xFF {
366                result.push(0x00);
367                i += 2;
368            } else {
369                return Ok((result, i + 1)); // terminator consumed
370            }
371        } else {
372            result.push(data[i]);
373            i += 1;
374        }
375    }
376    Err(SqlError::InvalidValue(
377        "unterminated null-escaped string".into(),
378    ))
379}
380
381fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
382    match v {
383        Value::Integer(val) => {
384            buf.push(DataType::Integer.type_tag());
385            buf.extend_from_slice(&val.to_le_bytes());
386        }
387        Value::Real(r) => {
388            buf.push(DataType::Real.type_tag());
389            buf.extend_from_slice(&r.to_le_bytes());
390        }
391        Value::Boolean(b) => {
392            buf.push(DataType::Boolean.type_tag());
393            buf.push(if *b { 1 } else { 0 });
394        }
395        Value::Text(s) => {
396            let bytes = s.as_bytes();
397            buf.push(DataType::Text.type_tag());
398            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
399            buf.extend_from_slice(bytes);
400        }
401        Value::Blob(data) => {
402            buf.push(DataType::Blob.type_tag());
403            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
404            buf.extend_from_slice(data);
405        }
406        Value::Time(t) => {
407            buf.push(DataType::Time.type_tag());
408            buf.extend_from_slice(&t.to_le_bytes());
409        }
410        Value::Date(d) => {
411            buf.push(DataType::Date.type_tag());
412            buf.extend_from_slice(&d.to_le_bytes());
413        }
414        Value::Timestamp(t) => {
415            buf.push(DataType::Timestamp.type_tag());
416            buf.extend_from_slice(&t.to_le_bytes());
417        }
418        Value::Interval {
419            months,
420            days,
421            micros,
422        } => {
423            buf.push(DataType::Interval.type_tag());
424            buf.extend_from_slice(&months.to_le_bytes());
425            buf.extend_from_slice(&days.to_le_bytes());
426            buf.extend_from_slice(&micros.to_le_bytes());
427        }
428        Value::Json(s) => {
429            let bytes = s.as_bytes();
430            buf.push(DataType::Json.type_tag());
431            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
432            buf.extend_from_slice(bytes);
433        }
434        Value::Jsonb(b) => {
435            buf.push(DataType::Jsonb.type_tag());
436            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
437            buf.extend_from_slice(b);
438        }
439        Value::TsVector(b) => {
440            buf.push(DataType::TsVector.type_tag());
441            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
442            buf.extend_from_slice(b);
443        }
444        Value::TsQuery(b) => {
445            buf.push(DataType::TsQuery.type_tag());
446            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
447            buf.extend_from_slice(b);
448        }
449        Value::Array(a) => {
450            buf.push(DataType::Array.type_tag());
451            let len = encoded_array_v2_size(a);
452            buf.extend_from_slice(&(len as u32).to_le_bytes());
453            let start = buf.len();
454            buf.resize(start + len, 0);
455            write_array_v2_into_slice(a, &mut buf[start..start + len]);
456        }
457        Value::Vector(v) => {
458            buf.push(
459                DataType::Vector {
460                    dim: v.len() as u16,
461                }
462                .type_tag(),
463            );
464            let len = 2 + v.len() * 4;
465            buf.extend_from_slice(&(len as u32).to_le_bytes());
466            buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
467            for &x in v.iter() {
468                buf.extend_from_slice(&x.to_le_bytes());
469            }
470        }
471        Value::Null => unreachable!(),
472    }
473}
474
475pub fn encode_row(values: &[Value]) -> Vec<u8> {
476    let mut buf = Vec::new();
477    encode_row_into(values, &mut buf);
478    buf
479}
480
481pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
482    buf.clear();
483    let col_count = values.len();
484    let bitmap_bytes = col_count.div_ceil(8);
485
486    let header = (col_count as u16) | V2_FLAG;
487    buf.extend_from_slice(&header.to_le_bytes());
488
489    let bitmap_start = buf.len();
490    buf.resize(buf.len() + bitmap_bytes, 0);
491
492    for (i, v) in values.iter().enumerate() {
493        if v.is_null() {
494            buf[bitmap_start + i / 8] |= 1 << (i % 8);
495            continue;
496        }
497        encode_cell_v2(v, buf);
498    }
499}
500
/// Pre-encoded V2 row skeleton for rows whose non-NULL columns are integers.
///
/// Produced by `build_int_row_template` and filled in by
/// `encode_int_row_with_template`.
pub struct IntRowTemplate {
    /// Row bytes: header, NULL bitmap, then one integer tag plus an 8-byte
    /// zeroed placeholder for every non-NULL slot.
    pub template: Vec<u8>,
    /// `(slot, offset)` pairs: the column slot and the byte offset of its
    /// 8-byte placeholder inside `template`.
    pub slot_offsets: Vec<(usize, usize)>,
}
505
506pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
507    let bitmap_bytes = phys_count.div_ceil(8);
508    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
509    let header = (phys_count as u16) | V2_FLAG;
510    template.extend_from_slice(&header.to_le_bytes());
511    let bitmap_start = template.len();
512    template.resize(bitmap_start + bitmap_bytes, 0);
513    for &i in null_slots {
514        template[bitmap_start + i / 8] |= 1 << (i % 8);
515    }
516    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
517    for slot in 0..phys_count {
518        if null_slots.contains(&slot) {
519            continue;
520        }
521        template.push(DataType::Integer.type_tag());
522        let value_offset = template.len();
523        template.extend_from_slice(&[0u8; 8]);
524        slot_offsets.push((slot, value_offset));
525    }
526    IntRowTemplate {
527        template,
528        slot_offsets,
529    }
530}
531
532/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
533#[inline]
534pub fn encode_int_row_with_template(
535    tmpl: &IntRowTemplate,
536    values: &[Value],
537    buf: &mut Vec<u8>,
538) -> Result<()> {
539    buf.clear();
540    buf.extend_from_slice(&tmpl.template);
541    for &(slot, off) in &tmpl.slot_offsets {
542        match &values[slot] {
543            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
544            other => {
545                return Err(SqlError::TypeMismatch {
546                    expected: "Integer".into(),
547                    got: other.data_type().to_string(),
548                });
549            }
550        }
551    }
552    Ok(())
553}
554
555fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
556    match DataType::from_tag(type_tag) {
557        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
558            data[..8].try_into().unwrap(),
559        ))),
560        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
561            data[..8].try_into().unwrap(),
562        ))),
563        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
564        Some(DataType::Text) => {
565            let s = std::str::from_utf8(data)
566                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
567            Ok(Value::Text(CompactString::from(s)))
568        }
569        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
570        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
571            data[..8].try_into().unwrap(),
572        ))),
573        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
574            data[..4].try_into().unwrap(),
575        ))),
576        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
577            data[..8].try_into().unwrap(),
578        ))),
579        Some(DataType::Interval) => {
580            if data.len() < 16 {
581                return Err(SqlError::InvalidValue("truncated interval".into()));
582            }
583            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
584            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
585            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
586            Ok(Value::Interval {
587                months,
588                days,
589                micros,
590            })
591        }
592        Some(DataType::Json) => {
593            let s = std::str::from_utf8(data)
594                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
595            Ok(Value::Json(CompactString::from(s)))
596        }
597        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
598        Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
599        Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
600        Some(DataType::Array) => decode_array_v2(data),
601        Some(DataType::Vector { .. }) => decode_vector(data),
602        _ => Err(SqlError::InvalidValue(format!(
603            "unknown column type tag: {type_tag}"
604        ))),
605    }
606}
607
608fn decode_vector(data: &[u8]) -> Result<Value> {
609    if data.len() < 2 {
610        return Err(SqlError::InvalidValue("truncated vector".into()));
611    }
612    let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
613    if data.len() < 2 + dim * 4 {
614        return Err(SqlError::InvalidValue("truncated vector payload".into()));
615    }
616    let mut v = Vec::with_capacity(dim);
617    for i in 0..dim {
618        let off = 2 + i * 4;
619        v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
620    }
621    Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
622}
623
624fn encoded_array_v2_size(elems: &[Value]) -> usize {
625    let mut total = 4;
626    for elem in elems {
627        if elem.is_null() {
628            total += 1;
629            continue;
630        }
631        total += 1 + 1;
632        let tag = elem.data_type().type_tag();
633        match fixed_width_size(tag) {
634            Some(n) => total += n,
635            None => total += 4 + variable_cell_payload_size(elem),
636        }
637    }
638    total
639}
640
641fn variable_cell_payload_size(v: &Value) -> usize {
642    match v {
643        Value::Text(s) => s.len(),
644        Value::Blob(b) => b.len(),
645        Value::Json(s) => s.len(),
646        Value::Jsonb(b) => b.len(),
647        Value::TsVector(b) => b.len(),
648        Value::TsQuery(b) => b.len(),
649        Value::Array(a) => encoded_array_v2_size(a),
650        Value::Vector(v) => 2 + v.len() * 4,
651        _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
652    }
653}
654
655fn value_encoded_size_v2(v: &Value) -> Option<usize> {
656    if v.is_null() {
657        return None;
658    }
659    Some(match fixed_width_size(v.data_type().type_tag()) {
660        Some(n) => n,
661        None => variable_cell_payload_size(v),
662    })
663}
664
665fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
666    match v {
667        Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
668        Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
669        Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
670        Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
671        Value::Blob(b) => out[..b.len()].copy_from_slice(b),
672        Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
673        Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
674        Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
675        Value::Interval {
676            months,
677            days,
678            micros,
679        } => {
680            out[..4].copy_from_slice(&months.to_le_bytes());
681            out[4..8].copy_from_slice(&days.to_le_bytes());
682            out[8..16].copy_from_slice(&micros.to_le_bytes());
683        }
684        Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
685        Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
686        Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
687        Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
688        Value::Array(a) => write_array_v2_into_slice(a, out),
689        Value::Vector(v) => {
690            out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
691            let mut pos = 2;
692            for &x in v.iter() {
693                out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
694                pos += 4;
695            }
696        }
697        Value::Null => unreachable!(),
698    }
699}
700
701fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
702    out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
703    let mut pos = 4;
704    for elem in elems {
705        if elem.is_null() {
706            out[pos] = 0xFF;
707            pos += 1;
708            continue;
709        }
710        out[pos] = 0x00;
711        pos += 1;
712        let tag = elem.data_type().type_tag();
713        out[pos] = tag;
714        pos += 1;
715        match fixed_width_size(tag) {
716            Some(n) => {
717                write_value_payload_v2(elem, &mut out[pos..pos + n]);
718                pos += n;
719            }
720            None => {
721                let payload_len = variable_cell_payload_size(elem);
722                out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
723                pos += 4;
724                write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
725                pos += payload_len;
726            }
727        }
728    }
729}
730
731fn decode_array_v2(data: &[u8]) -> Result<Value> {
732    if data.len() < 4 {
733        return Err(SqlError::InvalidValue("truncated array length".into()));
734    }
735    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
736    let mut pos = 4;
737    let mut elems = Vec::with_capacity(count);
738    for _ in 0..count {
739        if pos >= data.len() {
740            return Err(SqlError::InvalidValue("truncated array elements".into()));
741        }
742        if data[pos] == 0xFF {
743            elems.push(Value::Null);
744            pos += 1;
745            continue;
746        }
747        if data[pos] != 0x00 {
748            return Err(SqlError::InvalidValue(
749                "invalid array element marker".into(),
750            ));
751        }
752        pos += 1;
753        if pos >= data.len() {
754            return Err(SqlError::InvalidValue("truncated array element".into()));
755        }
756        let type_tag = data[pos];
757        pos += 1;
758        let (val, advance) = match fixed_width_size(type_tag) {
759            Some(n) => {
760                if pos + n > data.len() {
761                    return Err(SqlError::InvalidValue(
762                        "truncated fixed-width array element".into(),
763                    ));
764                }
765                let v = decode_value(type_tag, &data[pos..pos + n])?;
766                (v, n)
767            }
768            None => {
769                if pos + 4 > data.len() {
770                    return Err(SqlError::InvalidValue(
771                        "truncated array element length".into(),
772                    ));
773                }
774                let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
775                pos += 4;
776                if pos + len > data.len() {
777                    return Err(SqlError::InvalidValue(
778                        "truncated variable-width array element".into(),
779                    ));
780                }
781                let v = decode_value(type_tag, &data[pos..pos + len])?;
782                (v, len)
783            }
784        };
785        pos += advance;
786        elems.push(val);
787    }
788    Ok(Value::Array(std::sync::Arc::new(elems)))
789}
790
/// On-disk row cell layout version.
///
/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
/// High bit of `col_count:u16` flags V2.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries an explicit `u32` length.
    V1,
    /// Fixed-width cells omit the length; variable-width cells keep it.
    V2,
}
798
/// High bit of the row header's `u16`; set when the row uses V2 cells.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Low 15 bits of the row header's `u16`: the column count.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
801
802#[inline]
803pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
804    match DataType::from_tag(type_tag)? {
805        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
806        DataType::Date => Some(4),
807        DataType::Boolean => Some(1),
808        DataType::Interval => Some(16),
809        DataType::Text
810        | DataType::Blob
811        | DataType::Json
812        | DataType::Jsonb
813        | DataType::TsVector
814        | DataType::TsQuery
815        | DataType::Array
816        | DataType::Vector { .. }
817        | DataType::Null => None,
818    }
819}
820
821#[inline]
822fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
823    if pos >= data.len() {
824        return Err(SqlError::InvalidValue("truncated column data".into()));
825    }
826    let type_tag = data[pos];
827    let after_tag = pos + 1;
828    let (data_len, body_pos) = match version {
829        RowVersion::V2 => match fixed_width_size(type_tag) {
830            Some(n) => (n, after_tag),
831            None => {
832                if after_tag + 4 > data.len() {
833                    return Err(SqlError::InvalidValue("truncated column data".into()));
834                }
835                let len = u32::from_le_bytes([
836                    data[after_tag],
837                    data[after_tag + 1],
838                    data[after_tag + 2],
839                    data[after_tag + 3],
840                ]) as usize;
841                (len, after_tag + 4)
842            }
843        },
844        RowVersion::V1 => {
845            if after_tag + 4 > data.len() {
846                return Err(SqlError::InvalidValue("truncated column data".into()));
847            }
848            let len = u32::from_le_bytes([
849                data[after_tag],
850                data[after_tag + 1],
851                data[after_tag + 2],
852                data[after_tag + 3],
853            ]) as usize;
854            (len, after_tag + 4)
855        }
856    };
857    if body_pos + data_len > data.len() {
858        return Err(SqlError::InvalidValue("truncated column value".into()));
859    }
860    Ok((
861        type_tag,
862        &data[body_pos..body_pos + data_len],
863        body_pos + data_len,
864    ))
865}
866
867#[inline]
868fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
869    let (_, _, next) = read_cell(data, pos, version)?;
870    Ok(next)
871}
872
873fn copy_cell_to_v2(
874    data: &[u8],
875    pos: usize,
876    version: RowVersion,
877    out: &mut Vec<u8>,
878) -> Result<usize> {
879    let (tag, body, next) = read_cell(data, pos, version)?;
880    out.push(tag);
881    if fixed_width_size(tag).is_none() {
882        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
883    }
884    out.extend_from_slice(body);
885    Ok(next)
886}
887
888fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
889    if data.len() < 2 {
890        return Err(SqlError::InvalidValue("row data too short".into()));
891    }
892    let raw = u16::from_le_bytes([data[0], data[1]]);
893    let version = if raw & V2_FLAG != 0 {
894        RowVersion::V2
895    } else {
896        RowVersion::V1
897    };
898    let col_count = (raw & COL_COUNT_MASK) as usize;
899    let bitmap_bytes = col_count.div_ceil(8);
900    let pos = 2;
901    if data.len() < pos + bitmap_bytes {
902        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
903    }
904    Ok((
905        version,
906        col_count,
907        &data[pos..pos + bitmap_bytes],
908        pos + bitmap_bytes,
909    ))
910}
911
912pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
913    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
914
915    let mut values = Vec::with_capacity(col_count);
916    for i in 0..col_count {
917        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
918            values.push(Value::Null);
919            continue;
920        }
921        let (type_tag, body, next) = read_cell(data, pos, version)?;
922        values.push(decode_value(type_tag, body)?);
923        pos = next;
924    }
925
926    Ok(values)
927}
928
929/// Returns the number of non-PK columns stored in a row value blob.
930#[inline]
931pub fn row_non_pk_count(data: &[u8]) -> usize {
932    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
933}
934
935pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
936    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
937
938    for i in 0..col_count {
939        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
940            continue;
941        }
942        let (type_tag, body, next) = read_cell(data, pos, version)?;
943        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
944            out[col_mapping[i]] = decode_value(type_tag, body)?;
945        }
946        pos = next;
947    }
948
949    Ok(())
950}
951
952pub fn decode_pk_into(
953    key: &[u8],
954    count: usize,
955    out: &mut [Value],
956    pk_mapping: &[usize],
957) -> Result<()> {
958    let mut pos = 0;
959    for i in 0..count {
960        let (v, n) = decode_key_value(&key[pos..])?;
961        if i < pk_mapping.len() {
962            out[pk_mapping[i]] = v;
963        }
964        pos += n;
965    }
966    Ok(())
967}
968
969pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
970    if targets.is_empty() {
971        return Ok(Vec::new());
972    }
973    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
974
975    let mut results = Vec::with_capacity(targets.len());
976    let mut ti = 0;
977
978    for col in 0..col_count {
979        if ti >= targets.len() {
980            break;
981        }
982        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
983
984        if col == targets[ti] {
985            if is_null {
986                results.push(Value::Null);
987            } else {
988                let (type_tag, body, next) = read_cell(data, pos, version)?;
989                results.push(decode_value(type_tag, body)?);
990                pos = next;
991            }
992            ti += 1;
993        } else if !is_null {
994            pos = skip_cell(data, pos, version)?;
995        }
996    }
997
998    while ti < targets.len() {
999        results.push(Value::Null);
1000        ti += 1;
1001    }
1002
1003    Ok(results)
1004}
1005
1006pub fn decode_columns_into(
1007    data: &[u8],
1008    targets: &[usize],
1009    schema_cols: &[usize],
1010    row: &mut [Value],
1011) -> Result<()> {
1012    if targets.is_empty() {
1013        return Ok(());
1014    }
1015    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1016
1017    let mut ti = 0;
1018    for col in 0..col_count {
1019        if ti >= targets.len() {
1020            break;
1021        }
1022        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1023
1024        if col == targets[ti] {
1025            if is_null {
1026                row[schema_cols[ti]] = Value::Null;
1027            } else {
1028                let (type_tag, body, next) = read_cell(data, pos, version)?;
1029                row[schema_cols[ti]] = decode_value(type_tag, body)?;
1030                pos = next;
1031            }
1032            ti += 1;
1033        } else if !is_null {
1034            pos = skip_cell(data, pos, version)?;
1035        }
1036    }
1037
1038    Ok(())
1039}
1040
/// A column value decoded from an encoded row without copying its payload.
///
/// Scalar variants hold the decoded number directly; text and byte variants
/// borrow from the row buffer for the lifetime `'a`. Use
/// [`RawColumn::to_value`] to obtain an owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    /// SQL NULL (the column's null-bitmap bit was set, or the column is absent).
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    /// UTF-8 validated text borrowed from the row buffer.
    Text(&'a str),
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    /// UTF-8 validated JSON text borrowed from the row buffer.
    Json(&'a str),
    Jsonb(&'a [u8]),
    TsVector(&'a [u8]),
    TsQuery(&'a [u8]),
    /// Still-encoded array payload; decoded on demand with `decode_array_v2`.
    Array(&'a [u8]),
    /// Still-encoded vector payload; decoded on demand with `decode_vector`.
    Vector(&'a [u8]),
}
1060
1061impl<'a> RawColumn<'a> {
1062    pub fn to_value(self) -> Value {
1063        match self {
1064            RawColumn::Null => Value::Null,
1065            RawColumn::Integer(i) => Value::Integer(i),
1066            RawColumn::Real(r) => Value::Real(r),
1067            RawColumn::Boolean(b) => Value::Boolean(b),
1068            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1069            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1070            RawColumn::Time(t) => Value::Time(t),
1071            RawColumn::Date(d) => Value::Date(d),
1072            RawColumn::Timestamp(t) => Value::Timestamp(t),
1073            RawColumn::Interval {
1074                months,
1075                days,
1076                micros,
1077            } => Value::Interval {
1078                months,
1079                days,
1080                micros,
1081            },
1082            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1083            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1084            RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1085            RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1086            RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1087            RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1088        }
1089    }
1090
1091    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1092        use std::cmp::Ordering;
1093        match (self, other) {
1094            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1095            (RawColumn::Null, _) | (_, Value::Null) => None,
1096            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1097            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1098            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1099            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1100            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1101            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1102            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1103            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1104            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1105            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1106            (
1107                RawColumn::Interval {
1108                    months: am,
1109                    days: ad,
1110                    micros: au,
1111                },
1112                Value::Interval {
1113                    months: bm,
1114                    days: bd,
1115                    micros: bu,
1116                },
1117            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1118            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1119            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1120            (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1121            (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1122            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1123                Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1124                _ => None,
1125            },
1126            _ => None,
1127        }
1128    }
1129
1130    pub fn eq_value(&self, other: &Value) -> bool {
1131        match (self, other) {
1132            (RawColumn::Null, Value::Null) => true,
1133            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1134            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1135            (RawColumn::Real(a), Value::Real(b)) => a == b,
1136            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1137            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1138            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1139            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1140            (RawColumn::Time(a), Value::Time(b)) => a == b,
1141            (RawColumn::Date(a), Value::Date(b)) => a == b,
1142            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1143            (
1144                RawColumn::Interval {
1145                    months: am,
1146                    days: ad,
1147                    micros: au,
1148                },
1149                Value::Interval {
1150                    months: bm,
1151                    days: bd,
1152                    micros: bu,
1153                },
1154            ) => am == bm && ad == bd && au == bu,
1155            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1156            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1157            (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1158            (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1159            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1160                Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1161                _ => false,
1162            },
1163            _ => false,
1164        }
1165    }
1166
1167    pub fn as_f64(&self) -> Option<f64> {
1168        match self {
1169            RawColumn::Integer(i) => Some(*i as f64),
1170            RawColumn::Real(r) => Some(*r),
1171            _ => None,
1172        }
1173    }
1174
1175    pub fn as_i64(&self) -> Option<i64> {
1176        match self {
1177            RawColumn::Integer(i) => Some(*i),
1178            RawColumn::Time(t) => Some(*t),
1179            RawColumn::Date(d) => Some(*d as i64),
1180            RawColumn::Timestamp(t) => Some(*t),
1181            _ => None,
1182        }
1183    }
1184}
1185
1186fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1187    match DataType::from_tag(type_tag) {
1188        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1189            data[..8].try_into().unwrap(),
1190        ))),
1191        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1192            data[..8].try_into().unwrap(),
1193        ))),
1194        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1195        Some(DataType::Text) => {
1196            let s = std::str::from_utf8(data)
1197                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1198            Ok(RawColumn::Text(s))
1199        }
1200        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1201        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1202            data[..8].try_into().unwrap(),
1203        ))),
1204        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1205            data[..4].try_into().unwrap(),
1206        ))),
1207        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1208            data[..8].try_into().unwrap(),
1209        ))),
1210        Some(DataType::Interval) => {
1211            if data.len() < 16 {
1212                return Err(SqlError::InvalidValue("truncated interval".into()));
1213            }
1214            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1215            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1216            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1217            Ok(RawColumn::Interval {
1218                months,
1219                days,
1220                micros,
1221            })
1222        }
1223        Some(DataType::Json) => {
1224            let s = std::str::from_utf8(data)
1225                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1226            Ok(RawColumn::Json(s))
1227        }
1228        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1229        Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1230        Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1231        Some(DataType::Array) => Ok(RawColumn::Array(data)),
1232        Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1233        _ => Err(SqlError::InvalidValue(format!(
1234            "unknown column type tag: {type_tag}"
1235        ))),
1236    }
1237}
1238
1239/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
1240pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1241    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1242    if target >= col_count || new_val.is_null() {
1243        return Ok(false);
1244    }
1245    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1246    if was_null {
1247        return Ok(false);
1248    }
1249    for col in 0..target {
1250        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1251        if !is_null {
1252            pos = skip_cell(data, pos, version)?;
1253        }
1254    }
1255    let type_tag = data[pos];
1256    let (old_data_len, val_start) = match version {
1257        RowVersion::V2 => match fixed_width_size(type_tag) {
1258            Some(n) => (n, pos + 1),
1259            None => {
1260                if pos + 5 > data.len() {
1261                    return Err(SqlError::InvalidValue("truncated column data".into()));
1262                }
1263                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1264                (len, pos + 5)
1265            }
1266        },
1267        RowVersion::V1 => {
1268            if pos + 5 > data.len() {
1269                return Err(SqlError::InvalidValue("truncated column data".into()));
1270            }
1271            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1272            (len, pos + 5)
1273        }
1274    };
1275    let new_data_len = match value_encoded_size_v2(new_val) {
1276        Some(n) => n,
1277        None => return Ok(false),
1278    };
1279    if new_data_len != old_data_len {
1280        return Ok(false);
1281    }
1282    data[pos] = new_val.data_type().type_tag();
1283    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1284    Ok(true)
1285}
1286
1287/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1288pub fn patch_row_column(
1289    data: &[u8],
1290    target: usize,
1291    new_val: &Value,
1292    out: &mut Vec<u8>,
1293) -> Result<()> {
1294    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1295
1296    let new_col_count = if target >= col_count {
1297        target + 1
1298    } else {
1299        col_count
1300    };
1301    let new_bitmap_bytes = new_col_count.div_ceil(8);
1302    let bitmap_bytes = col_count.div_ceil(8);
1303    out.clear();
1304
1305    let header = (new_col_count as u16) | V2_FLAG;
1306    out.extend_from_slice(&header.to_le_bytes());
1307    let bitmap_start = out.len();
1308    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1309    for _ in bitmap_bytes..new_bitmap_bytes {
1310        out.push(0xFF);
1311    }
1312    if new_val.is_null() {
1313        out[bitmap_start + target / 8] |= 1 << (target % 8);
1314    } else {
1315        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1316    }
1317
1318    let mut pos = header_end;
1319    for col in 0..new_col_count {
1320        let was_null = if col < col_count {
1321            bitmap[col / 8] & (1 << (col % 8)) != 0
1322        } else {
1323            true
1324        };
1325
1326        if col == target {
1327            if !was_null {
1328                pos = skip_cell(data, pos, version)?;
1329            }
1330            if !new_val.is_null() {
1331                encode_cell_v2(new_val, out);
1332            }
1333        } else if !was_null {
1334            pos = copy_cell_to_v2(data, pos, version, out)?;
1335        }
1336    }
1337    Ok(())
1338}
1339
1340pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1341    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1342    if target >= col_count {
1343        return Ok(RawColumn::Null);
1344    }
1345
1346    for col in 0..=target {
1347        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1348
1349        if col == target {
1350            if is_null {
1351                return Ok(RawColumn::Null);
1352            }
1353            let (type_tag, body, _) = read_cell(data, pos, version)?;
1354            return decode_value_raw(type_tag, body);
1355        } else if !is_null {
1356            pos = skip_cell(data, pos, version)?;
1357        }
1358    }
1359
1360    unreachable!()
1361}
1362
1363/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1364pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1365    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1366    if target >= col_count {
1367        return Ok((RawColumn::Null, usize::MAX));
1368    }
1369
1370    for col in 0..=target {
1371        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1372
1373        if col == target {
1374            if is_null {
1375                return Ok((RawColumn::Null, usize::MAX));
1376            }
1377            let tag_offset = pos;
1378            let (type_tag, body, _) = read_cell(data, pos, version)?;
1379            let raw = decode_value_raw(type_tag, body)?;
1380            return Ok((raw, tag_offset));
1381        } else if !is_null {
1382            pos = skip_cell(data, pos, version)?;
1383        }
1384    }
1385
1386    unreachable!()
1387}
1388
1389/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1390pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1391    if offset == usize::MAX || new_val.is_null() {
1392        return Ok(false);
1393    }
1394    if data.len() < 2 || offset >= data.len() {
1395        return Err(SqlError::InvalidValue("truncated column data".into()));
1396    }
1397    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1398        RowVersion::V2
1399    } else {
1400        RowVersion::V1
1401    };
1402    let type_tag = data[offset];
1403    let (old_data_len, val_start) = match version {
1404        RowVersion::V2 => match fixed_width_size(type_tag) {
1405            Some(n) => (n, offset + 1),
1406            None => {
1407                if offset + 5 > data.len() {
1408                    return Err(SqlError::InvalidValue("truncated column data".into()));
1409                }
1410                let len =
1411                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1412                (len, offset + 5)
1413            }
1414        },
1415        RowVersion::V1 => {
1416            if offset + 5 > data.len() {
1417                return Err(SqlError::InvalidValue("truncated column data".into()));
1418            }
1419            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1420            (len, offset + 5)
1421        }
1422    };
1423    let new_data_len = match value_encoded_size_v2(new_val) {
1424        Some(n) => n,
1425        None => return Ok(false),
1426    };
1427    if new_data_len != old_data_len {
1428        return Ok(false);
1429    }
1430    data[offset] = new_val.data_type().type_tag();
1431    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1432    Ok(true)
1433}
1434
1435pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1436    if key.is_empty() || key[0] != TAG_INTEGER {
1437        return Err(SqlError::InvalidValue("not an integer key".into()));
1438    }
1439    let (val, _) = decode_integer(&key[1..])?;
1440    match val {
1441        Value::Integer(i) => Ok(i),
1442        _ => unreachable!(),
1443    }
1444}
1445
1446#[cfg(test)]
1447#[path = "encoding_tests.rs"]
1448mod tests;