1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
// One-byte type tags. Every key encoder in this file pushes one of these as
// the first byte of an encoded value, and `decode_key_value` dispatches on it.
// Because the tag is the leading byte, encoded values of different types
// compare bytewise by tag value first.
// NOTE(review): the numeric order presumably defines the cross-type sort
// order of keys — confirm before renumbering any tag.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
19
20pub fn encode_key_value(value: &Value) -> Vec<u8> {
22 let mut buf = Vec::with_capacity(16);
23 encode_key_value_into(value, &mut buf);
24 buf
25}
26
27pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
29 let mut buf = Vec::new();
30 for v in values {
31 buf.extend_from_slice(&encode_key_value(v));
32 }
33 buf
34}
35
36pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
37 buf.clear();
38 for v in values {
39 encode_key_value_into(v, buf);
40 }
41}
42
43pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
44 buf.clear();
45 for &i in indices {
46 encode_key_value_into(&row[i as usize], buf);
47 }
48}
49
50#[inline]
51pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
52 buf.clear();
53 encode_signed_varint(TAG_INTEGER, val, buf);
54}
55
56pub(crate) fn encode_key_value_collated_into(
57 value: &Value,
58 coll: crate::types::Collation,
59 buf: &mut Vec<u8>,
60) {
61 match (value, coll) {
62 (Value::Text(s), crate::types::Collation::NoCase) => {
63 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
64 }
65 (Value::Text(s), crate::types::Collation::Rtrim) => {
66 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
67 }
68 _ => encode_key_value_into(value, buf),
69 }
70}
71
72pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
73 match value {
74 Value::Null => buf.push(TAG_NULL),
75 Value::Boolean(b) => {
76 buf.push(TAG_BOOLEAN);
77 buf.push(if *b { 0x01 } else { 0x00 });
78 }
79 Value::Integer(i) => encode_integer_into(*i, buf),
80 Value::Real(r) => encode_real_into(*r, buf),
81 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
82 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
83 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
84 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
85 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
86 Value::Interval {
87 months,
88 days,
89 micros,
90 } => {
91 buf.push(TAG_INTERVAL);
93 let mut mb = months.to_be_bytes();
94 mb[0] ^= 0x80;
95 buf.extend_from_slice(&mb);
96 let mut db = days.to_be_bytes();
97 db[0] ^= 0x80;
98 buf.extend_from_slice(&db);
99 let mut ub = micros.to_be_bytes();
100 ub[0] ^= 0x80;
101 buf.extend_from_slice(&ub);
102 }
103 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
104 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
105 }
106}
107
108fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
109 encode_signed_varint(TAG_INTEGER, val, buf);
110}
111
/// Appends `tag` followed by a variable-length, order-preserving encoding of
/// `val` to `buf`.
///
/// After the tag comes a marker byte centred on `0x80`:
/// * `0x80` alone encodes zero.
/// * `0x80 + n` is followed by the `n` significant big-endian bytes of a
///   positive value.
/// * `0x80 - n` is followed by the `n` significant big-endian bytes of the
///   magnitude of a negative value, each byte bit-inverted.
///
/// `n` is the minimal byte count (1..=8) needed for the magnitude.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    use std::cmp::Ordering;

    buf.push(tag);
    match val.cmp(&0) {
        Ordering::Equal => buf.push(0x80),
        Ordering::Greater => {
            let raw = val.to_be_bytes();
            // Whole leading zero bytes are dropped; the rest is significant.
            let skip = (val.leading_zeros() / 8) as usize;
            let significant = &raw[skip..];
            buf.push(0x80 + significant.len() as u8);
            buf.extend_from_slice(significant);
        }
        Ordering::Less => {
            // `unsigned_abs` also handles `i64::MIN`, whose magnitude is 2^63.
            let magnitude = val.unsigned_abs();
            let raw = magnitude.to_be_bytes();
            let skip = (magnitude.leading_zeros() / 8) as usize;
            let significant = &raw[skip..];
            buf.push(0x80 - significant.len() as u8);
            buf.extend(significant.iter().map(|&byte| !byte));
        }
    }
}
144
145fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
146 buf.push(TAG_REAL);
147 let bits = val.to_bits();
148 let encoded = if val.is_sign_negative() {
149 !bits
150 } else {
151 bits ^ (1u64 << 63)
152 };
153 buf.extend_from_slice(&encoded.to_be_bytes());
154}
155
/// Appends `tag` and then `data` in NUL-escaped, NUL-terminated form.
///
/// Every `0x00` byte inside `data` is written as the pair `0x00 0xFF`; a
/// single bare `0x00` closes the value, so the decoder can find the end
/// without a length prefix.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    for byte in data.iter().copied() {
        match byte {
            0x00 => buf.extend_from_slice(&[0x00, 0xFF]),
            other => buf.push(other),
        }
    }
    buf.push(0x00);
}
168
169pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
171 if data.is_empty() {
172 return Err(SqlError::InvalidValue("empty key data".into()));
173 }
174 match data[0] {
175 TAG_NULL => Ok((Value::Null, 1)),
176 TAG_BOOLEAN => {
177 if data.len() < 2 {
178 return Err(SqlError::InvalidValue("truncated boolean".into()));
179 }
180 Ok((Value::Boolean(data[1] != 0), 2))
181 }
182 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
183 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
184 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
185 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
186 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
187 (Value::Date(d), n + 1)
188 }),
189 TAG_TIMESTAMP => {
190 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
191 }
192 TAG_INTERVAL => {
193 if data.len() < 1 + 16 {
194 return Err(SqlError::InvalidValue("truncated interval".into()));
195 }
196 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
197 mb[0] ^= 0x80;
198 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
199 db[0] ^= 0x80;
200 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
201 ub[0] ^= 0x80;
202 Ok((
203 Value::Interval {
204 months: i32::from_be_bytes(mb),
205 days: i32::from_be_bytes(db),
206 micros: i64::from_be_bytes(ub),
207 },
208 17,
209 ))
210 }
211 TAG_TEXT => {
212 let (bytes, n) = decode_null_escaped(&data[1..])?;
213 let s = String::from_utf8(bytes)
214 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
215 Ok((Value::Text(CompactString::from(s)), n + 1))
216 }
217 TAG_BLOB => {
218 let (bytes, n) = decode_null_escaped(&data[1..])?;
219 Ok((Value::Blob(bytes), n + 1))
220 }
221 TAG_JSON => {
222 let (bytes, n) = decode_null_escaped(&data[1..])?;
223 let s = String::from_utf8(bytes)
224 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
225 Ok((Value::Json(CompactString::from(s)), n + 1))
226 }
227 TAG_JSONB => {
228 let (bytes, n) = decode_null_escaped(&data[1..])?;
229 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
230 }
231 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
232 }
233}
234
235pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
237 let mut values = Vec::with_capacity(count);
238 let mut pos = 0;
239 for _ in 0..count {
240 let (v, n) = decode_key_value(&data[pos..])?;
241 values.push(v);
242 pos += n;
243 }
244 Ok(values)
245}
246
247fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
248 let (v, n) = decode_signed_varint(data)?;
249 Ok((Value::Integer(v), n))
250}
251
252pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
254 if data.is_empty() {
255 return Err(SqlError::InvalidValue("truncated integer".into()));
256 }
257 let marker = data[0];
258 if marker == 0x80 {
259 return Ok((0, 1));
260 }
261 if marker > 0x80 {
262 let byte_count = (marker - 0x80) as usize;
263 if data.len() < 1 + byte_count {
264 return Err(SqlError::InvalidValue("truncated positive integer".into()));
265 }
266 let mut bytes = [0u8; 8];
267 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
268 let val = i64::from_be_bytes(bytes);
269 Ok((val, 1 + byte_count))
270 } else {
271 let byte_count = (0x80 - marker) as usize;
272 if data.len() < 1 + byte_count {
273 return Err(SqlError::InvalidValue("truncated negative integer".into()));
274 }
275 let mut bytes = [0u8; 8];
276 for i in 0..byte_count {
277 bytes[8 - byte_count + i] = !data[1 + i];
278 }
279 let abs_val = u64::from_be_bytes(bytes);
280 let val = (-(abs_val as i128)) as i64;
281 Ok((val, 1 + byte_count))
282 }
283}
284
285fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
286 if data.len() < 8 {
287 return Err(SqlError::InvalidValue("truncated real".into()));
288 }
289 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
290 let bits = if encoded & (1u64 << 63) != 0 {
291 encoded ^ (1u64 << 63)
293 } else {
294 !encoded
296 };
297 let val = f64::from_bits(bits);
298 Ok((Value::Real(val), 8))
299}
300
301fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
303 let mut result = Vec::new();
304 let mut i = 0;
305 while i < data.len() {
306 if data[i] == 0x00 {
307 if i + 1 < data.len() && data[i + 1] == 0xFF {
308 result.push(0x00);
309 i += 2;
310 } else {
311 return Ok((result, i + 1)); }
313 } else {
314 result.push(data[i]);
315 i += 1;
316 }
317 }
318 Err(SqlError::InvalidValue(
319 "unterminated null-escaped string".into(),
320 ))
321}
322
323fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
324 match v {
325 Value::Integer(val) => {
326 buf.push(DataType::Integer.type_tag());
327 buf.extend_from_slice(&val.to_le_bytes());
328 }
329 Value::Real(r) => {
330 buf.push(DataType::Real.type_tag());
331 buf.extend_from_slice(&r.to_le_bytes());
332 }
333 Value::Boolean(b) => {
334 buf.push(DataType::Boolean.type_tag());
335 buf.push(if *b { 1 } else { 0 });
336 }
337 Value::Text(s) => {
338 let bytes = s.as_bytes();
339 buf.push(DataType::Text.type_tag());
340 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
341 buf.extend_from_slice(bytes);
342 }
343 Value::Blob(data) => {
344 buf.push(DataType::Blob.type_tag());
345 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
346 buf.extend_from_slice(data);
347 }
348 Value::Time(t) => {
349 buf.push(DataType::Time.type_tag());
350 buf.extend_from_slice(&t.to_le_bytes());
351 }
352 Value::Date(d) => {
353 buf.push(DataType::Date.type_tag());
354 buf.extend_from_slice(&d.to_le_bytes());
355 }
356 Value::Timestamp(t) => {
357 buf.push(DataType::Timestamp.type_tag());
358 buf.extend_from_slice(&t.to_le_bytes());
359 }
360 Value::Interval {
361 months,
362 days,
363 micros,
364 } => {
365 buf.push(DataType::Interval.type_tag());
366 buf.extend_from_slice(&months.to_le_bytes());
367 buf.extend_from_slice(&days.to_le_bytes());
368 buf.extend_from_slice(µs.to_le_bytes());
369 }
370 Value::Json(s) => {
371 let bytes = s.as_bytes();
372 buf.push(DataType::Json.type_tag());
373 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
374 buf.extend_from_slice(bytes);
375 }
376 Value::Jsonb(b) => {
377 buf.push(DataType::Jsonb.type_tag());
378 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
379 buf.extend_from_slice(b);
380 }
381 Value::Null => unreachable!(),
382 }
383}
384
385pub fn encode_row(values: &[Value]) -> Vec<u8> {
386 let mut buf = Vec::new();
387 encode_row_into(values, &mut buf);
388 buf
389}
390
391pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
392 buf.clear();
393 let col_count = values.len();
394 let bitmap_bytes = col_count.div_ceil(8);
395
396 let header = (col_count as u16) | V2_FLAG;
397 buf.extend_from_slice(&header.to_le_bytes());
398
399 let bitmap_start = buf.len();
400 buf.resize(buf.len() + bitmap_bytes, 0);
401
402 for (i, v) in values.iter().enumerate() {
403 if v.is_null() {
404 buf[bitmap_start + i / 8] |= 1 << (i % 8);
405 continue;
406 }
407 encode_cell_v2(v, buf);
408 }
409}
410
/// Pre-encoded V2 row whose non-null cells are all integers, as built by
/// `build_int_row_template` and filled in by `encode_int_row_with_template`.
pub struct IntRowTemplate {
    /// Encoded row bytes: header, null bitmap, and one integer cell per
    /// non-null slot with its 8 payload bytes zeroed.
    pub template: Vec<u8>,
    /// `(slot, offset)` pairs: for each non-null slot, the byte offset in
    /// `template` where that slot's 8-byte payload starts.
    pub slot_offsets: Vec<(usize, usize)>,
}
415
416pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
417 let bitmap_bytes = phys_count.div_ceil(8);
418 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
419 let header = (phys_count as u16) | V2_FLAG;
420 template.extend_from_slice(&header.to_le_bytes());
421 let bitmap_start = template.len();
422 template.resize(bitmap_start + bitmap_bytes, 0);
423 for &i in null_slots {
424 template[bitmap_start + i / 8] |= 1 << (i % 8);
425 }
426 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
427 for slot in 0..phys_count {
428 if null_slots.contains(&slot) {
429 continue;
430 }
431 template.push(DataType::Integer.type_tag());
432 let value_offset = template.len();
433 template.extend_from_slice(&[0u8; 8]);
434 slot_offsets.push((slot, value_offset));
435 }
436 IntRowTemplate {
437 template,
438 slot_offsets,
439 }
440}
441
442#[inline]
444pub fn encode_int_row_with_template(
445 tmpl: &IntRowTemplate,
446 values: &[Value],
447 buf: &mut Vec<u8>,
448) -> Result<()> {
449 buf.clear();
450 buf.extend_from_slice(&tmpl.template);
451 for &(slot, off) in &tmpl.slot_offsets {
452 match &values[slot] {
453 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
454 other => {
455 return Err(SqlError::TypeMismatch {
456 expected: "Integer".into(),
457 got: other.data_type().to_string(),
458 });
459 }
460 }
461 }
462 Ok(())
463}
464
465fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
466 match DataType::from_tag(type_tag) {
467 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
468 data[..8].try_into().unwrap(),
469 ))),
470 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
471 data[..8].try_into().unwrap(),
472 ))),
473 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
474 Some(DataType::Text) => {
475 let s = std::str::from_utf8(data)
476 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
477 Ok(Value::Text(CompactString::from(s)))
478 }
479 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
480 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
481 data[..8].try_into().unwrap(),
482 ))),
483 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
484 data[..4].try_into().unwrap(),
485 ))),
486 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
487 data[..8].try_into().unwrap(),
488 ))),
489 Some(DataType::Interval) => {
490 if data.len() < 16 {
491 return Err(SqlError::InvalidValue("truncated interval".into()));
492 }
493 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
494 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
495 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
496 Ok(Value::Interval {
497 months,
498 days,
499 micros,
500 })
501 }
502 Some(DataType::Json) => {
503 let s = std::str::from_utf8(data)
504 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
505 Ok(Value::Json(CompactString::from(s)))
506 }
507 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
508 _ => Err(SqlError::InvalidValue(format!(
509 "unknown column type tag: {type_tag}"
510 ))),
511 }
512}
513
/// Row encoding version, selected by `V2_FLAG` in the row header
/// (see `parse_row_header`).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell is `tag + u32 length + body` (see `read_cell`).
    V1,
    /// Cells of fixed-width types omit the length prefix; other cells keep it
    /// (see `read_cell` and `fixed_width_size`).
    V2,
}
521
/// High bit of the 16-bit little-endian row header; set on rows written in
/// the V2 layout.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the low 15 bits of the row header, which hold the column
/// count.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
524
525#[inline]
526pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
527 match DataType::from_tag(type_tag)? {
528 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
529 DataType::Date => Some(4),
530 DataType::Boolean => Some(1),
531 DataType::Interval => Some(16),
532 DataType::Text | DataType::Blob | DataType::Json | DataType::Jsonb | DataType::Null => None,
533 }
534}
535
536#[inline]
537fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
538 if pos >= data.len() {
539 return Err(SqlError::InvalidValue("truncated column data".into()));
540 }
541 let type_tag = data[pos];
542 let after_tag = pos + 1;
543 let (data_len, body_pos) = match version {
544 RowVersion::V2 => match fixed_width_size(type_tag) {
545 Some(n) => (n, after_tag),
546 None => {
547 if after_tag + 4 > data.len() {
548 return Err(SqlError::InvalidValue("truncated column data".into()));
549 }
550 let len = u32::from_le_bytes([
551 data[after_tag],
552 data[after_tag + 1],
553 data[after_tag + 2],
554 data[after_tag + 3],
555 ]) as usize;
556 (len, after_tag + 4)
557 }
558 },
559 RowVersion::V1 => {
560 if after_tag + 4 > data.len() {
561 return Err(SqlError::InvalidValue("truncated column data".into()));
562 }
563 let len = u32::from_le_bytes([
564 data[after_tag],
565 data[after_tag + 1],
566 data[after_tag + 2],
567 data[after_tag + 3],
568 ]) as usize;
569 (len, after_tag + 4)
570 }
571 };
572 if body_pos + data_len > data.len() {
573 return Err(SqlError::InvalidValue("truncated column value".into()));
574 }
575 Ok((
576 type_tag,
577 &data[body_pos..body_pos + data_len],
578 body_pos + data_len,
579 ))
580}
581
582#[inline]
583fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
584 let (_, _, next) = read_cell(data, pos, version)?;
585 Ok(next)
586}
587
588fn copy_cell_to_v2(
589 data: &[u8],
590 pos: usize,
591 version: RowVersion,
592 out: &mut Vec<u8>,
593) -> Result<usize> {
594 let (tag, body, next) = read_cell(data, pos, version)?;
595 out.push(tag);
596 if fixed_width_size(tag).is_none() {
597 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
598 }
599 out.extend_from_slice(body);
600 Ok(next)
601}
602
603fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
604 if data.len() < 2 {
605 return Err(SqlError::InvalidValue("row data too short".into()));
606 }
607 let raw = u16::from_le_bytes([data[0], data[1]]);
608 let version = if raw & V2_FLAG != 0 {
609 RowVersion::V2
610 } else {
611 RowVersion::V1
612 };
613 let col_count = (raw & COL_COUNT_MASK) as usize;
614 let bitmap_bytes = col_count.div_ceil(8);
615 let pos = 2;
616 if data.len() < pos + bitmap_bytes {
617 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
618 }
619 Ok((
620 version,
621 col_count,
622 &data[pos..pos + bitmap_bytes],
623 pos + bitmap_bytes,
624 ))
625}
626
627pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
628 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
629
630 let mut values = Vec::with_capacity(col_count);
631 for i in 0..col_count {
632 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
633 values.push(Value::Null);
634 continue;
635 }
636 let (type_tag, body, next) = read_cell(data, pos, version)?;
637 values.push(decode_value(type_tag, body)?);
638 pos = next;
639 }
640
641 Ok(values)
642}
643
644#[inline]
646pub fn row_non_pk_count(data: &[u8]) -> usize {
647 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
648}
649
650pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
651 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
652
653 for i in 0..col_count {
654 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
655 continue;
656 }
657 let (type_tag, body, next) = read_cell(data, pos, version)?;
658 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
659 out[col_mapping[i]] = decode_value(type_tag, body)?;
660 }
661 pos = next;
662 }
663
664 Ok(())
665}
666
667pub fn decode_pk_into(
668 key: &[u8],
669 count: usize,
670 out: &mut [Value],
671 pk_mapping: &[usize],
672) -> Result<()> {
673 let mut pos = 0;
674 for i in 0..count {
675 let (v, n) = decode_key_value(&key[pos..])?;
676 if i < pk_mapping.len() {
677 out[pk_mapping[i]] = v;
678 }
679 pos += n;
680 }
681 Ok(())
682}
683
684pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
685 if targets.is_empty() {
686 return Ok(Vec::new());
687 }
688 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
689
690 let mut results = Vec::with_capacity(targets.len());
691 let mut ti = 0;
692
693 for col in 0..col_count {
694 if ti >= targets.len() {
695 break;
696 }
697 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
698
699 if col == targets[ti] {
700 if is_null {
701 results.push(Value::Null);
702 } else {
703 let (type_tag, body, next) = read_cell(data, pos, version)?;
704 results.push(decode_value(type_tag, body)?);
705 pos = next;
706 }
707 ti += 1;
708 } else if !is_null {
709 pos = skip_cell(data, pos, version)?;
710 }
711 }
712
713 while ti < targets.len() {
714 results.push(Value::Null);
715 ti += 1;
716 }
717
718 Ok(results)
719}
720
721pub fn decode_columns_into(
722 data: &[u8],
723 targets: &[usize],
724 schema_cols: &[usize],
725 row: &mut [Value],
726) -> Result<()> {
727 if targets.is_empty() {
728 return Ok(());
729 }
730 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
731
732 let mut ti = 0;
733 for col in 0..col_count {
734 if ti >= targets.len() {
735 break;
736 }
737 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
738
739 if col == targets[ti] {
740 if is_null {
741 row[schema_cols[ti]] = Value::Null;
742 } else {
743 let (type_tag, body, next) = read_cell(data, pos, version)?;
744 row[schema_cols[ti]] = decode_value(type_tag, body)?;
745 pos = next;
746 }
747 ti += 1;
748 } else if !is_null {
749 pos = skip_cell(data, pos, version)?;
750 }
751 }
752
753 Ok(())
754}
755
/// Borrowed view of a single decoded row cell.
///
/// Mirrors the variants of `Value`, but text and byte payloads borrow from
/// the encoded row instead of being copied (see `decode_value_raw`). Use
/// `to_value` to obtain an owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    /// SQL NULL.
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    /// UTF-8 text borrowed from the row bytes.
    Text(&'a str),
    /// Raw bytes borrowed from the row bytes.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    /// JSON text borrowed from the row bytes.
    Json(&'a str),
    /// Binary JSON bytes borrowed from the row bytes.
    Jsonb(&'a [u8]),
}
771
772impl<'a> RawColumn<'a> {
773 pub fn to_value(self) -> Value {
774 match self {
775 RawColumn::Null => Value::Null,
776 RawColumn::Integer(i) => Value::Integer(i),
777 RawColumn::Real(r) => Value::Real(r),
778 RawColumn::Boolean(b) => Value::Boolean(b),
779 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
780 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
781 RawColumn::Time(t) => Value::Time(t),
782 RawColumn::Date(d) => Value::Date(d),
783 RawColumn::Timestamp(t) => Value::Timestamp(t),
784 RawColumn::Interval {
785 months,
786 days,
787 micros,
788 } => Value::Interval {
789 months,
790 days,
791 micros,
792 },
793 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
794 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
795 }
796 }
797
798 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
799 use std::cmp::Ordering;
800 match (self, other) {
801 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
802 (RawColumn::Null, _) | (_, Value::Null) => None,
803 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
804 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
805 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
806 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
807 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
808 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
809 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
810 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
811 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
812 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
813 (
814 RawColumn::Interval {
815 months: am,
816 days: ad,
817 micros: au,
818 },
819 Value::Interval {
820 months: bm,
821 days: bd,
822 micros: bu,
823 },
824 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
825 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
826 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
827 _ => None,
828 }
829 }
830
831 pub fn eq_value(&self, other: &Value) -> bool {
832 match (self, other) {
833 (RawColumn::Null, Value::Null) => true,
834 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
835 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
836 (RawColumn::Real(a), Value::Real(b)) => a == b,
837 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
838 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
839 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
840 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
841 (RawColumn::Time(a), Value::Time(b)) => a == b,
842 (RawColumn::Date(a), Value::Date(b)) => a == b,
843 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
844 (
845 RawColumn::Interval {
846 months: am,
847 days: ad,
848 micros: au,
849 },
850 Value::Interval {
851 months: bm,
852 days: bd,
853 micros: bu,
854 },
855 ) => am == bm && ad == bd && au == bu,
856 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
857 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
858 _ => false,
859 }
860 }
861
862 pub fn as_f64(&self) -> Option<f64> {
863 match self {
864 RawColumn::Integer(i) => Some(*i as f64),
865 RawColumn::Real(r) => Some(*r),
866 _ => None,
867 }
868 }
869
870 pub fn as_i64(&self) -> Option<i64> {
871 match self {
872 RawColumn::Integer(i) => Some(*i),
873 RawColumn::Time(t) => Some(*t),
874 RawColumn::Date(d) => Some(*d as i64),
875 RawColumn::Timestamp(t) => Some(*t),
876 _ => None,
877 }
878 }
879}
880
881fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
882 match DataType::from_tag(type_tag) {
883 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
884 data[..8].try_into().unwrap(),
885 ))),
886 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
887 data[..8].try_into().unwrap(),
888 ))),
889 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
890 Some(DataType::Text) => {
891 let s = std::str::from_utf8(data)
892 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
893 Ok(RawColumn::Text(s))
894 }
895 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
896 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
897 data[..8].try_into().unwrap(),
898 ))),
899 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
900 data[..4].try_into().unwrap(),
901 ))),
902 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
903 data[..8].try_into().unwrap(),
904 ))),
905 Some(DataType::Interval) => {
906 if data.len() < 16 {
907 return Err(SqlError::InvalidValue("truncated interval".into()));
908 }
909 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
910 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
911 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
912 Ok(RawColumn::Interval {
913 months,
914 days,
915 micros,
916 })
917 }
918 Some(DataType::Json) => {
919 let s = std::str::from_utf8(data)
920 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
921 Ok(RawColumn::Json(s))
922 }
923 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
924 _ => Err(SqlError::InvalidValue(format!(
925 "unknown column type tag: {type_tag}"
926 ))),
927 }
928}
929
930pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
932 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
933 if target >= col_count || new_val.is_null() {
934 return Ok(false);
935 }
936 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
937 if was_null {
938 return Ok(false);
939 }
940 for col in 0..target {
941 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
942 if !is_null {
943 pos = skip_cell(data, pos, version)?;
944 }
945 }
946 let type_tag = data[pos];
947 let (old_data_len, val_start) = match version {
948 RowVersion::V2 => match fixed_width_size(type_tag) {
949 Some(n) => (n, pos + 1),
950 None => {
951 if pos + 5 > data.len() {
952 return Err(SqlError::InvalidValue("truncated column data".into()));
953 }
954 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
955 (len, pos + 5)
956 }
957 },
958 RowVersion::V1 => {
959 if pos + 5 > data.len() {
960 return Err(SqlError::InvalidValue("truncated column data".into()));
961 }
962 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
963 (len, pos + 5)
964 }
965 };
966 let new_data_len = match new_val {
967 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
968 Value::Date(_) => 4,
969 Value::Interval { .. } => 16,
970 Value::Boolean(_) => 1,
971 Value::Text(s) => s.len(),
972 Value::Blob(b) => b.len(),
973 Value::Json(s) => s.len(),
974 Value::Jsonb(b) => b.len(),
975 Value::Null => return Ok(false),
976 };
977 if new_data_len != old_data_len {
978 return Ok(false);
979 }
980 data[pos] = new_val.data_type().type_tag();
981 match new_val {
982 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
983 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
984 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
985 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
986 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
987 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
988 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
989 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
990 Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
991 Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
992 Value::Interval {
993 months,
994 days,
995 micros,
996 } => {
997 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
998 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
999 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1000 }
1001 Value::Null => unreachable!(),
1002 }
1003 Ok(true)
1004}
1005
1006pub fn patch_row_column(
1008 data: &[u8],
1009 target: usize,
1010 new_val: &Value,
1011 out: &mut Vec<u8>,
1012) -> Result<()> {
1013 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1014
1015 let new_col_count = if target >= col_count {
1016 target + 1
1017 } else {
1018 col_count
1019 };
1020 let new_bitmap_bytes = new_col_count.div_ceil(8);
1021 let bitmap_bytes = col_count.div_ceil(8);
1022 out.clear();
1023
1024 let header = (new_col_count as u16) | V2_FLAG;
1025 out.extend_from_slice(&header.to_le_bytes());
1026 let bitmap_start = out.len();
1027 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1028 for _ in bitmap_bytes..new_bitmap_bytes {
1029 out.push(0xFF);
1030 }
1031 if new_val.is_null() {
1032 out[bitmap_start + target / 8] |= 1 << (target % 8);
1033 } else {
1034 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1035 }
1036
1037 let mut pos = header_end;
1038 for col in 0..new_col_count {
1039 let was_null = if col < col_count {
1040 bitmap[col / 8] & (1 << (col % 8)) != 0
1041 } else {
1042 true
1043 };
1044
1045 if col == target {
1046 if !was_null {
1047 pos = skip_cell(data, pos, version)?;
1048 }
1049 if !new_val.is_null() {
1050 encode_cell_v2(new_val, out);
1051 }
1052 } else if !was_null {
1053 pos = copy_cell_to_v2(data, pos, version, out)?;
1054 }
1055 }
1056 Ok(())
1057}
1058
1059pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1060 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1061 if target >= col_count {
1062 return Ok(RawColumn::Null);
1063 }
1064
1065 for col in 0..=target {
1066 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1067
1068 if col == target {
1069 if is_null {
1070 return Ok(RawColumn::Null);
1071 }
1072 let (type_tag, body, _) = read_cell(data, pos, version)?;
1073 return decode_value_raw(type_tag, body);
1074 } else if !is_null {
1075 pos = skip_cell(data, pos, version)?;
1076 }
1077 }
1078
1079 unreachable!()
1080}
1081
1082pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1084 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1085 if target >= col_count {
1086 return Ok((RawColumn::Null, usize::MAX));
1087 }
1088
1089 for col in 0..=target {
1090 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1091
1092 if col == target {
1093 if is_null {
1094 return Ok((RawColumn::Null, usize::MAX));
1095 }
1096 let tag_offset = pos;
1097 let (type_tag, body, _) = read_cell(data, pos, version)?;
1098 let raw = decode_value_raw(type_tag, body)?;
1099 return Ok((raw, tag_offset));
1100 } else if !is_null {
1101 pos = skip_cell(data, pos, version)?;
1102 }
1103 }
1104
1105 unreachable!()
1106}
1107
1108pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1110 if offset == usize::MAX || new_val.is_null() {
1111 return Ok(false);
1112 }
1113 if data.len() < 2 || offset >= data.len() {
1114 return Err(SqlError::InvalidValue("truncated column data".into()));
1115 }
1116 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1117 RowVersion::V2
1118 } else {
1119 RowVersion::V1
1120 };
1121 let type_tag = data[offset];
1122 let (old_data_len, val_start) = match version {
1123 RowVersion::V2 => match fixed_width_size(type_tag) {
1124 Some(n) => (n, offset + 1),
1125 None => {
1126 if offset + 5 > data.len() {
1127 return Err(SqlError::InvalidValue("truncated column data".into()));
1128 }
1129 let len =
1130 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1131 (len, offset + 5)
1132 }
1133 },
1134 RowVersion::V1 => {
1135 if offset + 5 > data.len() {
1136 return Err(SqlError::InvalidValue("truncated column data".into()));
1137 }
1138 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1139 (len, offset + 5)
1140 }
1141 };
1142 let new_data_len = match new_val {
1143 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1144 Value::Date(_) => 4,
1145 Value::Interval { .. } => 16,
1146 Value::Boolean(_) => 1,
1147 Value::Text(s) => s.len(),
1148 Value::Blob(b) => b.len(),
1149 Value::Json(s) => s.len(),
1150 Value::Jsonb(b) => b.len(),
1151 Value::Null => return Ok(false),
1152 };
1153 if new_data_len != old_data_len {
1154 return Ok(false);
1155 }
1156 data[offset] = new_val.data_type().type_tag();
1157 match new_val {
1158 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1159 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1160 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1161 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1162 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1163 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1164 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1165 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1166 Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1167 Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1168 Value::Interval {
1169 months,
1170 days,
1171 micros,
1172 } => {
1173 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1174 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1175 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1176 }
1177 Value::Null => unreachable!(),
1178 }
1179 Ok(true)
1180}
1181
1182pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1183 if key.is_empty() || key[0] != TAG_INTEGER {
1184 return Err(SqlError::InvalidValue("not an integer key".into()));
1185 }
1186 let (val, _) = decode_integer(&key[1..])?;
1187 match val {
1188 Value::Integer(i) => Ok(i),
1189 _ => unreachable!(),
1190 }
1191}
1192
1193#[cfg(test)]
1194#[path = "encoding_tests.rs"]
1195mod tests;