//! scouter_types/dataset/schema.rs
//!
//! Conversion of Pydantic-generated JSON Schemas into Arrow schemas,
//! plus system-column injection and schema fingerprinting.
1use std::sync::Arc;
2
3use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit};
4use serde_json::{Map, Value};
5
6use crate::dataset::error::DatasetError;
7use crate::dataset::types::{DatasetFingerprint, DatasetNamespace};
8
/// Reserved system column: UTC timestamp recording when the row was written.
pub const SCOUTER_CREATED_AT: &str = "scouter_created_at";
/// Reserved system column: date value used for table partitioning.
pub const SCOUTER_PARTITION_DATE: &str = "scouter_partition_date";
/// Reserved system column: identifier of the write batch a row belongs to.
pub const SCOUTER_BATCH_ID: &str = "scouter_batch_id";

/// Maximum nesting depth when resolving a JSON Schema. Guards the recursion in
/// `resolve_type` / `resolve_ref` against pathological or cyclic `$ref` chains.
const MAX_SCHEMA_DEPTH: usize = 32;
14
15/// Convert a Pydantic-generated JSON Schema string into an Arrow `Schema`.
16///
17/// Handles:
18/// - Scalar types: integer, number, string (with date/date-time format), boolean
19/// - Optional[T] via `anyOf: [{T}, {type: "null"}]`
20/// - Nested models via `$ref` → `$defs` resolution
21/// - List[T] via `type: "array"`
22/// - Enum/Literal via `enum` key → Dictionary(Int16, Utf8)
23///
24/// System columns (`scouter_created_at`, `scouter_partition_date`, `scouter_batch_id`)
25/// are NOT injected here — call `inject_system_columns()` after.
26pub fn json_schema_to_arrow(json_schema: &str) -> Result<Schema, DatasetError> {
27    let root: Value = serde_json::from_str(json_schema)?;
28
29    let obj = root.as_object().ok_or_else(|| {
30        DatasetError::SchemaParseError("JSON Schema root must be an object".to_string())
31    })?;
32
33    let defs = obj
34        .get("$defs")
35        .and_then(Value::as_object)
36        .cloned()
37        .unwrap_or_default();
38
39    let properties = obj
40        .get("properties")
41        .and_then(Value::as_object)
42        .ok_or_else(|| {
43            DatasetError::SchemaParseError(
44                "JSON Schema must have a 'properties' key at the root".to_string(),
45            )
46        })?;
47
48    let required: std::collections::HashSet<&str> = obj
49        .get("required")
50        .and_then(Value::as_array)
51        .map(|arr| arr.iter().filter_map(Value::as_str).collect())
52        .unwrap_or_default();
53
54    let mut fields = Vec::with_capacity(properties.len());
55    for (name, prop) in properties {
56        let nullable = !required.contains(name.as_str());
57        let (dtype, is_nullable) = resolve_type(prop, &defs, nullable, 0)?;
58        fields.push(Field::new(name, dtype, is_nullable));
59    }
60
61    Ok(Schema::new(fields))
62}
63
64/// Inject the three system columns at the end of a schema.
65/// These are always non-nullable and always appended in the same order.
66///
67/// Returns `Err` if the user schema already contains a reserved column name.
68pub fn inject_system_columns(schema: Schema) -> Result<Schema, DatasetError> {
69    for col_name in [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID] {
70        if schema.index_of(col_name).is_ok() {
71            return Err(DatasetError::SchemaParseError(format!(
72                "User schema must not contain reserved column '{col_name}'"
73            )));
74        }
75    }
76    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
77    fields.push(Field::new(
78        SCOUTER_CREATED_AT,
79        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
80        false,
81    ));
82    fields.push(Field::new(SCOUTER_PARTITION_DATE, DataType::Date32, false));
83    fields.push(Field::new(SCOUTER_BATCH_ID, DataType::Utf8, false));
84    Ok(Schema::new(fields))
85}
86
87/// Compute a stable fingerprint from a schema.
88///
89/// Serializes the schema to a canonical string (sorted field names + type + nullable),
90/// then hashes with SHA-256.
91pub fn schema_fingerprint(schema: &Schema) -> Result<DatasetFingerprint, DatasetError> {
92    let canonical = canonical_schema_repr(schema);
93    Ok(DatasetFingerprint::from_schema_json(&canonical))
94}
95
96fn canonical_type_repr(dt: &DataType) -> String {
97    match dt {
98        DataType::Struct(fields) => {
99            let mut sub: Vec<String> = fields
100                .iter()
101                .map(|f| {
102                    format!(
103                        "{}:{}:{}",
104                        f.name(),
105                        canonical_type_repr(f.data_type()),
106                        f.is_nullable()
107                    )
108                })
109                .collect();
110            sub.sort();
111            format!("Struct({})", sub.join(","))
112        }
113        DataType::List(field) => {
114            format!(
115                "List({}:{}:{})",
116                field.name(),
117                canonical_type_repr(field.data_type()),
118                field.is_nullable()
119            )
120        }
121        other => format!("{other}"),
122    }
123}
124
125fn canonical_schema_repr(schema: &Schema) -> String {
126    let mut fields: Vec<String> = schema
127        .fields()
128        .iter()
129        .map(|f| {
130            format!(
131                "{}:{}:{}",
132                f.name(),
133                canonical_type_repr(f.data_type()),
134                f.is_nullable()
135            )
136        })
137        .collect();
138    fields.sort();
139    fields.join("|")
140}
141
142/// Returns true if this JSON Schema variant represents the `null` type,
143/// covering Pydantic v2's multiple encodings:
144/// - `{"type": "null"}`
145/// - `{"const": null}`
146/// - `{"enum": [null]}` (single-element null enum)
147fn is_null_variant(v: &Value) -> bool {
148    if v.get("type").and_then(Value::as_str) == Some("null") {
149        return true;
150    }
151    if v.get("const").map(Value::is_null).unwrap_or(false) {
152        return true;
153    }
154    if let Some(arr) = v.get("enum").and_then(Value::as_array) {
155        if arr.len() == 1 && arr[0].is_null() {
156            return true;
157        }
158    }
159    false
160}
161
162/// Resolve a single JSON Schema property into an Arrow `(DataType, nullable)` pair.
163fn resolve_type(
164    prop: &Value,
165    defs: &Map<String, Value>,
166    nullable: bool,
167    depth: usize,
168) -> Result<(DataType, bool), DatasetError> {
169    if depth >= MAX_SCHEMA_DEPTH {
170        return Err(DatasetError::SchemaParseError(format!(
171            "Schema nesting exceeds maximum depth of {MAX_SCHEMA_DEPTH}"
172        )));
173    }
174
175    let obj = match prop.as_object() {
176        Some(o) => o,
177        None => {
178            return Err(DatasetError::SchemaParseError(
179                "Property must be a JSON object".to_string(),
180            ))
181        }
182    };
183
184    // $ref — look up in $defs
185    if let Some(ref_val) = obj.get("$ref").and_then(Value::as_str) {
186        return resolve_ref(ref_val, defs, nullable, depth + 1);
187    }
188
189    // anyOf — typically Optional[T]: [{T}, {type: "null"}]
190    if let Some(any_of) = obj.get("anyOf").and_then(Value::as_array) {
191        return resolve_any_of(any_of, defs, depth + 1);
192    }
193
194    // enum / Literal
195    if obj.contains_key("enum") {
196        return Ok((
197            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
198            nullable,
199        ));
200    }
201
202    let type_str = obj
203        .get("type")
204        .and_then(Value::as_str)
205        .ok_or_else(|| DatasetError::UnsupportedType(format!("No 'type' in: {prop}")))?;
206
207    match type_str {
208        "integer" => Ok((DataType::Int64, nullable)),
209        "number" => Ok((DataType::Float64, nullable)),
210        "boolean" => Ok((DataType::Boolean, nullable)),
211        "string" => {
212            let format = obj.get("format").and_then(Value::as_str);
213            match format {
214                Some("date-time") => Ok((
215                    DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
216                    nullable,
217                )),
218                Some("date") => Ok((DataType::Date32, nullable)),
219                _ => Ok((DataType::Utf8View, nullable)),
220            }
221        }
222        "array" => {
223            let items = obj.get("items").ok_or_else(|| {
224                DatasetError::SchemaParseError("Array missing 'items'".to_string())
225            })?;
226            let (item_type, item_nullable) = resolve_type(items, defs, true, depth + 1)?;
227            let item_field = Arc::new(Field::new("item", item_type, item_nullable));
228            Ok((DataType::List(item_field), nullable))
229        }
230        "object" => {
231            let props = obj
232                .get("properties")
233                .and_then(Value::as_object)
234                .ok_or_else(|| {
235                    DatasetError::UnsupportedType(
236                        "Free-form dict (object without 'properties') is not yet supported"
237                            .to_string(),
238                    )
239                })?;
240            let required: std::collections::HashSet<&str> = obj
241                .get("required")
242                .and_then(Value::as_array)
243                .map(|arr| arr.iter().filter_map(Value::as_str).collect())
244                .unwrap_or_default();
245            let mut struct_fields = Vec::with_capacity(props.len());
246            for (name, sub_prop) in props {
247                let field_nullable = !required.contains(name.as_str());
248                let (dtype, is_nullable) = resolve_type(sub_prop, defs, field_nullable, depth + 1)?;
249                struct_fields.push(Arc::new(Field::new(name, dtype, is_nullable)));
250            }
251            Ok((DataType::Struct(Fields::from(struct_fields)), nullable))
252        }
253        "null" => Ok((DataType::Null, true)),
254        other => Err(DatasetError::UnsupportedType(other.to_string())),
255    }
256}
257
258/// Resolve `{"$ref": "#/$defs/SomeName"}` to an Arrow DataType.
259///
260/// Handles two cases:
261/// - Object defs with `properties` → Struct
262/// - Non-object defs (e.g., enum, primitive) → delegated back to `resolve_type`
263fn resolve_ref(
264    ref_val: &str,
265    defs: &Map<String, Value>,
266    nullable: bool,
267    depth: usize,
268) -> Result<(DataType, bool), DatasetError> {
269    if depth >= MAX_SCHEMA_DEPTH {
270        return Err(DatasetError::SchemaParseError(format!(
271            "Schema nesting exceeds maximum depth of {MAX_SCHEMA_DEPTH}"
272        )));
273    }
274
275    let def_name = ref_val.strip_prefix("#/$defs/").ok_or_else(|| {
276        DatasetError::RefResolutionError(format!("Unrecognized $ref format: {ref_val}"))
277    })?;
278
279    let def = defs.get(def_name).ok_or_else(|| {
280        DatasetError::RefResolutionError(format!("$defs entry not found: {def_name}"))
281    })?;
282
283    let def_obj = def.as_object().ok_or_else(|| {
284        DatasetError::RefResolutionError(format!("$defs entry '{def_name}' is not an object"))
285    })?;
286
287    // Struct def (nested Pydantic model)
288    if let Some(props) = def_obj.get("properties").and_then(Value::as_object) {
289        let required: std::collections::HashSet<&str> = def_obj
290            .get("required")
291            .and_then(Value::as_array)
292            .map(|arr| arr.iter().filter_map(Value::as_str).collect())
293            .unwrap_or_default();
294
295        let mut struct_fields = Vec::with_capacity(props.len());
296        for (name, sub_prop) in props {
297            let field_nullable = !required.contains(name.as_str());
298            let (dtype, is_nullable) = resolve_type(sub_prop, defs, field_nullable, depth + 1)?;
299            struct_fields.push(Arc::new(Field::new(name, dtype, is_nullable)));
300        }
301        return Ok((DataType::Struct(Fields::from(struct_fields)), nullable));
302    }
303
304    // Non-struct def (enum, primitive, etc.) — delegate to resolve_type
305    resolve_type(def, defs, nullable, depth + 1)
306}
307
308/// Handle `anyOf` — Pydantic's encoding for `Optional[T]` is `[{T}, {"type": "null"}]`.
309/// We find the non-null variant and mark it nullable.
310fn resolve_any_of(
311    variants: &[Value],
312    defs: &Map<String, Value>,
313    depth: usize,
314) -> Result<(DataType, bool), DatasetError> {
315    let non_null: Vec<&Value> = variants.iter().filter(|v| !is_null_variant(v)).collect();
316
317    if non_null.len() == 1 {
318        let (dtype, _) = resolve_type(non_null[0], defs, true, depth)?;
319        return Ok((dtype, true));
320    }
321
322    // Multiple non-null variants — not yet supported
323    Err(DatasetError::UnsupportedType(
324        "anyOf with multiple non-null variants is not supported".to_string(),
325    ))
326}
327
328/// Compute an Arrow schema fingerprint from a Pydantic JSON Schema string.
329/// Convenience wrapper: parse → inject system cols → fingerprint.
330pub fn fingerprint_from_json_schema(json_schema: &str) -> Result<DatasetFingerprint, DatasetError> {
331    let schema = json_schema_to_arrow(json_schema)?;
332    let schema_with_sys = inject_system_columns(schema)?;
333    schema_fingerprint(&schema_with_sys)
334}
335
336/// Build registration inputs from a JSON Schema string + namespace + partition columns.
337/// Returns `(arrow_schema, fingerprint)`.
338#[allow(dead_code)]
339pub(crate) fn build_registration(
340    json_schema: &str,
341    _namespace: &DatasetNamespace,
342    _partition_columns: &[String],
343) -> Result<(Schema, DatasetFingerprint), DatasetError> {
344    let schema = json_schema_to_arrow(json_schema)?;
345    let schema_with_sys = inject_system_columns(schema)?;
346    let fingerprint = schema_fingerprint(&schema_with_sys)?;
347    Ok((schema_with_sys, fingerprint))
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------
    // JSON Schema fixtures — shapes that Pydantic v2 emits for models.
    // ---------------------------------------------------------------

    // Flat scalar model; `score` is deliberately absent from `required`.
    fn flat_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "UserEvent",
            "properties": {
                "user_id": {"type": "string"},
                "event_type": {"type": "string"},
                "value": {"type": "number"},
                "count": {"type": "integer"},
                "active": {"type": "boolean"},
                "score": {"type": "number"}
            },
            "required": ["user_id", "event_type", "value", "count", "active"]
        }"#
    }

    // Optional[T] fields encoded as anyOf: [{T}, {"type": "null"}].
    fn optional_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "OptionalModel",
            "properties": {
                "name": {"type": "string"},
                "age": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
                "score": {"anyOf": [{"type": "number"}, {"type": "null"}]}
            },
            "required": ["name"]
        }"#
    }

    // Nested model via $ref → $defs (uses r##…## so "#" may appear in the JSON).
    fn nested_schema_json() -> &'static str {
        r##"{
            "type": "object",
            "title": "Order",
            "properties": {
                "order_id": {"type": "string"},
                "address": {"$ref": "#/$defs/Address"}
            },
            "required": ["order_id", "address"],
            "$defs": {
                "Address": {
                    "type": "object",
                    "properties": {
                        "street": {"type": "string"},
                        "city": {"type": "string"},
                        "zip": {"type": "string"}
                    },
                    "required": ["street", "city", "zip"]
                }
            }
        }"##
    }

    // String fields carrying "date-time" / "date" formats.
    fn datetime_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "Event",
            "properties": {
                "created_at": {"type": "string", "format": "date-time"},
                "event_date": {"type": "string", "format": "date"},
                "label": {"type": "string"}
            },
            "required": ["created_at", "event_date", "label"]
        }"#
    }

    // List[float] via type "array".
    fn list_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "BatchPrediction",
            "properties": {
                "model_id": {"type": "string"},
                "scores": {"type": "array", "items": {"type": "number"}}
            },
            "required": ["model_id", "scores"]
        }"#
    }

    // Enum / Literal field via the "enum" key.
    fn enum_schema_json() -> &'static str {
        r#"{
            "type": "object",
            "title": "Status",
            "properties": {
                "status": {"enum": ["active", "inactive", "pending"]},
                "name": {"type": "string"}
            },
            "required": ["status", "name"]
        }"#
    }

    // List of nested models: array whose items are a $ref'd struct.
    fn list_of_nested_schema_json() -> &'static str {
        r##"{
            "type": "object",
            "title": "Report",
            "properties": {
                "report_id": {"type": "string"},
                "items": {
                    "type": "array",
                    "items": {"$ref": "#/$defs/ReportItem"}
                }
            },
            "required": ["report_id", "items"],
            "$defs": {
                "ReportItem": {
                    "type": "object",
                    "properties": {
                        "label": {"type": "string"},
                        "value": {"type": "number"}
                    },
                    "required": ["label", "value"]
                }
            }
        }"##
    }

    // ---------------------------------------------------------------
    // Type-mapping tests
    // ---------------------------------------------------------------

    #[test]
    fn test_flat_schema() {
        let schema = json_schema_to_arrow(flat_schema_json()).unwrap();
        assert_eq!(schema.fields().len(), 6);

        // Plain strings map to Utf8View; required fields are non-nullable.
        let user_id = schema.field_with_name("user_id").unwrap();
        assert_eq!(user_id.data_type(), &DataType::Utf8View);
        assert!(!user_id.is_nullable());

        // score is not in required, so nullable
        let score = schema.field_with_name("score").unwrap();
        assert!(score.is_nullable());

        let value = schema.field_with_name("value").unwrap();
        assert_eq!(value.data_type(), &DataType::Float64);

        let count = schema.field_with_name("count").unwrap();
        assert_eq!(count.data_type(), &DataType::Int64);

        let active = schema.field_with_name("active").unwrap();
        assert_eq!(active.data_type(), &DataType::Boolean);
    }

    #[test]
    fn test_optional_fields() {
        let schema = json_schema_to_arrow(optional_schema_json()).unwrap();

        let name = schema.field_with_name("name").unwrap();
        assert!(!name.is_nullable());
        assert_eq!(name.data_type(), &DataType::Utf8View);

        // anyOf [T, null] resolves to T's type, forced nullable.
        let age = schema.field_with_name("age").unwrap();
        assert!(age.is_nullable());
        assert_eq!(age.data_type(), &DataType::Int64);

        let score = schema.field_with_name("score").unwrap();
        assert!(score.is_nullable());
        assert_eq!(score.data_type(), &DataType::Float64);
    }

    #[test]
    fn test_nested_struct() {
        let schema = json_schema_to_arrow(nested_schema_json()).unwrap();

        // $ref'd model becomes a Struct field.
        let address = schema.field_with_name("address").unwrap();
        assert!(!address.is_nullable());
        assert!(matches!(address.data_type(), DataType::Struct(_)));

        if let DataType::Struct(fields) = address.data_type() {
            assert_eq!(fields.len(), 3);
            let street = fields.find("street").map(|(_, f)| f.clone());
            assert!(street.is_some());
            assert_eq!(street.unwrap().data_type(), &DataType::Utf8View);
        }
    }

    #[test]
    fn test_datetime_formats() {
        let schema = json_schema_to_arrow(datetime_schema_json()).unwrap();

        // "date-time" → microsecond Timestamp; "date" → Date32.
        let created = schema.field_with_name("created_at").unwrap();
        assert!(matches!(
            created.data_type(),
            DataType::Timestamp(TimeUnit::Microsecond, _)
        ));

        let date = schema.field_with_name("event_date").unwrap();
        assert_eq!(date.data_type(), &DataType::Date32);
    }

    #[test]
    fn test_list_type() {
        let schema = json_schema_to_arrow(list_schema_json()).unwrap();

        let scores = schema.field_with_name("scores").unwrap();
        assert!(matches!(scores.data_type(), DataType::List(_)));
        if let DataType::List(item) = scores.data_type() {
            assert_eq!(item.data_type(), &DataType::Float64);
        }
    }

    #[test]
    fn test_enum_type() {
        let schema = json_schema_to_arrow(enum_schema_json()).unwrap();

        // enum → Dictionary(Int16, Utf8).
        let status = schema.field_with_name("status").unwrap();
        assert!(matches!(status.data_type(), DataType::Dictionary(_, _)));
    }

    #[test]
    fn test_list_of_nested() {
        let schema = json_schema_to_arrow(list_of_nested_schema_json()).unwrap();

        let items = schema.field_with_name("items").unwrap();
        assert!(matches!(items.data_type(), DataType::List(_)));
        if let DataType::List(item_field) = items.data_type() {
            assert!(matches!(item_field.data_type(), DataType::Struct(_)));
        }
    }

    // ---------------------------------------------------------------
    // System-column injection
    // ---------------------------------------------------------------

    #[test]
    fn test_system_columns_injected() {
        let schema = json_schema_to_arrow(flat_schema_json()).unwrap();
        let schema = inject_system_columns(schema).unwrap();

        let created = schema.field_with_name(SCOUTER_CREATED_AT).unwrap();
        assert!(matches!(
            created.data_type(),
            DataType::Timestamp(TimeUnit::Microsecond, _)
        ));
        assert!(!created.is_nullable());

        let partition_date = schema.field_with_name(SCOUTER_PARTITION_DATE).unwrap();
        assert_eq!(partition_date.data_type(), &DataType::Date32);
        assert!(!partition_date.is_nullable());

        let batch_id = schema.field_with_name(SCOUTER_BATCH_ID).unwrap();
        assert_eq!(batch_id.data_type(), &DataType::Utf8);
        assert!(!batch_id.is_nullable());
    }

    #[test]
    fn test_reserved_column_collision_error() {
        // A user field named after a system column must be rejected.
        let bad = r#"{
            "type": "object",
            "properties": {
                "scouter_created_at": {"type": "string"}
            },
            "required": ["scouter_created_at"]
        }"#;
        let schema = json_schema_to_arrow(bad).unwrap();
        let err = inject_system_columns(schema).unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
        assert!(err.to_string().contains("reserved"));
    }

    // ---------------------------------------------------------------
    // Fingerprint properties
    // ---------------------------------------------------------------

    #[test]
    fn test_fingerprint_stability() {
        let fp1 = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        let fp2 = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        assert_eq!(fp1, fp2);
    }

    #[test]
    fn test_fingerprint_changes_on_field_add() {
        let fp1 = fingerprint_from_json_schema(flat_schema_json()).unwrap();

        // Same as flat_schema_json() plus one extra optional field.
        let modified = r#"{
            "type": "object",
            "title": "UserEvent",
            "properties": {
                "user_id": {"type": "string"},
                "event_type": {"type": "string"},
                "value": {"type": "number"},
                "count": {"type": "integer"},
                "active": {"type": "boolean"},
                "score": {"type": "number"},
                "new_field": {"type": "string"}
            },
            "required": ["user_id", "event_type", "value", "count", "active"]
        }"#;
        let fp2 = fingerprint_from_json_schema(modified).unwrap();
        assert_ne!(fp1, fp2);
    }

    #[test]
    fn test_fingerprint_is_32_chars() {
        let fp = fingerprint_from_json_schema(flat_schema_json()).unwrap();
        assert_eq!(fp.as_str().len(), 32);
    }

    #[test]
    fn test_fingerprint_field_order_independent() {
        // Same fields, different declaration order → same fingerprint
        let schema_a = r#"{
            "type": "object",
            "properties": {
                "alpha": {"type": "string"},
                "beta": {"type": "integer"}
            },
            "required": ["alpha", "beta"]
        }"#;
        let schema_b = r#"{
            "type": "object",
            "properties": {
                "beta": {"type": "integer"},
                "alpha": {"type": "string"}
            },
            "required": ["alpha", "beta"]
        }"#;
        let fp_a = fingerprint_from_json_schema(schema_a).unwrap();
        let fp_b = fingerprint_from_json_schema(schema_b).unwrap();
        assert_eq!(fp_a, fp_b);
    }

    // ---------------------------------------------------------------
    // Error paths
    // ---------------------------------------------------------------

    #[test]
    fn test_unsupported_type_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "field": {"type": "unknown_type"}
            },
            "required": ["field"]
        }"#;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::UnsupportedType(_)));
    }

    #[test]
    fn test_missing_ref_error() {
        let bad = r##"{
            "type": "object",
            "properties": {
                "nested": {"$ref": "#/$defs/NonExistent"}
            },
            "required": ["nested"]
        }"##;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::RefResolutionError(_)));
    }

    #[test]
    fn test_missing_properties_key_error() {
        let bad = r#"{"type": "object"}"#;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    #[test]
    fn test_bad_ref_format_error() {
        // Only "#/$defs/..." refs are recognized.
        let bad = r##"{
            "type": "object",
            "properties": {
                "x": {"$ref": "definitions/Foo"}
            },
            "required": ["x"]
        }"##;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::RefResolutionError(_)));
    }

    #[test]
    fn test_property_not_object_error() {
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": true
            },
            "required": ["x"]
        }"#;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    #[test]
    fn test_any_of_multiple_non_null_variants_error() {
        // True unions (int | str) are not supported.
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "integer"}, {"type": "string"}]}
            },
            "required": ["x"]
        }"#;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::UnsupportedType(_)));
    }

    #[test]
    fn test_any_of_null_enum_encoding() {
        // Pydantic v2 may encode Optional[T] null branch as {"enum": [null]}
        let schema = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "integer"}, {"enum": [null]}]}
            },
            "required": []
        }"#;
        let result = json_schema_to_arrow(schema);
        assert!(result.is_ok());
        let field = result.unwrap();
        let x = field.field_with_name("x").unwrap();
        assert!(x.is_nullable());
        assert_eq!(x.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_any_of_const_null_encoding() {
        // Pydantic v2 may encode null branch as {"const": null}
        let schema = r#"{
            "type": "object",
            "properties": {
                "x": {"anyOf": [{"type": "string"}, {"const": null}]}
            },
            "required": []
        }"#;
        let result = json_schema_to_arrow(schema);
        assert!(result.is_ok());
        let field = result.unwrap();
        let x = field.field_with_name("x").unwrap();
        assert!(x.is_nullable());
        assert_eq!(x.data_type(), &DataType::Utf8View);
    }

    #[test]
    fn test_free_form_dict_is_unsupported_type() {
        // "object" without "properties" (e.g. Dict[str, Any]) is unsupported.
        let bad = r#"{
            "type": "object",
            "properties": {
                "x": {"type": "object"}
            },
            "required": ["x"]
        }"#;
        let err = json_schema_to_arrow(bad).unwrap_err();
        assert!(matches!(err, DatasetError::UnsupportedType(_)));
    }

    #[test]
    fn test_build_registration_includes_sys_cols() {
        use crate::dataset::types::DatasetNamespace;
        let ns = DatasetNamespace::new("cat", "sch", "tbl").unwrap();
        let (schema, fingerprint) = build_registration(flat_schema_json(), &ns, &[]).unwrap();
        assert!(schema.index_of(SCOUTER_CREATED_AT).is_ok());
        assert!(schema.index_of(SCOUTER_PARTITION_DATE).is_ok());
        assert!(schema.index_of(SCOUTER_BATCH_ID).is_ok());
        assert_eq!(fingerprint.as_str().len(), 32);
    }

    #[test]
    fn test_max_depth_exceeded() {
        // Build a deeply nested $ref chain that exceeds MAX_SCHEMA_DEPTH
        // We simulate by crafting a schema where $defs reference each other > 32 levels deep.
        // Since $ref resolves via a flat $defs lookup (no actual recursion in the JSON),
        // we test the depth by constructing an "object" with nested properties 33 levels deep.
        let mut inner = r#"{"type": "string"}"#.to_string();
        for _ in 0..MAX_SCHEMA_DEPTH {
            inner = format!(
                r#"{{"type": "object", "properties": {{"x": {inner}}}, "required": ["x"]}}"#
            );
        }
        let schema = format!(
            r#"{{"type": "object", "properties": {{"root": {inner}}}, "required": ["root"]}}"#
        );
        let err = json_schema_to_arrow(&schema).unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
        assert!(err.to_string().contains("depth"));
    }

    /// CONTRACT: the gRPC server and the Python client (TableConfig) must produce
    /// identical fingerprints for the same Pydantic JSON schema.
    ///
    /// Client path (TableConfig::new):
    ///   fingerprint_from_json_schema(pydantic_json)
    ///
    /// Server path (register_dataset handler):
    ///   fingerprint_from_json_schema(req.json_schema)   ← same call, same input
    ///
    /// Both call fingerprint_from_json_schema, which internally does:
    ///   json_schema_to_arrow → inject_system_columns → schema_fingerprint
    ///
    /// If this test fails, DatasetClient::new() will always raise FingerprintMismatch.
    #[test]
    fn test_client_server_fingerprint_contract() {
        let pydantic_json = flat_schema_json();

        // Client-side (TableConfig::new)
        let client_fp = fingerprint_from_json_schema(pydantic_json).unwrap();

        // Server-side (register_dataset after fix — uses the same function)
        let server_fp = fingerprint_from_json_schema(pydantic_json).unwrap();

        assert_eq!(
            client_fp, server_fp,
            "client and server fingerprints must agree; \
             both must call fingerprint_from_json_schema with the original Pydantic JSON schema"
        );

        // Sanity: stored Arrow schema must contain system columns so Delta writes succeed
        let arrow_schema = json_schema_to_arrow(pydantic_json).unwrap();
        let schema_with_sys = inject_system_columns(arrow_schema).unwrap();
        assert!(schema_with_sys
            .field_with_name(SCOUTER_PARTITION_DATE)
            .is_ok());
        assert!(schema_with_sys.field_with_name(SCOUTER_CREATED_AT).is_ok());
        assert!(schema_with_sys.field_with_name(SCOUTER_BATCH_ID).is_ok());
    }

    /// Show that hashing the Arrow schema WITHOUT system columns produces a different
    /// fingerprint from the client path (which includes system columns).
    /// This documents the pre-fix server bug: omitting inject_system_columns caused mismatches.
    #[test]
    fn test_fingerprint_differs_without_system_columns() {
        let pydantic_json = flat_schema_json();

        // Client fingerprint: Arrow schema WITH system columns, canonical repr
        let client_fp = fingerprint_from_json_schema(pydantic_json).unwrap();

        // Old server fingerprint: canonical repr WITHOUT system columns
        let arrow_schema_no_sys = json_schema_to_arrow(pydantic_json).unwrap();
        let old_server_fp =
            DatasetFingerprint::from_schema_json(&canonical_schema_repr(&arrow_schema_no_sys));

        assert_ne!(
            client_fp, old_server_fp,
            "omitting inject_system_columns changes the fingerprint — \
             this was the pre-fix bug that caused FingerprintMismatch in DatasetClient::new()"
        );
    }
}