Skip to main content

nodedb_strict/
decode.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Binary Tuple decoder: O(1) field extraction from tuple bytes.
4//!
5//! Given a schema and a column index, computes the byte offset and extracts
6//! the field value without parsing any other column. This is the core
7//! performance advantage over self-describing formats like MessagePack/BSON.
8
9use nodedb_types::columnar::{ColumnType, SchemaOps, StrictSchema};
10
11use crate::encode::{FORMAT_VERSION, MAGIC};
12use nodedb_types::datetime::NdbDateTime;
13use nodedb_types::value::Value;
14
15use crate::error::StrictError;
16
17/// Decodes fields from Binary Tuples according to a fixed schema.
18///
19/// Reusable: create once per schema, decode many tuples. Precomputes
20/// byte offsets for O(1) field access.
21pub struct TupleDecoder {
22    schema: StrictSchema,
23    /// Byte offset of each fixed-size column within the fixed section.
24    /// Variable-length columns get `None`.
25    fixed_offsets: Vec<Option<usize>>,
26    /// Total size of the fixed-fields section.
27    fixed_section_size: usize,
28    /// For each schema column: if it's variable-length, its index in the
29    /// offset table (0-based among variable columns). Otherwise `None`.
30    var_table_index: Vec<Option<usize>>,
31    /// Number of variable-length columns.
32    var_count: usize,
33    /// Size of the tuple header: 4 (version) + null_bitmap_size.
34    header_size: usize,
35}
36
37impl TupleDecoder {
38    /// Create a decoder for the given schema.
39    pub fn new(schema: &StrictSchema) -> Self {
40        let mut fixed_offsets = Vec::with_capacity(schema.columns.len());
41        let mut var_table_index = Vec::with_capacity(schema.columns.len());
42        let mut fixed_offset = 0usize;
43        let mut var_idx = 0usize;
44
45        for col in &schema.columns {
46            if let Some(size) = col.column_type.fixed_size() {
47                fixed_offsets.push(Some(fixed_offset));
48                var_table_index.push(None);
49                fixed_offset += size;
50            } else {
51                fixed_offsets.push(None);
52                var_table_index.push(Some(var_idx));
53                var_idx += 1;
54            }
55        }
56
57        // Header: magic(4) + format_version(1) + schema_version(4) + null_bitmap.
58        let header_size = 9 + schema.null_bitmap_size();
59
60        Self {
61            schema: schema.clone(),
62            fixed_offsets,
63            fixed_section_size: fixed_offset,
64            var_table_index,
65            var_count: var_idx,
66            header_size,
67        }
68    }
69
70    /// Read and validate the header, then return the schema version.
71    ///
72    /// Validates magic bytes at [0..4] and format version at [4] before
73    /// returning the schema version at [5..9].
74    pub fn schema_version(&self, tuple: &[u8]) -> Result<u32, StrictError> {
75        if tuple.len() < 9 {
76            return Err(StrictError::TruncatedTuple {
77                expected: 9,
78                got: tuple.len(),
79            });
80        }
81        let got_magic = u32::from_le_bytes([tuple[0], tuple[1], tuple[2], tuple[3]]);
82        if got_magic != MAGIC {
83            return Err(StrictError::InvalidMagic {
84                expected: MAGIC,
85                got: got_magic,
86            });
87        }
88        let got_version = tuple[4];
89        if got_version != FORMAT_VERSION {
90            return Err(StrictError::InvalidFormatVersion {
91                expected: FORMAT_VERSION,
92                got: got_version,
93            });
94        }
95        Ok(u32::from_le_bytes([tuple[5], tuple[6], tuple[7], tuple[8]]))
96    }
97
98    /// Check whether column `col_idx` is null in the given tuple.
99    pub fn is_null(&self, tuple: &[u8], col_idx: usize) -> Result<bool, StrictError> {
100        self.check_bounds(col_idx)?;
101        self.check_min_size(tuple)?;
102
103        let bitmap_byte = tuple[9 + col_idx / 8];
104        Ok(bitmap_byte & (1 << (col_idx % 8)) != 0)
105    }
106
107    /// Extract raw bytes for a fixed-size column. Returns `None` if null.
108    ///
109    /// This is the O(1) fast path: a single bounds check + pointer slice.
110    pub fn extract_fixed_raw<'a>(
111        &self,
112        tuple: &'a [u8],
113        col_idx: usize,
114    ) -> Result<Option<&'a [u8]>, StrictError> {
115        self.check_bounds(col_idx)?;
116        self.check_min_size(tuple)?;
117
118        if self.is_null_unchecked(tuple, col_idx) {
119            return Ok(None);
120        }
121
122        let offset = self.fixed_offsets[col_idx].ok_or(StrictError::TypeMismatch {
123            column: self.schema.columns[col_idx].name.clone(),
124            expected: self.schema.columns[col_idx].column_type,
125        })?;
126
127        let size = self.schema.columns[col_idx]
128            .column_type
129            .fixed_size()
130            .ok_or(StrictError::TypeMismatch {
131                column: self.schema.columns[col_idx].name.clone(),
132                expected: self.schema.columns[col_idx].column_type,
133            })?;
134        let start = self.header_size + offset;
135        let end = start + size;
136
137        if end > tuple.len() {
138            return Err(StrictError::TruncatedTuple {
139                expected: end,
140                got: tuple.len(),
141            });
142        }
143
144        Ok(Some(&tuple[start..end]))
145    }
146
147    /// Extract raw bytes for a variable-length column. Returns `None` if null.
148    ///
149    /// Reads two entries from the offset table to determine start and length.
150    pub fn extract_variable_raw<'a>(
151        &self,
152        tuple: &'a [u8],
153        col_idx: usize,
154    ) -> Result<Option<&'a [u8]>, StrictError> {
155        self.check_bounds(col_idx)?;
156        self.check_min_size(tuple)?;
157
158        if self.is_null_unchecked(tuple, col_idx) {
159            return Ok(None);
160        }
161
162        let var_idx = self.var_table_index[col_idx].ok_or(StrictError::TypeMismatch {
163            column: self.schema.columns[col_idx].name.clone(),
164            expected: self.schema.columns[col_idx].column_type,
165        })?;
166
167        let table_start = self.header_size + self.fixed_section_size;
168        let entry_pos = table_start + var_idx * 4;
169        let next_pos = entry_pos + 4;
170
171        if next_pos + 4 > tuple.len() {
172            return Err(StrictError::TruncatedTuple {
173                expected: next_pos + 4,
174                got: tuple.len(),
175            });
176        }
177
178        // Safety: bounds checked above — entry_pos..+4 and next_pos..+4 are within tuple.
179        let offset = u32::from_le_bytes(
180            tuple[entry_pos..entry_pos + 4]
181                .try_into()
182                .expect("4-byte slice from bounds-checked range"),
183        );
184        let next_offset = u32::from_le_bytes(
185            tuple[next_pos..next_pos + 4]
186                .try_into()
187                .expect("4-byte slice from bounds-checked range"),
188        );
189
190        let var_data_start = table_start + (self.var_count + 1) * 4;
191        let abs_start = var_data_start + offset as usize;
192        let abs_end = var_data_start + next_offset as usize;
193
194        if abs_end > tuple.len() {
195            return Err(StrictError::CorruptOffset {
196                offset: next_offset,
197                len: tuple.len(),
198            });
199        }
200
201        Ok(Some(&tuple[abs_start..abs_end]))
202    }
203
204    /// Extract a column value as a `Value`, performing type-aware decoding.
205    ///
206    /// This is the general-purpose extraction path. For hot paths, prefer
207    /// `extract_fixed_raw` / `extract_variable_raw` to avoid `Value` allocation.
208    pub fn extract_value(&self, tuple: &[u8], col_idx: usize) -> Result<Value, StrictError> {
209        self.check_bounds(col_idx)?;
210
211        if self.is_null(tuple, col_idx)? {
212            return Ok(Value::Null);
213        }
214
215        let col = &self.schema.columns[col_idx];
216
217        if col.column_type.fixed_size().is_some() {
218            let raw = self
219                .extract_fixed_raw(tuple, col_idx)?
220                .ok_or(StrictError::TypeMismatch {
221                    column: col.name.clone(),
222                    expected: col.column_type,
223                })?;
224            Ok(decode_fixed_value(&col.column_type, raw))
225        } else {
226            let raw =
227                self.extract_variable_raw(tuple, col_idx)?
228                    .ok_or(StrictError::TypeMismatch {
229                        column: col.name.clone(),
230                        expected: col.column_type,
231                    })?;
232            Ok(decode_variable_value(&col.column_type, raw))
233        }
234    }
235
236    /// Extract all columns from a tuple into a Vec<Value>.
237    pub fn extract_all(&self, tuple: &[u8]) -> Result<Vec<Value>, StrictError> {
238        let mut values = Vec::with_capacity(self.schema.columns.len());
239        for i in 0..self.schema.columns.len() {
240            values.push(self.extract_value(tuple, i)?);
241        }
242        Ok(values)
243    }
244
245    /// Extract a column by name.
246    pub fn extract_by_name(&self, tuple: &[u8], name: &str) -> Result<Value, StrictError> {
247        let idx = self
248            .schema
249            .column_index(name)
250            .ok_or(StrictError::ColumnOutOfRange {
251                index: usize::MAX,
252                count: self.schema.columns.len(),
253            })?;
254        self.extract_value(tuple, idx)
255    }
256
257    /// Decode a tuple written with an older schema version.
258    ///
259    /// Columns present in the old schema are extracted normally. Columns added
260    /// in newer schema versions return their declared default value, or
261    /// `Value::Null` for nullable columns without a default.
262    ///
263    /// `old_col_count` is the number of columns in the schema version that
264    /// wrote this tuple.
265    ///
266    /// Correctness invariant: a non-nullable column added via
267    /// `ALTER ADD COLUMN ... NOT NULL DEFAULT <expr>` will always return the
268    /// materialized default, never `Value::Null`. The ALTER path rejects
269    /// `NOT NULL` without a `DEFAULT`, so every non-nullable column in the
270    /// schema must have `col.default` populated.
271    pub fn extract_value_versioned(
272        &self,
273        tuple: &[u8],
274        col_idx: usize,
275        old_col_count: usize,
276    ) -> Result<Value, StrictError> {
277        self.check_bounds(col_idx)?;
278
279        if col_idx >= old_col_count {
280            // Column was added after this tuple was written: materialize the
281            // declared default, or null for nullable columns without a default.
282            let col = &self.schema.columns[col_idx];
283            let value = col
284                .default
285                .as_deref()
286                .map(nodedb_types::columnar::StrictSchema::parse_default_literal)
287                .unwrap_or(Value::Null);
288            return Ok(value);
289        }
290
291        self.extract_value(tuple, col_idx)
292    }
293
294    /// Access the schema this decoder was built for.
295    pub fn schema(&self) -> &StrictSchema {
296        &self.schema
297    }
298
299    /// Extract the three bitemporal timestamps from a tuple:
300    /// `(system_from_ms, valid_from_ms, valid_until_ms)`. Only valid for
301    /// schemas constructed with `StrictSchema::new_bitemporal`.
302    pub fn extract_bitemporal_timestamps(
303        &self,
304        tuple: &[u8],
305    ) -> Result<(i64, i64, i64), StrictError> {
306        if !self.schema.bitemporal {
307            return Err(StrictError::ColumnOutOfRange {
308                index: 0,
309                count: self.schema.columns.len(),
310            });
311        }
312        let sys = extract_i64(self, tuple, 0)?;
313        let vf = extract_i64(self, tuple, 1)?;
314        let vu = extract_i64(self, tuple, 2)?;
315        Ok((sys, vf, vu))
316    }
317
318    /// Byte offset where fixed-field section starts.
319    pub fn fixed_section_start(&self) -> usize {
320        self.header_size
321    }
322
323    /// Byte offset where the variable offset table starts.
324    pub fn offset_table_start(&self) -> usize {
325        self.header_size + self.fixed_section_size
326    }
327
328    /// Byte offset where variable data starts.
329    pub fn var_data_start(&self) -> usize {
330        self.offset_table_start() + (self.var_count + 1) * 4
331    }
332
333    /// Number of variable-length columns in the schema.
334    pub fn var_count(&self) -> usize {
335        self.var_count
336    }
337
338    /// Byte offset and size for a fixed column (relative to tuple start).
339    /// Returns `None` if the column is variable-length.
340    pub fn fixed_field_location(&self, col_idx: usize) -> Option<(usize, usize)> {
341        let offset = self.fixed_offsets.get(col_idx).copied().flatten()?;
342        let size = self.schema.columns[col_idx].column_type.fixed_size()?;
343        Some((self.header_size + offset, size))
344    }
345
346    /// Index in the variable offset table for a column.
347    /// Returns `None` if the column is fixed-size.
348    pub fn var_field_index(&self, col_idx: usize) -> Option<usize> {
349        self.var_table_index.get(col_idx).copied().flatten()
350    }
351
352    // -- Internal helpers --
353
354    fn check_bounds(&self, col_idx: usize) -> Result<(), StrictError> {
355        if col_idx >= self.schema.columns.len() {
356            Err(StrictError::ColumnOutOfRange {
357                index: col_idx,
358                count: self.schema.columns.len(),
359            })
360        } else {
361            Ok(())
362        }
363    }
364
365    fn check_min_size(&self, tuple: &[u8]) -> Result<(), StrictError> {
366        let min = self.header_size;
367        if tuple.len() < min {
368            Err(StrictError::TruncatedTuple {
369                expected: min,
370                got: tuple.len(),
371            })
372        } else {
373            Ok(())
374        }
375    }
376
377    fn is_null_unchecked(&self, tuple: &[u8], col_idx: usize) -> bool {
378        let bitmap_byte = tuple[9 + col_idx / 8];
379        bitmap_byte & (1 << (col_idx % 8)) != 0
380    }
381}
382
383/// Extract a fixed Int64 column as a raw i64.
384fn extract_i64(decoder: &TupleDecoder, tuple: &[u8], col_idx: usize) -> Result<i64, StrictError> {
385    let raw = decoder
386        .extract_fixed_raw(tuple, col_idx)?
387        .ok_or(StrictError::TypeMismatch {
388            column: decoder.schema.columns[col_idx].name.clone(),
389            expected: ColumnType::Int64,
390        })?;
391    Ok(i64::from_le_bytes([
392        raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
393    ]))
394}
395
396/// Decode a fixed-size raw byte slice into a Value.
397fn decode_fixed_value(col_type: &ColumnType, raw: &[u8]) -> Value {
398    match col_type {
399        ColumnType::Int64 => Value::Integer(i64::from_le_bytes([
400            raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
401        ])),
402        ColumnType::Float64 => Value::Float(f64::from_le_bytes([
403            raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
404        ])),
405        ColumnType::Bool => Value::Bool(raw[0] != 0),
406        ColumnType::Timestamp => {
407            let micros = i64::from_le_bytes([
408                raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
409            ]);
410            Value::NaiveDateTime(NdbDateTime::from_micros(micros))
411        }
412        ColumnType::Timestamptz => {
413            let micros = i64::from_le_bytes([
414                raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
415            ]);
416            Value::DateTime(NdbDateTime::from_micros(micros))
417        }
418        ColumnType::Decimal { .. } => {
419            let mut bytes = [0u8; 16];
420            bytes.copy_from_slice(&raw[..16]);
421            Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
422        }
423        ColumnType::Uuid => {
424            let mut bytes = [0u8; 16];
425            bytes.copy_from_slice(&raw[..16]);
426            let parsed = uuid::Uuid::from_bytes(bytes);
427            Value::Uuid(parsed.to_string())
428        }
429        ColumnType::Vector(dim) => {
430            let d = *dim as usize;
431            let mut floats = Vec::with_capacity(d);
432            for i in 0..d {
433                let off = i * 4;
434                let bytes = [raw[off], raw[off + 1], raw[off + 2], raw[off + 3]];
435                let f = f32::from_le_bytes(bytes);
436                floats.push(Value::Float(f as f64));
437            }
438            Value::Array(floats)
439        }
440        _ => Value::Null, // Unreachable for fixed types.
441    }
442}
443
444/// Decode a variable-length raw byte slice into a Value.
445fn decode_variable_value(col_type: &ColumnType, raw: &[u8]) -> Value {
446    match col_type {
447        ColumnType::String => {
448            Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
449        }
450        ColumnType::Bytes => Value::Bytes(raw.to_vec()),
451        ColumnType::Geometry => {
452            // Try JSON (native Geometry encoding), fall back to string (WKT passthrough).
453            if let Ok(geom) = sonic_rs::from_slice::<nodedb_types::geometry::Geometry>(raw) {
454                Value::Geometry(geom)
455            } else {
456                Value::String(std::str::from_utf8(raw).unwrap_or_default().to_string())
457            }
458        }
459        ColumnType::Json => {
460            // Deserialize MessagePack bytes back to Value.
461            match nodedb_types::value_from_msgpack(raw) {
462                Ok(val) => val,
463                Err(e) => {
464                    tracing::warn!(len = raw.len(), error = %e, "corrupted JSON msgpack in tuple");
465                    Value::Null
466                }
467            }
468        }
469        _ => Value::Null,
470    }
471}
472
473#[cfg(test)]
474mod tests {
475    use nodedb_types::columnar::ColumnDef;
476
477    use super::*;
478    use crate::encode::TupleEncoder;
479
480    fn crm_schema() -> StrictSchema {
481        StrictSchema::new(vec![
482            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
483            ColumnDef::required("name", ColumnType::String),
484            ColumnDef::nullable("email", ColumnType::String),
485            ColumnDef::required(
486                "balance",
487                ColumnType::Decimal {
488                    precision: 18,
489                    scale: 4,
490                },
491            ),
492            ColumnDef::nullable("active", ColumnType::Bool),
493        ])
494        .unwrap()
495    }
496
497    fn encode_crm_row(values: &[Value]) -> Vec<u8> {
498        let schema = crm_schema();
499        TupleEncoder::new(&schema).encode(values).unwrap()
500    }
501
502    #[test]
503    fn roundtrip_all_fields() {
504        let schema = crm_schema();
505        let encoder = TupleEncoder::new(&schema);
506        let decoder = TupleDecoder::new(&schema);
507
508        let values = vec![
509            Value::Integer(42),
510            Value::String("Alice".into()),
511            Value::String("alice@example.com".into()),
512            Value::Decimal(rust_decimal::Decimal::new(5000, 2)),
513            Value::Bool(true),
514        ];
515
516        let tuple = encoder.encode(&values).unwrap();
517        let decoded = decoder.extract_all(&tuple).unwrap();
518
519        assert_eq!(decoded[0], Value::Integer(42));
520        assert_eq!(decoded[1], Value::String("Alice".into()));
521        assert_eq!(decoded[2], Value::String("alice@example.com".into()));
522        assert_eq!(
523            decoded[3],
524            Value::Decimal(rust_decimal::Decimal::new(5000, 2))
525        );
526        assert_eq!(decoded[4], Value::Bool(true));
527    }
528
529    #[test]
530    fn roundtrip_with_nulls() {
531        let schema = crm_schema();
532        let encoder = TupleEncoder::new(&schema);
533        let decoder = TupleDecoder::new(&schema);
534
535        let values = vec![
536            Value::Integer(1),
537            Value::String("Bob".into()),
538            Value::Null,
539            Value::Decimal(rust_decimal::Decimal::ZERO),
540            Value::Null,
541        ];
542
543        let tuple = encoder.encode(&values).unwrap();
544        let decoded = decoder.extract_all(&tuple).unwrap();
545
546        assert_eq!(decoded[0], Value::Integer(1));
547        assert_eq!(decoded[1], Value::String("Bob".into()));
548        assert_eq!(decoded[2], Value::Null);
549        assert_eq!(decoded[3], Value::Decimal(rust_decimal::Decimal::ZERO));
550        assert_eq!(decoded[4], Value::Null);
551    }
552
553    #[test]
554    fn o1_extraction_single_field() {
555        let schema = crm_schema();
556        let decoder = TupleDecoder::new(&schema);
557
558        let tuple = encode_crm_row(&[
559            Value::Integer(99),
560            Value::String("Charlie".into()),
561            Value::String("charlie@co.com".into()),
562            Value::Decimal(rust_decimal::Decimal::new(12345, 0)),
563            Value::Bool(false),
564        ]);
565
566        // Extract just the balance (column 3) without touching other columns.
567        let balance = decoder.extract_value(&tuple, 3).unwrap();
568        assert_eq!(
569            balance,
570            Value::Decimal(rust_decimal::Decimal::new(12345, 0))
571        );
572
573        // Extract just the name (column 1) — variable-length.
574        let name = decoder.extract_value(&tuple, 1).unwrap();
575        assert_eq!(name, Value::String("Charlie".into()));
576    }
577
578    #[test]
579    fn extract_by_name() {
580        let schema = crm_schema();
581        let decoder = TupleDecoder::new(&schema);
582
583        let tuple = encode_crm_row(&[
584            Value::Integer(7),
585            Value::String("Dana".into()),
586            Value::Null,
587            Value::Decimal(rust_decimal::Decimal::new(999, 1)),
588            Value::Bool(true),
589        ]);
590
591        assert_eq!(
592            decoder.extract_by_name(&tuple, "name").unwrap(),
593            Value::String("Dana".into())
594        );
595        assert_eq!(
596            decoder.extract_by_name(&tuple, "email").unwrap(),
597            Value::Null
598        );
599    }
600
601    #[test]
602    fn null_bitmap_check() {
603        let schema = crm_schema();
604        let decoder = TupleDecoder::new(&schema);
605
606        let tuple = encode_crm_row(&[
607            Value::Integer(1),
608            Value::String("x".into()),
609            Value::Null,
610            Value::Decimal(rust_decimal::Decimal::ZERO),
611            Value::Null,
612        ]);
613
614        assert!(!decoder.is_null(&tuple, 0).unwrap()); // id
615        assert!(!decoder.is_null(&tuple, 1).unwrap()); // name
616        assert!(decoder.is_null(&tuple, 2).unwrap()); // email
617        assert!(!decoder.is_null(&tuple, 3).unwrap()); // balance
618        assert!(decoder.is_null(&tuple, 4).unwrap()); // active
619    }
620
621    #[test]
622    fn column_out_of_range() {
623        let schema = crm_schema();
624        let decoder = TupleDecoder::new(&schema);
625        let tuple = encode_crm_row(&[
626            Value::Integer(1),
627            Value::String("x".into()),
628            Value::Null,
629            Value::Decimal(rust_decimal::Decimal::ZERO),
630            Value::Null,
631        ]);
632
633        let err = decoder.extract_value(&tuple, 99).unwrap_err();
634        assert!(matches!(
635            err,
636            StrictError::ColumnOutOfRange { index: 99, .. }
637        ));
638    }
639
640    #[test]
641    fn schema_version_read() {
642        let schema = crm_schema();
643        let decoder = TupleDecoder::new(&schema);
644        let tuple = encode_crm_row(&[
645            Value::Integer(1),
646            Value::String("x".into()),
647            Value::Null,
648            Value::Decimal(rust_decimal::Decimal::ZERO),
649            Value::Null,
650        ]);
651
652        assert_eq!(decoder.schema_version(&tuple).unwrap(), 1);
653    }
654
655    #[test]
656    fn schema_version_u32_no_truncation() {
657        // Verify that a schema version above u16::MAX (0x0001_0000 = 65536) encodes
658        // and decodes without truncation — the u16 ceiling bug this test guards against.
659        let mut schema = crm_schema();
660        schema.version = 0x0001_0000;
661        let encoder = TupleEncoder::new(&schema);
662        let decoder = TupleDecoder::new(&schema);
663
664        let tuple = encoder
665            .encode(&[
666                Value::Integer(1),
667                Value::String("test".into()),
668                Value::Null,
669                Value::Decimal(rust_decimal::Decimal::ZERO),
670                Value::Null,
671            ])
672            .unwrap();
673
674        let decoded_version = decoder.schema_version(&tuple).unwrap();
675        assert_eq!(
676            decoded_version, 0x0001_0000u32,
677            "schema_version must not truncate to u16"
678        );
679    }
680
681    #[test]
682    fn versioned_extraction_new_column_returns_null() {
683        let schema = crm_schema();
684        let decoder = TupleDecoder::new(&schema);
685
686        // Tuple was written with only 3 columns (older schema).
687        let old_schema = StrictSchema::new(vec![
688            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
689            ColumnDef::required("name", ColumnType::String),
690            ColumnDef::nullable("email", ColumnType::String),
691        ])
692        .unwrap();
693        let old_encoder = TupleEncoder::new(&old_schema);
694        let tuple = old_encoder
695            .encode(&[Value::Integer(1), Value::String("x".into()), Value::Null])
696            .unwrap();
697
698        // Reading column 3 (balance) and 4 (active) with old_col_count=3:
699        let balance = decoder.extract_value_versioned(&tuple, 3, 3).unwrap();
700        assert_eq!(balance, Value::Null);
701
702        let active = decoder.extract_value_versioned(&tuple, 4, 3).unwrap();
703        assert_eq!(active, Value::Null);
704
705        // But column 0 (id) still works:
706        let id = decoder.extract_value_versioned(&tuple, 0, 3).unwrap();
707        assert_eq!(id, Value::Integer(1));
708    }
709
710    #[test]
711    fn raw_fixed_extraction() {
712        let schema = StrictSchema::new(vec![
713            ColumnDef::required("a", ColumnType::Int64),
714            ColumnDef::required("b", ColumnType::Float64),
715            ColumnDef::required("c", ColumnType::Bool),
716        ])
717        .unwrap();
718        let encoder = TupleEncoder::new(&schema);
719        let decoder = TupleDecoder::new(&schema);
720
721        let tuple = encoder
722            .encode(&[Value::Integer(42), Value::Float(0.75), Value::Bool(true)])
723            .unwrap();
724
725        let a_raw = decoder.extract_fixed_raw(&tuple, 0).unwrap().unwrap();
726        assert_eq!(i64::from_le_bytes(a_raw.try_into().unwrap()), 42);
727
728        let b_raw = decoder.extract_fixed_raw(&tuple, 1).unwrap().unwrap();
729        assert_eq!(f64::from_le_bytes(b_raw.try_into().unwrap()), 0.75);
730
731        let c_raw = decoder.extract_fixed_raw(&tuple, 2).unwrap().unwrap();
732        assert_eq!(c_raw[0], 1);
733    }
734
735    #[test]
736    fn raw_variable_extraction() {
737        let schema = StrictSchema::new(vec![
738            ColumnDef::required("id", ColumnType::Int64),
739            ColumnDef::required("name", ColumnType::String),
740            ColumnDef::nullable("bio", ColumnType::String),
741        ])
742        .unwrap();
743        let encoder = TupleEncoder::new(&schema);
744        let decoder = TupleDecoder::new(&schema);
745
746        let tuple = encoder
747            .encode(&[
748                Value::Integer(1),
749                Value::String("hello".into()),
750                Value::String("world".into()),
751            ])
752            .unwrap();
753
754        let name_raw = decoder.extract_variable_raw(&tuple, 1).unwrap().unwrap();
755        assert_eq!(std::str::from_utf8(name_raw).unwrap(), "hello");
756
757        let bio_raw = decoder.extract_variable_raw(&tuple, 2).unwrap().unwrap();
758        assert_eq!(std::str::from_utf8(bio_raw).unwrap(), "world");
759    }
760
761    #[test]
762    fn all_types_roundtrip() {
763        let schema = StrictSchema::new(vec![
764            ColumnDef::required("i", ColumnType::Int64),
765            ColumnDef::required("f", ColumnType::Float64),
766            ColumnDef::required("s", ColumnType::String),
767            ColumnDef::required("b", ColumnType::Bool),
768            ColumnDef::required("raw", ColumnType::Bytes),
769            ColumnDef::required("ts", ColumnType::Timestamp),
770            ColumnDef::required("tstz", ColumnType::Timestamptz),
771            ColumnDef::required(
772                "dec",
773                ColumnType::Decimal {
774                    precision: 18,
775                    scale: 4,
776                },
777            ),
778            ColumnDef::required("uid", ColumnType::Uuid),
779            ColumnDef::required("vec", ColumnType::Vector(2)),
780        ])
781        .unwrap();
782        let encoder = TupleEncoder::new(&schema);
783        let decoder = TupleDecoder::new(&schema);
784
785        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
786        let values = vec![
787            Value::Integer(-100),
788            Value::Float(0.5),
789            Value::String("test string".into()),
790            Value::Bool(false),
791            Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]),
792            Value::NaiveDateTime(NdbDateTime::from_micros(1_000_000)),
793            Value::DateTime(NdbDateTime::from_micros(2_000_000)),
794            Value::Decimal(rust_decimal::Decimal::new(314159, 5)),
795            Value::Uuid(uuid_str.into()),
796            Value::Array(vec![Value::Float(1.5), Value::Float(2.5)]),
797        ];
798
799        let tuple = encoder.encode(&values).unwrap();
800        let decoded = decoder.extract_all(&tuple).unwrap();
801
802        assert_eq!(decoded[0], Value::Integer(-100));
803        assert_eq!(decoded[1], Value::Float(0.5));
804        assert_eq!(decoded[2], Value::String("test string".into()));
805        assert_eq!(decoded[3], Value::Bool(false));
806        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]));
807        assert_eq!(
808            decoded[5],
809            Value::NaiveDateTime(NdbDateTime::from_micros(1_000_000))
810        );
811        assert_eq!(
812            decoded[6],
813            Value::DateTime(NdbDateTime::from_micros(2_000_000))
814        );
815        assert_eq!(
816            decoded[7],
817            Value::Decimal(rust_decimal::Decimal::new(314159, 5))
818        );
819        assert_eq!(decoded[8], Value::Uuid(uuid_str.into()));
820        // Vector goes through f64→f32→f64 roundtrip, check approximate.
821        if let Value::Array(ref arr) = decoded[9] {
822            assert_eq!(arr.len(), 2);
823            if let Value::Float(v) = arr[0] {
824                assert!((v - 1.5).abs() < 0.001);
825            }
826        } else {
827            panic!("expected array");
828        }
829    }
830
831    /// Build a two-column "base" schema (id INT64, name TEXT) encoded as a
832    /// tuple, then decode it against a four-column "current" schema that added
833    /// two columns via ALTER.
834    fn base_schema() -> StrictSchema {
835        StrictSchema::new(vec![
836            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
837            ColumnDef::required("name", ColumnType::String),
838        ])
839        .unwrap()
840    }
841
842    fn base_tuple() -> Vec<u8> {
843        TupleEncoder::new(&base_schema())
844            .encode(&[Value::Integer(7), Value::String("Alice".into())])
845            .unwrap()
846    }
847
848    #[test]
849    fn versioned_int_default_zero_not_null() {
850        // Schema gained a NOT NULL column with DEFAULT 0.
851        let mut schema = base_schema();
852        let mut col = ColumnDef::required("score", ColumnType::Int64).with_default("0");
853        col.added_at_version = 2;
854        schema.columns.push(col);
855        schema.version = 2;
856
857        let decoder = TupleDecoder::new(&schema);
858        let tuple = base_tuple();
859
860        // Column 2 (score) did not exist when the tuple was written (old_col_count=2).
861        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
862        assert_eq!(val, Value::Integer(0), "expected default 0, not null");
863    }
864
865    #[test]
866    fn versioned_text_default_pending_not_null() {
867        // Schema gained a NOT NULL TEXT column with DEFAULT 'pending'.
868        let mut schema = base_schema();
869        let mut col = ColumnDef::required("status", ColumnType::String).with_default("'pending'");
870        col.added_at_version = 2;
871        schema.columns.push(col);
872        schema.version = 2;
873
874        let decoder = TupleDecoder::new(&schema);
875        let tuple = base_tuple();
876
877        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
878        assert_eq!(
879            val,
880            Value::String("pending".into()),
881            "expected default 'pending', not null"
882        );
883    }
884
885    #[test]
886    fn versioned_new_row_written_at_new_schema_no_double_default() {
887        // A tuple written under the new schema already has the column encoded;
888        // extract_value_versioned must read the real encoded value, not the default.
889        let mut schema = base_schema();
890        let mut col = ColumnDef::required("score", ColumnType::Int64).with_default("0");
891        col.added_at_version = 2;
892        schema.columns.push(col);
893        schema.version = 2;
894
895        let encoder = TupleEncoder::new(&schema);
896        let tuple = encoder
897            .encode(&[
898                Value::Integer(42),
899                Value::String("Bob".into()),
900                Value::Integer(99),
901            ])
902            .unwrap();
903
904        let decoder = TupleDecoder::new(&schema);
905        // All three columns were present when this tuple was written.
906        let val = decoder.extract_value_versioned(&tuple, 2, 3).unwrap();
907        assert_eq!(
908            val,
909            Value::Integer(99),
910            "must read encoded value, not default"
911        );
912    }
913
914    #[test]
915    fn versioned_multiple_alters_accumulate() {
916        // V0 (2 cols) → V1 adds `a INT64 DEFAULT 10` → V2 adds `b TEXT DEFAULT 'x'`.
917        // A V0 tuple must read defaults for both `a` and `b`.
918        let mut schema = base_schema();
919
920        let mut col_a = ColumnDef::required("a", ColumnType::Int64).with_default("10");
921        col_a.added_at_version = 2;
922        schema.columns.push(col_a);
923        schema.version = 2;
924
925        let mut col_b = ColumnDef::required("b", ColumnType::String).with_default("'x'");
926        col_b.added_at_version = 3;
927        schema.columns.push(col_b);
928        schema.version = 3;
929
930        let decoder = TupleDecoder::new(&schema);
931        let tuple = base_tuple(); // written at V1 (2 cols)
932
933        let a = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
934        assert_eq!(a, Value::Integer(10), "a default must be 10");
935
936        let b = decoder.extract_value_versioned(&tuple, 3, 2).unwrap();
937        assert_eq!(b, Value::String("x".into()), "b default must be 'x'");
938
939        // Original columns still decode correctly.
940        let id = decoder.extract_value_versioned(&tuple, 0, 2).unwrap();
941        assert_eq!(id, Value::Integer(7));
942    }
943
944    #[test]
945    fn versioned_nullable_column_no_default_returns_null() {
946        // A nullable column with no default must return null (not an error).
947        let mut schema = base_schema();
948        let mut col = ColumnDef::nullable("note", ColumnType::String);
949        col.added_at_version = 2;
950        schema.columns.push(col);
951        schema.version = 2;
952
953        let decoder = TupleDecoder::new(&schema);
954        let tuple = base_tuple();
955
956        let val = decoder.extract_value_versioned(&tuple, 2, 2).unwrap();
957        assert_eq!(
958            val,
959            Value::Null,
960            "nullable column without default must be null"
961        );
962    }
963}