mod bridge;
mod channel_source;
pub mod complex_type_lambda;
pub mod complex_type_udf;
mod exec;
pub mod execute;
pub mod format_bridge_udf;
pub mod json_extensions;
pub mod json_path;
pub mod json_tvf;
pub mod json_types;
pub mod json_udaf;
pub mod json_udf;
pub mod live_source;
pub mod lookup_join;
pub mod lookup_join_exec;
pub mod proctime_udf;
mod source;
mod table_provider;
pub mod watermark_udf;
pub mod window_udf;

pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use live_source::{LiveSourceHandle, LiveSourceProvider};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

#[must_use]
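/// Base [`SessionConfig`] shared by the session constructors in this module.
///
/// Identifier normalization is disabled so that quoted identifiers keep their
/// case, and the target partition count is pinned to 1 so that unbounded scans
/// are not repartitioned.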
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config = config.with_target_partitions(1);
    config
}

#[must_use]
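/// Plain [`SessionContext`] built on [`base_session_config`], with no streaming
/// UDFs or physical validator installed.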
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

#[must_use]
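/// [`SessionContext`] preconfigured for streaming SQL: the physical validator
/// runs in [`StreamingValidatorMode::Reject`] mode and every streaming UDF from
/// [`register_streaming_functions`] is registered.
///
/// A minimal usage sketch (error handling elided; `schema` is an `Arc<Schema>`
/// supplied by the caller):
///
/// ```ignore
/// let ctx = create_streaming_context();
/// let source = Arc::new(ChannelStreamSource::new(schema));
/// let sender = source.take_sender().expect("sender already taken");
/// let provider = StreamingTableProvider::new("events", source);
/// ctx.register_table("events", Arc::new(provider))?;
/// // Feed batches through `sender`, then query with streaming UDFs:
/// let df = ctx.sql("SELECT tumble(event_time, INTERVAL '5' MINUTE) FROM events").await?;
/// ```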
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

#[must_use]
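/// Like [`create_streaming_context`], but with an explicit
/// [`StreamingValidatorMode`]. With [`StreamingValidatorMode::Off`] a plain
/// context is built; otherwise the [`StreamingPhysicalValidator`] is prepended
/// to DataFusion's default physical optimizer rules so it runs first.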
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config().with_batch_size(8192);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state once so the stock physical optimizer rules can
        // be harvested and re-installed behind the streaming validator.
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

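/// Registers the streaming UDFs on `ctx`: the window-start helpers (`tumble`,
/// `hop`, `session`, and the cumulate variant), an unset `watermark()`,
/// `proctime()`, and the JSON, JSON-extension, complex-type, and lambda
/// function families.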
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

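/// Same registrations as [`register_streaming_functions`], except that
/// `watermark()` reads the current watermark (in epoch milliseconds) from the
/// shared `watermark_ms` cell instead of being unset.
///
/// A minimal sketch of wiring a runtime-updated watermark (the `Ordering`
/// choice here is illustrative):
///
/// ```ignore
/// use std::sync::{Arc, atomic::{AtomicI64, Ordering}};
///
/// let wm = Arc::new(AtomicI64::new(0));
/// let ctx = create_session_context();
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&wm));
/// // Advance the watermark as event time progresses; `watermark()` in SQL
/// // observes the new value.
/// wm.store(60_000, Ordering::Relaxed);
/// ```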
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

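/// Registers the JSON scalar UDFs, aggregates, format bridges, jsonpath
/// predicates, and table functions. The scalar UDFs are named after the
/// PostgreSQL jsonb operators they mirror (`->`, `->>`, `#>`, `#>>`, `?`,
/// `?|`, `?&`, `@>`, `<@`).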
pub fn register_json_functions(ctx: &SessionContext) {
    // Extraction: -> (by key and by index), ->>, #>, #>>.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Existence: ?, ?|, ?&.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment: @>, <@.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Type inspection and construction.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates.
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format bridges between JSON text and typed columns.
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // jsonpath predicates.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // Table functions (jsonb_each, jsonb_array_elements, ...).
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    // Takes the single producer handle from a `ChannelStreamSource`.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    // Builds a two-column batch matching `test_schema`.
    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Rows with values 30.0, 40.0, and 50.0 pass the filter.
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // A plain aggregation over an unbounded source must fail at execution time.
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        // 5-minute (300_000 ms) tumbling windows: 60_000 and 120_000 fall in the
        // window starting at 0; 360_000 falls in the window starting at 300_000.
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                // ids 2 and 3 satisfy `id > 1`.
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event_time 300_000 is strictly greater than the 200_000 watermark.
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }
}