pub mod aggregate_bridge;
mod bridge;
mod channel_source;
pub mod complex_type_lambda;
pub mod complex_type_udf;
mod exec;
pub mod execute;
pub mod format_bridge_udf;
pub mod json_extensions;
pub mod json_path;
pub mod json_tvf;
pub mod json_types;
pub mod json_udaf;
pub mod json_udf;
pub mod lookup_join;
pub mod proctime_udf;
mod source;
mod table_provider;
pub mod watermark_filter;
pub mod watermark_udf;
pub mod window_udf;

pub use aggregate_bridge::{
    create_aggregate_factory, lookup_aggregate_udf, result_to_scalar_value, scalar_value_to_result,
    DataFusionAccumulatorAdapter, DataFusionAggregateFactory,
};
pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

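/// Builds the base `SessionConfig` shared by every context constructed in
/// this module. SQL identifier normalization is disabled, so identifiers keep
/// their original case instead of being folded to lowercase.
///
/// A minimal usage sketch (table and column names are illustrative):
///
/// ```ignore
/// let ctx = SessionContext::new_with_config(base_session_config());
/// // With normalization off, `EventTime` stays mixed-case in the plan.
/// let df = ctx.sql("SELECT EventTime FROM events");
/// ```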
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config
}

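/// Creates a plain `SessionContext` on top of [`base_session_config`], with
/// no streaming functions registered. Pair it with
/// [`register_streaming_functions`] or
/// [`register_streaming_functions_with_watermark`] when the streaming UDFs
/// are needed:
///
/// ```ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// ```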
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

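/// Creates a `SessionContext` tuned for streaming: batch size 8192, a single
/// target partition, the physical-plan validator in
/// [`StreamingValidatorMode::Reject`] mode, and all streaming functions
/// registered.
///
/// A minimal end-to-end sketch, mirroring the tests in this module (names are
/// illustrative):
///
/// ```ignore
/// let ctx = create_streaming_context();
/// let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
/// let sender = source.take_sender().expect("sender already taken");
/// let provider = StreamingTableProvider::new("events", source);
/// ctx.register_table("events", Arc::new(provider))?;
/// sender.send(batch).await?;
/// drop(sender); // close the channel so the scan can finish
/// let batches = ctx.sql("SELECT * FROM events").await?.collect().await?;
/// ```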
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

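/// Like [`create_streaming_context`], but with an explicit
/// [`StreamingValidatorMode`]. With [`StreamingValidatorMode::Off`] the stock
/// DataFusion physical optimizer rules run unchanged; any other mode prepends
/// the [`StreamingPhysicalValidator`] so it inspects the plan before the
/// default rewrites.
///
/// ```ignore
/// // Opt out of plan validation, e.g. for bounded test data:
/// let ctx = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ```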
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config()
        .with_batch_size(8192)
        .with_target_partitions(1);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state first to collect the stock physical optimizer
        // rules, then prepend the validator so it sees the plan before any of
        // the default rewrites run.
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

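/// Registers the window-start UDFs ([`TumbleWindowStart`], [`HopWindowStart`],
/// [`SessionWindowStart`], [`CumulateWindowStart`]), an *unset* watermark UDF,
/// the processing-time UDF, and the JSON, complex-type, and lambda function
/// families on `ctx`.
///
/// ```ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// assert!(ctx.udf("tumble").is_ok());
/// ```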
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

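/// Same registrations as [`register_streaming_functions`], except that the
/// watermark UDF is bound to `watermark_ms`, a shared millisecond counter
/// meant to be advanced by the runtime as events arrive.
///
/// ```ignore
/// use std::sync::atomic::AtomicI64;
///
/// let ctx = create_session_context();
/// let wm = Arc::new(AtomicI64::new(0));
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&wm));
/// // Queries can now filter against the live value:
/// // SELECT * FROM events WHERE event_time > watermark()
/// ```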
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

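/// Registers the JSONB scalar UDFs (accessors, existence and containment
/// checks, type inspection, construction), the JSON aggregates, the
/// format-bridge UDFs, the JSONPath predicates, and the JSON table functions
/// on `ctx`.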
pub fn register_json_functions(ctx: &SessionContext) {
    // Accessors (field / index / path extraction).
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Key-existence checks.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Type inspection and construction.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates.
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format-bridge functions (timestamp parsing, JSON <-> columns).
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // JSONPath predicates.
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // Table-valued functions.
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        // Dropping the sender closes the channel so the scan terminates.
        drop(sender);

        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Values 30.0, 40.0, and 50.0 pass the filter.
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // The default context uses StreamingValidatorMode::Reject, so a plain
        // aggregation over an unbounded source must fail once the physical
        // plan is built.
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // No ORDER BY in the query itself; this only checks that out-of-order
        // input rows pass through intact.
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Produce on a separate task while this task consumes.
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events at 60s, 120s, and 360s: the first two fall in the tumbling
        // window [0, 300s), the third in [300s, 600s).
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Only logical planning is exercised here; the query is never collected.
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                // ids 2 and 3 pass the `id > 1` filter.
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Plain context (no streaming validator); the watermark UDF is bound
        // to a fixed value below.
        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only the 300_000 ms event is strictly above the 200_000 ms watermark.
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Two tags from row 1 plus one tag from row 2.
        assert_eq!(total_rows, 3);
    }
}