Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
/// Type tags for order-preserving key encoding.
///
/// Every encoded key value starts with one of these bytes, so a byte-wise
/// comparison of two keys compares the tags first.

/// SQL NULL; the smallest tag, so NULL sorts before every other value.
const TAG_NULL: u8 = 0x00;
/// Binary blob (null-escaped payload).
const TAG_BLOB: u8 = 0x01;
/// UTF-8 text (null-escaped payload).
const TAG_TEXT: u8 = 0x02;
/// Boolean (one payload byte).
const TAG_BOOLEAN: u8 = 0x03;
/// 64-bit signed integer (signed varint payload).
const TAG_INTEGER: u8 = 0x04;
/// 64-bit float (8-byte payload).
const TAG_REAL: u8 = 0x05;
/// Time of day (signed varint payload).
const TAG_TIME: u8 = 0x06;
/// Calendar date (signed varint payload).
const TAG_DATE: u8 = 0x07;
/// Timestamp (signed varint payload).
const TAG_TIMESTAMP: u8 = 0x08;
/// Interval (16-byte payload: months, days, micros).
const TAG_INTERVAL: u8 = 0x09;
/// JSON text (null-escaped payload).
const TAG_JSON: u8 = 0x0A;
/// Binary JSON (null-escaped payload).
const TAG_JSONB: u8 = 0x0B;
/// Text-search vector (null-escaped payload).
const TAG_TSVECTOR: u8 = 0x0C;
/// Text-search query (null-escaped payload).
const TAG_TSQUERY: u8 = 0x0D;
/// Array (null-escaped concatenation of encoded elements).
const TAG_ARRAY: u8 = 0x0E;
22
23/// Encode a single value into an order-preserving byte sequence.
24pub fn encode_key_value(value: &Value) -> Vec<u8> {
25    let mut buf = Vec::with_capacity(16);
26    encode_key_value_into(value, &mut buf);
27    buf
28}
29
30/// Encode a composite key (multiple values concatenated).
31pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
32    let mut buf = Vec::new();
33    for v in values {
34        buf.extend_from_slice(&encode_key_value(v));
35    }
36    buf
37}
38
39pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
40    buf.clear();
41    for v in values {
42        encode_key_value_into(v, buf);
43    }
44}
45
46pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
47    buf.clear();
48    for &i in indices {
49        encode_key_value_into(&row[i as usize], buf);
50    }
51}
52
53#[inline]
54pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
55    buf.clear();
56    encode_signed_varint(TAG_INTEGER, val, buf);
57}
58
59pub(crate) fn encode_key_value_collated_into(
60    value: &Value,
61    coll: crate::types::Collation,
62    buf: &mut Vec<u8>,
63) {
64    match (value, coll) {
65        (Value::Text(s), crate::types::Collation::NoCase) => {
66            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
67        }
68        (Value::Text(s), crate::types::Collation::Rtrim) => {
69            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
70        }
71        _ => encode_key_value_into(value, buf),
72    }
73}
74
75pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
76    match value {
77        Value::Null => buf.push(TAG_NULL),
78        Value::Boolean(b) => {
79            buf.push(TAG_BOOLEAN);
80            buf.push(if *b { 0x01 } else { 0x00 });
81        }
82        Value::Integer(i) => encode_integer_into(*i, buf),
83        Value::Real(r) => encode_real_into(*r, buf),
84        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
85        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
86        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
87        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
88        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
89        Value::Interval {
90            months,
91            days,
92            micros,
93        } => {
94            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
95            buf.push(TAG_INTERVAL);
96            let mut mb = months.to_be_bytes();
97            mb[0] ^= 0x80;
98            buf.extend_from_slice(&mb);
99            let mut db = days.to_be_bytes();
100            db[0] ^= 0x80;
101            buf.extend_from_slice(&db);
102            let mut ub = micros.to_be_bytes();
103            ub[0] ^= 0x80;
104            buf.extend_from_slice(&ub);
105        }
106        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
107        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
108        Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
109        Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
110        Value::Array(a) => encode_array_into(a, buf),
111    }
112}
113
114fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
115    buf.push(TAG_ARRAY);
116    let mut inner = Vec::new();
117    for v in elems {
118        encode_key_value_into(v, &mut inner);
119    }
120    encode_bytes_into_no_tag(&inner, buf);
121}
122
/// Append `data` to `buf` as a null-escaped, zero-terminated byte string.
///
/// Every `0x00` in `data` is written as `0x00 0xFF`; a single `0x00`
/// terminates the string. No type tag is written.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    // Splitting on zero bytes yields the runs between them; each boundary
    // between two runs is one zero byte that has to be escaped.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }
    buf.push(0x00);
}
134
135fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
136    encode_signed_varint(TAG_INTEGER, val, buf);
137}
138
/// Order-preserving variable-width codec for signed `i64` with a
/// caller-supplied tag byte.
///
/// Layout: `[tag] [marker] [magnitude bytes]`.
/// - zero: marker `0x80`, no magnitude bytes;
/// - positive: marker `0x80 + n`, then the `n` significant big-endian bytes;
/// - negative: marker `0x80 - n`, then the `n` significant big-endian bytes
///   of the absolute value, each bit-inverted.
///
/// Byte-wise lexicographic comparison of the output matches signed integer
/// order.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);
    if val == 0 {
        buf.push(0x80);
        return;
    }

    // `unsigned_abs` is well-defined for `i64::MIN` too (it yields 2^63).
    let magnitude = val.unsigned_abs();
    // The magnitude is non-zero, so it has between 1 and 8 significant bytes.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be = magnitude.to_be_bytes();
    let significant = &be[8 - width..];

    if val > 0 {
        buf.push(0x80 + width as u8);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width as u8);
        // Inverting the bytes makes larger magnitudes sort earlier.
        buf.extend(significant.iter().map(|&byte| !byte));
    }
}
171
172fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
173    buf.push(TAG_REAL);
174    let bits = val.to_bits();
175    let encoded = if val.is_sign_negative() {
176        !bits
177    } else {
178        bits ^ (1u64 << 63)
179    };
180    buf.extend_from_slice(&encoded.to_be_bytes());
181}
182
183fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
184    buf.push(tag);
185    for &b in data {
186        if b == 0x00 {
187            buf.push(0x00);
188            buf.push(0xFF);
189        } else {
190            buf.push(b);
191        }
192    }
193    buf.push(0x00);
194}
195
196/// Decode a single key value, returning the value and the number of bytes consumed.
197pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
198    if data.is_empty() {
199        return Err(SqlError::InvalidValue("empty key data".into()));
200    }
201    match data[0] {
202        TAG_NULL => Ok((Value::Null, 1)),
203        TAG_BOOLEAN => {
204            if data.len() < 2 {
205                return Err(SqlError::InvalidValue("truncated boolean".into()));
206            }
207            Ok((Value::Boolean(data[1] != 0), 2))
208        }
209        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
210        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
211        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
212        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
213            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
214            (Value::Date(d), n + 1)
215        }),
216        TAG_TIMESTAMP => {
217            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
218        }
219        TAG_INTERVAL => {
220            if data.len() < 1 + 16 {
221                return Err(SqlError::InvalidValue("truncated interval".into()));
222            }
223            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
224            mb[0] ^= 0x80;
225            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
226            db[0] ^= 0x80;
227            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
228            ub[0] ^= 0x80;
229            Ok((
230                Value::Interval {
231                    months: i32::from_be_bytes(mb),
232                    days: i32::from_be_bytes(db),
233                    micros: i64::from_be_bytes(ub),
234                },
235                17,
236            ))
237        }
238        TAG_TEXT => {
239            let (bytes, n) = decode_null_escaped(&data[1..])?;
240            let s = String::from_utf8(bytes)
241                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
242            Ok((Value::Text(CompactString::from(s)), n + 1))
243        }
244        TAG_BLOB => {
245            let (bytes, n) = decode_null_escaped(&data[1..])?;
246            Ok((Value::Blob(bytes), n + 1))
247        }
248        TAG_JSON => {
249            let (bytes, n) = decode_null_escaped(&data[1..])?;
250            let s = String::from_utf8(bytes)
251                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
252            Ok((Value::Json(CompactString::from(s)), n + 1))
253        }
254        TAG_JSONB => {
255            let (bytes, n) = decode_null_escaped(&data[1..])?;
256            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
257        }
258        TAG_TSVECTOR => {
259            let (bytes, n) = decode_null_escaped(&data[1..])?;
260            Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
261        }
262        TAG_TSQUERY => {
263            let (bytes, n) = decode_null_escaped(&data[1..])?;
264            Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
265        }
266        TAG_ARRAY => {
267            let (inner, n) = decode_null_escaped(&data[1..])?;
268            let mut elems = Vec::new();
269            let mut pos = 0;
270            while pos < inner.len() {
271                let (v, vlen) = decode_key_value(&inner[pos..])?;
272                elems.push(v);
273                pos += vlen;
274            }
275            Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
276        }
277        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
278    }
279}
280
281/// Decode a composite key into multiple values.
282pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
283    let mut values = Vec::with_capacity(count);
284    let mut pos = 0;
285    for _ in 0..count {
286        let (v, n) = decode_key_value(&data[pos..])?;
287        values.push(v);
288        pos += n;
289    }
290    Ok(values)
291}
292
293fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
294    let (v, n) = decode_signed_varint(data)?;
295    Ok((Value::Integer(v), n))
296}
297
298/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
299pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
300    if data.is_empty() {
301        return Err(SqlError::InvalidValue("truncated integer".into()));
302    }
303    let marker = data[0];
304    if marker == 0x80 {
305        return Ok((0, 1));
306    }
307    if marker > 0x80 {
308        let byte_count = (marker - 0x80) as usize;
309        if data.len() < 1 + byte_count {
310            return Err(SqlError::InvalidValue("truncated positive integer".into()));
311        }
312        let mut bytes = [0u8; 8];
313        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
314        let val = i64::from_be_bytes(bytes);
315        Ok((val, 1 + byte_count))
316    } else {
317        let byte_count = (0x80 - marker) as usize;
318        if data.len() < 1 + byte_count {
319            return Err(SqlError::InvalidValue("truncated negative integer".into()));
320        }
321        let mut bytes = [0u8; 8];
322        for i in 0..byte_count {
323            bytes[8 - byte_count + i] = !data[1 + i];
324        }
325        let abs_val = u64::from_be_bytes(bytes);
326        let val = (-(abs_val as i128)) as i64;
327        Ok((val, 1 + byte_count))
328    }
329}
330
331fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
332    if data.len() < 8 {
333        return Err(SqlError::InvalidValue("truncated real".into()));
334    }
335    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
336    let bits = if encoded & (1u64 << 63) != 0 {
337        // Was positive: undo sign bit flip
338        encoded ^ (1u64 << 63)
339    } else {
340        // Was negative: undo full inversion
341        !encoded
342    };
343    let val = f64::from_bits(bits);
344    Ok((Value::Real(val), 8))
345}
346
347/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
348fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
349    let mut result = Vec::new();
350    let mut i = 0;
351    while i < data.len() {
352        if data[i] == 0x00 {
353            if i + 1 < data.len() && data[i + 1] == 0xFF {
354                result.push(0x00);
355                i += 2;
356            } else {
357                return Ok((result, i + 1)); // terminator consumed
358            }
359        } else {
360            result.push(data[i]);
361            i += 1;
362        }
363    }
364    Err(SqlError::InvalidValue(
365        "unterminated null-escaped string".into(),
366    ))
367}
368
369fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
370    match v {
371        Value::Integer(val) => {
372            buf.push(DataType::Integer.type_tag());
373            buf.extend_from_slice(&val.to_le_bytes());
374        }
375        Value::Real(r) => {
376            buf.push(DataType::Real.type_tag());
377            buf.extend_from_slice(&r.to_le_bytes());
378        }
379        Value::Boolean(b) => {
380            buf.push(DataType::Boolean.type_tag());
381            buf.push(if *b { 1 } else { 0 });
382        }
383        Value::Text(s) => {
384            let bytes = s.as_bytes();
385            buf.push(DataType::Text.type_tag());
386            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
387            buf.extend_from_slice(bytes);
388        }
389        Value::Blob(data) => {
390            buf.push(DataType::Blob.type_tag());
391            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
392            buf.extend_from_slice(data);
393        }
394        Value::Time(t) => {
395            buf.push(DataType::Time.type_tag());
396            buf.extend_from_slice(&t.to_le_bytes());
397        }
398        Value::Date(d) => {
399            buf.push(DataType::Date.type_tag());
400            buf.extend_from_slice(&d.to_le_bytes());
401        }
402        Value::Timestamp(t) => {
403            buf.push(DataType::Timestamp.type_tag());
404            buf.extend_from_slice(&t.to_le_bytes());
405        }
406        Value::Interval {
407            months,
408            days,
409            micros,
410        } => {
411            buf.push(DataType::Interval.type_tag());
412            buf.extend_from_slice(&months.to_le_bytes());
413            buf.extend_from_slice(&days.to_le_bytes());
414            buf.extend_from_slice(&micros.to_le_bytes());
415        }
416        Value::Json(s) => {
417            let bytes = s.as_bytes();
418            buf.push(DataType::Json.type_tag());
419            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
420            buf.extend_from_slice(bytes);
421        }
422        Value::Jsonb(b) => {
423            buf.push(DataType::Jsonb.type_tag());
424            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
425            buf.extend_from_slice(b);
426        }
427        Value::TsVector(b) => {
428            buf.push(DataType::TsVector.type_tag());
429            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
430            buf.extend_from_slice(b);
431        }
432        Value::TsQuery(b) => {
433            buf.push(DataType::TsQuery.type_tag());
434            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
435            buf.extend_from_slice(b);
436        }
437        Value::Array(a) => {
438            buf.push(DataType::Array.type_tag());
439            let len = encoded_array_v2_size(a);
440            buf.extend_from_slice(&(len as u32).to_le_bytes());
441            let start = buf.len();
442            buf.resize(start + len, 0);
443            write_array_v2_into_slice(a, &mut buf[start..start + len]);
444        }
445        Value::Null => unreachable!(),
446    }
447}
448
449pub fn encode_row(values: &[Value]) -> Vec<u8> {
450    let mut buf = Vec::new();
451    encode_row_into(values, &mut buf);
452    buf
453}
454
455pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
456    buf.clear();
457    let col_count = values.len();
458    let bitmap_bytes = col_count.div_ceil(8);
459
460    let header = (col_count as u16) | V2_FLAG;
461    buf.extend_from_slice(&header.to_le_bytes());
462
463    let bitmap_start = buf.len();
464    buf.resize(buf.len() + bitmap_bytes, 0);
465
466    for (i, v) in values.iter().enumerate() {
467        if v.is_null() {
468            buf[bitmap_start + i / 8] |= 1 << (i % 8);
469            continue;
470        }
471        encode_cell_v2(v, buf);
472    }
473}
474
/// Pre-built V2 row blob for rows whose non-NULL columns are all integers.
///
/// Built by `build_int_row_template`; `encode_int_row_with_template` copies
/// `template` and patches the integer payloads in place.
#[derive(Debug, Clone)]
pub struct IntRowTemplate {
    /// Row header, null bitmap, and one integer cell (tag plus eight zero
    /// bytes) per non-NULL slot.
    pub template: Vec<u8>,
    /// For each non-NULL slot: `(slot index, byte offset in `template` of
    /// that slot's 8-byte integer payload)`.
    pub slot_offsets: Vec<(usize, usize)>,
}
479
480pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
481    let bitmap_bytes = phys_count.div_ceil(8);
482    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
483    let header = (phys_count as u16) | V2_FLAG;
484    template.extend_from_slice(&header.to_le_bytes());
485    let bitmap_start = template.len();
486    template.resize(bitmap_start + bitmap_bytes, 0);
487    for &i in null_slots {
488        template[bitmap_start + i / 8] |= 1 << (i % 8);
489    }
490    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
491    for slot in 0..phys_count {
492        if null_slots.contains(&slot) {
493            continue;
494        }
495        template.push(DataType::Integer.type_tag());
496        let value_offset = template.len();
497        template.extend_from_slice(&[0u8; 8]);
498        slot_offsets.push((slot, value_offset));
499    }
500    IntRowTemplate {
501        template,
502        slot_offsets,
503    }
504}
505
506/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
507#[inline]
508pub fn encode_int_row_with_template(
509    tmpl: &IntRowTemplate,
510    values: &[Value],
511    buf: &mut Vec<u8>,
512) -> Result<()> {
513    buf.clear();
514    buf.extend_from_slice(&tmpl.template);
515    for &(slot, off) in &tmpl.slot_offsets {
516        match &values[slot] {
517            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
518            other => {
519                return Err(SqlError::TypeMismatch {
520                    expected: "Integer".into(),
521                    got: other.data_type().to_string(),
522                });
523            }
524        }
525    }
526    Ok(())
527}
528
529fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
530    match DataType::from_tag(type_tag) {
531        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
532            data[..8].try_into().unwrap(),
533        ))),
534        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
535            data[..8].try_into().unwrap(),
536        ))),
537        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
538        Some(DataType::Text) => {
539            let s = std::str::from_utf8(data)
540                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
541            Ok(Value::Text(CompactString::from(s)))
542        }
543        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
544        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
545            data[..8].try_into().unwrap(),
546        ))),
547        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
548            data[..4].try_into().unwrap(),
549        ))),
550        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
551            data[..8].try_into().unwrap(),
552        ))),
553        Some(DataType::Interval) => {
554            if data.len() < 16 {
555                return Err(SqlError::InvalidValue("truncated interval".into()));
556            }
557            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
558            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
559            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
560            Ok(Value::Interval {
561                months,
562                days,
563                micros,
564            })
565        }
566        Some(DataType::Json) => {
567            let s = std::str::from_utf8(data)
568                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
569            Ok(Value::Json(CompactString::from(s)))
570        }
571        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
572        Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
573        Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
574        Some(DataType::Array) => decode_array_v2(data),
575        _ => Err(SqlError::InvalidValue(format!(
576            "unknown column type tag: {type_tag}"
577        ))),
578    }
579}
580
581fn encoded_array_v2_size(elems: &[Value]) -> usize {
582    let mut total = 4;
583    for elem in elems {
584        if elem.is_null() {
585            total += 1;
586            continue;
587        }
588        total += 1 + 1;
589        let tag = elem.data_type().type_tag();
590        match fixed_width_size(tag) {
591            Some(n) => total += n,
592            None => total += 4 + variable_cell_payload_size(elem),
593        }
594    }
595    total
596}
597
598fn variable_cell_payload_size(v: &Value) -> usize {
599    match v {
600        Value::Text(s) => s.len(),
601        Value::Blob(b) => b.len(),
602        Value::Json(s) => s.len(),
603        Value::Jsonb(b) => b.len(),
604        Value::TsVector(b) => b.len(),
605        Value::TsQuery(b) => b.len(),
606        Value::Array(a) => encoded_array_v2_size(a),
607        _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
608    }
609}
610
611fn value_encoded_size_v2(v: &Value) -> Option<usize> {
612    if v.is_null() {
613        return None;
614    }
615    Some(match fixed_width_size(v.data_type().type_tag()) {
616        Some(n) => n,
617        None => variable_cell_payload_size(v),
618    })
619}
620
621fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
622    match v {
623        Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
624        Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
625        Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
626        Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
627        Value::Blob(b) => out[..b.len()].copy_from_slice(b),
628        Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
629        Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
630        Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
631        Value::Interval {
632            months,
633            days,
634            micros,
635        } => {
636            out[..4].copy_from_slice(&months.to_le_bytes());
637            out[4..8].copy_from_slice(&days.to_le_bytes());
638            out[8..16].copy_from_slice(&micros.to_le_bytes());
639        }
640        Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
641        Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
642        Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
643        Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
644        Value::Array(a) => write_array_v2_into_slice(a, out),
645        Value::Null => unreachable!(),
646    }
647}
648
649fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
650    out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
651    let mut pos = 4;
652    for elem in elems {
653        if elem.is_null() {
654            out[pos] = 0xFF;
655            pos += 1;
656            continue;
657        }
658        out[pos] = 0x00;
659        pos += 1;
660        let tag = elem.data_type().type_tag();
661        out[pos] = tag;
662        pos += 1;
663        match fixed_width_size(tag) {
664            Some(n) => {
665                write_value_payload_v2(elem, &mut out[pos..pos + n]);
666                pos += n;
667            }
668            None => {
669                let payload_len = variable_cell_payload_size(elem);
670                out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
671                pos += 4;
672                write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
673                pos += payload_len;
674            }
675        }
676    }
677}
678
679fn decode_array_v2(data: &[u8]) -> Result<Value> {
680    if data.len() < 4 {
681        return Err(SqlError::InvalidValue("truncated array length".into()));
682    }
683    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
684    let mut pos = 4;
685    let mut elems = Vec::with_capacity(count);
686    for _ in 0..count {
687        if pos >= data.len() {
688            return Err(SqlError::InvalidValue("truncated array elements".into()));
689        }
690        if data[pos] == 0xFF {
691            elems.push(Value::Null);
692            pos += 1;
693            continue;
694        }
695        if data[pos] != 0x00 {
696            return Err(SqlError::InvalidValue(
697                "invalid array element marker".into(),
698            ));
699        }
700        pos += 1;
701        if pos >= data.len() {
702            return Err(SqlError::InvalidValue("truncated array element".into()));
703        }
704        let type_tag = data[pos];
705        pos += 1;
706        let (val, advance) = match fixed_width_size(type_tag) {
707            Some(n) => {
708                if pos + n > data.len() {
709                    return Err(SqlError::InvalidValue(
710                        "truncated fixed-width array element".into(),
711                    ));
712                }
713                let v = decode_value(type_tag, &data[pos..pos + n])?;
714                (v, n)
715            }
716            None => {
717                if pos + 4 > data.len() {
718                    return Err(SqlError::InvalidValue(
719                        "truncated array element length".into(),
720                    ));
721                }
722                let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
723                pos += 4;
724                if pos + len > data.len() {
725                    return Err(SqlError::InvalidValue(
726                        "truncated variable-width array element".into(),
727                    ));
728                }
729                let v = decode_value(type_tag, &data[pos..pos + len])?;
730                (v, len)
731            }
732        };
733        pos += advance;
734        elems.push(val);
735    }
736    Ok(Value::Array(std::sync::Arc::new(elems)))
737}
738
/// On-disk row cell format.
///
/// The high bit of the row header's `col_count: u16` flags V2.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RowVersion {
    /// Every cell is `[tag: u8][len: u32][data]`.
    V1,
    /// Like V1, but cells of fixed-width types drop the `len` field.
    V2,
}
746
/// High bit of the row header; set when the row uses the V2 cell format.
pub(crate) const V2_FLAG: u16 = 1 << 15;
/// Remaining header bits, which hold the column count.
pub(crate) const COL_COUNT_MASK: u16 = !V2_FLAG;
749
750#[inline]
751pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
752    match DataType::from_tag(type_tag)? {
753        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
754        DataType::Date => Some(4),
755        DataType::Boolean => Some(1),
756        DataType::Interval => Some(16),
757        DataType::Text
758        | DataType::Blob
759        | DataType::Json
760        | DataType::Jsonb
761        | DataType::TsVector
762        | DataType::TsQuery
763        | DataType::Array
764        | DataType::Null => None,
765    }
766}
767
768#[inline]
769fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
770    if pos >= data.len() {
771        return Err(SqlError::InvalidValue("truncated column data".into()));
772    }
773    let type_tag = data[pos];
774    let after_tag = pos + 1;
775    let (data_len, body_pos) = match version {
776        RowVersion::V2 => match fixed_width_size(type_tag) {
777            Some(n) => (n, after_tag),
778            None => {
779                if after_tag + 4 > data.len() {
780                    return Err(SqlError::InvalidValue("truncated column data".into()));
781                }
782                let len = u32::from_le_bytes([
783                    data[after_tag],
784                    data[after_tag + 1],
785                    data[after_tag + 2],
786                    data[after_tag + 3],
787                ]) as usize;
788                (len, after_tag + 4)
789            }
790        },
791        RowVersion::V1 => {
792            if after_tag + 4 > data.len() {
793                return Err(SqlError::InvalidValue("truncated column data".into()));
794            }
795            let len = u32::from_le_bytes([
796                data[after_tag],
797                data[after_tag + 1],
798                data[after_tag + 2],
799                data[after_tag + 3],
800            ]) as usize;
801            (len, after_tag + 4)
802        }
803    };
804    if body_pos + data_len > data.len() {
805        return Err(SqlError::InvalidValue("truncated column value".into()));
806    }
807    Ok((
808        type_tag,
809        &data[body_pos..body_pos + data_len],
810        body_pos + data_len,
811    ))
812}
813
814#[inline]
815fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
816    let (_, _, next) = read_cell(data, pos, version)?;
817    Ok(next)
818}
819
820fn copy_cell_to_v2(
821    data: &[u8],
822    pos: usize,
823    version: RowVersion,
824    out: &mut Vec<u8>,
825) -> Result<usize> {
826    let (tag, body, next) = read_cell(data, pos, version)?;
827    out.push(tag);
828    if fixed_width_size(tag).is_none() {
829        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
830    }
831    out.extend_from_slice(body);
832    Ok(next)
833}
834
835fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
836    if data.len() < 2 {
837        return Err(SqlError::InvalidValue("row data too short".into()));
838    }
839    let raw = u16::from_le_bytes([data[0], data[1]]);
840    let version = if raw & V2_FLAG != 0 {
841        RowVersion::V2
842    } else {
843        RowVersion::V1
844    };
845    let col_count = (raw & COL_COUNT_MASK) as usize;
846    let bitmap_bytes = col_count.div_ceil(8);
847    let pos = 2;
848    if data.len() < pos + bitmap_bytes {
849        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
850    }
851    Ok((
852        version,
853        col_count,
854        &data[pos..pos + bitmap_bytes],
855        pos + bitmap_bytes,
856    ))
857}
858
859pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
860    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
861
862    let mut values = Vec::with_capacity(col_count);
863    for i in 0..col_count {
864        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
865            values.push(Value::Null);
866            continue;
867        }
868        let (type_tag, body, next) = read_cell(data, pos, version)?;
869        values.push(decode_value(type_tag, body)?);
870        pos = next;
871    }
872
873    Ok(values)
874}
875
876/// Returns the number of non-PK columns stored in a row value blob.
877#[inline]
878pub fn row_non_pk_count(data: &[u8]) -> usize {
879    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
880}
881
882pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
883    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
884
885    for i in 0..col_count {
886        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
887            continue;
888        }
889        let (type_tag, body, next) = read_cell(data, pos, version)?;
890        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
891            out[col_mapping[i]] = decode_value(type_tag, body)?;
892        }
893        pos = next;
894    }
895
896    Ok(())
897}
898
899pub fn decode_pk_into(
900    key: &[u8],
901    count: usize,
902    out: &mut [Value],
903    pk_mapping: &[usize],
904) -> Result<()> {
905    let mut pos = 0;
906    for i in 0..count {
907        let (v, n) = decode_key_value(&key[pos..])?;
908        if i < pk_mapping.len() {
909            out[pk_mapping[i]] = v;
910        }
911        pos += n;
912    }
913    Ok(())
914}
915
916pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
917    if targets.is_empty() {
918        return Ok(Vec::new());
919    }
920    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
921
922    let mut results = Vec::with_capacity(targets.len());
923    let mut ti = 0;
924
925    for col in 0..col_count {
926        if ti >= targets.len() {
927            break;
928        }
929        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
930
931        if col == targets[ti] {
932            if is_null {
933                results.push(Value::Null);
934            } else {
935                let (type_tag, body, next) = read_cell(data, pos, version)?;
936                results.push(decode_value(type_tag, body)?);
937                pos = next;
938            }
939            ti += 1;
940        } else if !is_null {
941            pos = skip_cell(data, pos, version)?;
942        }
943    }
944
945    while ti < targets.len() {
946        results.push(Value::Null);
947        ti += 1;
948    }
949
950    Ok(results)
951}
952
953pub fn decode_columns_into(
954    data: &[u8],
955    targets: &[usize],
956    schema_cols: &[usize],
957    row: &mut [Value],
958) -> Result<()> {
959    if targets.is_empty() {
960        return Ok(());
961    }
962    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
963
964    let mut ti = 0;
965    for col in 0..col_count {
966        if ti >= targets.len() {
967            break;
968        }
969        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
970
971        if col == targets[ti] {
972            if is_null {
973                row[schema_cols[ti]] = Value::Null;
974            } else {
975                let (type_tag, body, next) = read_cell(data, pos, version)?;
976                row[schema_cols[ti]] = decode_value(type_tag, body)?;
977                pos = next;
978            }
979            ti += 1;
980        } else if !is_null {
981            pos = skip_cell(data, pos, version)?;
982        }
983    }
984
985    Ok(())
986}
987
/// A decoded column that borrows its variable-length payload instead of owning it.
///
/// Scalar variants hold the value itself; the string and byte variants hold a slice with
/// lifetime `'a`, so the whole enum stays `Copy`.
#[derive(Clone, Copy, Debug)]
pub enum RawColumn<'a> {
    /// No value.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit float.
    Real(f64),
    /// Boolean flag.
    Boolean(bool),
    /// Borrowed text.
    Text(&'a str),
    /// Borrowed opaque bytes.
    Blob(&'a [u8]),
    /// Time value carried as an `i64`.
    Time(i64),
    /// Date value carried as an `i32`.
    Date(i32),
    /// Timestamp value carried as an `i64`.
    Timestamp(i64),
    /// Interval split into its three stored components.
    Interval {
        /// Months component.
        months: i32,
        /// Days component.
        days: i32,
        /// Microseconds component.
        micros: i64,
    },
    /// Borrowed JSON text.
    Json(&'a str),
    /// Borrowed JSONB bytes, kept undecoded.
    Jsonb(&'a [u8]),
    /// Borrowed TSVECTOR bytes, kept undecoded.
    TsVector(&'a [u8]),
    /// Borrowed TSQUERY bytes, kept undecoded.
    TsQuery(&'a [u8]),
    /// Borrowed array bytes, kept undecoded.
    Array(&'a [u8]),
}
1006
1007impl<'a> RawColumn<'a> {
1008    pub fn to_value(self) -> Value {
1009        match self {
1010            RawColumn::Null => Value::Null,
1011            RawColumn::Integer(i) => Value::Integer(i),
1012            RawColumn::Real(r) => Value::Real(r),
1013            RawColumn::Boolean(b) => Value::Boolean(b),
1014            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1015            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1016            RawColumn::Time(t) => Value::Time(t),
1017            RawColumn::Date(d) => Value::Date(d),
1018            RawColumn::Timestamp(t) => Value::Timestamp(t),
1019            RawColumn::Interval {
1020                months,
1021                days,
1022                micros,
1023            } => Value::Interval {
1024                months,
1025                days,
1026                micros,
1027            },
1028            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1029            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1030            RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1031            RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1032            RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1033        }
1034    }
1035
1036    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1037        use std::cmp::Ordering;
1038        match (self, other) {
1039            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1040            (RawColumn::Null, _) | (_, Value::Null) => None,
1041            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1042            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1043            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1044            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1045            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1046            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1047            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1048            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1049            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1050            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1051            (
1052                RawColumn::Interval {
1053                    months: am,
1054                    days: ad,
1055                    micros: au,
1056                },
1057                Value::Interval {
1058                    months: bm,
1059                    days: bd,
1060                    micros: bu,
1061                },
1062            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1063            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1064            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1065            (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1066            (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1067            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1068                Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1069                _ => None,
1070            },
1071            _ => None,
1072        }
1073    }
1074
1075    pub fn eq_value(&self, other: &Value) -> bool {
1076        match (self, other) {
1077            (RawColumn::Null, Value::Null) => true,
1078            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1079            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1080            (RawColumn::Real(a), Value::Real(b)) => a == b,
1081            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1082            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1083            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1084            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1085            (RawColumn::Time(a), Value::Time(b)) => a == b,
1086            (RawColumn::Date(a), Value::Date(b)) => a == b,
1087            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1088            (
1089                RawColumn::Interval {
1090                    months: am,
1091                    days: ad,
1092                    micros: au,
1093                },
1094                Value::Interval {
1095                    months: bm,
1096                    days: bd,
1097                    micros: bu,
1098                },
1099            ) => am == bm && ad == bd && au == bu,
1100            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1101            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1102            (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1103            (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1104            (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1105                Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1106                _ => false,
1107            },
1108            _ => false,
1109        }
1110    }
1111
1112    pub fn as_f64(&self) -> Option<f64> {
1113        match self {
1114            RawColumn::Integer(i) => Some(*i as f64),
1115            RawColumn::Real(r) => Some(*r),
1116            _ => None,
1117        }
1118    }
1119
1120    pub fn as_i64(&self) -> Option<i64> {
1121        match self {
1122            RawColumn::Integer(i) => Some(*i),
1123            RawColumn::Time(t) => Some(*t),
1124            RawColumn::Date(d) => Some(*d as i64),
1125            RawColumn::Timestamp(t) => Some(*t),
1126            _ => None,
1127        }
1128    }
1129}
1130
1131fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1132    match DataType::from_tag(type_tag) {
1133        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1134            data[..8].try_into().unwrap(),
1135        ))),
1136        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1137            data[..8].try_into().unwrap(),
1138        ))),
1139        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1140        Some(DataType::Text) => {
1141            let s = std::str::from_utf8(data)
1142                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1143            Ok(RawColumn::Text(s))
1144        }
1145        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1146        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1147            data[..8].try_into().unwrap(),
1148        ))),
1149        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1150            data[..4].try_into().unwrap(),
1151        ))),
1152        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1153            data[..8].try_into().unwrap(),
1154        ))),
1155        Some(DataType::Interval) => {
1156            if data.len() < 16 {
1157                return Err(SqlError::InvalidValue("truncated interval".into()));
1158            }
1159            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1160            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1161            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1162            Ok(RawColumn::Interval {
1163                months,
1164                days,
1165                micros,
1166            })
1167        }
1168        Some(DataType::Json) => {
1169            let s = std::str::from_utf8(data)
1170                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1171            Ok(RawColumn::Json(s))
1172        }
1173        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1174        Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1175        Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1176        Some(DataType::Array) => Ok(RawColumn::Array(data)),
1177        _ => Err(SqlError::InvalidValue(format!(
1178            "unknown column type tag: {type_tag}"
1179        ))),
1180    }
1181}
1182
1183/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
1184pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1185    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1186    if target >= col_count || new_val.is_null() {
1187        return Ok(false);
1188    }
1189    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1190    if was_null {
1191        return Ok(false);
1192    }
1193    for col in 0..target {
1194        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1195        if !is_null {
1196            pos = skip_cell(data, pos, version)?;
1197        }
1198    }
1199    let type_tag = data[pos];
1200    let (old_data_len, val_start) = match version {
1201        RowVersion::V2 => match fixed_width_size(type_tag) {
1202            Some(n) => (n, pos + 1),
1203            None => {
1204                if pos + 5 > data.len() {
1205                    return Err(SqlError::InvalidValue("truncated column data".into()));
1206                }
1207                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1208                (len, pos + 5)
1209            }
1210        },
1211        RowVersion::V1 => {
1212            if pos + 5 > data.len() {
1213                return Err(SqlError::InvalidValue("truncated column data".into()));
1214            }
1215            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1216            (len, pos + 5)
1217        }
1218    };
1219    let new_data_len = match value_encoded_size_v2(new_val) {
1220        Some(n) => n,
1221        None => return Ok(false),
1222    };
1223    if new_data_len != old_data_len {
1224        return Ok(false);
1225    }
1226    data[pos] = new_val.data_type().type_tag();
1227    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1228    Ok(true)
1229}
1230
1231/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1232pub fn patch_row_column(
1233    data: &[u8],
1234    target: usize,
1235    new_val: &Value,
1236    out: &mut Vec<u8>,
1237) -> Result<()> {
1238    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1239
1240    let new_col_count = if target >= col_count {
1241        target + 1
1242    } else {
1243        col_count
1244    };
1245    let new_bitmap_bytes = new_col_count.div_ceil(8);
1246    let bitmap_bytes = col_count.div_ceil(8);
1247    out.clear();
1248
1249    let header = (new_col_count as u16) | V2_FLAG;
1250    out.extend_from_slice(&header.to_le_bytes());
1251    let bitmap_start = out.len();
1252    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1253    for _ in bitmap_bytes..new_bitmap_bytes {
1254        out.push(0xFF);
1255    }
1256    if new_val.is_null() {
1257        out[bitmap_start + target / 8] |= 1 << (target % 8);
1258    } else {
1259        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1260    }
1261
1262    let mut pos = header_end;
1263    for col in 0..new_col_count {
1264        let was_null = if col < col_count {
1265            bitmap[col / 8] & (1 << (col % 8)) != 0
1266        } else {
1267            true
1268        };
1269
1270        if col == target {
1271            if !was_null {
1272                pos = skip_cell(data, pos, version)?;
1273            }
1274            if !new_val.is_null() {
1275                encode_cell_v2(new_val, out);
1276            }
1277        } else if !was_null {
1278            pos = copy_cell_to_v2(data, pos, version, out)?;
1279        }
1280    }
1281    Ok(())
1282}
1283
1284pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1285    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1286    if target >= col_count {
1287        return Ok(RawColumn::Null);
1288    }
1289
1290    for col in 0..=target {
1291        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1292
1293        if col == target {
1294            if is_null {
1295                return Ok(RawColumn::Null);
1296            }
1297            let (type_tag, body, _) = read_cell(data, pos, version)?;
1298            return decode_value_raw(type_tag, body);
1299        } else if !is_null {
1300            pos = skip_cell(data, pos, version)?;
1301        }
1302    }
1303
1304    unreachable!()
1305}
1306
1307/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1308pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1309    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1310    if target >= col_count {
1311        return Ok((RawColumn::Null, usize::MAX));
1312    }
1313
1314    for col in 0..=target {
1315        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1316
1317        if col == target {
1318            if is_null {
1319                return Ok((RawColumn::Null, usize::MAX));
1320            }
1321            let tag_offset = pos;
1322            let (type_tag, body, _) = read_cell(data, pos, version)?;
1323            let raw = decode_value_raw(type_tag, body)?;
1324            return Ok((raw, tag_offset));
1325        } else if !is_null {
1326            pos = skip_cell(data, pos, version)?;
1327        }
1328    }
1329
1330    unreachable!()
1331}
1332
1333/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1334pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1335    if offset == usize::MAX || new_val.is_null() {
1336        return Ok(false);
1337    }
1338    if data.len() < 2 || offset >= data.len() {
1339        return Err(SqlError::InvalidValue("truncated column data".into()));
1340    }
1341    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1342        RowVersion::V2
1343    } else {
1344        RowVersion::V1
1345    };
1346    let type_tag = data[offset];
1347    let (old_data_len, val_start) = match version {
1348        RowVersion::V2 => match fixed_width_size(type_tag) {
1349            Some(n) => (n, offset + 1),
1350            None => {
1351                if offset + 5 > data.len() {
1352                    return Err(SqlError::InvalidValue("truncated column data".into()));
1353                }
1354                let len =
1355                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1356                (len, offset + 5)
1357            }
1358        },
1359        RowVersion::V1 => {
1360            if offset + 5 > data.len() {
1361                return Err(SqlError::InvalidValue("truncated column data".into()));
1362            }
1363            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1364            (len, offset + 5)
1365        }
1366    };
1367    let new_data_len = match value_encoded_size_v2(new_val) {
1368        Some(n) => n,
1369        None => return Ok(false),
1370    };
1371    if new_data_len != old_data_len {
1372        return Ok(false);
1373    }
1374    data[offset] = new_val.data_type().type_tag();
1375    write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1376    Ok(true)
1377}
1378
1379pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1380    if key.is_empty() || key[0] != TAG_INTEGER {
1381        return Err(SqlError::InvalidValue("not an integer key".into()));
1382    }
1383    let (val, _) = decode_integer(&key[1..])?;
1384    match val {
1385        Value::Integer(i) => Ok(i),
1386        _ => unreachable!(),
1387    }
1388}
1389
1390#[cfg(test)]
1391#[path = "encoding_tests.rs"]
1392mod tests;