//! Streaming SQL support on top of DataFusion: stream sources and table
//! providers, the channel-to-DataFusion bridge, session/context builders, and
//! registration of the streaming, JSON, complex-type, and lambda functions.

pub mod aggregate_bridge;
mod bridge;
mod channel_source;
pub mod complex_type_lambda;
pub mod complex_type_udf;
mod exec;
pub mod execute;
pub mod format_bridge_udf;
pub mod json_extensions;
pub mod json_path;
pub mod json_tvf;
pub mod json_types;
pub mod json_udaf;
pub mod json_udf;
pub mod live_source;
pub mod lookup_join;
pub mod lookup_join_exec;
pub mod proctime_udf;
mod source;
mod table_provider;
pub mod watermark_filter;
pub mod watermark_udf;
pub mod window_udf;

pub use aggregate_bridge::{
    create_aggregate_factory, lookup_aggregate_udf, result_to_scalar_value, scalar_value_to_result,
    DataFusionAccumulatorAdapter, DataFusionAggregateFactory,
};
pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use live_source::{LiveSourceHandle, LiveSourceProvider};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

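/// Base [`SessionConfig`] shared by every context in this module: SQL
/// identifier normalization is disabled (quoted identifiers keep their case)
/// and plans target a single partition.
///
/// A minimal usage sketch; `target_partitions` is the stock DataFusion
/// accessor, nothing project-specific:
///
/// ```ignore
/// let config = base_session_config();
/// assert_eq!(config.target_partitions(), 1);
/// ```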
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config = config.with_target_partitions(1);
    config
}

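/// A plain [`SessionContext`] built from [`base_session_config`], with no
/// streaming UDFs registered; pair it with one of the `register_*` helpers
/// below when the streaming functions are needed.
///
/// A minimal sketch:
///
/// ```ignore
/// let ctx = create_session_context();
/// let df = ctx.sql("SELECT 1").await?;
/// ```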
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

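/// A streaming-ready [`SessionContext`]: equivalent to calling
/// [`create_streaming_context_with_validator`] with
/// [`StreamingValidatorMode::Reject`], so physical plans that are unsafe over
/// unbounded input are rejected rather than executed.
///
/// A sketch wiring in a channel-backed table, mirroring the tests below:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let ctx = create_streaming_context();
/// let source = Arc::new(ChannelStreamSource::new(schema)); // `schema` assumed in scope
/// let provider = StreamingTableProvider::new("events", source);
/// ctx.register_table("events", Arc::new(provider))?;
/// let df = ctx.sql("SELECT * FROM events").await?;
/// ```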
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

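/// Builds a [`SessionContext`] with an 8192-row batch size and, unless `mode`
/// is [`StreamingValidatorMode::Off`], a [`StreamingPhysicalValidator`]
/// prepended ahead of DataFusion's default physical optimizer rules. The
/// streaming UDFs are registered either way.
///
/// A sketch of opting out of validation, e.g. for bounded test input:
///
/// ```ignore
/// let ctx = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ```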
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config().with_batch_size(8192);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state first so the validator can be prepended ahead
        // of the stock physical optimizer rules.
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

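/// Registers the streaming UDFs on `ctx`: the window-start functions, a
/// `watermark()` UDF in its unset form, a processing-time UDF, and the JSON,
/// JSON-extension, complex-type, and lambda function families.
///
/// A sketch, using the function names the tests below rely on:
///
/// ```ignore
/// use datafusion::execution::FunctionRegistry;
///
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// assert!(ctx.udf("tumble").is_ok());
/// assert!(ctx.udf("watermark").is_ok());
/// ```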
pub fn register_streaming_functions(ctx: &SessionContext) {
    // Window-assignment and time UDFs first, then the JSON, complex-type,
    // and lambda families.
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

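/// Same registrations as [`register_streaming_functions`], except that
/// `watermark()` reads the shared `watermark_ms` counter (a millisecond
/// value) instead of being unset, so filters can compare event time against
/// the live watermark.
///
/// A sketch mirroring the watermark-filter test below:
///
/// ```ignore
/// use std::sync::atomic::AtomicI64;
/// use std::sync::Arc;
///
/// let ctx = create_session_context();
/// let wm = Arc::new(AtomicI64::new(200_000));
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&wm));
/// // `WHERE event_time > watermark()` now compares against 200_000 ms;
/// // storing a new value into `wm` moves the cutoff without re-registering.
/// ```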
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

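/// Registers the JSONB scalar UDFs (accessors, existence and containment
/// predicates, construction and introspection), the JSON aggregate UDAFs
/// ([`JsonAgg`], [`JsonObjectAgg`]), the format-bridge and timestamp-parsing
/// UDFs, the jsonpath predicates, and the JSON table functions.
///
/// A minimal sketch:
///
/// ```ignore
/// let ctx = create_session_context();
/// register_json_functions(&ctx);
/// // The jsonb function family is now callable from SQL on this context.
/// ```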
pub fn register_json_functions(ctx: &SessionContext) {
    // Field, element, and path accessors.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Key-existence predicates.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment predicates.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Construction and introspection.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // JSON aggregates.
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format-bridge and timestamp-parsing UDFs.
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // jsonpath predicates.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // Table functions (see `json_tvf`).
    register_json_table_functions(ctx);
}

348
349#[cfg(test)]
350mod tests {
351 use super::*;
352 use arrow_array::{Float64Array, Int64Array, RecordBatch};
353 use arrow_schema::{DataType, Field, Schema};
354 use datafusion::execution::FunctionRegistry;
355 use futures::StreamExt;
356 use std::sync::Arc;
357
358 fn test_schema() -> Arc<Schema> {
359 Arc::new(Schema::new(vec![
360 Field::new("id", DataType::Int64, false),
361 Field::new("value", DataType::Float64, true),
362 ]))
363 }
364
365 fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
367 source.take_sender().expect("sender already taken")
368 }
369
370 fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
371 RecordBatch::try_new(
372 Arc::clone(schema),
373 vec![
374 Arc::new(Int64Array::from(ids)),
375 Arc::new(Float64Array::from(values)),
376 ],
377 )
378 .unwrap()
379 }
380
381 #[test]
382 fn test_create_streaming_context() {
383 let ctx = create_streaming_context();
384 let state = ctx.state();
385 let config = state.config();
386
387 assert_eq!(config.batch_size(), 8192);
388 assert_eq!(config.target_partitions(), 1);
389 }
390
391 #[tokio::test]
392 async fn test_full_query_pipeline() {
393 let ctx = create_streaming_context();
394 let schema = test_schema();
395
396 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
398 let sender = take_test_sender(&source);
399 let provider = StreamingTableProvider::new("events", source);
400 ctx.register_table("events", Arc::new(provider)).unwrap();
401
402 sender
404 .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
405 .await
406 .unwrap();
407 sender
408 .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
409 .await
410 .unwrap();
411 drop(sender); let df = ctx.sql("SELECT * FROM events").await.unwrap();
415 let batches = df.collect().await.unwrap();
416
417 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
419 assert_eq!(total_rows, 5);
420 }
421
    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // The validator runs when the physical plan is built, so the
        // rejection surfaces at collect time rather than at `sql()`.
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // No ORDER BY in the SQL itself: a full sort over an unbounded stream
        // would be rejected, so this only checks that all rows come through.
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10_000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // 60s and 120s fall in the [0, 300s) window; 360s falls in [300s, 600s).
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Only the 300_000 ms row is strictly above the 200_000 ms watermark.
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }
}