1use nodedb_types::columnar::{ColumnType, SchemaOps, StrictSchema};
8use nodedb_types::datetime::NdbDateTime;
9use nodedb_types::value::Value;
10
11use crate::error::StrictError;
12
13pub struct TupleDecoder {
18 schema: StrictSchema,
19 fixed_offsets: Vec<Option<usize>>,
22 fixed_section_size: usize,
24 var_table_index: Vec<Option<usize>>,
27 var_count: usize,
29 header_size: usize,
31}
32
33impl TupleDecoder {
34 pub fn new(schema: &StrictSchema) -> Self {
36 let mut fixed_offsets = Vec::with_capacity(schema.columns.len());
37 let mut var_table_index = Vec::with_capacity(schema.columns.len());
38 let mut fixed_offset = 0usize;
39 let mut var_idx = 0usize;
40
41 for col in &schema.columns {
42 if let Some(size) = col.column_type.fixed_size() {
43 fixed_offsets.push(Some(fixed_offset));
44 var_table_index.push(None);
45 fixed_offset += size;
46 } else {
47 fixed_offsets.push(None);
48 var_table_index.push(Some(var_idx));
49 var_idx += 1;
50 }
51 }
52
53 let header_size = 2 + schema.null_bitmap_size();
54
55 Self {
56 schema: schema.clone(),
57 fixed_offsets,
58 fixed_section_size: fixed_offset,
59 var_table_index,
60 var_count: var_idx,
61 header_size,
62 }
63 }
64
65 pub fn schema_version(&self, tuple: &[u8]) -> Result<u16, StrictError> {
67 if tuple.len() < 2 {
68 return Err(StrictError::TruncatedTuple {
69 expected: 2,
70 got: tuple.len(),
71 });
72 }
73 Ok(u16::from_le_bytes([tuple[0], tuple[1]]))
74 }
75
76 pub fn is_null(&self, tuple: &[u8], col_idx: usize) -> Result<bool, StrictError> {
78 self.check_bounds(col_idx)?;
79 self.check_min_size(tuple)?;
80
81 let bitmap_byte = tuple[2 + col_idx / 8];
82 Ok(bitmap_byte & (1 << (col_idx % 8)) != 0)
83 }
84
85 pub fn extract_fixed_raw<'a>(
89 &self,
90 tuple: &'a [u8],
91 col_idx: usize,
92 ) -> Result<Option<&'a [u8]>, StrictError> {
93 self.check_bounds(col_idx)?;
94 self.check_min_size(tuple)?;
95
96 if self.is_null_unchecked(tuple, col_idx) {
97 return Ok(None);
98 }
99
100 let offset = self.fixed_offsets[col_idx].ok_or(StrictError::TypeMismatch {
101 column: self.schema.columns[col_idx].name.clone(),
102 expected: self.schema.columns[col_idx].column_type.clone(),
103 })?;
104
105 let size = self.schema.columns[col_idx]
106 .column_type
107 .fixed_size()
108 .ok_or(StrictError::TypeMismatch {
109 column: self.schema.columns[col_idx].name.clone(),
110 expected: self.schema.columns[col_idx].column_type.clone(),
111 })?;
112 let start = self.header_size + offset;
113 let end = start + size;
114
115 if end > tuple.len() {
116 return Err(StrictError::TruncatedTuple {
117 expected: end,
118 got: tuple.len(),
119 });
120 }
121
122 Ok(Some(&tuple[start..end]))
123 }
124
125 pub fn extract_variable_raw<'a>(
129 &self,
130 tuple: &'a [u8],
131 col_idx: usize,
132 ) -> Result<Option<&'a [u8]>, StrictError> {
133 self.check_bounds(col_idx)?;
134 self.check_min_size(tuple)?;
135
136 if self.is_null_unchecked(tuple, col_idx) {
137 return Ok(None);
138 }
139
140 let var_idx = self.var_table_index[col_idx].ok_or(StrictError::TypeMismatch {
141 column: self.schema.columns[col_idx].name.clone(),
142 expected: self.schema.columns[col_idx].column_type.clone(),
143 })?;
144
145 let table_start = self.header_size + self.fixed_section_size;
146 let entry_pos = table_start + var_idx * 4;
147 let next_pos = entry_pos + 4;
148
149 if next_pos + 4 > tuple.len() {
150 return Err(StrictError::TruncatedTuple {
151 expected: next_pos + 4,
152 got: tuple.len(),
153 });
154 }
155
156 let offset = u32::from_le_bytes(
158 tuple[entry_pos..entry_pos + 4]
159 .try_into()
160 .expect("4-byte slice from bounds-checked range"),
161 );
162 let next_offset = u32::from_le_bytes(
163 tuple[next_pos..next_pos + 4]
164 .try_into()
165 .expect("4-byte slice from bounds-checked range"),
166 );
167
168 let var_data_start = table_start + (self.var_count + 1) * 4;
169 let abs_start = var_data_start + offset as usize;
170 let abs_end = var_data_start + next_offset as usize;
171
172 if abs_end > tuple.len() {
173 return Err(StrictError::CorruptOffset {
174 offset: next_offset,
175 len: tuple.len(),
176 });
177 }
178
179 Ok(Some(&tuple[abs_start..abs_end]))
180 }
181
182 pub fn extract_value(&self, tuple: &[u8], col_idx: usize) -> Result<Value, StrictError> {
187 self.check_bounds(col_idx)?;
188
189 if self.is_null(tuple, col_idx)? {
190 return Ok(Value::Null);
191 }
192
193 let col = &self.schema.columns[col_idx];
194
195 if col.column_type.fixed_size().is_some() {
196 let raw = self
197 .extract_fixed_raw(tuple, col_idx)?
198 .ok_or(StrictError::TypeMismatch {
199 column: col.name.clone(),
200 expected: col.column_type.clone(),
201 })?;
202 Ok(decode_fixed_value(&col.column_type, raw))
203 } else {
204 let raw =
205 self.extract_variable_raw(tuple, col_idx)?
206 .ok_or(StrictError::TypeMismatch {
207 column: col.name.clone(),
208 expected: col.column_type.clone(),
209 })?;
210 Ok(decode_variable_value(&col.column_type, raw))
211 }
212 }
213
214 pub fn extract_all(&self, tuple: &[u8]) -> Result<Vec<Value>, StrictError> {
216 let mut values = Vec::with_capacity(self.schema.columns.len());
217 for i in 0..self.schema.columns.len() {
218 values.push(self.extract_value(tuple, i)?);
219 }
220 Ok(values)
221 }
222
223 pub fn extract_by_name(&self, tuple: &[u8], name: &str) -> Result<Value, StrictError> {
225 let idx = self
226 .schema
227 .column_index(name)
228 .ok_or(StrictError::ColumnOutOfRange {
229 index: usize::MAX,
230 count: self.schema.columns.len(),
231 })?;
232 self.extract_value(tuple, idx)
233 }
234
235 pub fn extract_value_versioned(
243 &self,
244 tuple: &[u8],
245 col_idx: usize,
246 old_col_count: usize,
247 ) -> Result<Value, StrictError> {
248 self.check_bounds(col_idx)?;
249
250 if col_idx >= old_col_count {
251 let col = &self.schema.columns[col_idx];
254 return if col.nullable {
255 Ok(Value::Null)
256 } else {
257 Ok(Value::Null)
260 };
261 }
262
263 self.extract_value(tuple, col_idx)
264 }
265
266 pub fn schema(&self) -> &StrictSchema {
268 &self.schema
269 }
270
271 pub fn fixed_section_start(&self) -> usize {
273 self.header_size
274 }
275
276 pub fn offset_table_start(&self) -> usize {
278 self.header_size + self.fixed_section_size
279 }
280
281 pub fn var_data_start(&self) -> usize {
283 self.offset_table_start() + (self.var_count + 1) * 4
284 }
285
286 pub fn var_count(&self) -> usize {
288 self.var_count
289 }
290
291 pub fn fixed_field_location(&self, col_idx: usize) -> Option<(usize, usize)> {
294 let offset = self.fixed_offsets.get(col_idx).copied().flatten()?;
295 let size = self.schema.columns[col_idx].column_type.fixed_size()?;
296 Some((self.header_size + offset, size))
297 }
298
299 pub fn var_field_index(&self, col_idx: usize) -> Option<usize> {
302 self.var_table_index.get(col_idx).copied().flatten()
303 }
304
305 fn check_bounds(&self, col_idx: usize) -> Result<(), StrictError> {
308 if col_idx >= self.schema.columns.len() {
309 Err(StrictError::ColumnOutOfRange {
310 index: col_idx,
311 count: self.schema.columns.len(),
312 })
313 } else {
314 Ok(())
315 }
316 }
317
318 fn check_min_size(&self, tuple: &[u8]) -> Result<(), StrictError> {
319 let min = self.header_size;
320 if tuple.len() < min {
321 Err(StrictError::TruncatedTuple {
322 expected: min,
323 got: tuple.len(),
324 })
325 } else {
326 Ok(())
327 }
328 }
329
330 fn is_null_unchecked(&self, tuple: &[u8], col_idx: usize) -> bool {
331 let bitmap_byte = tuple[2 + col_idx / 8];
332 bitmap_byte & (1 << (col_idx % 8)) != 0
333 }
334}
335
336fn decode_fixed_value(col_type: &ColumnType, raw: &[u8]) -> Value {
338 match col_type {
339 ColumnType::Int64 => Value::Integer(i64::from_le_bytes([
340 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
341 ])),
342 ColumnType::Float64 => Value::Float(f64::from_le_bytes([
343 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
344 ])),
345 ColumnType::Bool => Value::Bool(raw[0] != 0),
346 ColumnType::Timestamp => {
347 let micros = i64::from_le_bytes([
348 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
349 ]);
350 Value::DateTime(NdbDateTime::from_micros(micros))
351 }
352 ColumnType::Decimal => {
353 let mut bytes = [0u8; 16];
354 bytes.copy_from_slice(&raw[..16]);
355 Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
356 }
357 ColumnType::Uuid => {
358 let mut bytes = [0u8; 16];
359 bytes.copy_from_slice(&raw[..16]);
360 let parsed = uuid::Uuid::from_bytes(bytes);
361 Value::Uuid(parsed.to_string())
362 }
363 ColumnType::Vector(dim) => {
364 let d = *dim as usize;
365 let mut floats = Vec::with_capacity(d);
366 for i in 0..d {
367 let off = i * 4;
368 let bytes = [raw[off], raw[off + 1], raw[off + 2], raw[off + 3]];
369 let f = f32::from_le_bytes(bytes);
370 floats.push(Value::Float(f as f64));
371 }
372 Value::Array(floats)
373 }
374 _ => Value::Null, }
376}
377
378fn decode_variable_value(col_type: &ColumnType, raw: &[u8]) -> Value {
380 match col_type {
381 ColumnType::String => {
382 Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
383 }
384 ColumnType::Bytes => Value::Bytes(raw.to_vec()),
385 ColumnType::Geometry => {
386 if let Ok(geom) = sonic_rs::from_slice::<nodedb_types::geometry::Geometry>(raw) {
388 Value::Geometry(geom)
389 } else {
390 Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
391 }
392 }
393 ColumnType::Json => {
394 match nodedb_types::value_from_msgpack(raw) {
396 Ok(val) => val,
397 Err(e) => {
398 tracing::warn!(len = raw.len(), error = %e, "corrupted JSON msgpack in tuple");
399 Value::Null
400 }
401 }
402 }
403 _ => Value::Null,
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use nodedb_types::columnar::ColumnDef;
410
411 use super::*;
412 use crate::encode::TupleEncoder;
413
414 fn crm_schema() -> StrictSchema {
415 StrictSchema::new(vec![
416 ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
417 ColumnDef::required("name", ColumnType::String),
418 ColumnDef::nullable("email", ColumnType::String),
419 ColumnDef::required("balance", ColumnType::Decimal),
420 ColumnDef::nullable("active", ColumnType::Bool),
421 ])
422 .unwrap()
423 }
424
425 fn encode_crm_row(values: &[Value]) -> Vec<u8> {
426 let schema = crm_schema();
427 TupleEncoder::new(&schema).encode(values).unwrap()
428 }
429
/// A fully populated row decodes back to exactly the values that were encoded.
#[test]
fn roundtrip_all_fields() {
    let schema = crm_schema();
    let row = vec![
        Value::Integer(42),
        Value::String("Alice".into()),
        Value::String("alice@example.com".into()),
        Value::Decimal(rust_decimal::Decimal::new(5000, 2)),
        Value::Bool(true),
    ];

    let tuple = TupleEncoder::new(&schema).encode(&row).unwrap();
    let decoded = TupleDecoder::new(&schema).extract_all(&tuple).unwrap();

    // One expectation per column, each equal to the value that was encoded.
    assert_eq!(decoded, row);
}
456
/// NULLs in the nullable columns survive an encode/decode round trip.
#[test]
fn roundtrip_with_nulls() {
    let schema = crm_schema();
    let row = vec![
        Value::Integer(1),
        Value::String("Bob".into()),
        Value::Null,
        Value::Decimal(rust_decimal::Decimal::ZERO),
        Value::Null,
    ];

    let tuple = TupleEncoder::new(&schema).encode(&row).unwrap();
    let decoded = TupleDecoder::new(&schema).extract_all(&tuple).unwrap();

    // `email` (index 2) and `active` (index 4) must come back as Null.
    assert_eq!(decoded, row);
}
480
/// Single columns can be decoded by index without decoding the whole row.
#[test]
fn o1_extraction_single_field() {
    let decoder = TupleDecoder::new(&crm_schema());
    let row = [
        Value::Integer(99),
        Value::String("Charlie".into()),
        Value::String("charlie@co.com".into()),
        Value::Decimal(rust_decimal::Decimal::new(12345, 0)),
        Value::Bool(false),
    ];
    let tuple = encode_crm_row(&row);

    // Column 3 is the fixed-size `balance`.
    let expected_balance = Value::Decimal(rust_decimal::Decimal::new(12345, 0));
    assert_eq!(decoder.extract_value(&tuple, 3).unwrap(), expected_balance);

    // Column 1 is the variable-length `name`.
    let expected_name = Value::String("Charlie".into());
    assert_eq!(decoder.extract_value(&tuple, 1).unwrap(), expected_name);
}
505
/// Columns can be looked up by name, including one that holds NULL.
#[test]
fn extract_by_name() {
    let decoder = TupleDecoder::new(&crm_schema());
    let row = [
        Value::Integer(7),
        Value::String("Dana".into()),
        Value::Null,
        Value::Decimal(rust_decimal::Decimal::new(999, 1)),
        Value::Bool(true),
    ];
    let tuple = encode_crm_row(&row);

    let name = decoder.extract_by_name(&tuple, "name").unwrap();
    assert_eq!(name, Value::String("Dana".into()));

    let email = decoder.extract_by_name(&tuple, "email").unwrap();
    assert_eq!(email, Value::Null);
}
528
/// `is_null` reports exactly the columns that were encoded as NULL.
#[test]
fn null_bitmap_check() {
    let decoder = TupleDecoder::new(&crm_schema());
    let row = [
        Value::Integer(1),
        Value::String("x".into()),
        Value::Null,
        Value::Decimal(rust_decimal::Decimal::ZERO),
        Value::Null,
    ];
    let tuple = encode_crm_row(&row);

    // Expected null flag per column index; only `email` and `active` are NULL.
    let expected_flags = [false, false, true, false, true];
    for (col_idx, expected) in expected_flags.iter().enumerate() {
        assert!(decoder.is_null(&tuple, col_idx).unwrap() == *expected);
    }
}
548
/// An index past the last column is rejected and echoed back in the error.
#[test]
fn column_out_of_range() {
    let decoder = TupleDecoder::new(&crm_schema());
    let row = [
        Value::Integer(1),
        Value::String("x".into()),
        Value::Null,
        Value::Decimal(rust_decimal::Decimal::ZERO),
        Value::Null,
    ];
    let tuple = encode_crm_row(&row);

    let outcome = decoder.extract_value(&tuple, 99);
    assert!(matches!(
        outcome.unwrap_err(),
        StrictError::ColumnOutOfRange { index: 99, .. }
    ));
}
567
/// A tuple encoded with the fixture schema carries schema version 1.
#[test]
fn schema_version_read() {
    let decoder = TupleDecoder::new(&crm_schema());
    let row = [
        Value::Integer(1),
        Value::String("x".into()),
        Value::Null,
        Value::Decimal(rust_decimal::Decimal::ZERO),
        Value::Null,
    ];
    let tuple = encode_crm_row(&row);

    let version = decoder.schema_version(&tuple).unwrap();
    assert_eq!(version, 1);
}
582
/// Columns added after a tuple was written decode as NULL; older columns
/// are still read from the tuple.
#[test]
fn versioned_extraction_new_column_returns_null() {
    let decoder = TupleDecoder::new(&crm_schema());

    // Three-column predecessor of the fixture schema.
    let old_columns = vec![
        ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
        ColumnDef::required("name", ColumnType::String),
        ColumnDef::nullable("email", ColumnType::String),
    ];
    let old_schema = StrictSchema::new(old_columns).unwrap();
    let old_row = [Value::Integer(1), Value::String("x".into()), Value::Null];
    let tuple = TupleEncoder::new(&old_schema).encode(&old_row).unwrap();

    // `balance` (3) and `active` (4) did not exist when the tuple was written.
    for new_col in [3usize, 4].iter() {
        let value = decoder
            .extract_value_versioned(&tuple, *new_col, 3)
            .unwrap();
        assert_eq!(value, Value::Null);
    }

    // `id` (0) existed already and is decoded from the tuple.
    let id = decoder.extract_value_versioned(&tuple, 0, 3).unwrap();
    assert_eq!(id, Value::Integer(1));
}
611
/// Raw access to fixed-size columns yields their little-endian encoding.
#[test]
fn raw_fixed_extraction() {
    let columns = vec![
        ColumnDef::required("a", ColumnType::Int64),
        ColumnDef::required("b", ColumnType::Float64),
        ColumnDef::required("c", ColumnType::Bool),
    ];
    let schema = StrictSchema::new(columns).unwrap();
    let decoder = TupleDecoder::new(&schema);

    let row = [Value::Integer(42), Value::Float(0.75), Value::Bool(true)];
    let tuple = TupleEncoder::new(&schema).encode(&row).unwrap();

    let int_bytes = decoder.extract_fixed_raw(&tuple, 0).unwrap().unwrap();
    assert_eq!(int_bytes, &42i64.to_le_bytes()[..]);

    let float_bytes = decoder.extract_fixed_raw(&tuple, 1).unwrap().unwrap();
    assert_eq!(float_bytes, &0.75f64.to_le_bytes()[..]);

    let bool_bytes = decoder.extract_fixed_raw(&tuple, 2).unwrap().unwrap();
    assert_eq!(bool_bytes[0], 1);
}
636
/// Raw access to variable-length columns yields the stored UTF-8 bytes.
#[test]
fn raw_variable_extraction() {
    let columns = vec![
        ColumnDef::required("id", ColumnType::Int64),
        ColumnDef::required("name", ColumnType::String),
        ColumnDef::nullable("bio", ColumnType::String),
    ];
    let schema = StrictSchema::new(columns).unwrap();
    let decoder = TupleDecoder::new(&schema);

    let row = [
        Value::Integer(1),
        Value::String("hello".into()),
        Value::String("world".into()),
    ];
    let tuple = TupleEncoder::new(&schema).encode(&row).unwrap();

    // (column index, expected text) for the two string columns.
    let expectations = [(1usize, "hello"), (2usize, "world")];
    for (col_idx, expected) in expectations.iter() {
        let raw = decoder
            .extract_variable_raw(&tuple, *col_idx)
            .unwrap()
            .unwrap();
        assert_eq!(std::str::from_utf8(raw).unwrap(), *expected);
    }
}
662
/// Every supported column type survives an encode/decode round trip.
#[test]
fn all_types_roundtrip() {
    let schema = StrictSchema::new(vec![
        ColumnDef::required("i", ColumnType::Int64),
        ColumnDef::required("f", ColumnType::Float64),
        ColumnDef::required("s", ColumnType::String),
        ColumnDef::required("b", ColumnType::Bool),
        ColumnDef::required("raw", ColumnType::Bytes),
        ColumnDef::required("ts", ColumnType::Timestamp),
        ColumnDef::required("dec", ColumnType::Decimal),
        ColumnDef::required("uid", ColumnType::Uuid),
        ColumnDef::required("vec", ColumnType::Vector(2)),
    ])
    .unwrap();
    let encoder = TupleEncoder::new(&schema);
    let decoder = TupleDecoder::new(&schema);

    let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
    let values = vec![
        Value::Integer(-100),
        Value::Float(0.5),
        Value::String("test string".into()),
        Value::Bool(false),
        Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        Value::DateTime(NdbDateTime::from_micros(1_000_000)),
        Value::Decimal(rust_decimal::Decimal::new(314159, 5)),
        Value::Uuid(uuid_str.into()),
        Value::Array(vec![Value::Float(1.5), Value::Float(2.5)]),
    ];

    let tuple = encoder.encode(&values).unwrap();
    let decoded = decoder.extract_all(&tuple).unwrap();

    assert_eq!(decoded[0], Value::Integer(-100));
    assert_eq!(decoded[1], Value::Float(0.5));
    assert_eq!(decoded[2], Value::String("test string".into()));
    assert_eq!(decoded[3], Value::Bool(false));
    assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]));
    assert_eq!(
        decoded[5],
        Value::DateTime(NdbDateTime::from_micros(1_000_000))
    );
    assert_eq!(
        decoded[6],
        Value::Decimal(rust_decimal::Decimal::new(314159, 5))
    );
    assert_eq!(decoded[7], Value::Uuid(uuid_str.into()));

    // Vector components pass through f32, so compare with a tolerance. Every
    // component must be a float: a wrong variant fails the test instead of
    // being skipped.
    match &decoded[8] {
        Value::Array(arr) => {
            assert_eq!(arr.len(), 2);
            let expected_components = [1.5f64, 2.5];
            for (component, expected) in arr.iter().zip(expected_components.iter()) {
                match component {
                    Value::Float(v) => assert!((*v - *expected).abs() < 0.001),
                    _ => panic!("expected float vector component"),
                }
            }
        }
        _ => panic!("expected array"),
    }
}
720}