
laminar_sql/datafusion/mod.rs

//! DataFusion integration for SQL processing.

mod bridge;
mod channel_source;
/// Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3)
pub mod complex_type_lambda;
/// Array, Struct, and Map scalar UDFs (F-SCHEMA-015)
pub mod complex_type_udf;
mod exec;
/// End-to-end streaming SQL execution
pub mod execute;
/// Format bridge UDFs for inline format conversion
pub mod format_bridge_udf;
/// LaminarDB streaming JSON extension UDFs (F-SCHEMA-013)
pub mod json_extensions;
/// SQL/JSON path query compiler and scalar UDFs
pub mod json_path;
/// JSON table-valued functions (array/object expansion)
pub mod json_tvf;
/// JSONB binary format types for JSON UDF evaluation
pub mod json_types;
/// PostgreSQL-compatible JSON aggregate UDAFs
pub mod json_udaf;
/// PostgreSQL-compatible JSON scalar UDFs
pub mod json_udf;
/// Live source provider for streaming execution with plan caching
pub mod live_source;
/// Lookup join plan node for DataFusion.
pub mod lookup_join;
/// Physical execution plan and extension planner for lookup joins.
pub mod lookup_join_exec;
/// Processing-time UDF for `PROCTIME()` support
pub mod proctime_udf;
mod source;
mod table_provider;
/// Dynamic watermark filter for scan-level late-data pruning
/// Watermark UDF for current watermark access
pub mod watermark_udf;
/// Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE)
pub mod window_udf;

pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use live_source::{LiveSourceHandle, LiveSourceProvider};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

/// Returns a base `SessionConfig` with identifier normalization disabled.
///
/// DataFusion's default behaviour lowercases all unquoted SQL identifiers
/// (per the SQL standard). LaminarDB disables this so that mixed-case
/// column names from external sources (Kafka, CDC, WebSocket) can be
/// referenced without double-quoting.
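///
/// # Example
///
/// A minimal sketch; the mixed-case `userId` column and `events` table are
/// illustrative:
///
/// ```rust,ignore
/// let config = base_session_config();
/// let ctx = SessionContext::new_with_config(config);
/// // Mixed-case identifiers can now be referenced without double quotes:
/// // SELECT userId FROM events
/// ```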
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    // Single partition for streaming micro-batch execution. Multi-partition
    // plans contain stateful operators (RepartitionExec) that cannot be
    // reused across cycles, causing panics on cached physical plans.
    config = config.with_target_partitions(1);
    config
}

/// Creates a `DataFusion` session context with identifier normalization
/// disabled.
///
/// Suitable for ad-hoc / non-streaming queries (filters, lookups).
/// For streaming workloads prefer [`create_streaming_context`].
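///
/// # Example
///
/// A minimal sketch of an ad-hoc query:
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// let df = ctx.sql("SELECT 1").await?;
/// ```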
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

/// Creates a `DataFusion` session context configured for streaming queries.
///
/// The context is configured with:
/// - Batch size of 8192 (balanced for streaming throughput)
/// - Single partition (streaming sources are typically not partitioned)
/// - Identifier normalization disabled (mixed-case columns work unquoted)
/// - All streaming UDFs registered (TUMBLE, HOP, SESSION, CUMULATE, WATERMARK, PROCTIME)
/// - `StreamingPhysicalValidator` in `Reject` mode (blocks unsafe plans)
///
/// The watermark UDF is initialized with no watermark set (returns NULL).
/// Use [`register_streaming_functions_with_watermark`] to provide a live
/// watermark source.
///
/// # Example
///
/// ```rust,ignore
/// let ctx = create_streaming_context();
/// ctx.register_table("events", provider)?;
/// let df = ctx.sql("SELECT * FROM events").await?;
/// ```
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

/// Creates a streaming context with a configurable validator mode.
///
/// Same as [`create_streaming_context`] but allows choosing how the
/// [`StreamingPhysicalValidator`] handles plan violations.
///
/// Use [`StreamingValidatorMode::Off`] to get the previous behaviour
/// (no plan-time validation).
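///
/// # Example
///
/// A minimal sketch; `provider` is an illustrative table provider:
///
/// ```rust,ignore
/// // Disable plan-time validation (previous behaviour):
/// let ctx = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ctx.register_table("events", provider)?;
/// ```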
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config().with_batch_size(8192);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state to get the standard optimizer rules, then
        // prepend our streaming validator so it fires before DataFusion's
        // built-in SanityCheckPlan (which produces generic error messages).
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

/// Registers `LaminarDB` streaming UDFs with a session context.
///
/// Registers the following scalar functions:
/// - `tumble(timestamp, interval)` — tumbling window start
/// - `hop(timestamp, slide, size)` — hopping window start
/// - `session(timestamp, gap)` — session window pass-through
/// - `watermark()` — current watermark (returns NULL, no live source)
///
/// The CUMULATE window UDF, `proctime()`, and the JSON, JSON-extension,
/// complex-type, and lambda function families are registered as well.
///
/// Use [`register_streaming_functions_with_watermark`] to provide a
/// live watermark source from Ring 0.
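///
/// # Example
///
/// A minimal sketch; the `events` table and its `event_time` column are
/// illustrative:
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// // tumble() is now callable from SQL:
/// let df = ctx
///     .sql("SELECT tumble(event_time, INTERVAL '5' MINUTE) FROM events")
///     .await?;
/// ```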
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers streaming UDFs with a live watermark source.
///
/// Same as [`register_streaming_functions`] but connects the `watermark()`
/// UDF to a shared atomic value that Ring 0 updates in real time.
///
/// # Arguments
///
/// * `ctx` - `DataFusion` session context
/// * `watermark_ms` - Shared atomic holding the current watermark in
///   milliseconds since epoch. Values < 0 mean "no watermark" (returns NULL).
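///
/// # Example
///
/// A minimal sketch, assuming Ring 0 owns and advances the shared atomic
/// (the memory ordering shown is illustrative):
///
/// ```rust,ignore
/// use std::sync::atomic::{AtomicI64, Ordering};
///
/// let watermark_ms = Arc::new(AtomicI64::new(-1)); // < 0 = no watermark yet
/// let ctx = create_session_context();
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&watermark_ms));
///
/// // Ring 0 advances the watermark; watermark() in SQL now reflects it.
/// watermark_ms.store(200_000, Ordering::Relaxed);
/// ```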
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers all PostgreSQL-compatible JSON UDFs and UDAFs
/// with the given `SessionContext`.
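///
/// # Example
///
/// A minimal sketch, assuming the UDFs register under their PostgreSQL
/// names (e.g. `json_build_object`):
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_json_functions(&ctx);
/// let df = ctx
///     .sql("SELECT json_build_object('id', 1, 'name', 'a')")
///     .await?;
/// ```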
pub fn register_json_functions(ctx: &SessionContext) {
    // Extraction operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Existence operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Interrogation / construction
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format bridge functions
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // JSON path query functions (scalar)
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // JSON table-valued functions
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Take the sender from a `ChannelStreamSource`, panicking if already taken.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        // Create source and take the sender (important for channel closure)
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send test data
        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender); // Close the channel

        // Execute query
        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify results
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        // Query only the id column
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        // Filter for value > 25
        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3); // 30, 40, 50
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // Aggregations on unbounded streams should be rejected by `DataFusion`.
        // Streaming aggregations require windows; windowed aggregation is
        // handled by Ring 0.
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        // Aggregate query on unbounded stream should fail at execution
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // Execution should fail because we can't aggregate an infinite stream
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // The query below omits ORDER BY: sorting on unbounded streams is
        // handled by Ring 0 rather than `DataFusion`'s Sort operator
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify we got results (ordering may vary due to streaming nature)
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        // Benchmark-style test for bridge performance
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Spawn sender task
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        // Receive all batches
        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    // ── Integration Tests ──────────────────────────────────────────

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        // Verify the core streaming UDFs are registered
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        // Create schema with timestamp and value columns
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send events across two 5-minute windows:
        // Window [0, 300_000): timestamps 60_000, 120_000
        // Window [300_000, 600_000): timestamp 360_000
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Verify the tumble UDF computes correct window starts via DataFusion
        // (GROUP BY aggregation and ORDER BY on unbounded streams are handled by Ring 0)
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // Verify the window_start values (single batch, order preserved)
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        // 60_000 and 120_000 → window [0, 300_000), start = 0
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        // 360_000 → window [300_000, 600_000), start = 300_000
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a LogicalPlan for a windowed query
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        // Should succeed in creating the logical plan (UDFs are registered)
        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2); // id=2, id=3
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Create context with a specific watermark value
        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000)); // watermark at 200s
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events: 100s, 200s, 300s - watermark is at 200s
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Filter events after watermark
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event at 300s is after watermark (200s)
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        // Register a table with an array column
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Row 1: [10,20] → 2 rows, Row 2: [30] → 1 row = 3 total
        assert_eq!(total_rows, 3);
    }
}