1use crate::error::{Result, SqlError};
4use crate::types::{CompactString, DataType, Value};
5
// One-byte type tags. `encode_key_value_into` writes one of these as the
// first byte of every encoded key value and `decode_key_value` dispatches on
// it, so changing any value changes the encoded key format.
// NOTE(review): if encoded keys are compared byte-wise, these values also fix
// the relative order of values of different types — confirm before renumbering.
const TAG_NULL: u8 = 0x00;
const TAG_BLOB: u8 = 0x01;
const TAG_TEXT: u8 = 0x02;
const TAG_BOOLEAN: u8 = 0x03;
const TAG_INTEGER: u8 = 0x04;
const TAG_REAL: u8 = 0x05;
const TAG_TIME: u8 = 0x06;
const TAG_DATE: u8 = 0x07;
const TAG_TIMESTAMP: u8 = 0x08;
const TAG_INTERVAL: u8 = 0x09;
const TAG_JSON: u8 = 0x0A;
const TAG_JSONB: u8 = 0x0B;
const TAG_TSVECTOR: u8 = 0x0C;
const TAG_TSQUERY: u8 = 0x0D;
const TAG_ARRAY: u8 = 0x0E;
22
23pub fn encode_key_value(value: &Value) -> Vec<u8> {
25 let mut buf = Vec::with_capacity(16);
26 encode_key_value_into(value, &mut buf);
27 buf
28}
29
30pub fn encode_composite_key(values: &[Value]) -> Vec<u8> {
32 let mut buf = Vec::new();
33 for v in values {
34 buf.extend_from_slice(&encode_key_value(v));
35 }
36 buf
37}
38
39pub fn encode_composite_key_into(values: &[Value], buf: &mut Vec<u8>) {
40 buf.clear();
41 for v in values {
42 encode_key_value_into(v, buf);
43 }
44}
45
46pub fn encode_composite_key_from_indices(indices: &[u16], row: &[Value], buf: &mut Vec<u8>) {
47 buf.clear();
48 for &i in indices {
49 encode_key_value_into(&row[i as usize], buf);
50 }
51}
52
53#[inline]
54pub fn encode_int_key_into(val: i64, buf: &mut Vec<u8>) {
55 buf.clear();
56 encode_signed_varint(TAG_INTEGER, val, buf);
57}
58
59pub(crate) fn encode_key_value_collated_into(
60 value: &Value,
61 coll: crate::types::Collation,
62 buf: &mut Vec<u8>,
63) {
64 match (value, coll) {
65 (Value::Text(s), crate::types::Collation::NoCase) => {
66 encode_bytes_into(TAG_TEXT, s.to_ascii_lowercase().as_bytes(), buf);
67 }
68 (Value::Text(s), crate::types::Collation::Rtrim) => {
69 encode_bytes_into(TAG_TEXT, s.trim_end_matches(' ').as_bytes(), buf);
70 }
71 _ => encode_key_value_into(value, buf),
72 }
73}
74
75pub(crate) fn encode_key_value_into(value: &Value, buf: &mut Vec<u8>) {
76 match value {
77 Value::Null => buf.push(TAG_NULL),
78 Value::Boolean(b) => {
79 buf.push(TAG_BOOLEAN);
80 buf.push(if *b { 0x01 } else { 0x00 });
81 }
82 Value::Integer(i) => encode_integer_into(*i, buf),
83 Value::Real(r) => encode_real_into(*r, buf),
84 Value::Text(s) => encode_bytes_into(TAG_TEXT, s.as_bytes(), buf),
85 Value::Blob(b) => encode_bytes_into(TAG_BLOB, b, buf),
86 Value::Time(t) => encode_signed_varint(TAG_TIME, *t, buf),
87 Value::Date(d) => encode_signed_varint(TAG_DATE, i64::from(*d), buf),
88 Value::Timestamp(t) => encode_signed_varint(TAG_TIMESTAMP, *t, buf),
89 Value::Interval {
90 months,
91 days,
92 micros,
93 } => {
94 buf.push(TAG_INTERVAL);
96 let mut mb = months.to_be_bytes();
97 mb[0] ^= 0x80;
98 buf.extend_from_slice(&mb);
99 let mut db = days.to_be_bytes();
100 db[0] ^= 0x80;
101 buf.extend_from_slice(&db);
102 let mut ub = micros.to_be_bytes();
103 ub[0] ^= 0x80;
104 buf.extend_from_slice(&ub);
105 }
106 Value::Json(s) => encode_bytes_into(TAG_JSON, s.as_bytes(), buf),
107 Value::Jsonb(b) => encode_bytes_into(TAG_JSONB, b, buf),
108 Value::TsVector(b) => encode_bytes_into(TAG_TSVECTOR, b, buf),
109 Value::TsQuery(b) => encode_bytes_into(TAG_TSQUERY, b, buf),
110 Value::Array(a) => encode_array_into(a, buf),
111 }
112}
113
114fn encode_array_into(elems: &[Value], buf: &mut Vec<u8>) {
115 buf.push(TAG_ARRAY);
116 let mut inner = Vec::new();
117 for v in elems {
118 encode_key_value_into(v, &mut inner);
119 }
120 encode_bytes_into_no_tag(&inner, buf);
121}
122
/// Appends `data` to `buf` in NUL-escaped form, followed by a `0x00`
/// terminator.
///
/// Every `0x00` byte in `data` is written as the pair `0x00 0xFF`; all other
/// bytes are copied unchanged. No tag byte is written.
fn encode_bytes_into_no_tag(data: &[u8], buf: &mut Vec<u8>) {
    for &byte in data {
        buf.push(byte);
        if byte == 0x00 {
            // Escape marker distinguishing an embedded NUL from the terminator.
            buf.push(0xFF);
        }
    }
    buf.push(0x00);
}
134
135fn encode_integer_into(val: i64, buf: &mut Vec<u8>) {
136 encode_signed_varint(TAG_INTEGER, val, buf);
137}
138
/// Appends `tag` and a variable-length encoding of `val` to `buf`.
///
/// Layout after the tag byte:
///
/// * zero — the single marker byte `0x80`;
/// * positive — marker `0x80 + n`, then the `n` significant big-endian bytes
///   of the value;
/// * negative — marker `0x80 - n`, then the `n` significant big-endian bytes
///   of the magnitude, each bitwise-inverted.
///
/// `n` is the minimal number of bytes needed for the magnitude (1 to 8).
pub(crate) fn encode_signed_varint(tag: u8, val: i64, buf: &mut Vec<u8>) {
    buf.push(tag);

    // `unsigned_abs` is well-defined for `i64::MIN` (yields 2^63).
    let magnitude = val.unsigned_abs();
    let be = magnitude.to_be_bytes();
    // Whole leading zero bytes are dropped; zero itself keeps no bytes.
    let skipped = (magnitude.leading_zeros() / 8) as usize;
    let significant = &be[skipped..];
    let len = significant.len() as u8;

    if val >= 0 {
        buf.push(0x80 + len);
        buf.extend_from_slice(significant);
    } else {
        buf.push(0x80 - len);
        buf.extend(significant.iter().map(|&b| !b));
    }
}
171
172fn encode_real_into(val: f64, buf: &mut Vec<u8>) {
173 buf.push(TAG_REAL);
174 let bits = val.to_bits();
175 let encoded = if val.is_sign_negative() {
176 !bits
177 } else {
178 bits ^ (1u64 << 63)
179 };
180 buf.extend_from_slice(&encoded.to_be_bytes());
181}
182
/// Appends `tag`, then `data` in NUL-escaped form, then a `0x00` terminator.
///
/// Each `0x00` byte inside `data` is written as `0x00 0xFF`; every other
/// byte is copied unchanged.
fn encode_bytes_into(tag: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push(tag);

    // Copy NUL-free runs in bulk, inserting the escape byte after each NUL.
    let mut rest = data;
    while let Some(nul_at) = rest.iter().position(|&b| b == 0x00) {
        buf.extend_from_slice(&rest[..=nul_at]);
        buf.push(0xFF);
        rest = &rest[nul_at + 1..];
    }
    buf.extend_from_slice(rest);

    buf.push(0x00);
}
195
196pub fn decode_key_value(data: &[u8]) -> Result<(Value, usize)> {
198 if data.is_empty() {
199 return Err(SqlError::InvalidValue("empty key data".into()));
200 }
201 match data[0] {
202 TAG_NULL => Ok((Value::Null, 1)),
203 TAG_BOOLEAN => {
204 if data.len() < 2 {
205 return Err(SqlError::InvalidValue("truncated boolean".into()));
206 }
207 Ok((Value::Boolean(data[1] != 0), 2))
208 }
209 TAG_INTEGER => decode_integer(&data[1..]).map(|(v, n)| (v, n + 1)),
210 TAG_REAL => decode_real(&data[1..]).map(|(v, n)| (v, n + 1)),
211 TAG_TIME => decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Time(v), n + 1)),
212 TAG_DATE => decode_signed_varint(&data[1..]).map(|(v, n)| {
213 let d = v.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
214 (Value::Date(d), n + 1)
215 }),
216 TAG_TIMESTAMP => {
217 decode_signed_varint(&data[1..]).map(|(v, n)| (Value::Timestamp(v), n + 1))
218 }
219 TAG_INTERVAL => {
220 if data.len() < 1 + 16 {
221 return Err(SqlError::InvalidValue("truncated interval".into()));
222 }
223 let mut mb: [u8; 4] = data[1..5].try_into().unwrap();
224 mb[0] ^= 0x80;
225 let mut db: [u8; 4] = data[5..9].try_into().unwrap();
226 db[0] ^= 0x80;
227 let mut ub: [u8; 8] = data[9..17].try_into().unwrap();
228 ub[0] ^= 0x80;
229 Ok((
230 Value::Interval {
231 months: i32::from_be_bytes(mb),
232 days: i32::from_be_bytes(db),
233 micros: i64::from_be_bytes(ub),
234 },
235 17,
236 ))
237 }
238 TAG_TEXT => {
239 let (bytes, n) = decode_null_escaped(&data[1..])?;
240 let s = String::from_utf8(bytes)
241 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in key".into()))?;
242 Ok((Value::Text(CompactString::from(s)), n + 1))
243 }
244 TAG_BLOB => {
245 let (bytes, n) = decode_null_escaped(&data[1..])?;
246 Ok((Value::Blob(bytes), n + 1))
247 }
248 TAG_JSON => {
249 let (bytes, n) = decode_null_escaped(&data[1..])?;
250 let s = String::from_utf8(bytes)
251 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON key".into()))?;
252 Ok((Value::Json(CompactString::from(s)), n + 1))
253 }
254 TAG_JSONB => {
255 let (bytes, n) = decode_null_escaped(&data[1..])?;
256 Ok((Value::Jsonb(std::sync::Arc::from(bytes)), n + 1))
257 }
258 TAG_TSVECTOR => {
259 let (bytes, n) = decode_null_escaped(&data[1..])?;
260 Ok((Value::TsVector(std::sync::Arc::from(bytes)), n + 1))
261 }
262 TAG_TSQUERY => {
263 let (bytes, n) = decode_null_escaped(&data[1..])?;
264 Ok((Value::TsQuery(std::sync::Arc::from(bytes)), n + 1))
265 }
266 TAG_ARRAY => {
267 let (inner, n) = decode_null_escaped(&data[1..])?;
268 let mut elems = Vec::new();
269 let mut pos = 0;
270 while pos < inner.len() {
271 let (v, vlen) = decode_key_value(&inner[pos..])?;
272 elems.push(v);
273 pos += vlen;
274 }
275 Ok((Value::Array(std::sync::Arc::new(elems)), n + 1))
276 }
277 tag => Err(SqlError::InvalidValue(format!("unknown key tag: {tag:#x}"))),
278 }
279}
280
281pub fn decode_composite_key(data: &[u8], count: usize) -> Result<Vec<Value>> {
283 let mut values = Vec::with_capacity(count);
284 let mut pos = 0;
285 for _ in 0..count {
286 let (v, n) = decode_key_value(&data[pos..])?;
287 values.push(v);
288 pos += n;
289 }
290 Ok(values)
291}
292
293fn decode_integer(data: &[u8]) -> Result<(Value, usize)> {
294 let (v, n) = decode_signed_varint(data)?;
295 Ok((Value::Integer(v), n))
296}
297
298pub(crate) fn decode_signed_varint(data: &[u8]) -> Result<(i64, usize)> {
300 if data.is_empty() {
301 return Err(SqlError::InvalidValue("truncated integer".into()));
302 }
303 let marker = data[0];
304 if marker == 0x80 {
305 return Ok((0, 1));
306 }
307 if marker > 0x80 {
308 let byte_count = (marker - 0x80) as usize;
309 if data.len() < 1 + byte_count {
310 return Err(SqlError::InvalidValue("truncated positive integer".into()));
311 }
312 let mut bytes = [0u8; 8];
313 bytes[8 - byte_count..].copy_from_slice(&data[1..1 + byte_count]);
314 let val = i64::from_be_bytes(bytes);
315 Ok((val, 1 + byte_count))
316 } else {
317 let byte_count = (0x80 - marker) as usize;
318 if data.len() < 1 + byte_count {
319 return Err(SqlError::InvalidValue("truncated negative integer".into()));
320 }
321 let mut bytes = [0u8; 8];
322 for i in 0..byte_count {
323 bytes[8 - byte_count + i] = !data[1 + i];
324 }
325 let abs_val = u64::from_be_bytes(bytes);
326 let val = (-(abs_val as i128)) as i64;
327 Ok((val, 1 + byte_count))
328 }
329}
330
331fn decode_real(data: &[u8]) -> Result<(Value, usize)> {
332 if data.len() < 8 {
333 return Err(SqlError::InvalidValue("truncated real".into()));
334 }
335 let encoded = u64::from_be_bytes(data[..8].try_into().unwrap());
336 let bits = if encoded & (1u64 << 63) != 0 {
337 encoded ^ (1u64 << 63)
339 } else {
340 !encoded
342 };
343 let val = f64::from_bits(bits);
344 Ok((Value::Real(val), 8))
345}
346
347fn decode_null_escaped(data: &[u8]) -> Result<(Vec<u8>, usize)> {
349 let mut result = Vec::new();
350 let mut i = 0;
351 while i < data.len() {
352 if data[i] == 0x00 {
353 if i + 1 < data.len() && data[i + 1] == 0xFF {
354 result.push(0x00);
355 i += 2;
356 } else {
357 return Ok((result, i + 1)); }
359 } else {
360 result.push(data[i]);
361 i += 1;
362 }
363 }
364 Err(SqlError::InvalidValue(
365 "unterminated null-escaped string".into(),
366 ))
367}
368
369fn encode_cell_v2(v: &Value, buf: &mut Vec<u8>) {
370 match v {
371 Value::Integer(val) => {
372 buf.push(DataType::Integer.type_tag());
373 buf.extend_from_slice(&val.to_le_bytes());
374 }
375 Value::Real(r) => {
376 buf.push(DataType::Real.type_tag());
377 buf.extend_from_slice(&r.to_le_bytes());
378 }
379 Value::Boolean(b) => {
380 buf.push(DataType::Boolean.type_tag());
381 buf.push(if *b { 1 } else { 0 });
382 }
383 Value::Text(s) => {
384 let bytes = s.as_bytes();
385 buf.push(DataType::Text.type_tag());
386 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
387 buf.extend_from_slice(bytes);
388 }
389 Value::Blob(data) => {
390 buf.push(DataType::Blob.type_tag());
391 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
392 buf.extend_from_slice(data);
393 }
394 Value::Time(t) => {
395 buf.push(DataType::Time.type_tag());
396 buf.extend_from_slice(&t.to_le_bytes());
397 }
398 Value::Date(d) => {
399 buf.push(DataType::Date.type_tag());
400 buf.extend_from_slice(&d.to_le_bytes());
401 }
402 Value::Timestamp(t) => {
403 buf.push(DataType::Timestamp.type_tag());
404 buf.extend_from_slice(&t.to_le_bytes());
405 }
406 Value::Interval {
407 months,
408 days,
409 micros,
410 } => {
411 buf.push(DataType::Interval.type_tag());
412 buf.extend_from_slice(&months.to_le_bytes());
413 buf.extend_from_slice(&days.to_le_bytes());
414 buf.extend_from_slice(µs.to_le_bytes());
415 }
416 Value::Json(s) => {
417 let bytes = s.as_bytes();
418 buf.push(DataType::Json.type_tag());
419 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
420 buf.extend_from_slice(bytes);
421 }
422 Value::Jsonb(b) => {
423 buf.push(DataType::Jsonb.type_tag());
424 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
425 buf.extend_from_slice(b);
426 }
427 Value::TsVector(b) => {
428 buf.push(DataType::TsVector.type_tag());
429 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
430 buf.extend_from_slice(b);
431 }
432 Value::TsQuery(b) => {
433 buf.push(DataType::TsQuery.type_tag());
434 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
435 buf.extend_from_slice(b);
436 }
437 Value::Array(a) => {
438 buf.push(DataType::Array.type_tag());
439 let len = encoded_array_v2_size(a);
440 buf.extend_from_slice(&(len as u32).to_le_bytes());
441 let start = buf.len();
442 buf.resize(start + len, 0);
443 write_array_v2_into_slice(a, &mut buf[start..start + len]);
444 }
445 Value::Null => unreachable!(),
446 }
447}
448
449pub fn encode_row(values: &[Value]) -> Vec<u8> {
450 let mut buf = Vec::new();
451 encode_row_into(values, &mut buf);
452 buf
453}
454
455pub fn encode_row_into(values: &[Value], buf: &mut Vec<u8>) {
456 buf.clear();
457 let col_count = values.len();
458 let bitmap_bytes = col_count.div_ceil(8);
459
460 let header = (col_count as u16) | V2_FLAG;
461 buf.extend_from_slice(&header.to_le_bytes());
462
463 let bitmap_start = buf.len();
464 buf.resize(buf.len() + bitmap_bytes, 0);
465
466 for (i, v) in values.iter().enumerate() {
467 if v.is_null() {
468 buf[bitmap_start + i / 8] |= 1 << (i % 8);
469 continue;
470 }
471 encode_cell_v2(v, buf);
472 }
473}
474
/// Pre-built V2 row image for rows whose non-null columns are all integers.
///
/// Produced by `build_int_row_template` and consumed by
/// `encode_int_row_with_template`, which copies `template` and patches the
/// integer payloads in place.
pub struct IntRowTemplate {
    /// Row bytes: header, null bitmap, and for every non-null slot an
    /// integer type tag followed by eight zero placeholder bytes.
    pub template: Vec<u8>,
    /// For each non-null slot: `(slot index, byte offset of its 8-byte
    /// payload within `template`)`.
    pub slot_offsets: Vec<(usize, usize)>,
}
479
480pub fn build_int_row_template(phys_count: usize, null_slots: &[usize]) -> IntRowTemplate {
481 let bitmap_bytes = phys_count.div_ceil(8);
482 let mut template = Vec::with_capacity(2 + bitmap_bytes + phys_count * 9);
483 let header = (phys_count as u16) | V2_FLAG;
484 template.extend_from_slice(&header.to_le_bytes());
485 let bitmap_start = template.len();
486 template.resize(bitmap_start + bitmap_bytes, 0);
487 for &i in null_slots {
488 template[bitmap_start + i / 8] |= 1 << (i % 8);
489 }
490 let mut slot_offsets = Vec::with_capacity(phys_count.saturating_sub(null_slots.len()));
491 for slot in 0..phys_count {
492 if null_slots.contains(&slot) {
493 continue;
494 }
495 template.push(DataType::Integer.type_tag());
496 let value_offset = template.len();
497 template.extend_from_slice(&[0u8; 8]);
498 slot_offsets.push((slot, value_offset));
499 }
500 IntRowTemplate {
501 template,
502 slot_offsets,
503 }
504}
505
506#[inline]
508pub fn encode_int_row_with_template(
509 tmpl: &IntRowTemplate,
510 values: &[Value],
511 buf: &mut Vec<u8>,
512) -> Result<()> {
513 buf.clear();
514 buf.extend_from_slice(&tmpl.template);
515 for &(slot, off) in &tmpl.slot_offsets {
516 match &values[slot] {
517 Value::Integer(v) => buf[off..off + 8].copy_from_slice(&v.to_le_bytes()),
518 other => {
519 return Err(SqlError::TypeMismatch {
520 expected: "Integer".into(),
521 got: other.data_type().to_string(),
522 });
523 }
524 }
525 }
526 Ok(())
527}
528
529fn decode_value(type_tag: u8, data: &[u8]) -> Result<Value> {
530 match DataType::from_tag(type_tag) {
531 Some(DataType::Integer) => Ok(Value::Integer(i64::from_le_bytes(
532 data[..8].try_into().unwrap(),
533 ))),
534 Some(DataType::Real) => Ok(Value::Real(f64::from_le_bytes(
535 data[..8].try_into().unwrap(),
536 ))),
537 Some(DataType::Boolean) => Ok(Value::Boolean(data[0] != 0)),
538 Some(DataType::Text) => {
539 let s = std::str::from_utf8(data)
540 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
541 Ok(Value::Text(CompactString::from(s)))
542 }
543 Some(DataType::Blob) => Ok(Value::Blob(data.to_vec())),
544 Some(DataType::Time) => Ok(Value::Time(i64::from_le_bytes(
545 data[..8].try_into().unwrap(),
546 ))),
547 Some(DataType::Date) => Ok(Value::Date(i32::from_le_bytes(
548 data[..4].try_into().unwrap(),
549 ))),
550 Some(DataType::Timestamp) => Ok(Value::Timestamp(i64::from_le_bytes(
551 data[..8].try_into().unwrap(),
552 ))),
553 Some(DataType::Interval) => {
554 if data.len() < 16 {
555 return Err(SqlError::InvalidValue("truncated interval".into()));
556 }
557 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
558 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
559 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
560 Ok(Value::Interval {
561 months,
562 days,
563 micros,
564 })
565 }
566 Some(DataType::Json) => {
567 let s = std::str::from_utf8(data)
568 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
569 Ok(Value::Json(CompactString::from(s)))
570 }
571 Some(DataType::Jsonb) => Ok(Value::Jsonb(std::sync::Arc::from(data))),
572 Some(DataType::TsVector) => Ok(Value::TsVector(std::sync::Arc::from(data))),
573 Some(DataType::TsQuery) => Ok(Value::TsQuery(std::sync::Arc::from(data))),
574 Some(DataType::Array) => decode_array_v2(data),
575 _ => Err(SqlError::InvalidValue(format!(
576 "unknown column type tag: {type_tag}"
577 ))),
578 }
579}
580
581fn encoded_array_v2_size(elems: &[Value]) -> usize {
582 let mut total = 4;
583 for elem in elems {
584 if elem.is_null() {
585 total += 1;
586 continue;
587 }
588 total += 1 + 1;
589 let tag = elem.data_type().type_tag();
590 match fixed_width_size(tag) {
591 Some(n) => total += n,
592 None => total += 4 + variable_cell_payload_size(elem),
593 }
594 }
595 total
596}
597
598fn variable_cell_payload_size(v: &Value) -> usize {
599 match v {
600 Value::Text(s) => s.len(),
601 Value::Blob(b) => b.len(),
602 Value::Json(s) => s.len(),
603 Value::Jsonb(b) => b.len(),
604 Value::TsVector(b) => b.len(),
605 Value::TsQuery(b) => b.len(),
606 Value::Array(a) => encoded_array_v2_size(a),
607 _ => unreachable!("variable_cell_payload_size called on fixed-width value"),
608 }
609}
610
611fn value_encoded_size_v2(v: &Value) -> Option<usize> {
612 if v.is_null() {
613 return None;
614 }
615 Some(match fixed_width_size(v.data_type().type_tag()) {
616 Some(n) => n,
617 None => variable_cell_payload_size(v),
618 })
619}
620
621fn write_value_payload_v2(v: &Value, out: &mut [u8]) {
622 match v {
623 Value::Integer(i) => out[..8].copy_from_slice(&i.to_le_bytes()),
624 Value::Real(r) => out[..8].copy_from_slice(&r.to_le_bytes()),
625 Value::Boolean(b) => out[0] = if *b { 1 } else { 0 },
626 Value::Text(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
627 Value::Blob(b) => out[..b.len()].copy_from_slice(b),
628 Value::Time(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
629 Value::Date(d) => out[..4].copy_from_slice(&d.to_le_bytes()),
630 Value::Timestamp(t) => out[..8].copy_from_slice(&t.to_le_bytes()),
631 Value::Interval {
632 months,
633 days,
634 micros,
635 } => {
636 out[..4].copy_from_slice(&months.to_le_bytes());
637 out[4..8].copy_from_slice(&days.to_le_bytes());
638 out[8..16].copy_from_slice(µs.to_le_bytes());
639 }
640 Value::Json(s) => out[..s.len()].copy_from_slice(s.as_bytes()),
641 Value::Jsonb(b) => out[..b.len()].copy_from_slice(b),
642 Value::TsVector(b) => out[..b.len()].copy_from_slice(b),
643 Value::TsQuery(b) => out[..b.len()].copy_from_slice(b),
644 Value::Array(a) => write_array_v2_into_slice(a, out),
645 Value::Null => unreachable!(),
646 }
647}
648
649fn write_array_v2_into_slice(elems: &[Value], out: &mut [u8]) {
650 out[..4].copy_from_slice(&(elems.len() as u32).to_le_bytes());
651 let mut pos = 4;
652 for elem in elems {
653 if elem.is_null() {
654 out[pos] = 0xFF;
655 pos += 1;
656 continue;
657 }
658 out[pos] = 0x00;
659 pos += 1;
660 let tag = elem.data_type().type_tag();
661 out[pos] = tag;
662 pos += 1;
663 match fixed_width_size(tag) {
664 Some(n) => {
665 write_value_payload_v2(elem, &mut out[pos..pos + n]);
666 pos += n;
667 }
668 None => {
669 let payload_len = variable_cell_payload_size(elem);
670 out[pos..pos + 4].copy_from_slice(&(payload_len as u32).to_le_bytes());
671 pos += 4;
672 write_value_payload_v2(elem, &mut out[pos..pos + payload_len]);
673 pos += payload_len;
674 }
675 }
676 }
677}
678
679fn decode_array_v2(data: &[u8]) -> Result<Value> {
680 if data.len() < 4 {
681 return Err(SqlError::InvalidValue("truncated array length".into()));
682 }
683 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
684 let mut pos = 4;
685 let mut elems = Vec::with_capacity(count);
686 for _ in 0..count {
687 if pos >= data.len() {
688 return Err(SqlError::InvalidValue("truncated array elements".into()));
689 }
690 if data[pos] == 0xFF {
691 elems.push(Value::Null);
692 pos += 1;
693 continue;
694 }
695 if data[pos] != 0x00 {
696 return Err(SqlError::InvalidValue(
697 "invalid array element marker".into(),
698 ));
699 }
700 pos += 1;
701 if pos >= data.len() {
702 return Err(SqlError::InvalidValue("truncated array element".into()));
703 }
704 let type_tag = data[pos];
705 pos += 1;
706 let (val, advance) = match fixed_width_size(type_tag) {
707 Some(n) => {
708 if pos + n > data.len() {
709 return Err(SqlError::InvalidValue(
710 "truncated fixed-width array element".into(),
711 ));
712 }
713 let v = decode_value(type_tag, &data[pos..pos + n])?;
714 (v, n)
715 }
716 None => {
717 if pos + 4 > data.len() {
718 return Err(SqlError::InvalidValue(
719 "truncated array element length".into(),
720 ));
721 }
722 let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
723 pos += 4;
724 if pos + len > data.len() {
725 return Err(SqlError::InvalidValue(
726 "truncated variable-width array element".into(),
727 ));
728 }
729 let v = decode_value(type_tag, &data[pos..pos + len])?;
730 (v, len)
731 }
732 };
733 pos += advance;
734 elems.push(val);
735 }
736 Ok(Value::Array(std::sync::Arc::new(elems)))
737}
738
/// Row encoding version, taken from the `V2_FLAG` bit of the row header.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum RowVersion {
    /// Every cell carries a `u32` length prefix after its type tag.
    V1,
    /// Fixed-width cells omit the length prefix; variable-width cells keep it.
    V2,
}
746
/// High bit of the 16-bit row header; set for rows in the V2 encoding.
pub(crate) const V2_FLAG: u16 = 0x8000;
/// Mask selecting the column count (low 15 bits) from the row header.
pub(crate) const COL_COUNT_MASK: u16 = 0x7FFF;
749
750#[inline]
751pub(crate) fn fixed_width_size(type_tag: u8) -> Option<usize> {
752 match DataType::from_tag(type_tag)? {
753 DataType::Integer | DataType::Real | DataType::Time | DataType::Timestamp => Some(8),
754 DataType::Date => Some(4),
755 DataType::Boolean => Some(1),
756 DataType::Interval => Some(16),
757 DataType::Text
758 | DataType::Blob
759 | DataType::Json
760 | DataType::Jsonb
761 | DataType::TsVector
762 | DataType::TsQuery
763 | DataType::Array
764 | DataType::Null => None,
765 }
766}
767
768#[inline]
769fn read_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<(u8, &[u8], usize)> {
770 if pos >= data.len() {
771 return Err(SqlError::InvalidValue("truncated column data".into()));
772 }
773 let type_tag = data[pos];
774 let after_tag = pos + 1;
775 let (data_len, body_pos) = match version {
776 RowVersion::V2 => match fixed_width_size(type_tag) {
777 Some(n) => (n, after_tag),
778 None => {
779 if after_tag + 4 > data.len() {
780 return Err(SqlError::InvalidValue("truncated column data".into()));
781 }
782 let len = u32::from_le_bytes([
783 data[after_tag],
784 data[after_tag + 1],
785 data[after_tag + 2],
786 data[after_tag + 3],
787 ]) as usize;
788 (len, after_tag + 4)
789 }
790 },
791 RowVersion::V1 => {
792 if after_tag + 4 > data.len() {
793 return Err(SqlError::InvalidValue("truncated column data".into()));
794 }
795 let len = u32::from_le_bytes([
796 data[after_tag],
797 data[after_tag + 1],
798 data[after_tag + 2],
799 data[after_tag + 3],
800 ]) as usize;
801 (len, after_tag + 4)
802 }
803 };
804 if body_pos + data_len > data.len() {
805 return Err(SqlError::InvalidValue("truncated column value".into()));
806 }
807 Ok((
808 type_tag,
809 &data[body_pos..body_pos + data_len],
810 body_pos + data_len,
811 ))
812}
813
814#[inline]
815fn skip_cell(data: &[u8], pos: usize, version: RowVersion) -> Result<usize> {
816 let (_, _, next) = read_cell(data, pos, version)?;
817 Ok(next)
818}
819
820fn copy_cell_to_v2(
821 data: &[u8],
822 pos: usize,
823 version: RowVersion,
824 out: &mut Vec<u8>,
825) -> Result<usize> {
826 let (tag, body, next) = read_cell(data, pos, version)?;
827 out.push(tag);
828 if fixed_width_size(tag).is_none() {
829 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
830 }
831 out.extend_from_slice(body);
832 Ok(next)
833}
834
835fn parse_row_header(data: &[u8]) -> Result<(RowVersion, usize, &[u8], usize)> {
836 if data.len() < 2 {
837 return Err(SqlError::InvalidValue("row data too short".into()));
838 }
839 let raw = u16::from_le_bytes([data[0], data[1]]);
840 let version = if raw & V2_FLAG != 0 {
841 RowVersion::V2
842 } else {
843 RowVersion::V1
844 };
845 let col_count = (raw & COL_COUNT_MASK) as usize;
846 let bitmap_bytes = col_count.div_ceil(8);
847 let pos = 2;
848 if data.len() < pos + bitmap_bytes {
849 return Err(SqlError::InvalidValue("truncated null bitmap".into()));
850 }
851 Ok((
852 version,
853 col_count,
854 &data[pos..pos + bitmap_bytes],
855 pos + bitmap_bytes,
856 ))
857}
858
859pub fn decode_row(data: &[u8]) -> Result<Vec<Value>> {
860 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
861
862 let mut values = Vec::with_capacity(col_count);
863 for i in 0..col_count {
864 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
865 values.push(Value::Null);
866 continue;
867 }
868 let (type_tag, body, next) = read_cell(data, pos, version)?;
869 values.push(decode_value(type_tag, body)?);
870 pos = next;
871 }
872
873 Ok(values)
874}
875
876#[inline]
878pub fn row_non_pk_count(data: &[u8]) -> usize {
879 (u16::from_le_bytes([data[0], data[1]]) & COL_COUNT_MASK) as usize
880}
881
882pub fn decode_row_into(data: &[u8], out: &mut [Value], col_mapping: &[usize]) -> Result<()> {
883 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
884
885 for i in 0..col_count {
886 if bitmap[i / 8] & (1 << (i % 8)) != 0 {
887 continue;
888 }
889 let (type_tag, body, next) = read_cell(data, pos, version)?;
890 if i < col_mapping.len() && col_mapping[i] != usize::MAX {
891 out[col_mapping[i]] = decode_value(type_tag, body)?;
892 }
893 pos = next;
894 }
895
896 Ok(())
897}
898
899pub fn decode_pk_into(
900 key: &[u8],
901 count: usize,
902 out: &mut [Value],
903 pk_mapping: &[usize],
904) -> Result<()> {
905 let mut pos = 0;
906 for i in 0..count {
907 let (v, n) = decode_key_value(&key[pos..])?;
908 if i < pk_mapping.len() {
909 out[pk_mapping[i]] = v;
910 }
911 pos += n;
912 }
913 Ok(())
914}
915
916pub fn decode_columns(data: &[u8], targets: &[usize]) -> Result<Vec<Value>> {
917 if targets.is_empty() {
918 return Ok(Vec::new());
919 }
920 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
921
922 let mut results = Vec::with_capacity(targets.len());
923 let mut ti = 0;
924
925 for col in 0..col_count {
926 if ti >= targets.len() {
927 break;
928 }
929 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
930
931 if col == targets[ti] {
932 if is_null {
933 results.push(Value::Null);
934 } else {
935 let (type_tag, body, next) = read_cell(data, pos, version)?;
936 results.push(decode_value(type_tag, body)?);
937 pos = next;
938 }
939 ti += 1;
940 } else if !is_null {
941 pos = skip_cell(data, pos, version)?;
942 }
943 }
944
945 while ti < targets.len() {
946 results.push(Value::Null);
947 ti += 1;
948 }
949
950 Ok(results)
951}
952
953pub fn decode_columns_into(
954 data: &[u8],
955 targets: &[usize],
956 schema_cols: &[usize],
957 row: &mut [Value],
958) -> Result<()> {
959 if targets.is_empty() {
960 return Ok(());
961 }
962 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
963
964 let mut ti = 0;
965 for col in 0..col_count {
966 if ti >= targets.len() {
967 break;
968 }
969 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
970
971 if col == targets[ti] {
972 if is_null {
973 row[schema_cols[ti]] = Value::Null;
974 } else {
975 let (type_tag, body, next) = read_cell(data, pos, version)?;
976 row[schema_cols[ti]] = decode_value(type_tag, body)?;
977 pos = next;
978 }
979 ti += 1;
980 } else if !is_null {
981 pos = skip_cell(data, pos, version)?;
982 }
983 }
984
985 Ok(())
986}
987
/// A decoded column that borrows its variable-length payload instead of
/// copying it.
///
/// Scalar variants hold the value itself; text, blob, JSON and the other
/// byte-oriented variants hold a slice with lifetime `'a`.
#[derive(Debug, Clone, Copy)]
pub enum RawColumn<'a> {
    Null,
    Integer(i64),
    Real(f64),
    Boolean(bool),
    Text(&'a str),
    Blob(&'a [u8]),
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval { months: i32, days: i32, micros: i64 },
    Json(&'a str),
    Jsonb(&'a [u8]),
    TsVector(&'a [u8]),
    TsQuery(&'a [u8]),
    /// Still-encoded V2 array bytes; decoded on demand with `decode_array_v2`.
    Array(&'a [u8]),
}
1006
1007impl<'a> RawColumn<'a> {
1008 pub fn to_value(self) -> Value {
1009 match self {
1010 RawColumn::Null => Value::Null,
1011 RawColumn::Integer(i) => Value::Integer(i),
1012 RawColumn::Real(r) => Value::Real(r),
1013 RawColumn::Boolean(b) => Value::Boolean(b),
1014 RawColumn::Text(s) => Value::Text(CompactString::from(s)),
1015 RawColumn::Blob(b) => Value::Blob(b.to_vec()),
1016 RawColumn::Time(t) => Value::Time(t),
1017 RawColumn::Date(d) => Value::Date(d),
1018 RawColumn::Timestamp(t) => Value::Timestamp(t),
1019 RawColumn::Interval {
1020 months,
1021 days,
1022 micros,
1023 } => Value::Interval {
1024 months,
1025 days,
1026 micros,
1027 },
1028 RawColumn::Json(s) => Value::Json(CompactString::from(s)),
1029 RawColumn::Jsonb(b) => Value::Jsonb(std::sync::Arc::from(b)),
1030 RawColumn::TsVector(b) => Value::TsVector(std::sync::Arc::from(b)),
1031 RawColumn::TsQuery(b) => Value::TsQuery(std::sync::Arc::from(b)),
1032 RawColumn::Array(bytes) => decode_array_v2(bytes).unwrap_or(Value::Null),
1033 }
1034 }
1035
1036 pub fn cmp_value(&self, other: &Value) -> Option<std::cmp::Ordering> {
1037 use std::cmp::Ordering;
1038 match (self, other) {
1039 (RawColumn::Null, Value::Null) => Some(Ordering::Equal),
1040 (RawColumn::Null, _) | (_, Value::Null) => None,
1041 (RawColumn::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
1042 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
1043 (RawColumn::Real(a), Value::Real(b)) => a.partial_cmp(b),
1044 (RawColumn::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
1045 (RawColumn::Text(a), Value::Text(b)) => Some((*a).cmp(b.as_str())),
1046 (RawColumn::Blob(a), Value::Blob(b)) => Some((*a).cmp(b.as_slice())),
1047 (RawColumn::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
1048 (RawColumn::Time(a), Value::Time(b)) => Some(a.cmp(b)),
1049 (RawColumn::Date(a), Value::Date(b)) => Some(a.cmp(b)),
1050 (RawColumn::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
1051 (
1052 RawColumn::Interval {
1053 months: am,
1054 days: ad,
1055 micros: au,
1056 },
1057 Value::Interval {
1058 months: bm,
1059 days: bd,
1060 micros: bu,
1061 },
1062 ) => Some(am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu))),
1063 (RawColumn::Json(a), Value::Json(b)) => Some((*a).cmp(b.as_str())),
1064 (RawColumn::Jsonb(a), Value::Jsonb(b)) => Some((*a).cmp(b.as_ref())),
1065 (RawColumn::TsVector(a), Value::TsVector(b)) => Some((*a).cmp(b.as_ref())),
1066 (RawColumn::TsQuery(a), Value::TsQuery(b)) => Some((*a).cmp(b.as_ref())),
1067 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes).ok()? {
1068 Value::Array(a) => Some(a.as_ref().cmp(b.as_ref())),
1069 _ => None,
1070 },
1071 _ => None,
1072 }
1073 }
1074
1075 pub fn eq_value(&self, other: &Value) -> bool {
1076 match (self, other) {
1077 (RawColumn::Null, Value::Null) => true,
1078 (RawColumn::Integer(a), Value::Integer(b)) => a == b,
1079 (RawColumn::Integer(a), Value::Real(b)) => (*a as f64) == *b,
1080 (RawColumn::Real(a), Value::Real(b)) => a == b,
1081 (RawColumn::Real(a), Value::Integer(b)) => *a == (*b as f64),
1082 (RawColumn::Text(a), Value::Text(b)) => *a == b.as_str(),
1083 (RawColumn::Blob(a), Value::Blob(b)) => *a == b.as_slice(),
1084 (RawColumn::Boolean(a), Value::Boolean(b)) => a == b,
1085 (RawColumn::Time(a), Value::Time(b)) => a == b,
1086 (RawColumn::Date(a), Value::Date(b)) => a == b,
1087 (RawColumn::Timestamp(a), Value::Timestamp(b)) => a == b,
1088 (
1089 RawColumn::Interval {
1090 months: am,
1091 days: ad,
1092 micros: au,
1093 },
1094 Value::Interval {
1095 months: bm,
1096 days: bd,
1097 micros: bu,
1098 },
1099 ) => am == bm && ad == bd && au == bu,
1100 (RawColumn::Json(a), Value::Json(b)) => *a == b.as_str(),
1101 (RawColumn::Jsonb(a), Value::Jsonb(b)) => *a == b.as_ref(),
1102 (RawColumn::TsVector(a), Value::TsVector(b)) => *a == b.as_ref(),
1103 (RawColumn::TsQuery(a), Value::TsQuery(b)) => *a == b.as_ref(),
1104 (RawColumn::Array(bytes), Value::Array(b)) => match decode_array_v2(bytes) {
1105 Ok(Value::Array(a)) => a.as_ref() == b.as_ref(),
1106 _ => false,
1107 },
1108 _ => false,
1109 }
1110 }
1111
1112 pub fn as_f64(&self) -> Option<f64> {
1113 match self {
1114 RawColumn::Integer(i) => Some(*i as f64),
1115 RawColumn::Real(r) => Some(*r),
1116 _ => None,
1117 }
1118 }
1119
1120 pub fn as_i64(&self) -> Option<i64> {
1121 match self {
1122 RawColumn::Integer(i) => Some(*i),
1123 RawColumn::Time(t) => Some(*t),
1124 RawColumn::Date(d) => Some(*d as i64),
1125 RawColumn::Timestamp(t) => Some(*t),
1126 _ => None,
1127 }
1128 }
1129}
1130
1131fn decode_value_raw(type_tag: u8, data: &[u8]) -> Result<RawColumn<'_>> {
1132 match DataType::from_tag(type_tag) {
1133 Some(DataType::Integer) => Ok(RawColumn::Integer(i64::from_le_bytes(
1134 data[..8].try_into().unwrap(),
1135 ))),
1136 Some(DataType::Real) => Ok(RawColumn::Real(f64::from_le_bytes(
1137 data[..8].try_into().unwrap(),
1138 ))),
1139 Some(DataType::Boolean) => Ok(RawColumn::Boolean(data[0] != 0)),
1140 Some(DataType::Text) => {
1141 let s = std::str::from_utf8(data)
1142 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in column".into()))?;
1143 Ok(RawColumn::Text(s))
1144 }
1145 Some(DataType::Blob) => Ok(RawColumn::Blob(data)),
1146 Some(DataType::Time) => Ok(RawColumn::Time(i64::from_le_bytes(
1147 data[..8].try_into().unwrap(),
1148 ))),
1149 Some(DataType::Date) => Ok(RawColumn::Date(i32::from_le_bytes(
1150 data[..4].try_into().unwrap(),
1151 ))),
1152 Some(DataType::Timestamp) => Ok(RawColumn::Timestamp(i64::from_le_bytes(
1153 data[..8].try_into().unwrap(),
1154 ))),
1155 Some(DataType::Interval) => {
1156 if data.len() < 16 {
1157 return Err(SqlError::InvalidValue("truncated interval".into()));
1158 }
1159 let months = i32::from_le_bytes(data[0..4].try_into().unwrap());
1160 let days = i32::from_le_bytes(data[4..8].try_into().unwrap());
1161 let micros = i64::from_le_bytes(data[8..16].try_into().unwrap());
1162 Ok(RawColumn::Interval {
1163 months,
1164 days,
1165 micros,
1166 })
1167 }
1168 Some(DataType::Json) => {
1169 let s = std::str::from_utf8(data)
1170 .map_err(|_| SqlError::InvalidValue("invalid UTF-8 in JSON column".into()))?;
1171 Ok(RawColumn::Json(s))
1172 }
1173 Some(DataType::Jsonb) => Ok(RawColumn::Jsonb(data)),
1174 Some(DataType::TsVector) => Ok(RawColumn::TsVector(data)),
1175 Some(DataType::TsQuery) => Ok(RawColumn::TsQuery(data)),
1176 Some(DataType::Array) => Ok(RawColumn::Array(data)),
1177 _ => Err(SqlError::InvalidValue(format!(
1178 "unknown column type tag: {type_tag}"
1179 ))),
1180 }
1181}
1182
1183pub fn patch_column_in_place(data: &mut [u8], target: usize, new_val: &Value) -> Result<bool> {
1185 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1186 if target >= col_count || new_val.is_null() {
1187 return Ok(false);
1188 }
1189 let was_null = bitmap[target / 8] & (1 << (target % 8)) != 0;
1190 if was_null {
1191 return Ok(false);
1192 }
1193 for col in 0..target {
1194 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1195 if !is_null {
1196 pos = skip_cell(data, pos, version)?;
1197 }
1198 }
1199 let type_tag = data[pos];
1200 let (old_data_len, val_start) = match version {
1201 RowVersion::V2 => match fixed_width_size(type_tag) {
1202 Some(n) => (n, pos + 1),
1203 None => {
1204 if pos + 5 > data.len() {
1205 return Err(SqlError::InvalidValue("truncated column data".into()));
1206 }
1207 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1208 (len, pos + 5)
1209 }
1210 },
1211 RowVersion::V1 => {
1212 if pos + 5 > data.len() {
1213 return Err(SqlError::InvalidValue("truncated column data".into()));
1214 }
1215 let len = u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
1216 (len, pos + 5)
1217 }
1218 };
1219 let new_data_len = match value_encoded_size_v2(new_val) {
1220 Some(n) => n,
1221 None => return Ok(false),
1222 };
1223 if new_data_len != old_data_len {
1224 return Ok(false);
1225 }
1226 data[pos] = new_val.data_type().type_tag();
1227 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1228 Ok(true)
1229}
1230
1231pub fn patch_row_column(
1233 data: &[u8],
1234 target: usize,
1235 new_val: &Value,
1236 out: &mut Vec<u8>,
1237) -> Result<()> {
1238 let (version, col_count, bitmap, header_end) = parse_row_header(data)?;
1239
1240 let new_col_count = if target >= col_count {
1241 target + 1
1242 } else {
1243 col_count
1244 };
1245 let new_bitmap_bytes = new_col_count.div_ceil(8);
1246 let bitmap_bytes = col_count.div_ceil(8);
1247 out.clear();
1248
1249 let header = (new_col_count as u16) | V2_FLAG;
1250 out.extend_from_slice(&header.to_le_bytes());
1251 let bitmap_start = out.len();
1252 out.extend_from_slice(&data[2..2 + bitmap_bytes]);
1253 for _ in bitmap_bytes..new_bitmap_bytes {
1254 out.push(0xFF);
1255 }
1256 if new_val.is_null() {
1257 out[bitmap_start + target / 8] |= 1 << (target % 8);
1258 } else {
1259 out[bitmap_start + target / 8] &= !(1 << (target % 8));
1260 }
1261
1262 let mut pos = header_end;
1263 for col in 0..new_col_count {
1264 let was_null = if col < col_count {
1265 bitmap[col / 8] & (1 << (col % 8)) != 0
1266 } else {
1267 true
1268 };
1269
1270 if col == target {
1271 if !was_null {
1272 pos = skip_cell(data, pos, version)?;
1273 }
1274 if !new_val.is_null() {
1275 encode_cell_v2(new_val, out);
1276 }
1277 } else if !was_null {
1278 pos = copy_cell_to_v2(data, pos, version, out)?;
1279 }
1280 }
1281 Ok(())
1282}
1283
1284pub fn decode_column_raw(data: &[u8], target: usize) -> Result<RawColumn<'_>> {
1285 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1286 if target >= col_count {
1287 return Ok(RawColumn::Null);
1288 }
1289
1290 for col in 0..=target {
1291 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1292
1293 if col == target {
1294 if is_null {
1295 return Ok(RawColumn::Null);
1296 }
1297 let (type_tag, body, _) = read_cell(data, pos, version)?;
1298 return decode_value_raw(type_tag, body);
1299 } else if !is_null {
1300 pos = skip_cell(data, pos, version)?;
1301 }
1302 }
1303
1304 unreachable!()
1305}
1306
1307pub fn decode_column_with_offset(data: &[u8], target: usize) -> Result<(RawColumn<'_>, usize)> {
1309 let (version, col_count, bitmap, mut pos) = parse_row_header(data)?;
1310 if target >= col_count {
1311 return Ok((RawColumn::Null, usize::MAX));
1312 }
1313
1314 for col in 0..=target {
1315 let is_null = bitmap[col / 8] & (1 << (col % 8)) != 0;
1316
1317 if col == target {
1318 if is_null {
1319 return Ok((RawColumn::Null, usize::MAX));
1320 }
1321 let tag_offset = pos;
1322 let (type_tag, body, _) = read_cell(data, pos, version)?;
1323 let raw = decode_value_raw(type_tag, body)?;
1324 return Ok((raw, tag_offset));
1325 } else if !is_null {
1326 pos = skip_cell(data, pos, version)?;
1327 }
1328 }
1329
1330 unreachable!()
1331}
1332
1333pub fn patch_at_offset(data: &mut [u8], offset: usize, new_val: &Value) -> Result<bool> {
1335 if offset == usize::MAX || new_val.is_null() {
1336 return Ok(false);
1337 }
1338 if data.len() < 2 || offset >= data.len() {
1339 return Err(SqlError::InvalidValue("truncated column data".into()));
1340 }
1341 let version = if u16::from_le_bytes([data[0], data[1]]) & V2_FLAG != 0 {
1342 RowVersion::V2
1343 } else {
1344 RowVersion::V1
1345 };
1346 let type_tag = data[offset];
1347 let (old_data_len, val_start) = match version {
1348 RowVersion::V2 => match fixed_width_size(type_tag) {
1349 Some(n) => (n, offset + 1),
1350 None => {
1351 if offset + 5 > data.len() {
1352 return Err(SqlError::InvalidValue("truncated column data".into()));
1353 }
1354 let len =
1355 u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1356 (len, offset + 5)
1357 }
1358 },
1359 RowVersion::V1 => {
1360 if offset + 5 > data.len() {
1361 return Err(SqlError::InvalidValue("truncated column data".into()));
1362 }
1363 let len = u32::from_le_bytes(data[offset + 1..offset + 5].try_into().unwrap()) as usize;
1364 (len, offset + 5)
1365 }
1366 };
1367 let new_data_len = match value_encoded_size_v2(new_val) {
1368 Some(n) => n,
1369 None => return Ok(false),
1370 };
1371 if new_data_len != old_data_len {
1372 return Ok(false);
1373 }
1374 data[offset] = new_val.data_type().type_tag();
1375 write_value_payload_v2(new_val, &mut data[val_start..val_start + new_data_len]);
1376 Ok(true)
1377}
1378
1379pub fn decode_pk_integer(key: &[u8]) -> Result<i64> {
1380 if key.is_empty() || key[0] != TAG_INTEGER {
1381 return Err(SqlError::InvalidValue("not an integer key".into()));
1382 }
1383 let (val, _) = decode_integer(&key[1..])?;
1384 match val {
1385 Value::Integer(i) => Ok(i),
1386 _ => unreachable!(),
1387 }
1388}
1389
1390#[cfg(test)]
1391#[path = "encoding_tests.rs"]
1392mod tests;