Skip to main content

nodedb_strict/
encode.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Binary Tuple encoder: schema + values → compact byte representation.
4//!
5//! Layout:
6//! ```text
7//! [magic: u32 LE = 0x5453_444E "NDST"]   bytes 0..4
8//! [format_version: u8 = 1]               byte 4
9//! [schema_version: u32 LE]               bytes 5..9
10//! [null_bitmap: ceil(N/8) bytes, bit=1 means NULL]
11//! [fixed_fields: concatenated, zeroed when null]
12//! [offset_table: (N_var + 1) × u32 LE]
13//! [variable_data: concatenated variable-length bytes]
14//! ```
15
16/// Magic bytes identifying a Binary Tuple: `"NDST"` in little-endian.
17pub const MAGIC: u32 = 0x5453_444E;
18
19/// Current Binary Tuple format version.
20pub const FORMAT_VERSION: u8 = 1;
21
22use nodedb_types::columnar::{ColumnType, StrictSchema};
23use nodedb_types::value::Value;
24
25use crate::error::StrictError;
26
27/// Encodes rows into Binary Tuples according to a fixed schema.
28///
29/// Reusable: create once per schema, encode many rows. Internal buffers
30/// are reused across calls to minimize allocation.
31pub struct TupleEncoder {
32    schema: StrictSchema,
33    /// Precomputed: byte offset of each fixed-size column within the fixed section.
34    /// Variable-length columns get `None`.
35    fixed_offsets: Vec<Option<usize>>,
36    /// Total size of the fixed-fields section.
37    fixed_section_size: usize,
38    /// Indices of variable-length columns in schema order.
39    var_indices: Vec<usize>,
40    /// Size of the tuple header: 2 (version) + null_bitmap_size.
41    header_size: usize,
42}
43
44impl TupleEncoder {
45    /// Create an encoder for the given schema.
46    pub fn new(schema: &StrictSchema) -> Self {
47        let mut fixed_offsets = Vec::with_capacity(schema.columns.len());
48        let mut var_indices = Vec::new();
49        let mut fixed_offset = 0usize;
50
51        for (i, col) in schema.columns.iter().enumerate() {
52            if let Some(size) = col.column_type.fixed_size() {
53                fixed_offsets.push(Some(fixed_offset));
54                fixed_offset += size;
55            } else {
56                fixed_offsets.push(None);
57                var_indices.push(i);
58            }
59        }
60
61        // Header: magic(4) + format_version(1) + schema_version(4) + null_bitmap.
62        let header_size = 9 + schema.null_bitmap_size();
63
64        Self {
65            schema: schema.clone(),
66            fixed_offsets,
67            fixed_section_size: fixed_offset,
68            var_indices,
69            header_size,
70        }
71    }
72
73    /// Encode a row of values into a Binary Tuple.
74    ///
75    /// `values` must have exactly `schema.len()` entries. A `Value::Null` is
76    /// allowed only if the corresponding column is nullable.
77    pub fn encode(&self, values: &[Value]) -> Result<Vec<u8>, StrictError> {
78        let n_cols = self.schema.columns.len();
79        if values.len() != n_cols {
80            return Err(StrictError::ValueCountMismatch {
81                expected: n_cols,
82                got: values.len(),
83            });
84        }
85
86        // Pre-size: header + fixed + offset_table. Variable data appended later.
87        let offset_table_size = (self.var_indices.len() + 1) * 4;
88        let base_size = self.header_size + self.fixed_section_size + offset_table_size;
89        let mut buf = vec![0u8; base_size];
90
91        // 1. Magic, format version, schema version.
92        buf[0..4].copy_from_slice(&MAGIC.to_le_bytes());
93        buf[4] = FORMAT_VERSION;
94        buf[5..9].copy_from_slice(&self.schema.version.to_le_bytes());
95
96        // 2. Null bitmap + fixed fields + type validation.
97        let bitmap_start = 9;
98        let fixed_start = self.header_size;
99
100        for (i, (col, val)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
101            let is_null = matches!(val, Value::Null);
102
103            if is_null {
104                if !col.nullable {
105                    return Err(StrictError::NullViolation(col.name.clone()));
106                }
107                // Set null bit: byte = i / 8, bit = i % 8.
108                buf[bitmap_start + i / 8] |= 1 << (i % 8);
109                // Fixed fields remain zeroed; no variable data emitted.
110                continue;
111            }
112
113            // Type check (with coercion).
114            if !col.column_type.accepts(val) {
115                return Err(StrictError::TypeMismatch {
116                    column: col.name.clone(),
117                    expected: col.column_type,
118                });
119            }
120
121            // Write fixed-size value.
122            if let Some(offset) = self.fixed_offsets[i] {
123                let dst = fixed_start + offset;
124                encode_fixed(&mut buf[dst..], &col.column_type, val);
125            }
126            // Variable-length values are handled in the offset table pass below.
127        }
128
129        // 3. Variable-length fields: build offset table + variable data.
130        let offset_table_start = self.header_size + self.fixed_section_size;
131        let mut var_data: Vec<u8> = Vec::new();
132
133        for (var_idx, &col_idx) in self.var_indices.iter().enumerate() {
134            // Write current offset.
135            let offset = var_data.len() as u32;
136            let table_pos = offset_table_start + var_idx * 4;
137            buf[table_pos..table_pos + 4].copy_from_slice(&offset.to_le_bytes());
138
139            let val = &values[col_idx];
140            if !matches!(val, Value::Null) {
141                encode_variable(
142                    &mut var_data,
143                    &self.schema.columns[col_idx].column_type,
144                    val,
145                );
146            }
147            // If null: offset stays the same as next entry → zero length.
148        }
149
150        // Final sentinel offset (marks end of last variable field).
151        let sentinel = var_data.len() as u32;
152        let sentinel_pos = offset_table_start + self.var_indices.len() * 4;
153        buf[sentinel_pos..sentinel_pos + 4].copy_from_slice(&sentinel.to_le_bytes());
154
155        // 4. Append variable data.
156        buf.extend_from_slice(&var_data);
157
158        Ok(buf)
159    }
160
161    /// Access the schema this encoder was built for.
162    pub fn schema(&self) -> &StrictSchema {
163        &self.schema
164    }
165
166    /// Encode a row for a bitemporal strict schema. The three reserved
167    /// slots (0/1/2) are populated from the provided timestamps; the
168    /// remaining slots are filled from `user_values` in schema order.
169    ///
170    /// Errors if the schema is not bitemporal or if `user_values.len() !=
171    /// schema.len() - 3`.
172    pub fn encode_bitemporal(
173        &self,
174        system_from_ms: i64,
175        valid_from_ms: i64,
176        valid_until_ms: i64,
177        user_values: &[Value],
178    ) -> Result<Vec<u8>, StrictError> {
179        if !self.schema.bitemporal {
180            return Err(StrictError::ValueCountMismatch {
181                expected: self.schema.columns.len(),
182                got: user_values.len() + 3,
183            });
184        }
185        let expected_user = self.schema.columns.len().saturating_sub(3);
186        if user_values.len() != expected_user {
187            return Err(StrictError::ValueCountMismatch {
188                expected: expected_user,
189                got: user_values.len(),
190            });
191        }
192        let mut all = Vec::with_capacity(self.schema.columns.len());
193        all.push(Value::Integer(system_from_ms));
194        all.push(Value::Integer(valid_from_ms));
195        all.push(Value::Integer(valid_until_ms));
196        all.extend_from_slice(user_values);
197        self.encode(&all)
198    }
199}
200
201/// Encode a fixed-size value into the buffer at the given position.
202///
203/// Handles both native Value types and SQL coercion sources.
204fn encode_fixed(dst: &mut [u8], col_type: &ColumnType, value: &Value) {
205    match (col_type, value) {
206        // Int64: native.
207        (ColumnType::Int64, Value::Integer(v)) => {
208            dst[..8].copy_from_slice(&v.to_le_bytes());
209        }
210        // Float64: native + Int64→Float64 coercion.
211        (ColumnType::Float64, Value::Float(v)) => {
212            dst[..8].copy_from_slice(&v.to_le_bytes());
213        }
214        (ColumnType::Float64, Value::Integer(v)) => {
215            dst[..8].copy_from_slice(&(*v as f64).to_le_bytes());
216        }
217        // Bool: native.
218        (ColumnType::Bool, Value::Bool(v)) => {
219            dst[0] = *v as u8;
220        }
221        // Timestamp (naive): NaiveDateTime + Integer (micros) + String (ISO 8601 parse).
222        (ColumnType::Timestamp, Value::NaiveDateTime(dt)) => {
223            dst[..8].copy_from_slice(&dt.micros.to_le_bytes());
224        }
225        (ColumnType::Timestamp, Value::Integer(micros)) => {
226            dst[..8].copy_from_slice(&micros.to_le_bytes());
227        }
228        (ColumnType::Timestamp, Value::String(s)) => {
229            let micros = nodedb_types::NdbDateTime::parse(s)
230                .map(|dt| dt.micros)
231                .unwrap_or(0);
232            dst[..8].copy_from_slice(&micros.to_le_bytes());
233        }
234        // Timestamptz (TZ-aware): DateTime + Integer (micros) + String (ISO 8601 parse).
235        (ColumnType::Timestamptz, Value::DateTime(dt)) => {
236            dst[..8].copy_from_slice(&dt.micros.to_le_bytes());
237        }
238        (ColumnType::Timestamptz, Value::Integer(micros)) => {
239            dst[..8].copy_from_slice(&micros.to_le_bytes());
240        }
241        (ColumnType::Timestamptz, Value::String(s)) => {
242            let micros = nodedb_types::NdbDateTime::parse(s)
243                .map(|dt| dt.micros)
244                .unwrap_or(0);
245            dst[..8].copy_from_slice(&micros.to_le_bytes());
246        }
247        // Decimal: native Decimal + String/Float/Integer coercion.
248        (ColumnType::Decimal { .. }, Value::Decimal(d)) => {
249            dst[..16].copy_from_slice(&d.serialize());
250        }
251        (ColumnType::Decimal { .. }, Value::String(s)) => {
252            let d: rust_decimal::Decimal = s.parse().unwrap_or_default();
253            dst[..16].copy_from_slice(&d.serialize());
254        }
255        (ColumnType::Decimal { .. }, Value::Float(f)) => {
256            let d = rust_decimal::Decimal::try_from(*f).unwrap_or_default();
257            dst[..16].copy_from_slice(&d.serialize());
258        }
259        (ColumnType::Decimal { .. }, Value::Integer(i)) => {
260            let d = rust_decimal::Decimal::from(*i);
261            dst[..16].copy_from_slice(&d.serialize());
262        }
263        // Uuid: native Uuid string + String coercion.
264        (ColumnType::Uuid, Value::Uuid(s) | Value::String(s)) => {
265            if let Ok(parsed) = uuid::Uuid::parse_str(s) {
266                dst[..16].copy_from_slice(parsed.as_bytes());
267            }
268        }
269        // Vector: Array of floats + Bytes (packed f32).
270        (ColumnType::Vector(dim), Value::Array(arr)) => {
271            let d = *dim as usize;
272            for (i, v) in arr.iter().take(d).enumerate() {
273                let f = match v {
274                    Value::Float(f) => *f as f32,
275                    Value::Integer(n) => *n as f32,
276                    _ => 0.0,
277                };
278                dst[i * 4..(i + 1) * 4].copy_from_slice(&f.to_le_bytes());
279            }
280        }
281        (ColumnType::Vector(dim), Value::Bytes(b)) => {
282            let byte_len = (*dim as usize) * 4;
283            let copy_len = b.len().min(byte_len);
284            dst[..copy_len].copy_from_slice(&b[..copy_len]);
285        }
286        _ => {} // Type mismatch caught earlier by accepts().
287    }
288}
289
290/// Encode a variable-length value, appending to the data buffer.
291///
292/// Handles both native Value types and SQL coercion sources.
293fn encode_variable(var_data: &mut Vec<u8>, col_type: &ColumnType, value: &Value) {
294    match (col_type, value) {
295        (ColumnType::String, Value::String(s)) => {
296            var_data.extend_from_slice(s.as_bytes());
297        }
298        (ColumnType::Bytes, Value::Bytes(b)) => {
299            var_data.extend_from_slice(b);
300        }
301        // Geometry: native Geometry (JSON-serialized) + String (WKT/GeoJSON passthrough).
302        (ColumnType::Geometry, Value::Geometry(g)) => {
303            if let Ok(json) = sonic_rs::to_vec(g) {
304                var_data.extend_from_slice(&json);
305            }
306        }
307        (ColumnType::Geometry, Value::String(s)) => {
308            var_data.extend_from_slice(s.as_bytes());
309        }
310        (ColumnType::Json, Value::String(s)) => {
311            // String input for JSON column: parse as JSON, then serialize as MessagePack.
312            // This handles VALUES ('{"key":"val"}') where the SQL planner passes a string literal.
313            let parsed = sonic_rs::from_str::<serde_json::Value>(s)
314                .ok()
315                .map(nodedb_types::Value::from);
316            let to_encode = parsed.as_ref().unwrap_or(value);
317            if let Ok(bytes) = nodedb_types::value_to_msgpack(to_encode) {
318                var_data.extend_from_slice(&bytes);
319            }
320        }
321        (ColumnType::Json, value) => {
322            // Non-string input (Object, Array, etc.): serialize directly as MessagePack.
323            if let Ok(bytes) = nodedb_types::value_to_msgpack(value) {
324                var_data.extend_from_slice(&bytes);
325            }
326        }
327        _ => {}
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use nodedb_types::columnar::ColumnDef;
334    use nodedb_types::datetime::NdbDateTime;
335
336    use super::*;
337
338    fn crm_schema() -> StrictSchema {
339        StrictSchema::new(vec![
340            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
341            ColumnDef::required("name", ColumnType::String),
342            ColumnDef::nullable("email", ColumnType::String),
343            ColumnDef::required(
344                "balance",
345                ColumnType::Decimal {
346                    precision: 18,
347                    scale: 4,
348                },
349            ),
350            ColumnDef::nullable("active", ColumnType::Bool),
351        ])
352        .unwrap()
353    }
354
355    #[test]
356    fn encode_basic_row() {
357        let schema = crm_schema();
358        let encoder = TupleEncoder::new(&schema);
359
360        let values = vec![
361            Value::Integer(42),
362            Value::String("Alice".into()),
363            Value::String("alice@example.com".into()),
364            Value::Decimal(rust_decimal::Decimal::new(5000, 2)),
365            Value::Bool(true),
366        ];
367
368        let tuple = encoder.encode(&values).unwrap();
369
370        // Header: magic(4) + format_version(1) + schema_version(4) + null_bitmap(1) = 10 bytes
371        // magic = 0x5453_444E "NDST" LE
372        assert_eq!(&tuple[0..4], &0x5453_444Eu32.to_le_bytes()); // magic
373        assert_eq!(tuple[4], 1); // format_version
374        assert_eq!(tuple[5], 1); // schema version low byte = 1
375        assert_eq!(tuple[6], 0); // schema version byte 1
376        assert_eq!(tuple[7], 0); // schema version byte 2
377        assert_eq!(tuple[8], 0); // schema version byte 3
378        assert_eq!(tuple[9], 0); // null bitmap: no nulls
379
380        // Fixed section: Int64(8) + Decimal(16) + Bool(1) = 25 bytes
381        // Starting at offset 10
382        let id_bytes = &tuple[10..18];
383        assert_eq!(i64::from_le_bytes(id_bytes.try_into().unwrap()), 42);
384    }
385
386    #[test]
387    fn encode_with_nulls() {
388        let schema = crm_schema();
389        let encoder = TupleEncoder::new(&schema);
390
391        let values = vec![
392            Value::Integer(1),
393            Value::String("Bob".into()),
394            Value::Null, // email is nullable
395            Value::Decimal(rust_decimal::Decimal::ZERO),
396            Value::Null, // active is nullable
397        ];
398
399        let tuple = encoder.encode(&values).unwrap();
400
401        // Null bitmap at byte 9: bit 2 (email) and bit 4 (active) set.
402        // Bit 2 = 0b00000100 = 4, bit 4 = 0b00010000 = 16. Combined = 20.
403        assert_eq!(tuple[9], 0b00010100);
404    }
405
406    #[test]
407    fn encode_null_violation() {
408        let schema = crm_schema();
409        let encoder = TupleEncoder::new(&schema);
410
411        let values = vec![
412            Value::Null, // id is NOT NULL
413            Value::String("x".into()),
414            Value::Null,
415            Value::Decimal(rust_decimal::Decimal::ZERO),
416            Value::Null,
417        ];
418
419        let err = encoder.encode(&values).unwrap_err();
420        assert!(matches!(err, StrictError::NullViolation(ref s) if s == "id"));
421    }
422
423    #[test]
424    fn encode_type_mismatch() {
425        let schema = crm_schema();
426        let encoder = TupleEncoder::new(&schema);
427
428        let values = vec![
429            Value::String("not_an_int".into()), // id expects Int64
430            Value::String("x".into()),
431            Value::Null,
432            Value::Decimal(rust_decimal::Decimal::ZERO),
433            Value::Null,
434        ];
435
436        let err = encoder.encode(&values).unwrap_err();
437        assert!(matches!(err, StrictError::TypeMismatch { .. }));
438    }
439
440    #[test]
441    fn encode_value_count_mismatch() {
442        let schema = crm_schema();
443        let encoder = TupleEncoder::new(&schema);
444
445        let err = encoder.encode(&[Value::Integer(1)]).unwrap_err();
446        assert!(matches!(err, StrictError::ValueCountMismatch { .. }));
447    }
448
449    #[test]
450    fn encode_int_to_float_coercion() {
451        let schema =
452            StrictSchema::new(vec![ColumnDef::required("val", ColumnType::Float64)]).unwrap();
453        let encoder = TupleEncoder::new(&schema);
454
455        // Int64 → Float64 coercion should work.
456        let tuple = encoder.encode(&[Value::Integer(42)]).unwrap();
457        // Header: magic(4)+format_version(1)+schema_version(4)+bitmap(1) = 10. Fixed: 8 bytes Float64.
458        let f = f64::from_le_bytes(tuple[10..18].try_into().unwrap());
459        assert_eq!(f, 42.0);
460    }
461
462    #[test]
463    fn encode_timestamp() {
464        let schema =
465            StrictSchema::new(vec![ColumnDef::required("ts", ColumnType::Timestamp)]).unwrap();
466        let encoder = TupleEncoder::new(&schema);
467
468        let dt = NdbDateTime::from_micros(1_700_000_000_000_000);
469        let tuple = encoder.encode(&[Value::NaiveDateTime(dt)]).unwrap();
470        let micros = i64::from_le_bytes(tuple[10..18].try_into().unwrap());
471        assert_eq!(micros, 1_700_000_000_000_000);
472    }
473
474    #[test]
475    fn encode_timestamptz() {
476        let schema =
477            StrictSchema::new(vec![ColumnDef::required("ts", ColumnType::Timestamptz)]).unwrap();
478        let encoder = TupleEncoder::new(&schema);
479
480        let dt = NdbDateTime::from_micros(1_700_000_000_000_000);
481        let tuple = encoder.encode(&[Value::DateTime(dt)]).unwrap();
482        let micros = i64::from_le_bytes(tuple[10..18].try_into().unwrap());
483        assert_eq!(micros, 1_700_000_000_000_000);
484    }
485
486    #[test]
487    fn encode_decode_json_column() {
488        let schema = StrictSchema::new(vec![
489            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
490            ColumnDef::nullable("metadata", ColumnType::Json),
491        ])
492        .unwrap();
493        let encoder = TupleEncoder::new(&schema);
494
495        let metadata = Value::Object(std::collections::HashMap::from([
496            ("source".to_string(), Value::String("web".to_string())),
497            ("priority".to_string(), Value::Integer(3)),
498        ]));
499        let values = vec![Value::Integer(1), metadata.clone()];
500        let tuple = encoder.encode(&values).unwrap();
501
502        // Tuple must be longer than just the header + fixed section.
503        // Header: 10 bytes. Fixed: 8 (Int64). Offset table: 8 (2 entries × u32).
504        // Variable data must be non-empty (MessagePack of the object).
505        let min_size = 10 + 8 + 8;
506        assert!(tuple.len() > min_size, "tuple should contain variable data");
507
508        // Decode and verify the value roundtrips correctly.
509        let decoder = crate::decode::TupleDecoder::new(&schema);
510        let decoded = decoder.extract_all(&tuple).unwrap();
511        assert_eq!(decoded[0], Value::Integer(1));
512        assert_eq!(decoded[1], metadata);
513    }
514
515    #[test]
516    fn encode_json_null() {
517        let schema = StrictSchema::new(vec![
518            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
519            ColumnDef::nullable("data", ColumnType::Json),
520        ])
521        .unwrap();
522        let encoder = TupleEncoder::new(&schema);
523        let tuple = encoder.encode(&[Value::Integer(1), Value::Null]).unwrap();
524        // Null bitmap byte (index 9): bit 1 (column 1) should be set → 0b00000010 = 2.
525        assert_eq!(tuple[9] & 0b10, 0b10);
526    }
527
528    #[test]
529    fn encode_bitemporal_roundtrip() {
530        let schema = StrictSchema::new_bitemporal(vec![
531            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
532            ColumnDef::nullable("name", ColumnType::String),
533        ])
534        .unwrap();
535        assert!(schema.bitemporal);
536        assert_eq!(schema.columns[0].name, "__system_from_ms");
537        assert_eq!(schema.columns[1].name, "__valid_from_ms");
538        assert_eq!(schema.columns[2].name, "__valid_until_ms");
539        assert_eq!(schema.columns[3].name, "id");
540
541        let encoder = TupleEncoder::new(&schema);
542        let tuple = encoder
543            .encode_bitemporal(
544                100,
545                200,
546                i64::MAX,
547                &[Value::Integer(42), Value::String("alice".into())],
548            )
549            .unwrap();
550
551        let decoder = crate::decode::TupleDecoder::new(&schema);
552        let (sys, vf, vu) = decoder.extract_bitemporal_timestamps(&tuple).unwrap();
553        assert_eq!((sys, vf, vu), (100, 200, i64::MAX));
554        assert_eq!(
555            decoder.extract_by_name(&tuple, "id").unwrap(),
556            Value::Integer(42)
557        );
558        assert_eq!(
559            decoder.extract_by_name(&tuple, "name").unwrap(),
560            Value::String("alice".into())
561        );
562    }
563
564    #[test]
565    fn reserved_column_name_rejected() {
566        let err = StrictSchema::new(vec![ColumnDef::required(
567            "__system_from_ms",
568            ColumnType::Int64,
569        )])
570        .unwrap_err();
571        assert!(matches!(
572            err,
573            nodedb_types::columnar::SchemaError::ReservedColumnName(ref s) if s == "__system_from_ms"
574        ));
575    }
576
577    #[test]
578    fn encode_bitemporal_rejects_wrong_user_count() {
579        let schema = StrictSchema::new_bitemporal(vec![
580            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
581        ])
582        .unwrap();
583        let encoder = TupleEncoder::new(&schema);
584        let err = encoder.encode_bitemporal(0, 0, 0, &[]).unwrap_err();
585        assert!(matches!(
586            err,
587            StrictError::ValueCountMismatch {
588                expected: 1,
589                got: 0
590            }
591        ));
592    }
593
594    #[test]
595    fn encode_bitemporal_on_non_bitemporal_schema_errors() {
596        let schema = crm_schema();
597        let encoder = TupleEncoder::new(&schema);
598        let err = encoder.encode_bitemporal(0, 0, 0, &[]).unwrap_err();
599        assert!(matches!(err, StrictError::ValueCountMismatch { .. }));
600    }
601
602    #[test]
603    fn encode_vector() {
604        let schema =
605            StrictSchema::new(vec![ColumnDef::required("emb", ColumnType::Vector(3))]).unwrap();
606        let encoder = TupleEncoder::new(&schema);
607
608        let vals = vec![Value::Array(vec![
609            Value::Float(1.0),
610            Value::Float(2.0),
611            Value::Float(3.0),
612        ])];
613        let tuple = encoder.encode(&vals).unwrap();
614        // Header: 10 bytes. Fixed: 12 bytes (3 × f32).
615        let f0 = f32::from_le_bytes(tuple[10..14].try_into().unwrap());
616        let f1 = f32::from_le_bytes(tuple[14..18].try_into().unwrap());
617        let f2 = f32::from_le_bytes(tuple[18..22].try_into().unwrap());
618        assert_eq!((f0, f1, f2), (1.0, 2.0, 3.0));
619    }
620
621    /// Asserts NDST magic at [0..4], FORMAT_VERSION == 1 at [4], and
622    /// schema_version u32 at [5..9].
623    #[test]
624    fn golden_strict_tuple_format() {
625        let schema = crm_schema();
626        let encoder = TupleEncoder::new(&schema);
627        let values = vec![
628            Value::Integer(1),
629            Value::String("A".into()),
630            Value::String("a@b.com".into()),
631            Value::Decimal(rust_decimal::Decimal::ZERO),
632            Value::Bool(false),
633        ];
634        let tuple = encoder.encode(&values).unwrap();
635
636        // Magic at [0..4]: "NDST" LE = 0x5453_444E.
637        assert_eq!(
638            &tuple[0..4],
639            &0x5453_444Eu32.to_le_bytes(),
640            "magic mismatch"
641        );
642        assert_eq!(&tuple[0..4], b"NDST", "magic bytes mismatch");
643
644        // FORMAT_VERSION == 1 at [4].
645        assert_eq!(tuple[4], FORMAT_VERSION, "format_version mismatch");
646        assert_eq!(tuple[4], 1u8, "expected FORMAT_VERSION == 1");
647
648        // schema_version u32 LE at [5..9].
649        let schema_ver = u32::from_le_bytes([tuple[5], tuple[6], tuple[7], tuple[8]]);
650        assert_eq!(
651            schema_ver, 1u32,
652            "schema_version must be 1 for version-1 schema"
653        );
654
655        // null bitmap at [9]: no nulls → 0.
656        assert_eq!(tuple[9], 0u8, "expected no null bits");
657
658        // Tuple must be longer than the 10-byte header.
659        assert!(
660            tuple.len() > 10,
661            "tuple must contain fixed/variable data after header"
662        );
663    }
664}