1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17
18pub fn encode_key_value(value: &Value) -> Vec<u8> {
20 let mut buf = Vec::with_capacity(16);
21 encode_key_value_into(value, &mut buf);
22 buf
23}
24
25pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27 let mut buf = Vec::new();
28 for v in values {
29 buf.extend_from_slice(&encode_key_value(v));
30 }
31 buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35 buf.clear();
36 for v in values {
37 encode_key_value_into(v, buf);
38 }
39}
40
41pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
42 buf.clear();
43 for &i in indices {
44 encode_key_value_into(&row[i as usize], buf);
45 }
46}
47
48#[inline]
49pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
50 buf.clear();
51 encode_signed_varint(TAG_INTEGER, val, buf);
52}
53
54pub(crate) fn encode_key_value_collated_into(
55 value: &Value,
56 coll: crate::types::Collation,
57 buf: &mut Vec<u8>,
58) {
59 match (value, coll) {
60 (Value::Text(s), crate::types::Collation::NoCase) => {
61 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
62 }
63 (Value::Text(s), crate::types::Collation::Rtrim) => {
64 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
65 }
66 _ => encode_key_value_into(value, buf),
67 }
68}
69
70pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
71 match value {
72 Value::Null => buf.push(TAG_NULL),
73 Value::Boolean(b) => {
74 buf.push(TAG_BOOLEAN);
75 buf.push(if *b { 0x01 } else { 0x00 });
76 }
77 Value::Integer(i) => encode_integer_into(*i, buf),
78 Value::Real(r) => encode_real_into(*r, buf),
79 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
80 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
81 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
82 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
83 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
84 Value::Interval {
85 months,
86 days,
87 micros,
88 } => {
89 buf.push(TAG_INTERVAL);
91 let mut mb = months.to_be_bytes();
92 mb[0] ^= 0x80;
93 buf.extend_from_slice(&mb);
94 let mut db = days.to_be_bytes();
95 db[0] ^= 0x80;
96 buf.extend_from_slice(&db);
97 let mut ub = micros.to_be_bytes();
98 ub[0] ^= 0x80;
99 buf.extend_from_slice(&ub);
100 }
101 }
102}
103
104fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
105 encode_signed_varint(TAG_INTEGER, val, buf);
106}
107
108pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
114 buf.push(tag);
115 if val == 0 {
116 buf.push(0x80);
117 return;
118 }
119 if val > 0 {
120 let bytes = val.to_be_bytes();
121 let start = bytes.iter().position(|&b| b != 0).unwrap();
122 let byte_count = (8 - start) as u8;
123 buf.push(0x80 + byte_count);
124 buf.extend_from_slice(&bytes[start..]);
125 } else {
126 let abs_val = if val == i64::MIN {
127 u64::MAX / 2 + 1
128 } else {
129 (-val) as u64
130 };
131 let bytes = abs_val.to_be_bytes();
132 let start = bytes.iter().position(|&b| b != 0).unwrap();
133 let byte_count = (8 - start) as u8;
134 buf.push(0x80 - byte_count);
135 for &b in &bytes[start..] {
136 buf.push(!b);
137 }
138 }
139}
140
141fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
142 buf.push(TAG_REAL);
143 let bits = val.to_bits();
144 let encoded = if val.is_sign_negative() {
145 !bits
146 } else {
147 bits ^ (1u64 << 63)
148 };
149 buf.extend_from_slice(&encoded.to_be_bytes());
150}
151
152fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
153 buf.push(tag);
154 for &b in data {
155 if b == 0x00 {
156 buf.push(0x00);
157 buf.push(0xFF);
158 } else {
159 buf.push(b);
160 }
161 }
162 buf.push(0x00);
163}
164
165pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
167 if data.is_empty() {
168 return Err(SqlError::InvalidValue("empty key data".into()));
169 }
170 match data[0] {
171 TAG_NULL => Ok((Value::Null, 1)),
172 TAG_BOOLEAN => {
173 if data.len() < 2 {
174 return Err(SqlError::InvalidValue("truncated boolean".into()));
175 }
176 Ok((Value::Boolean(data[1] != 0), 2))
177 }
178 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
179 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
180 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
181 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
182 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
183 (Value::Date(d), n + 1)
184 }),
185 TAG_TIMESTAMP => {
186 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
187 }
188 TAG_INTERVAL => {
189 if data.len() < 1 + 16 {
190 return Err(SqlError::InvalidValue("truncated interval".into()));
191 }
192 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
193 mb[0] ^= 0x80;
194 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
195 db[0] ^= 0x80;
196 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
197 ub[0] ^= 0x80;
198 Ok((
199 Value::Interval {
200 months: i32::from_be_bytes(mb),
201 days: i32::from_be_bytes(db),
202 micros: i64::from_be_bytes(ub),
203 },
204 17,
205 ))
206 }
207 TAG_TEXT => {
208 let (bytes, n) = decode_null_escaped(&data[1..])?;
209 let s = String::from_utf8(bytes)
210 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
211 Ok((Value::Text(CompactString::from(s)), n + 1))
212 }
213 TAG_BLOB => {
214 let (bytes, n) = decode_null_escaped(&data[1..])?;
215 Ok((Value::Blob(bytes), n + 1))
216 }
217 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
218 }
219}
220
221pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
223 let mut values = Vec::with_capacity(count);
224 let mut pos = 0;
225 for _ in 0..count {
226 let (v, n) = decode_key_value(&data[pos..])?;
227 values.push(v);
228 pos += n;
229 }
230 Ok(values)
231}
232
233fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
234 let (v, n) = decode_signed_varint(data)?;
235 Ok((Value::Integer(v), n))
236}
237
238pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
240 if data.is_empty() {
241 return Err(SqlError::InvalidValue("truncated integer".into()));
242 }
243 let marker = data[0];
244 if marker == 0x80 {
245 return Ok((0, 1));
246 }
247 if marker > 0x80 {
248 let byte_count = (marker - 0x80) as usize;
249 if data.len() < 1 + byte_count {
250 return Err(SqlError::InvalidValue("truncated positive integer".into()));
251 }
252 let mut bytes = [0u8; 8];
253 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
254 let val = i64::from_be_bytes(bytes);
255 Ok((val, 1 + byte_count))
256 } else {
257 let byte_count = (0x80 - marker) as usize;
258 if data.len() < 1 + byte_count {
259 return Err(SqlError::InvalidValue("truncated negative integer".into()));
260 }
261 let mut bytes = [0u8; 8];
262 for i in 0..byte_count {
263 bytes[8 - byte_count + i] = !data[1 + i];
264 }
265 let abs_val = u64::from_be_bytes(bytes);
266 let val = (-(abs_val as i128)) as i64;
267 Ok((val, 1 + byte_count))
268 }
269}
270
271fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
272 if data.len() < 8 {
273 return Err(SqlError::InvalidValue("truncated real".into()));
274 }
275 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
276 let bits = if encoded & (1u64 << 63) != 0 {
277 encoded ^ (1u64 << 63)
279 } else {
280 !encoded
282 };
283 let val = f64::from_bits(bits);
284 Ok((Value::Real(val), 8))
285}
286
287fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
289 let mut result = Vec::new();
290 let mut i = 0;
291 while i < data.len() {
292 if data[i] == 0x00 {
293 if i + 1 < data.len() && data[i + 1] == 0xFF {
294 result.push(0x00);
295 i += 2;
296 } else {
297 return Ok((result, i + 1)); }
299 } else {
300 result.push(data[i]);
301 i += 1;
302 }
303 }
304 Err(SqlError::InvalidValue(
305 "unterminated null-escaped string".into(),
306 ))
307}
308
309fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
310 match v {
311 Value::Integer(val) => {
312 buf.push(DataType::Integer.type_tag());
313 buf.extend_from_slice(&val.to_le_bytes());
314 }
315 Value::Real(r) => {
316 buf.push(DataType::Real.type_tag());
317 buf.extend_from_slice(&r.to_le_bytes());
318 }
319 Value::Boolean(b) => {
320 buf.push(DataType::Boolean.type_tag());
321 buf.push(if *b { 1 } else { 0 });
322 }
323 Value::Text(s) => {
324 let bytes = s.as_bytes();
325 buf.push(DataType::Text.type_tag());
326 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
327 buf.extend_from_slice(bytes);
328 }
329 Value::Blob(data) => {
330 buf.push(DataType::Blob.type_tag());
331 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
332 buf.extend_from_slice(data);
333 }
334 Value::Time(t) => {
335 buf.push(DataType::Time.type_tag());
336 buf.extend_from_slice(&t.to_le_bytes());
337 }
338 Value::Date(d) => {
339 buf.push(DataType::Date.type_tag());
340 buf.extend_from_slice(&d.to_le_bytes());
341 }
342 Value::Timestamp(t) => {
343 buf.push(DataType::Timestamp.type_tag());
344 buf.extend_from_slice(&t.to_le_bytes());
345 }
346 Value::Interval {
347 months,
348 days,
349 micros,
350 } => {
351 buf.push(DataType::Interval.type_tag());
352 buf.extend_from_slice(&months.to_le_bytes());
353 buf.extend_from_slice(&days.to_le_bytes());
354 buf.extend_from_slice(µs.to_le_bytes());
355 }
356 Value::Null => unreachable!(),
357 }
358}
359
360pub fn encode_row(values: &[Value]) -> Vec<u8> {
361 let mut buf = Vec::new();
362 encode_row_into(values, &mut buf);
363 buf
364}
365
366pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
367 buf.clear();
368 let col_count = values.len();
369 let bitmap_bytes = col_count.div_ceil(8);
370
371 let header = (col_count as u16) | V2_FLAG;
372 buf.extend_from_slice(&header.to_le_bytes());
373
374 let bitmap_start = buf.len();
375 buf.resize(buf.len() + bitmap_bytes, 0);
376
377 for (i, v) in values.iter().enumerate() {
378 if v.is_null() {
379 buf[bitmap_start + i / 8] |= 1 << (i % 8);
380 continue;
381 }
382 encode_cell_v2(v, buf);
383 }
384}
385
386pub struct IntRowTemplate {
387 pub template: Vec<u8>,
388 pub slot_offsets: Vec<(usize, usize)>,
389}
390
391pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
392 let bitmap_bytes = phys_count.div_ceil(8);
393 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
394 let header = (phys_count as u16) | V2_FLAG;
395 template.extend_from_slice(&header.to_le_bytes());
396 let bitmap_start = template.len();
397 template.resize(bitmap_start + bitmap_bytes, 0);
398 for &i in null_slots {
399 template[bitmap_start + i / 8] |= 1 << (i % 8);
400 }
401 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
402 for slot in 0..phys_count {
403 if null_slots.contains(&slot) {
404 continue;
405 }
406 template.push(DataType::Integer.type_tag());
407 let value_offset = template.len();
408 template.extend_from_slice(&[0u8; 8]);
409 slot_offsets.push((slot, value_offset));
410 }
411 IntRowTemplate {
412 template,
413 slot_offsets,
414 }
415}
416
417#[inline]
419pub fn encode_int_row_with_template(
420 tmpl: &IntRowTemplate,
421 values: &[Value],
422 buf: &mut Vec<u8>,
423) -> Result<()> {
424 buf.clear();
425 buf.extend_from_slice(&tmpl.template);
426 for &(slot, off) in &tmpl.slot_offsets {
427 match &values[slot] {
428 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
429 other => {
430 return Err(SqlError::TypeMismatch {
431 expected: "Integer".into(),
432 got: other.data_type().to_string(),
433 });
434 }
435 }
436 }
437 Ok(())
438}
439
440fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
441 match DataType::from_tag(type_tag) {
442 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
443 data[..8].try_into().unwrap(),
444 ))),
445 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
446 data[..8].try_into().unwrap(),
447 ))),
448 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
449 Some(DataType::Text) => {
450 let s = std::str::from_utf8(data)
451 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
452 Ok(Value::Text(CompactString::from(s)))
453 }
454 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
455 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
456 data[..8].try_into().unwrap(),
457 ))),
458 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
459 data[..4].try_into().unwrap(),
460 ))),
461 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
462 data[..8].try_into().unwrap(),
463 ))),
464 Some(DataType::Interval) => {
465 if data.len() < 16 {
466 return Err(SqlError::InvalidValue("truncated interval".into()));
467 }
468 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
469 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
470 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
471 Ok(Value::Interval {
472 months,
473 days,
474 micros,
475 })
476 }
477 _ => Err(SqlError::InvalidValue(format!(
478 "unknown column type tag: {type_tag}"
479 ))),
480 }
481}
482
483#[derive(Clone, Copy, PartialEq, Eq, Debug)]
486pub(crate) enum RowVersion {
487 V1,
488 V2,
489}
490
491pub(crate) const V2_FLAG: u16 = 0x8000;
492pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
493
494#[inline]
495pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
496 match DataType::from_tag(type_tag)? {
497 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
498 DataType::Date => Some(4),
499 DataType::Boolean => Some(1),
500 DataType::Interval => Some(16),
501 DataType::Text | DataType::Blob | DataType::Null => None,
502 }
503}
504
505#[inline]
506fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
507 if pos >= data.len() {
508 return Err(SqlError::InvalidValue("truncated column data".into()));
509 }
510 let type_tag = data[pos];
511 let after_tag = pos + 1;
512 let (data_len, body_pos) = match version {
513 RowVersion::V2 => match fixed_width_size(type_tag) {
514 Some(n) => (n, after_tag),
515 None => {
516 if after_tag + 4 > data.len() {
517 return Err(SqlError::InvalidValue("truncated column data".into()));
518 }
519 let len = u32::from_le_bytes([
520 data[after_tag],
521 data[after_tag + 1],
522 data[after_tag + 2],
523 data[after_tag + 3],
524 ]) as usize;
525 (len, after_tag + 4)
526 }
527 },
528 RowVersion::V1 => {
529 if after_tag + 4 > data.len() {
530 return Err(SqlError::InvalidValue("truncated column data".into()));
531 }
532 let len = u32::from_le_bytes([
533 data[after_tag],
534 data[after_tag + 1],
535 data[after_tag + 2],
536 data[after_tag + 3],
537 ]) as usize;
538 (len, after_tag + 4)
539 }
540 };
541 if body_pos + data_len > data.len() {
542 return Err(SqlError::InvalidValue("truncated column value".into()));
543 }
544 Ok((
545 type_tag,
546 &data[body_pos..body_pos + data_len],
547 body_pos + data_len,
548 ))
549}
550
551#[inline]
552fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
553 let (_, _, next) = read_cell(data, pos, version)?;
554 Ok(next)
555}
556
557fn copy_cell_to_v2(
558 data: &[u8],
559 pos: usize,
560 version: RowVersion,
561 out: &mut Vec<u8>,
562) -> Result<usize> {
563 let (tag, body, next) = read_cell(data, pos, version)?;
564 out.push(tag);
565 if fixed_width_size(tag).is_none() {
566 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
567 }
568 out.extend_from_slice(body);
569 Ok(next)
570}
571
572fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
573 if data.len() < 2 {
574 return Err(SqlError::InvalidValue("row data too short".into()));
575 }
576 let raw = u16::from_le_bytes([data[0], data[1]]);
577 let version = if raw & V2_FLAG != 0 {
578 RowVersion::V2
579 } else {
580 RowVersion::V1
581 };
582 let col_count = (raw & COL_COUNT_MASK) as usize;
583 let bitmap_bytes = col_count.div_ceil(8);
584 let pos = 2;
585 if data.len() < pos + bitmap_bytes {
586 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
587 }
588 Ok((
589 version,
590 col_count,
591 &data[pos..pos + bitmap_bytes],
592 pos + bitmap_bytes,
593 ))
594}
595
596pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
597 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
598
599 let mut values = Vec::with_capacity(col_count);
600 for i in 0..col_count {
601 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
602 values.push(Value::Null);
603 continue;
604 }
605 let (type_tag, body, next) = read_cell(data, pos, version)?;
606 values.push(decode_value(type_tag, body)?);
607 pos = next;
608 }
609
610 Ok(values)
611}
612
613#[inline]
615pub fn row_non_pk_count(data: &[u8]) -> usize {
616 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
617}
618
619pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
620 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
621
622 for i in 0..col_count {
623 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
624 continue;
625 }
626 let (type_tag, body, next) = read_cell(data, pos, version)?;
627 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
628 out[col_mapping[i]] = decode_value(type_tag, body)?;
629 }
630 pos = next;
631 }
632
633 Ok(())
634}
635
636pub fn decode_pk_into(
637 key: &[u8],
638 count: usize,
639 out: &mut [Value],
640 pk_mapping: &[usize],
641) -> Result<()> {
642 let mut pos = 0;
643 for i in 0..count {
644 let (v, n) = decode_key_value(&key[pos..])?;
645 if i < pk_mapping.len() {
646 out[pk_mapping[i]] = v;
647 }
648 pos += n;
649 }
650 Ok(())
651}
652
653pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
654 if targets.is_empty() {
655 return Ok(Vec::new());
656 }
657 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
658
659 let mut results = Vec::with_capacity(targets.len());
660 let mut ti = 0;
661
662 for col in 0..col_count {
663 if ti >= targets.len() {
664 break;
665 }
666 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
667
668 if col == targets[ti] {
669 if is_null {
670 results.push(Value::Null);
671 } else {
672 let (type_tag, body, next) = read_cell(data, pos, version)?;
673 results.push(decode_value(type_tag, body)?);
674 pos = next;
675 }
676 ti += 1;
677 } else if !is_null {
678 pos = skip_cell(data, pos, version)?;
679 }
680 }
681
682 while ti < targets.len() {
683 results.push(Value::Null);
684 ti += 1;
685 }
686
687 Ok(results)
688}
689
690pub fn decode_columns_into(
691 data: &[u8],
692 targets: &[usize],
693 schema_cols: &[usize],
694 row: &mut [Value],
695) -> Result<()> {
696 if targets.is_empty() {
697 return Ok(());
698 }
699 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
700
701 let mut ti = 0;
702 for col in 0..col_count {
703 if ti >= targets.len() {
704 break;
705 }
706 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
707
708 if col == targets[ti] {
709 if is_null {
710 row[schema_cols[ti]] = Value::Null;
711 } else {
712 let (type_tag, body, next) = read_cell(data, pos, version)?;
713 row[schema_cols[ti]] = decode_value(type_tag, body)?;
714 pos = next;
715 }
716 ti += 1;
717 } else if !is_null {
718 pos = skip_cell(data, pos, version)?;
719 }
720 }
721
722 Ok(())
723}
724
725#[derive(Debug, Clone, Copy)]
726pub enum RawColumn<'a> {
727 Null,
728 Integer(i64),
729 Real(f64),
730 Boolean(bool),
731 Text(&'a str),
732 Blob(&'a [u8]),
733 Time(i64),
734 Date(i32),
735 Timestamp(i64),
736 Interval { months: i32, days: i32, micros: i64 },
737}
738
739impl<'a> RawColumn<'a> {
740 pub fn to_value(self) -> Value {
741 match self {
742 RawColumn::Null => Value::Null,
743 RawColumn::Integer(i) => Value::Integer(i),
744 RawColumn::Real(r) => Value::Real(r),
745 RawColumn::Boolean(b) => Value::Boolean(b),
746 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
747 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
748 RawColumn::Time(t) => Value::Time(t),
749 RawColumn::Date(d) => Value::Date(d),
750 RawColumn::Timestamp(t) => Value::Timestamp(t),
751 RawColumn::Interval {
752 months,
753 days,
754 micros,
755 } => Value::Interval {
756 months,
757 days,
758 micros,
759 },
760 }
761 }
762
763 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
764 use std::cmp::Ordering;
765 match (self, other) {
766 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
767 (RawColumn::Null, _) | (_, Value::Null) => None,
768 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
769 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
770 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
771 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
772 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
773 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
774 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
775 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
776 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
777 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
778 (
779 RawColumn::Interval {
780 months: am,
781 days: ad,
782 micros: au,
783 },
784 Value::Interval {
785 months: bm,
786 days: bd,
787 micros: bu,
788 },
789 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
790 _ => None,
791 }
792 }
793
794 pub fn eq_value(&self, other: &Value) -> bool {
795 match (self, other) {
796 (RawColumn::Null, Value::Null) => true,
797 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
798 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
799 (RawColumn::Real(a), Value::Real(b)) => a == b,
800 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
801 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
802 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
803 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
804 (RawColumn::Time(a), Value::Time(b)) => a == b,
805 (RawColumn::Date(a), Value::Date(b)) => a == b,
806 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
807 (
808 RawColumn::Interval {
809 months: am,
810 days: ad,
811 micros: au,
812 },
813 Value::Interval {
814 months: bm,
815 days: bd,
816 micros: bu,
817 },
818 ) => am == bm && ad == bd && au == bu,
819 _ => false,
820 }
821 }
822
823 pub fn as_f64(&self) -> Option<f64> {
824 match self {
825 RawColumn::Integer(i) => Some(*i as f64),
826 RawColumn::Real(r) => Some(*r),
827 _ => None,
828 }
829 }
830
831 pub fn as_i64(&self) -> Option<i64> {
832 match self {
833 RawColumn::Integer(i) => Some(*i),
834 RawColumn::Time(t) => Some(*t),
835 RawColumn::Date(d) => Some(*d as i64),
836 RawColumn::Timestamp(t) => Some(*t),
837 _ => None,
838 }
839 }
840}
841
842fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
843 match DataType::from_tag(type_tag) {
844 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
845 data[..8].try_into().unwrap(),
846 ))),
847 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
848 data[..8].try_into().unwrap(),
849 ))),
850 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
851 Some(DataType::Text) => {
852 let s = std::str::from_utf8(data)
853 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
854 Ok(RawColumn::Text(s))
855 }
856 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
857 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
858 data[..8].try_into().unwrap(),
859 ))),
860 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
861 data[..4].try_into().unwrap(),
862 ))),
863 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
864 data[..8].try_into().unwrap(),
865 ))),
866 Some(DataType::Interval) => {
867 if data.len() < 16 {
868 return Err(SqlError::InvalidValue("truncated interval".into()));
869 }
870 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
871 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
872 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
873 Ok(RawColumn::Interval {
874 months,
875 days,
876 micros,
877 })
878 }
879 _ => Err(SqlError::InvalidValue(format!(
880 "unknown column type tag: {type_tag}"
881 ))),
882 }
883}
884
885pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
887 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
888 if target >= col_count || new_val.is_null() {
889 return Ok(false);
890 }
891 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
892 if was_null {
893 return Ok(false);
894 }
895 for col in 0..target {
896 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
897 if !is_null {
898 pos = skip_cell(data, pos, version)?;
899 }
900 }
901 let type_tag = data[pos];
902 let (old_data_len, val_start) = match version {
903 RowVersion::V2 => match fixed_width_size(type_tag) {
904 Some(n) => (n, pos + 1),
905 None => {
906 if pos + 5 > data.len() {
907 return Err(SqlError::InvalidValue("truncated column data".into()));
908 }
909 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
910 (len, pos + 5)
911 }
912 },
913 RowVersion::V1 => {
914 if pos + 5 > data.len() {
915 return Err(SqlError::InvalidValue("truncated column data".into()));
916 }
917 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
918 (len, pos + 5)
919 }
920 };
921 let new_data_len = match new_val {
922 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
923 Value::Date(_) => 4,
924 Value::Interval { .. } => 16,
925 Value::Boolean(_) => 1,
926 Value::Text(s) => s.len(),
927 Value::Blob(b) => b.len(),
928 Value::Null => return Ok(false),
929 };
930 if new_data_len != old_data_len {
931 return Ok(false);
932 }
933 data[pos] = new_val.data_type().type_tag();
934 match new_val {
935 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
936 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
937 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
938 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
939 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
940 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
941 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
942 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
943 Value::Interval {
944 months,
945 days,
946 micros,
947 } => {
948 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
949 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
950 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
951 }
952 Value::Null => unreachable!(),
953 }
954 Ok(true)
955}
956
957pub fn patch_row_column(
959 data: &[u8],
960 target: usize,
961 new_val: &Value,
962 out: &mut Vec<u8>,
963) -> Result<()> {
964 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
965
966 let new_col_count = if target >= col_count {
967 target + 1
968 } else {
969 col_count
970 };
971 let new_bitmap_bytes = new_col_count.div_ceil(8);
972 let bitmap_bytes = col_count.div_ceil(8);
973 out.clear();
974
975 let header = (new_col_count as u16) | V2_FLAG;
976 out.extend_from_slice(&header.to_le_bytes());
977 let bitmap_start = out.len();
978 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
979 for _ in bitmap_bytes..new_bitmap_bytes {
980 out.push(0xFF);
981 }
982 if new_val.is_null() {
983 out[bitmap_start + target / 8] |= 1 << (target % 8);
984 } else {
985 out[bitmap_start + target / 8] &= !(1 << (target % 8));
986 }
987
988 let mut pos = header_end;
989 for col in 0..new_col_count {
990 let was_null = if col < col_count {
991 bitmap[col / 8] & (1 << (col % 8)) != 0
992 } else {
993 true
994 };
995
996 if col == target {
997 if !was_null {
998 pos = skip_cell(data, pos, version)?;
999 }
1000 if !new_val.is_null() {
1001 encode_cell_v2(new_val, out);
1002 }
1003 } else if !was_null {
1004 pos = copy_cell_to_v2(data, pos, version, out)?;
1005 }
1006 }
1007 Ok(())
1008}
1009
1010pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1011 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1012 if target >= col_count {
1013 return Ok(RawColumn::Null);
1014 }
1015
1016 for col in 0..=target {
1017 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1018
1019 if col == target {
1020 if is_null {
1021 return Ok(RawColumn::Null);
1022 }
1023 let (type_tag, body, _) = read_cell(data, pos, version)?;
1024 return decode_value_raw(type_tag, body);
1025 } else if !is_null {
1026 pos = skip_cell(data, pos, version)?;
1027 }
1028 }
1029
1030 unreachable!()
1031}
1032
1033pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1035 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1036 if target >= col_count {
1037 return Ok((RawColumn::Null, usize::MAX));
1038 }
1039
1040 for col in 0..=target {
1041 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1042
1043 if col == target {
1044 if is_null {
1045 return Ok((RawColumn::Null, usize::MAX));
1046 }
1047 let tag_offset = pos;
1048 let (type_tag, body, _) = read_cell(data, pos, version)?;
1049 let raw = decode_value_raw(type_tag, body)?;
1050 return Ok((raw, tag_offset));
1051 } else if !is_null {
1052 pos = skip_cell(data, pos, version)?;
1053 }
1054 }
1055
1056 unreachable!()
1057}
1058
1059pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1061 if offset == usize::MAX || new_val.is_null() {
1062 return Ok(false);
1063 }
1064 if data.len() < 2 || offset >= data.len() {
1065 return Err(SqlError::InvalidValue("truncated column data".into()));
1066 }
1067 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1068 RowVersion::V2
1069 } else {
1070 RowVersion::V1
1071 };
1072 let type_tag = data[offset];
1073 let (old_data_len, val_start) = match version {
1074 RowVersion::V2 => match fixed_width_size(type_tag) {
1075 Some(n) => (n, offset + 1),
1076 None => {
1077 if offset + 5 > data.len() {
1078 return Err(SqlError::InvalidValue("truncated column data".into()));
1079 }
1080 let len =
1081 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1082 (len, offset + 5)
1083 }
1084 },
1085 RowVersion::V1 => {
1086 if offset + 5 > data.len() {
1087 return Err(SqlError::InvalidValue("truncated column data".into()));
1088 }
1089 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1090 (len, offset + 5)
1091 }
1092 };
1093 let new_data_len = match new_val {
1094 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1095 Value::Date(_) => 4,
1096 Value::Interval { .. } => 16,
1097 Value::Boolean(_) => 1,
1098 Value::Text(s) => s.len(),
1099 Value::Blob(b) => b.len(),
1100 Value::Null => return Ok(false),
1101 };
1102 if new_data_len != old_data_len {
1103 return Ok(false);
1104 }
1105 data[offset] = new_val.data_type().type_tag();
1106 match new_val {
1107 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1108 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1109 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1110 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1111 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1112 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1113 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1114 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1115 Value::Interval {
1116 months,
1117 days,
1118 micros,
1119 } => {
1120 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1121 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1122 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1123 }
1124 Value::Null => unreachable!(),
1125 }
1126 Ok(true)
1127}
1128
1129pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1130 if key.is_empty() || key[0] != TAG_INTEGER {
1131 return Err(SqlError::InvalidValue("not an integer key".into()));
1132 }
1133 let (val, _) = decode_integer(&key[1..])?;
1134 match val {
1135 Value::Integer(i) => Ok(i),
1136 _ => unreachable!(),
1137 }
1138}
1139
1140#[cfg(test)]
1141#[path = "encoding_tests.rs"]
1142mod tests;