1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
// Type tags for the key encoding. Every encoded key value starts with one of
// these bytes (see `encode_key_value_into`), and `decode_key_value` dispatches
// on it. Under a byte-wise comparison of encoded keys (presumably how the
// storage layer orders them — confirm), values of different types therefore
// sort by tag value, so the numeric assignments below are part of the format.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
21
22pub fn encode_key_value(value: &Value) -> Vec<u8> {
24 let mut buf = Vec::with_capacity(16);
25 encode_key_value_into(value, &mut buf);
26 buf
27}
28
29pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
31 let mut buf = Vec::new();
32 for v in values {
33 buf.extend_from_slice(&encode_key_value(v));
34 }
35 buf
36}
37
38pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
39 buf.clear();
40 for v in values {
41 encode_key_value_into(v, buf);
42 }
43}
44
45pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
46 buf.clear();
47 for &i in indices {
48 encode_key_value_into(&row[i as usize], buf);
49 }
50}
51
52#[inline]
53pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
54 buf.clear();
55 encode_signed_varint(TAG_INTEGER, val, buf);
56}
57
58pub(crate) fn encode_key_value_collated_into(
59 value: &Value,
60 coll: crate::types::Collation,
61 buf: &mut Vec<u8>,
62) {
63 match (value, coll) {
64 (Value::Text(s), crate::types::Collation::NoCase) => {
65 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
66 }
67 (Value::Text(s), crate::types::Collation::Rtrim) => {
68 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
69 }
70 _ => encode_key_value_into(value, buf),
71 }
72}
73
74pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
75 match value {
76 Value::Null => buf.push(TAG_NULL),
77 Value::Boolean(b) => {
78 buf.push(TAG_BOOLEAN);
79 buf.push(if *b { 0x01 } else { 0x00 });
80 }
81 Value::Integer(i) => encode_integer_into(*i, buf),
82 Value::Real(r) => encode_real_into(*r, buf),
83 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
84 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
85 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
86 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
87 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
88 Value::Interval {
89 months,
90 days,
91 micros,
92 } => {
93 buf.push(TAG_INTERVAL);
95 let mut mb = months.to_be_bytes();
96 mb[0] ^= 0x80;
97 buf.extend_from_slice(&mb);
98 let mut db = days.to_be_bytes();
99 db[0] ^= 0x80;
100 buf.extend_from_slice(&db);
101 let mut ub = micros.to_be_bytes();
102 ub[0] ^= 0x80;
103 buf.extend_from_slice(&ub);
104 }
105 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
106 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
107 Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
108 Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
109 }
110}
111
/// Appends the key encoding of an integer to `buf`: the `TAG_INTEGER` tag
/// followed by the variable-length form written by `encode_signed_varint`.
fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
    encode_signed_varint(TAG_INTEGER, val, buf);
}
115
/// Appends `tag` followed by a variable-length encoding of `val` to `buf`.
///
/// After the tag comes a marker byte and then the magnitude of `val` in
/// big-endian order with leading zero bytes stripped:
/// * zero — marker `0x80`, no payload;
/// * positive — marker `0x80 + n` and the `n` magnitude bytes;
/// * negative — marker `0x80 - n` and the `n` magnitude bytes, each
///   bit-complemented.
///
/// With this layout the byte-wise order of two encodings under the same tag
/// matches the numeric order of the values.
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` also covers `i64::MIN`, whose magnitude is 2^63.
    let magnitude = val.unsigned_abs();
    // Number of significant bytes; zero for a zero magnitude.
    let width = 8 - (magnitude.leading_zeros() / 8) as usize;
    let be = magnitude.to_be_bytes();
    let payload = &be[8 - width..];

    if val >= 0 {
        buf.push(0x80 + width as u8);
        buf.extend_from_slice(payload);
    } else {
        buf.push(0x80 - width as u8);
        buf.extend(payload.iter().map(|&byte| !byte));
    }
}
148
149fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
150 buf.push(TAG_REAL);
151 let bits = val.to_bits();
152 let encoded = if val.is_sign_negative() {
153 !bits
154 } else {
155 bits ^ (1u64 << 63)
156 };
157 buf.extend_from_slice(&encoded.to_be_bytes());
158}
159
/// Appends `tag`, the zero-escaped contents of `data`, and a `0x00`
/// terminator to `buf`.
///
/// Every `0x00` byte inside `data` is written as the pair `0x00 0xFF`, so the
/// single trailing `0x00` unambiguously marks the end of the value.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Splitting on zero bytes yields the runs between them; each boundary
    // between two runs stands for exactly one zero byte in the input.
    let mut runs = data.split(|&byte| byte == 0x00);
    if let Some(first) = runs.next() {
        buf.extend_from_slice(first);
    }
    for run in runs {
        buf.extend_from_slice(&[0x00, 0xFF]);
        buf.extend_from_slice(run);
    }

    buf.push(0x00);
}
172
173pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
175 if data.is_empty() {
176 return Err(SqlError::InvalidValue("empty key data".into()));
177 }
178 match data[0] {
179 TAG_NULL => Ok((Value::Null, 1)),
180 TAG_BOOLEAN => {
181 if data.len() < 2 {
182 return Err(SqlError::InvalidValue("truncated boolean".into()));
183 }
184 Ok((Value::Boolean(data[1] != 0), 2))
185 }
186 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
187 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
188 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
189 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
190 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
191 (Value::Date(d), n + 1)
192 }),
193 TAG_TIMESTAMP => {
194 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
195 }
196 TAG_INTERVAL => {
197 if data.len() < 1 + 16 {
198 return Err(SqlError::InvalidValue("truncated interval".into()));
199 }
200 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
201 mb[0] ^= 0x80;
202 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
203 db[0] ^= 0x80;
204 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
205 ub[0] ^= 0x80;
206 Ok((
207 Value::Interval {
208 months: i32::from_be_bytes(mb),
209 days: i32::from_be_bytes(db),
210 micros: i64::from_be_bytes(ub),
211 },
212 17,
213 ))
214 }
215 TAG_TEXT => {
216 let (bytes, n) = decode_null_escaped(&data[1..])?;
217 let s = String::from_utf8(bytes)
218 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
219 Ok((Value::Text(CompactString::from(s)), n + 1))
220 }
221 TAG_BLOB => {
222 let (bytes, n) = decode_null_escaped(&data[1..])?;
223 Ok((Value::Blob(bytes), n + 1))
224 }
225 TAG_JSON => {
226 let (bytes, n) = decode_null_escaped(&data[1..])?;
227 let s = String::from_utf8(bytes)
228 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
229 Ok((Value::Json(CompactString::from(s)), n + 1))
230 }
231 TAG_JSONB => {
232 let (bytes, n) = decode_null_escaped(&data[1..])?;
233 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
234 }
235 TAG_TSVECTOR => {
236 let (bytes, n) = decode_null_escaped(&data[1..])?;
237 Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
238 }
239 TAG_TSQUERY => {
240 let (bytes, n) = decode_null_escaped(&data[1..])?;
241 Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
242 }
243 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
244 }
245}
246
247pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
249 let mut values = Vec::with_capacity(count);
250 let mut pos = 0;
251 for _ in 0..count {
252 let (v, n) = decode_key_value(&data[pos..])?;
253 values.push(v);
254 pos += n;
255 }
256 Ok(values)
257}
258
259fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
260 let (v, n) = decode_signed_varint(data)?;
261 Ok((Value::Integer(v), n))
262}
263
264pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
266 if data.is_empty() {
267 return Err(SqlError::InvalidValue("truncated integer".into()));
268 }
269 let marker = data[0];
270 if marker == 0x80 {
271 return Ok((0, 1));
272 }
273 if marker > 0x80 {
274 let byte_count = (marker - 0x80) as usize;
275 if data.len() < 1 + byte_count {
276 return Err(SqlError::InvalidValue("truncated positive integer".into()));
277 }
278 let mut bytes = [0u8; 8];
279 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
280 let val = i64::from_be_bytes(bytes);
281 Ok((val, 1 + byte_count))
282 } else {
283 let byte_count = (0x80 - marker) as usize;
284 if data.len() < 1 + byte_count {
285 return Err(SqlError::InvalidValue("truncated negative integer".into()));
286 }
287 let mut bytes = [0u8; 8];
288 for i in 0..byte_count {
289 bytes[8 - byte_count + i] = !data[1 + i];
290 }
291 let abs_val = u64::from_be_bytes(bytes);
292 let val = (-(abs_val as i128)) as i64;
293 Ok((val, 1 + byte_count))
294 }
295}
296
297fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
298 if data.len() < 8 {
299 return Err(SqlError::InvalidValue("truncated real".into()));
300 }
301 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
302 let bits = if encoded & (1u64 << 63) != 0 {
303 encoded ^ (1u64 << 63)
305 } else {
306 !encoded
308 };
309 let val = f64::from_bits(bits);
310 Ok((Value::Real(val), 8))
311}
312
313fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
315 let mut result = Vec::new();
316 let mut i = 0;
317 while i < data.len() {
318 if data[i] == 0x00 {
319 if i + 1 < data.len() && data[i + 1] == 0xFF {
320 result.push(0x00);
321 i += 2;
322 } else {
323 return Ok((result, i + 1)); }
325 } else {
326 result.push(data[i]);
327 i += 1;
328 }
329 }
330 Err(SqlError::InvalidValue(
331 "unterminated null-escaped string".into(),
332 ))
333}
334
335fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
336 match v {
337 Value::Integer(val) => {
338 buf.push(DataType::Integer.type_tag());
339 buf.extend_from_slice(&val.to_le_bytes());
340 }
341 Value::Real(r) => {
342 buf.push(DataType::Real.type_tag());
343 buf.extend_from_slice(&r.to_le_bytes());
344 }
345 Value::Boolean(b) => {
346 buf.push(DataType::Boolean.type_tag());
347 buf.push(if *b { 1 } else { 0 });
348 }
349 Value::Text(s) => {
350 let bytes = s.as_bytes();
351 buf.push(DataType::Text.type_tag());
352 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
353 buf.extend_from_slice(bytes);
354 }
355 Value::Blob(data) => {
356 buf.push(DataType::Blob.type_tag());
357 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
358 buf.extend_from_slice(data);
359 }
360 Value::Time(t) => {
361 buf.push(DataType::Time.type_tag());
362 buf.extend_from_slice(&t.to_le_bytes());
363 }
364 Value::Date(d) => {
365 buf.push(DataType::Date.type_tag());
366 buf.extend_from_slice(&d.to_le_bytes());
367 }
368 Value::Timestamp(t) => {
369 buf.push(DataType::Timestamp.type_tag());
370 buf.extend_from_slice(&t.to_le_bytes());
371 }
372 Value::Interval {
373 months,
374 days,
375 micros,
376 } => {
377 buf.push(DataType::Interval.type_tag());
378 buf.extend_from_slice(&months.to_le_bytes());
379 buf.extend_from_slice(&days.to_le_bytes());
380 buf.extend_from_slice(µs.to_le_bytes());
381 }
382 Value::Json(s) => {
383 let bytes = s.as_bytes();
384 buf.push(DataType::Json.type_tag());
385 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
386 buf.extend_from_slice(bytes);
387 }
388 Value::Jsonb(b) => {
389 buf.push(DataType::Jsonb.type_tag());
390 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
391 buf.extend_from_slice(b);
392 }
393 Value::TsVector(b) => {
394 buf.push(DataType::TsVector.type_tag());
395 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
396 buf.extend_from_slice(b);
397 }
398 Value::TsQuery(b) => {
399 buf.push(DataType::TsQuery.type_tag());
400 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
401 buf.extend_from_slice(b);
402 }
403 Value::Null => unreachable!(),
404 }
405}
406
407pub fn encode_row(values: &[Value]) -> Vec<u8> {
408 let mut buf = Vec::new();
409 encode_row_into(values, &mut buf);
410 buf
411}
412
413pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
414 buf.clear();
415 let col_count = values.len();
416 let bitmap_bytes = col_count.div_ceil(8);
417
418 let header = (col_count as u16) | V2_FLAG;
419 buf.extend_from_slice(&header.to_le_bytes());
420
421 let bitmap_start = buf.len();
422 buf.resize(buf.len() + bitmap_bytes, 0);
423
424 for (i, v) in values.iter().enumerate() {
425 if v.is_null() {
426 buf[bitmap_start + i / 8] |= 1 << (i % 8);
427 continue;
428 }
429 encode_cell_v2(v, buf);
430 }
431}
432
/// Pre-built row image for rows whose non-null columns are all integers.
///
/// Produced by `build_int_row_template` and consumed by
/// `encode_int_row_with_template`, which copies `template` and overwrites the
/// eight value bytes of each slot.
pub struct IntRowTemplate {
    /// Complete row bytes (header, null bitmap, integer cells) with every
    /// integer value zeroed.
    pub template: Vec<u8>,
    /// For each non-null slot: `(slot index, byte offset of its eight value
    /// bytes within `template`)`.
    pub slot_offsets: Vec<(usize, usize)>,
}
437
438pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
439 let bitmap_bytes = phys_count.div_ceil(8);
440 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
441 let header = (phys_count as u16) | V2_FLAG;
442 template.extend_from_slice(&header.to_le_bytes());
443 let bitmap_start = template.len();
444 template.resize(bitmap_start + bitmap_bytes, 0);
445 for &i in null_slots {
446 template[bitmap_start + i / 8] |= 1 << (i % 8);
447 }
448 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
449 for slot in 0..phys_count {
450 if null_slots.contains(&slot) {
451 continue;
452 }
453 template.push(DataType::Integer.type_tag());
454 let value_offset = template.len();
455 template.extend_from_slice(&[0u8; 8]);
456 slot_offsets.push((slot, value_offset));
457 }
458 IntRowTemplate {
459 template,
460 slot_offsets,
461 }
462}
463
464#[inline]
466pub fn encode_int_row_with_template(
467 tmpl: &IntRowTemplate,
468 values: &[Value],
469 buf: &mut Vec<u8>,
470) -> Result<()> {
471 buf.clear();
472 buf.extend_from_slice(&tmpl.template);
473 for &(slot, off) in &tmpl.slot_offsets {
474 match &values[slot] {
475 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
476 other => {
477 return Err(SqlError::TypeMismatch {
478 expected: "Integer".into(),
479 got: other.data_type().to_string(),
480 });
481 }
482 }
483 }
484 Ok(())
485}
486
487fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
488 match DataType::from_tag(type_tag) {
489 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
490 data[..8].try_into().unwrap(),
491 ))),
492 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
493 data[..8].try_into().unwrap(),
494 ))),
495 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
496 Some(DataType::Text) => {
497 let s = std::str::from_utf8(data)
498 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
499 Ok(Value::Text(CompactString::from(s)))
500 }
501 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
502 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
503 data[..8].try_into().unwrap(),
504 ))),
505 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
506 data[..4].try_into().unwrap(),
507 ))),
508 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
509 data[..8].try_into().unwrap(),
510 ))),
511 Some(DataType::Interval) => {
512 if data.len() < 16 {
513 return Err(SqlError::InvalidValue("truncated interval".into()));
514 }
515 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
516 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
517 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
518 Ok(Value::Interval {
519 months,
520 days,
521 micros,
522 })
523 }
524 Some(DataType::Json) => {
525 let s = std::str::from_utf8(data)
526 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
527 Ok(Value::Json(CompactString::from(s)))
528 }
529 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
530 Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
531 Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
532 _ => Err(SqlError::InvalidValue(format!(
533 "unknown column type tag: {type_tag}"
534 ))),
535 }
536}
537
/// On-disk row layout, selected by the `V2_FLAG` bit of the row header.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Header flag clear: every cell carries a `u32` length prefix.
    V1,
    /// Header flag set: fixed-width types omit the length prefix (see
    /// `fixed_width_size`); other types keep it.
    V2,
}
545
/// Top bit of the `u16` row header; set when the row uses the V2 layout.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the column count from the `u16` row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
548
549#[inline]
550pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
551 match DataType::from_tag(type_tag)? {
552 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
553 DataType::Date => Some(4),
554 DataType::Boolean => Some(1),
555 DataType::Interval => Some(16),
556 DataType::Text
557 | DataType::Blob
558 | DataType::Json
559 | DataType::Jsonb
560 | DataType::TsVector
561 | DataType::TsQuery
562 | DataType::Null => None,
563 }
564}
565
566#[inline]
567fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
568 if pos >= data.len() {
569 return Err(SqlError::InvalidValue("truncated column data".into()));
570 }
571 let type_tag = data[pos];
572 let after_tag = pos + 1;
573 let (data_len, body_pos) = match version {
574 RowVersion::V2 => match fixed_width_size(type_tag) {
575 Some(n) => (n, after_tag),
576 None => {
577 if after_tag + 4 > data.len() {
578 return Err(SqlError::InvalidValue("truncated column data".into()));
579 }
580 let len = u32::from_le_bytes([
581 data[after_tag],
582 data[after_tag + 1],
583 data[after_tag + 2],
584 data[after_tag + 3],
585 ]) as usize;
586 (len, after_tag + 4)
587 }
588 },
589 RowVersion::V1 => {
590 if after_tag + 4 > data.len() {
591 return Err(SqlError::InvalidValue("truncated column data".into()));
592 }
593 let len = u32::from_le_bytes([
594 data[after_tag],
595 data[after_tag + 1],
596 data[after_tag + 2],
597 data[after_tag + 3],
598 ]) as usize;
599 (len, after_tag + 4)
600 }
601 };
602 if body_pos + data_len > data.len() {
603 return Err(SqlError::InvalidValue("truncated column value".into()));
604 }
605 Ok((
606 type_tag,
607 &data[body_pos..body_pos + data_len],
608 body_pos + data_len,
609 ))
610}
611
612#[inline]
613fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
614 let (_, _, next) = read_cell(data, pos, version)?;
615 Ok(next)
616}
617
618fn copy_cell_to_v2(
619 data: &[u8],
620 pos: usize,
621 version: RowVersion,
622 out: &mut Vec<u8>,
623) -> Result<usize> {
624 let (tag, body, next) = read_cell(data, pos, version)?;
625 out.push(tag);
626 if fixed_width_size(tag).is_none() {
627 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
628 }
629 out.extend_from_slice(body);
630 Ok(next)
631}
632
633fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
634 if data.len() < 2 {
635 return Err(SqlError::InvalidValue("row data too short".into()));
636 }
637 let raw = u16::from_le_bytes([data[0], data[1]]);
638 let version = if raw & V2_FLAG != 0 {
639 RowVersion::V2
640 } else {
641 RowVersion::V1
642 };
643 let col_count = (raw & COL_COUNT_MASK) as usize;
644 let bitmap_bytes = col_count.div_ceil(8);
645 let pos = 2;
646 if data.len() < pos + bitmap_bytes {
647 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
648 }
649 Ok((
650 version,
651 col_count,
652 &data[pos..pos + bitmap_bytes],
653 pos + bitmap_bytes,
654 ))
655}
656
657pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
658 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
659
660 let mut values = Vec::with_capacity(col_count);
661 for i in 0..col_count {
662 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
663 values.push(Value::Null);
664 continue;
665 }
666 let (type_tag, body, next) = read_cell(data, pos, version)?;
667 values.push(decode_value(type_tag, body)?);
668 pos = next;
669 }
670
671 Ok(values)
672}
673
674#[inline]
676pub fn row_non_pk_count(data: &[u8]) -> usize {
677 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
678}
679
680pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
681 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
682
683 for i in 0..col_count {
684 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
685 continue;
686 }
687 let (type_tag, body, next) = read_cell(data, pos, version)?;
688 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
689 out[col_mapping[i]] = decode_value(type_tag, body)?;
690 }
691 pos = next;
692 }
693
694 Ok(())
695}
696
697pub fn decode_pk_into(
698 key: &[u8],
699 count: usize,
700 out: &mut [Value],
701 pk_mapping: &[usize],
702) -> Result<()> {
703 let mut pos = 0;
704 for i in 0..count {
705 let (v, n) = decode_key_value(&key[pos..])?;
706 if i < pk_mapping.len() {
707 out[pk_mapping[i]] = v;
708 }
709 pos += n;
710 }
711 Ok(())
712}
713
714pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
715 if targets.is_empty() {
716 return Ok(Vec::new());
717 }
718 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
719
720 let mut results = Vec::with_capacity(targets.len());
721 let mut ti = 0;
722
723 for col in 0..col_count {
724 if ti >= targets.len() {
725 break;
726 }
727 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
728
729 if col == targets[ti] {
730 if is_null {
731 results.push(Value::Null);
732 } else {
733 let (type_tag, body, next) = read_cell(data, pos, version)?;
734 results.push(decode_value(type_tag, body)?);
735 pos = next;
736 }
737 ti += 1;
738 } else if !is_null {
739 pos = skip_cell(data, pos, version)?;
740 }
741 }
742
743 while ti < targets.len() {
744 results.push(Value::Null);
745 ti += 1;
746 }
747
748 Ok(results)
749}
750
751pub fn decode_columns_into(
752 data: &[u8],
753 targets: &[usize],
754 schema_cols: &[usize],
755 row: &mut [Value],
756) -> Result<()> {
757 if targets.is_empty() {
758 return Ok(());
759 }
760 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
761
762 let mut ti = 0;
763 for col in 0..col_count {
764 if ti >= targets.len() {
765 break;
766 }
767 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
768
769 if col == targets[ti] {
770 if is_null {
771 row[schema_cols[ti]] = Value::Null;
772 } else {
773 let (type_tag, body, next) = read_cell(data, pos, version)?;
774 row[schema_cols[ti]] = decode_value(type_tag, body)?;
775 pos = next;
776 }
777 ti += 1;
778 } else if !is_null {
779 pos = skip_cell(data, pos, version)?;
780 }
781 }
782
783 Ok(())
784}
785
/// A column value decoded from an encoded row without copying its payload.
///
/// Variants mirror `Value`, but text and byte payloads borrow from the row
/// buffer (`'a`) instead of owning their data. Use `to_value` to obtain an
/// owned `Value`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    // Borrowed UTF-8 text from the row buffer.
    Text(&'a str),
    // Borrowed raw bytes from the row buffer.
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    Json(&'a str),
    Jsonb(&'a [u8]),
    TsVector(&'a [u8]),
    TsQuery(&'a [u8]),
}
803
804impl<'a> RawColumn<'a> {
805 pub fn to_value(self) -> Value {
806 match self {
807 RawColumn::Null => Value::Null,
808 RawColumn::Integer(i) => Value::Integer(i),
809 RawColumn::Real(r) => Value::Real(r),
810 RawColumn::Boolean(b) => Value::Boolean(b),
811 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
812 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
813 RawColumn::Time(t) => Value::Time(t),
814 RawColumn::Date(d) => Value::Date(d),
815 RawColumn::Timestamp(t) => Value::Timestamp(t),
816 RawColumn::Interval {
817 months,
818 days,
819 micros,
820 } => Value::Interval {
821 months,
822 days,
823 micros,
824 },
825 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
826 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
827 RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
828 RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
829 }
830 }
831
832 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
833 use std::cmp::Ordering;
834 match (self, other) {
835 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
836 (RawColumn::Null, _) | (_, Value::Null) => None,
837 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
838 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
839 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
840 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
841 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
842 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
843 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
844 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
845 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
846 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
847 (
848 RawColumn::Interval {
849 months: am,
850 days: ad,
851 micros: au,
852 },
853 Value::Interval {
854 months: bm,
855 days: bd,
856 micros: bu,
857 },
858 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
859 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
860 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
861 (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
862 (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
863 _ => None,
864 }
865 }
866
867 pub fn eq_value(&self, other: &Value) -> bool {
868 match (self, other) {
869 (RawColumn::Null, Value::Null) => true,
870 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
871 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
872 (RawColumn::Real(a), Value::Real(b)) => a == b,
873 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
874 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
875 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
876 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
877 (RawColumn::Time(a), Value::Time(b)) => a == b,
878 (RawColumn::Date(a), Value::Date(b)) => a == b,
879 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
880 (
881 RawColumn::Interval {
882 months: am,
883 days: ad,
884 micros: au,
885 },
886 Value::Interval {
887 months: bm,
888 days: bd,
889 micros: bu,
890 },
891 ) => am == bm && ad == bd && au == bu,
892 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
893 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
894 (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
895 (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
896 _ => false,
897 }
898 }
899
900 pub fn as_f64(&self) -> Option<f64> {
901 match self {
902 RawColumn::Integer(i) => Some(*i as f64),
903 RawColumn::Real(r) => Some(*r),
904 _ => None,
905 }
906 }
907
908 pub fn as_i64(&self) -> Option<i64> {
909 match self {
910 RawColumn::Integer(i) => Some(*i),
911 RawColumn::Time(t) => Some(*t),
912 RawColumn::Date(d) => Some(*d as i64),
913 RawColumn::Timestamp(t) => Some(*t),
914 _ => None,
915 }
916 }
917}
918
919fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
920 match DataType::from_tag(type_tag) {
921 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
922 data[..8].try_into().unwrap(),
923 ))),
924 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
925 data[..8].try_into().unwrap(),
926 ))),
927 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
928 Some(DataType::Text) => {
929 let s = std::str::from_utf8(data)
930 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
931 Ok(RawColumn::Text(s))
932 }
933 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
934 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
935 data[..8].try_into().unwrap(),
936 ))),
937 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
938 data[..4].try_into().unwrap(),
939 ))),
940 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
941 data[..8].try_into().unwrap(),
942 ))),
943 Some(DataType::Interval) => {
944 if data.len() < 16 {
945 return Err(SqlError::InvalidValue("truncated interval".into()));
946 }
947 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
948 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
949 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
950 Ok(RawColumn::Interval {
951 months,
952 days,
953 micros,
954 })
955 }
956 Some(DataType::Json) => {
957 let s = std::str::from_utf8(data)
958 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
959 Ok(RawColumn::Json(s))
960 }
961 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
962 Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
963 Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
964 _ => Err(SqlError::InvalidValue(format!(
965 "unknown column type tag: {type_tag}"
966 ))),
967 }
968}
969
970pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
972 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
973 if target >= col_count || new_val.is_null() {
974 return Ok(false);
975 }
976 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
977 if was_null {
978 return Ok(false);
979 }
980 for col in 0..target {
981 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
982 if !is_null {
983 pos = skip_cell(data, pos, version)?;
984 }
985 }
986 let type_tag = data[pos];
987 let (old_data_len, val_start) = match version {
988 RowVersion::V2 => match fixed_width_size(type_tag) {
989 Some(n) => (n, pos + 1),
990 None => {
991 if pos + 5 > data.len() {
992 return Err(SqlError::InvalidValue("truncated column data".into()));
993 }
994 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
995 (len, pos + 5)
996 }
997 },
998 RowVersion::V1 => {
999 if pos + 5 > data.len() {
1000 return Err(SqlError::InvalidValue("truncated column data".into()));
1001 }
1002 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1003 (len, pos + 5)
1004 }
1005 };
1006 let new_data_len = match new_val {
1007 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1008 Value::Date(_) => 4,
1009 Value::Interval { .. } => 16,
1010 Value::Boolean(_) => 1,
1011 Value::Text(s) => s.len(),
1012 Value::Blob(b) => b.len(),
1013 Value::Json(s) => s.len(),
1014 Value::Jsonb(b) => b.len(),
1015 Value::TsVector(b) => b.len(),
1016 Value::TsQuery(b) => b.len(),
1017 Value::Null => return Ok(false),
1018 };
1019 if new_data_len != old_data_len {
1020 return Ok(false);
1021 }
1022 data[pos] = new_val.data_type().type_tag();
1023 match new_val {
1024 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1025 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1026 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1027 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1028 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1029 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1030 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1031 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1032 Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1033 Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1034 Value::TsVector(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1035 Value::TsQuery(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1036 Value::Interval {
1037 months,
1038 days,
1039 micros,
1040 } => {
1041 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1042 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1043 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1044 }
1045 Value::Null => unreachable!(),
1046 }
1047 Ok(true)
1048}
1049
1050pub fn patch_row_column(
1052 data: &[u8],
1053 target: usize,
1054 new_val: &Value,
1055 out: &mut Vec<u8>,
1056) -> Result<()> {
1057 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1058
1059 let new_col_count = if target >= col_count {
1060 target + 1
1061 } else {
1062 col_count
1063 };
1064 let new_bitmap_bytes = new_col_count.div_ceil(8);
1065 let bitmap_bytes = col_count.div_ceil(8);
1066 out.clear();
1067
1068 let header = (new_col_count as u16) | V2_FLAG;
1069 out.extend_from_slice(&header.to_le_bytes());
1070 let bitmap_start = out.len();
1071 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1072 for _ in bitmap_bytes..new_bitmap_bytes {
1073 out.push(0xFF);
1074 }
1075 if new_val.is_null() {
1076 out[bitmap_start + target / 8] |= 1 << (target % 8);
1077 } else {
1078 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1079 }
1080
1081 let mut pos = header_end;
1082 for col in 0..new_col_count {
1083 let was_null = if col < col_count {
1084 bitmap[col / 8] & (1 << (col % 8)) != 0
1085 } else {
1086 true
1087 };
1088
1089 if col == target {
1090 if !was_null {
1091 pos = skip_cell(data, pos, version)?;
1092 }
1093 if !new_val.is_null() {
1094 encode_cell_v2(new_val, out);
1095 }
1096 } else if !was_null {
1097 pos = copy_cell_to_v2(data, pos, version, out)?;
1098 }
1099 }
1100 Ok(())
1101}
1102
1103pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1104 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1105 if target >= col_count {
1106 return Ok(RawColumn::Null);
1107 }
1108
1109 for col in 0..=target {
1110 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1111
1112 if col == target {
1113 if is_null {
1114 return Ok(RawColumn::Null);
1115 }
1116 let (type_tag, body, _) = read_cell(data, pos, version)?;
1117 return decode_value_raw(type_tag, body);
1118 } else if !is_null {
1119 pos = skip_cell(data, pos, version)?;
1120 }
1121 }
1122
1123 unreachable!()
1124}
1125
1126pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1128 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1129 if target >= col_count {
1130 return Ok((RawColumn::Null, usize::MAX));
1131 }
1132
1133 for col in 0..=target {
1134 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1135
1136 if col == target {
1137 if is_null {
1138 return Ok((RawColumn::Null, usize::MAX));
1139 }
1140 let tag_offset = pos;
1141 let (type_tag, body, _) = read_cell(data, pos, version)?;
1142 let raw = decode_value_raw(type_tag, body)?;
1143 return Ok((raw, tag_offset));
1144 } else if !is_null {
1145 pos = skip_cell(data, pos, version)?;
1146 }
1147 }
1148
1149 unreachable!()
1150}
1151
1152pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1154 if offset == usize::MAX || new_val.is_null() {
1155 return Ok(false);
1156 }
1157 if data.len() < 2 || offset >= data.len() {
1158 return Err(SqlError::InvalidValue("truncated column data".into()));
1159 }
1160 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1161 RowVersion::V2
1162 } else {
1163 RowVersion::V1
1164 };
1165 let type_tag = data[offset];
1166 let (old_data_len, val_start) = match version {
1167 RowVersion::V2 => match fixed_width_size(type_tag) {
1168 Some(n) => (n, offset + 1),
1169 None => {
1170 if offset + 5 > data.len() {
1171 return Err(SqlError::InvalidValue("truncated column data".into()));
1172 }
1173 let len =
1174 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1175 (len, offset + 5)
1176 }
1177 },
1178 RowVersion::V1 => {
1179 if offset + 5 > data.len() {
1180 return Err(SqlError::InvalidValue("truncated column data".into()));
1181 }
1182 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1183 (len, offset + 5)
1184 }
1185 };
1186 let new_data_len = match new_val {
1187 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1188 Value::Date(_) => 4,
1189 Value::Interval { .. } => 16,
1190 Value::Boolean(_) => 1,
1191 Value::Text(s) => s.len(),
1192 Value::Blob(b) => b.len(),
1193 Value::Json(s) => s.len(),
1194 Value::Jsonb(b) => b.len(),
1195 Value::TsVector(b) => b.len(),
1196 Value::TsQuery(b) => b.len(),
1197 Value::Null => return Ok(false),
1198 };
1199 if new_data_len != old_data_len {
1200 return Ok(false);
1201 }
1202 data[offset] = new_val.data_type().type_tag();
1203 match new_val {
1204 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1205 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1206 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1207 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1208 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1209 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1210 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1211 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1212 Value::Json(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1213 Value::Jsonb(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1214 Value::TsVector(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1215 Value::TsQuery(b) => data[val_start..val_start + b.len()].copy_from_slice(b),
1216 Value::Interval {
1217 months,
1218 days,
1219 micros,
1220 } => {
1221 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1222 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1223 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1224 }
1225 Value::Null => unreachable!(),
1226 }
1227 Ok(true)
1228}
1229
1230pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1231 if key.is_empty() || key[0] != TAG_INTEGER {
1232 return Err(SqlError::InvalidValue("not an integer key".into()));
1233 }
1234 let (val, _) = decode_integer(&key[1..])?;
1235 match val {
1236 Value::Integer(i) => Ok(i),
1237 _ => unreachable!(),
1238 }
1239}
1240
1241#[cfg(test)]
1242#[path = "encoding_tests.rs"]
1243mod tests;