mod bridge;
mod channel_source;
pub mod complex_type_lambda;
pub mod complex_type_udf;
mod exec;
pub mod execute;
pub mod format_bridge_udf;
pub mod json_extensions;
pub mod json_path;
pub mod json_tvf;
pub mod json_types;
pub mod json_udaf;
pub mod json_udf;
pub mod live_source;
pub mod lookup_join;
pub mod lookup_join_exec;
pub mod proctime_udf;
mod source;
mod table_provider;
pub mod watermark_filter;
pub mod watermark_udf;
pub mod window_udf;

pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use live_source::{LiveSourceHandle, LiveSourceProvider};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

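/// Builds the base [`SessionConfig`] shared by the session constructors in
/// this module: SQL identifier normalization is disabled so identifiers keep
/// the case they were written with, and execution is pinned to a single
/// target partition.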
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config = config.with_target_partitions(1);
    config
}

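/// Creates a plain [`SessionContext`] from [`base_session_config`], with no
/// streaming UDFs registered and no physical-plan validation.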
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

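/// Creates a [`SessionContext`] for streaming queries with the physical-plan
/// validator in [`StreamingValidatorMode::Reject`] mode, so plans that a
/// bounded-state streaming executor cannot run (e.g. a global aggregate over
/// an unbounded source) fail instead of executing.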
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

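/// Creates a streaming [`SessionContext`] with the physical-plan validator
/// in the given `mode`. [`StreamingValidatorMode::Off`] uses the stock
/// optimizer pipeline; any other mode prepends a
/// [`StreamingPhysicalValidator`] to the default physical optimizer rules.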
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config().with_batch_size(8192);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
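        // Build a throwaway default state just to obtain the stock physical
        // optimizer rules, then rebuild with the validator prepended so it
        // runs ahead of them.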
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

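/// Registers all streaming UDFs on `ctx`: the window-start functions
/// (tumble, hop, session, cumulate), an unset `watermark()` placeholder, the
/// processing-time UDF, and the JSON, complex-type, and lambda families.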
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

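/// Like [`register_streaming_functions`], but binds `watermark()` to the
/// shared `watermark_ms` counter so calls observe the live watermark rather
/// than the unset placeholder.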
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

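/// Registers the JSON/JSONB scalar UDFs, aggregates, and table-valued
/// functions on `ctx`.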
pub fn register_json_functions(ctx: &SessionContext) {
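    // Element and path extraction.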
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

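    // Key-existence checks.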
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

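    // Containment checks.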
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

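    // Type inspection and JSON construction.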
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

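    // Aggregates.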
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

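    // Timestamp parsing and JSON (de)serialization bridges.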
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

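    // SQL/JSON path predicates.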
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

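    // Table-valued functions.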
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

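        // The logical plan builds fine; the validator rejects the unbounded
        // global aggregate when the physical plan is produced at execution.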
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

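        // Producer task pushes batches through the bridge while this task
        // drains the receiving stream.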
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

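        // Events at 1, 2, and 6 minutes: with 5-minute tumbling windows the
        // first two land in the window starting at 0 ms and the third in the
        // one starting at 300_000 ms.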
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
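        // Pin the watermark at 200_000 ms; only the 300_000 ms event passes
        // the `event_time > watermark()` filter below.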
        let wm = Arc::new(AtomicI64::new(200_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }
}