Skip to main content

citadel_sql/
encoding.rs

1//! Order-preserving key encoding and row encoding for non-PK column storage.
2
3use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6/// Type tags for order-preserving key encoding.
7const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17const TAG_JSON: u8 = 0x0A;
18const TAG_JSONB: u8 = 0x0B;
19const TAG_TSVECTOR: u8 = 0x0C;
20const TAG_TSQUERY: u8 = 0x0D;
21
22/// Encode a single value into an order-preserving byte sequence.
23pub fn encode_key_value(value: &Value) -> Vec<u8> {
24    let mut buf = Vec::with_capacity(16);
25    encode_key_value_into(value, &mut buf);
26    buf
27}
28
29/// Encode a composite key (multiple values concatenated).
30pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31    let mut buf = Vec::new();
32    for v in values {
33        buf.extend_from_slice(&encode_key_value(v));
34    }
35    buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39    buf.clear();
40    for v in values {
41        encode_key_value_into(v, buf);
42    }
43}
44
45pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
46    buf.clear();
47    for &i in indices {
48        encode_key_value_into(&row[i as usize], buf);
49    }
50}
51
52#[inline]
53pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
54    buf.clear();
55    encode_signed_varint(TAG_INTEGER, val, buf);
56}
57
58pub(crate) fn encode_key_value_collated_into(
59    value: &Value,
60    coll: crate::types::Collation,
61    buf: &mut Vec<u8>,
62) {
63    match (value, coll) {
64        (Value::Text(s), crate::types::Collation::NoCase) => {
65            encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
66        }
67        (Value::Text(s), crate::types::Collation::Rtrim) => {
68            encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
69        }
70        _ => encode_key_value_into(value, buf),
71    }
72}
73
74pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
75    match value {
76        Value::Null => buf.push(TAG_NULL),
77        Value::Boolean(b) => {
78            buf.push(TAG_BOOLEAN);
79            buf.push(if *b { 0x01 } else { 0x00 });
80        }
81        Value::Integer(i) => encode_integer_into(*i, buf),
82        Value::Real(r) => encode_real_into(*r, buf),
83        Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
84        Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
85        Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
86        Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
87        Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
88        Value::Interval {
89            months,
90            days,
91            micros,
92        } => {
93            // 17 bytes: tag + (i32,i32,i64) BE with sign-flipped high byte per field.
94            buf.push(TAG_INTERVAL);
95            let mut mb = months.to_be_bytes();
96            mb[0] ^= 0x80;
97            buf.extend_from_slice(&mb);
98            let mut db = days.to_be_bytes();
99            db[0] ^= 0x80;
100            buf.extend_from_slice(&db);
101            let mut ub = micros.to_be_bytes();
102            ub[0] ^= 0x80;
103            buf.extend_from_slice(&ub);
104        }
105        Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
106        Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
107        Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
108        Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
109    }
110}
111
112fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
113    encode_signed_varint(TAG_INTEGER, val, buf);
114}
115
116/// Order-preserving variable-width codec for signed i64 with a caller-supplied tag byte.
117/// Layout: [tag] [marker] [data bytes].
118/// marker = 0x80 for zero; 0x80+n for positive (n bytes follow);
119/// 0x80-n for negative (n one's-complemented bytes follow).
120/// Byte-wise lex compare matches signed integer order.
121pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
122    buf.push(tag);
123    if val == 0 {
124        buf.push(0x80);
125        return;
126    }
127    if val > 0 {
128        let bytes = val.to_be_bytes();
129        let start = bytes.iter().position(|&b| b != 0).unwrap();
130        let byte_count = (8 - start) as u8;
131        buf.push(0x80 + byte_count);
132        buf.extend_from_slice(&bytes[start..]);
133    } else {
134        let abs_val = if val == i64::MIN {
135            u64::MAX / 2 + 1
136        } else {
137            (-val) as u64
138        };
139        let bytes = abs_val.to_be_bytes();
140        let start = bytes.iter().position(|&b| b != 0).unwrap();
141        let byte_count = (8 - start) as u8;
142        buf.push(0x80 - byte_count);
143        for &b in &bytes[start..] {
144            buf.push(!b);
145        }
146    }
147}
148
149fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
150    buf.push(TAG_REAL);
151    let bits = val.to_bits();
152    let encoded = if val.is_sign_negative() {
153        !bits
154    } else {
155        bits ^ (1u64 << 63)
156    };
157    buf.extend_from_slice(&encoded.to_be_bytes());
158}
159
160fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
161    buf.push(tag);
162    for &b in data {
163        if b == 0x00 {
164            buf.push(0x00);
165            buf.push(0xFF);
166        } else {
167            buf.push(b);
168        }
169    }
170    buf.push(0x00);
171}
172
173/// Decode a single key value, returning the value and the number of bytes consumed.
174pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
175    if data.is_empty() {
176        return Err(SqlError::InvalidValue("empty key data".into()));
177    }
178    match data[0] {
179        TAG_NULL => Ok((Value::Null, 1)),
180        TAG_BOOLEAN => {
181            if data.len() < 2 {
182                return Err(SqlError::InvalidValue("truncated boolean".into()));
183            }
184            Ok((Value::Boolean(data[1] != 0), 2))
185        }
186        TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
187        TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
188        TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
189        TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
190            let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
191            (Value::Date(d), n + 1)
192        }),
193        TAG_TIMESTAMP => {
194            decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
195        }
196        TAG_INTERVAL => {
197            if data.len() < 1 + 16 {
198                return Err(SqlError::InvalidValue("truncated interval".into()));
199            }
200            let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
201            mb[0] ^= 0x80;
202            let mut db: [u8; 4] = data[5..9].try_into().unwrap();
203            db[0] ^= 0x80;
204            let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
205            ub[0] ^= 0x80;
206            Ok((
207                Value::Interval {
208                    months: i32::from_be_bytes(mb),
209                    days: i32::from_be_bytes(db),
210                    micros: i64::from_be_bytes(ub),
211                },
212                17,
213            ))
214        }
215        TAG_TEXT => {
216            let (bytes, n) = decode_null_escaped(&data[1..])?;
217            let s = String::from_utf8(bytes)
218                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
219            Ok((Value::Text(CompactString::from(s)), n + 1))
220        }
221        TAG_BLOB => {
222            let (bytes, n) = decode_null_escaped(&data[1..])?;
223            Ok((Value::Blob(bytes), n + 1))
224        }
225        TAG_JSON => {
226            let (bytes, n) = decode_null_escaped(&data[1..])?;
227            let s = String::from_utf8(bytes)
228                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
229            Ok((Value::Json(CompactString::from(s)), n + 1))
230        }
231        TAG_JSONB => {
232            let (bytes, n) = decode_null_escaped(&data[1..])?;
233            Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
234        }
235        TAG_TSVECTOR => {
236            let (bytes, n) = decode_null_escaped(&data[1..])?;
237            Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
238        }
239        TAG_TSQUERY => {
240            let (bytes, n) = decode_null_escaped(&data[1..])?;
241            Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
242        }
243        tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
244    }
245}
246
247/// Decode a composite key into multiple values.
248pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
249    let mut values = Vec::with_capacity(count);
250    let mut pos = 0;
251    for _ in 0..count {
252        let (v, n) = decode_key_value(&data[pos..])?;
253        values.push(v);
254        pos += n;
255    }
256    Ok(values)
257}
258
259fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
260    let (v, n) = decode_signed_varint(data)?;
261    Ok((Value::Integer(v), n))
262}
263
264/// Decode the variable-width codec emitted by `encode_signed_varint` (tag byte already consumed).
265pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
266    if data.is_empty() {
267        return Err(SqlError::InvalidValue("truncated integer".into()));
268    }
269    let marker = data[0];
270    if marker == 0x80 {
271        return Ok((0, 1));
272    }
273    if marker > 0x80 {
274        let byte_count = (marker - 0x80) as usize;
275        if data.len() < 1 + byte_count {
276            return Err(SqlError::InvalidValue("truncated positive integer".into()));
277        }
278        let mut bytes = [0u8; 8];
279        bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
280        let val = i64::from_be_bytes(bytes);
281        Ok((val, 1 + byte_count))
282    } else {
283        let byte_count = (0x80 - marker) as usize;
284        if data.len() < 1 + byte_count {
285            return Err(SqlError::InvalidValue("truncated negative integer".into()));
286        }
287        let mut bytes = [0u8; 8];
288        for i in 0..byte_count {
289            bytes[8 - byte_count + i] = !data[1 + i];
290        }
291        let abs_val = u64::from_be_bytes(bytes);
292        let val = (-(abs_val as i128)) as i64;
293        Ok((val, 1 + byte_count))
294    }
295}
296
297fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
298    if data.len() < 8 {
299        return Err(SqlError::InvalidValue("truncated real".into()));
300    }
301    let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
302    let bits = if encoded & (1u64 << 63) != 0 {
303        // Was positive: undo sign bit flip
304        encoded ^ (1u64 << 63)
305    } else {
306        // Was negative: undo full inversion
307        !encoded
308    };
309    let val = f64::from_bits(bits);
310    Ok((Value::Real(val), 8))
311}
312
313/// Decode null-escaped bytes. Returns (decoded bytes, bytes consumed including terminator).
314fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
315    let mut result = Vec::new();
316    let mut i = 0;
317    while i < data.len() {
318        if data[i] == 0x00 {
319            if i + 1 < data.len() && data[i + 1] == 0xFF {
320                result.push(0x00);
321                i += 2;
322            } else {
323                return Ok((result, i + 1)); // terminator consumed
324            }
325        } else {
326            result.push(data[i]);
327            i += 1;
328        }
329    }
330    Err(SqlError::InvalidValue(
331        "unterminated null-escaped string".into(),
332    ))
333}
334
335fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
336    match v {
337        Value::Integer(val) => {
338            buf.push(DataType::Integer.type_tag());
339            buf.extend_from_slice(&val.to_le_bytes());
340        }
341        Value::Real(r) => {
342            buf.push(DataType::Real.type_tag());
343            buf.extend_from_slice(&r.to_le_bytes());
344        }
345        Value::Boolean(b) => {
346            buf.push(DataType::Boolean.type_tag());
347            buf.push(if *b { 1 } else { 0 });
348        }
349        Value::Text(s) => {
350            let bytes = s.as_bytes();
351            buf.push(DataType::Text.type_tag());
352            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
353            buf.extend_from_slice(bytes);
354        }
355        Value::Blob(data) => {
356            buf.push(DataType::Blob.type_tag());
357            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
358            buf.extend_from_slice(data);
359        }
360        Value::Time(t) => {
361            buf.push(DataType::Time.type_tag());
362            buf.extend_from_slice(&t.to_le_bytes());
363        }
364        Value::Date(d) => {
365            buf.push(DataType::Date.type_tag());
366            buf.extend_from_slice(&d.to_le_bytes());
367        }
368        Value::Timestamp(t) => {
369            buf.push(DataType::Timestamp.type_tag());
370            buf.extend_from_slice(&t.to_le_bytes());
371        }
372        Value::Interval {
373            months,
374            days,
375            micros,
376        } => {
377            buf.push(DataType::Interval.type_tag());
378            buf.extend_from_slice(&months.to_le_bytes());
379            buf.extend_from_slice(&days.to_le_bytes());
380            buf.extend_from_slice(&micros.to_le_bytes());
381        }
382        Value::Json(s) => {
383            let bytes = s.as_bytes();
384            buf.push(DataType::Json.type_tag());
385            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
386            buf.extend_from_slice(bytes);
387        }
388        Value::Jsonb(b) => {
389            buf.push(DataType::Jsonb.type_tag());
390            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
391            buf.extend_from_slice(b);
392        }
393        Value::TsVector(b) => {
394            buf.push(DataType::TsVector.type_tag());
395            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
396            buf.extend_from_slice(b);
397        }
398        Value::TsQuery(b) => {
399            buf.push(DataType::TsQuery.type_tag());
400            buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
401            buf.extend_from_slice(b);
402        }
403        Value::Null => unreachable!(),
404    }
405}
406
407pub fn encode_row(values: &[Value]) -> Vec<u8> {
408    let mut buf = Vec::new();
409    encode_row_into(values, &mut buf);
410    buf
411}
412
413pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
414    buf.clear();
415    let col_count = values.len();
416    let bitmap_bytes = col_count.div_ceil(8);
417
418    let header = (col_count as u16) | V2_FLAG;
419    buf.extend_from_slice(&header.to_le_bytes());
420
421    let bitmap_start = buf.len();
422    buf.resize(buf.len() + bitmap_bytes, 0);
423
424    for (i, v) in values.iter().enumerate() {
425        if v.is_null() {
426            buf[bitmap_start + i / 8] |= 1 << (i % 8);
427            continue;
428        }
429        encode_cell_v2(v, buf);
430    }
431}
432
433pub struct IntRowTemplate {
434    pub template: Vec<u8>,
435    pub slot_offsets: Vec<(usize, usize)>,
436}
437
438pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
439    let bitmap_bytes = phys_count.div_ceil(8);
440    let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
441    let header = (phys_count as u16) | V2_FLAG;
442    template.extend_from_slice(&header.to_le_bytes());
443    let bitmap_start = template.len();
444    template.resize(bitmap_start + bitmap_bytes, 0);
445    for &i in null_slots {
446        template[bitmap_start + i / 8] |= 1 << (i % 8);
447    }
448    let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
449    for slot in 0..phys_count {
450        if null_slots.contains(&slot) {
451            continue;
452        }
453        template.push(DataType::Integer.type_tag());
454        let value_offset = template.len();
455        template.extend_from_slice(&[0u8; 8]);
456        slot_offsets.push((slot, value_offset));
457    }
458    IntRowTemplate {
459        template,
460        slot_offsets,
461    }
462}
463
464/// Caller must guarantee every non-NULL `values[slot]` is `Value::Integer`.
465#[inline]
466pub fn encode_int_row_with_template(
467    tmpl: &IntRowTemplate,
468    values: &[Value],
469    buf: &mut Vec<u8>,
470) -> Result<()> {
471    buf.clear();
472    buf.extend_from_slice(&tmpl.template);
473    for &(slot, off) in &tmpl.slot_offsets {
474        match &values[slot] {
475            Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
476            other => {
477                return Err(SqlError::TypeMismatch {
478                    expected: "Integer".into(),
479                    got: other.data_type().to_string(),
480                });
481            }
482        }
483    }
484    Ok(())
485}
486
487fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
488    match DataType::from_tag(type_tag) {
489        Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
490            data[..8].try_into().unwrap(),
491        ))),
492        Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
493            data[..8].try_into().unwrap(),
494        ))),
495        Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
496        Some(DataType::Text) => {
497            let s = std::str::from_utf8(data)
498                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
499            Ok(Value::Text(CompactString::from(s)))
500        }
501        Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
502        Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
503            data[..8].try_into().unwrap(),
504        ))),
505        Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
506            data[..4].try_into().unwrap(),
507        ))),
508        Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
509            data[..8].try_into().unwrap(),
510        ))),
511        Some(DataType::Interval) => {
512            if data.len() < 16 {
513                return Err(SqlError::InvalidValue("truncated interval".into()));
514            }
515            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
516            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
517            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
518            Ok(Value::Interval {
519                months,
520                days,
521                micros,
522            })
523        }
524        Some(DataType::Json) => {
525            let s = std::str::from_utf8(data)
526                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
527            Ok(Value::Json(CompactString::from(s)))
528        }
529        Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
530        Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
531        Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
532        _ => Err(SqlError::InvalidValue(format!(
533            "unknown column type tag: {type_tag}"
534        ))),
535    }
536}
537
538/// V1 cells: `[tag:u8][len:u32][data]`. V2 cells drop `len` for fixed-width types.
539/// High bit of `col_count:u16` flags V2.
540#[derive(Clone, Copy, PartialEq, Eq, Debug)]
541pub(crate) enum RowVersion {
542    V1,
543    V2,
544}
545
546pub(crate) const V2_FLAG: u16 = 0x8000;
547pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
548
549#[inline]
550pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
551    match DataType::from_tag(type_tag)? {
552        DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
553        DataType::Date => Some(4),
554        DataType::Boolean => Some(1),
555        DataType::Interval => Some(16),
556        DataType::Text
557        | DataType::Blob
558        | DataType::Json
559        | DataType::Jsonb
560        | DataType::TsVector
561        | DataType::TsQuery
562        | DataType::Null => None,
563    }
564}
565
566#[inline]
567fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
568    if pos >= data.len() {
569        return Err(SqlError::InvalidValue("truncated column data".into()));
570    }
571    let type_tag = data[pos];
572    let after_tag = pos + 1;
573    let (data_len, body_pos) = match version {
574        RowVersion::V2 => match fixed_width_size(type_tag) {
575            Some(n) => (n, after_tag),
576            None => {
577                if after_tag + 4 > data.len() {
578                    return Err(SqlError::InvalidValue("truncated column data".into()));
579                }
580                let len = u32::from_le_bytes([
581                    data[after_tag],
582                    data[after_tag + 1],
583                    data[after_tag + 2],
584                    data[after_tag + 3],
585                ]) as usize;
586                (len, after_tag + 4)
587            }
588        },
589        RowVersion::V1 => {
590            if after_tag + 4 > data.len() {
591                return Err(SqlError::InvalidValue("truncated column data".into()));
592            }
593            let len = u32::from_le_bytes([
594                data[after_tag],
595                data[after_tag + 1],
596                data[after_tag + 2],
597                data[after_tag + 3],
598            ]) as usize;
599            (len, after_tag + 4)
600        }
601    };
602    if body_pos + data_len > data.len() {
603        return Err(SqlError::InvalidValue("truncated column value".into()));
604    }
605    Ok((
606        type_tag,
607        &data[body_pos..body_pos + data_len],
608        body_pos + data_len,
609    ))
610}
611
612#[inline]
613fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
614    let (_, _, next) = read_cell(data, pos, version)?;
615    Ok(next)
616}
617
618fn copy_cell_to_v2(
619    data: &[u8],
620    pos: usize,
621    version: RowVersion,
622    out: &mut Vec<u8>,
623) -> Result<usize> {
624    let (tag, body, next) = read_cell(data, pos, version)?;
625    out.push(tag);
626    if fixed_width_size(tag).is_none() {
627        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
628    }
629    out.extend_from_slice(body);
630    Ok(next)
631}
632
633fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
634    if data.len() < 2 {
635        return Err(SqlError::InvalidValue("row data too short".into()));
636    }
637    let raw = u16::from_le_bytes([data[0], data[1]]);
638    let version = if raw & V2_FLAG != 0 {
639        RowVersion::V2
640    } else {
641        RowVersion::V1
642    };
643    let col_count = (raw & COL_COUNT_MASK) as usize;
644    let bitmap_bytes = col_count.div_ceil(8);
645    let pos = 2;
646    if data.len() < pos + bitmap_bytes {
647        return Err(SqlError::InvalidValue("truncated null bitmap".into()));
648    }
649    Ok((
650        version,
651        col_count,
652        &data[pos..pos + bitmap_bytes],
653        pos + bitmap_bytes,
654    ))
655}
656
657pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
658    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
659
660    let mut values = Vec::with_capacity(col_count);
661    for i in 0..col_count {
662        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
663            values.push(Value::Null);
664            continue;
665        }
666        let (type_tag, body, next) = read_cell(data, pos, version)?;
667        values.push(decode_value(type_tag, body)?);
668        pos = next;
669    }
670
671    Ok(values)
672}
673
674/// Returns the number of non-PK columns stored in a row value blob.
675#[inline]
676pub fn row_non_pk_count(data: &[u8]) -> usize {
677    (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
678}
679
680pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
681    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
682
683    for i in 0..col_count {
684        if bitmap[i / 8] & (1 << (i % 8)) != 0 {
685            continue;
686        }
687        let (type_tag, body, next) = read_cell(data, pos, version)?;
688        if i < col_mapping.len() && col_mapping[i] != usize::MAX {
689            out[col_mapping[i]] = decode_value(type_tag, body)?;
690        }
691        pos = next;
692    }
693
694    Ok(())
695}
696
697pub fn decode_pk_into(
698    key: &[u8],
699    count: usize,
700    out: &mut [Value],
701    pk_mapping: &[usize],
702) -> Result<()> {
703    let mut pos = 0;
704    for i in 0..count {
705        let (v, n) = decode_key_value(&key[pos..])?;
706        if i < pk_mapping.len() {
707            out[pk_mapping[i]] = v;
708        }
709        pos += n;
710    }
711    Ok(())
712}
713
714pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
715    if targets.is_empty() {
716        return Ok(Vec::new());
717    }
718    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
719
720    let mut results = Vec::with_capacity(targets.len());
721    let mut ti = 0;
722
723    for col in 0..col_count {
724        if ti >= targets.len() {
725            break;
726        }
727        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
728
729        if col == targets[ti] {
730            if is_null {
731                results.push(Value::Null);
732            } else {
733                let (type_tag, body, next) = read_cell(data, pos, version)?;
734                results.push(decode_value(type_tag, body)?);
735                pos = next;
736            }
737            ti += 1;
738        } else if !is_null {
739            pos = skip_cell(data, pos, version)?;
740        }
741    }
742
743    while ti < targets.len() {
744        results.push(Value::Null);
745        ti += 1;
746    }
747
748    Ok(results)
749}
750
751pub fn decode_columns_into(
752    data: &[u8],
753    targets: &[usize],
754    schema_cols: &[usize],
755    row: &mut [Value],
756) -> Result<()> {
757    if targets.is_empty() {
758        return Ok(());
759    }
760    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
761
762    let mut ti = 0;
763    for col in 0..col_count {
764        if ti >= targets.len() {
765            break;
766        }
767        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
768
769        if col == targets[ti] {
770            if is_null {
771                row[schema_cols[ti]] = Value::Null;
772            } else {
773                let (type_tag, body, next) = read_cell(data, pos, version)?;
774                row[schema_cols[ti]] = decode_value(type_tag, body)?;
775                pos = next;
776            }
777            ti += 1;
778        } else if !is_null {
779            pos = skip_cell(data, pos, version)?;
780        }
781    }
782
783    Ok(())
784}
785
786#[derive(Debug, Clone, Copy)]
787pub enum RawColumn<'a> {
788    Null,
789    Integer(i64),
790    Real(f64),
791    Boolean(bool),
792    Text(&'a str),
793    Blob(&'a [u8]),
794    Time(i64),
795    Date(i32),
796    Timestamp(i64),
797    Interval { months: i32, days: i32, micros: i64 },
798    Json(&'a str),
799    Jsonb(&'a [u8]),
800    TsVector(&'a [u8]),
801    TsQuery(&'a [u8]),
802}
803
804impl<'a> RawColumn<'a> {
805    pub fn to_value(self) -> Value {
806        match self {
807            RawColumn::Null => Value::Null,
808            RawColumn::Integer(i) => Value::Integer(i),
809            RawColumn::Real(r) => Value::Real(r),
810            RawColumn::Boolean(b) => Value::Boolean(b),
811            RawColumn::Text(s) => Value::Text(CompactString::from(s)),
812            RawColumn::Blob(b) => Value::Blob(b.to_vec()),
813            RawColumn::Time(t) => Value::Time(t),
814            RawColumn::Date(d) => Value::Date(d),
815            RawColumn::Timestamp(t) => Value::Timestamp(t),
816            RawColumn::Interval {
817                months,
818                days,
819                micros,
820            } => Value::Interval {
821                months,
822                days,
823                micros,
824            },
825            RawColumn::Json(s) => Value::Json(CompactString::from(s)),
826            RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
827            RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
828            RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
829        }
830    }
831
832    pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
833        use std::cmp::Ordering;
834        match (self, other) {
835            (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
836            (RawColumn::Null, _) | (_, Value::Null) => None,
837            (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
838            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
839            (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
840            (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
841            (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
842            (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
843            (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
844            (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
845            (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
846            (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
847            (
848                RawColumn::Interval {
849                    months: am,
850                    days: ad,
851                    micros: au,
852                },
853                Value::Interval {
854                    months: bm,
855                    days: bd,
856                    micros: bu,
857                },
858            ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
859            (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
860            (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
861            (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
862            (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
863            _ => None,
864        }
865    }
866
867    pub fn eq_value(&self, other: &Value) -> bool {
868        match (self, other) {
869            (RawColumn::Null, Value::Null) => true,
870            (RawColumn::Integer(a), Value::Integer(b)) => a == b,
871            (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
872            (RawColumn::Real(a), Value::Real(b)) => a == b,
873            (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
874            (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
875            (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
876            (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
877            (RawColumn::Time(a), Value::Time(b)) => a == b,
878            (RawColumn::Date(a), Value::Date(b)) => a == b,
879            (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
880            (
881                RawColumn::Interval {
882                    months: am,
883                    days: ad,
884                    micros: au,
885                },
886                Value::Interval {
887                    months: bm,
888                    days: bd,
889                    micros: bu,
890                },
891            ) => am == bm && ad == bd && au == bu,
892            (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
893            (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
894            (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
895            (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
896            _ => false,
897        }
898    }
899
900    pub fn as_f64(&self) -> Option<f64> {
901        match self {
902            RawColumn::Integer(i) => Some(*i as f64),
903            RawColumn::Real(r) => Some(*r),
904            _ => None,
905        }
906    }
907
908    pub fn as_i64(&self) -> Option<i64> {
909        match self {
910            RawColumn::Integer(i) => Some(*i),
911            RawColumn::Time(t) => Some(*t),
912            RawColumn::Date(d) => Some(*d as i64),
913            RawColumn::Timestamp(t) => Some(*t),
914            _ => None,
915        }
916    }
917}
918
919fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
920    match DataType::from_tag(type_tag) {
921        Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
922            data[..8].try_into().unwrap(),
923        ))),
924        Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
925            data[..8].try_into().unwrap(),
926        ))),
927        Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
928        Some(DataType::Text) => {
929            let s = std::str::from_utf8(data)
930                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
931            Ok(RawColumn::Text(s))
932        }
933        Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
934        Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
935            data[..8].try_into().unwrap(),
936        ))),
937        Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
938            data[..4].try_into().unwrap(),
939        ))),
940        Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
941            data[..8].try_into().unwrap(),
942        ))),
943        Some(DataType::Interval) => {
944            if data.len() < 16 {
945                return Err(SqlError::InvalidValue("truncated interval".into()));
946            }
947            let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
948            let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
949            let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
950            Ok(RawColumn::Interval {
951                months,
952                days,
953                micros,
954            })
955        }
956        Some(DataType::Json) => {
957            let s = std::str::from_utf8(data)
958                .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
959            Ok(RawColumn::Json(s))
960        }
961        Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
962        Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
963        Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
964        _ => Err(SqlError::InvalidValue(format!(
965            "unknown column type tag: {type_tag}"
966        ))),
967    }
968}
969
970/// Patch column in-place if value size unchanged. Ok(false) = size mismatch, use `patch_row_column`.
971pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
972    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
973    if target >= col_count || new_val.is_null() {
974        return Ok(false);
975    }
976    let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
977    if was_null {
978        return Ok(false);
979    }
980    for col in 0..target {
981        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
982        if !is_null {
983            pos = skip_cell(data, pos, version)?;
984        }
985    }
986    let type_tag = data[pos];
987    let (old_data_len, val_start) = match version {
988        RowVersion::V2 => match fixed_width_size(type_tag) {
989            Some(n) => (n, pos + 1),
990            None => {
991                if pos + 5 > data.len() {
992                    return Err(SqlError::InvalidValue("truncated column data".into()));
993                }
994                let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
995                (len, pos + 5)
996            }
997        },
998        RowVersion::V1 => {
999            if pos + 5 > data.len() {
1000                return Err(SqlError::InvalidValue("truncated column data".into()));
1001            }
1002            let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1003            (len, pos + 5)
1004        }
1005    };
1006    let new_data_len = match new_val {
1007        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1008        Value::Date(_) => 4,
1009        Value::Interval { .. } => 16,
1010        Value::Boolean(_) => 1,
1011        Value::Text(s) => s.len(),
1012        Value::Blob(b) => b.len(),
1013        Value::Json(s) => s.len(),
1014        Value::Jsonb(b) => b.len(),
1015        Value::TsVector(b) => b.len(),
1016        Value::TsQuery(b) => b.len(),
1017        Value::Null => return Ok(false),
1018    };
1019    if new_data_len != old_data_len {
1020        return Ok(false);
1021    }
1022    data[pos] = new_val.data_type().type_tag();
1023    match new_val {
1024        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1025        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1026        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1027        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1028        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1029        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1030        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1031        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1032        Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1033        Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1034        Value::TsVector(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1035        Value::TsQuery(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1036        Value::Interval {
1037            months,
1038            days,
1039            micros,
1040        } => {
1041            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1042            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1043            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1044        }
1045        Value::Null => unreachable!(),
1046    }
1047    Ok(true)
1048}
1049
1050/// Patch a single column in encoded row, writing result into `out`. Copies others unchanged.
1051pub fn patch_row_column(
1052    data: &[u8],
1053    target: usize,
1054    new_val: &Value,
1055    out: &mut Vec<u8>,
1056) -> Result<()> {
1057    let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1058
1059    let new_col_count = if target >= col_count {
1060        target + 1
1061    } else {
1062        col_count
1063    };
1064    let new_bitmap_bytes = new_col_count.div_ceil(8);
1065    let bitmap_bytes = col_count.div_ceil(8);
1066    out.clear();
1067
1068    let header = (new_col_count as u16) | V2_FLAG;
1069    out.extend_from_slice(&header.to_le_bytes());
1070    let bitmap_start = out.len();
1071    out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1072    for _ in bitmap_bytes..new_bitmap_bytes {
1073        out.push(0xFF);
1074    }
1075    if new_val.is_null() {
1076        out[bitmap_start + target / 8] |= 1 << (target % 8);
1077    } else {
1078        out[bitmap_start + target / 8] &= !(1 << (target % 8));
1079    }
1080
1081    let mut pos = header_end;
1082    for col in 0..new_col_count {
1083        let was_null = if col < col_count {
1084            bitmap[col / 8] & (1 << (col % 8)) != 0
1085        } else {
1086            true
1087        };
1088
1089        if col == target {
1090            if !was_null {
1091                pos = skip_cell(data, pos, version)?;
1092            }
1093            if !new_val.is_null() {
1094                encode_cell_v2(new_val, out);
1095            }
1096        } else if !was_null {
1097            pos = copy_cell_to_v2(data, pos, version, out)?;
1098        }
1099    }
1100    Ok(())
1101}
1102
1103pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1104    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1105    if target >= col_count {
1106        return Ok(RawColumn::Null);
1107    }
1108
1109    for col in 0..=target {
1110        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1111
1112        if col == target {
1113            if is_null {
1114                return Ok(RawColumn::Null);
1115            }
1116            let (type_tag, body, _) = read_cell(data, pos, version)?;
1117            return decode_value_raw(type_tag, body);
1118        } else if !is_null {
1119            pos = skip_cell(data, pos, version)?;
1120        }
1121    }
1122
1123    unreachable!()
1124}
1125
1126/// Like `decode_column_raw` but also returns the byte offset (usize::MAX if NULL).
1127pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1128    let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1129    if target >= col_count {
1130        return Ok((RawColumn::Null, usize::MAX));
1131    }
1132
1133    for col in 0..=target {
1134        let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1135
1136        if col == target {
1137            if is_null {
1138                return Ok((RawColumn::Null, usize::MAX));
1139            }
1140            let tag_offset = pos;
1141            let (type_tag, body, _) = read_cell(data, pos, version)?;
1142            let raw = decode_value_raw(type_tag, body)?;
1143            return Ok((raw, tag_offset));
1144        } else if !is_null {
1145            pos = skip_cell(data, pos, version)?;
1146        }
1147    }
1148
1149    unreachable!()
1150}
1151
1152/// Patch at a known byte offset. Ok(false) if size mismatch or NULL offset.
1153pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1154    if offset == usize::MAX || new_val.is_null() {
1155        return Ok(false);
1156    }
1157    if data.len() < 2 || offset >= data.len() {
1158        return Err(SqlError::InvalidValue("truncated column data".into()));
1159    }
1160    let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1161        RowVersion::V2
1162    } else {
1163        RowVersion::V1
1164    };
1165    let type_tag = data[offset];
1166    let (old_data_len, val_start) = match version {
1167        RowVersion::V2 => match fixed_width_size(type_tag) {
1168            Some(n) => (n, offset + 1),
1169            None => {
1170                if offset + 5 > data.len() {
1171                    return Err(SqlError::InvalidValue("truncated column data".into()));
1172                }
1173                let len =
1174                    u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1175                (len, offset + 5)
1176            }
1177        },
1178        RowVersion::V1 => {
1179            if offset + 5 > data.len() {
1180                return Err(SqlError::InvalidValue("truncated column data".into()));
1181            }
1182            let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1183            (len, offset + 5)
1184        }
1185    };
1186    let new_data_len = match new_val {
1187        Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1188        Value::Date(_) => 4,
1189        Value::Interval { .. } => 16,
1190        Value::Boolean(_) => 1,
1191        Value::Text(s) => s.len(),
1192        Value::Blob(b) => b.len(),
1193        Value::Json(s) => s.len(),
1194        Value::Jsonb(b) => b.len(),
1195        Value::TsVector(b) => b.len(),
1196        Value::TsQuery(b) => b.len(),
1197        Value::Null => return Ok(false),
1198    };
1199    if new_data_len != old_data_len {
1200        return Ok(false);
1201    }
1202    data[offset] = new_val.data_type().type_tag();
1203    match new_val {
1204        Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1205        Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1206        Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1207        Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1208        Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1209        Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1210        Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1211        Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1212        Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1213        Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1214        Value::TsVector(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1215        Value::TsQuery(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1216        Value::Interval {
1217            months,
1218            days,
1219            micros,
1220        } => {
1221            data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1222            data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1223            data[val_start + 8..val_start + 16].copy_from_slice(&micros.to_le_bytes());
1224        }
1225        Value::Null => unreachable!(),
1226    }
1227    Ok(true)
1228}
1229
1230pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1231    if key.is_empty() || key[0] != TAG_INTEGER {
1232        return Err(SqlError::InvalidValue("not an integer key".into()));
1233    }
1234    let (val, _) = decode_integer(&key[1..])?;
1235    match val {
1236        Value::Integer(i) => Ok(i),
1237        _ => unreachable!(),
1238    }
1239}
1240
1241#[cfg(test)]
1242#[path = "encoding_tests.rs"]
1243mod tests;