1use super::SchemaId;
7use super::enum_support::{EnumInfo, EnumVariantInfo};
8use super::field_types::{FieldDef, FieldType, semantic_to_field_type};
9use arrow_schema::{DataType, Schema as ArrowSchema};
10use sha2::{Digest, Sha256};
11use std::collections::HashMap;
12
/// Allocates a new [`SchemaId`] from the registry returned by
/// `super::current_registry()`.
///
/// Used by the constructors that do not take an explicit id
/// (`TypeSchema::new`, `TypeSchema::new_enum`, `TypeSchema::from_canonical`).
/// The tests in this file expect successive calls to yield distinct ids.
#[inline]
fn allocate_current_id() -> SchemaId {
 super::current_registry().allocate_id()
}
23
/// Layout description of a user-defined struct or enum type.
///
/// Holds the ordered field list, a name -> index lookup table, the computed
/// data size and, for enums, the variant metadata.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TypeSchema {
 /// Schema identifier. `new`, `new_enum` and `from_canonical` allocate it from
 /// the current registry; `with_id` / `new_enum_with_id` take it from the caller.
 pub id: SchemaId,
 /// Type name.
 pub name: String,
 /// Field definitions in slot order; each `FieldDef::index` is its position here.
 pub fields: Vec<FieldDef>,
 /// Field name -> index into `fields`. Must be kept consistent with `fields`.
 pub(crate) field_map: HashMap<String, usize>,
 /// Total data size, in the same units as field offsets (presumably bytes).
 /// `with_id` rounds it up to a multiple of 8, `new_enum_with_id` uses 8 per
 /// slot, and `from_canonical` copies the canonical type's size.
 pub data_size: usize,
 /// Optional component type names. NOTE(review): every constructor in this
 /// file leaves this `None`; its meaning is defined elsewhere.
 pub component_types: Option<Vec<String>>,
 /// String -> string map, presumably keyed by field name. NOTE(review): the
 /// constructors in this file leave it empty; semantics defined elsewhere.
 pub(crate) field_sources: HashMap<String, String>,
 /// Variant metadata; `Some` only for enum schemas (see `is_enum`).
 pub enum_info: Option<EnumInfo>,
 /// Memoized result of `compute_content_hash`, filled lazily by `content_hash()`.
 /// Skipped by serde, so it is `None` after deserialization. Nothing in this
 /// file invalidates it if the public fields are mutated afterwards.
 #[serde(skip)]
 pub content_hash: Option<[u8; 32]>,
}
49
50impl TypeSchema {
51 pub fn field_kind(&self, idx: usize) -> Option<shape_value::NativeKind> {
60 self.fields
61 .get(idx)
62 .and_then(|f| f.field_type.to_native_kind().ok())
63 }
64
65 pub fn new(name: impl Into<String>, field_defs: Vec<(String, FieldType)>) -> Self {
72 Self::with_id(allocate_current_id(), name, field_defs)
73 }
74
75 pub fn with_id(
81 id: SchemaId,
82 name: impl Into<String>,
83 field_defs: Vec<(String, FieldType)>,
84 ) -> Self {
85 let name = name.into();
86
87 let mut fields = Vec::with_capacity(field_defs.len());
88 let mut field_map = HashMap::with_capacity(field_defs.len());
89 let mut offset = 0;
90
91 for (index, (field_name, field_type)) in field_defs.into_iter().enumerate() {
92 let alignment = field_type.alignment();
94 offset = (offset + alignment - 1) & !(alignment - 1);
95
96 let field = FieldDef::new(&field_name, field_type.clone(), offset, index as u16);
97 field_map.insert(field_name, index);
98 offset += field_type.size();
99 fields.push(field);
100 }
101
102 let data_size = (offset + 7) & !7;
104
105 Self {
106 id,
107 name,
108 fields,
109 field_map,
110 data_size,
111 component_types: None,
112 field_sources: HashMap::new(),
113 enum_info: None,
114 content_hash: None,
115 }
116 }
117
118 pub fn get_field(&self, name: &str) -> Option<&FieldDef> {
120 self.field_map.get(name).map(|&idx| &self.fields[idx])
121 }
122
123 pub fn field_offset(&self, name: &str) -> Option<usize> {
125 self.get_field(name).map(|f| f.offset)
126 }
127
128 pub fn field_index(&self, name: &str) -> Option<u16> {
130 self.get_field(name).map(|f| f.index)
131 }
132
133 pub fn field_by_index(&self, index: u16) -> Option<&FieldDef> {
135 self.fields.get(index as usize)
136 }
137
138 pub fn field_count(&self) -> usize {
140 self.fields.len()
141 }
142
143 pub fn has_field(&self, name: &str) -> bool {
145 self.field_map.contains_key(name)
146 }
147
148 pub fn field_names(&self) -> impl Iterator<Item = &str> {
150 self.fields.iter().map(|f| f.name.as_str())
151 }
152
153 pub fn is_enum(&self) -> bool {
155 self.enum_info.is_some()
156 }
157
158 pub fn get_enum_info(&self) -> Option<&EnumInfo> {
160 self.enum_info.as_ref()
161 }
162
163 pub fn variant_id(&self, variant_name: &str) -> Option<u16> {
165 self.enum_info.as_ref()?.variant_id(variant_name)
166 }
167
168 pub fn new_enum(name: impl Into<String>, variants: Vec<EnumVariantInfo>) -> Self {
178 Self::new_enum_with_id(allocate_current_id(), name, variants)
179 }
180
181 pub fn new_enum_with_id(
183 id: SchemaId,
184 name: impl Into<String>,
185 variants: Vec<EnumVariantInfo>,
186 ) -> Self {
187 let name = name.into();
188 let enum_info = EnumInfo::new(variants);
189 let max_payload = enum_info.max_payload_fields();
190
191 let mut fields = Vec::with_capacity(1 + max_payload as usize);
193 let mut field_map = HashMap::with_capacity(1 + max_payload as usize);
194
195 fields.push(FieldDef::new("__variant", FieldType::I64, 0, 0));
197 field_map.insert("__variant".to_string(), 0);
198
199 for i in 0..max_payload {
201 let field_name = format!("__payload_{}", i);
202 let offset = 8 + (i as usize * 8);
203 fields.push(FieldDef::new(&field_name, FieldType::Any, offset, i + 1));
204 field_map.insert(field_name, i as usize + 1);
205 }
206
207 let data_size = 8 + (max_payload as usize * 8);
208
209 Self {
210 id,
211 name,
212 fields,
213 field_map,
214 data_size,
215 component_types: None,
216 field_sources: HashMap::new(),
217 enum_info: Some(enum_info),
218 content_hash: None,
219 }
220 }
221
222 pub fn compute_content_hash(&self) -> [u8; 32] {
232 let mut hasher = Sha256::new();
233
234 hasher.update(b"name:");
236 hasher.update(self.name.as_bytes());
237
238 let mut sorted_fields: Vec<&FieldDef> = self.fields.iter().collect();
240 sorted_fields.sort_by(|a, b| a.name.cmp(&b.name));
241
242 hasher.update(b"|fields:");
243 for field in &sorted_fields {
244 hasher.update(b"(");
245 hasher.update(field.name.as_bytes());
246 hasher.update(b":");
247 hasher.update(field.field_type.to_string().as_bytes());
248 hasher.update(b")");
249 }
250
251 if let Some(enum_info) = &self.enum_info {
253 let mut sorted_variants: Vec<&super::enum_support::EnumVariantInfo> =
254 enum_info.variants.iter().collect();
255 sorted_variants.sort_by(|a, b| a.name.cmp(&b.name));
256
257 hasher.update(b"|variants:");
258 for variant in &sorted_variants {
259 hasher.update(b"(");
260 hasher.update(variant.name.as_bytes());
261 hasher.update(b":");
262 hasher.update(variant.payload_fields.to_string().as_bytes());
263 hasher.update(b")");
264 }
265 }
266
267 let result = hasher.finalize();
268 let mut hash = [0u8; 32];
269 hash.copy_from_slice(&result);
270 hash
271 }
272
273 pub fn content_hash(&mut self) -> [u8; 32] {
275 if let Some(hash) = self.content_hash {
276 return hash;
277 }
278 let hash = self.compute_content_hash();
279 self.content_hash = Some(hash);
280 hash
281 }
282
283 pub fn bind_to_arrow_schema(
288 &self,
289 arrow_schema: &ArrowSchema,
290 ) -> Result<TypeBinding, TypeBindingError> {
291 let mut field_to_column = Vec::with_capacity(self.fields.len());
292
293 for field in &self.fields {
294 if field.name.starts_with("__") {
296 field_to_column.push(0); continue;
298 }
299
300 let col_name = field.wire_name();
301 let col_idx =
302 arrow_schema
303 .index_of(col_name)
304 .map_err(|_| TypeBindingError::MissingColumn {
305 field_name: col_name.to_string(),
306 type_name: self.name.clone(),
307 })?;
308
309 let arrow_field = &arrow_schema.fields()[col_idx];
310 if !is_compatible(&field.field_type, arrow_field.data_type()) {
311 return Err(TypeBindingError::TypeMismatch {
312 field_name: field.name.clone(),
313 expected: format!("{:?}", field.field_type),
314 actual: format!("{:?}", arrow_field.data_type()),
315 });
316 }
317
318 field_to_column.push(col_idx);
319 }
320
321 Ok(TypeBinding {
322 schema_name: self.name.clone(),
323 field_to_column,
324 })
325 }
326
327 pub fn from_canonical(canonical: &crate::type_system::environment::CanonicalType) -> Self {
332 let id = allocate_current_id();
333 let name = canonical.name.clone();
334
335 let mut fields = Vec::with_capacity(canonical.fields.len());
336 let mut field_map = HashMap::with_capacity(canonical.fields.len());
337
338 for (index, cf) in canonical.fields.iter().enumerate() {
339 let field_type = semantic_to_field_type(&cf.field_type, cf.optional);
341
342 let field = FieldDef::new(&cf.name, field_type, cf.offset, index as u16);
343 field_map.insert(cf.name.clone(), index);
344 fields.push(field);
345 }
346
347 Self {
348 id,
349 name,
350 fields,
351 field_map,
352 data_size: canonical.data_size,
353 component_types: None,
354 field_sources: HashMap::new(),
355 enum_info: None,
356 content_hash: None,
357 }
358 }
359}
360
/// Result of binding a [`TypeSchema`] to an Arrow schema via
/// `TypeSchema::bind_to_arrow_schema`.
#[derive(Debug, Clone)]
pub struct TypeBinding {
 /// Name of the bound `TypeSchema`.
 pub schema_name: String,
 /// Arrow column index for each schema field, indexed by field position.
 /// Internal (`__`-prefixed) fields are stored as `0`, which is a placeholder
 /// rather than a real binding to column 0.
 pub field_to_column: Vec<usize>,
}
372
373impl TypeBinding {
374 pub fn column_index(&self, field_index: usize) -> Option<usize> {
376 self.field_to_column.get(field_index).copied()
377 }
378}
379
/// Errors produced by `TypeSchema::bind_to_arrow_schema`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum TypeBindingError {
 /// A non-internal field's column is absent from the Arrow schema.
 /// `field_name` carries the column (wire) name that was looked up.
 #[error("Type '{type_name}' requires column '{field_name}' which is not in the DataTable")]
 MissingColumn {
 field_name: String,
 type_name: String,
 },
 /// The column exists, but its Arrow data type is not accepted for the field's
 /// type. `expected` / `actual` are `Debug` renderings of the two types.
 #[error("Column '{field_name}' has type {actual} but expected {expected}")]
 TypeMismatch {
 field_name: String,
 expected: String,
 actual: String,
 },
}
397
398fn is_compatible(field_type: &FieldType, arrow_type: &DataType) -> bool {
400 match (field_type, arrow_type) {
401 (FieldType::F64, DataType::Float64) => true,
402 (FieldType::F64, DataType::Float32) => true, (FieldType::F64, DataType::Int64) => true, (FieldType::I64, DataType::Int64) => true,
405 (FieldType::I64, DataType::Int32) => true, (FieldType::Bool, DataType::Boolean) => true,
407 (FieldType::String, DataType::Utf8) => true,
408 (FieldType::String, DataType::LargeUtf8) => true,
409 (FieldType::Timestamp, DataType::Timestamp(_, _)) => true,
410 (FieldType::Timestamp, DataType::Int64) => true, (FieldType::Decimal, DataType::Float64) => true, (FieldType::Decimal, DataType::Int64) => true, (FieldType::Any, _) => true, _ => false,
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};

    /// Builds a struct schema from borrowed `(field name, type)` pairs.
    fn struct_schema(name: &str, defs: &[(&str, FieldType)]) -> TypeSchema {
        let owned: Vec<(String, FieldType)> = defs
            .iter()
            .map(|(field, ty)| (field.to_string(), ty.clone()))
            .collect();
        TypeSchema::new(name, owned)
    }

    /// Builds an Arrow schema of non-nullable columns from `(name, data type)` pairs.
    fn arrow_schema_of(columns: &[(&str, DataType)]) -> ArrowSchema {
        let fields: Vec<Field> = columns
            .iter()
            .map(|(column, dt)| Field::new(*column, dt.clone(), false))
            .collect();
        ArrowSchema::new(fields)
    }

    #[test]
    fn test_type_schema_creation() {
        let schema = struct_schema(
            "TestType",
            &[
                ("a", FieldType::F64),
                ("b", FieldType::I64),
                ("c", FieldType::String),
            ],
        );

        assert_eq!(schema.name, "TestType");
        assert_eq!(schema.field_count(), 3);
        assert_eq!(schema.data_size, 24);
    }

    #[test]
    fn test_field_offsets() {
        let schema = struct_schema(
            "OffsetTest",
            &[
                ("first", FieldType::F64),
                ("second", FieldType::I64),
                ("third", FieldType::Bool),
            ],
        );

        let offsets: Vec<Option<usize>> = ["first", "second", "third", "nonexistent"]
            .iter()
            .map(|name| schema.field_offset(name))
            .collect();
        assert_eq!(offsets, vec![Some(0), Some(8), Some(16), None]);
    }

    #[test]
    fn test_field_index() {
        let schema = struct_schema(
            "IndexTest",
            &[
                ("a", FieldType::F64),
                ("b", FieldType::F64),
                ("c", FieldType::F64),
            ],
        );

        for (position, name) in ["a", "b", "c"].iter().enumerate() {
            assert_eq!(schema.field_index(name), Some(position as u16));
        }
    }

    #[test]
    fn test_unique_schema_ids() {
        let ids: Vec<_> = ["Type1", "Type2", "Type3"]
            .iter()
            .map(|name| TypeSchema::new(*name, vec![]).id)
            .collect();

        assert_ne!(ids[0], ids[1]);
        assert_ne!(ids[1], ids[2]);
        assert_ne!(ids[0], ids[2]);
    }

    #[test]
    fn test_enum_schema_creation() {
        let schema = TypeSchema::new_enum(
            "Option",
            vec![
                EnumVariantInfo::new("Some", 0, 1),
                EnumVariantInfo::new("None", 1, 0),
            ],
        );

        assert_eq!(schema.name, "Option");
        assert!(schema.is_enum());

        let info = schema
            .get_enum_info()
            .expect("enum schema must carry EnumInfo");
        assert_eq!(info.variants.len(), 2);
        assert_eq!(info.max_payload_fields(), 1);
        assert_eq!(info.variant_id("Some"), Some(0));
        assert_eq!(info.variant_id("None"), Some(1));
    }

    #[test]
    fn test_enum_schema_layout() {
        let schema = TypeSchema::new_enum(
            "Result",
            vec![
                EnumVariantInfo::new("Ok", 0, 1),
                EnumVariantInfo::new("Err", 1, 1),
            ],
        );

        // Discriminant slot plus a single payload slot.
        assert_eq!(schema.data_size, 16);
        assert_eq!(schema.field_count(), 2);
        assert_eq!(schema.field_offset("__variant"), Some(0));
        assert_eq!(schema.field_offset("__payload_0"), Some(8));
    }

    #[test]
    fn test_enum_schema_multiple_payloads() {
        let schema = TypeSchema::new_enum(
            "Shape",
            vec![
                EnumVariantInfo::new("Circle", 0, 1),
                EnumVariantInfo::new("Rectangle", 1, 2),
                EnumVariantInfo::new("Point", 2, 0),
            ],
        );

        // Sized for the widest variant (Rectangle, 2 payloads): 8 + 2 * 8.
        assert_eq!(schema.data_size, 24);
        assert_eq!(schema.field_count(), 3);

        let expected: [(&str, usize); 3] =
            [("__variant", 0), ("__payload_0", 8), ("__payload_1", 16)];
        for &(field, offset) in expected.iter() {
            assert_eq!(schema.field_offset(field), Some(offset));
        }
    }

    #[test]
    fn test_enum_variant_lookup() {
        let schema = TypeSchema::new_enum(
            "Status",
            vec![
                EnumVariantInfo::new("Pending", 0, 0),
                EnumVariantInfo::new("Running", 1, 1),
                EnumVariantInfo::new("Complete", 2, 1),
                EnumVariantInfo::new("Failed", 3, 1),
            ],
        );
        let info = schema
            .get_enum_info()
            .expect("enum schema must carry EnumInfo");

        let running = info.variant_by_id(1).expect("variant 1 exists");
        assert_eq!(running.name, "Running");
        assert_eq!(running.payload_fields, 1);

        let complete = info.variant_by_name("Complete").expect("Complete exists");
        assert_eq!(complete.id, 2);

        assert!(info.variant_by_id(99).is_none());
        assert!(info.variant_by_name("Unknown").is_none());
    }

    #[test]
    fn test_bind_to_arrow_schema_success() {
        let type_schema = struct_schema(
            "Candle",
            &[
                ("open", FieldType::F64),
                ("close", FieldType::F64),
                ("volume", FieldType::I64),
            ],
        );
        let arrow_schema = arrow_schema_of(&[
            ("date", DataType::Utf8),
            ("open", DataType::Float64),
            ("close", DataType::Float64),
            ("volume", DataType::Int64),
        ]);

        let binding = type_schema
            .bind_to_arrow_schema(&arrow_schema)
            .expect("all columns exist with compatible types");
        assert_eq!(binding.schema_name, "Candle");
        // The unused leading `date` column shifts every field one column right.
        for field_idx in 0..3 {
            assert_eq!(binding.column_index(field_idx), Some(field_idx + 1));
        }
    }

    #[test]
    fn test_bind_missing_column() {
        let type_schema = struct_schema(
            "Candle",
            &[
                ("open", FieldType::F64),
                ("missing_field", FieldType::F64),
            ],
        );
        let arrow_schema = arrow_schema_of(&[("open", DataType::Float64)]);

        let result = type_schema.bind_to_arrow_schema(&arrow_schema);
        assert!(matches!(
            result,
            Err(TypeBindingError::MissingColumn { .. })
        ));
    }

    #[test]
    fn test_bind_type_mismatch() {
        let type_schema = struct_schema("Test", &[("name", FieldType::F64)]);
        // A string column cannot back an f64 field.
        let arrow_schema = arrow_schema_of(&[("name", DataType::Utf8)]);

        let result = type_schema.bind_to_arrow_schema(&arrow_schema);
        assert!(matches!(
            result,
            Err(TypeBindingError::TypeMismatch { .. })
        ));
    }

    #[test]
    fn test_schema_construction_with_hashmap_set_fields() {
        let schema = struct_schema(
            "ContainerHolder",
            &[
                (
                    "by_name",
                    FieldType::HashMap {
                        key: Box::new(FieldType::String),
                        value: Box::new(FieldType::I64),
                    },
                ),
                ("tags", FieldType::Set(Box::new(FieldType::String))),
            ],
        );

        assert_eq!(schema.data_size, 16);
        assert_eq!(schema.field_offset("by_name"), Some(0));
        assert_eq!(schema.field_offset("tags"), Some(8));
        // Container fields have no native kind.
        assert_eq!(schema.field_kind(0), None);
        assert_eq!(schema.field_kind(1), None);
    }

    #[test]
    fn test_bind_compatible_types() {
        let type_schema = struct_schema(
            "Wide",
            &[
                ("f32_as_f64", FieldType::F64),
                ("i32_as_i64", FieldType::I64),
                ("ts", FieldType::Timestamp),
                ("any_field", FieldType::Any),
            ],
        );
        let arrow_schema = arrow_schema_of(&[
            ("f32_as_f64", DataType::Float32),
            ("i32_as_i64", DataType::Int32),
            ("ts", DataType::Timestamp(TimeUnit::Microsecond, None)),
            ("any_field", DataType::Boolean),
        ]);

        assert!(type_schema.bind_to_arrow_schema(&arrow_schema).is_ok());
    }
}