// scouter_types/dataset/batch_builder.rs
use std::sync::Arc;

use arrow::array::{
    ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder,
    ListBuilder, RecordBatch, StringBuilder, StringDictionaryBuilder, StringViewBuilder,
    TimestampMicrosecondBuilder,
};
use arrow::datatypes::{DataType, Fields, Int16Type, SchemaRef, TimeUnit};
use chrono::{NaiveDate, Utc};
use serde_json::Value;
use uuid::Uuid;

use crate::dataset::error::DatasetError;
/// Builds Arrow [`RecordBatch`]es from JSON strings at runtime.
///
/// The schema (with system columns already injected via [`inject_system_columns`]) is
/// provided at construction. Call [`append_json_row`] for each JSON string produced by
/// `pydantic_instance.model_dump_json()`, then [`finish`] to materialise the batch with
/// system columns (`scouter_created_at`, `scouter_partition_date`, `scouter_batch_id`)
/// automatically filled in.
///
/// System columns must be the last three fields in the schema, in the order mandated by
/// [`inject_system_columns`].
pub struct DynamicBatchBuilder {
    /// Full schema including system columns (last 3 fields).
    schema: SchemaRef,
    /// Number of user-defined columns (`schema.fields().len() - 3`).
    user_field_count: usize,
    /// Accumulated JSON values per user field: `columns[i]` is the column for
    /// `schema.field(i)`.  Using `Option<Value>` defers Arrow builder dispatch
    /// until `finish()`, which keeps `append_json_row` allocation-free.
    /// `None` means the key was absent from the JSON row (treated as null).
    columns: Vec<Vec<Option<Value>>>,
    /// Number of rows appended so far. Invariant: equals `columns[i].len()` for every i.
    row_count: usize,
}
37
38impl DynamicBatchBuilder {
39    /// Construct a builder for the given schema.
40    ///
41    /// Panics in debug builds if the schema has fewer than 3 fields (the minimum
42    /// needed for the three system columns).
43    pub fn new(schema: SchemaRef) -> Self {
44        let n_fields = schema.fields().len();
45        debug_assert!(
46            n_fields >= 3,
47            "Schema must contain at least 3 system columns"
48        );
49        let user_field_count = n_fields.saturating_sub(3);
50        Self {
51            schema,
52            user_field_count,
53            columns: vec![Vec::new(); user_field_count],
54            row_count: 0,
55        }
56    }
57
58    /// Parse `json_str` and append one row.
59    ///
60    /// `json_str` must be a JSON object whose keys cover the user-defined fields.
61    /// Missing keys append `null` (valid only for nullable fields — the schema
62    /// determines nullability, Arrow validates it at `finish()`).
63    ///
64    /// Returns an error if `json_str` is not valid JSON or if it is not a JSON object.
65    pub fn append_json_row(&mut self, json_str: &str) -> Result<(), DatasetError> {
66        let root: Value = serde_json::from_str(json_str)?;
67        let obj = root.as_object().ok_or_else(|| {
68            DatasetError::SchemaParseError(
69                "JSON row must be an object (model_dump_json() output expected)".to_string(),
70            )
71        })?;
72
73        for (col_idx, field) in self.schema.fields()[..self.user_field_count]
74            .iter()
75            .enumerate()
76        {
77            let val = obj.get(field.name()).cloned();
78            self.columns[col_idx].push(val);
79        }
80        self.row_count += 1;
81        Ok(())
82    }
83
84    /// Number of rows appended so far.
85    pub fn row_count(&self) -> usize {
86        self.row_count
87    }
88
89    /// Returns `true` if no rows have been appended.
90    pub fn is_empty(&self) -> bool {
91        self.row_count == 0
92    }
93
94    /// Consume the builder and produce a [`RecordBatch`].
95    ///
96    /// System columns are automatically injected:
97    /// - `scouter_created_at`: current UTC timestamp (microsecond precision)
98    /// - `scouter_partition_date`: today's date
99    /// - `scouter_batch_id`: a UUID v7 string shared across all rows in this batch
100    pub fn finish(self) -> Result<RecordBatch, DatasetError> {
101        let n = self.row_count;
102
103        // Build user columns
104        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
105        for (col_idx, field) in self.schema.fields()[..self.user_field_count]
106            .iter()
107            .enumerate()
108        {
109            let arr = build_array(&self.columns[col_idx], field.data_type())?;
110            arrays.push(arr);
111        }
112
113        // --- System columns ---
114
115        // scouter_created_at: Timestamp(Microsecond, UTC)
116        let now_us = Utc::now().timestamp_micros();
117        let mut ts_builder =
118            TimestampMicrosecondBuilder::with_capacity(n).with_timezone("UTC".to_string());
119        for _ in 0..n {
120            ts_builder.append_value(now_us);
121        }
122        arrays.push(Arc::new(ts_builder.finish()));
123
124        // scouter_partition_date: Date32 (days since UNIX epoch)
125        let today = Utc::now().date_naive();
126        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("epoch is valid");
127        let days_since_epoch = (today - epoch).num_days() as i32;
128        let mut date_builder = Date32Builder::with_capacity(n);
129        for _ in 0..n {
130            date_builder.append_value(days_since_epoch);
131        }
132        arrays.push(Arc::new(date_builder.finish()));
133
134        // scouter_batch_id: Utf8 — one UUID v7 shared across the entire batch
135        let batch_id = Uuid::now_v7().to_string();
136        let mut id_builder = StringBuilder::with_capacity(n, n * 36);
137        for _ in 0..n {
138            id_builder.append_value(&batch_id);
139        }
140        arrays.push(Arc::new(id_builder.finish()));
141
142        RecordBatch::try_new(self.schema, arrays).map_err(|e| {
143            DatasetError::ArrowSchemaError(format!("Failed to create RecordBatch: {e}"))
144        })
145    }
146}
147
148// ---------------------------------------------------------------------------
149// Internal helpers
150// ---------------------------------------------------------------------------
151
/// Build a single Arrow [`ArrayRef`] from a column of optional JSON values.
///
/// `values[i]` is row i's JSON value for this column; `None` means the key was
/// absent from the JSON object and is treated the same as an explicit JSON null.
/// Returns `SchemaParseError` on a value that cannot be coerced to `data_type`,
/// and `UnsupportedType` for Arrow types this builder does not handle.
fn build_array(values: &[Option<Value>], data_type: &DataType) -> Result<ArrayRef, DatasetError> {
    match data_type {
        DataType::Int64 => {
            let mut b = Int64Builder::with_capacity(values.len());
            for v in values {
                match v {
                    // as_i64 fails for floats and for u64 values above i64::MAX
                    Some(Value::Number(n)) => match n.as_i64() {
                        Some(i) => b.append_value(i),
                        None => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Cannot coerce {n} to Int64"
                            )))
                        }
                    },
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected integer, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Float64 => {
            let mut b = Float64Builder::with_capacity(values.len());
            for v in values {
                match v {
                    // as_f64 accepts integer JSON numbers as well
                    Some(Value::Number(n)) => match n.as_f64() {
                        Some(f) => b.append_value(f),
                        None => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Cannot coerce {n} to Float64"
                            )))
                        }
                    },
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected number, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Utf8View => {
            let mut b = StringViewBuilder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::String(s)) => b.append_value(s),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        // scouter_batch_id uses plain Utf8, not Utf8View
        DataType::Utf8 => {
            // 8 bytes/value is a heuristic initial data capacity, not a limit
            let mut b = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for v in values {
                match v {
                    Some(Value::String(s)) => b.append_value(s),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Boolean => {
            let mut b = BooleanBuilder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::Bool(bv)) => b.append_value(*bv),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected boolean, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        // Any timezone in the schema is accepted; values are stored as UTC
        // microseconds (RFC3339 offsets are normalised by timestamp_micros()).
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let mut b = TimestampMicrosecondBuilder::with_capacity(values.len())
                .with_timezone("UTC".to_string());
            for v in values {
                match v {
                    Some(Value::String(s)) => {
                        let ts = chrono::DateTime::parse_from_rfc3339(s)
                            .map_err(|e| {
                                DatasetError::SchemaParseError(format!(
                                    "Cannot parse '{s}' as RFC3339 datetime: {e}"
                                ))
                            })?
                            .timestamp_micros();
                        b.append_value(ts);
                    }
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected datetime string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Date32 => {
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("epoch is valid");
            let mut b = Date32Builder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::String(s)) => {
                        let date = NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| {
                            DatasetError::SchemaParseError(format!(
                                "Cannot parse '{s}' as date (YYYY-MM-DD): {e}"
                            ))
                        })?;
                        // Date32 is days since the UNIX epoch; pre-1970 dates go negative
                        let days = (date - epoch).num_days() as i32;
                        b.append_value(days);
                    }
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected date string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        // Only the Dictionary(Int16, Utf8) encoding produced by the schema
        // converter (enum fields) is supported.
        DataType::Dictionary(key_type, value_type) => {
            if key_type.as_ref() == &DataType::Int16 && value_type.as_ref() == &DataType::Utf8 {
                let mut b: StringDictionaryBuilder<Int16Type> =
                    StringDictionaryBuilder::with_capacity(values.len(), 16, values.len() * 8);
                for v in values {
                    match v {
                        Some(Value::String(s)) => {
                            b.append_value(s);
                        }
                        Some(Value::Null) | None => b.append_null(),
                        other => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Expected string for dictionary, got: {other:?}"
                            )))
                        }
                    }
                }
                Ok(Arc::new(b.finish()))
            } else {
                Err(DatasetError::UnsupportedType(format!(
                    "Dictionary({key_type:?}, {value_type:?}) — only Dictionary(Int16, Utf8) is supported"
                )))
            }
        }

        DataType::List(item_field) => {
            // The inner builder type is dispatched once per column, then each
            // row's JSON array is appended element-by-element.
            let inner_builder = make_builder(item_field.data_type(), values.len())?;
            let mut list_builder = ListBuilder::new(inner_builder);
            for v in values {
                match v {
                    Some(Value::Array(items)) => {
                        let inner = list_builder.values();
                        append_to_builder(inner, items, item_field.data_type())?;
                        list_builder.append(true);
                    }
                    Some(Value::Null) | None => {
                        list_builder.append_null();
                    }
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected array, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }

        // Nested objects recurse per subfield via build_struct_array
        DataType::Struct(fields) => build_struct_array(values, fields),

        other => Err(DatasetError::UnsupportedType(format!(
            "Arrow type {other} is not supported by DynamicBatchBuilder"
        ))),
    }
}
356
357/// Build a struct array column from a slice of optional JSON objects.
358fn build_struct_array(values: &[Option<Value>], fields: &Fields) -> Result<ArrayRef, DatasetError> {
359    // Collect per-subfield columns
360    let mut sub_cols: Vec<Vec<Option<Value>>> =
361        vec![Vec::with_capacity(values.len()); fields.len()];
362
363    for v in values {
364        match v {
365            Some(Value::Object(obj)) => {
366                for (i, field) in fields.iter().enumerate() {
367                    sub_cols[i].push(obj.get(field.name()).cloned());
368                }
369            }
370            Some(Value::Null) | None => {
371                for col in sub_cols.iter_mut() {
372                    col.push(None);
373                }
374            }
375            other => {
376                return Err(DatasetError::SchemaParseError(format!(
377                    "Expected JSON object for struct field, got: {other:?}"
378                )))
379            }
380        }
381    }
382
383    let sub_arrays: Vec<ArrayRef> = fields
384        .iter()
385        .enumerate()
386        .map(|(i, field)| build_array(&sub_cols[i], field.data_type()))
387        .collect::<Result<_, _>>()?;
388
389    // Build null bitmap from the top-level option
390    let null_buffer: arrow::buffer::NullBuffer = values
391        .iter()
392        .map(|v| v.as_ref().map(|v| !v.is_null()).unwrap_or(false))
393        .collect();
394
395    let struct_array =
396        arrow::array::StructArray::new(fields.clone(), sub_arrays, Some(null_buffer));
397
398    Ok(Arc::new(struct_array))
399}
400
401/// Create a concrete [`ArrayBuilder`] for a given Arrow [`DataType`].
402/// Used to construct inner builders for [`ListBuilder`].
403fn make_builder(
404    data_type: &DataType,
405    capacity: usize,
406) -> Result<Box<dyn ArrayBuilder>, DatasetError> {
407    match data_type {
408        DataType::Int64 => Ok(Box::new(Int64Builder::with_capacity(capacity))),
409        DataType::Float64 => Ok(Box::new(Float64Builder::with_capacity(capacity))),
410        DataType::Utf8View => Ok(Box::new(StringViewBuilder::with_capacity(capacity))),
411        DataType::Utf8 => Ok(Box::new(StringBuilder::with_capacity(
412            capacity,
413            capacity * 8,
414        ))),
415        DataType::Boolean => Ok(Box::new(BooleanBuilder::with_capacity(capacity))),
416        DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(Box::new(
417            TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC".to_string()),
418        )),
419        DataType::Date32 => Ok(Box::new(Date32Builder::with_capacity(capacity))),
420        other => Err(DatasetError::UnsupportedType(format!(
421            "Cannot create list item builder for {other}"
422        ))),
423    }
424}
425
426/// Append a slice of JSON values to an existing `dyn ArrayBuilder`.
427/// Used when filling the inner builder of a [`ListBuilder`].
428fn append_to_builder(
429    builder: &mut dyn ArrayBuilder,
430    items: &[Value],
431    data_type: &DataType,
432) -> Result<(), DatasetError> {
433    match data_type {
434        DataType::Int64 => {
435            let b = builder
436                .as_any_mut()
437                .downcast_mut::<Int64Builder>()
438                .ok_or_else(|| {
439                    DatasetError::SchemaParseError(
440                        "Internal error: builder type mismatch for Int64".to_string(),
441                    )
442                })?;
443            for v in items {
444                match v {
445                    Value::Number(n) => b.append_value(n.as_i64().ok_or_else(|| {
446                        DatasetError::SchemaParseError(format!("Cannot coerce {n} to Int64"))
447                    })?),
448                    Value::Null => b.append_null(),
449                    other => {
450                        return Err(DatasetError::SchemaParseError(format!(
451                            "Expected integer in list, got: {other:?}"
452                        )))
453                    }
454                }
455            }
456        }
457        DataType::Float64 => {
458            let b = builder
459                .as_any_mut()
460                .downcast_mut::<Float64Builder>()
461                .ok_or_else(|| {
462                    DatasetError::SchemaParseError(
463                        "Internal error: builder type mismatch for Float64".to_string(),
464                    )
465                })?;
466            for v in items {
467                match v {
468                    Value::Number(n) => b.append_value(n.as_f64().ok_or_else(|| {
469                        DatasetError::SchemaParseError(format!("Cannot coerce {n} to Float64"))
470                    })?),
471                    Value::Null => b.append_null(),
472                    other => {
473                        return Err(DatasetError::SchemaParseError(format!(
474                            "Expected number in list, got: {other:?}"
475                        )))
476                    }
477                }
478            }
479        }
480        DataType::Utf8View => {
481            let b = builder
482                .as_any_mut()
483                .downcast_mut::<StringViewBuilder>()
484                .ok_or_else(|| {
485                    DatasetError::SchemaParseError(
486                        "Internal error: builder type mismatch for Utf8View".to_string(),
487                    )
488                })?;
489            for v in items {
490                match v {
491                    Value::String(s) => b.append_value(s),
492                    Value::Null => b.append_null(),
493                    other => {
494                        return Err(DatasetError::SchemaParseError(format!(
495                            "Expected string in list, got: {other:?}"
496                        )))
497                    }
498                }
499            }
500        }
501        DataType::Boolean => {
502            let b = builder
503                .as_any_mut()
504                .downcast_mut::<BooleanBuilder>()
505                .ok_or_else(|| {
506                    DatasetError::SchemaParseError(
507                        "Internal error: builder type mismatch for Boolean".to_string(),
508                    )
509                })?;
510            for v in items {
511                match v {
512                    Value::Bool(bv) => b.append_value(*bv),
513                    Value::Null => b.append_null(),
514                    other => {
515                        return Err(DatasetError::SchemaParseError(format!(
516                            "Expected boolean in list, got: {other:?}"
517                        )))
518                    }
519                }
520            }
521        }
522        other => {
523            return Err(DatasetError::UnsupportedType(format!(
524                "List item type {other} is not supported"
525            )))
526        }
527    }
528    Ok(())
529}
530
531// ---------------------------------------------------------------------------
532// Tests
533// ---------------------------------------------------------------------------
534
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dataset::schema::{
        inject_system_columns, json_schema_to_arrow, SCOUTER_BATCH_ID, SCOUTER_CREATED_AT,
        SCOUTER_PARTITION_DATE,
    };
    use arrow::array::{
        Array, BooleanArray, Date32Array, Float64Array, Int64Array, TimestampMicrosecondArray,
    };
    use arrow::datatypes::DataType;

    /// Parse a JSON schema and append the three system columns, as production
    /// callers do before constructing a DynamicBatchBuilder.
    fn schema_from_json(json: &str) -> SchemaRef {
        let schema = json_schema_to_arrow(json).unwrap();
        Arc::new(inject_system_columns(schema).unwrap())
    }

    /// A simple flat schema covering string, number, integer, and boolean fields.
    fn flat_schema() -> SchemaRef {
        schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "user_id": {"type": "string"},
                    "value": {"type": "number"},
                    "count": {"type": "integer"},
                    "active": {"type": "boolean"}
                },
                "required": ["user_id", "value", "count", "active"]
            }"#,
        )
    }

    #[test]
    fn test_flat_types_round_trip() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema.clone());
        b.append_json_row(r#"{"user_id":"alice","value":1.5,"count":3,"active":true}"#)
            .unwrap();
        b.append_json_row(r#"{"user_id":"bob","value":2.0,"count":7,"active":false}"#)
            .unwrap();
        assert_eq!(b.row_count(), 2);

        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.schema(), schema);

        // spot-check user columns
        let val_col = batch
            .column_by_name("value")
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((val_col.value(0) - 1.5).abs() < f64::EPSILON);

        let cnt_col = batch
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(cnt_col.value(1), 7);

        let active_col = batch
            .column_by_name("active")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!active_col.value(1));
    }

    #[test]
    fn test_system_columns_injected() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"user_id":"x","value":0.0,"count":0,"active":false}"#)
            .unwrap();
        let batch = b.finish().unwrap();

        // scouter_created_at: any post-epoch instant has a positive µs value
        let ts = batch
            .column_by_name(SCOUTER_CREATED_AT)
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(ts.value(0) > 0);

        // scouter_partition_date
        let date = batch
            .column_by_name(SCOUTER_PARTITION_DATE)
            .unwrap()
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        // days since epoch should be positive (we're past 1970)
        assert!(date.value(0) > 0);

        // scouter_batch_id column exists and is populated
        // (one row appended above, so the column has exactly one entry)
        let ids = batch.column_by_name(SCOUTER_BATCH_ID).unwrap();
        assert_eq!(ids.len(), 1);
        assert!(!ids.is_null(0));
    }

    #[test]
    fn test_batch_id_shared_across_rows() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        for _ in 0..5 {
            b.append_json_row(r#"{"user_id":"u","value":0.0,"count":0,"active":true}"#)
                .unwrap();
        }
        let batch = b.finish().unwrap();
        let ids: Vec<String> = (0..5)
            .map(|i| {
                arrow::array::as_string_array(batch.column_by_name(SCOUTER_BATCH_ID).unwrap())
                    .value(i)
                    .to_string()
            })
            .collect();
        // All rows in a batch share the same UUID
        assert!(ids.windows(2).all(|w| w[0] == w[1]));
        // Canonical hyphenated UUID string is exactly 36 characters
        assert_eq!(ids[0].len(), 36);
    }

    #[test]
    fn test_nullable_fields() {
        let schema = schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"anyOf": [{"type": "integer"}, {"type": "null"}]}
                },
                "required": ["name"]
            }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"name":"alice","age":30}"#).unwrap();
        b.append_json_row(r#"{"name":"bob","age":null}"#).unwrap();
        b.append_json_row(r#"{"name":"carol"}"#).unwrap(); // missing → null

        let batch = b.finish().unwrap();
        let age = batch
            .column_by_name("age")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(age.value(0), 30);
        // explicit null and absent key are treated identically
        assert!(age.is_null(1));
        assert!(age.is_null(2));
    }

    #[test]
    fn test_timestamp_parsing() {
        let schema = schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "ts": {"type": "string", "format": "date-time"}
                },
                "required": ["ts"]
            }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"ts":"2024-06-01T12:00:00Z"}"#)
            .unwrap();
        let batch = b.finish().unwrap();
        let ts = batch
            .column_by_name("ts")
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        // 2024-06-01T12:00:00Z = 1717243200000000 µs
        assert_eq!(ts.value(0), 1_717_243_200_000_000);
    }

    #[test]
    fn test_date_parsing() {
        let schema = schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "d": {"type": "string", "format": "date"}
                },
                "required": ["d"]
            }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"d":"1970-01-02"}"#).unwrap();
        let batch = b.finish().unwrap();
        let dates = batch
            .column_by_name("d")
            .unwrap()
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        assert_eq!(dates.value(0), 1); // 1 day after epoch
    }

    #[test]
    fn test_nested_struct() {
        // $ref/$defs exercises the schema resolver; the builder only sees the
        // resulting Struct data type.
        let schema = schema_from_json(
            r##"{
                "type": "object",
                "properties": {
                    "id": {"type": "string"},
                    "addr": {"$ref": "#/$defs/Addr"}
                },
                "required": ["id", "addr"],
                "$defs": {
                    "Addr": {
                        "type": "object",
                        "properties": {
                            "city": {"type": "string"},
                            "zip": {"type": "string"}
                        },
                        "required": ["city", "zip"]
                    }
                }
            }"##,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"id":"1","addr":{"city":"NYC","zip":"10001"}}"#)
            .unwrap();
        let batch = b.finish().unwrap();
        let addr_col = batch.column_by_name("addr").unwrap();
        assert!(matches!(addr_col.data_type(), DataType::Struct(_)));
        assert!(!addr_col.is_null(0));
    }

    #[test]
    fn test_list_field() {
        let schema = schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "scores": {"type": "array", "items": {"type": "number"}}
                },
                "required": ["scores"]
            }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"scores":[1.0,2.5,3.0]}"#).unwrap();
        let batch = b.finish().unwrap();
        let scores = batch.column_by_name("scores").unwrap();
        assert!(matches!(scores.data_type(), DataType::List(_)));
        // one row (the list itself), not three scalar elements
        assert_eq!(scores.len(), 1);
    }

    #[test]
    fn test_dictionary_field() {
        // enum fields are dictionary-encoded by the schema converter
        let schema = schema_from_json(
            r#"{
                "type": "object",
                "properties": {
                    "status": {"enum": ["active","inactive"]}
                },
                "required": ["status"]
            }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"status":"active"}"#).unwrap();
        b.append_json_row(r#"{"status":"inactive"}"#).unwrap();
        let batch = b.finish().unwrap();
        let status = batch.column_by_name("status").unwrap();
        assert!(matches!(status.data_type(), DataType::Dictionary(_, _)));
    }

    #[test]
    fn test_empty_builder_finish() {
        // finishing with zero rows must still yield a valid, empty batch
        let schema = flat_schema();
        let b = DynamicBatchBuilder::new(schema.clone());
        assert!(b.is_empty());
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_malformed_json_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        let err = b.append_json_row("{not valid json}").unwrap_err();
        // serde_json parse failures map to SerializationError via From
        assert!(matches!(err, DatasetError::SerializationError(_)));
    }

    #[test]
    fn test_non_object_json_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        let err = b
            .append_json_row(r#"["array","not","object"]"#)
            .unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    #[test]
    fn test_type_mismatch_int_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        // "count" is Int64, but we pass a string
        b.append_json_row(r#"{"user_id":"u","value":1.0,"count":"bad","active":true}"#)
            .unwrap(); // append succeeds (we defer type checking to finish)

        // build_array is called at finish, so error surfaces there
        let err = b.finish().unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    #[test]
    fn test_row_count_matches() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        for i in 0..42 {
            b.append_json_row(&format!(
                r#"{{"user_id":"u{i}","value":{i}.0,"count":{i},"active":true}}"#
            ))
            .unwrap();
        }
        assert_eq!(b.row_count(), 42);
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 42);
    }
}
863}