1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6pub(crate) const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17const TAG_JSON: u8 = 0x0A;
18const TAG_JSONB: u8 = 0x0B;
19const TAG_TSVECTOR: u8 = 0x0C;
20const TAG_TSQUERY: u8 = 0x0D;
21const TAG_ARRAY: u8 = 0x0E;
22const TAG_VECTOR: u8 = 0x0F;
23
24pub fn encode_key_value(value: &Value) -> Vec<u8> {
26 let mut buf = Vec::with_capacity(16);
27 encode_key_value_into(value, &mut buf);
28 buf
29}
30
31pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33 let mut buf = Vec::new();
34 for v in values {
35 buf.extend_from_slice(&encode_key_value(v));
36 }
37 buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41 buf.clear();
42 for v in values {
43 encode_key_value_into(v, buf);
44 }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48 buf.clear();
49 for &i in indices {
50 encode_key_value_into(&row[i as usize], buf);
51 }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56 buf.clear();
57 encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61 value: &Value,
62 coll: crate::types::Collation,
63 buf: &mut Vec<u8>,
64) {
65 match (value, coll) {
66 (Value::Text(s), crate::types::Collation::NoCase) => {
67 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68 }
69 (Value::Text(s), crate::types::Collation::Rtrim) => {
70 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71 }
72 _ => encode_key_value_into(value, buf),
73 }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77 match value {
78 Value::Null => buf.push(TAG_NULL),
79 Value::Boolean(b) => {
80 buf.push(TAG_BOOLEAN);
81 buf.push(if *b { 0x01 } else { 0x00 });
82 }
83 Value::Integer(i) => encode_integer_into(*i, buf),
84 Value::Real(r) => encode_real_into(*r, buf),
85 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90 Value::Interval {
91 months,
92 days,
93 micros,
94 } => {
95 buf.push(TAG_INTERVAL);
97 let mut mb = months.to_be_bytes();
98 mb[0] ^= 0x80;
99 buf.extend_from_slice(&mb);
100 let mut db = days.to_be_bytes();
101 db[0] ^= 0x80;
102 buf.extend_from_slice(&db);
103 let mut ub = micros.to_be_bytes();
104 ub[0] ^= 0x80;
105 buf.extend_from_slice(&ub);
106 }
107 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109 Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110 Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111 Value::Array(a) => encode_array_into(a, buf),
112 Value::Vector(v) => encode_vector_into(v, buf),
113 }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117 buf.push(TAG_VECTOR);
118 let mut inner = Vec::with_capacity(2 + v.len() * 4);
119 inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120 for &x in v {
121 inner.extend_from_slice(&x.to_le_bytes());
122 }
123 encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127 buf.push(TAG_ARRAY);
128 let mut inner = Vec::new();
129 for v in elems {
130 encode_key_value_into(v, &mut inner);
131 }
132 encode_bytes_into_no_tag(&inner, buf);
133}
134
135fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
136 for &b in data {
137 if b == 0x00 {
138 buf.push(0x00);
139 buf.push(0xFF);
140 } else {
141 buf.push(b);
142 }
143 }
144 buf.push(0x00);
145}
146
147fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
148 encode_signed_varint(TAG_INTEGER, val, buf);
149}
150
151pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
157 buf.push(tag);
158 if val == 0 {
159 buf.push(0x80);
160 return;
161 }
162 if val > 0 {
163 let bytes = val.to_be_bytes();
164 let start = bytes.iter().position(|&b| b != 0).unwrap();
165 let byte_count = (8 - start) as u8;
166 buf.push(0x80 + byte_count);
167 buf.extend_from_slice(&bytes[start..]);
168 } else {
169 let abs_val = if val == i64::MIN {
170 u64::MAX / 2 + 1
171 } else {
172 (-val) as u64
173 };
174 let bytes = abs_val.to_be_bytes();
175 let start = bytes.iter().position(|&b| b != 0).unwrap();
176 let byte_count = (8 - start) as u8;
177 buf.push(0x80 - byte_count);
178 for &b in &bytes[start..] {
179 buf.push(!b);
180 }
181 }
182}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185 buf.push(TAG_REAL);
186 let bits = val.to_bits();
187 let encoded = if val.is_sign_negative() {
188 !bits
189 } else {
190 bits ^ (1u64 << 63)
191 };
192 buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
195fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
196 buf.push(tag);
197 for &b in data {
198 if b == 0x00 {
199 buf.push(0x00);
200 buf.push(0xFF);
201 } else {
202 buf.push(b);
203 }
204 }
205 buf.push(0x00);
206}
207
208pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210 if data.is_empty() {
211 return Err(SqlError::InvalidValue("empty key data".into()));
212 }
213 match data[0] {
214 TAG_NULL => Ok((Value::Null, 1)),
215 TAG_BOOLEAN => {
216 if data.len() < 2 {
217 return Err(SqlError::InvalidValue("truncated boolean".into()));
218 }
219 Ok((Value::Boolean(data[1] != 0), 2))
220 }
221 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226 (Value::Date(d), n + 1)
227 }),
228 TAG_TIMESTAMP => {
229 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230 }
231 TAG_INTERVAL => {
232 if data.len() < 1 + 16 {
233 return Err(SqlError::InvalidValue("truncated interval".into()));
234 }
235 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236 mb[0] ^= 0x80;
237 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238 db[0] ^= 0x80;
239 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240 ub[0] ^= 0x80;
241 Ok((
242 Value::Interval {
243 months: i32::from_be_bytes(mb),
244 days: i32::from_be_bytes(db),
245 micros: i64::from_be_bytes(ub),
246 },
247 17,
248 ))
249 }
250 TAG_TEXT => {
251 let (bytes, n) = decode_null_escaped(&data[1..])?;
252 let s = String::from_utf8(bytes)
253 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254 Ok((Value::Text(CompactString::from(s)), n + 1))
255 }
256 TAG_BLOB => {
257 let (bytes, n) = decode_null_escaped(&data[1..])?;
258 Ok((Value::Blob(bytes), n + 1))
259 }
260 TAG_JSON => {
261 let (bytes, n) = decode_null_escaped(&data[1..])?;
262 let s = String::from_utf8(bytes)
263 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264 Ok((Value::Json(CompactString::from(s)), n + 1))
265 }
266 TAG_JSONB => {
267 let (bytes, n) = decode_null_escaped(&data[1..])?;
268 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269 }
270 TAG_TSVECTOR => {
271 let (bytes, n) = decode_null_escaped(&data[1..])?;
272 Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273 }
274 TAG_TSQUERY => {
275 let (bytes, n) = decode_null_escaped(&data[1..])?;
276 Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277 }
278 TAG_ARRAY => {
279 let (inner, n) = decode_null_escaped(&data[1..])?;
280 let mut elems = Vec::new();
281 let mut pos = 0;
282 while pos < inner.len() {
283 let (v, vlen) = decode_key_value(&inner[pos..])?;
284 elems.push(v);
285 pos += vlen;
286 }
287 Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288 }
289 TAG_VECTOR => {
290 let (inner, n) = decode_null_escaped(&data[1..])?;
291 if inner.len() < 2 {
292 return Err(SqlError::InvalidValue("truncated vector key".into()));
293 }
294 let dim = u16::from_le_bytes([inner[0], inner[1]]) as usize;
295 if inner.len() != 2 + dim * 4 {
296 return Err(SqlError::InvalidValue("truncated vector key".into()));
297 }
298 let elems: Vec<f32> = inner[2..]
299 .chunks_exact(4)
300 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
301 .collect();
302 Ok((Value::Vector(std::sync::Arc::from(elems)), n + 1))
303 }
304 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
305 }
306}
307
308pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
310 let mut values = Vec::with_capacity(count);
311 let mut pos = 0;
312 for _ in 0..count {
313 let (v, n) = decode_key_value(&data[pos..])?;
314 values.push(v);
315 pos += n;
316 }
317 Ok(values)
318}
319
320fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
321 let (v, n) = decode_signed_varint(data)?;
322 Ok((Value::Integer(v), n))
323}
324
325pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
327 if data.is_empty() {
328 return Err(SqlError::InvalidValue("truncated integer".into()));
329 }
330 let marker = data[0];
331 if marker == 0x80 {
332 return Ok((0, 1));
333 }
334 if marker > 0x80 {
335 let byte_count = (marker - 0x80) as usize;
336 if data.len() < 1 + byte_count {
337 return Err(SqlError::InvalidValue("truncated positive integer".into()));
338 }
339 let mut bytes = [0u8; 8];
340 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
341 let val = i64::from_be_bytes(bytes);
342 Ok((val, 1 + byte_count))
343 } else {
344 let byte_count = (0x80 - marker) as usize;
345 if data.len() < 1 + byte_count {
346 return Err(SqlError::InvalidValue("truncated negative integer".into()));
347 }
348 let mut bytes = [0u8; 8];
349 for i in 0..byte_count {
350 bytes[8 - byte_count + i] = !data[1 + i];
351 }
352 let abs_val = u64::from_be_bytes(bytes);
353 let val = (-(abs_val as i128)) as i64;
354 Ok((val, 1 + byte_count))
355 }
356}
357
358fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
359 if data.len() < 8 {
360 return Err(SqlError::InvalidValue("truncated real".into()));
361 }
362 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
363 let bits = if encoded & (1u64 << 63) != 0 {
364 encoded ^ (1u64 << 63)
366 } else {
367 !encoded
369 };
370 let val = f64::from_bits(bits);
371 Ok((Value::Real(val), 8))
372}
373
374pub(crate) fn skip_key_value(data: &[u8]) -> Result<usize> {
377 if data.is_empty() {
378 return Err(SqlError::InvalidValue("empty key data".into()));
379 }
380 match data[0] {
381 TAG_NULL => Ok(1),
382 TAG_BOOLEAN => Ok(2),
383 TAG_INTEGER => decode_integer(&data[1..]).map(|(_, n)| n + 1),
384 TAG_REAL => decode_real(&data[1..]).map(|(_, n)| n + 1),
385 TAG_TIME | TAG_DATE | TAG_TIMESTAMP => decode_signed_varint(&data[1..]).map(|(_, n)| n + 1),
386 TAG_INTERVAL => Ok(17),
387 _ => skip_null_escaped(&data[1..]).map(|n| n + 1),
389 }
390}
391
392fn skip_null_escaped(data: &[u8]) -> Result<usize> {
393 let mut i = 0;
394 while i < data.len() {
395 if data[i] == 0x00 {
396 if i + 1 < data.len() && data[i + 1] == 0xFF {
397 i += 2;
398 } else {
399 return Ok(i + 1);
400 }
401 } else {
402 i += 1;
403 }
404 }
405 Err(SqlError::InvalidValue(
406 "unterminated null-escaped string".into(),
407 ))
408}
409
410fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
411 let mut result = Vec::new();
412 let mut i = 0;
413 while i < data.len() {
414 if data[i] == 0x00 {
415 if i + 1 < data.len() && data[i + 1] == 0xFF {
416 result.push(0x00);
417 i += 2;
418 } else {
419 return Ok((result, i + 1)); }
421 } else {
422 result.push(data[i]);
423 i += 1;
424 }
425 }
426 Err(SqlError::InvalidValue(
427 "unterminated null-escaped string".into(),
428 ))
429}
430
431fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
432 match v {
433 Value::Integer(val) => {
434 buf.push(DataType::Integer.type_tag());
435 buf.extend_from_slice(&val.to_le_bytes());
436 }
437 Value::Real(r) => {
438 buf.push(DataType::Real.type_tag());
439 buf.extend_from_slice(&r.to_le_bytes());
440 }
441 Value::Boolean(b) => {
442 buf.push(DataType::Boolean.type_tag());
443 buf.push(if *b { 1 } else { 0 });
444 }
445 Value::Text(s) => {
446 let bytes = s.as_bytes();
447 buf.push(DataType::Text.type_tag());
448 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
449 buf.extend_from_slice(bytes);
450 }
451 Value::Blob(data) => {
452 buf.push(DataType::Blob.type_tag());
453 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
454 buf.extend_from_slice(data);
455 }
456 Value::Time(t) => {
457 buf.push(DataType::Time.type_tag());
458 buf.extend_from_slice(&t.to_le_bytes());
459 }
460 Value::Date(d) => {
461 buf.push(DataType::Date.type_tag());
462 buf.extend_from_slice(&d.to_le_bytes());
463 }
464 Value::Timestamp(t) => {
465 buf.push(DataType::Timestamp.type_tag());
466 buf.extend_from_slice(&t.to_le_bytes());
467 }
468 Value::Interval {
469 months,
470 days,
471 micros,
472 } => {
473 buf.push(DataType::Interval.type_tag());
474 buf.extend_from_slice(&months.to_le_bytes());
475 buf.extend_from_slice(&days.to_le_bytes());
476 buf.extend_from_slice(µs.to_le_bytes());
477 }
478 Value::Json(s) => {
479 let bytes = s.as_bytes();
480 buf.push(DataType::Json.type_tag());
481 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
482 buf.extend_from_slice(bytes);
483 }
484 Value::Jsonb(b) => {
485 buf.push(DataType::Jsonb.type_tag());
486 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
487 buf.extend_from_slice(b);
488 }
489 Value::TsVector(b) => {
490 buf.push(DataType::TsVector.type_tag());
491 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
492 buf.extend_from_slice(b);
493 }
494 Value::TsQuery(b) => {
495 buf.push(DataType::TsQuery.type_tag());
496 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
497 buf.extend_from_slice(b);
498 }
499 Value::Array(a) => {
500 buf.push(DataType::Array.type_tag());
501 let len = encoded_array_v2_size(a);
502 buf.extend_from_slice(&(len as u32).to_le_bytes());
503 let start = buf.len();
504 buf.resize(start + len, 0);
505 write_array_v2_into_slice(a, &mut buf[start..start + len]);
506 }
507 Value::Vector(v) => {
508 buf.push(
509 DataType::Vector {
510 dim: v.len() as u16,
511 }
512 .type_tag(),
513 );
514 let len = 2 + v.len() * 4;
515 buf.extend_from_slice(&(len as u32).to_le_bytes());
516 buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
517 for &x in v.iter() {
518 buf.extend_from_slice(&x.to_le_bytes());
519 }
520 }
521 Value::Null => unreachable!(),
522 }
523}
524
525pub fn encode_row(values: &[Value]) -> Vec<u8> {
526 let mut buf = Vec::new();
527 encode_row_into(values, &mut buf);
528 buf
529}
530
531pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
532 buf.clear();
533 let col_count = values.len();
534 let bitmap_bytes = col_count.div_ceil(8);
535
536 let header = (col_count as u16) | V2_FLAG;
537 buf.extend_from_slice(&header.to_le_bytes());
538
539 let bitmap_start = buf.len();
540 buf.resize(buf.len() + bitmap_bytes, 0);
541
542 for (i, v) in values.iter().enumerate() {
543 if v.is_null() {
544 buf[bitmap_start + i / 8] |= 1 << (i % 8);
545 continue;
546 }
547 encode_cell_v2(v, buf);
548 }
549}
550
551pub enum TemplateSlot {
553 Null,
554 IntHole,
555 Const(Value),
556}
557
558pub struct RowTemplate {
559 pub template: Vec<u8>,
560 pub slot_offsets: Vec<(usize, usize)>,
562}
563
564pub fn build_row_template(phys_count: usize, slots: &[TemplateSlot]) -> RowTemplate {
565 let bitmap_bytes = phys_count.div_ceil(8);
566 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
567 let header = (phys_count as u16) | V2_FLAG;
568 template.extend_from_slice(&header.to_le_bytes());
569 let bitmap_start = template.len();
570 template.resize(bitmap_start + bitmap_bytes, 0);
571 let mut slot_offsets = Vec::new();
572 let set_null = |template: &mut [u8], slot: usize| {
573 template[bitmap_start + slot / 8] |= 1 << (slot % 8);
574 };
575 for (slot, kind) in slots.iter().enumerate() {
576 match kind {
577 TemplateSlot::Null => set_null(&mut template, slot),
578 TemplateSlot::IntHole => {
579 template.push(DataType::Integer.type_tag());
580 let value_offset = template.len();
581 template.extend_from_slice(&[0u8; 8]);
582 slot_offsets.push((slot, value_offset));
583 }
584 TemplateSlot::Const(v) if v.is_null() => set_null(&mut template, slot),
585 TemplateSlot::Const(v) => encode_cell_v2(v, &mut template),
586 }
587 }
588 RowTemplate {
589 template,
590 slot_offsets,
591 }
592}
593
594#[inline]
595pub fn encode_row_with_template(
596 tmpl: &RowTemplate,
597 values: &[Value],
598 buf: &mut Vec<u8>,
599) -> Result<()> {
600 if tmpl
602 .slot_offsets
603 .iter()
604 .any(|&(slot, _)| values[slot].is_null())
605 {
606 encode_row_into(values, buf);
607 return Ok(());
608 }
609 buf.clear();
610 buf.extend_from_slice(&tmpl.template);
611 for &(slot, off) in &tmpl.slot_offsets {
612 match &values[slot] {
613 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
614 other => {
615 return Err(SqlError::TypeMismatch {
616 expected: "Integer".into(),
617 got: other.data_type().to_string(),
618 });
619 }
620 }
621 }
622 Ok(())
623}
624
625fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
626 match DataType::from_tag(type_tag) {
627 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
628 data[..8].try_into().unwrap(),
629 ))),
630 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
631 data[..8].try_into().unwrap(),
632 ))),
633 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
634 Some(DataType::Text) => {
635 let s = std::str::from_utf8(data)
636 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
637 Ok(Value::Text(CompactString::from(s)))
638 }
639 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
640 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
641 data[..8].try_into().unwrap(),
642 ))),
643 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
644 data[..4].try_into().unwrap(),
645 ))),
646 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
647 data[..8].try_into().unwrap(),
648 ))),
649 Some(DataType::Interval) => {
650 if data.len() < 16 {
651 return Err(SqlError::InvalidValue("truncated interval".into()));
652 }
653 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
654 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
655 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
656 Ok(Value::Interval {
657 months,
658 days,
659 micros,
660 })
661 }
662 Some(DataType::Json) => {
663 let s = std::str::from_utf8(data)
664 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
665 Ok(Value::Json(CompactString::from(s)))
666 }
667 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
668 Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
669 Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
670 Some(DataType::Array) => decode_array_v2(data),
671 Some(DataType::Vector { .. }) => decode_vector(data),
672 _ => Err(SqlError::InvalidValue(format!(
673 "unknown column type tag: {type_tag}"
674 ))),
675 }
676}
677
678fn decode_vector(data: &[u8]) -> Result<Value> {
679 if data.len() < 2 {
680 return Err(SqlError::InvalidValue("truncated vector".into()));
681 }
682 let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
683 if data.len() < 2 + dim * 4 {
684 return Err(SqlError::InvalidValue("truncated vector payload".into()));
685 }
686 let mut v = Vec::with_capacity(dim);
687 for i in 0..dim {
688 let off = 2 + i * 4;
689 v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
690 }
691 Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
692}
693
694fn encoded_array_v2_size(elems: &[Value]) -> usize {
695 let mut total = 4;
696 for elem in elems {
697 if elem.is_null() {
698 total += 1;
699 continue;
700 }
701 total += 1 + 1;
702 let tag = elem.data_type().type_tag();
703 match fixed_width_size(tag) {
704 Some(n) => total += n,
705 None => total += 4 + variable_cell_payload_size(elem),
706 }
707 }
708 total
709}
710
711fn variable_cell_payload_size(v: &Value) -> usize {
712 match v {
713 Value::Text(s) => s.len(),
714 Value::Blob(b) => b.len(),
715 Value::Json(s) => s.len(),
716 Value::Jsonb(b) => b.len(),
717 Value::TsVector(b) => b.len(),
718 Value::TsQuery(b) => b.len(),
719 Value::Array(a) => encoded_array_v2_size(a),
720 Value::Vector(v) => 2 + v.len() * 4,
721 _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
722 }
723}
724
725fn value_encoded_size_v2(v: &Value) -> Option<usize> {
726 if v.is_null() {
727 return None;
728 }
729 Some(match fixed_width_size(v.data_type().type_tag()) {
730 Some(n) => n,
731 None => variable_cell_payload_size(v),
732 })
733}
734
735fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
736 match v {
737 Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
738 Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
739 Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
740 Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
741 Value::Blob(b) => out[..b.len()].copy_from_slice(b),
742 Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
743 Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
744 Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
745 Value::Interval {
746 months,
747 days,
748 micros,
749 } => {
750 out[..4].copy_from_slice(&months.to_le_bytes());
751 out[4..8].copy_from_slice(&days.to_le_bytes());
752 out[8..16].copy_from_slice(µs.to_le_bytes());
753 }
754 Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
755 Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
756 Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
757 Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
758 Value::Array(a) => write_array_v2_into_slice(a, out),
759 Value::Vector(v) => {
760 out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
761 let mut pos = 2;
762 for &x in v.iter() {
763 out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
764 pos += 4;
765 }
766 }
767 Value::Null => unreachable!(),
768 }
769}
770
771fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
772 out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
773 let mut pos = 4;
774 for elem in elems {
775 if elem.is_null() {
776 out[pos] = 0xFF;
777 pos += 1;
778 continue;
779 }
780 out[pos] = 0x00;
781 pos += 1;
782 let tag = elem.data_type().type_tag();
783 out[pos] = tag;
784 pos += 1;
785 match fixed_width_size(tag) {
786 Some(n) => {
787 write_value_payload_v2(elem, &mut out[pos..pos + n]);
788 pos += n;
789 }
790 None => {
791 let payload_len = variable_cell_payload_size(elem);
792 out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
793 pos += 4;
794 write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
795 pos += payload_len;
796 }
797 }
798 }
799}
800
801fn decode_array_v2(data: &[u8]) -> Result<Value> {
802 if data.len() < 4 {
803 return Err(SqlError::InvalidValue("truncated array length".into()));
804 }
805 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
806 let mut pos = 4;
807 let mut elems = Vec::with_capacity(count);
808 for _ in 0..count {
809 if pos >= data.len() {
810 return Err(SqlError::InvalidValue("truncated array elements".into()));
811 }
812 if data[pos] == 0xFF {
813 elems.push(Value::Null);
814 pos += 1;
815 continue;
816 }
817 if data[pos] != 0x00 {
818 return Err(SqlError::InvalidValue(
819 "invalid array element marker".into(),
820 ));
821 }
822 pos += 1;
823 if pos >= data.len() {
824 return Err(SqlError::InvalidValue("truncated array element".into()));
825 }
826 let type_tag = data[pos];
827 pos += 1;
828 let (val, advance) = match fixed_width_size(type_tag) {
829 Some(n) => {
830 if pos + n > data.len() {
831 return Err(SqlError::InvalidValue(
832 "truncated fixed-width array element".into(),
833 ));
834 }
835 let v = decode_value(type_tag, &data[pos..pos + n])?;
836 (v, n)
837 }
838 None => {
839 if pos + 4 > data.len() {
840 return Err(SqlError::InvalidValue(
841 "truncated array element length".into(),
842 ));
843 }
844 let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
845 pos += 4;
846 if pos + len > data.len() {
847 return Err(SqlError::InvalidValue(
848 "truncated variable-width array element".into(),
849 ));
850 }
851 let v = decode_value(type_tag, &data[pos..pos + len])?;
852 (v, len)
853 }
854 };
855 pos += advance;
856 elems.push(val);
857 }
858 Ok(Value::Array(std::sync::Arc::new(elems)))
859}
860
861#[derive(Clone, Copy, PartialEq, Eq, Debug)]
864pub(crate) enum RowVersion {
865 V1,
866 V2,
867}
868
869pub(crate) const V2_FLAG: u16 = 0x8000;
870pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
871
872#[inline]
873pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
874 match DataType::from_tag(type_tag)? {
875 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
876 DataType::Date => Some(4),
877 DataType::Boolean => Some(1),
878 DataType::Interval => Some(16),
879 DataType::Text
880 | DataType::Blob
881 | DataType::Json
882 | DataType::Jsonb
883 | DataType::TsVector
884 | DataType::TsQuery
885 | DataType::Array
886 | DataType::Vector { .. }
887 | DataType::Null => None,
888 }
889}
890
891#[inline]
894fn cell_extent(
895 data: &[u8],
896 type_tag: u8,
897 after_tag: usize,
898 version: RowVersion,
899) -> Result<(usize, usize)> {
900 let fixed = match version {
901 RowVersion::V2 => fixed_width_size(type_tag),
902 RowVersion::V1 => None,
903 };
904 if let Some(n) = fixed {
905 return Ok((n, after_tag));
906 }
907 if after_tag + 4 > data.len() {
908 return Err(SqlError::InvalidValue("truncated column data".into()));
909 }
910 let len = u32::from_le_bytes([
911 data[after_tag],
912 data[after_tag + 1],
913 data[after_tag + 2],
914 data[after_tag + 3],
915 ]) as usize;
916 Ok((len, after_tag + 4))
917}
918
919#[inline]
920fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
921 if pos >= data.len() {
922 return Err(SqlError::InvalidValue("truncated column data".into()));
923 }
924 let type_tag = data[pos];
925 let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
926 if body_pos + data_len > data.len() {
927 return Err(SqlError::InvalidValue("truncated column value".into()));
928 }
929 Ok((
930 type_tag,
931 &data[body_pos..body_pos + data_len],
932 body_pos + data_len,
933 ))
934}
935
936#[inline]
938fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
939 if pos >= data.len() {
940 return Err(SqlError::InvalidValue("truncated column data".into()));
941 }
942 let type_tag = data[pos];
943 let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
944 Ok(body_pos + data_len)
945}
946
947fn copy_cell_to_v2(
948 data: &[u8],
949 pos: usize,
950 version: RowVersion,
951 out: &mut Vec<u8>,
952) -> Result<usize> {
953 let (tag, body, next) = read_cell(data, pos, version)?;
954 out.push(tag);
955 if fixed_width_size(tag).is_none() {
956 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
957 }
958 out.extend_from_slice(body);
959 Ok(next)
960}
961
962fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
963 if data.len() < 2 {
964 return Err(SqlError::InvalidValue("row data too short".into()));
965 }
966 let raw = u16::from_le_bytes([data[0], data[1]]);
967 let version = if raw & V2_FLAG != 0 {
968 RowVersion::V2
969 } else {
970 RowVersion::V1
971 };
972 let col_count = (raw & COL_COUNT_MASK) as usize;
973 let bitmap_bytes = col_count.div_ceil(8);
974 let pos = 2;
975 if data.len() < pos + bitmap_bytes {
976 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
977 }
978 Ok((
979 version,
980 col_count,
981 &data[pos..pos + bitmap_bytes],
982 pos + bitmap_bytes,
983 ))
984}
985
986pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
987 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
988
989 let mut values = Vec::with_capacity(col_count);
990 for i in 0..col_count {
991 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
992 values.push(Value::Null);
993 continue;
994 }
995 let (type_tag, body, next) = read_cell(data, pos, version)?;
996 values.push(decode_value(type_tag, body)?);
997 pos = next;
998 }
999
1000 Ok(values)
1001}
1002
1003pub(crate) fn decode_row_push(data: &[u8], expected: usize, out: &mut Vec<Value>) -> Result<bool> {
1006 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1007 if col_count != expected {
1008 return Ok(false);
1009 }
1010 for col in 0..col_count {
1011 if bitmap[col / 8] & (1 << (col % 8)) != 0 {
1012 out.push(Value::Null);
1013 } else {
1014 let (type_tag, body, next) = read_cell(data, pos, version)?;
1015 out.push(decode_value(type_tag, body)?);
1016 pos = next;
1017 }
1018 }
1019 Ok(true)
1020}
1021
1022#[inline]
1024pub fn row_non_pk_count(data: &[u8]) -> usize {
1025 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
1026}
1027
1028pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
1029 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1030
1031 for i in 0..col_count {
1032 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
1033 continue;
1034 }
1035 let (type_tag, body, next) = read_cell(data, pos, version)?;
1036 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
1037 out[col_mapping[i]] = decode_value(type_tag, body)?;
1038 }
1039 pos = next;
1040 }
1041
1042 Ok(())
1043}
1044
1045pub fn decode_pk_into(
1046 key: &[u8],
1047 count: usize,
1048 out: &mut [Value],
1049 pk_mapping: &[usize],
1050) -> Result<()> {
1051 let mut pos = 0;
1052 for i in 0..count {
1053 let (v, n) = decode_key_value(&key[pos..])?;
1054 if i < pk_mapping.len() {
1055 out[pk_mapping[i]] = v;
1056 }
1057 pos += n;
1058 }
1059 Ok(())
1060}
1061
1062pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
1063 if targets.is_empty() {
1064 return Ok(Vec::new());
1065 }
1066 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1067
1068 let mut results = Vec::with_capacity(targets.len());
1069 let mut ti = 0;
1070
1071 for col in 0..col_count {
1072 if ti >= targets.len() {
1073 break;
1074 }
1075 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1076
1077 if col == targets[ti] {
1078 if is_null {
1079 results.push(Value::Null);
1080 } else {
1081 let (type_tag, body, next) = read_cell(data, pos, version)?;
1082 results.push(decode_value(type_tag, body)?);
1083 pos = next;
1084 }
1085 ti += 1;
1086 } else if !is_null {
1087 pos = skip_cell(data, pos, version)?;
1088 }
1089 }
1090
1091 while ti < targets.len() {
1092 results.push(Value::Null);
1093 ti += 1;
1094 }
1095
1096 Ok(results)
1097}
1098
1099pub fn decode_columns_into(
1100 data: &[u8],
1101 targets: &[usize],
1102 schema_cols: &[usize],
1103 row: &mut [Value],
1104) -> Result<()> {
1105 if targets.is_empty() {
1106 return Ok(());
1107 }
1108 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1109
1110 let mut ti = 0;
1111 for col in 0..col_count {
1112 if ti >= targets.len() {
1113 break;
1114 }
1115 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1116
1117 if col == targets[ti] {
1118 if is_null {
1119 row[schema_cols[ti]] = Value::Null;
1120 } else {
1121 let (type_tag, body, next) = read_cell(data, pos, version)?;
1122 row[schema_cols[ti]] = decode_value(type_tag, body)?;
1123 pos = next;
1124 }
1125 ti += 1;
1126 } else if !is_null {
1127 pos = skip_cell(data, pos, version)?;
1128 }
1129 }
1130
1131 Ok(())
1132}
1133
1134struct OffsetTarget {
1135 cell_pos: usize,
1136 tag: u8,
1137 fixed_width: Option<usize>,
1138 out_pos: usize,
1139}
1140
1141pub(crate) struct ProjectedOffsetPlan {
1144 expected_header: u16,
1145 body_start: usize,
1146 nonnull_mask: Vec<u8>,
1147 targets: Vec<OffsetTarget>,
1148}
1149
1150impl ProjectedOffsetPlan {
1151 pub(crate) fn build(phys_tags: &[u8], targets: &[(usize, usize)]) -> Option<Self> {
1154 if targets.is_empty() {
1155 return None;
1156 }
1157 let max_t = targets.iter().map(|&(p, _)| p).max()?;
1158 let mut offsets = Vec::with_capacity(max_t + 1);
1159 let mut acc = 0usize;
1160 for (i, &tag) in phys_tags.iter().enumerate().take(max_t + 1) {
1161 offsets.push(acc);
1162 if i < max_t {
1163 acc += 1 + fixed_width_size(tag)?;
1164 }
1165 }
1166 let mut out_targets = Vec::with_capacity(targets.len());
1167 for &(p, out_pos) in targets {
1168 let tag = phys_tags[p];
1169 out_targets.push(OffsetTarget {
1170 cell_pos: offsets[p],
1171 tag,
1172 fixed_width: fixed_width_size(tag),
1173 out_pos,
1174 });
1175 }
1176 let phys_count = phys_tags.len();
1177 let bitmap_bytes = phys_count.div_ceil(8);
1178 let mut nonnull_mask = vec![0u8; bitmap_bytes];
1179 for bit in 0..=max_t {
1180 nonnull_mask[bit / 8] |= 1 << (bit % 8);
1181 }
1182 Some(Self {
1183 expected_header: V2_FLAG | (phys_count as u16),
1184 body_start: 2 + bitmap_bytes,
1185 nonnull_mask,
1186 targets: out_targets,
1187 })
1188 }
1189
1190 #[inline]
1192 fn layout_ok(&self, data: &[u8]) -> bool {
1193 if data.len() < self.body_start
1194 || u16::from_le_bytes([data[0], data[1]]) != self.expected_header
1195 {
1196 return false;
1197 }
1198 self.nonnull_mask
1199 .iter()
1200 .enumerate()
1201 .all(|(i, &m)| data[2 + i] & m == 0)
1202 }
1203
1204 #[inline]
1206 fn read_target(&self, data: &[u8], t: &OffsetTarget) -> Result<Option<Value>> {
1207 let pos = self.body_start + t.cell_pos;
1208 if data.get(pos) != Some(&t.tag) {
1209 return Ok(None);
1210 }
1211 let after_tag = pos + 1;
1212 let (len, body_pos) = match t.fixed_width {
1213 Some(n) => (n, after_tag),
1214 None => match data.get(after_tag..after_tag + 4) {
1215 Some(lb) => (
1216 u32::from_le_bytes(lb.try_into().unwrap()) as usize,
1217 after_tag + 4,
1218 ),
1219 None => return Ok(None),
1220 },
1221 };
1222 match data.get(body_pos..body_pos + len) {
1223 Some(body) => Ok(Some(decode_value(t.tag, body)?)),
1224 None => Ok(None),
1225 }
1226 }
1227
1228 pub(crate) fn decode_into(&self, data: &[u8], row: &mut [Value]) -> Result<bool> {
1230 if !self.layout_ok(data) {
1231 return Ok(false);
1232 }
1233 for t in &self.targets {
1234 match self.read_target(data, t)? {
1235 Some(v) => row[t.out_pos] = v,
1236 None => return Ok(false),
1237 }
1238 }
1239 Ok(true)
1240 }
1241
1242 pub(crate) fn decode_push(&self, data: &[u8], out: &mut Vec<Value>) -> Result<bool> {
1245 if !self.layout_ok(data) {
1246 return Ok(false);
1247 }
1248 for t in &self.targets {
1249 match self.read_target(data, t)? {
1250 Some(v) => out.push(v),
1251 None => return Ok(false),
1252 }
1253 }
1254 Ok(true)
1255 }
1256}
1257
1258#[derive(Debug, Clone, Copy)]
1259pub enum RawColumn<'a> {
1260 Null,
1261 Integer(i64),
1262 Real(f64),
1263 Boolean(bool),
1264 Text(&'a str),
1265 Blob(&'a [u8]),
1266 Time(i64),
1267 Date(i32),
1268 Timestamp(i64),
1269 Interval { months: i32, days: i32, micros: i64 },
1270 Json(&'a str),
1271 Jsonb(&'a [u8]),
1272 TsVector(&'a [u8]),
1273 TsQuery(&'a [u8]),
1274 Array(&'a [u8]),
1275 Vector(&'a [u8]),
1276}
1277
1278impl<'a> RawColumn<'a> {
1279 pub fn to_value(self) -> Value {
1280 match self {
1281 RawColumn::Null => Value::Null,
1282 RawColumn::Integer(i) => Value::Integer(i),
1283 RawColumn::Real(r) => Value::Real(r),
1284 RawColumn::Boolean(b) => Value::Boolean(b),
1285 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1286 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1287 RawColumn::Time(t) => Value::Time(t),
1288 RawColumn::Date(d) => Value::Date(d),
1289 RawColumn::Timestamp(t) => Value::Timestamp(t),
1290 RawColumn::Interval {
1291 months,
1292 days,
1293 micros,
1294 } => Value::Interval {
1295 months,
1296 days,
1297 micros,
1298 },
1299 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1300 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1301 RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1302 RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1303 RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1304 RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1305 }
1306 }
1307
1308 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1309 use std::cmp::Ordering;
1310 match (self, other) {
1311 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1312 (RawColumn::Null, _) | (_, Value::Null) => None,
1313 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1314 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1315 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1316 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1317 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1318 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1319 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1320 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1321 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1322 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1323 (
1324 RawColumn::Interval {
1325 months: am,
1326 days: ad,
1327 micros: au,
1328 },
1329 Value::Interval {
1330 months: bm,
1331 days: bd,
1332 micros: bu,
1333 },
1334 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1335 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1336 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1337 (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1338 (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1339 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1340 Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1341 _ => None,
1342 },
1343 _ => None,
1344 }
1345 }
1346
1347 pub fn eq_value(&self, other: &Value) -> bool {
1348 match (self, other) {
1349 (RawColumn::Null, Value::Null) => true,
1350 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1351 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1352 (RawColumn::Real(a), Value::Real(b)) => a == b,
1353 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1354 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1355 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1356 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1357 (RawColumn::Time(a), Value::Time(b)) => a == b,
1358 (RawColumn::Date(a), Value::Date(b)) => a == b,
1359 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1360 (
1361 RawColumn::Interval {
1362 months: am,
1363 days: ad,
1364 micros: au,
1365 },
1366 Value::Interval {
1367 months: bm,
1368 days: bd,
1369 micros: bu,
1370 },
1371 ) => am == bm && ad == bd && au == bu,
1372 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1373 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1374 (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1375 (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1376 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1377 Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1378 _ => false,
1379 },
1380 _ => false,
1381 }
1382 }
1383
1384 pub fn as_f64(&self) -> Option<f64> {
1385 match self {
1386 RawColumn::Integer(i) => Some(*i as f64),
1387 RawColumn::Real(r) => Some(*r),
1388 _ => None,
1389 }
1390 }
1391
1392 pub fn as_i64(&self) -> Option<i64> {
1393 match self {
1394 RawColumn::Integer(i) => Some(*i),
1395 RawColumn::Time(t) => Some(*t),
1396 RawColumn::Date(d) => Some(*d as i64),
1397 RawColumn::Timestamp(t) => Some(*t),
1398 _ => None,
1399 }
1400 }
1401}
1402
1403fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1404 match DataType::from_tag(type_tag) {
1405 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1406 data[..8].try_into().unwrap(),
1407 ))),
1408 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1409 data[..8].try_into().unwrap(),
1410 ))),
1411 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1412 Some(DataType::Text) => {
1413 let s = std::str::from_utf8(data)
1414 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1415 Ok(RawColumn::Text(s))
1416 }
1417 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1418 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1419 data[..8].try_into().unwrap(),
1420 ))),
1421 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1422 data[..4].try_into().unwrap(),
1423 ))),
1424 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1425 data[..8].try_into().unwrap(),
1426 ))),
1427 Some(DataType::Interval) => {
1428 if data.len() < 16 {
1429 return Err(SqlError::InvalidValue("truncated interval".into()));
1430 }
1431 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1432 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1433 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1434 Ok(RawColumn::Interval {
1435 months,
1436 days,
1437 micros,
1438 })
1439 }
1440 Some(DataType::Json) => {
1441 let s = std::str::from_utf8(data)
1442 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1443 Ok(RawColumn::Json(s))
1444 }
1445 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1446 Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1447 Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1448 Some(DataType::Array) => Ok(RawColumn::Array(data)),
1449 Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1450 _ => Err(SqlError::InvalidValue(format!(
1451 "unknown column type tag: {type_tag}"
1452 ))),
1453 }
1454}
1455
1456pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1458 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1459 if target >= col_count || new_val.is_null() {
1460 return Ok(false);
1461 }
1462 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1463 if was_null {
1464 return Ok(false);
1465 }
1466 for col in 0..target {
1467 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1468 if !is_null {
1469 pos = skip_cell(data, pos, version)?;
1470 }
1471 }
1472 if pos >= data.len() {
1473 return Err(SqlError::InvalidValue("truncated column data".into()));
1474 }
1475 let type_tag = data[pos];
1476 let (old_data_len, val_start) = match version {
1477 RowVersion::V2 => match fixed_width_size(type_tag) {
1478 Some(n) => (n, pos + 1),
1479 None => {
1480 if pos + 5 > data.len() {
1481 return Err(SqlError::InvalidValue("truncated column data".into()));
1482 }
1483 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1484 (len, pos + 5)
1485 }
1486 },
1487 RowVersion::V1 => {
1488 if pos + 5 > data.len() {
1489 return Err(SqlError::InvalidValue("truncated column data".into()));
1490 }
1491 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1492 (len, pos + 5)
1493 }
1494 };
1495 let new_data_len = match value_encoded_size_v2(new_val) {
1496 Some(n) => n,
1497 None => return Ok(false),
1498 };
1499 if new_data_len != old_data_len {
1500 return Ok(false);
1501 }
1502 data[pos] = new_val.data_type().type_tag();
1503 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1504 Ok(true)
1505}
1506
1507pub fn patch_row_column(
1509 data: &[u8],
1510 target: usize,
1511 new_val: &Value,
1512 out: &mut Vec<u8>,
1513) -> Result<()> {
1514 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1515
1516 let new_col_count = if target >= col_count {
1517 target + 1
1518 } else {
1519 col_count
1520 };
1521 let new_bitmap_bytes = new_col_count.div_ceil(8);
1522 let bitmap_bytes = col_count.div_ceil(8);
1523 out.clear();
1524
1525 let header = (new_col_count as u16) | V2_FLAG;
1526 out.extend_from_slice(&header.to_le_bytes());
1527 let bitmap_start = out.len();
1528 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1529 for _ in bitmap_bytes..new_bitmap_bytes {
1530 out.push(0xFF);
1531 }
1532 if new_val.is_null() {
1533 out[bitmap_start + target / 8] |= 1 << (target % 8);
1534 } else {
1535 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1536 }
1537
1538 let mut pos = header_end;
1539 for col in 0..new_col_count {
1540 let was_null = if col < col_count {
1541 bitmap[col / 8] & (1 << (col % 8)) != 0
1542 } else {
1543 true
1544 };
1545
1546 if col == target {
1547 if !was_null {
1548 pos = skip_cell(data, pos, version)?;
1549 }
1550 if !new_val.is_null() {
1551 encode_cell_v2(new_val, out);
1552 }
1553 } else if !was_null {
1554 pos = copy_cell_to_v2(data, pos, version, out)?;
1555 }
1556 }
1557 Ok(())
1558}
1559
1560pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1561 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1562 if target >= col_count {
1563 return Ok(RawColumn::Null);
1564 }
1565
1566 for col in 0..=target {
1567 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1568
1569 if col == target {
1570 if is_null {
1571 return Ok(RawColumn::Null);
1572 }
1573 let (type_tag, body, _) = read_cell(data, pos, version)?;
1574 return decode_value_raw(type_tag, body);
1575 } else if !is_null {
1576 pos = skip_cell(data, pos, version)?;
1577 }
1578 }
1579
1580 unreachable!()
1581}
1582
1583pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1585 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1586 if target >= col_count {
1587 return Ok((RawColumn::Null, usize::MAX));
1588 }
1589
1590 for col in 0..=target {
1591 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1592
1593 if col == target {
1594 if is_null {
1595 return Ok((RawColumn::Null, usize::MAX));
1596 }
1597 let tag_offset = pos;
1598 let (type_tag, body, _) = read_cell(data, pos, version)?;
1599 let raw = decode_value_raw(type_tag, body)?;
1600 return Ok((raw, tag_offset));
1601 } else if !is_null {
1602 pos = skip_cell(data, pos, version)?;
1603 }
1604 }
1605
1606 unreachable!()
1607}
1608
1609pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1611 if offset == usize::MAX || new_val.is_null() {
1612 return Ok(false);
1613 }
1614 if data.len() < 2 || offset >= data.len() {
1615 return Err(SqlError::InvalidValue("truncated column data".into()));
1616 }
1617 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1618 RowVersion::V2
1619 } else {
1620 RowVersion::V1
1621 };
1622 let type_tag = data[offset];
1623 let (old_data_len, val_start) = match version {
1624 RowVersion::V2 => match fixed_width_size(type_tag) {
1625 Some(n) => (n, offset + 1),
1626 None => {
1627 if offset + 5 > data.len() {
1628 return Err(SqlError::InvalidValue("truncated column data".into()));
1629 }
1630 let len =
1631 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1632 (len, offset + 5)
1633 }
1634 },
1635 RowVersion::V1 => {
1636 if offset + 5 > data.len() {
1637 return Err(SqlError::InvalidValue("truncated column data".into()));
1638 }
1639 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1640 (len, offset + 5)
1641 }
1642 };
1643 let new_data_len = match value_encoded_size_v2(new_val) {
1644 Some(n) => n,
1645 None => return Ok(false),
1646 };
1647 if new_data_len != old_data_len {
1648 return Ok(false);
1649 }
1650 data[offset] = new_val.data_type().type_tag();
1651 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1652 Ok(true)
1653}
1654
1655pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1656 if key.is_empty() || key[0] != TAG_INTEGER {
1657 return Err(SqlError::InvalidValue("not an integer key".into()));
1658 }
1659 let (val, _) = decode_signed_varint(&key[1..])?;
1660 Ok(val)
1661}
1662
1663#[cfg(test)]
1664#[path = "encoding_tests.rs"]
1665mod tests;