1use nodedb_types::columnar::{ColumnType, SchemaOps, StrictSchema};
10
11use crate::encode::{FORMAT_VERSION, MAGIC};
12use nodedb_types::datetime::NdbDateTime;
13use nodedb_types::value::Value;
14
15use crate::error::StrictError;
16
17pub struct TupleDecoder {
22 schema: StrictSchema,
23 fixed_offsets: Vec<Option<usize>>,
26 fixed_section_size: usize,
28 var_table_index: Vec<Option<usize>>,
31 var_count: usize,
33 header_size: usize,
35}
36
37impl TupleDecoder {
38 pub fn new(schema: &StrictSchema) -> Self {
40 let mut fixed_offsets = Vec::with_capacity(schema.columns.len());
41 let mut var_table_index = Vec::with_capacity(schema.columns.len());
42 let mut fixed_offset = 0usize;
43 let mut var_idx = 0usize;
44
45 for col in &schema.columns {
46 if let Some(size) = col.column_type.fixed_size() {
47 fixed_offsets.push(Some(fixed_offset));
48 var_table_index.push(None);
49 fixed_offset += size;
50 } else {
51 fixed_offsets.push(None);
52 var_table_index.push(Some(var_idx));
53 var_idx += 1;
54 }
55 }
56
57 let header_size = 9 + schema.null_bitmap_size();
59
60 Self {
61 schema: schema.clone(),
62 fixed_offsets,
63 fixed_section_size: fixed_offset,
64 var_table_index,
65 var_count: var_idx,
66 header_size,
67 }
68 }
69
70 pub fn schema_version(&self, tuple: &[u8]) -> Result<u32, StrictError> {
75 if tuple.len() < 9 {
76 return Err(StrictError::TruncatedTuple {
77 expected: 9,
78 got: tuple.len(),
79 });
80 }
81 let got_magic = u32::from_le_bytes([tuple[0], tuple[1], tuple[2], tuple[3]]);
82 if got_magic != MAGIC {
83 return Err(StrictError::InvalidMagic {
84 expected: MAGIC,
85 got: got_magic,
86 });
87 }
88 let got_version = tuple[4];
89 if got_version != FORMAT_VERSION {
90 return Err(StrictError::InvalidFormatVersion {
91 expected: FORMAT_VERSION,
92 got: got_version,
93 });
94 }
95 Ok(u32::from_le_bytes([tuple[5], tuple[6], tuple[7], tuple[8]]))
96 }
97
98 pub fn is_null(&self, tuple: &[u8], col_idx: usize) -> Result<bool, StrictError> {
100 self.check_bounds(col_idx)?;
101 self.check_min_size(tuple)?;
102
103 let bitmap_byte = tuple[9 + col_idx / 8];
104 Ok(bitmap_byte & (1 << (col_idx % 8)) != 0)
105 }
106
107 pub fn extract_fixed_raw<'a>(
111 &self,
112 tuple: &'a [u8],
113 col_idx: usize,
114 ) -> Result<Option<&'a [u8]>, StrictError> {
115 self.check_bounds(col_idx)?;
116 self.check_min_size(tuple)?;
117
118 if self.is_null_unchecked(tuple, col_idx) {
119 return Ok(None);
120 }
121
122 let offset = self.fixed_offsets[col_idx].ok_or(StrictError::TypeMismatch {
123 column: self.schema.columns[col_idx].name.clone(),
124 expected: self.schema.columns[col_idx].column_type,
125 })?;
126
127 let size = self.schema.columns[col_idx]
128 .column_type
129 .fixed_size()
130 .ok_or(StrictError::TypeMismatch {
131 column: self.schema.columns[col_idx].name.clone(),
132 expected: self.schema.columns[col_idx].column_type,
133 })?;
134 let start = self.header_size + offset;
135 let end = start + size;
136
137 if end > tuple.len() {
138 return Err(StrictError::TruncatedTuple {
139 expected: end,
140 got: tuple.len(),
141 });
142 }
143
144 Ok(Some(&tuple[start..end]))
145 }
146
147 pub fn extract_variable_raw<'a>(
151 &self,
152 tuple: &'a [u8],
153 col_idx: usize,
154 ) -> Result<Option<&'a [u8]>, StrictError> {
155 self.check_bounds(col_idx)?;
156 self.check_min_size(tuple)?;
157
158 if self.is_null_unchecked(tuple, col_idx) {
159 return Ok(None);
160 }
161
162 let var_idx = self.var_table_index[col_idx].ok_or(StrictError::TypeMismatch {
163 column: self.schema.columns[col_idx].name.clone(),
164 expected: self.schema.columns[col_idx].column_type,
165 })?;
166
167 let table_start = self.header_size + self.fixed_section_size;
168 let entry_pos = table_start + var_idx * 4;
169 let next_pos = entry_pos + 4;
170
171 if next_pos + 4 > tuple.len() {
172 return Err(StrictError::TruncatedTuple {
173 expected: next_pos + 4,
174 got: tuple.len(),
175 });
176 }
177
178 let offset = u32::from_le_bytes(
180 tuple[entry_pos..entry_pos + 4]
181 .try_into()
182 .expect("4-byte slice from bounds-checked range"),
183 );
184 let next_offset = u32::from_le_bytes(
185 tuple[next_pos..next_pos + 4]
186 .try_into()
187 .expect("4-byte slice from bounds-checked range"),
188 );
189
190 let var_data_start = table_start + (self.var_count + 1) * 4;
191 let abs_start = var_data_start + offset as usize;
192 let abs_end = var_data_start + next_offset as usize;
193
194 if abs_end > tuple.len() {
195 return Err(StrictError::CorruptOffset {
196 offset: next_offset,
197 len: tuple.len(),
198 });
199 }
200
201 Ok(Some(&tuple[abs_start..abs_end]))
202 }
203
204 pub fn extract_value(&self, tuple: &[u8], col_idx: usize) -> Result<Value, StrictError> {
209 self.check_bounds(col_idx)?;
210
211 if self.is_null(tuple, col_idx)? {
212 return Ok(Value::Null);
213 }
214
215 let col = &self.schema.columns[col_idx];
216
217 if col.column_type.fixed_size().is_some() {
218 let raw = self
219 .extract_fixed_raw(tuple, col_idx)?
220 .ok_or(StrictError::TypeMismatch {
221 column: col.name.clone(),
222 expected: col.column_type,
223 })?;
224 Ok(decode_fixed_value(&col.column_type, raw))
225 } else {
226 let raw =
227 self.extract_variable_raw(tuple, col_idx)?
228 .ok_or(StrictError::TypeMismatch {
229 column: col.name.clone(),
230 expected: col.column_type,
231 })?;
232 Ok(decode_variable_value(&col.column_type, raw))
233 }
234 }
235
236 pub fn extract_all(&self, tuple: &[u8]) -> Result<Vec<Value>, StrictError> {
238 let mut values = Vec::with_capacity(self.schema.columns.len());
239 for i in 0..self.schema.columns.len() {
240 values.push(self.extract_value(tuple, i)?);
241 }
242 Ok(values)
243 }
244
245 pub fn extract_by_name(&self, tuple: &[u8], name: &str) -> Result<Value, StrictError> {
247 let idx = self
248 .schema
249 .column_index(name)
250 .ok_or(StrictError::ColumnOutOfRange {
251 index: usize::MAX,
252 count: self.schema.columns.len(),
253 })?;
254 self.extract_value(tuple, idx)
255 }
256
257 pub fn extract_value_versioned(
272 &self,
273 tuple: &[u8],
274 col_idx: usize,
275 old_col_count: usize,
276 ) -> Result<Value, StrictError> {
277 self.check_bounds(col_idx)?;
278
279 if col_idx >= old_col_count {
280 let col = &self.schema.columns[col_idx];
283 let value = col
284 .default
285 .as_deref()
286 .map(nodedb_types::columnar::StrictSchema::parse_default_literal)
287 .unwrap_or(Value::Null);
288 return Ok(value);
289 }
290
291 self.extract_value(tuple, col_idx)
292 }
293
294 pub fn schema(&self) -> &StrictSchema {
296 &self.schema
297 }
298
299 pub fn extract_bitemporal_timestamps(
303 &self,
304 tuple: &[u8],
305 ) -> Result<(i64, i64, i64), StrictError> {
306 if !self.schema.bitemporal {
307 return Err(StrictError::ColumnOutOfRange {
308 index: 0,
309 count: self.schema.columns.len(),
310 });
311 }
312 let sys = extract_i64(self, tuple, 0)?;
313 let vf = extract_i64(self, tuple, 1)?;
314 let vu = extract_i64(self, tuple, 2)?;
315 Ok((sys, vf, vu))
316 }
317
318 pub fn fixed_section_start(&self) -> usize {
320 self.header_size
321 }
322
323 pub fn offset_table_start(&self) -> usize {
325 self.header_size + self.fixed_section_size
326 }
327
328 pub fn var_data_start(&self) -> usize {
330 self.offset_table_start() + (self.var_count + 1) * 4
331 }
332
333 pub fn var_count(&self) -> usize {
335 self.var_count
336 }
337
338 pub fn fixed_field_location(&self, col_idx: usize) -> Option<(usize, usize)> {
341 let offset = self.fixed_offsets.get(col_idx).copied().flatten()?;
342 let size = self.schema.columns[col_idx].column_type.fixed_size()?;
343 Some((self.header_size + offset, size))
344 }
345
346 pub fn var_field_index(&self, col_idx: usize) -> Option<usize> {
349 self.var_table_index.get(col_idx).copied().flatten()
350 }
351
352 fn check_bounds(&self, col_idx: usize) -> Result<(), StrictError> {
355 if col_idx >= self.schema.columns.len() {
356 Err(StrictError::ColumnOutOfRange {
357 index: col_idx,
358 count: self.schema.columns.len(),
359 })
360 } else {
361 Ok(())
362 }
363 }
364
365 fn check_min_size(&self, tuple: &[u8]) -> Result<(), StrictError> {
366 let min = self.header_size;
367 if tuple.len() < min {
368 Err(StrictError::TruncatedTuple {
369 expected: min,
370 got: tuple.len(),
371 })
372 } else {
373 Ok(())
374 }
375 }
376
377 fn is_null_unchecked(&self, tuple: &[u8], col_idx: usize) -> bool {
378 let bitmap_byte = tuple[9 + col_idx / 8];
379 bitmap_byte & (1 << (col_idx % 8)) != 0
380 }
381}
382
383fn extract_i64(decoder: &TupleDecoder, tuple: &[u8], col_idx: usize) -> Result<i64, StrictError> {
385 let raw = decoder
386 .extract_fixed_raw(tuple, col_idx)?
387 .ok_or(StrictError::TypeMismatch {
388 column: decoder.schema.columns[col_idx].name.clone(),
389 expected: ColumnType::Int64,
390 })?;
391 Ok(i64::from_le_bytes([
392 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
393 ]))
394}
395
396fn decode_fixed_value(col_type: &ColumnType, raw: &[u8]) -> Value {
398 match col_type {
399 ColumnType::Int64 => Value::Integer(i64::from_le_bytes([
400 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
401 ])),
402 ColumnType::Float64 => Value::Float(f64::from_le_bytes([
403 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
404 ])),
405 ColumnType::Bool => Value::Bool(raw[0] != 0),
406 ColumnType::Timestamp => {
407 let micros = i64::from_le_bytes([
408 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
409 ]);
410 Value::NaiveDateTime(NdbDateTime::from_micros(micros))
411 }
412 ColumnType::Timestamptz => {
413 let micros = i64::from_le_bytes([
414 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
415 ]);
416 Value::DateTime(NdbDateTime::from_micros(micros))
417 }
418 ColumnType::Decimal { .. } => {
419 let mut bytes = [0u8; 16];
420 bytes.copy_from_slice(&raw[..16]);
421 Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
422 }
423 ColumnType::Uuid => {
424 let mut bytes = [0u8; 16];
425 bytes.copy_from_slice(&raw[..16]);
426 let parsed = uuid::Uuid::from_bytes(bytes);
427 Value::Uuid(parsed.to_string())
428 }
429 ColumnType::Vector(dim) => {
430 let d = *dim as usize;
431 let mut floats = Vec::with_capacity(d);
432 for i in 0..d {
433 let off = i * 4;
434 let bytes = [raw[off], raw[off + 1], raw[off + 2], raw[off + 3]];
435 let f = f32::from_le_bytes(bytes);
436 floats.push(Value::Float(f as f64));
437 }
438 Value::Array(floats)
439 }
440 _ => Value::Null, }
442}
443
444fn decode_variable_value(col_type: &ColumnType, raw: &[u8]) -> Value {
446 match col_type {
447 ColumnType::String => {
448 Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
449 }
450 ColumnType::Bytes => Value::Bytes(raw.to_vec()),
451 ColumnType::Geometry => {
452 if let Ok(geom) = sonic_rs::from_slice::<nodedb_types::geometry::Geometry>(raw) {
454 Value::Geometry(geom)
455 } else {
456 Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
457 }
458 }
459 ColumnType::Json => {
460 match nodedb_types::value_from_msgpack(raw) {
462 Ok(val) => val,
463 Err(e) => {
464 tracing::warn!(len = raw.len(), error = %e, "corrupted JSON msgpack in tuple");
465 Value::Null
466 }
467 }
468 }
469 _ => Value::Null,
470 }
471}
472
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::ColumnDef;

    use super::*;
    use crate::encode::TupleEncoder;

    /// Five-column schema mixing fixed-size and variable-length columns, with
    /// two nullable columns (`email`, `active`).
    fn crm_schema() -> StrictSchema {
        StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("email", ColumnType::String),
            ColumnDef::required(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
            ColumnDef::nullable("active", ColumnType::Bool),
        ])
        .unwrap()
    }

    /// Encodes `values` as one row of [`crm_schema`].
    fn encode_crm_row(values: &[Value]) -> Vec<u8> {
        let schema = crm_schema();
        TupleEncoder::new(&schema).encode(values).unwrap()
    }

    /// Every column of a fully populated row decodes to the value encoded.
    #[test]
    fn roundtrip_all_fields() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let values = vec![
            Value::Integer(42),
            Value::String("Alice".into()),
            Value::String("alice@example.com".into()),
            Value::Decimal(rust_decimal::Decimal::new(5000, 2)),
            Value::Bool(true),
        ];

        let tuple = encoder.encode(&values).unwrap();
        let decoded = decoder.extract_all(&tuple).unwrap();

        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::String("Alice".into()));
        assert_eq!(decoded[2], Value::String("alice@example.com".into()));
        assert_eq!(
            decoded[3],
            Value::Decimal(rust_decimal::Decimal::new(5000, 2))
        );
        assert_eq!(decoded[4], Value::Bool(true));
    }

    /// NULLs in nullable columns survive the roundtrip as `Value::Null`.
    #[test]
    fn roundtrip_with_nulls() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let values = vec![
            Value::Integer(1),
            Value::String("Bob".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ];

        let tuple = encoder.encode(&values).unwrap();
        let decoded = decoder.extract_all(&tuple).unwrap();

        assert_eq!(decoded[0], Value::Integer(1));
        assert_eq!(decoded[1], Value::String("Bob".into()));
        assert_eq!(decoded[2], Value::Null);
        assert_eq!(decoded[3], Value::Decimal(rust_decimal::Decimal::ZERO));
        assert_eq!(decoded[4], Value::Null);
    }

    /// A single column can be extracted without decoding the others.
    #[test]
    fn o1_extraction_single_field() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);

        let tuple = encode_crm_row(&[
            Value::Integer(99),
            Value::String("Charlie".into()),
            Value::String("charlie@co.com".into()),
            Value::Decimal(rust_decimal::Decimal::new(12345, 0)),
            Value::Bool(false),
        ]);

        // Fixed-size column.
        let balance = decoder.extract_value(&tuple, 3).unwrap();
        assert_eq!(
            balance,
            Value::Decimal(rust_decimal::Decimal::new(12345, 0))
        );

        // Variable-length column.
        let name = decoder.extract_value(&tuple, 1).unwrap();
        assert_eq!(name, Value::String("Charlie".into()));
    }

    /// Lookup by column name resolves to the same values as lookup by index.
    #[test]
    fn extract_by_name() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);

        let tuple = encode_crm_row(&[
            Value::Integer(7),
            Value::String("Dana".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::new(999, 1)),
            Value::Bool(true),
        ]);

        assert_eq!(
            decoder.extract_by_name(&tuple, "name").unwrap(),
            Value::String("Dana".into())
        );
        assert_eq!(
            decoder.extract_by_name(&tuple, "email").unwrap(),
            Value::Null
        );
    }

    /// `is_null` reports exactly the columns that were encoded as NULL.
    #[test]
    fn null_bitmap_check() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);

        let tuple = encode_crm_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ]);

        assert!(!decoder.is_null(&tuple, 0).unwrap());
        assert!(!decoder.is_null(&tuple, 1).unwrap());
        assert!(decoder.is_null(&tuple, 2).unwrap());
        assert!(!decoder.is_null(&tuple, 3).unwrap());
        assert!(decoder.is_null(&tuple, 4).unwrap());
    }

    /// An index past the last column is rejected with `ColumnOutOfRange`.
    #[test]
    fn column_out_of_range() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);
        let tuple = encode_crm_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ]);

        let err = decoder.extract_value(&tuple, 99).unwrap_err();
        assert!(matches!(
            err,
            StrictError::ColumnOutOfRange { index: 99, .. }
        ));
    }

    /// The schema version written by the encoder is read back from the header.
    #[test]
    fn schema_version_read() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);
        let tuple = encode_crm_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ]);

        assert_eq!(decoder.schema_version(&tuple).unwrap(), 1);
    }

    /// A schema version that does not fit in 16 bits is read back unchanged.
    #[test]
    fn schema_version_u32_no_truncation() {
        let mut schema = crm_schema();
        schema.version = 0x0001_0000;
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let tuple = encoder
            .encode(&[
                Value::Integer(1),
                Value::String("test".into()),
                Value::Null,
                Value::Decimal(rust_decimal::Decimal::ZERO),
                Value::Null,
            ])
            .unwrap();

        let decoded_version = decoder.schema_version(&tuple).unwrap();
        assert_eq!(
            decoded_version, 0x0001_0000u32,
            "schema_version must not truncate to u16"
        );
    }

    /// Columns added after a tuple was written, and having no default,
    /// decode to NULL; columns that existed at write time are still read.
    #[test]
    fn versioned_extraction_new_column_returns_null() {
        let schema = crm_schema();
        let decoder = TupleDecoder::new(&schema);

        // Tuple written with only the first three columns.
        let old_schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("email", ColumnType::String),
        ])
        .unwrap();
        let old_encoder = TupleEncoder::new(&old_schema);
        let tuple = old_encoder
            .encode(&[Value::Integer(1), Value::String("x".into()), Value::Null])
            .unwrap();

        let balance = decoder.extract_value_versioned(&tuple, 3, 3).unwrap();
        assert_eq!(balance, Value::Null);

        let active = decoder.extract_value_versioned(&tuple, 4, 3).unwrap();
        assert_eq!(active, Value::Null);

        let id = decoder.extract_value_versioned(&tuple, 0, 3).unwrap();
        assert_eq!(id, Value::Integer(1));
    }

    /// `extract_fixed_raw` returns the little-endian bytes of each fixed column.
    #[test]
    fn raw_fixed_extraction() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("a", ColumnType::Int64),
            ColumnDef::required("b", ColumnType::Float64),
            ColumnDef::required("c", ColumnType::Bool),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let tuple = encoder
            .encode(&[Value::Integer(42), Value::Float(0.75), Value::Bool(true)])
            .unwrap();

        let a_raw = decoder.extract_fixed_raw(&tuple, 0).unwrap().unwrap();
        assert_eq!(i64::from_le_bytes(a_raw.try_into().unwrap()), 42);

        let b_raw = decoder.extract_fixed_raw(&tuple, 1).unwrap().unwrap();
        assert_eq!(f64::from_le_bytes(b_raw.try_into().unwrap()), 0.75);

        let c_raw = decoder.extract_fixed_raw(&tuple, 2).unwrap().unwrap();
        assert_eq!(c_raw[0], 1);
    }

    /// `extract_variable_raw` returns the exact bytes of each variable column.
    #[test]
    fn raw_variable_extraction() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("bio", ColumnType::String),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let tuple = encoder
            .encode(&[
                Value::Integer(1),
                Value::String("hello".into()),
                Value::String("world".into()),
            ])
            .unwrap();

        let name_raw = decoder.extract_variable_raw(&tuple, 1).unwrap().unwrap();
        assert_eq!(std::str::from_utf8(name_raw).unwrap(), "hello");

        let bio_raw = decoder.extract_variable_raw(&tuple, 2).unwrap().unwrap();
        assert_eq!(std::str::from_utf8(bio_raw).unwrap(), "world");
    }

    /// One column of every type exercised here roundtrips through
    /// encode/decode.
    #[test]
    fn all_types_roundtrip() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("tstz", ColumnType::Timestamptz),
            ColumnDef::required(
                "dec",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
            ColumnDef::required("uid", ColumnType::Uuid),
            ColumnDef::required("vec", ColumnType::Vector(2)),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);
        let decoder = TupleDecoder::new(&schema);

        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
        let values = vec![
            Value::Integer(-100),
            Value::Float(0.5),
            Value::String("test string".into()),
            Value::Bool(false),
            Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]),
            Value::NaiveDateTime(NdbDateTime::from_micros(1_000_000)),
            Value::DateTime(NdbDateTime::from_micros(2_000_000)),
            Value::Decimal(rust_decimal::Decimal::new(314159, 5)),
            Value::Uuid(uuid_str.into()),
            Value::Array(vec![Value::Float(1.5), Value::Float(2.5)]),
        ];

        let tuple = encoder.encode(&values).unwrap();
        let decoded = decoder.extract_all(&tuple).unwrap();

        assert_eq!(decoded[0], Value::Integer(-100));
        assert_eq!(decoded[1], Value::Float(0.5));
        assert_eq!(decoded[2], Value::String("test string".into()));
        assert_eq!(decoded[3], Value::Bool(false));
        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]));
        assert_eq!(
            decoded[5],
            Value::NaiveDateTime(NdbDateTime::from_micros(1_000_000))
        );
        assert_eq!(
            decoded[6],
            Value::DateTime(NdbDateTime::from_micros(2_000_000))
        );
        assert_eq!(
            decoded[7],
            Value::Decimal(rust_decimal::Decimal::new(314159, 5))
        );
        assert_eq!(decoded[8], Value::Uuid(uuid_str.into()));

        // Vector components pass through f32, so compare with a tolerance.
        // Every element must be a float: a wrong variant fails the test
        // instead of being skipped.
        match &decoded[9] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                for (elem, expected) in arr.iter().zip([1.5f64, 2.5f64].iter()) {
                    match elem {
                        Value::Float(v) => assert!((*v - *expected).abs() < 0.001),
                        other => panic!("expected float element, got {:?}", other),
                    }
                }
            }
            _ => panic!("expected array"),
        }
    }

    /// Two-column schema representing the layout before any column is added.
    fn base_schema() -> StrictSchema {
        StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
        ])
        .unwrap()
    }

    /// A tuple written with [`base_schema`], i.e. before any column is added.
    fn base_tuple() -> Vec<u8> {
        TupleEncoder::new(&base_schema())
            .encode(&[Value::Integer(7), Value::String("Alice".into())])
            .unwrap()
    }

    /// An added integer column with default `0` reads as `0`, not NULL, from
    /// an old tuple.
    #[test]
    fn versioned_int_default_zero_not_null() {
        let mut schema = base_schema();
        let mut col = ColumnDef::required("score", ColumnType::Int64).with_default("0");
        col.added_at_version = 2;
        schema.columns.push(col);
        schema.version = 2;

        let decoder = TupleDecoder::new(&schema);
        let tuple = base_tuple();

        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
        assert_eq!(val, Value::Integer(0), "expected default 0, not null");
    }

    /// An added string column with a quoted default reads as that string from
    /// an old tuple.
    #[test]
    fn versioned_text_default_pending_not_null() {
        let mut schema = base_schema();
        let mut col = ColumnDef::required("status", ColumnType::String).with_default("'pending'");
        col.added_at_version = 2;
        schema.columns.push(col);
        schema.version = 2;

        let decoder = TupleDecoder::new(&schema);
        let tuple = base_tuple();

        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
        assert_eq!(
            val,
            Value::String("pending".into()),
            "expected default 'pending', not null"
        );
    }

    /// A tuple written with the new schema yields its stored value, not the
    /// column default.
    #[test]
    fn versioned_new_row_written_at_new_schema_no_double_default() {
        let mut schema = base_schema();
        let mut col = ColumnDef::required("score", ColumnType::Int64).with_default("0");
        col.added_at_version = 2;
        schema.columns.push(col);
        schema.version = 2;

        let encoder = TupleEncoder::new(&schema);
        let tuple = encoder
            .encode(&[
                Value::Integer(42),
                Value::String("Bob".into()),
                Value::Integer(99),
            ])
            .unwrap();

        let decoder = TupleDecoder::new(&schema);
        // old_col_count == 3 means column 2 was present when the tuple was written.
        let val = decoder.extract_value_versioned(&tuple, 2, 3).unwrap();
        assert_eq!(
            val,
            Value::Integer(99),
            "must read encoded value, not default"
        );
    }

    /// Columns added in successive schema versions each supply their own
    /// default for an old tuple.
    #[test]
    fn versioned_multiple_alters_accumulate() {
        let mut schema = base_schema();

        let mut col_a = ColumnDef::required("a", ColumnType::Int64).with_default("10");
        col_a.added_at_version = 2;
        schema.columns.push(col_a);
        schema.version = 2;

        let mut col_b = ColumnDef::required("b", ColumnType::String).with_default("'x'");
        col_b.added_at_version = 3;
        schema.columns.push(col_b);
        schema.version = 3;

        let decoder = TupleDecoder::new(&schema);
        let tuple = base_tuple();

        let a = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
        assert_eq!(a, Value::Integer(10), "a default must be 10");

        let b = decoder.extract_value_versioned(&tuple, 3, 2).unwrap();
        assert_eq!(b, Value::String("x".into()), "b default must be 'x'");

        let id = decoder.extract_value_versioned(&tuple, 0, 2).unwrap();
        assert_eq!(id, Value::Integer(7));
    }

    /// An added nullable column without a default reads as NULL from an old
    /// tuple.
    #[test]
    fn versioned_nullable_column_no_default_returns_null() {
        let mut schema = base_schema();
        let mut col = ColumnDef::nullable("note", ColumnType::String);
        col.added_at_version = 2;
        schema.columns.push(col);
        schema.version = 2;

        let decoder = TupleDecoder::new(&schema);
        let tuple = base_tuple();

        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
        assert_eq!(
            val,
            Value::Null,
            "nullable column without default must be null"
        );
    }
}