1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
// Tag bytes for the key encoding. Every encoded key value starts with one of
// these bytes: the key encoders push it first and `decode_key_value`
// dispatches on it.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
const TAG_ARRAY: u8 = 0x0E;
const TAG_VECTOR: u8 = 0x0F;
23
24pub fn encode_key_value(value: &Value) -> Vec<u8> {
26 let mut buf = Vec::with_capacity(16);
27 encode_key_value_into(value, &mut buf);
28 buf
29}
30
31pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33 let mut buf = Vec::new();
34 for v in values {
35 buf.extend_from_slice(&encode_key_value(v));
36 }
37 buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41 buf.clear();
42 for v in values {
43 encode_key_value_into(v, buf);
44 }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48 buf.clear();
49 for &i in indices {
50 encode_key_value_into(&row[i as usize], buf);
51 }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56 buf.clear();
57 encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61 value: &Value,
62 coll: crate::types::Collation,
63 buf: &mut Vec<u8>,
64) {
65 match (value, coll) {
66 (Value::Text(s), crate::types::Collation::NoCase) => {
67 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68 }
69 (Value::Text(s), crate::types::Collation::Rtrim) => {
70 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71 }
72 _ => encode_key_value_into(value, buf),
73 }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77 match value {
78 Value::Null => buf.push(TAG_NULL),
79 Value::Boolean(b) => {
80 buf.push(TAG_BOOLEAN);
81 buf.push(if *b { 0x01 } else { 0x00 });
82 }
83 Value::Integer(i) => encode_integer_into(*i, buf),
84 Value::Real(r) => encode_real_into(*r, buf),
85 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90 Value::Interval {
91 months,
92 days,
93 micros,
94 } => {
95 buf.push(TAG_INTERVAL);
97 let mut mb = months.to_be_bytes();
98 mb[0] ^= 0x80;
99 buf.extend_from_slice(&mb);
100 let mut db = days.to_be_bytes();
101 db[0] ^= 0x80;
102 buf.extend_from_slice(&db);
103 let mut ub = micros.to_be_bytes();
104 ub[0] ^= 0x80;
105 buf.extend_from_slice(&ub);
106 }
107 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109 Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110 Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111 Value::Array(a) => encode_array_into(a, buf),
112 Value::Vector(v) => encode_vector_into(v, buf),
113 }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117 buf.push(TAG_VECTOR);
118 let mut inner = Vec::with_capacity(2 + v.len() * 4);
119 inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120 for &x in v {
121 inner.extend_from_slice(&x.to_le_bytes());
122 }
123 encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127 buf.push(TAG_ARRAY);
128 let mut inner = Vec::new();
129 for v in elems {
130 encode_key_value_into(v, &mut inner);
131 }
132 encode_bytes_into_no_tag(&inner, buf);
133}
134
/// Appends `data` to `buf` with every `0x00` byte written as `0x00 0xFF`,
/// followed by a single `0x00` terminator. No tag byte is written.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    for &byte in data {
        match byte {
            0x00 => buf.extend_from_slice(&[0x00, 0xFF]),
            other => buf.push(other),
        }
    }
    buf.push(0x00);
}
146
147fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
148 encode_signed_varint(TAG_INTEGER, val, buf);
149}
150
/// Appends `tag` followed by a variable-length encoding of `val`.
///
/// After the tag comes a marker byte and the significant big-endian bytes of
/// the magnitude:
/// - zero: marker `0x80`, no payload;
/// - positive: marker `0x80 + n`, then the `n` magnitude bytes;
/// - negative: marker `0x80 - n`, then the `n` magnitude bytes bit-inverted.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    let magnitude = val.unsigned_abs();
    let significant_bits = 64 - magnitude.leading_zeros() as usize;
    // Number of bytes needed for the magnitude; zero for `val == 0`.
    let width = (significant_bits + 7) / 8;
    let be = magnitude.to_be_bytes();
    let significant = &be[8 - width..];

    if val >= 0 {
        buf.push(0x80 + width as u8);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - width as u8);
        buf.extend(significant.iter().map(|byte| !byte));
    }
}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185 buf.push(TAG_REAL);
186 let bits = val.to_bits();
187 let encoded = if val.is_sign_negative() {
188 !bits
189 } else {
190 bits ^ (1u64 << 63)
191 };
192 buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
/// Appends `tag`, then `data` with each `0x00` byte written as `0x00 0xFF`,
/// then a `0x00` terminator.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);
    // Each run ends right after a NUL byte (except possibly the last one), so
    // whole runs are copied and the escape byte is added only where needed.
    for run in data.split_inclusive(|&byte| byte == 0x00) {
        buf.extend_from_slice(run);
        if run.ends_with(&[0x00]) {
            buf.push(0xFF);
        }
    }
    buf.push(0x00);
}
207
208pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210 if data.is_empty() {
211 return Err(SqlError::InvalidValue("empty key data".into()));
212 }
213 match data[0] {
214 TAG_NULL => Ok((Value::Null, 1)),
215 TAG_BOOLEAN => {
216 if data.len() < 2 {
217 return Err(SqlError::InvalidValue("truncated boolean".into()));
218 }
219 Ok((Value::Boolean(data[1] != 0), 2))
220 }
221 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226 (Value::Date(d), n + 1)
227 }),
228 TAG_TIMESTAMP => {
229 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230 }
231 TAG_INTERVAL => {
232 if data.len() < 1 + 16 {
233 return Err(SqlError::InvalidValue("truncated interval".into()));
234 }
235 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236 mb[0] ^= 0x80;
237 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238 db[0] ^= 0x80;
239 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240 ub[0] ^= 0x80;
241 Ok((
242 Value::Interval {
243 months: i32::from_be_bytes(mb),
244 days: i32::from_be_bytes(db),
245 micros: i64::from_be_bytes(ub),
246 },
247 17,
248 ))
249 }
250 TAG_TEXT => {
251 let (bytes, n) = decode_null_escaped(&data[1..])?;
252 let s = String::from_utf8(bytes)
253 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254 Ok((Value::Text(CompactString::from(s)), n + 1))
255 }
256 TAG_BLOB => {
257 let (bytes, n) = decode_null_escaped(&data[1..])?;
258 Ok((Value::Blob(bytes), n + 1))
259 }
260 TAG_JSON => {
261 let (bytes, n) = decode_null_escaped(&data[1..])?;
262 let s = String::from_utf8(bytes)
263 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264 Ok((Value::Json(CompactString::from(s)), n + 1))
265 }
266 TAG_JSONB => {
267 let (bytes, n) = decode_null_escaped(&data[1..])?;
268 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269 }
270 TAG_TSVECTOR => {
271 let (bytes, n) = decode_null_escaped(&data[1..])?;
272 Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273 }
274 TAG_TSQUERY => {
275 let (bytes, n) = decode_null_escaped(&data[1..])?;
276 Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277 }
278 TAG_ARRAY => {
279 let (inner, n) = decode_null_escaped(&data[1..])?;
280 let mut elems = Vec::new();
281 let mut pos = 0;
282 while pos < inner.len() {
283 let (v, vlen) = decode_key_value(&inner[pos..])?;
284 elems.push(v);
285 pos += vlen;
286 }
287 Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288 }
289 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
290 }
291}
292
293pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
295 let mut values = Vec::with_capacity(count);
296 let mut pos = 0;
297 for _ in 0..count {
298 let (v, n) = decode_key_value(&data[pos..])?;
299 values.push(v);
300 pos += n;
301 }
302 Ok(values)
303}
304
305fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
306 let (v, n) = decode_signed_varint(data)?;
307 Ok((Value::Integer(v), n))
308}
309
310pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
312 if data.is_empty() {
313 return Err(SqlError::InvalidValue("truncated integer".into()));
314 }
315 let marker = data[0];
316 if marker == 0x80 {
317 return Ok((0, 1));
318 }
319 if marker > 0x80 {
320 let byte_count = (marker - 0x80) as usize;
321 if data.len() < 1 + byte_count {
322 return Err(SqlError::InvalidValue("truncated positive integer".into()));
323 }
324 let mut bytes = [0u8; 8];
325 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
326 let val = i64::from_be_bytes(bytes);
327 Ok((val, 1 + byte_count))
328 } else {
329 let byte_count = (0x80 - marker) as usize;
330 if data.len() < 1 + byte_count {
331 return Err(SqlError::InvalidValue("truncated negative integer".into()));
332 }
333 let mut bytes = [0u8; 8];
334 for i in 0..byte_count {
335 bytes[8 - byte_count + i] = !data[1 + i];
336 }
337 let abs_val = u64::from_be_bytes(bytes);
338 let val = (-(abs_val as i128)) as i64;
339 Ok((val, 1 + byte_count))
340 }
341}
342
343fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
344 if data.len() < 8 {
345 return Err(SqlError::InvalidValue("truncated real".into()));
346 }
347 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
348 let bits = if encoded & (1u64 << 63) != 0 {
349 encoded ^ (1u64 << 63)
351 } else {
352 !encoded
354 };
355 let val = f64::from_bits(bits);
356 Ok((Value::Real(val), 8))
357}
358
359fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
361 let mut result = Vec::new();
362 let mut i = 0;
363 while i < data.len() {
364 if data[i] == 0x00 {
365 if i + 1 < data.len() && data[i + 1] == 0xFF {
366 result.push(0x00);
367 i += 2;
368 } else {
369 return Ok((result, i + 1)); }
371 } else {
372 result.push(data[i]);
373 i += 1;
374 }
375 }
376 Err(SqlError::InvalidValue(
377 "unterminated null-escaped string".into(),
378 ))
379}
380
381fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
382 match v {
383 Value::Integer(val) => {
384 buf.push(DataType::Integer.type_tag());
385 buf.extend_from_slice(&val.to_le_bytes());
386 }
387 Value::Real(r) => {
388 buf.push(DataType::Real.type_tag());
389 buf.extend_from_slice(&r.to_le_bytes());
390 }
391 Value::Boolean(b) => {
392 buf.push(DataType::Boolean.type_tag());
393 buf.push(if *b { 1 } else { 0 });
394 }
395 Value::Text(s) => {
396 let bytes = s.as_bytes();
397 buf.push(DataType::Text.type_tag());
398 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
399 buf.extend_from_slice(bytes);
400 }
401 Value::Blob(data) => {
402 buf.push(DataType::Blob.type_tag());
403 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
404 buf.extend_from_slice(data);
405 }
406 Value::Time(t) => {
407 buf.push(DataType::Time.type_tag());
408 buf.extend_from_slice(&t.to_le_bytes());
409 }
410 Value::Date(d) => {
411 buf.push(DataType::Date.type_tag());
412 buf.extend_from_slice(&d.to_le_bytes());
413 }
414 Value::Timestamp(t) => {
415 buf.push(DataType::Timestamp.type_tag());
416 buf.extend_from_slice(&t.to_le_bytes());
417 }
418 Value::Interval {
419 months,
420 days,
421 micros,
422 } => {
423 buf.push(DataType::Interval.type_tag());
424 buf.extend_from_slice(&months.to_le_bytes());
425 buf.extend_from_slice(&days.to_le_bytes());
426 buf.extend_from_slice(µs.to_le_bytes());
427 }
428 Value::Json(s) => {
429 let bytes = s.as_bytes();
430 buf.push(DataType::Json.type_tag());
431 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
432 buf.extend_from_slice(bytes);
433 }
434 Value::Jsonb(b) => {
435 buf.push(DataType::Jsonb.type_tag());
436 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
437 buf.extend_from_slice(b);
438 }
439 Value::TsVector(b) => {
440 buf.push(DataType::TsVector.type_tag());
441 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
442 buf.extend_from_slice(b);
443 }
444 Value::TsQuery(b) => {
445 buf.push(DataType::TsQuery.type_tag());
446 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
447 buf.extend_from_slice(b);
448 }
449 Value::Array(a) => {
450 buf.push(DataType::Array.type_tag());
451 let len = encoded_array_v2_size(a);
452 buf.extend_from_slice(&(len as u32).to_le_bytes());
453 let start = buf.len();
454 buf.resize(start + len, 0);
455 write_array_v2_into_slice(a, &mut buf[start..start + len]);
456 }
457 Value::Vector(v) => {
458 buf.push(
459 DataType::Vector {
460 dim: v.len() as u16,
461 }
462 .type_tag(),
463 );
464 let len = 2 + v.len() * 4;
465 buf.extend_from_slice(&(len as u32).to_le_bytes());
466 buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
467 for &x in v.iter() {
468 buf.extend_from_slice(&x.to_le_bytes());
469 }
470 }
471 Value::Null => unreachable!(),
472 }
473}
474
475pub fn encode_row(values: &[Value]) -> Vec<u8> {
476 let mut buf = Vec::new();
477 encode_row_into(values, &mut buf);
478 buf
479}
480
481pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
482 buf.clear();
483 let col_count = values.len();
484 let bitmap_bytes = col_count.div_ceil(8);
485
486 let header = (col_count as u16) | V2_FLAG;
487 buf.extend_from_slice(&header.to_le_bytes());
488
489 let bitmap_start = buf.len();
490 buf.resize(buf.len() + bitmap_bytes, 0);
491
492 for (i, v) in values.iter().enumerate() {
493 if v.is_null() {
494 buf[bitmap_start + i / 8] |= 1 << (i % 8);
495 continue;
496 }
497 encode_cell_v2(v, buf);
498 }
499}
500
/// Describes one column slot when building a `RowTemplate` with
/// `build_row_template`.
pub enum TemplateSlot {
    /// The column is always null; only its null-bitmap bit is set.
    Null,
    /// An integer cell whose 8 payload bytes are zero-filled in the template
    /// and patched per row by `encode_row_with_template`.
    IntHole,
    /// A fixed value encoded once into the template (a null constant is
    /// treated like `Null`).
    Const(Value),
}
507
/// A pre-encoded V2 row with integer holes, produced by `build_row_template`.
pub struct RowTemplate {
    /// Complete row bytes (header, null bitmap, cells) with zeroed integer
    /// payloads where holes are.
    pub template: Vec<u8>,
    /// One `(slot index, byte offset)` pair per `IntHole`: the offset is where
    /// that slot's 8-byte integer payload starts inside `template`.
    pub slot_offsets: Vec<(usize, usize)>,
}
513
514pub fn build_row_template(phys_count: usize, slots: &[TemplateSlot]) -> RowTemplate {
515 let bitmap_bytes = phys_count.div_ceil(8);
516 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
517 let header = (phys_count as u16) | V2_FLAG;
518 template.extend_from_slice(&header.to_le_bytes());
519 let bitmap_start = template.len();
520 template.resize(bitmap_start + bitmap_bytes, 0);
521 let mut slot_offsets = Vec::new();
522 let set_null = |template: &mut [u8], slot: usize| {
523 template[bitmap_start + slot / 8] |= 1 << (slot % 8);
524 };
525 for (slot, kind) in slots.iter().enumerate() {
526 match kind {
527 TemplateSlot::Null => set_null(&mut template, slot),
528 TemplateSlot::IntHole => {
529 template.push(DataType::Integer.type_tag());
530 let value_offset = template.len();
531 template.extend_from_slice(&[0u8; 8]);
532 slot_offsets.push((slot, value_offset));
533 }
534 TemplateSlot::Const(v) if v.is_null() => set_null(&mut template, slot),
535 TemplateSlot::Const(v) => encode_cell_v2(v, &mut template),
536 }
537 }
538 RowTemplate {
539 template,
540 slot_offsets,
541 }
542}
543
544#[inline]
546pub fn encode_row_with_template(
547 tmpl: &RowTemplate,
548 values: &[Value],
549 buf: &mut Vec<u8>,
550) -> Result<()> {
551 buf.clear();
552 buf.extend_from_slice(&tmpl.template);
553 for &(slot, off) in &tmpl.slot_offsets {
554 match &values[slot] {
555 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
556 other => {
557 return Err(SqlError::TypeMismatch {
558 expected: "Integer".into(),
559 got: other.data_type().to_string(),
560 });
561 }
562 }
563 }
564 Ok(())
565}
566
567fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
568 match DataType::from_tag(type_tag) {
569 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
570 data[..8].try_into().unwrap(),
571 ))),
572 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
573 data[..8].try_into().unwrap(),
574 ))),
575 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
576 Some(DataType::Text) => {
577 let s = std::str::from_utf8(data)
578 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
579 Ok(Value::Text(CompactString::from(s)))
580 }
581 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
582 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
583 data[..8].try_into().unwrap(),
584 ))),
585 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
586 data[..4].try_into().unwrap(),
587 ))),
588 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
589 data[..8].try_into().unwrap(),
590 ))),
591 Some(DataType::Interval) => {
592 if data.len() < 16 {
593 return Err(SqlError::InvalidValue("truncated interval".into()));
594 }
595 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
596 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
597 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
598 Ok(Value::Interval {
599 months,
600 days,
601 micros,
602 })
603 }
604 Some(DataType::Json) => {
605 let s = std::str::from_utf8(data)
606 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
607 Ok(Value::Json(CompactString::from(s)))
608 }
609 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
610 Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
611 Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
612 Some(DataType::Array) => decode_array_v2(data),
613 Some(DataType::Vector { .. }) => decode_vector(data),
614 _ => Err(SqlError::InvalidValue(format!(
615 "unknown column type tag: {type_tag}"
616 ))),
617 }
618}
619
620fn decode_vector(data: &[u8]) -> Result<Value> {
621 if data.len() < 2 {
622 return Err(SqlError::InvalidValue("truncated vector".into()));
623 }
624 let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
625 if data.len() < 2 + dim * 4 {
626 return Err(SqlError::InvalidValue("truncated vector payload".into()));
627 }
628 let mut v = Vec::with_capacity(dim);
629 for i in 0..dim {
630 let off = 2 + i * 4;
631 v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
632 }
633 Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
634}
635
636fn encoded_array_v2_size(elems: &[Value]) -> usize {
637 let mut total = 4;
638 for elem in elems {
639 if elem.is_null() {
640 total += 1;
641 continue;
642 }
643 total += 1 + 1;
644 let tag = elem.data_type().type_tag();
645 match fixed_width_size(tag) {
646 Some(n) => total += n,
647 None => total += 4 + variable_cell_payload_size(elem),
648 }
649 }
650 total
651}
652
653fn variable_cell_payload_size(v: &Value) -> usize {
654 match v {
655 Value::Text(s) => s.len(),
656 Value::Blob(b) => b.len(),
657 Value::Json(s) => s.len(),
658 Value::Jsonb(b) => b.len(),
659 Value::TsVector(b) => b.len(),
660 Value::TsQuery(b) => b.len(),
661 Value::Array(a) => encoded_array_v2_size(a),
662 Value::Vector(v) => 2 + v.len() * 4,
663 _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
664 }
665}
666
667fn value_encoded_size_v2(v: &Value) -> Option<usize> {
668 if v.is_null() {
669 return None;
670 }
671 Some(match fixed_width_size(v.data_type().type_tag()) {
672 Some(n) => n,
673 None => variable_cell_payload_size(v),
674 })
675}
676
677fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
678 match v {
679 Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
680 Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
681 Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
682 Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
683 Value::Blob(b) => out[..b.len()].copy_from_slice(b),
684 Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
685 Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
686 Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
687 Value::Interval {
688 months,
689 days,
690 micros,
691 } => {
692 out[..4].copy_from_slice(&months.to_le_bytes());
693 out[4..8].copy_from_slice(&days.to_le_bytes());
694 out[8..16].copy_from_slice(µs.to_le_bytes());
695 }
696 Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
697 Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
698 Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
699 Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
700 Value::Array(a) => write_array_v2_into_slice(a, out),
701 Value::Vector(v) => {
702 out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
703 let mut pos = 2;
704 for &x in v.iter() {
705 out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
706 pos += 4;
707 }
708 }
709 Value::Null => unreachable!(),
710 }
711}
712
713fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
714 out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
715 let mut pos = 4;
716 for elem in elems {
717 if elem.is_null() {
718 out[pos] = 0xFF;
719 pos += 1;
720 continue;
721 }
722 out[pos] = 0x00;
723 pos += 1;
724 let tag = elem.data_type().type_tag();
725 out[pos] = tag;
726 pos += 1;
727 match fixed_width_size(tag) {
728 Some(n) => {
729 write_value_payload_v2(elem, &mut out[pos..pos + n]);
730 pos += n;
731 }
732 None => {
733 let payload_len = variable_cell_payload_size(elem);
734 out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
735 pos += 4;
736 write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
737 pos += payload_len;
738 }
739 }
740 }
741}
742
743fn decode_array_v2(data: &[u8]) -> Result<Value> {
744 if data.len() < 4 {
745 return Err(SqlError::InvalidValue("truncated array length".into()));
746 }
747 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
748 let mut pos = 4;
749 let mut elems = Vec::with_capacity(count);
750 for _ in 0..count {
751 if pos >= data.len() {
752 return Err(SqlError::InvalidValue("truncated array elements".into()));
753 }
754 if data[pos] == 0xFF {
755 elems.push(Value::Null);
756 pos += 1;
757 continue;
758 }
759 if data[pos] != 0x00 {
760 return Err(SqlError::InvalidValue(
761 "invalid array element marker".into(),
762 ));
763 }
764 pos += 1;
765 if pos >= data.len() {
766 return Err(SqlError::InvalidValue("truncated array element".into()));
767 }
768 let type_tag = data[pos];
769 pos += 1;
770 let (val, advance) = match fixed_width_size(type_tag) {
771 Some(n) => {
772 if pos + n > data.len() {
773 return Err(SqlError::InvalidValue(
774 "truncated fixed-width array element".into(),
775 ));
776 }
777 let v = decode_value(type_tag, &data[pos..pos + n])?;
778 (v, n)
779 }
780 None => {
781 if pos + 4 > data.len() {
782 return Err(SqlError::InvalidValue(
783 "truncated array element length".into(),
784 ));
785 }
786 let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
787 pos += 4;
788 if pos + len > data.len() {
789 return Err(SqlError::InvalidValue(
790 "truncated variable-width array element".into(),
791 ));
792 }
793 let v = decode_value(type_tag, &data[pos..pos + len])?;
794 (v, len)
795 }
796 };
797 pos += advance;
798 elems.push(val);
799 }
800 Ok(Value::Array(std::sync::Arc::new(elems)))
801}
802
/// Row encoding version, taken from the `V2_FLAG` bit of the row header.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries a 4-byte length prefix after its type tag.
    V1,
    /// Fixed-width cells omit the length prefix; other cells keep it.
    V2,
}
810
/// High bit of the 16-bit row header; set for V2-encoded rows.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the column count from the 16-bit row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
813
814#[inline]
815pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
816 match DataType::from_tag(type_tag)? {
817 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
818 DataType::Date => Some(4),
819 DataType::Boolean => Some(1),
820 DataType::Interval => Some(16),
821 DataType::Text
822 | DataType::Blob
823 | DataType::Json
824 | DataType::Jsonb
825 | DataType::TsVector
826 | DataType::TsQuery
827 | DataType::Array
828 | DataType::Vector { .. }
829 | DataType::Null => None,
830 }
831}
832
833#[inline]
836fn cell_extent(
837 data: &[u8],
838 type_tag: u8,
839 after_tag: usize,
840 version: RowVersion,
841) -> Result<(usize, usize)> {
842 let fixed = match version {
843 RowVersion::V2 => fixed_width_size(type_tag),
844 RowVersion::V1 => None,
845 };
846 if let Some(n) = fixed {
847 return Ok((n, after_tag));
848 }
849 if after_tag + 4 > data.len() {
850 return Err(SqlError::InvalidValue("truncated column data".into()));
851 }
852 let len = u32::from_le_bytes([
853 data[after_tag],
854 data[after_tag + 1],
855 data[after_tag + 2],
856 data[after_tag + 3],
857 ]) as usize;
858 Ok((len, after_tag + 4))
859}
860
861#[inline]
862fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
863 if pos >= data.len() {
864 return Err(SqlError::InvalidValue("truncated column data".into()));
865 }
866 let type_tag = data[pos];
867 let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
868 if body_pos + data_len > data.len() {
869 return Err(SqlError::InvalidValue("truncated column value".into()));
870 }
871 Ok((
872 type_tag,
873 &data[body_pos..body_pos + data_len],
874 body_pos + data_len,
875 ))
876}
877
878#[inline]
880fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
881 if pos >= data.len() {
882 return Err(SqlError::InvalidValue("truncated column data".into()));
883 }
884 let type_tag = data[pos];
885 let (data_len, body_pos) = cell_extent(data, type_tag, pos + 1, version)?;
886 Ok(body_pos + data_len)
887}
888
889fn copy_cell_to_v2(
890 data: &[u8],
891 pos: usize,
892 version: RowVersion,
893 out: &mut Vec<u8>,
894) -> Result<usize> {
895 let (tag, body, next) = read_cell(data, pos, version)?;
896 out.push(tag);
897 if fixed_width_size(tag).is_none() {
898 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
899 }
900 out.extend_from_slice(body);
901 Ok(next)
902}
903
904fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
905 if data.len() < 2 {
906 return Err(SqlError::InvalidValue("row data too short".into()));
907 }
908 let raw = u16::from_le_bytes([data[0], data[1]]);
909 let version = if raw & V2_FLAG != 0 {
910 RowVersion::V2
911 } else {
912 RowVersion::V1
913 };
914 let col_count = (raw & COL_COUNT_MASK) as usize;
915 let bitmap_bytes = col_count.div_ceil(8);
916 let pos = 2;
917 if data.len() < pos + bitmap_bytes {
918 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
919 }
920 Ok((
921 version,
922 col_count,
923 &data[pos..pos + bitmap_bytes],
924 pos + bitmap_bytes,
925 ))
926}
927
928pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
929 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
930
931 let mut values = Vec::with_capacity(col_count);
932 for i in 0..col_count {
933 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
934 values.push(Value::Null);
935 continue;
936 }
937 let (type_tag, body, next) = read_cell(data, pos, version)?;
938 values.push(decode_value(type_tag, body)?);
939 pos = next;
940 }
941
942 Ok(values)
943}
944
945pub(crate) fn decode_row_push(data: &[u8], expected: usize, out: &mut Vec<Value>) -> Result<bool> {
948 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
949 if col_count != expected {
950 return Ok(false);
951 }
952 for col in 0..col_count {
953 if bitmap[col / 8] & (1 << (col % 8)) != 0 {
954 out.push(Value::Null);
955 } else {
956 let (type_tag, body, next) = read_cell(data, pos, version)?;
957 out.push(decode_value(type_tag, body)?);
958 pos = next;
959 }
960 }
961 Ok(true)
962}
963
964#[inline]
966pub fn row_non_pk_count(data: &[u8]) -> usize {
967 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
968}
969
970pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
971 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
972
973 for i in 0..col_count {
974 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
975 continue;
976 }
977 let (type_tag, body, next) = read_cell(data, pos, version)?;
978 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
979 out[col_mapping[i]] = decode_value(type_tag, body)?;
980 }
981 pos = next;
982 }
983
984 Ok(())
985}
986
987pub fn decode_pk_into(
988 key: &[u8],
989 count: usize,
990 out: &mut [Value],
991 pk_mapping: &[usize],
992) -> Result<()> {
993 let mut pos = 0;
994 for i in 0..count {
995 let (v, n) = decode_key_value(&key[pos..])?;
996 if i < pk_mapping.len() {
997 out[pk_mapping[i]] = v;
998 }
999 pos += n;
1000 }
1001 Ok(())
1002}
1003
1004pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
1005 if targets.is_empty() {
1006 return Ok(Vec::new());
1007 }
1008 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1009
1010 let mut results = Vec::with_capacity(targets.len());
1011 let mut ti = 0;
1012
1013 for col in 0..col_count {
1014 if ti >= targets.len() {
1015 break;
1016 }
1017 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1018
1019 if col == targets[ti] {
1020 if is_null {
1021 results.push(Value::Null);
1022 } else {
1023 let (type_tag, body, next) = read_cell(data, pos, version)?;
1024 results.push(decode_value(type_tag, body)?);
1025 pos = next;
1026 }
1027 ti += 1;
1028 } else if !is_null {
1029 pos = skip_cell(data, pos, version)?;
1030 }
1031 }
1032
1033 while ti < targets.len() {
1034 results.push(Value::Null);
1035 ti += 1;
1036 }
1037
1038 Ok(results)
1039}
1040
1041pub fn decode_columns_into(
1042 data: &[u8],
1043 targets: &[usize],
1044 schema_cols: &[usize],
1045 row: &mut [Value],
1046) -> Result<()> {
1047 if targets.is_empty() {
1048 return Ok(());
1049 }
1050 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1051
1052 let mut ti = 0;
1053 for col in 0..col_count {
1054 if ti >= targets.len() {
1055 break;
1056 }
1057 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1058
1059 if col == targets[ti] {
1060 if is_null {
1061 row[schema_cols[ti]] = Value::Null;
1062 } else {
1063 let (type_tag, body, next) = read_cell(data, pos, version)?;
1064 row[schema_cols[ti]] = decode_value(type_tag, body)?;
1065 pos = next;
1066 }
1067 ti += 1;
1068 } else if !is_null {
1069 pos = skip_cell(data, pos, version)?;
1070 }
1071 }
1072
1073 Ok(())
1074}
1075
/// One projected column of a `ProjectedOffsetPlan`.
struct OffsetTarget {
    /// Offset of the cell's tag byte, relative to the start of the row body.
    cell_pos: usize,
    /// Type tag the cell must carry for the precomputed offset to be valid.
    tag: u8,
    /// Payload width for fixed-width types, `None` for length-prefixed ones.
    fixed_width: Option<usize>,
    /// Index in the output row where the decoded value is stored.
    out_pos: usize,
}
1082
/// Precomputed cell offsets for decoding selected columns of V2 rows without
/// scanning the cells before them.
pub(crate) struct ProjectedOffsetPlan {
    /// Exact 16-bit row header (V2 flag plus column count) a row must have.
    expected_header: u16,
    /// Offset of the first cell: 2 header bytes plus the null bitmap.
    body_start: usize,
    /// Per bitmap byte, the bits of columns that must not be null in the row.
    nonnull_mask: Vec<u8>,
    /// The columns to decode.
    targets: Vec<OffsetTarget>,
}
1091
1092impl ProjectedOffsetPlan {
1093 pub(crate) fn build(phys_tags: &[u8], targets: &[(usize, usize)]) -> Option<Self> {
1096 if targets.is_empty() {
1097 return None;
1098 }
1099 let max_t = targets.iter().map(|&(p, _)| p).max()?;
1100 let mut offsets = Vec::with_capacity(max_t + 1);
1101 let mut acc = 0usize;
1102 for (i, &tag) in phys_tags.iter().enumerate().take(max_t + 1) {
1103 offsets.push(acc);
1104 if i < max_t {
1105 acc += 1 + fixed_width_size(tag)?;
1106 }
1107 }
1108 let mut out_targets = Vec::with_capacity(targets.len());
1109 for &(p, out_pos) in targets {
1110 let tag = phys_tags[p];
1111 out_targets.push(OffsetTarget {
1112 cell_pos: offsets[p],
1113 tag,
1114 fixed_width: fixed_width_size(tag),
1115 out_pos,
1116 });
1117 }
1118 let phys_count = phys_tags.len();
1119 let bitmap_bytes = phys_count.div_ceil(8);
1120 let mut nonnull_mask = vec![0u8; bitmap_bytes];
1121 for bit in 0..=max_t {
1122 nonnull_mask[bit / 8] |= 1 << (bit % 8);
1123 }
1124 Some(Self {
1125 expected_header: V2_FLAG | (phys_count as u16),
1126 body_start: 2 + bitmap_bytes,
1127 nonnull_mask,
1128 targets: out_targets,
1129 })
1130 }
1131
1132 #[inline]
1134 fn layout_ok(&self, data: &[u8]) -> bool {
1135 if data.len() < self.body_start
1136 || u16::from_le_bytes([data[0], data[1]]) != self.expected_header
1137 {
1138 return false;
1139 }
1140 self.nonnull_mask
1141 .iter()
1142 .enumerate()
1143 .all(|(i, &m)| data[2 + i] & m == 0)
1144 }
1145
1146 #[inline]
1148 fn read_target(&self, data: &[u8], t: &OffsetTarget) -> Result<Option<Value>> {
1149 let pos = self.body_start + t.cell_pos;
1150 if data.get(pos) != Some(&t.tag) {
1151 return Ok(None);
1152 }
1153 let after_tag = pos + 1;
1154 let (len, body_pos) = match t.fixed_width {
1155 Some(n) => (n, after_tag),
1156 None => match data.get(after_tag..after_tag + 4) {
1157 Some(lb) => (
1158 u32::from_le_bytes(lb.try_into().unwrap()) as usize,
1159 after_tag + 4,
1160 ),
1161 None => return Ok(None),
1162 },
1163 };
1164 match data.get(body_pos..body_pos + len) {
1165 Some(body) => Ok(Some(decode_value(t.tag, body)?)),
1166 None => Ok(None),
1167 }
1168 }
1169
1170 pub(crate) fn decode_into(&self, data: &[u8], row: &mut [Value]) -> Result<bool> {
1172 if !self.layout_ok(data) {
1173 return Ok(false);
1174 }
1175 for t in &self.targets {
1176 match self.read_target(data, t)? {
1177 Some(v) => row[t.out_pos] = v,
1178 None => return Ok(false),
1179 }
1180 }
1181 Ok(true)
1182 }
1183
1184 pub(crate) fn decode_push(&self, data: &[u8], out: &mut Vec<Value>) -> Result<bool> {
1187 if !self.layout_ok(data) {
1188 return Ok(false);
1189 }
1190 for t in &self.targets {
1191 match self.read_target(data, t)? {
1192 Some(v) => out.push(v),
1193 None => return Ok(false),
1194 }
1195 }
1196 Ok(true)
1197 }
1198}
1199
/// A decoded column that borrows its variable-length payload from the row
/// buffer instead of copying it.
///
/// Use `to_value` to obtain an owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    /// UTF-8 text borrowed from the row.
    Text(&'a str),
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    /// JSON text borrowed from the row.
    Json(&'a str),
    Jsonb(&'a [u8]),
    TsVector(&'a [u8]),
    TsQuery(&'a [u8]),
    /// Still-encoded array bytes; decoded with `decode_array_v2` on demand.
    Array(&'a [u8]),
    /// Still-encoded vector bytes; decoded with `decode_vector` on demand.
    Vector(&'a [u8]),
}
1219
1220impl<'a> RawColumn<'a> {
1221 pub fn to_value(self) -> Value {
1222 match self {
1223 RawColumn::Null => Value::Null,
1224 RawColumn::Integer(i) => Value::Integer(i),
1225 RawColumn::Real(r) => Value::Real(r),
1226 RawColumn::Boolean(b) => Value::Boolean(b),
1227 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1228 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1229 RawColumn::Time(t) => Value::Time(t),
1230 RawColumn::Date(d) => Value::Date(d),
1231 RawColumn::Timestamp(t) => Value::Timestamp(t),
1232 RawColumn::Interval {
1233 months,
1234 days,
1235 micros,
1236 } => Value::Interval {
1237 months,
1238 days,
1239 micros,
1240 },
1241 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1242 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1243 RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1244 RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1245 RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1246 RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1247 }
1248 }
1249
1250 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1251 use std::cmp::Ordering;
1252 match (self, other) {
1253 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1254 (RawColumn::Null, _) | (_, Value::Null) => None,
1255 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1256 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1257 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1258 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1259 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1260 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1261 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1262 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1263 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1264 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1265 (
1266 RawColumn::Interval {
1267 months: am,
1268 days: ad,
1269 micros: au,
1270 },
1271 Value::Interval {
1272 months: bm,
1273 days: bd,
1274 micros: bu,
1275 },
1276 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1277 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1278 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1279 (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1280 (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1281 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1282 Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1283 _ => None,
1284 },
1285 _ => None,
1286 }
1287 }
1288
1289 pub fn eq_value(&self, other: &Value) -> bool {
1290 match (self, other) {
1291 (RawColumn::Null, Value::Null) => true,
1292 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1293 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1294 (RawColumn::Real(a), Value::Real(b)) => a == b,
1295 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1296 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1297 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1298 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1299 (RawColumn::Time(a), Value::Time(b)) => a == b,
1300 (RawColumn::Date(a), Value::Date(b)) => a == b,
1301 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1302 (
1303 RawColumn::Interval {
1304 months: am,
1305 days: ad,
1306 micros: au,
1307 },
1308 Value::Interval {
1309 months: bm,
1310 days: bd,
1311 micros: bu,
1312 },
1313 ) => am == bm && ad == bd && au == bu,
1314 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1315 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1316 (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1317 (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1318 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1319 Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1320 _ => false,
1321 },
1322 _ => false,
1323 }
1324 }
1325
1326 pub fn as_f64(&self) -> Option<f64> {
1327 match self {
1328 RawColumn::Integer(i) => Some(*i as f64),
1329 RawColumn::Real(r) => Some(*r),
1330 _ => None,
1331 }
1332 }
1333
1334 pub fn as_i64(&self) -> Option<i64> {
1335 match self {
1336 RawColumn::Integer(i) => Some(*i),
1337 RawColumn::Time(t) => Some(*t),
1338 RawColumn::Date(d) => Some(*d as i64),
1339 RawColumn::Timestamp(t) => Some(*t),
1340 _ => None,
1341 }
1342 }
1343}
1344
1345fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1346 match DataType::from_tag(type_tag) {
1347 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1348 data[..8].try_into().unwrap(),
1349 ))),
1350 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1351 data[..8].try_into().unwrap(),
1352 ))),
1353 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1354 Some(DataType::Text) => {
1355 let s = std::str::from_utf8(data)
1356 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1357 Ok(RawColumn::Text(s))
1358 }
1359 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1360 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1361 data[..8].try_into().unwrap(),
1362 ))),
1363 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1364 data[..4].try_into().unwrap(),
1365 ))),
1366 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1367 data[..8].try_into().unwrap(),
1368 ))),
1369 Some(DataType::Interval) => {
1370 if data.len() < 16 {
1371 return Err(SqlError::InvalidValue("truncated interval".into()));
1372 }
1373 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1374 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1375 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1376 Ok(RawColumn::Interval {
1377 months,
1378 days,
1379 micros,
1380 })
1381 }
1382 Some(DataType::Json) => {
1383 let s = std::str::from_utf8(data)
1384 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1385 Ok(RawColumn::Json(s))
1386 }
1387 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1388 Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1389 Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1390 Some(DataType::Array) => Ok(RawColumn::Array(data)),
1391 Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1392 _ => Err(SqlError::InvalidValue(format!(
1393 "unknown column type tag: {type_tag}"
1394 ))),
1395 }
1396}
1397
1398pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1400 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1401 if target >= col_count || new_val.is_null() {
1402 return Ok(false);
1403 }
1404 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1405 if was_null {
1406 return Ok(false);
1407 }
1408 for col in 0..target {
1409 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1410 if !is_null {
1411 pos = skip_cell(data, pos, version)?;
1412 }
1413 }
1414 if pos >= data.len() {
1415 return Err(SqlError::InvalidValue("truncated column data".into()));
1416 }
1417 let type_tag = data[pos];
1418 let (old_data_len, val_start) = match version {
1419 RowVersion::V2 => match fixed_width_size(type_tag) {
1420 Some(n) => (n, pos + 1),
1421 None => {
1422 if pos + 5 > data.len() {
1423 return Err(SqlError::InvalidValue("truncated column data".into()));
1424 }
1425 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1426 (len, pos + 5)
1427 }
1428 },
1429 RowVersion::V1 => {
1430 if pos + 5 > data.len() {
1431 return Err(SqlError::InvalidValue("truncated column data".into()));
1432 }
1433 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1434 (len, pos + 5)
1435 }
1436 };
1437 let new_data_len = match value_encoded_size_v2(new_val) {
1438 Some(n) => n,
1439 None => return Ok(false),
1440 };
1441 if new_data_len != old_data_len {
1442 return Ok(false);
1443 }
1444 data[pos] = new_val.data_type().type_tag();
1445 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1446 Ok(true)
1447}
1448
1449pub fn patch_row_column(
1451 data: &[u8],
1452 target: usize,
1453 new_val: &Value,
1454 out: &mut Vec<u8>,
1455) -> Result<()> {
1456 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1457
1458 let new_col_count = if target >= col_count {
1459 target + 1
1460 } else {
1461 col_count
1462 };
1463 let new_bitmap_bytes = new_col_count.div_ceil(8);
1464 let bitmap_bytes = col_count.div_ceil(8);
1465 out.clear();
1466
1467 let header = (new_col_count as u16) | V2_FLAG;
1468 out.extend_from_slice(&header.to_le_bytes());
1469 let bitmap_start = out.len();
1470 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1471 for _ in bitmap_bytes..new_bitmap_bytes {
1472 out.push(0xFF);
1473 }
1474 if new_val.is_null() {
1475 out[bitmap_start + target / 8] |= 1 << (target % 8);
1476 } else {
1477 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1478 }
1479
1480 let mut pos = header_end;
1481 for col in 0..new_col_count {
1482 let was_null = if col < col_count {
1483 bitmap[col / 8] & (1 << (col % 8)) != 0
1484 } else {
1485 true
1486 };
1487
1488 if col == target {
1489 if !was_null {
1490 pos = skip_cell(data, pos, version)?;
1491 }
1492 if !new_val.is_null() {
1493 encode_cell_v2(new_val, out);
1494 }
1495 } else if !was_null {
1496 pos = copy_cell_to_v2(data, pos, version, out)?;
1497 }
1498 }
1499 Ok(())
1500}
1501
1502pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1503 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1504 if target >= col_count {
1505 return Ok(RawColumn::Null);
1506 }
1507
1508 for col in 0..=target {
1509 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1510
1511 if col == target {
1512 if is_null {
1513 return Ok(RawColumn::Null);
1514 }
1515 let (type_tag, body, _) = read_cell(data, pos, version)?;
1516 return decode_value_raw(type_tag, body);
1517 } else if !is_null {
1518 pos = skip_cell(data, pos, version)?;
1519 }
1520 }
1521
1522 unreachable!()
1523}
1524
1525pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1527 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1528 if target >= col_count {
1529 return Ok((RawColumn::Null, usize::MAX));
1530 }
1531
1532 for col in 0..=target {
1533 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1534
1535 if col == target {
1536 if is_null {
1537 return Ok((RawColumn::Null, usize::MAX));
1538 }
1539 let tag_offset = pos;
1540 let (type_tag, body, _) = read_cell(data, pos, version)?;
1541 let raw = decode_value_raw(type_tag, body)?;
1542 return Ok((raw, tag_offset));
1543 } else if !is_null {
1544 pos = skip_cell(data, pos, version)?;
1545 }
1546 }
1547
1548 unreachable!()
1549}
1550
1551pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1553 if offset == usize::MAX || new_val.is_null() {
1554 return Ok(false);
1555 }
1556 if data.len() < 2 || offset >= data.len() {
1557 return Err(SqlError::InvalidValue("truncated column data".into()));
1558 }
1559 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1560 RowVersion::V2
1561 } else {
1562 RowVersion::V1
1563 };
1564 let type_tag = data[offset];
1565 let (old_data_len, val_start) = match version {
1566 RowVersion::V2 => match fixed_width_size(type_tag) {
1567 Some(n) => (n, offset + 1),
1568 None => {
1569 if offset + 5 > data.len() {
1570 return Err(SqlError::InvalidValue("truncated column data".into()));
1571 }
1572 let len =
1573 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1574 (len, offset + 5)
1575 }
1576 },
1577 RowVersion::V1 => {
1578 if offset + 5 > data.len() {
1579 return Err(SqlError::InvalidValue("truncated column data".into()));
1580 }
1581 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1582 (len, offset + 5)
1583 }
1584 };
1585 let new_data_len = match value_encoded_size_v2(new_val) {
1586 Some(n) => n,
1587 None => return Ok(false),
1588 };
1589 if new_data_len != old_data_len {
1590 return Ok(false);
1591 }
1592 data[offset] = new_val.data_type().type_tag();
1593 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1594 Ok(true)
1595}
1596
1597pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1598 if key.is_empty() || key[0] != TAG_INTEGER {
1599 return Err(SqlError::InvalidValue("not an integer key".into()));
1600 }
1601 let (val, _) = decode_signed_varint(&key[1..])?;
1602 Ok(val)
1603}
1604
1605#[cfg(test)]
1606#[path = "encoding_tests.rs"]
1607mod tests;