1use std::sync::Arc;
2
3use arrow::array::{
4 ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder,
5 ListBuilder, RecordBatch, StringBuilder, StringDictionaryBuilder, StringViewBuilder,
6 TimestampMicrosecondBuilder,
7};
8use arrow::datatypes::{DataType, Fields, Int16Type, SchemaRef, TimeUnit};
9use chrono::{NaiveDate, Utc};
10use serde_json::Value;
11use uuid::Uuid;
12
13use crate::dataset::error::DatasetError;
14
/// Accumulates JSON rows column-wise and converts them into an Arrow
/// `RecordBatch` matching `schema` (user fields followed by 3 trailing
/// system columns that `finish` populates).
pub struct DynamicBatchBuilder {
    // Full Arrow schema, including the 3 trailing system columns.
    schema: SchemaRef,
    // Number of user-defined fields (total fields minus the 3 system columns).
    user_field_count: usize,
    // One Vec per user field; `None` marks an absent key for that row.
    columns: Vec<Vec<Option<Value>>>,
    // Rows appended so far.
    row_count: usize,
}
37
38impl DynamicBatchBuilder {
39 pub fn new(schema: SchemaRef) -> Self {
44 let n_fields = schema.fields().len();
45 debug_assert!(
46 n_fields >= 3,
47 "Schema must contain at least 3 system columns"
48 );
49 let user_field_count = n_fields.saturating_sub(3);
50 Self {
51 schema,
52 user_field_count,
53 columns: vec![Vec::new(); user_field_count],
54 row_count: 0,
55 }
56 }
57
58 pub fn append_json_row(&mut self, json_str: &str) -> Result<(), DatasetError> {
66 let root: Value = serde_json::from_str(json_str)?;
67 let obj = root.as_object().ok_or_else(|| {
68 DatasetError::SchemaParseError(
69 "JSON row must be an object (model_dump_json() output expected)".to_string(),
70 )
71 })?;
72
73 for (col_idx, field) in self.schema.fields()[..self.user_field_count]
74 .iter()
75 .enumerate()
76 {
77 let val = obj.get(field.name()).cloned();
78 self.columns[col_idx].push(val);
79 }
80 self.row_count += 1;
81 Ok(())
82 }
83
84 pub fn row_count(&self) -> usize {
86 self.row_count
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.row_count == 0
92 }
93
94 pub fn finish(self) -> Result<RecordBatch, DatasetError> {
101 let n = self.row_count;
102
103 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
105 for (col_idx, field) in self.schema.fields()[..self.user_field_count]
106 .iter()
107 .enumerate()
108 {
109 let arr = build_array(&self.columns[col_idx], field.data_type())?;
110 arrays.push(arr);
111 }
112
113 let now_us = Utc::now().timestamp_micros();
117 let mut ts_builder =
118 TimestampMicrosecondBuilder::with_capacity(n).with_timezone("UTC".to_string());
119 for _ in 0..n {
120 ts_builder.append_value(now_us);
121 }
122 arrays.push(Arc::new(ts_builder.finish()));
123
124 let today = Utc::now().date_naive();
126 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("epoch is valid");
127 let days_since_epoch = (today - epoch).num_days() as i32;
128 let mut date_builder = Date32Builder::with_capacity(n);
129 for _ in 0..n {
130 date_builder.append_value(days_since_epoch);
131 }
132 arrays.push(Arc::new(date_builder.finish()));
133
134 let batch_id = Uuid::now_v7().to_string();
136 let mut id_builder = StringBuilder::with_capacity(n, n * 36);
137 for _ in 0..n {
138 id_builder.append_value(&batch_id);
139 }
140 arrays.push(Arc::new(id_builder.finish()));
141
142 RecordBatch::try_new(self.schema, arrays).map_err(|e| {
143 DatasetError::ArrowSchemaError(format!("Failed to create RecordBatch: {e}"))
144 })
145 }
146}
147
/// Converts one column of JSON values into an Arrow array of `data_type`.
///
/// Both `None` entries (key absent from the source row) and explicit JSON
/// `null` become Arrow nulls. Any other mismatch between the JSON value and
/// the target Arrow type is an error rather than a silent coercion.
///
/// # Errors
/// Returns `SchemaParseError` when a value cannot be represented in
/// `data_type`, and `UnsupportedType` for Arrow types this function does not
/// handle.
fn build_array(values: &[Option<Value>], data_type: &DataType) -> Result<ArrayRef, DatasetError> {
    match data_type {
        DataType::Int64 => {
            let mut b = Int64Builder::with_capacity(values.len());
            for v in values {
                match v {
                    // as_i64 rejects floats and out-of-range values; no lossy cast.
                    Some(Value::Number(n)) => match n.as_i64() {
                        Some(i) => b.append_value(i),
                        None => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Cannot coerce {n} to Int64"
                            )))
                        }
                    },
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected integer, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Float64 => {
            let mut b = Float64Builder::with_capacity(values.len());
            for v in values {
                match v {
                    // Accepts both integral and fractional JSON numbers.
                    Some(Value::Number(n)) => match n.as_f64() {
                        Some(f) => b.append_value(f),
                        None => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Cannot coerce {n} to Float64"
                            )))
                        }
                    },
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected number, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Utf8View => {
            let mut b = StringViewBuilder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::String(s)) => b.append_value(s),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Utf8 => {
            // values.len() * 8 is only a data-capacity heuristic; the builder
            // grows as needed.
            let mut b = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for v in values {
                match v {
                    Some(Value::String(s)) => b.append_value(s),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Boolean => {
            let mut b = BooleanBuilder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::Bool(bv)) => b.append_value(*bv),
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected boolean, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        // Any timezone in the schema is accepted, but the produced array is
        // labelled UTC; the input string must be RFC3339.
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let mut b = TimestampMicrosecondBuilder::with_capacity(values.len())
                .with_timezone("UTC".to_string());
            for v in values {
                match v {
                    Some(Value::String(s)) => {
                        let ts = chrono::DateTime::parse_from_rfc3339(s)
                            .map_err(|e| {
                                DatasetError::SchemaParseError(format!(
                                    "Cannot parse '{s}' as RFC3339 datetime: {e}"
                                ))
                            })?
                            .timestamp_micros();
                        b.append_value(ts);
                    }
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected datetime string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        DataType::Date32 => {
            // Date32 is days since the Unix epoch.
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("epoch is valid");
            let mut b = Date32Builder::with_capacity(values.len());
            for v in values {
                match v {
                    Some(Value::String(s)) => {
                        let date = NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| {
                            DatasetError::SchemaParseError(format!(
                                "Cannot parse '{s}' as date (YYYY-MM-DD): {e}"
                            ))
                        })?;
                        let days = (date - epoch).num_days() as i32;
                        b.append_value(days);
                    }
                    Some(Value::Null) | None => b.append_null(),
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected date string, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(b.finish()))
        }

        // Only Dictionary(Int16, Utf8) (e.g. enum-like string columns) is
        // supported; any other key/value combination is rejected.
        DataType::Dictionary(key_type, value_type) => {
            if key_type.as_ref() == &DataType::Int16 && value_type.as_ref() == &DataType::Utf8 {
                let mut b: StringDictionaryBuilder<Int16Type> =
                    StringDictionaryBuilder::with_capacity(values.len(), 16, values.len() * 8);
                for v in values {
                    match v {
                        Some(Value::String(s)) => {
                            b.append_value(s);
                        }
                        Some(Value::Null) | None => b.append_null(),
                        other => {
                            return Err(DatasetError::SchemaParseError(format!(
                                "Expected string for dictionary, got: {other:?}"
                            )))
                        }
                    }
                }
                Ok(Arc::new(b.finish()))
            } else {
                Err(DatasetError::UnsupportedType(format!(
                    "Dictionary({key_type:?}, {value_type:?}) — only Dictionary(Int16, Utf8) is supported"
                )))
            }
        }

        // Lists delegate item handling to make_builder/append_to_builder;
        // supported item types are limited to what those two cover.
        DataType::List(item_field) => {
            let inner_builder = make_builder(item_field.data_type(), values.len())?;
            let mut list_builder = ListBuilder::new(inner_builder);
            for v in values {
                match v {
                    Some(Value::Array(items)) => {
                        let inner = list_builder.values();
                        append_to_builder(inner, items, item_field.data_type())?;
                        list_builder.append(true);
                    }
                    Some(Value::Null) | None => {
                        list_builder.append_null();
                    }
                    other => {
                        return Err(DatasetError::SchemaParseError(format!(
                            "Expected array, got: {other:?}"
                        )))
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }

        // Structs recurse through build_array per child field.
        DataType::Struct(fields) => build_struct_array(values, fields),

        other => Err(DatasetError::UnsupportedType(format!(
            "Arrow type {other} is not supported by DynamicBatchBuilder"
        ))),
    }
}
356
357fn build_struct_array(values: &[Option<Value>], fields: &Fields) -> Result<ArrayRef, DatasetError> {
359 let mut sub_cols: Vec<Vec<Option<Value>>> =
361 vec![Vec::with_capacity(values.len()); fields.len()];
362
363 for v in values {
364 match v {
365 Some(Value::Object(obj)) => {
366 for (i, field) in fields.iter().enumerate() {
367 sub_cols[i].push(obj.get(field.name()).cloned());
368 }
369 }
370 Some(Value::Null) | None => {
371 for col in sub_cols.iter_mut() {
372 col.push(None);
373 }
374 }
375 other => {
376 return Err(DatasetError::SchemaParseError(format!(
377 "Expected JSON object for struct field, got: {other:?}"
378 )))
379 }
380 }
381 }
382
383 let sub_arrays: Vec<ArrayRef> = fields
384 .iter()
385 .enumerate()
386 .map(|(i, field)| build_array(&sub_cols[i], field.data_type()))
387 .collect::<Result<_, _>>()?;
388
389 let null_buffer: arrow::buffer::NullBuffer = values
391 .iter()
392 .map(|v| v.as_ref().map(|v| !v.is_null()).unwrap_or(false))
393 .collect();
394
395 let struct_array =
396 arrow::array::StructArray::new(fields.clone(), sub_arrays, Some(null_buffer));
397
398 Ok(Arc::new(struct_array))
399}
400
401fn make_builder(
404 data_type: &DataType,
405 capacity: usize,
406) -> Result<Box<dyn ArrayBuilder>, DatasetError> {
407 match data_type {
408 DataType::Int64 => Ok(Box::new(Int64Builder::with_capacity(capacity))),
409 DataType::Float64 => Ok(Box::new(Float64Builder::with_capacity(capacity))),
410 DataType::Utf8View => Ok(Box::new(StringViewBuilder::with_capacity(capacity))),
411 DataType::Utf8 => Ok(Box::new(StringBuilder::with_capacity(
412 capacity,
413 capacity * 8,
414 ))),
415 DataType::Boolean => Ok(Box::new(BooleanBuilder::with_capacity(capacity))),
416 DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(Box::new(
417 TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC".to_string()),
418 )),
419 DataType::Date32 => Ok(Box::new(Date32Builder::with_capacity(capacity))),
420 other => Err(DatasetError::UnsupportedType(format!(
421 "Cannot create list item builder for {other}"
422 ))),
423 }
424}
425
426fn append_to_builder(
429 builder: &mut dyn ArrayBuilder,
430 items: &[Value],
431 data_type: &DataType,
432) -> Result<(), DatasetError> {
433 match data_type {
434 DataType::Int64 => {
435 let b = builder
436 .as_any_mut()
437 .downcast_mut::<Int64Builder>()
438 .ok_or_else(|| {
439 DatasetError::SchemaParseError(
440 "Internal error: builder type mismatch for Int64".to_string(),
441 )
442 })?;
443 for v in items {
444 match v {
445 Value::Number(n) => b.append_value(n.as_i64().ok_or_else(|| {
446 DatasetError::SchemaParseError(format!("Cannot coerce {n} to Int64"))
447 })?),
448 Value::Null => b.append_null(),
449 other => {
450 return Err(DatasetError::SchemaParseError(format!(
451 "Expected integer in list, got: {other:?}"
452 )))
453 }
454 }
455 }
456 }
457 DataType::Float64 => {
458 let b = builder
459 .as_any_mut()
460 .downcast_mut::<Float64Builder>()
461 .ok_or_else(|| {
462 DatasetError::SchemaParseError(
463 "Internal error: builder type mismatch for Float64".to_string(),
464 )
465 })?;
466 for v in items {
467 match v {
468 Value::Number(n) => b.append_value(n.as_f64().ok_or_else(|| {
469 DatasetError::SchemaParseError(format!("Cannot coerce {n} to Float64"))
470 })?),
471 Value::Null => b.append_null(),
472 other => {
473 return Err(DatasetError::SchemaParseError(format!(
474 "Expected number in list, got: {other:?}"
475 )))
476 }
477 }
478 }
479 }
480 DataType::Utf8View => {
481 let b = builder
482 .as_any_mut()
483 .downcast_mut::<StringViewBuilder>()
484 .ok_or_else(|| {
485 DatasetError::SchemaParseError(
486 "Internal error: builder type mismatch for Utf8View".to_string(),
487 )
488 })?;
489 for v in items {
490 match v {
491 Value::String(s) => b.append_value(s),
492 Value::Null => b.append_null(),
493 other => {
494 return Err(DatasetError::SchemaParseError(format!(
495 "Expected string in list, got: {other:?}"
496 )))
497 }
498 }
499 }
500 }
501 DataType::Boolean => {
502 let b = builder
503 .as_any_mut()
504 .downcast_mut::<BooleanBuilder>()
505 .ok_or_else(|| {
506 DatasetError::SchemaParseError(
507 "Internal error: builder type mismatch for Boolean".to_string(),
508 )
509 })?;
510 for v in items {
511 match v {
512 Value::Bool(bv) => b.append_value(*bv),
513 Value::Null => b.append_null(),
514 other => {
515 return Err(DatasetError::SchemaParseError(format!(
516 "Expected boolean in list, got: {other:?}"
517 )))
518 }
519 }
520 }
521 }
522 other => {
523 return Err(DatasetError::UnsupportedType(format!(
524 "List item type {other} is not supported"
525 )))
526 }
527 }
528 Ok(())
529}
530
#[cfg(test)]
mod tests {
    //! Round-trip and error-path tests for `DynamicBatchBuilder`.
    //! Schemas are produced via the project's JSON-schema-to-Arrow helpers,
    //! with the 3 system columns injected.
    use super::*;
    use crate::dataset::schema::{
        inject_system_columns, json_schema_to_arrow, SCOUTER_BATCH_ID, SCOUTER_CREATED_AT,
        SCOUTER_PARTITION_DATE,
    };
    use arrow::array::{
        Array, BooleanArray, Date32Array, Float64Array, Int64Array, TimestampMicrosecondArray,
    };
    use arrow::datatypes::DataType;

    // Parses a JSON schema and appends the 3 system columns.
    fn schema_from_json(json: &str) -> SchemaRef {
        let schema = json_schema_to_arrow(json).unwrap();
        Arc::new(inject_system_columns(schema).unwrap())
    }

    // Simple 4-field schema covering string, float, int and bool.
    fn flat_schema() -> SchemaRef {
        schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "value": {"type": "number"},
                "count": {"type": "integer"},
                "active": {"type": "boolean"}
            },
            "required": ["user_id", "value", "count", "active"]
        }"#,
        )
    }

    // All scalar types survive a JSON -> RecordBatch round trip.
    #[test]
    fn test_flat_types_round_trip() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema.clone());
        b.append_json_row(r#"{"user_id":"alice","value":1.5,"count":3,"active":true}"#)
            .unwrap();
        b.append_json_row(r#"{"user_id":"bob","value":2.0,"count":7,"active":false}"#)
            .unwrap();
        assert_eq!(b.row_count(), 2);

        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.schema(), schema);

        let val_col = batch
            .column_by_name("value")
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((val_col.value(0) - 1.5).abs() < f64::EPSILON);

        let cnt_col = batch
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(cnt_col.value(1), 7);

        let active_col = batch
            .column_by_name("active")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!active_col.value(1));
    }

    // finish() populates the 3 system columns with sensible values.
    #[test]
    fn test_system_columns_injected() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"user_id":"x","value":0.0,"count":0,"active":false}"#)
            .unwrap();
        let batch = b.finish().unwrap();

        let ts = batch
            .column_by_name(SCOUTER_CREATED_AT)
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(ts.value(0) > 0);

        let date = batch
            .column_by_name(SCOUTER_PARTITION_DATE)
            .unwrap()
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        assert!(date.value(0) > 0);

        let ids = batch.column_by_name(SCOUTER_BATCH_ID).unwrap();
        assert_eq!(ids.len(), 1);
        assert!(!ids.is_null(0));
    }

    // Every row of one batch carries the same 36-char UUID batch id.
    #[test]
    fn test_batch_id_shared_across_rows() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        for _ in 0..5 {
            b.append_json_row(r#"{"user_id":"u","value":0.0,"count":0,"active":true}"#)
                .unwrap();
        }
        let batch = b.finish().unwrap();
        let ids: Vec<String> = (0..5)
            .map(|i| {
                arrow::array::as_string_array(batch.column_by_name(SCOUTER_BATCH_ID).unwrap())
                    .value(i)
                    .to_string()
            })
            .collect();
        assert!(ids.windows(2).all(|w| w[0] == w[1]));
        assert_eq!(ids[0].len(), 36);
    }

    // Explicit JSON null and an absent key both become Arrow nulls.
    #[test]
    fn test_nullable_fields() {
        let schema = schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"anyOf": [{"type": "integer"}, {"type": "null"}]}
            },
            "required": ["name"]
        }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"name":"alice","age":30}"#).unwrap();
        b.append_json_row(r#"{"name":"bob","age":null}"#).unwrap();
        b.append_json_row(r#"{"name":"carol"}"#).unwrap(); let batch = b.finish().unwrap();
        let age = batch
            .column_by_name("age")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(age.value(0), 30);
        assert!(age.is_null(1));
        assert!(age.is_null(2));
    }

    // RFC3339 datetime strings parse into microsecond timestamps.
    #[test]
    fn test_timestamp_parsing() {
        let schema = schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "ts": {"type": "string", "format": "date-time"}
            },
            "required": ["ts"]
        }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"ts":"2024-06-01T12:00:00Z"}"#)
            .unwrap();
        let batch = b.finish().unwrap();
        let ts = batch
            .column_by_name("ts")
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_717_243_200_000_000);
    }

    // YYYY-MM-DD strings become days-since-epoch Date32 values.
    #[test]
    fn test_date_parsing() {
        let schema = schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "d": {"type": "string", "format": "date"}
            },
            "required": ["d"]
        }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"d":"1970-01-02"}"#).unwrap();
        let batch = b.finish().unwrap();
        let dates = batch
            .column_by_name("d")
            .unwrap()
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        assert_eq!(dates.value(0), 1); }

    // $ref-defined nested objects materialize as Struct columns.
    #[test]
    fn test_nested_struct() {
        let schema = schema_from_json(
            r##"{
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "addr": {"$ref": "#/$defs/Addr"}
            },
            "required": ["id", "addr"],
            "$defs": {
                "Addr": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string"},
                        "zip": {"type": "string"}
                    },
                    "required": ["city", "zip"]
                }
            }
        }"##,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"id":"1","addr":{"city":"NYC","zip":"10001"}}"#)
            .unwrap();
        let batch = b.finish().unwrap();
        let addr_col = batch.column_by_name("addr").unwrap();
        assert!(matches!(addr_col.data_type(), DataType::Struct(_)));
        assert!(!addr_col.is_null(0));
    }

    // JSON arrays materialize as List columns.
    #[test]
    fn test_list_field() {
        let schema = schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "scores": {"type": "array", "items": {"type": "number"}}
            },
            "required": ["scores"]
        }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"scores":[1.0,2.5,3.0]}"#).unwrap();
        let batch = b.finish().unwrap();
        let scores = batch.column_by_name("scores").unwrap();
        assert!(matches!(scores.data_type(), DataType::List(_)));
        assert_eq!(scores.len(), 1);
    }

    // JSON enums materialize as Dictionary columns.
    #[test]
    fn test_dictionary_field() {
        let schema = schema_from_json(
            r#"{
            "type": "object",
            "properties": {
                "status": {"enum": ["active","inactive"]}
            },
            "required": ["status"]
        }"#,
        );
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"status":"active"}"#).unwrap();
        b.append_json_row(r#"{"status":"inactive"}"#).unwrap();
        let batch = b.finish().unwrap();
        let status = batch.column_by_name("status").unwrap();
        assert!(matches!(status.data_type(), DataType::Dictionary(_, _)));
    }

    // finish() on an empty builder yields a valid 0-row batch.
    #[test]
    fn test_empty_builder_finish() {
        let schema = flat_schema();
        let b = DynamicBatchBuilder::new(schema.clone());
        assert!(b.is_empty());
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    // Invalid JSON surfaces as a SerializationError at append time.
    #[test]
    fn test_malformed_json_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        let err = b.append_json_row("{not valid json}").unwrap_err();
        assert!(matches!(err, DatasetError::SerializationError(_)));
    }

    // Non-object JSON (e.g. a top-level array) is rejected at append time.
    #[test]
    fn test_non_object_json_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        let err = b
            .append_json_row(r#"["array","not","object"]"#)
            .unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    // Type mismatches are only detected at finish(), not at append time.
    #[test]
    fn test_type_mismatch_int_error() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        b.append_json_row(r#"{"user_id":"u","value":1.0,"count":"bad","active":true}"#)
            .unwrap(); let err = b.finish().unwrap_err();
        assert!(matches!(err, DatasetError::SchemaParseError(_)));
    }

    // row_count() matches the number of appended rows and the batch size.
    #[test]
    fn test_row_count_matches() {
        let schema = flat_schema();
        let mut b = DynamicBatchBuilder::new(schema);
        for i in 0..42 {
            b.append_json_row(&format!(
                r#"{{"user_id":"u{i}","value":{i}.0,"count":{i},"active":true}}"#
            ))
            .unwrap();
        }
        assert_eq!(b.row_count(), 42);
        let batch = b.finish().unwrap();
        assert_eq!(batch.num_rows(), 42);
    }
}