pub mod aggregate_bridge;
mod bridge;
mod channel_source;
pub mod complex_type_lambda;
pub mod complex_type_udf;
mod exec;
pub mod execute;
pub mod format_bridge_udf;
pub mod json_extensions;
pub mod json_path;
pub mod json_tvf;
pub mod json_types;
pub mod json_udaf;
pub mod json_udf;
pub mod lookup_join;
pub mod lookup_join_exec;
pub mod proctime_udf;
mod source;
mod table_provider;
pub mod watermark_filter;
pub mod watermark_udf;
pub mod window_udf;

pub use aggregate_bridge::{
    create_aggregate_factory, lookup_aggregate_udf, result_to_scalar_value, scalar_value_to_result,
    DataFusionAccumulatorAdapter, DataFusionAggregateFactory,
};
pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

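/// Builds the base [`SessionConfig`] shared by the session constructors in
/// this module, with SQL identifier normalization disabled so identifiers
/// keep their original case.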
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config
}

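/// Creates a plain [`SessionContext`] from [`base_session_config`], without
/// the streaming validator or any streaming UDFs registered.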
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

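/// Creates a [`SessionContext`] configured for streaming execution:
/// [`base_session_config`] settings, a batch size of 8192, a single target
/// partition, the [`StreamingPhysicalValidator`] in
/// [`StreamingValidatorMode::Reject`], and all streaming, JSON, complex-type,
/// and lambda functions registered.
///
/// Illustrative sketch (assumes an `events` table has been registered; not
/// compiled as a doc-test):
///
/// ```ignore
/// let ctx = create_streaming_context();
/// let df = ctx
///     .sql("SELECT tumble(event_time, INTERVAL '5' MINUTE) AS w, value FROM events")
///     .await?;
/// ```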
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

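/// Creates a streaming [`SessionContext`] with the given
/// [`StreamingValidatorMode`]. Unless the mode is `Off`, a
/// [`StreamingPhysicalValidator`] is inserted ahead of DataFusion's default
/// physical optimizer rules. The context uses the base session config with a
/// batch size of 8192 and a single target partition, and has all streaming
/// functions registered.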
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config()
        .with_batch_size(8192)
        .with_target_partitions(1);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

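/// Registers the streaming window-start UDFs (tumble, hop, session, cumulate),
/// the watermark and processing-time UDFs, and the JSON, JSON-extension,
/// complex-type, and lambda function families on `ctx`. The watermark UDF is
/// registered in its unset state; use
/// [`register_streaming_functions_with_watermark`] to bind it to a shared
/// watermark value.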
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

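/// Like [`register_streaming_functions`], but binds the watermark UDF to the
/// shared `watermark_ms` value so that predicates such as
/// `event_time > watermark()` observe the current watermark (in milliseconds).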
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

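/// Registers the JSONB scalar UDFs (field/path access, existence, containment,
/// type inspection, and construction), the [`JsonAgg`] and [`JsonObjectAgg`]
/// aggregates, the format-bridge UDFs, the JSONPath predicate UDFs, and the
/// JSON table functions.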
pub fn register_json_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

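    /// Two-column test schema: non-nullable `id: Int64` and nullable
    /// `value: Float64`.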
    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

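    /// Takes the producer side out of a `ChannelStreamSource`, panicking if it
    /// has already been taken.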
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

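    /// Builds a `RecordBatch` over the test schema from parallel `id` and
    /// `value` columns.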
    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
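        // With the default Reject validator, a blocking global aggregate over
        // an unbounded streaming source should surface an error instead of
        // waiting for the stream to end.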
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }
}