/// Magic number that opens every encoded tuple.
///
/// It is written little-endian, so the first four bytes of a tuple spell the
/// ASCII string `NDST`.
pub const MAGIC: u32 = u32::from_le_bytes(*b"NDST");

/// Version of the tuple byte layout, written at byte offset 4 of every tuple.
pub const FORMAT_VERSION: u8 = 1;
21
22use nodedb_types::columnar::{ColumnType, StrictSchema};
23use nodedb_types::value::Value;
24
25use crate::error::StrictError;
26
27pub struct TupleEncoder {
32 schema: StrictSchema,
33 fixed_offsets: Vec<Option<usize>>,
36 fixed_section_size: usize,
38 var_indices: Vec<usize>,
40 header_size: usize,
42}
43
44impl TupleEncoder {
45 pub fn new(schema: &StrictSchema) -> Self {
47 let mut fixed_offsets = Vec::with_capacity(schema.columns.len());
48 let mut var_indices = Vec::new();
49 let mut fixed_offset = 0usize;
50
51 for (i, col) in schema.columns.iter().enumerate() {
52 if let Some(size) = col.column_type.fixed_size() {
53 fixed_offsets.push(Some(fixed_offset));
54 fixed_offset += size;
55 } else {
56 fixed_offsets.push(None);
57 var_indices.push(i);
58 }
59 }
60
61 let header_size = 9 + schema.null_bitmap_size();
63
64 Self {
65 schema: schema.clone(),
66 fixed_offsets,
67 fixed_section_size: fixed_offset,
68 var_indices,
69 header_size,
70 }
71 }
72
73 pub fn encode(&self, values: &[Value]) -> Result<Vec<u8>, StrictError> {
78 let n_cols = self.schema.columns.len();
79 if values.len() != n_cols {
80 return Err(StrictError::ValueCountMismatch {
81 expected: n_cols,
82 got: values.len(),
83 });
84 }
85
86 let offset_table_size = (self.var_indices.len() + 1) * 4;
88 let base_size = self.header_size + self.fixed_section_size + offset_table_size;
89 let mut buf = vec![0u8; base_size];
90
91 buf[0..4].copy_from_slice(&MAGIC.to_le_bytes());
93 buf[4] = FORMAT_VERSION;
94 buf[5..9].copy_from_slice(&self.schema.version.to_le_bytes());
95
96 let bitmap_start = 9;
98 let fixed_start = self.header_size;
99
100 for (i, (col, val)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
101 let is_null = matches!(val, Value::Null);
102
103 if is_null {
104 if !col.nullable {
105 return Err(StrictError::NullViolation(col.name.clone()));
106 }
107 buf[bitmap_start + i / 8] |= 1 << (i % 8);
109 continue;
111 }
112
113 if !col.column_type.accepts(val) {
115 return Err(StrictError::TypeMismatch {
116 column: col.name.clone(),
117 expected: col.column_type,
118 });
119 }
120
121 if let Some(offset) = self.fixed_offsets[i] {
123 let dst = fixed_start + offset;
124 encode_fixed(&mut buf[dst..], &col.column_type, val);
125 }
126 }
128
129 let offset_table_start = self.header_size + self.fixed_section_size;
131 let mut var_data: Vec<u8> = Vec::new();
132
133 for (var_idx, &col_idx) in self.var_indices.iter().enumerate() {
134 let offset = var_data.len() as u32;
136 let table_pos = offset_table_start + var_idx * 4;
137 buf[table_pos..table_pos + 4].copy_from_slice(&offset.to_le_bytes());
138
139 let val = &values[col_idx];
140 if !matches!(val, Value::Null) {
141 encode_variable(
142 &mut var_data,
143 &self.schema.columns[col_idx].column_type,
144 val,
145 );
146 }
147 }
149
150 let sentinel = var_data.len() as u32;
152 let sentinel_pos = offset_table_start + self.var_indices.len() * 4;
153 buf[sentinel_pos..sentinel_pos + 4].copy_from_slice(&sentinel.to_le_bytes());
154
155 buf.extend_from_slice(&var_data);
157
158 Ok(buf)
159 }
160
161 pub fn schema(&self) -> &StrictSchema {
163 &self.schema
164 }
165
166 pub fn encode_bitemporal(
173 &self,
174 system_from_ms: i64,
175 valid_from_ms: i64,
176 valid_until_ms: i64,
177 user_values: &[Value],
178 ) -> Result<Vec<u8>, StrictError> {
179 if !self.schema.bitemporal {
180 return Err(StrictError::ValueCountMismatch {
181 expected: self.schema.columns.len(),
182 got: user_values.len() + 3,
183 });
184 }
185 let expected_user = self.schema.columns.len().saturating_sub(3);
186 if user_values.len() != expected_user {
187 return Err(StrictError::ValueCountMismatch {
188 expected: expected_user,
189 got: user_values.len(),
190 });
191 }
192 let mut all = Vec::with_capacity(self.schema.columns.len());
193 all.push(Value::Integer(system_from_ms));
194 all.push(Value::Integer(valid_from_ms));
195 all.push(Value::Integer(valid_until_ms));
196 all.extend_from_slice(user_values);
197 self.encode(&all)
198 }
199}
200
201fn encode_fixed(dst: &mut [u8], col_type: &ColumnType, value: &Value) {
205 match (col_type, value) {
206 (ColumnType::Int64, Value::Integer(v)) => {
208 dst[..8].copy_from_slice(&v.to_le_bytes());
209 }
210 (ColumnType::Float64, Value::Float(v)) => {
212 dst[..8].copy_from_slice(&v.to_le_bytes());
213 }
214 (ColumnType::Float64, Value::Integer(v)) => {
215 dst[..8].copy_from_slice(&(*v as f64).to_le_bytes());
216 }
217 (ColumnType::Bool, Value::Bool(v)) => {
219 dst[0] = *v as u8;
220 }
221 (ColumnType::Timestamp, Value::NaiveDateTime(dt)) => {
223 dst[..8].copy_from_slice(&dt.micros.to_le_bytes());
224 }
225 (ColumnType::Timestamp, Value::Integer(micros)) => {
226 dst[..8].copy_from_slice(µs.to_le_bytes());
227 }
228 (ColumnType::Timestamp, Value::String(s)) => {
229 let micros = nodedb_types::NdbDateTime::parse(s)
230 .map(|dt| dt.micros)
231 .unwrap_or(0);
232 dst[..8].copy_from_slice(µs.to_le_bytes());
233 }
234 (ColumnType::Timestamptz, Value::DateTime(dt)) => {
236 dst[..8].copy_from_slice(&dt.micros.to_le_bytes());
237 }
238 (ColumnType::Timestamptz, Value::Integer(micros)) => {
239 dst[..8].copy_from_slice(µs.to_le_bytes());
240 }
241 (ColumnType::Timestamptz, Value::String(s)) => {
242 let micros = nodedb_types::NdbDateTime::parse(s)
243 .map(|dt| dt.micros)
244 .unwrap_or(0);
245 dst[..8].copy_from_slice(µs.to_le_bytes());
246 }
247 (ColumnType::Decimal { .. }, Value::Decimal(d)) => {
249 dst[..16].copy_from_slice(&d.serialize());
250 }
251 (ColumnType::Decimal { .. }, Value::String(s)) => {
252 let d: rust_decimal::Decimal = s.parse().unwrap_or_default();
253 dst[..16].copy_from_slice(&d.serialize());
254 }
255 (ColumnType::Decimal { .. }, Value::Float(f)) => {
256 let d = rust_decimal::Decimal::try_from(*f).unwrap_or_default();
257 dst[..16].copy_from_slice(&d.serialize());
258 }
259 (ColumnType::Decimal { .. }, Value::Integer(i)) => {
260 let d = rust_decimal::Decimal::from(*i);
261 dst[..16].copy_from_slice(&d.serialize());
262 }
263 (ColumnType::Uuid, Value::Uuid(s) | Value::String(s)) => {
265 if let Ok(parsed) = uuid::Uuid::parse_str(s) {
266 dst[..16].copy_from_slice(parsed.as_bytes());
267 }
268 }
269 (ColumnType::Vector(dim), Value::Array(arr)) => {
271 let d = *dim as usize;
272 for (i, v) in arr.iter().take(d).enumerate() {
273 let f = match v {
274 Value::Float(f) => *f as f32,
275 Value::Integer(n) => *n as f32,
276 _ => 0.0,
277 };
278 dst[i * 4..(i + 1) * 4].copy_from_slice(&f.to_le_bytes());
279 }
280 }
281 (ColumnType::Vector(dim), Value::Bytes(b)) => {
282 let byte_len = (*dim as usize) * 4;
283 let copy_len = b.len().min(byte_len);
284 dst[..copy_len].copy_from_slice(&b[..copy_len]);
285 }
286 _ => {} }
288}
289
290fn encode_variable(var_data: &mut Vec<u8>, col_type: &ColumnType, value: &Value) {
294 match (col_type, value) {
295 (ColumnType::String, Value::String(s)) => {
296 var_data.extend_from_slice(s.as_bytes());
297 }
298 (ColumnType::Bytes, Value::Bytes(b)) => {
299 var_data.extend_from_slice(b);
300 }
301 (ColumnType::Geometry, Value::Geometry(g)) => {
303 if let Ok(json) = sonic_rs::to_vec(g) {
304 var_data.extend_from_slice(&json);
305 }
306 }
307 (ColumnType::Geometry, Value::String(s)) => {
308 var_data.extend_from_slice(s.as_bytes());
309 }
310 (ColumnType::Json, Value::String(s)) => {
311 let parsed = sonic_rs::from_str::<serde_json::Value>(s)
314 .ok()
315 .map(nodedb_types::Value::from);
316 let to_encode = parsed.as_ref().unwrap_or(value);
317 if let Ok(bytes) = nodedb_types::value_to_msgpack(to_encode) {
318 var_data.extend_from_slice(&bytes);
319 }
320 }
321 (ColumnType::Json, value) => {
322 if let Ok(bytes) = nodedb_types::value_to_msgpack(value) {
324 var_data.extend_from_slice(&bytes);
325 }
326 }
327 _ => {}
328 }
329}
330
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::ColumnDef;
    use nodedb_types::datetime::NdbDateTime;

    use super::*;

    /// Five-column schema. `id`, `balance` and `active` are fixed-width;
    /// `name` and `email` are variable-length. The null bitmap is one byte,
    /// so the fixed section starts at byte 10.
    fn crm_schema() -> StrictSchema {
        StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("email", ColumnType::String),
            ColumnDef::required(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
            ColumnDef::nullable("active", ColumnType::Bool),
        ])
        .unwrap()
    }

    #[test]
    fn encode_basic_row() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);

        let values = vec![
            Value::Integer(42),
            Value::String("Alice".into()),
            Value::String("alice@example.com".into()),
            Value::Decimal(rust_decimal::Decimal::new(5000, 2)),
            Value::Bool(true),
        ];

        let tuple = encoder.encode(&values).unwrap();

        // Magic.
        assert_eq!(&tuple[0..4], &0x5453_444Eu32.to_le_bytes());
        // Format version.
        assert_eq!(tuple[4], 1);
        // Schema version 1, little-endian u32.
        assert_eq!(tuple[5], 1);
        assert_eq!(tuple[6], 0);
        assert_eq!(tuple[7], 0);
        assert_eq!(tuple[8], 0);
        // Null bitmap: nothing is NULL.
        assert_eq!(tuple[9], 0);

        // Fixed section: `id` at offset 0, then `balance` at offset 8.
        let id_bytes = &tuple[10..18];
        assert_eq!(i64::from_le_bytes(id_bytes.try_into().unwrap()), 42);
        assert_eq!(
            &tuple[18..34],
            &rust_decimal::Decimal::new(5000, 2).serialize()
        );

        // Variable section (end of tuple): `name` then `email`, back to back.
        assert!(tuple.ends_with(b"Alicealice@example.com"));
    }

    #[test]
    fn encode_with_nulls() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);

        let values = vec![
            Value::Integer(1),
            Value::String("Bob".into()),
            Value::Null, // email
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null, // active
        ];

        let tuple = encoder.encode(&values).unwrap();

        // Columns 2 (email) and 4 (active) are flagged NULL.
        assert_eq!(tuple[9], 0b0001_0100);
    }

    #[test]
    fn encode_null_violation() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);

        let values = vec![
            Value::Null, // `id` is required
            Value::String("x".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ];

        let err = encoder.encode(&values).unwrap_err();
        assert!(matches!(err, StrictError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn encode_type_mismatch() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);

        let values = vec![
            Value::String("not_an_int".into()), // `id` expects Int64
            Value::String("x".into()),
            Value::Null,
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Null,
        ];

        let err = encoder.encode(&values).unwrap_err();
        assert!(matches!(err, StrictError::TypeMismatch { .. }));
    }

    #[test]
    fn encode_value_count_mismatch() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);

        let err = encoder.encode(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(err, StrictError::ValueCountMismatch { .. }));
    }

    #[test]
    fn encode_int_to_float_coercion() {
        let schema =
            StrictSchema::new(vec![ColumnDef::required("val", ColumnType::Float64)]).unwrap();
        let encoder = TupleEncoder::new(&schema);

        let tuple = encoder.encode(&[Value::Integer(42)]).unwrap();
        let f = f64::from_le_bytes(tuple[10..18].try_into().unwrap());
        assert_eq!(f, 42.0);
    }

    #[test]
    fn encode_timestamp() {
        let schema =
            StrictSchema::new(vec![ColumnDef::required("ts", ColumnType::Timestamp)]).unwrap();
        let encoder = TupleEncoder::new(&schema);

        let dt = NdbDateTime::from_micros(1_700_000_000_000_000);
        let tuple = encoder.encode(&[Value::NaiveDateTime(dt)]).unwrap();
        let micros = i64::from_le_bytes(tuple[10..18].try_into().unwrap());
        assert_eq!(micros, 1_700_000_000_000_000);
    }

    #[test]
    fn encode_timestamptz() {
        let schema =
            StrictSchema::new(vec![ColumnDef::required("ts", ColumnType::Timestamptz)]).unwrap();
        let encoder = TupleEncoder::new(&schema);

        let dt = NdbDateTime::from_micros(1_700_000_000_000_000);
        let tuple = encoder.encode(&[Value::DateTime(dt)]).unwrap();
        let micros = i64::from_le_bytes(tuple[10..18].try_into().unwrap());
        assert_eq!(micros, 1_700_000_000_000_000);
    }

    #[test]
    fn encode_decode_json_column() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("metadata", ColumnType::Json),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);

        let metadata = Value::Object(std::collections::HashMap::from([
            ("source".to_string(), Value::String("web".to_string())),
            ("priority".to_string(), Value::Integer(3)),
        ]));
        let values = vec![Value::Integer(1), metadata.clone()];
        let tuple = encoder.encode(&values).unwrap();

        // Header (10) + `id` (8) + offset table (one entry + sentinel = 8).
        let min_size = 10 + 8 + 8;
        assert!(tuple.len() > min_size, "tuple should contain variable data");

        let decoder = crate::decode::TupleDecoder::new(&schema);
        let decoded = decoder.extract_all(&tuple).unwrap();
        assert_eq!(decoded[0], Value::Integer(1));
        assert_eq!(decoded[1], metadata);
    }

    #[test]
    fn encode_json_null() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("data", ColumnType::Json),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);
        let tuple = encoder.encode(&[Value::Integer(1), Value::Null]).unwrap();
        // Column 1 is flagged NULL in the bitmap.
        assert_eq!(tuple[9] & 0b10, 0b10);
    }

    #[test]
    fn encode_bitemporal_roundtrip() {
        let schema = StrictSchema::new_bitemporal(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
        ])
        .unwrap();
        assert!(schema.bitemporal);
        assert_eq!(schema.columns[0].name, "__system_from_ms");
        assert_eq!(schema.columns[1].name, "__valid_from_ms");
        assert_eq!(schema.columns[2].name, "__valid_until_ms");
        assert_eq!(schema.columns[3].name, "id");

        let encoder = TupleEncoder::new(&schema);
        let tuple = encoder
            .encode_bitemporal(
                100,
                200,
                i64::MAX,
                &[Value::Integer(42), Value::String("alice".into())],
            )
            .unwrap();

        let decoder = crate::decode::TupleDecoder::new(&schema);
        let (sys, vf, vu) = decoder.extract_bitemporal_timestamps(&tuple).unwrap();
        assert_eq!((sys, vf, vu), (100, 200, i64::MAX));
        assert_eq!(
            decoder.extract_by_name(&tuple, "id").unwrap(),
            Value::Integer(42)
        );
        assert_eq!(
            decoder.extract_by_name(&tuple, "name").unwrap(),
            Value::String("alice".into())
        );
    }

    #[test]
    fn reserved_column_name_rejected() {
        let err = StrictSchema::new(vec![ColumnDef::required(
            "__system_from_ms",
            ColumnType::Int64,
        )])
        .unwrap_err();
        assert!(matches!(
            err,
            nodedb_types::columnar::SchemaError::ReservedColumnName(ref s) if s == "__system_from_ms"
        ));
    }

    #[test]
    fn encode_bitemporal_rejects_wrong_user_count() {
        let schema = StrictSchema::new_bitemporal(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
        ])
        .unwrap();
        let encoder = TupleEncoder::new(&schema);
        let err = encoder.encode_bitemporal(0, 0, 0, &[]).unwrap_err();
        assert!(matches!(
            err,
            StrictError::ValueCountMismatch {
                expected: 1,
                got: 0
            }
        ));
    }

    #[test]
    fn encode_bitemporal_on_non_bitemporal_schema_errors() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);
        let err = encoder.encode_bitemporal(0, 0, 0, &[]).unwrap_err();
        assert!(matches!(err, StrictError::ValueCountMismatch { .. }));
    }

    #[test]
    fn encode_vector() {
        let schema =
            StrictSchema::new(vec![ColumnDef::required("emb", ColumnType::Vector(3))]).unwrap();
        let encoder = TupleEncoder::new(&schema);

        let vals = vec![Value::Array(vec![
            Value::Float(1.0),
            Value::Float(2.0),
            Value::Float(3.0),
        ])];
        let tuple = encoder.encode(&vals).unwrap();
        let f0 = f32::from_le_bytes(tuple[10..14].try_into().unwrap());
        let f1 = f32::from_le_bytes(tuple[14..18].try_into().unwrap());
        let f2 = f32::from_le_bytes(tuple[18..22].try_into().unwrap());
        assert_eq!((f0, f1, f2), (1.0, 2.0, 3.0));
    }

    /// Pins the full byte layout of a known row: header, fixed section,
    /// offset table and variable section.
    #[test]
    fn golden_strict_tuple_format() {
        let schema = crm_schema();
        let encoder = TupleEncoder::new(&schema);
        let values = vec![
            Value::Integer(1),
            Value::String("A".into()),
            Value::String("a@b.com".into()),
            Value::Decimal(rust_decimal::Decimal::ZERO),
            Value::Bool(false),
        ];
        let tuple = encoder.encode(&values).unwrap();

        // --- Header ---
        assert_eq!(
            &tuple[0..4],
            &0x5453_444Eu32.to_le_bytes(),
            "magic mismatch"
        );
        assert_eq!(&tuple[0..4], b"NDST", "magic bytes mismatch");

        assert_eq!(tuple[4], FORMAT_VERSION, "format_version mismatch");
        assert_eq!(tuple[4], 1u8, "expected FORMAT_VERSION == 1");

        let schema_ver = u32::from_le_bytes([tuple[5], tuple[6], tuple[7], tuple[8]]);
        assert_eq!(
            schema_ver, 1u32,
            "schema_version must be 1 for version-1 schema"
        );

        assert_eq!(tuple[9], 0u8, "expected no null bits");

        // --- Fixed section: `id` at offset 0, `balance` at offset 8 ---
        assert_eq!(
            i64::from_le_bytes(tuple[10..18].try_into().unwrap()),
            1,
            "id mismatch"
        );
        assert_eq!(
            &tuple[18..34],
            &rust_decimal::Decimal::ZERO.serialize(),
            "balance mismatch"
        );

        // --- Offset table: name, email, sentinel; relative to variable section ---
        let fixed_len: usize = schema
            .columns
            .iter()
            .filter_map(|c| c.column_type.fixed_size())
            .sum();
        let table_start = 10 + fixed_len;
        let read_u32 =
            |pos: usize| u32::from_le_bytes(tuple[pos..pos + 4].try_into().unwrap());
        assert_eq!(read_u32(table_start), 0, "name offset mismatch");
        assert_eq!(read_u32(table_start + 4), 1, "email offset mismatch");
        assert_eq!(read_u32(table_start + 8), 8, "sentinel mismatch");

        // --- Variable section: payloads back to back, nothing after ---
        let var_start = table_start + 12;
        assert_eq!(&tuple[var_start..], b"Aa@b.com", "variable section mismatch");
    }
}