1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17
18pub fn encode_key_value(value: &Value) -> Vec<u8> {
20 let mut buf = Vec::with_capacity(16);
21 encode_key_value_into(value, &mut buf);
22 buf
23}
24
25pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
27 let mut buf = Vec::new();
28 for v in values {
29 buf.extend_from_slice(&encode_key_value(v));
30 }
31 buf
32}
33
34pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
35 buf.clear();
36 for v in values {
37 encode_key_value_into(v, buf);
38 }
39}
40
41#[inline]
42pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
43 buf.clear();
44 encode_signed_varint(TAG_INTEGER, val, buf);
45}
46
47fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
48 match value {
49 Value::Null => buf.push(TAG_NULL),
50 Value::Boolean(b) => {
51 buf.push(TAG_BOOLEAN);
52 buf.push(if *b { 0x01 } else { 0x00 });
53 }
54 Value::Integer(i) => encode_integer_into(*i, buf),
55 Value::Real(r) => encode_real_into(*r, buf),
56 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
57 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
58 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
59 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
60 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
61 Value::Interval {
62 months,
63 days,
64 micros,
65 } => {
66 buf.push(TAG_INTERVAL);
68 let mut mb = months.to_be_bytes();
69 mb[0] ^= 0x80;
70 buf.extend_from_slice(&mb);
71 let mut db = days.to_be_bytes();
72 db[0] ^= 0x80;
73 buf.extend_from_slice(&db);
74 let mut ub = micros.to_be_bytes();
75 ub[0] ^= 0x80;
76 buf.extend_from_slice(&ub);
77 }
78 }
79}
80
81fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
82 encode_signed_varint(TAG_INTEGER, val, buf);
83}
84
85pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
91 buf.push(tag);
92 if val == 0 {
93 buf.push(0x80);
94 return;
95 }
96 if val > 0 {
97 let bytes = val.to_be_bytes();
98 let start = bytes.iter().position(|&b| b != 0).unwrap();
99 let byte_count = (8 - start) as u8;
100 buf.push(0x80 + byte_count);
101 buf.extend_from_slice(&bytes[start..]);
102 } else {
103 let abs_val = if val == i64::MIN {
104 u64::MAX / 2 + 1
105 } else {
106 (-val) as u64
107 };
108 let bytes = abs_val.to_be_bytes();
109 let start = bytes.iter().position(|&b| b != 0).unwrap();
110 let byte_count = (8 - start) as u8;
111 buf.push(0x80 - byte_count);
112 for &b in &bytes[start..] {
113 buf.push(!b);
114 }
115 }
116}
117
118fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
119 buf.push(TAG_REAL);
120 let bits = val.to_bits();
121 let encoded = if val.is_sign_negative() {
122 !bits
123 } else {
124 bits ^ (1u64 << 63)
125 };
126 buf.extend_from_slice(&encoded.to_be_bytes());
127}
128
129fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
130 buf.push(tag);
131 for &b in data {
132 if b == 0x00 {
133 buf.push(0x00);
134 buf.push(0xFF);
135 } else {
136 buf.push(b);
137 }
138 }
139 buf.push(0x00);
140}
141
142pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
144 if data.is_empty() {
145 return Err(SqlError::InvalidValue("empty key data".into()));
146 }
147 match data[0] {
148 TAG_NULL => Ok((Value::Null, 1)),
149 TAG_BOOLEAN => {
150 if data.len() < 2 {
151 return Err(SqlError::InvalidValue("truncated boolean".into()));
152 }
153 Ok((Value::Boolean(data[1] != 0), 2))
154 }
155 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
156 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
157 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
158 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
159 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
160 (Value::Date(d), n + 1)
161 }),
162 TAG_TIMESTAMP => {
163 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
164 }
165 TAG_INTERVAL => {
166 if data.len() < 1 + 16 {
167 return Err(SqlError::InvalidValue("truncated interval".into()));
168 }
169 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
170 mb[0] ^= 0x80;
171 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
172 db[0] ^= 0x80;
173 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
174 ub[0] ^= 0x80;
175 Ok((
176 Value::Interval {
177 months: i32::from_be_bytes(mb),
178 days: i32::from_be_bytes(db),
179 micros: i64::from_be_bytes(ub),
180 },
181 17,
182 ))
183 }
184 TAG_TEXT => {
185 let (bytes, n) = decode_null_escaped(&data[1..])?;
186 let s = String::from_utf8(bytes)
187 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
188 Ok((Value::Text(CompactString::from(s)), n + 1))
189 }
190 TAG_BLOB => {
191 let (bytes, n) = decode_null_escaped(&data[1..])?;
192 Ok((Value::Blob(bytes), n + 1))
193 }
194 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
195 }
196}
197
198pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
200 let mut values = Vec::with_capacity(count);
201 let mut pos = 0;
202 for _ in 0..count {
203 let (v, n) = decode_key_value(&data[pos..])?;
204 values.push(v);
205 pos += n;
206 }
207 Ok(values)
208}
209
210fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
211 let (v, n) = decode_signed_varint(data)?;
212 Ok((Value::Integer(v), n))
213}
214
215pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
217 if data.is_empty() {
218 return Err(SqlError::InvalidValue("truncated integer".into()));
219 }
220 let marker = data[0];
221 if marker == 0x80 {
222 return Ok((0, 1));
223 }
224 if marker > 0x80 {
225 let byte_count = (marker - 0x80) as usize;
226 if data.len() < 1 + byte_count {
227 return Err(SqlError::InvalidValue("truncated positive integer".into()));
228 }
229 let mut bytes = [0u8; 8];
230 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
231 let val = i64::from_be_bytes(bytes);
232 Ok((val, 1 + byte_count))
233 } else {
234 let byte_count = (0x80 - marker) as usize;
235 if data.len() < 1 + byte_count {
236 return Err(SqlError::InvalidValue("truncated negative integer".into()));
237 }
238 let mut bytes = [0u8; 8];
239 for i in 0..byte_count {
240 bytes[8 - byte_count + i] = !data[1 + i];
241 }
242 let abs_val = u64::from_be_bytes(bytes);
243 let val = (-(abs_val as i128)) as i64;
244 Ok((val, 1 + byte_count))
245 }
246}
247
248fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
249 if data.len() < 8 {
250 return Err(SqlError::InvalidValue("truncated real".into()));
251 }
252 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
253 let bits = if encoded & (1u64 << 63) != 0 {
254 encoded ^ (1u64 << 63)
256 } else {
257 !encoded
259 };
260 let val = f64::from_bits(bits);
261 Ok((Value::Real(val), 8))
262}
263
264fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
266 let mut result = Vec::new();
267 let mut i = 0;
268 while i < data.len() {
269 if data[i] == 0x00 {
270 if i + 1 < data.len() && data[i + 1] == 0xFF {
271 result.push(0x00);
272 i += 2;
273 } else {
274 return Ok((result, i + 1)); }
276 } else {
277 result.push(data[i]);
278 i += 1;
279 }
280 }
281 Err(SqlError::InvalidValue(
282 "unterminated null-escaped string".into(),
283 ))
284}
285
286fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
287 match v {
288 Value::Integer(val) => {
289 buf.push(DataType::Integer.type_tag());
290 buf.extend_from_slice(&val.to_le_bytes());
291 }
292 Value::Real(r) => {
293 buf.push(DataType::Real.type_tag());
294 buf.extend_from_slice(&r.to_le_bytes());
295 }
296 Value::Boolean(b) => {
297 buf.push(DataType::Boolean.type_tag());
298 buf.push(if *b { 1 } else { 0 });
299 }
300 Value::Text(s) => {
301 let bytes = s.as_bytes();
302 buf.push(DataType::Text.type_tag());
303 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
304 buf.extend_from_slice(bytes);
305 }
306 Value::Blob(data) => {
307 buf.push(DataType::Blob.type_tag());
308 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
309 buf.extend_from_slice(data);
310 }
311 Value::Time(t) => {
312 buf.push(DataType::Time.type_tag());
313 buf.extend_from_slice(&t.to_le_bytes());
314 }
315 Value::Date(d) => {
316 buf.push(DataType::Date.type_tag());
317 buf.extend_from_slice(&d.to_le_bytes());
318 }
319 Value::Timestamp(t) => {
320 buf.push(DataType::Timestamp.type_tag());
321 buf.extend_from_slice(&t.to_le_bytes());
322 }
323 Value::Interval {
324 months,
325 days,
326 micros,
327 } => {
328 buf.push(DataType::Interval.type_tag());
329 buf.extend_from_slice(&months.to_le_bytes());
330 buf.extend_from_slice(&days.to_le_bytes());
331 buf.extend_from_slice(µs.to_le_bytes());
332 }
333 Value::Null => unreachable!(),
334 }
335}
336
337pub fn encode_row(values: &[Value]) -> Vec<u8> {
338 let mut buf = Vec::new();
339 encode_row_into(values, &mut buf);
340 buf
341}
342
343pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
344 buf.clear();
345 let col_count = values.len();
346 let bitmap_bytes = col_count.div_ceil(8);
347
348 let header = (col_count as u16) | V2_FLAG;
349 buf.extend_from_slice(&header.to_le_bytes());
350
351 let bitmap_start = buf.len();
352 buf.resize(buf.len() + bitmap_bytes, 0);
353
354 for (i, v) in values.iter().enumerate() {
355 if v.is_null() {
356 buf[bitmap_start + i / 8] |= 1 << (i % 8);
357 continue;
358 }
359 encode_cell_v2(v, buf);
360 }
361}
362
363pub struct IntRowTemplate {
364 pub template: Vec<u8>,
365 pub slot_offsets: Vec<(usize, usize)>,
366}
367
368pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
369 let bitmap_bytes = phys_count.div_ceil(8);
370 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
371 let header = (phys_count as u16) | V2_FLAG;
372 template.extend_from_slice(&header.to_le_bytes());
373 let bitmap_start = template.len();
374 template.resize(bitmap_start + bitmap_bytes, 0);
375 for &i in null_slots {
376 template[bitmap_start + i / 8] |= 1 << (i % 8);
377 }
378 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
379 for slot in 0..phys_count {
380 if null_slots.contains(&slot) {
381 continue;
382 }
383 template.push(DataType::Integer.type_tag());
384 let value_offset = template.len();
385 template.extend_from_slice(&[0u8; 8]);
386 slot_offsets.push((slot, value_offset));
387 }
388 IntRowTemplate {
389 template,
390 slot_offsets,
391 }
392}
393
394#[inline]
396pub fn encode_int_row_with_template(
397 tmpl: &IntRowTemplate,
398 values: &[Value],
399 buf: &mut Vec<u8>,
400) -> Result<()> {
401 buf.clear();
402 buf.extend_from_slice(&tmpl.template);
403 for &(slot, off) in &tmpl.slot_offsets {
404 match &values[slot] {
405 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
406 other => {
407 return Err(SqlError::TypeMismatch {
408 expected: "Integer".into(),
409 got: other.data_type().to_string(),
410 });
411 }
412 }
413 }
414 Ok(())
415}
416
417fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
418 match DataType::from_tag(type_tag) {
419 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
420 data[..8].try_into().unwrap(),
421 ))),
422 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
423 data[..8].try_into().unwrap(),
424 ))),
425 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
426 Some(DataType::Text) => {
427 let s = std::str::from_utf8(data)
428 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
429 Ok(Value::Text(CompactString::from(s)))
430 }
431 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
432 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
433 data[..8].try_into().unwrap(),
434 ))),
435 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
436 data[..4].try_into().unwrap(),
437 ))),
438 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
439 data[..8].try_into().unwrap(),
440 ))),
441 Some(DataType::Interval) => {
442 if data.len() < 16 {
443 return Err(SqlError::InvalidValue("truncated interval".into()));
444 }
445 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
446 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
447 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
448 Ok(Value::Interval {
449 months,
450 days,
451 micros,
452 })
453 }
454 _ => Err(SqlError::InvalidValue(format!(
455 "unknown column type tag: {type_tag}"
456 ))),
457 }
458}
459
460#[derive(Clone, Copy, PartialEq, Eq, Debug)]
463pub(crate) enum RowVersion {
464 V1,
465 V2,
466}
467
468pub(crate) const V2_FLAG: u16 = 0x8000;
469pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
470
471#[inline]
472pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
473 match DataType::from_tag(type_tag)? {
474 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
475 DataType::Date => Some(4),
476 DataType::Boolean => Some(1),
477 DataType::Interval => Some(16),
478 DataType::Text | DataType::Blob | DataType::Null => None,
479 }
480}
481
482#[inline]
483fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
484 if pos >= data.len() {
485 return Err(SqlError::InvalidValue("truncated column data".into()));
486 }
487 let type_tag = data[pos];
488 let after_tag = pos + 1;
489 let (data_len, body_pos) = match version {
490 RowVersion::V2 => match fixed_width_size(type_tag) {
491 Some(n) => (n, after_tag),
492 None => {
493 if after_tag + 4 > data.len() {
494 return Err(SqlError::InvalidValue("truncated column data".into()));
495 }
496 let len = u32::from_le_bytes([
497 data[after_tag],
498 data[after_tag + 1],
499 data[after_tag + 2],
500 data[after_tag + 3],
501 ]) as usize;
502 (len, after_tag + 4)
503 }
504 },
505 RowVersion::V1 => {
506 if after_tag + 4 > data.len() {
507 return Err(SqlError::InvalidValue("truncated column data".into()));
508 }
509 let len = u32::from_le_bytes([
510 data[after_tag],
511 data[after_tag + 1],
512 data[after_tag + 2],
513 data[after_tag + 3],
514 ]) as usize;
515 (len, after_tag + 4)
516 }
517 };
518 if body_pos + data_len > data.len() {
519 return Err(SqlError::InvalidValue("truncated column value".into()));
520 }
521 Ok((
522 type_tag,
523 &data[body_pos..body_pos + data_len],
524 body_pos + data_len,
525 ))
526}
527
528#[inline]
529fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
530 let (_, _, next) = read_cell(data, pos, version)?;
531 Ok(next)
532}
533
534fn copy_cell_to_v2(
535 data: &[u8],
536 pos: usize,
537 version: RowVersion,
538 out: &mut Vec<u8>,
539) -> Result<usize> {
540 let (tag, body, next) = read_cell(data, pos, version)?;
541 out.push(tag);
542 if fixed_width_size(tag).is_none() {
543 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
544 }
545 out.extend_from_slice(body);
546 Ok(next)
547}
548
549fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
550 if data.len() < 2 {
551 return Err(SqlError::InvalidValue("row data too short".into()));
552 }
553 let raw = u16::from_le_bytes([data[0], data[1]]);
554 let version = if raw & V2_FLAG != 0 {
555 RowVersion::V2
556 } else {
557 RowVersion::V1
558 };
559 let col_count = (raw & COL_COUNT_MASK) as usize;
560 let bitmap_bytes = col_count.div_ceil(8);
561 let pos = 2;
562 if data.len() < pos + bitmap_bytes {
563 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
564 }
565 Ok((
566 version,
567 col_count,
568 &data[pos..pos + bitmap_bytes],
569 pos + bitmap_bytes,
570 ))
571}
572
573pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
574 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
575
576 let mut values = Vec::with_capacity(col_count);
577 for i in 0..col_count {
578 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
579 values.push(Value::Null);
580 continue;
581 }
582 let (type_tag, body, next) = read_cell(data, pos, version)?;
583 values.push(decode_value(type_tag, body)?);
584 pos = next;
585 }
586
587 Ok(values)
588}
589
590#[inline]
592pub fn row_non_pk_count(data: &[u8]) -> usize {
593 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
594}
595
596pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
597 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
598
599 for i in 0..col_count {
600 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
601 continue;
602 }
603 let (type_tag, body, next) = read_cell(data, pos, version)?;
604 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
605 out[col_mapping[i]] = decode_value(type_tag, body)?;
606 }
607 pos = next;
608 }
609
610 Ok(())
611}
612
613pub fn decode_pk_into(
614 key: &[u8],
615 count: usize,
616 out: &mut [Value],
617 pk_mapping: &[usize],
618) -> Result<()> {
619 let mut pos = 0;
620 for i in 0..count {
621 let (v, n) = decode_key_value(&key[pos..])?;
622 if i < pk_mapping.len() {
623 out[pk_mapping[i]] = v;
624 }
625 pos += n;
626 }
627 Ok(())
628}
629
630pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
631 if targets.is_empty() {
632 return Ok(Vec::new());
633 }
634 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
635
636 let mut results = Vec::with_capacity(targets.len());
637 let mut ti = 0;
638
639 for col in 0..col_count {
640 if ti >= targets.len() {
641 break;
642 }
643 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
644
645 if col == targets[ti] {
646 if is_null {
647 results.push(Value::Null);
648 } else {
649 let (type_tag, body, next) = read_cell(data, pos, version)?;
650 results.push(decode_value(type_tag, body)?);
651 pos = next;
652 }
653 ti += 1;
654 } else if !is_null {
655 pos = skip_cell(data, pos, version)?;
656 }
657 }
658
659 while ti < targets.len() {
660 results.push(Value::Null);
661 ti += 1;
662 }
663
664 Ok(results)
665}
666
667pub fn decode_columns_into(
668 data: &[u8],
669 targets: &[usize],
670 schema_cols: &[usize],
671 row: &mut [Value],
672) -> Result<()> {
673 if targets.is_empty() {
674 return Ok(());
675 }
676 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
677
678 let mut ti = 0;
679 for col in 0..col_count {
680 if ti >= targets.len() {
681 break;
682 }
683 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
684
685 if col == targets[ti] {
686 if is_null {
687 row[schema_cols[ti]] = Value::Null;
688 } else {
689 let (type_tag, body, next) = read_cell(data, pos, version)?;
690 row[schema_cols[ti]] = decode_value(type_tag, body)?;
691 pos = next;
692 }
693 ti += 1;
694 } else if !is_null {
695 pos = skip_cell(data, pos, version)?;
696 }
697 }
698
699 Ok(())
700}
701
702#[derive(Debug, Clone, Copy)]
703pub enum RawColumn<'a> {
704 Null,
705 Integer(i64),
706 Real(f64),
707 Boolean(bool),
708 Text(&'a str),
709 Blob(&'a [u8]),
710 Time(i64),
711 Date(i32),
712 Timestamp(i64),
713 Interval { months: i32, days: i32, micros: i64 },
714}
715
716impl<'a> RawColumn<'a> {
717 pub fn to_value(self) -> Value {
718 match self {
719 RawColumn::Null => Value::Null,
720 RawColumn::Integer(i) => Value::Integer(i),
721 RawColumn::Real(r) => Value::Real(r),
722 RawColumn::Boolean(b) => Value::Boolean(b),
723 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
724 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
725 RawColumn::Time(t) => Value::Time(t),
726 RawColumn::Date(d) => Value::Date(d),
727 RawColumn::Timestamp(t) => Value::Timestamp(t),
728 RawColumn::Interval {
729 months,
730 days,
731 micros,
732 } => Value::Interval {
733 months,
734 days,
735 micros,
736 },
737 }
738 }
739
740 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
741 use std::cmp::Ordering;
742 match (self, other) {
743 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
744 (RawColumn::Null, _) | (_, Value::Null) => None,
745 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
746 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
747 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
748 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
749 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
750 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
751 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
752 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
753 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
754 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
755 (
756 RawColumn::Interval {
757 months: am,
758 days: ad,
759 micros: au,
760 },
761 Value::Interval {
762 months: bm,
763 days: bd,
764 micros: bu,
765 },
766 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
767 _ => None,
768 }
769 }
770
771 pub fn eq_value(&self, other: &Value) -> bool {
772 match (self, other) {
773 (RawColumn::Null, Value::Null) => true,
774 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
775 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
776 (RawColumn::Real(a), Value::Real(b)) => a == b,
777 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
778 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
779 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
780 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
781 (RawColumn::Time(a), Value::Time(b)) => a == b,
782 (RawColumn::Date(a), Value::Date(b)) => a == b,
783 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
784 (
785 RawColumn::Interval {
786 months: am,
787 days: ad,
788 micros: au,
789 },
790 Value::Interval {
791 months: bm,
792 days: bd,
793 micros: bu,
794 },
795 ) => am == bm && ad == bd && au == bu,
796 _ => false,
797 }
798 }
799
800 pub fn as_f64(&self) -> Option<f64> {
801 match self {
802 RawColumn::Integer(i) => Some(*i as f64),
803 RawColumn::Real(r) => Some(*r),
804 _ => None,
805 }
806 }
807
808 pub fn as_i64(&self) -> Option<i64> {
809 match self {
810 RawColumn::Integer(i) => Some(*i),
811 RawColumn::Time(t) => Some(*t),
812 RawColumn::Date(d) => Some(*d as i64),
813 RawColumn::Timestamp(t) => Some(*t),
814 _ => None,
815 }
816 }
817}
818
819fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
820 match DataType::from_tag(type_tag) {
821 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
822 data[..8].try_into().unwrap(),
823 ))),
824 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
825 data[..8].try_into().unwrap(),
826 ))),
827 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
828 Some(DataType::Text) => {
829 let s = std::str::from_utf8(data)
830 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
831 Ok(RawColumn::Text(s))
832 }
833 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
834 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
835 data[..8].try_into().unwrap(),
836 ))),
837 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
838 data[..4].try_into().unwrap(),
839 ))),
840 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
841 data[..8].try_into().unwrap(),
842 ))),
843 Some(DataType::Interval) => {
844 if data.len() < 16 {
845 return Err(SqlError::InvalidValue("truncated interval".into()));
846 }
847 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
848 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
849 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
850 Ok(RawColumn::Interval {
851 months,
852 days,
853 micros,
854 })
855 }
856 _ => Err(SqlError::InvalidValue(format!(
857 "unknown column type tag: {type_tag}"
858 ))),
859 }
860}
861
862pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
864 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
865 if target >= col_count || new_val.is_null() {
866 return Ok(false);
867 }
868 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
869 if was_null {
870 return Ok(false);
871 }
872 for col in 0..target {
873 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
874 if !is_null {
875 pos = skip_cell(data, pos, version)?;
876 }
877 }
878 let type_tag = data[pos];
879 let (old_data_len, val_start) = match version {
880 RowVersion::V2 => match fixed_width_size(type_tag) {
881 Some(n) => (n, pos + 1),
882 None => {
883 if pos + 5 > data.len() {
884 return Err(SqlError::InvalidValue("truncated column data".into()));
885 }
886 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
887 (len, pos + 5)
888 }
889 },
890 RowVersion::V1 => {
891 if pos + 5 > data.len() {
892 return Err(SqlError::InvalidValue("truncated column data".into()));
893 }
894 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
895 (len, pos + 5)
896 }
897 };
898 let new_data_len = match new_val {
899 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
900 Value::Date(_) => 4,
901 Value::Interval { .. } => 16,
902 Value::Boolean(_) => 1,
903 Value::Text(s) => s.len(),
904 Value::Blob(b) => b.len(),
905 Value::Null => return Ok(false),
906 };
907 if new_data_len != old_data_len {
908 return Ok(false);
909 }
910 data[pos] = new_val.data_type().type_tag();
911 match new_val {
912 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
913 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
914 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
915 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
916 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
917 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
918 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
919 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
920 Value::Interval {
921 months,
922 days,
923 micros,
924 } => {
925 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
926 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
927 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
928 }
929 Value::Null => unreachable!(),
930 }
931 Ok(true)
932}
933
934pub fn patch_row_column(
936 data: &[u8],
937 target: usize,
938 new_val: &Value,
939 out: &mut Vec<u8>,
940) -> Result<()> {
941 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
942
943 let new_col_count = if target >= col_count {
944 target + 1
945 } else {
946 col_count
947 };
948 let new_bitmap_bytes = new_col_count.div_ceil(8);
949 let bitmap_bytes = col_count.div_ceil(8);
950 out.clear();
951
952 let header = (new_col_count as u16) | V2_FLAG;
953 out.extend_from_slice(&header.to_le_bytes());
954 let bitmap_start = out.len();
955 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
956 for _ in bitmap_bytes..new_bitmap_bytes {
957 out.push(0xFF);
958 }
959 if new_val.is_null() {
960 out[bitmap_start + target / 8] |= 1 << (target % 8);
961 } else {
962 out[bitmap_start + target / 8] &= !(1 << (target % 8));
963 }
964
965 let mut pos = header_end;
966 for col in 0..new_col_count {
967 let was_null = if col < col_count {
968 bitmap[col / 8] & (1 << (col % 8)) != 0
969 } else {
970 true
971 };
972
973 if col == target {
974 if !was_null {
975 pos = skip_cell(data, pos, version)?;
976 }
977 if !new_val.is_null() {
978 encode_cell_v2(new_val, out);
979 }
980 } else if !was_null {
981 pos = copy_cell_to_v2(data, pos, version, out)?;
982 }
983 }
984 Ok(())
985}
986
987pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
988 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
989 if target >= col_count {
990 return Ok(RawColumn::Null);
991 }
992
993 for col in 0..=target {
994 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
995
996 if col == target {
997 if is_null {
998 return Ok(RawColumn::Null);
999 }
1000 let (type_tag, body, _) = read_cell(data, pos, version)?;
1001 return decode_value_raw(type_tag, body);
1002 } else if !is_null {
1003 pos = skip_cell(data, pos, version)?;
1004 }
1005 }
1006
1007 unreachable!()
1008}
1009
1010pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1012 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1013 if target >= col_count {
1014 return Ok((RawColumn::Null, usize::MAX));
1015 }
1016
1017 for col in 0..=target {
1018 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1019
1020 if col == target {
1021 if is_null {
1022 return Ok((RawColumn::Null, usize::MAX));
1023 }
1024 let tag_offset = pos;
1025 let (type_tag, body, _) = read_cell(data, pos, version)?;
1026 let raw = decode_value_raw(type_tag, body)?;
1027 return Ok((raw, tag_offset));
1028 } else if !is_null {
1029 pos = skip_cell(data, pos, version)?;
1030 }
1031 }
1032
1033 unreachable!()
1034}
1035
1036pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1038 if offset == usize::MAX || new_val.is_null() {
1039 return Ok(false);
1040 }
1041 if data.len() < 2 || offset >= data.len() {
1042 return Err(SqlError::InvalidValue("truncated column data".into()));
1043 }
1044 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1045 RowVersion::V2
1046 } else {
1047 RowVersion::V1
1048 };
1049 let type_tag = data[offset];
1050 let (old_data_len, val_start) = match version {
1051 RowVersion::V2 => match fixed_width_size(type_tag) {
1052 Some(n) => (n, offset + 1),
1053 None => {
1054 if offset + 5 > data.len() {
1055 return Err(SqlError::InvalidValue("truncated column data".into()));
1056 }
1057 let len =
1058 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1059 (len, offset + 5)
1060 }
1061 },
1062 RowVersion::V1 => {
1063 if offset + 5 > data.len() {
1064 return Err(SqlError::InvalidValue("truncated column data".into()));
1065 }
1066 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1067 (len, offset + 5)
1068 }
1069 };
1070 let new_data_len = match new_val {
1071 Value::Integer(_) | Value::Real(_) | Value::Time(_) | Value::Timestamp(_) => 8,
1072 Value::Date(_) => 4,
1073 Value::Interval { .. } => 16,
1074 Value::Boolean(_) => 1,
1075 Value::Text(s) => s.len(),
1076 Value::Blob(b) => b.len(),
1077 Value::Null => return Ok(false),
1078 };
1079 if new_data_len != old_data_len {
1080 return Ok(false);
1081 }
1082 data[offset] = new_val.data_type().type_tag();
1083 match new_val {
1084 Value::Integer(v) => data[val_start..val_start + 8].copy_from_slice(&v.to_le_bytes()),
1085 Value::Real(r) => data[val_start..val_start + 8].copy_from_slice(&r.to_le_bytes()),
1086 Value::Boolean(b) => data[val_start] = if *b { 1 } else { 0 },
1087 Value::Text(s) => data[val_start..val_start + s.len()].copy_from_slice(s.as_bytes()),
1088 Value::Blob(d) => data[val_start..val_start + d.len()].copy_from_slice(d),
1089 Value::Time(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1090 Value::Date(d) => data[val_start..val_start + 4].copy_from_slice(&d.to_le_bytes()),
1091 Value::Timestamp(t) => data[val_start..val_start + 8].copy_from_slice(&t.to_le_bytes()),
1092 Value::Interval {
1093 months,
1094 days,
1095 micros,
1096 } => {
1097 data[val_start..val_start + 4].copy_from_slice(&months.to_le_bytes());
1098 data[val_start + 4..val_start + 8].copy_from_slice(&days.to_le_bytes());
1099 data[val_start + 8..val_start + 16].copy_from_slice(µs.to_le_bytes());
1100 }
1101 Value::Null => unreachable!(),
1102 }
1103 Ok(true)
1104}
1105
1106pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1107 if key.is_empty() || key[0] != TAG_INTEGER {
1108 return Err(SqlError::InvalidValue("not an integer key".into()));
1109 }
1110 let (val, _) = decode_integer(&key[1..])?;
1111 match val {
1112 Value::Integer(i) => Ok(i),
1113 _ => unreachable!(),
1114 }
1115}
1116
1117#[cfg(test)]
1118#[path = "encoding_tests.rs"]
1119mod tests;