1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
6const TAG_NULL: u8 = 0x00;
8const TAG_BLOB: u8 = 0x01;
9const TAG_TEXT: u8 = 0x02;
10const TAG_BOOLEAN: u8 = 0x03;
11const TAG_INTEGER: u8 = 0x04;
12const TAG_REAL: u8 = 0x05;
13const TAG_TIME: u8 = 0x06;
14const TAG_DATE: u8 = 0x07;
15const TAG_TIMESTAMP: u8 = 0x08;
16const TAG_INTERVAL: u8 = 0x09;
17const TAG_JSON: u8 = 0x0A;
18const TAG_JSONB: u8 = 0x0B;
19const TAG_TSVECTOR: u8 = 0x0C;
20const TAG_TSQUERY: u8 = 0x0D;
21const TAG_ARRAY: u8 = 0x0E;
22const TAG_VECTOR: u8 = 0x0F;
23
24pub fn encode_key_value(value: &Value) -> Vec<u8> {
26 let mut buf = Vec::with_capacity(16);
27 encode_key_value_into(value, &mut buf);
28 buf
29}
30
31pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
33 let mut buf = Vec::new();
34 for v in values {
35 buf.extend_from_slice(&encode_key_value(v));
36 }
37 buf
38}
39
40pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
41 buf.clear();
42 for v in values {
43 encode_key_value_into(v, buf);
44 }
45}
46
47pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
48 buf.clear();
49 for &i in indices {
50 encode_key_value_into(&row[i as usize], buf);
51 }
52}
53
54#[inline]
55pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
56 buf.clear();
57 encode_signed_varint(TAG_INTEGER, val, buf);
58}
59
60pub(crate) fn encode_key_value_collated_into(
61 value: &Value,
62 coll: crate::types::Collation,
63 buf: &mut Vec<u8>,
64) {
65 match (value, coll) {
66 (Value::Text(s), crate::types::Collation::NoCase) => {
67 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
68 }
69 (Value::Text(s), crate::types::Collation::Rtrim) => {
70 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
71 }
72 _ => encode_key_value_into(value, buf),
73 }
74}
75
76pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
77 match value {
78 Value::Null => buf.push(TAG_NULL),
79 Value::Boolean(b) => {
80 buf.push(TAG_BOOLEAN);
81 buf.push(if *b { 0x01 } else { 0x00 });
82 }
83 Value::Integer(i) => encode_integer_into(*i, buf),
84 Value::Real(r) => encode_real_into(*r, buf),
85 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
86 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
87 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
88 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
89 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
90 Value::Interval {
91 months,
92 days,
93 micros,
94 } => {
95 buf.push(TAG_INTERVAL);
97 let mut mb = months.to_be_bytes();
98 mb[0] ^= 0x80;
99 buf.extend_from_slice(&mb);
100 let mut db = days.to_be_bytes();
101 db[0] ^= 0x80;
102 buf.extend_from_slice(&db);
103 let mut ub = micros.to_be_bytes();
104 ub[0] ^= 0x80;
105 buf.extend_from_slice(&ub);
106 }
107 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
108 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
109 Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
110 Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
111 Value::Array(a) => encode_array_into(a, buf),
112 Value::Vector(v) => encode_vector_into(v, buf),
113 }
114}
115
116fn encode_vector_into(v: &[f32], buf: &mut Vec<u8>) {
117 buf.push(TAG_VECTOR);
118 let mut inner = Vec::with_capacity(2 + v.len() * 4);
119 inner.extend_from_slice(&(v.len() as u16).to_le_bytes());
120 for &x in v {
121 inner.extend_from_slice(&x.to_le_bytes());
122 }
123 encode_bytes_into_no_tag(&inner, buf);
124}
125
126fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
127 buf.push(TAG_ARRAY);
128 let mut inner = Vec::new();
129 for v in elems {
130 encode_key_value_into(v, &mut inner);
131 }
132 encode_bytes_into_no_tag(&inner, buf);
133}
134
135fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
136 for &b in data {
137 if b == 0x00 {
138 buf.push(0x00);
139 buf.push(0xFF);
140 } else {
141 buf.push(b);
142 }
143 }
144 buf.push(0x00);
145}
146
147fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
148 encode_signed_varint(TAG_INTEGER, val, buf);
149}
150
151pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
157 buf.push(tag);
158 if val == 0 {
159 buf.push(0x80);
160 return;
161 }
162 if val > 0 {
163 let bytes = val.to_be_bytes();
164 let start = bytes.iter().position(|&b| b != 0).unwrap();
165 let byte_count = (8 - start) as u8;
166 buf.push(0x80 + byte_count);
167 buf.extend_from_slice(&bytes[start..]);
168 } else {
169 let abs_val = if val == i64::MIN {
170 u64::MAX / 2 + 1
171 } else {
172 (-val) as u64
173 };
174 let bytes = abs_val.to_be_bytes();
175 let start = bytes.iter().position(|&b| b != 0).unwrap();
176 let byte_count = (8 - start) as u8;
177 buf.push(0x80 - byte_count);
178 for &b in &bytes[start..] {
179 buf.push(!b);
180 }
181 }
182}
183
184fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
185 buf.push(TAG_REAL);
186 let bits = val.to_bits();
187 let encoded = if val.is_sign_negative() {
188 !bits
189 } else {
190 bits ^ (1u64 << 63)
191 };
192 buf.extend_from_slice(&encoded.to_be_bytes());
193}
194
195fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
196 buf.push(tag);
197 for &b in data {
198 if b == 0x00 {
199 buf.push(0x00);
200 buf.push(0xFF);
201 } else {
202 buf.push(b);
203 }
204 }
205 buf.push(0x00);
206}
207
208pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
210 if data.is_empty() {
211 return Err(SqlError::InvalidValue("empty key data".into()));
212 }
213 match data[0] {
214 TAG_NULL => Ok((Value::Null, 1)),
215 TAG_BOOLEAN => {
216 if data.len() < 2 {
217 return Err(SqlError::InvalidValue("truncated boolean".into()));
218 }
219 Ok((Value::Boolean(data[1] != 0), 2))
220 }
221 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
222 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
223 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
224 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
225 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
226 (Value::Date(d), n + 1)
227 }),
228 TAG_TIMESTAMP => {
229 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
230 }
231 TAG_INTERVAL => {
232 if data.len() < 1 + 16 {
233 return Err(SqlError::InvalidValue("truncated interval".into()));
234 }
235 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
236 mb[0] ^= 0x80;
237 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
238 db[0] ^= 0x80;
239 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
240 ub[0] ^= 0x80;
241 Ok((
242 Value::Interval {
243 months: i32::from_be_bytes(mb),
244 days: i32::from_be_bytes(db),
245 micros: i64::from_be_bytes(ub),
246 },
247 17,
248 ))
249 }
250 TAG_TEXT => {
251 let (bytes, n) = decode_null_escaped(&data[1..])?;
252 let s = String::from_utf8(bytes)
253 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
254 Ok((Value::Text(CompactString::from(s)), n + 1))
255 }
256 TAG_BLOB => {
257 let (bytes, n) = decode_null_escaped(&data[1..])?;
258 Ok((Value::Blob(bytes), n + 1))
259 }
260 TAG_JSON => {
261 let (bytes, n) = decode_null_escaped(&data[1..])?;
262 let s = String::from_utf8(bytes)
263 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
264 Ok((Value::Json(CompactString::from(s)), n + 1))
265 }
266 TAG_JSONB => {
267 let (bytes, n) = decode_null_escaped(&data[1..])?;
268 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
269 }
270 TAG_TSVECTOR => {
271 let (bytes, n) = decode_null_escaped(&data[1..])?;
272 Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
273 }
274 TAG_TSQUERY => {
275 let (bytes, n) = decode_null_escaped(&data[1..])?;
276 Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
277 }
278 TAG_ARRAY => {
279 let (inner, n) = decode_null_escaped(&data[1..])?;
280 let mut elems = Vec::new();
281 let mut pos = 0;
282 while pos < inner.len() {
283 let (v, vlen) = decode_key_value(&inner[pos..])?;
284 elems.push(v);
285 pos += vlen;
286 }
287 Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
288 }
289 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
290 }
291}
292
293pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
295 let mut values = Vec::with_capacity(count);
296 let mut pos = 0;
297 for _ in 0..count {
298 let (v, n) = decode_key_value(&data[pos..])?;
299 values.push(v);
300 pos += n;
301 }
302 Ok(values)
303}
304
305fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
306 let (v, n) = decode_signed_varint(data)?;
307 Ok((Value::Integer(v), n))
308}
309
310pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
312 if data.is_empty() {
313 return Err(SqlError::InvalidValue("truncated integer".into()));
314 }
315 let marker = data[0];
316 if marker == 0x80 {
317 return Ok((0, 1));
318 }
319 if marker > 0x80 {
320 let byte_count = (marker - 0x80) as usize;
321 if data.len() < 1 + byte_count {
322 return Err(SqlError::InvalidValue("truncated positive integer".into()));
323 }
324 let mut bytes = [0u8; 8];
325 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
326 let val = i64::from_be_bytes(bytes);
327 Ok((val, 1 + byte_count))
328 } else {
329 let byte_count = (0x80 - marker) as usize;
330 if data.len() < 1 + byte_count {
331 return Err(SqlError::InvalidValue("truncated negative integer".into()));
332 }
333 let mut bytes = [0u8; 8];
334 for i in 0..byte_count {
335 bytes[8 - byte_count + i] = !data[1 + i];
336 }
337 let abs_val = u64::from_be_bytes(bytes);
338 let val = (-(abs_val as i128)) as i64;
339 Ok((val, 1 + byte_count))
340 }
341}
342
343fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
344 if data.len() < 8 {
345 return Err(SqlError::InvalidValue("truncated real".into()));
346 }
347 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
348 let bits = if encoded & (1u64 << 63) != 0 {
349 encoded ^ (1u64 << 63)
351 } else {
352 !encoded
354 };
355 let val = f64::from_bits(bits);
356 Ok((Value::Real(val), 8))
357}
358
359fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
361 let mut result = Vec::new();
362 let mut i = 0;
363 while i < data.len() {
364 if data[i] == 0x00 {
365 if i + 1 < data.len() && data[i + 1] == 0xFF {
366 result.push(0x00);
367 i += 2;
368 } else {
369 return Ok((result, i + 1)); }
371 } else {
372 result.push(data[i]);
373 i += 1;
374 }
375 }
376 Err(SqlError::InvalidValue(
377 "unterminated null-escaped string".into(),
378 ))
379}
380
381fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
382 match v {
383 Value::Integer(val) => {
384 buf.push(DataType::Integer.type_tag());
385 buf.extend_from_slice(&val.to_le_bytes());
386 }
387 Value::Real(r) => {
388 buf.push(DataType::Real.type_tag());
389 buf.extend_from_slice(&r.to_le_bytes());
390 }
391 Value::Boolean(b) => {
392 buf.push(DataType::Boolean.type_tag());
393 buf.push(if *b { 1 } else { 0 });
394 }
395 Value::Text(s) => {
396 let bytes = s.as_bytes();
397 buf.push(DataType::Text.type_tag());
398 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
399 buf.extend_from_slice(bytes);
400 }
401 Value::Blob(data) => {
402 buf.push(DataType::Blob.type_tag());
403 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
404 buf.extend_from_slice(data);
405 }
406 Value::Time(t) => {
407 buf.push(DataType::Time.type_tag());
408 buf.extend_from_slice(&t.to_le_bytes());
409 }
410 Value::Date(d) => {
411 buf.push(DataType::Date.type_tag());
412 buf.extend_from_slice(&d.to_le_bytes());
413 }
414 Value::Timestamp(t) => {
415 buf.push(DataType::Timestamp.type_tag());
416 buf.extend_from_slice(&t.to_le_bytes());
417 }
418 Value::Interval {
419 months,
420 days,
421 micros,
422 } => {
423 buf.push(DataType::Interval.type_tag());
424 buf.extend_from_slice(&months.to_le_bytes());
425 buf.extend_from_slice(&days.to_le_bytes());
426 buf.extend_from_slice(µs.to_le_bytes());
427 }
428 Value::Json(s) => {
429 let bytes = s.as_bytes();
430 buf.push(DataType::Json.type_tag());
431 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
432 buf.extend_from_slice(bytes);
433 }
434 Value::Jsonb(b) => {
435 buf.push(DataType::Jsonb.type_tag());
436 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
437 buf.extend_from_slice(b);
438 }
439 Value::TsVector(b) => {
440 buf.push(DataType::TsVector.type_tag());
441 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
442 buf.extend_from_slice(b);
443 }
444 Value::TsQuery(b) => {
445 buf.push(DataType::TsQuery.type_tag());
446 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
447 buf.extend_from_slice(b);
448 }
449 Value::Array(a) => {
450 buf.push(DataType::Array.type_tag());
451 let len = encoded_array_v2_size(a);
452 buf.extend_from_slice(&(len as u32).to_le_bytes());
453 let start = buf.len();
454 buf.resize(start + len, 0);
455 write_array_v2_into_slice(a, &mut buf[start..start + len]);
456 }
457 Value::Vector(v) => {
458 buf.push(
459 DataType::Vector {
460 dim: v.len() as u16,
461 }
462 .type_tag(),
463 );
464 let len = 2 + v.len() * 4;
465 buf.extend_from_slice(&(len as u32).to_le_bytes());
466 buf.extend_from_slice(&(v.len() as u16).to_le_bytes());
467 for &x in v.iter() {
468 buf.extend_from_slice(&x.to_le_bytes());
469 }
470 }
471 Value::Null => unreachable!(),
472 }
473}
474
475pub fn encode_row(values: &[Value]) -> Vec<u8> {
476 let mut buf = Vec::new();
477 encode_row_into(values, &mut buf);
478 buf
479}
480
481pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
482 buf.clear();
483 let col_count = values.len();
484 let bitmap_bytes = col_count.div_ceil(8);
485
486 let header = (col_count as u16) | V2_FLAG;
487 buf.extend_from_slice(&header.to_le_bytes());
488
489 let bitmap_start = buf.len();
490 buf.resize(buf.len() + bitmap_bytes, 0);
491
492 for (i, v) in values.iter().enumerate() {
493 if v.is_null() {
494 buf[bitmap_start + i / 8] |= 1 << (i % 8);
495 continue;
496 }
497 encode_cell_v2(v, buf);
498 }
499}
500
501pub struct IntRowTemplate {
502 pub template: Vec<u8>,
503 pub slot_offsets: Vec<(usize, usize)>,
504}
505
506pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
507 let bitmap_bytes = phys_count.div_ceil(8);
508 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
509 let header = (phys_count as u16) | V2_FLAG;
510 template.extend_from_slice(&header.to_le_bytes());
511 let bitmap_start = template.len();
512 template.resize(bitmap_start + bitmap_bytes, 0);
513 for &i in null_slots {
514 template[bitmap_start + i / 8] |= 1 << (i % 8);
515 }
516 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
517 for slot in 0..phys_count {
518 if null_slots.contains(&slot) {
519 continue;
520 }
521 template.push(DataType::Integer.type_tag());
522 let value_offset = template.len();
523 template.extend_from_slice(&[0u8; 8]);
524 slot_offsets.push((slot, value_offset));
525 }
526 IntRowTemplate {
527 template,
528 slot_offsets,
529 }
530}
531
532#[inline]
534pub fn encode_int_row_with_template(
535 tmpl: &IntRowTemplate,
536 values: &[Value],
537 buf: &mut Vec<u8>,
538) -> Result<()> {
539 buf.clear();
540 buf.extend_from_slice(&tmpl.template);
541 for &(slot, off) in &tmpl.slot_offsets {
542 match &values[slot] {
543 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
544 other => {
545 return Err(SqlError::TypeMismatch {
546 expected: "Integer".into(),
547 got: other.data_type().to_string(),
548 });
549 }
550 }
551 }
552 Ok(())
553}
554
555fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
556 match DataType::from_tag(type_tag) {
557 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
558 data[..8].try_into().unwrap(),
559 ))),
560 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
561 data[..8].try_into().unwrap(),
562 ))),
563 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
564 Some(DataType::Text) => {
565 let s = std::str::from_utf8(data)
566 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
567 Ok(Value::Text(CompactString::from(s)))
568 }
569 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
570 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
571 data[..8].try_into().unwrap(),
572 ))),
573 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
574 data[..4].try_into().unwrap(),
575 ))),
576 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
577 data[..8].try_into().unwrap(),
578 ))),
579 Some(DataType::Interval) => {
580 if data.len() < 16 {
581 return Err(SqlError::InvalidValue("truncated interval".into()));
582 }
583 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
584 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
585 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
586 Ok(Value::Interval {
587 months,
588 days,
589 micros,
590 })
591 }
592 Some(DataType::Json) => {
593 let s = std::str::from_utf8(data)
594 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
595 Ok(Value::Json(CompactString::from(s)))
596 }
597 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
598 Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
599 Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
600 Some(DataType::Array) => decode_array_v2(data),
601 Some(DataType::Vector { .. }) => decode_vector(data),
602 _ => Err(SqlError::InvalidValue(format!(
603 "unknown column type tag: {type_tag}"
604 ))),
605 }
606}
607
608fn decode_vector(data: &[u8]) -> Result<Value> {
609 if data.len() < 2 {
610 return Err(SqlError::InvalidValue("truncated vector".into()));
611 }
612 let dim = u16::from_le_bytes([data[0], data[1]]) as usize;
613 if data.len() < 2 + dim * 4 {
614 return Err(SqlError::InvalidValue("truncated vector payload".into()));
615 }
616 let mut v = Vec::with_capacity(dim);
617 for i in 0..dim {
618 let off = 2 + i * 4;
619 v.push(f32::from_le_bytes(data[off..off + 4].try_into().unwrap()));
620 }
621 Ok(Value::Vector(std::sync::Arc::from(v.into_boxed_slice())))
622}
623
624fn encoded_array_v2_size(elems: &[Value]) -> usize {
625 let mut total = 4;
626 for elem in elems {
627 if elem.is_null() {
628 total += 1;
629 continue;
630 }
631 total += 1 + 1;
632 let tag = elem.data_type().type_tag();
633 match fixed_width_size(tag) {
634 Some(n) => total += n,
635 None => total += 4 + variable_cell_payload_size(elem),
636 }
637 }
638 total
639}
640
641fn variable_cell_payload_size(v: &Value) -> usize {
642 match v {
643 Value::Text(s) => s.len(),
644 Value::Blob(b) => b.len(),
645 Value::Json(s) => s.len(),
646 Value::Jsonb(b) => b.len(),
647 Value::TsVector(b) => b.len(),
648 Value::TsQuery(b) => b.len(),
649 Value::Array(a) => encoded_array_v2_size(a),
650 Value::Vector(v) => 2 + v.len() * 4,
651 _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
652 }
653}
654
655fn value_encoded_size_v2(v: &Value) -> Option<usize> {
656 if v.is_null() {
657 return None;
658 }
659 Some(match fixed_width_size(v.data_type().type_tag()) {
660 Some(n) => n,
661 None => variable_cell_payload_size(v),
662 })
663}
664
665fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
666 match v {
667 Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
668 Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
669 Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
670 Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
671 Value::Blob(b) => out[..b.len()].copy_from_slice(b),
672 Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
673 Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
674 Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
675 Value::Interval {
676 months,
677 days,
678 micros,
679 } => {
680 out[..4].copy_from_slice(&months.to_le_bytes());
681 out[4..8].copy_from_slice(&days.to_le_bytes());
682 out[8..16].copy_from_slice(µs.to_le_bytes());
683 }
684 Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
685 Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
686 Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
687 Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
688 Value::Array(a) => write_array_v2_into_slice(a, out),
689 Value::Vector(v) => {
690 out[..2].copy_from_slice(&(v.len() as u16).to_le_bytes());
691 let mut pos = 2;
692 for &x in v.iter() {
693 out[pos..pos + 4].copy_from_slice(&x.to_le_bytes());
694 pos += 4;
695 }
696 }
697 Value::Null => unreachable!(),
698 }
699}
700
701fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
702 out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
703 let mut pos = 4;
704 for elem in elems {
705 if elem.is_null() {
706 out[pos] = 0xFF;
707 pos += 1;
708 continue;
709 }
710 out[pos] = 0x00;
711 pos += 1;
712 let tag = elem.data_type().type_tag();
713 out[pos] = tag;
714 pos += 1;
715 match fixed_width_size(tag) {
716 Some(n) => {
717 write_value_payload_v2(elem, &mut out[pos..pos + n]);
718 pos += n;
719 }
720 None => {
721 let payload_len = variable_cell_payload_size(elem);
722 out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
723 pos += 4;
724 write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
725 pos += payload_len;
726 }
727 }
728 }
729}
730
731fn decode_array_v2(data: &[u8]) -> Result<Value> {
732 if data.len() < 4 {
733 return Err(SqlError::InvalidValue("truncated array length".into()));
734 }
735 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
736 let mut pos = 4;
737 let mut elems = Vec::with_capacity(count);
738 for _ in 0..count {
739 if pos >= data.len() {
740 return Err(SqlError::InvalidValue("truncated array elements".into()));
741 }
742 if data[pos] == 0xFF {
743 elems.push(Value::Null);
744 pos += 1;
745 continue;
746 }
747 if data[pos] != 0x00 {
748 return Err(SqlError::InvalidValue(
749 "invalid array element marker".into(),
750 ));
751 }
752 pos += 1;
753 if pos >= data.len() {
754 return Err(SqlError::InvalidValue("truncated array element".into()));
755 }
756 let type_tag = data[pos];
757 pos += 1;
758 let (val, advance) = match fixed_width_size(type_tag) {
759 Some(n) => {
760 if pos + n > data.len() {
761 return Err(SqlError::InvalidValue(
762 "truncated fixed-width array element".into(),
763 ));
764 }
765 let v = decode_value(type_tag, &data[pos..pos + n])?;
766 (v, n)
767 }
768 None => {
769 if pos + 4 > data.len() {
770 return Err(SqlError::InvalidValue(
771 "truncated array element length".into(),
772 ));
773 }
774 let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
775 pos += 4;
776 if pos + len > data.len() {
777 return Err(SqlError::InvalidValue(
778 "truncated variable-width array element".into(),
779 ));
780 }
781 let v = decode_value(type_tag, &data[pos..pos + len])?;
782 (v, len)
783 }
784 };
785 pos += advance;
786 elems.push(val);
787 }
788 Ok(Value::Array(std::sync::Arc::new(elems)))
789}
790
791#[derive(Clone, Copy, PartialEq, Eq, Debug)]
794pub(crate) enum RowVersion {
795 V1,
796 V2,
797}
798
799pub(crate) const V2_FLAG: u16 = 0x8000;
800pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
801
802#[inline]
803pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
804 match DataType::from_tag(type_tag)? {
805 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
806 DataType::Date => Some(4),
807 DataType::Boolean => Some(1),
808 DataType::Interval => Some(16),
809 DataType::Text
810 | DataType::Blob
811 | DataType::Json
812 | DataType::Jsonb
813 | DataType::TsVector
814 | DataType::TsQuery
815 | DataType::Array
816 | DataType::Vector { .. }
817 | DataType::Null => None,
818 }
819}
820
821#[inline]
822fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
823 if pos >= data.len() {
824 return Err(SqlError::InvalidValue("truncated column data".into()));
825 }
826 let type_tag = data[pos];
827 let after_tag = pos + 1;
828 let (data_len, body_pos) = match version {
829 RowVersion::V2 => match fixed_width_size(type_tag) {
830 Some(n) => (n, after_tag),
831 None => {
832 if after_tag + 4 > data.len() {
833 return Err(SqlError::InvalidValue("truncated column data".into()));
834 }
835 let len = u32::from_le_bytes([
836 data[after_tag],
837 data[after_tag + 1],
838 data[after_tag + 2],
839 data[after_tag + 3],
840 ]) as usize;
841 (len, after_tag + 4)
842 }
843 },
844 RowVersion::V1 => {
845 if after_tag + 4 > data.len() {
846 return Err(SqlError::InvalidValue("truncated column data".into()));
847 }
848 let len = u32::from_le_bytes([
849 data[after_tag],
850 data[after_tag + 1],
851 data[after_tag + 2],
852 data[after_tag + 3],
853 ]) as usize;
854 (len, after_tag + 4)
855 }
856 };
857 if body_pos + data_len > data.len() {
858 return Err(SqlError::InvalidValue("truncated column value".into()));
859 }
860 Ok((
861 type_tag,
862 &data[body_pos..body_pos + data_len],
863 body_pos + data_len,
864 ))
865}
866
867#[inline]
868fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
869 let (_, _, next) = read_cell(data, pos, version)?;
870 Ok(next)
871}
872
873fn copy_cell_to_v2(
874 data: &[u8],
875 pos: usize,
876 version: RowVersion,
877 out: &mut Vec<u8>,
878) -> Result<usize> {
879 let (tag, body, next) = read_cell(data, pos, version)?;
880 out.push(tag);
881 if fixed_width_size(tag).is_none() {
882 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
883 }
884 out.extend_from_slice(body);
885 Ok(next)
886}
887
888fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
889 if data.len() < 2 {
890 return Err(SqlError::InvalidValue("row data too short".into()));
891 }
892 let raw = u16::from_le_bytes([data[0], data[1]]);
893 let version = if raw & V2_FLAG != 0 {
894 RowVersion::V2
895 } else {
896 RowVersion::V1
897 };
898 let col_count = (raw & COL_COUNT_MASK) as usize;
899 let bitmap_bytes = col_count.div_ceil(8);
900 let pos = 2;
901 if data.len() < pos + bitmap_bytes {
902 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
903 }
904 Ok((
905 version,
906 col_count,
907 &data[pos..pos + bitmap_bytes],
908 pos + bitmap_bytes,
909 ))
910}
911
912pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
913 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
914
915 let mut values = Vec::with_capacity(col_count);
916 for i in 0..col_count {
917 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
918 values.push(Value::Null);
919 continue;
920 }
921 let (type_tag, body, next) = read_cell(data, pos, version)?;
922 values.push(decode_value(type_tag, body)?);
923 pos = next;
924 }
925
926 Ok(values)
927}
928
929#[inline]
931pub fn row_non_pk_count(data: &[u8]) -> usize {
932 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
933}
934
935pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
936 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
937
938 for i in 0..col_count {
939 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
940 continue;
941 }
942 let (type_tag, body, next) = read_cell(data, pos, version)?;
943 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
944 out[col_mapping[i]] = decode_value(type_tag, body)?;
945 }
946 pos = next;
947 }
948
949 Ok(())
950}
951
952pub fn decode_pk_into(
953 key: &[u8],
954 count: usize,
955 out: &mut [Value],
956 pk_mapping: &[usize],
957) -> Result<()> {
958 let mut pos = 0;
959 for i in 0..count {
960 let (v, n) = decode_key_value(&key[pos..])?;
961 if i < pk_mapping.len() {
962 out[pk_mapping[i]] = v;
963 }
964 pos += n;
965 }
966 Ok(())
967}
968
969pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
970 if targets.is_empty() {
971 return Ok(Vec::new());
972 }
973 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
974
975 let mut results = Vec::with_capacity(targets.len());
976 let mut ti = 0;
977
978 for col in 0..col_count {
979 if ti >= targets.len() {
980 break;
981 }
982 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
983
984 if col == targets[ti] {
985 if is_null {
986 results.push(Value::Null);
987 } else {
988 let (type_tag, body, next) = read_cell(data, pos, version)?;
989 results.push(decode_value(type_tag, body)?);
990 pos = next;
991 }
992 ti += 1;
993 } else if !is_null {
994 pos = skip_cell(data, pos, version)?;
995 }
996 }
997
998 while ti < targets.len() {
999 results.push(Value::Null);
1000 ti += 1;
1001 }
1002
1003 Ok(results)
1004}
1005
1006pub fn decode_columns_into(
1007 data: &[u8],
1008 targets: &[usize],
1009 schema_cols: &[usize],
1010 row: &mut [Value],
1011) -> Result<()> {
1012 if targets.is_empty() {
1013 return Ok(());
1014 }
1015 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1016
1017 let mut ti = 0;
1018 for col in 0..col_count {
1019 if ti >= targets.len() {
1020 break;
1021 }
1022 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1023
1024 if col == targets[ti] {
1025 if is_null {
1026 row[schema_cols[ti]] = Value::Null;
1027 } else {
1028 let (type_tag, body, next) = read_cell(data, pos, version)?;
1029 row[schema_cols[ti]] = decode_value(type_tag, body)?;
1030 pos = next;
1031 }
1032 ti += 1;
1033 } else if !is_null {
1034 pos = skip_cell(data, pos, version)?;
1035 }
1036 }
1037
1038 Ok(())
1039}
1040
1041#[derive(Debug, Clone, Copy)]
1042pub enum RawColumn<'a> {
1043 Null,
1044 Integer(i64),
1045 Real(f64),
1046 Boolean(bool),
1047 Text(&'a str),
1048 Blob(&'a [u8]),
1049 Time(i64),
1050 Date(i32),
1051 Timestamp(i64),
1052 Interval { months: i32, days: i32, micros: i64 },
1053 Json(&'a str),
1054 Jsonb(&'a [u8]),
1055 TsVector(&'a [u8]),
1056 TsQuery(&'a [u8]),
1057 Array(&'a [u8]),
1058 Vector(&'a [u8]),
1059}
1060
1061impl<'a> RawColumn<'a> {
1062 pub fn to_value(self) -> Value {
1063 match self {
1064 RawColumn::Null => Value::Null,
1065 RawColumn::Integer(i) => Value::Integer(i),
1066 RawColumn::Real(r) => Value::Real(r),
1067 RawColumn::Boolean(b) => Value::Boolean(b),
1068 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1069 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1070 RawColumn::Time(t) => Value::Time(t),
1071 RawColumn::Date(d) => Value::Date(d),
1072 RawColumn::Timestamp(t) => Value::Timestamp(t),
1073 RawColumn::Interval {
1074 months,
1075 days,
1076 micros,
1077 } => Value::Interval {
1078 months,
1079 days,
1080 micros,
1081 },
1082 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1083 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1084 RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1085 RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1086 RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1087 RawColumn::Vector(bytes) => decode_vector(bytes).unwrap_or(Value::Null),
1088 }
1089 }
1090
1091 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1092 use std::cmp::Ordering;
1093 match (self, other) {
1094 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1095 (RawColumn::Null, _) | (_, Value::Null) => None,
1096 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1097 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1098 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1099 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1100 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1101 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1102 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1103 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1104 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1105 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1106 (
1107 RawColumn::Interval {
1108 months: am,
1109 days: ad,
1110 micros: au,
1111 },
1112 Value::Interval {
1113 months: bm,
1114 days: bd,
1115 micros: bu,
1116 },
1117 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1118 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1119 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1120 (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1121 (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1122 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1123 Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1124 _ => None,
1125 },
1126 _ => None,
1127 }
1128 }
1129
1130 pub fn eq_value(&self, other: &Value) -> bool {
1131 match (self, other) {
1132 (RawColumn::Null, Value::Null) => true,
1133 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1134 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1135 (RawColumn::Real(a), Value::Real(b)) => a == b,
1136 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1137 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1138 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1139 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1140 (RawColumn::Time(a), Value::Time(b)) => a == b,
1141 (RawColumn::Date(a), Value::Date(b)) => a == b,
1142 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1143 (
1144 RawColumn::Interval {
1145 months: am,
1146 days: ad,
1147 micros: au,
1148 },
1149 Value::Interval {
1150 months: bm,
1151 days: bd,
1152 micros: bu,
1153 },
1154 ) => am == bm && ad == bd && au == bu,
1155 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1156 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1157 (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1158 (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1159 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1160 Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1161 _ => false,
1162 },
1163 _ => false,
1164 }
1165 }
1166
1167 pub fn as_f64(&self) -> Option<f64> {
1168 match self {
1169 RawColumn::Integer(i) => Some(*i as f64),
1170 RawColumn::Real(r) => Some(*r),
1171 _ => None,
1172 }
1173 }
1174
1175 pub fn as_i64(&self) -> Option<i64> {
1176 match self {
1177 RawColumn::Integer(i) => Some(*i),
1178 RawColumn::Time(t) => Some(*t),
1179 RawColumn::Date(d) => Some(*d as i64),
1180 RawColumn::Timestamp(t) => Some(*t),
1181 _ => None,
1182 }
1183 }
1184}
1185
1186fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1187 match DataType::from_tag(type_tag) {
1188 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1189 data[..8].try_into().unwrap(),
1190 ))),
1191 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1192 data[..8].try_into().unwrap(),
1193 ))),
1194 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1195 Some(DataType::Text) => {
1196 let s = std::str::from_utf8(data)
1197 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1198 Ok(RawColumn::Text(s))
1199 }
1200 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1201 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1202 data[..8].try_into().unwrap(),
1203 ))),
1204 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1205 data[..4].try_into().unwrap(),
1206 ))),
1207 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1208 data[..8].try_into().unwrap(),
1209 ))),
1210 Some(DataType::Interval) => {
1211 if data.len() < 16 {
1212 return Err(SqlError::InvalidValue("truncated interval".into()));
1213 }
1214 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1215 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1216 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1217 Ok(RawColumn::Interval {
1218 months,
1219 days,
1220 micros,
1221 })
1222 }
1223 Some(DataType::Json) => {
1224 let s = std::str::from_utf8(data)
1225 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1226 Ok(RawColumn::Json(s))
1227 }
1228 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1229 Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1230 Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1231 Some(DataType::Array) => Ok(RawColumn::Array(data)),
1232 Some(DataType::Vector { .. }) => Ok(RawColumn::Vector(data)),
1233 _ => Err(SqlError::InvalidValue(format!(
1234 "unknown column type tag: {type_tag}"
1235 ))),
1236 }
1237}
1238
1239pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1241 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1242 if target >= col_count || new_val.is_null() {
1243 return Ok(false);
1244 }
1245 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1246 if was_null {
1247 return Ok(false);
1248 }
1249 for col in 0..target {
1250 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1251 if !is_null {
1252 pos = skip_cell(data, pos, version)?;
1253 }
1254 }
1255 let type_tag = data[pos];
1256 let (old_data_len, val_start) = match version {
1257 RowVersion::V2 => match fixed_width_size(type_tag) {
1258 Some(n) => (n, pos + 1),
1259 None => {
1260 if pos + 5 > data.len() {
1261 return Err(SqlError::InvalidValue("truncated column data".into()));
1262 }
1263 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1264 (len, pos + 5)
1265 }
1266 },
1267 RowVersion::V1 => {
1268 if pos + 5 > data.len() {
1269 return Err(SqlError::InvalidValue("truncated column data".into()));
1270 }
1271 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1272 (len, pos + 5)
1273 }
1274 };
1275 let new_data_len = match value_encoded_size_v2(new_val) {
1276 Some(n) => n,
1277 None => return Ok(false),
1278 };
1279 if new_data_len != old_data_len {
1280 return Ok(false);
1281 }
1282 data[pos] = new_val.data_type().type_tag();
1283 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1284 Ok(true)
1285}
1286
1287pub fn patch_row_column(
1289 data: &[u8],
1290 target: usize,
1291 new_val: &Value,
1292 out: &mut Vec<u8>,
1293) -> Result<()> {
1294 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1295
1296 let new_col_count = if target >= col_count {
1297 target + 1
1298 } else {
1299 col_count
1300 };
1301 let new_bitmap_bytes = new_col_count.div_ceil(8);
1302 let bitmap_bytes = col_count.div_ceil(8);
1303 out.clear();
1304
1305 let header = (new_col_count as u16) | V2_FLAG;
1306 out.extend_from_slice(&header.to_le_bytes());
1307 let bitmap_start = out.len();
1308 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1309 for _ in bitmap_bytes..new_bitmap_bytes {
1310 out.push(0xFF);
1311 }
1312 if new_val.is_null() {
1313 out[bitmap_start + target / 8] |= 1 << (target % 8);
1314 } else {
1315 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1316 }
1317
1318 let mut pos = header_end;
1319 for col in 0..new_col_count {
1320 let was_null = if col < col_count {
1321 bitmap[col / 8] & (1 << (col % 8)) != 0
1322 } else {
1323 true
1324 };
1325
1326 if col == target {
1327 if !was_null {
1328 pos = skip_cell(data, pos, version)?;
1329 }
1330 if !new_val.is_null() {
1331 encode_cell_v2(new_val, out);
1332 }
1333 } else if !was_null {
1334 pos = copy_cell_to_v2(data, pos, version, out)?;
1335 }
1336 }
1337 Ok(())
1338}
1339
1340pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1341 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1342 if target >= col_count {
1343 return Ok(RawColumn::Null);
1344 }
1345
1346 for col in 0..=target {
1347 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1348
1349 if col == target {
1350 if is_null {
1351 return Ok(RawColumn::Null);
1352 }
1353 let (type_tag, body, _) = read_cell(data, pos, version)?;
1354 return decode_value_raw(type_tag, body);
1355 } else if !is_null {
1356 pos = skip_cell(data, pos, version)?;
1357 }
1358 }
1359
1360 unreachable!()
1361}
1362
1363pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1365 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1366 if target >= col_count {
1367 return Ok((RawColumn::Null, usize::MAX));
1368 }
1369
1370 for col in 0..=target {
1371 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1372
1373 if col == target {
1374 if is_null {
1375 return Ok((RawColumn::Null, usize::MAX));
1376 }
1377 let tag_offset = pos;
1378 let (type_tag, body, _) = read_cell(data, pos, version)?;
1379 let raw = decode_value_raw(type_tag, body)?;
1380 return Ok((raw, tag_offset));
1381 } else if !is_null {
1382 pos = skip_cell(data, pos, version)?;
1383 }
1384 }
1385
1386 unreachable!()
1387}
1388
1389pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1391 if offset == usize::MAX || new_val.is_null() {
1392 return Ok(false);
1393 }
1394 if data.len() < 2 || offset >= data.len() {
1395 return Err(SqlError::InvalidValue("truncated column data".into()));
1396 }
1397 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1398 RowVersion::V2
1399 } else {
1400 RowVersion::V1
1401 };
1402 let type_tag = data[offset];
1403 let (old_data_len, val_start) = match version {
1404 RowVersion::V2 => match fixed_width_size(type_tag) {
1405 Some(n) => (n, offset + 1),
1406 None => {
1407 if offset + 5 > data.len() {
1408 return Err(SqlError::InvalidValue("truncated column data".into()));
1409 }
1410 let len =
1411 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1412 (len, offset + 5)
1413 }
1414 },
1415 RowVersion::V1 => {
1416 if offset + 5 > data.len() {
1417 return Err(SqlError::InvalidValue("truncated column data".into()));
1418 }
1419 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1420 (len, offset + 5)
1421 }
1422 };
1423 let new_data_len = match value_encoded_size_v2(new_val) {
1424 Some(n) => n,
1425 None => return Ok(false),
1426 };
1427 if new_data_len != old_data_len {
1428 return Ok(false);
1429 }
1430 data[offset] = new_val.data_type().type_tag();
1431 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1432 Ok(true)
1433}
1434
1435pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1436 if key.is_empty() || key[0] != TAG_INTEGER {
1437 return Err(SqlError::InvalidValue("not an integer key".into()));
1438 }
1439 let (val, _) = decode_integer(&key[1..])?;
1440 match val {
1441 Value::Integer(i) => Ok(i),
1442 _ => unreachable!(),
1443 }
1444}
1445
1446#[cfg(test)]
1447#[path = "encoding_tests.rs"]
1448mod tests;