1use std::sync::Arc;
2
3use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit};
4use serde_json::{Map, Value};
5
6use crate::dataset::error::DatasetError;
7use crate::dataset::types::{DatasetFingerprint, DatasetNamespace};
8
/// Name of the non-nullable UTC microsecond timestamp column appended by
/// `inject_system_columns`. User schemas may not define a column with this name.
pub const SCOUTER_CREATED_AT: &str = "scouter_created_at";
/// Name of the non-nullable `Date32` column appended by `inject_system_columns`.
/// User schemas may not define a column with this name.
pub const SCOUTER_PARTITION_DATE: &str = "scouter_partition_date";
/// Name of the non-nullable `Utf8` column appended by `inject_system_columns`.
/// User schemas may not define a column with this name.
pub const SCOUTER_BATCH_ID: &str = "scouter_batch_id";
12
/// Recursion limit checked at the top of `resolve_type` and `resolve_ref`;
/// reaching it produces a `DatasetError::SchemaParseError`.
const MAX_SCHEMA_DEPTH: usize = 32;
14
15pub fn json_schema_to_arrow(json_schema: &str) -> Result<Schema, DatasetError> {
27 let root: Value = serde_json::from_str(json_schema)?;
28
29 let obj = root.as_object().ok_or_else(|| {
30 DatasetError::SchemaParseError("JSON Schema root must be an object".to_string())
31 })?;
32
33 let defs = obj
34 .get("$defs")
35 .and_then(Value::as_object)
36 .cloned()
37 .unwrap_or_default();
38
39 let properties = obj
40 .get("properties")
41 .and_then(Value::as_object)
42 .ok_or_else(|| {
43 DatasetError::SchemaParseError(
44 "JSON Schema must have a 'properties' key at the root".to_string(),
45 )
46 })?;
47
48 let required: std::collections::HashSet<&str> = obj
49 .get("required")
50 .and_then(Value::as_array)
51 .map(|arr| arr.iter().filter_map(Value::as_str).collect())
52 .unwrap_or_default();
53
54 let mut fields = Vec::with_capacity(properties.len());
55 for (name, prop) in properties {
56 let nullable = !required.contains(name.as_str());
57 let (dtype, is_nullable) = resolve_type(prop, &defs, nullable, 0)?;
58 fields.push(Field::new(name, dtype, is_nullable));
59 }
60
61 Ok(Schema::new(fields))
62}
63
64pub fn inject_system_columns(schema: Schema) -> Result<Schema, DatasetError> {
69 for col_name in [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID] {
70 if schema.index_of(col_name).is_ok() {
71 return Err(DatasetError::SchemaParseError(format!(
72 "User schema must not contain reserved column '{col_name}'"
73 )));
74 }
75 }
76 let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
77 fields.push(Field::new(
78 SCOUTER_CREATED_AT,
79 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
80 false,
81 ));
82 fields.push(Field::new(SCOUTER_PARTITION_DATE, DataType::Date32, false));
83 fields.push(Field::new(SCOUTER_BATCH_ID, DataType::Utf8, false));
84 Ok(Schema::new(fields))
85}
86
87pub fn schema_fingerprint(schema: &Schema) -> Result<DatasetFingerprint, DatasetError> {
92 let canonical = canonical_schema_repr(schema);
93 Ok(DatasetFingerprint::from_schema_json(&canonical))
94}
95
96fn canonical_type_repr(dt: &DataType) -> String {
97 match dt {
98 DataType::Struct(fields) => {
99 let mut sub: Vec<String> = fields
100 .iter()
101 .map(|f| {
102 format!(
103 "{}:{}:{}",
104 f.name(),
105 canonical_type_repr(f.data_type()),
106 f.is_nullable()
107 )
108 })
109 .collect();
110 sub.sort();
111 format!("Struct({})", sub.join(","))
112 }
113 DataType::List(field) => {
114 format!(
115 "List({}:{}:{})",
116 field.name(),
117 canonical_type_repr(field.data_type()),
118 field.is_nullable()
119 )
120 }
121 other => format!("{other}"),
122 }
123}
124
125fn canonical_schema_repr(schema: &Schema) -> String {
126 let mut fields: Vec<String> = schema
127 .fields()
128 .iter()
129 .map(|f| {
130 format!(
131 "{}:{}:{}",
132 f.name(),
133 canonical_type_repr(f.data_type()),
134 f.is_nullable()
135 )
136 })
137 .collect();
138 fields.sort();
139 fields.join("|")
140}
141
142fn is_null_variant(v: &Value) -> bool {
148 if v.get("type").and_then(Value::as_str) == Some("null") {
149 return true;
150 }
151 if v.get("const").map(Value::is_null).unwrap_or(false) {
152 return true;
153 }
154 if let Some(arr) = v.get("enum").and_then(Value::as_array) {
155 if arr.len() == 1 && arr[0].is_null() {
156 return true;
157 }
158 }
159 false
160}
161
162fn resolve_type(
164 prop: &Value,
165 defs: &Map<String, Value>,
166 nullable: bool,
167 depth: usize,
168) -> Result<(DataType, bool), DatasetError> {
169 if depth >= MAX_SCHEMA_DEPTH {
170 return Err(DatasetError::SchemaParseError(format!(
171 "Schema nesting exceeds maximum depth of {MAX_SCHEMA_DEPTH}"
172 )));
173 }
174
175 let obj = match prop.as_object() {
176 Some(o) => o,
177 None => {
178 return Err(DatasetError::SchemaParseError(
179 "Property must be a JSON object".to_string(),
180 ))
181 }
182 };
183
184 if let Some(ref_val) = obj.get("$ref").and_then(Value::as_str) {
186 return resolve_ref(ref_val, defs, nullable, depth + 1);
187 }
188
189 if let Some(any_of) = obj.get("anyOf").and_then(Value::as_array) {
191 return resolve_any_of(any_of, defs, depth + 1);
192 }
193
194 if obj.contains_key("enum") {
196 return Ok((
197 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
198 nullable,
199 ));
200 }
201
202 let type_str = obj
203 .get("type")
204 .and_then(Value::as_str)
205 .ok_or_else(|| DatasetError::UnsupportedType(format!("No 'type' in: {prop}")))?;
206
207 match type_str {
208 "integer" => Ok((DataType::Int64, nullable)),
209 "number" => Ok((DataType::Float64, nullable)),
210 "boolean" => Ok((DataType::Boolean, nullable)),
211 "string" => {
212 let format = obj.get("format").and_then(Value::as_str);
213 match format {
214 Some("date-time") => Ok((
215 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
216 nullable,
217 )),
218 Some("date") => Ok((DataType::Date32, nullable)),
219 _ => Ok((DataType::Utf8View, nullable)),
220 }
221 }
222 "array" => {
223 let items = obj.get("items").ok_or_else(|| {
224 DatasetError::SchemaParseError("Array missing 'items'".to_string())
225 })?;
226 let (item_type, item_nullable) = resolve_type(items, defs, true, depth + 1)?;
227 let item_field = Arc::new(Field::new("item", item_type, item_nullable));
228 Ok((DataType::List(item_field), nullable))
229 }
230 "object" => {
231 let props = obj
232 .get("properties")
233 .and_then(Value::as_object)
234 .ok_or_else(|| {
235 DatasetError::UnsupportedType(
236 "Free-form dict (object without 'properties') is not yet supported"
237 .to_string(),
238 )
239 })?;
240 let required: std::collections::HashSet<&str> = obj
241 .get("required")
242 .and_then(Value::as_array)
243 .map(|arr| arr.iter().filter_map(Value::as_str).collect())
244 .unwrap_or_default();
245 let mut struct_fields = Vec::with_capacity(props.len());
246 for (name, sub_prop) in props {
247 let field_nullable = !required.contains(name.as_str());
248 let (dtype, is_nullable) = resolve_type(sub_prop, defs, field_nullable, depth + 1)?;
249 struct_fields.push(Arc::new(Field::new(name, dtype, is_nullable)));
250 }
251 Ok((DataType::Struct(Fields::from(struct_fields)), nullable))
252 }
253 "null" => Ok((DataType::Null, true)),
254 other => Err(DatasetError::UnsupportedType(other.to_string())),
255 }
256}
257
258fn resolve_ref(
264 ref_val: &str,
265 defs: &Map<String, Value>,
266 nullable: bool,
267 depth: usize,
268) -> Result<(DataType, bool), DatasetError> {
269 if depth >= MAX_SCHEMA_DEPTH {
270 return Err(DatasetError::SchemaParseError(format!(
271 "Schema nesting exceeds maximum depth of {MAX_SCHEMA_DEPTH}"
272 )));
273 }
274
275 let def_name = ref_val.strip_prefix("#/$defs/").ok_or_else(|| {
276 DatasetError::RefResolutionError(format!("Unrecognized $ref format: {ref_val}"))
277 })?;
278
279 let def = defs.get(def_name).ok_or_else(|| {
280 DatasetError::RefResolutionError(format!("$defs entry not found: {def_name}"))
281 })?;
282
283 let def_obj = def.as_object().ok_or_else(|| {
284 DatasetError::RefResolutionError(format!("$defs entry '{def_name}' is not an object"))
285 })?;
286
287 if let Some(props) = def_obj.get("properties").and_then(Value::as_object) {
289 let required: std::collections::HashSet<&str> = def_obj
290 .get("required")
291 .and_then(Value::as_array)
292 .map(|arr| arr.iter().filter_map(Value::as_str).collect())
293 .unwrap_or_default();
294
295 let mut struct_fields = Vec::with_capacity(props.len());
296 for (name, sub_prop) in props {
297 let field_nullable = !required.contains(name.as_str());
298 let (dtype, is_nullable) = resolve_type(sub_prop, defs, field_nullable, depth + 1)?;
299 struct_fields.push(Arc::new(Field::new(name, dtype, is_nullable)));
300 }
301 return Ok((DataType::Struct(Fields::from(struct_fields)), nullable));
302 }
303
304 resolve_type(def, defs, nullable, depth + 1)
306}
307
308fn resolve_any_of(
311 variants: &[Value],
312 defs: &Map<String, Value>,
313 depth: usize,
314) -> Result<(DataType, bool), DatasetError> {
315 let non_null: Vec<&Value> = variants.iter().filter(|v| !is_null_variant(v)).collect();
316
317 if non_null.len() == 1 {
318 let (dtype, _) = resolve_type(non_null[0], defs, true, depth)?;
319 return Ok((dtype, true));
320 }
321
322 Err(DatasetError::UnsupportedType(
324 "anyOf with multiple non-null variants is not supported".to_string(),
325 ))
326}
327
328pub fn fingerprint_from_json_schema(json_schema: &str) -> Result<DatasetFingerprint, DatasetError> {
331 let schema = json_schema_to_arrow(json_schema)?;
332 let schema_with_sys = inject_system_columns(schema)?;
333 schema_fingerprint(&schema_with_sys)
334}
335
336#[allow(dead_code)]
339pub(crate) fn build_registration(
340 json_schema: &str,
341 _namespace: &DatasetNamespace,
342 _partition_columns: &[String],
343) -> Result<(Schema, DatasetFingerprint), DatasetError> {
344 let schema = json_schema_to_arrow(json_schema)?;
345 let schema_with_sys = inject_system_columns(schema)?;
346 let fingerprint = schema_fingerprint(&schema_with_sys)?;
347 Ok((schema_with_sys, fingerprint))
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------

    /// Converts a JSON Schema that is expected to be valid.
    fn to_arrow(json: &str) -> Schema {
        json_schema_to_arrow(json).unwrap()
    }

    /// Converts a JSON Schema that is expected to be rejected and returns the error.
    fn conversion_error(json: &str) -> DatasetError {
        json_schema_to_arrow(json).unwrap_err()
    }

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Flat model: five required scalar fields and one optional one (`score`).
    fn flat_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "UserEvent",
            "properties": {
                "user_id": {"type": "string"},
                "event_type": {"type": "string"},
                "value": {"type": "number"},
                "count": {"type": "integer"},
                "active": {"type": "boolean"},
                "score": {"type": "number"}
            },
            "required": ["user_id", "event_type", "value", "count", "active"]
        }"#
    }

    /// Model whose optional fields are encoded as `anyOf [T, null]`.
    fn optional_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "OptionalModel",
            "properties": {
                "name": {"type": "string"},
                "age": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
                "score": {"anyOf": [{"type": "number"}, {"type": "null"}]}
            },
            "required": ["name"]
        }"#
    }

    /// Model with a nested object referenced through `$defs`.
    fn nested_schema_json() -> &'static str {
        r##"{
            "type": "object",
            "title": "Order",
            "properties": {
                "order_id": {"type": "string"},
                "address": {"$ref": "#/$defs/Address"}
            },
            "required": ["order_id", "address"],
            "$defs": {
                "Address": {
                    "type": "object",
                    "properties": {
                        "street": {"type": "string"},
                        "city": {"type": "string"},
                        "zip": {"type": "string"}
                    },
                    "required": ["street", "city", "zip"]
                }
            }
        }"##
    }

    /// Model exercising the `date-time` and `date` string formats.
    fn datetime_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "Event",
            "properties": {
                "created_at": {"type": "string", "format": "date-time"},
                "event_date": {"type": "string", "format": "date"},
                "label": {"type": "string"}
            },
            "required": ["created_at", "event_date", "label"]
        }"#
    }

    /// Model with an array of numbers.
    fn list_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "BatchPrediction",
            "properties": {
                "model_id": {"type": "string"},
                "scores": {"type": "array", "items": {"type": "number"}}
            },
            "required": ["model_id", "scores"]
        }"#
    }

    /// Model with an `enum` property.
    fn enum_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "Status",
            "properties": {
                "status": {"enum": ["active", "inactive", "pending"]},
                "name": {"type": "string"}
            },
            "required": ["status", "name"]
        }"#
    }

    /// Model with an array whose items are a `$ref` to an object definition.
    fn list_of_nested_schema_json() -> &'static str {
        r##"{
            "type": "object",
            "title": "Report",
            "properties": {
                "report_id": {"type": "string"},
                "items": {
                    "type": "array",
                    "items": {"$ref": "#/$defs/ReportItem"}
                }
            },
            "required": ["report_id", "items"],
            "$defs": {
                "ReportItem": {
                    "type": "object",
                    "properties": {
                        "label": {"type": "string"},
                        "value": {"type": "number"}
                    },
                    "required": ["label", "value"]
                }
            }
        }"##
    }

    // ------------------------------------------------------------------
    // Type mapping
    // ------------------------------------------------------------------

    #[test]
    fn test_flat_schema() {
        let schema = to_arrow(flat_schema_json());
        assert_eq!(schema.fields().len(), 6);

        let user_id = schema.field_with_name("user_id").unwrap();
        assert_eq!(user_id.data_type(), &DataType::Utf8View);
        assert!(!user_id.is_nullable());

        // `score` is not listed in `required`, so it must come out nullable.
        assert!(schema.field_with_name("score").unwrap().is_nullable());

        let expected_types = [
            ("value", DataType::Float64),
            ("count", DataType::Int64),
            ("active", DataType::Boolean),
        ];
        for (column, expected) in expected_types {
            let field = schema.field_with_name(column).unwrap();
            assert_eq!(field.data_type(), &expected);
        }
    }

    #[test]
    fn test_optional_fields() {
        let schema = to_arrow(optional_schema_json());

        let name = schema.field_with_name("name").unwrap();
        assert!(!name.is_nullable());
        assert_eq!(name.data_type(), &DataType::Utf8View);

        for (column, expected) in [("age", DataType::Int64), ("score", DataType::Float64)] {
            let field = schema.field_with_name(column).unwrap();
            assert!(field.is_nullable());
            assert_eq!(field.data_type(), &expected);
        }
    }

    #[test]
    fn test_nested_struct() {
        let schema = to_arrow(nested_schema_json());

        let address = schema.field_with_name("address").unwrap();
        assert!(!address.is_nullable());

        match address.data_type() {
            DataType::Struct(children) => {
                assert_eq!(children.len(), 3);
                let street = children.find("street").map(|(_, field)| field.clone());
                assert!(street.is_some());
                assert_eq!(street.unwrap().data_type(), &DataType::Utf8View);
            }
            other => panic!("expected a Struct, got {other:?}"),
        }
    }

    #[test]
    fn test_datetime_formats() {
        let schema = to_arrow(datetime_schema_json());

        let created_type = schema.field_with_name("created_at").unwrap().data_type();
        assert!(matches!(
            created_type,
            DataType::Timestamp(TimeUnit::Microsecond, _)
        ));

        let date_type = schema.field_with_name("event_date").unwrap().data_type();
        assert_eq!(date_type, &DataType::Date32);
    }

    #[test]
    fn test_list_type() {
        let schema = to_arrow(list_schema_json());

        match schema.field_with_name("scores").unwrap().data_type() {
            DataType::List(element) => assert_eq!(element.data_type(), &DataType::Float64),
            other => panic!("expected a List, got {other:?}"),
        }
    }

    #[test]
    fn test_enum_type() {
        let schema = to_arrow(enum_schema_json());

        let status_type = schema.field_with_name("status").unwrap().data_type();
        assert!(matches!(status_type, DataType::Dictionary(_, _)));
    }

    #[test]
    fn test_list_of_nested() {
        let schema = to_arrow(list_of_nested_schema_json());

        match schema.field_with_name("items").unwrap().data_type() {
            DataType::List(element) => {
                assert!(matches!(element.data_type(), DataType::Struct(_)));
            }
            other => panic!("expected a List, got {other:?}"),
        }
    }

    // ------------------------------------------------------------------
    // System columns
    // ------------------------------------------------------------------

    #[test]
    fn test_system_columns_injected() {
        let with_sys = inject_system_columns(to_arrow(flat_schema_json())).unwrap();

        let created = with_sys.field_with_name(SCOUTER_CREATED_AT).unwrap();
        assert!(matches!(
            created.data_type(),
            DataType::Timestamp(TimeUnit::Microsecond, _)
        ));
        assert!(!created.is_nullable());

        let partition_date = with_sys.field_with_name(SCOUTER_PARTITION_DATE).unwrap();
        assert_eq!(partition_date.data_type(), &DataType::Date32);
        assert!(!partition_date.is_nullable());

        let batch_id = with_sys.field_with_name(SCOUTER_BATCH_ID).unwrap();
        assert_eq!(batch_id.data_type(), &DataType::Utf8);
        assert!(!batch_id.is_nullable());
    }

    #[test]
    fn test_reserved_column_collision_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "scouter_created_at": {"type": "string"}
            },
            "required": ["scouter_created_at"]
        }"#;

        let err = inject_system_columns(to_arrow(bad)).unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
        assert!(err.to_string().contains("reserved"));
    }

    // ------------------------------------------------------------------
    // Fingerprints
    // ------------------------------------------------------------------

    #[test]
    fn test_fingerprint_stability() {
        let first = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        let second = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn test_fingerprint_changes_on_field_add() {
        let baseline = fingerprint_from_json_schema(flat_schema_json()).unwrap();

        let modified = r#"{
            "type": "object",
            "title": "UserEvent",
            "properties": {
                "user_id": {"type": "string"},
                "event_type": {"type": "string"},
                "value": {"type": "number"},
                "count": {"type": "integer"},
                "active": {"type": "boolean"},
                "score": {"type": "number"},
                "new_field": {"type": "string"}
            },
            "required": ["user_id", "event_type", "value", "count", "active"]
        }"#;
        let extended = fingerprint_from_json_schema(modified).unwrap();

        assert_ne!(baseline, extended);
    }

    #[test]
    fn test_fingerprint_is_32_chars() {
        let fingerprint = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        assert_eq!(fingerprint.as_str().len(), 32);
    }

    #[test]
    fn test_fingerprint_field_order_independent() {
        let schema_a = r#"{
            "type": "object",
            "properties": {
                "alpha": {"type": "string"},
                "beta": {"type": "integer"}
            },
            "required": ["alpha", "beta"]
        }"#;
        let schema_b = r#"{
            "type": "object",
            "properties": {
                "beta": {"type": "integer"},
                "alpha": {"type": "string"}
            },
            "required": ["alpha", "beta"]
        }"#;

        assert_eq!(
            fingerprint_from_json_schema(schema_a).unwrap(),
            fingerprint_from_json_schema(schema_b).unwrap()
        );
    }

    // ------------------------------------------------------------------
    // Rejected schemas
    // ------------------------------------------------------------------

    #[test]
    fn test_unsupported_type_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "field": {"type": "unknown_type"}
            },
            "required": ["field"]
        }"#;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::UnsupportedType(_)
        ));
    }

    #[test]
    fn test_missing_ref_error() {
        let bad = r##"{
            "type": "object",
            "properties": {
                "nested": {"$ref": "#/$defs/NonExistent"}
            },
            "required": ["nested"]
        }"##;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::RefResolutionError(_)
        ));
    }

    #[test]
    fn test_missing_properties_key_error() {
        let bad = r#"{"type": "object"}"#;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::SchemaParseError(_)
        ));
    }

    #[test]
    fn test_bad_ref_format_error() {
        let bad = r##"{
            "type": "object",
            "properties": {
                "x": {"$ref": "definitions/Foo"}
            },
            "required": ["x"]
        }"##;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::RefResolutionError(_)
        ));
    }

    #[test]
    fn test_property_not_object_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": true
            },
            "required": ["x"]
        }"#;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::SchemaParseError(_)
        ));
    }

    #[test]
    fn test_any_of_multiple_non_null_variants_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "integer"}, {"type": "string"}]}
            },
            "required": ["x"]
        }"#;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::UnsupportedType(_)
        ));
    }

    #[test]
    fn test_any_of_null_enum_encoding() {
        let json = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "integer"}, {"enum": [null]}]}
            },
            "required": []
        }"#;

        let schema = to_arrow(json);
        let x = schema.field_with_name("x").unwrap();
        assert!(x.is_nullable());
        assert_eq!(x.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_any_of_const_null_encoding() {
        let json = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "string"}, {"const": null}]}
            },
            "required": []
        }"#;

        let schema = to_arrow(json);
        let x = schema.field_with_name("x").unwrap();
        assert!(x.is_nullable());
        assert_eq!(x.data_type(), &DataType::Utf8View);
    }

    #[test]
    fn test_free_form_dict_is_unsupported_type() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": {"type": "object"}
            },
            "required": ["x"]
        }"#;
        assert!(matches!(
            conversion_error(bad),
            DatasetError::UnsupportedType(_)
        ));
    }

    #[test]
    fn test_max_depth_exceeded() {
        // Wrap a string leaf in MAX_SCHEMA_DEPTH nested objects, then hang the whole
        // thing under a root property so the leaf sits past the allowed depth.
        let leaf = r#"{"type": "string"}"#.to_string();
        let nested = (0..MAX_SCHEMA_DEPTH).fold(leaf, |inner, _| {
            format!(
                r#"{{"type": "object", "properties": {{"x": {inner}}}, "required": ["x"]}}"#
            )
        });
        let schema = format!(
            r#"{{"type": "object", "properties": {{"root": {nested}}}, "required": ["root"]}}"#
        );

        let err = conversion_error(&schema);
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
        assert!(err.to_string().contains("depth"));
    }

    // ------------------------------------------------------------------
    // Registration and fingerprint contract
    // ------------------------------------------------------------------

    #[test]
    fn test_build_registration_includes_sys_cols() {
        use crate::dataset::types::DatasetNamespace;

        let namespace = DatasetNamespace::new("cat", "sch", "tbl").unwrap();
        let (schema, fingerprint) =
            build_registration(flat_schema_json(), &namespace, &[]).unwrap();

        for column in [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID] {
            assert!(schema.index_of(column).is_ok());
        }
        assert_eq!(fingerprint.as_str().len(), 32);
    }

    #[test]
    fn test_client_server_fingerprint_contract() {
        let pydantic_json = flat_schema_json();

        // Both sides derive the fingerprint through the same entry point.
        let client_fp = fingerprint_from_json_schema(pydantic_json).unwrap();
        let server_fp = fingerprint_from_json_schema(pydantic_json).unwrap();

        assert_eq!(
            client_fp, server_fp,
            "client and server fingerprints must agree; \
             both must call fingerprint_from_json_schema with the original Pydantic JSON schema"
        );

        let schema_with_sys = inject_system_columns(to_arrow(pydantic_json)).unwrap();
        assert!(schema_with_sys
            .field_with_name(SCOUTER_PARTITION_DATE)
            .is_ok());
        assert!(schema_with_sys.field_with_name(SCOUTER_CREATED_AT).is_ok());
        assert!(schema_with_sys.field_with_name(SCOUTER_BATCH_ID).is_ok());
    }

    #[test]
    fn test_fingerprint_differs_without_system_columns() {
        let pydantic_json = flat_schema_json();

        let client_fp = fingerprint_from_json_schema(pydantic_json).unwrap();

        // Fingerprint computed over the user columns only, with no system columns.
        let arrow_schema_no_sys = to_arrow(pydantic_json);
        let canonical = canonical_schema_repr(&arrow_schema_no_sys);
        let old_server_fp = DatasetFingerprint::from_schema_json(&canonical);

        assert_ne!(
            client_fp, old_server_fp,
            "omitting inject_system_columns changes the fingerprint — \
             this was the pre-fix bug that caused FingerprintMismatch in DatasetClient::new()"
        );
    }
}