laminar_sql/datafusion/mod.rs

//! DataFusion integration for SQL processing.

mod bridge;
mod channel_source;
/// Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3)
pub mod complex_type_lambda;
/// Array, Struct, and Map scalar UDFs (F-SCHEMA-015)
pub mod complex_type_udf;
mod exec;
/// End-to-end streaming SQL execution
pub mod execute;
/// Format bridge UDFs for inline format conversion
pub mod format_bridge_udf;
/// LaminarDB streaming JSON extension UDFs (F-SCHEMA-013)
pub mod json_extensions;
/// SQL/JSON path query compiler and scalar UDFs
pub mod json_path;
/// JSON table-valued functions (array/object expansion)
pub mod json_tvf;
/// JSONB binary format types for JSON UDF evaluation
pub mod json_types;
/// PostgreSQL-compatible JSON aggregate UDAFs
pub mod json_udaf;
/// PostgreSQL-compatible JSON scalar UDFs
pub mod json_udf;
/// Live source provider for streaming execution with plan caching
pub mod live_source;
/// Lookup join plan node for DataFusion.
pub mod lookup_join;
/// Physical execution plan and extension planner for lookup joins.
pub mod lookup_join_exec;
/// Processing-time UDF for `PROCTIME()` support
pub mod proctime_udf;
mod source;
mod table_provider;
/// Dynamic watermark filter for scan-level late-data pruning
pub mod watermark_filter;
/// Watermark UDF for current watermark access
pub mod watermark_udf;
/// Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE)
pub mod window_udf;

pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use live_source::{LiveSourceHandle, LiveSourceProvider};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

/// Returns a base `SessionConfig` with identifier normalization disabled.
///
/// DataFusion's default behaviour lowercases all unquoted SQL identifiers
/// (per the SQL standard). LaminarDB disables this so that mixed-case
/// column names from external sources (Kafka, CDC, WebSocket) can be
/// referenced without double-quoting.
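///
/// # Example
///
/// A minimal sketch of the effect (hypothetical table and column names):
///
/// ```rust,ignore
/// let ctx = SessionContext::new_with_config(base_session_config());
/// // With identifier normalization disabled, the mixed-case column
/// // `BidPrice` can be referenced without double quotes.
/// let df = ctx.sql("SELECT BidPrice FROM Trades").await?;
/// ```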
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    // Single partition for streaming micro-batch execution. Multi-partition
    // plans contain stateful operators (RepartitionExec) that cannot be
    // reused across cycles, causing panics on cached physical plans.
    config = config.with_target_partitions(1);
    config
}

/// Creates a `DataFusion` session context with identifier normalization
/// disabled.
///
/// Suitable for ad-hoc / non-streaming queries (filters, lookups).
/// For streaming workloads prefer [`create_streaming_context`].
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

/// Creates a `DataFusion` session context configured for streaming queries.
///
/// The context is configured with:
/// - Batch size of 8192 (balanced for streaming throughput)
/// - Single partition (streaming sources are typically not partitioned)
/// - Identifier normalization disabled (mixed-case columns work unquoted)
/// - All streaming UDFs registered (TUMBLE, HOP, SESSION, WATERMARK)
/// - `StreamingPhysicalValidator` in `Reject` mode (blocks unsafe plans)
///
/// The watermark UDF is initialized with no watermark set (returns NULL).
/// Use [`register_streaming_functions_with_watermark`] to provide a live
/// watermark source.
///
/// # Example
///
/// ```rust,ignore
/// let ctx = create_streaming_context();
/// ctx.register_table("events", provider)?;
/// let df = ctx.sql("SELECT * FROM events").await?;
/// ```
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

/// Creates a streaming context with a configurable validator mode.
///
/// Same as [`create_streaming_context`] but allows choosing how the
/// [`StreamingPhysicalValidator`] handles plan violations.
///
/// Use [`StreamingValidatorMode::Off`] to get the previous behaviour
/// (no plan-time validation).
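///
/// # Example
///
/// A short sketch choosing the mode explicitly:
///
/// ```rust,ignore
/// // Strict validation (same as `create_streaming_context()`).
/// let strict = create_streaming_context_with_validator(StreamingValidatorMode::Reject);
/// // No plan-time validation (previous behaviour).
/// let lenient = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ```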
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config().with_batch_size(8192);

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state to get the standard optimizer rules, then
        // prepend our streaming validator so it fires before DataFusion's
        // built-in SanityCheckPlan (which produces generic error messages).
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

/// Registers `LaminarDB` streaming UDFs with a session context.
///
/// Registers the following scalar functions:
/// - `tumble(timestamp, interval)` — tumbling window start
/// - `hop(timestamp, slide, size)` — hopping window start
/// - `session(timestamp, gap)` — session window pass-through
/// - `watermark()` — current watermark (returns NULL, no live source)
///
/// It also registers the `cumulate` window UDF, `PROCTIME()`, and the
/// JSON, JSON-extension, complex-type, and lambda function families.
///
/// Use [`register_streaming_functions_with_watermark`] to provide a
/// live watermark source from Ring 0.
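///
/// # Example
///
/// A minimal sketch on a plain session context (assumes an `events` table
/// with an `event_time` column has been registered separately):
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// let df = ctx
///     .sql("SELECT tumble(event_time, INTERVAL '1' MINUTE) AS w FROM events")
///     .await?;
/// ```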
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers streaming UDFs with a live watermark source.
///
/// Same as [`register_streaming_functions`] but connects the `watermark()`
/// UDF to a shared atomic value that Ring 0 updates in real time.
///
/// # Arguments
///
/// * `ctx` - `DataFusion` session context
/// * `watermark_ms` - Shared atomic holding the current watermark in
///   milliseconds since epoch. Values < 0 mean "no watermark" (returns NULL).
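///
/// # Example
///
/// A minimal sketch wiring up a shared watermark (Ring 0 normally performs
/// the store; it is done manually here for illustration):
///
/// ```rust,ignore
/// use std::sync::atomic::{AtomicI64, Ordering};
///
/// let ctx = create_session_context();
/// let watermark_ms = Arc::new(AtomicI64::new(-1)); // no watermark yet
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&watermark_ms));
///
/// // Advancing the shared value is reflected by `watermark()` in SQL.
/// watermark_ms.store(200_000, Ordering::Relaxed);
/// ```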
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers all PostgreSQL-compatible JSON UDFs and UDAFs
/// with the given `SessionContext`.
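///
/// # Example
///
/// A minimal sketch registering the JSON functions on a standalone context:
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_json_functions(&ctx);
/// ```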
pub fn register_json_functions(ctx: &SessionContext) {
    // Extraction operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Existence operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Interrogation / construction
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format bridge functions
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // JSON path query functions (scalar)
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // JSON table-valued functions
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Take the sender from a `ChannelStreamSource`, panicking if already taken.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        // Create source and take the sender (important for channel closure)
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send test data
        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender); // Close the channel

        // Execute query
        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify results
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        // Query only the id column
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        // Filter for value > 25
        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3); // 30, 40, 50
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // Aggregations on unbounded streams should be rejected by `DataFusion`;
        // streaming aggregations require windows and are handled by Ring 0.
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        // Aggregate query on unbounded stream should fail at execution
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // Execution should fail because we can't aggregate an infinite stream
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // Query without ORDER BY; row ordering on a streaming source is not guaranteed
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify we got results (ordering may vary due to streaming nature)
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        // Benchmark-style test for bridge performance
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Spawn sender task
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        // Receive all batches
        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    // ── Integration Tests ──────────────────────────────────────────

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        // Verify the core window and watermark UDFs are registered
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        // Create schema with timestamp and value columns
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send events across two 5-minute windows:
        // Window [0, 300_000): timestamps 60_000, 120_000
        // Window [300_000, 600_000): timestamps 360_000
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Verify the tumble UDF computes correct window starts via DataFusion
        // (GROUP BY aggregation and ORDER BY on unbounded streams are handled by Ring 0)
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // Verify the window_start values (single batch, order preserved)
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        // 60_000 and 120_000 → window [0, 300_000), start = 0
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        // 360_000 → window [300_000, 600_000), start = 300_000
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a LogicalPlan for a windowed query
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        // Should succeed in creating the logical plan (UDFs are registered)
        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2); // id=2, id=3
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Create context with a specific watermark value
        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000)); // watermark at 200s
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events: 100s, 200s, 300s - watermark is at 200s
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Filter events after watermark
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event at 300s is after watermark (200s)
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        // Register a table with an array column
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Row 1: [10,20] → 2 rows, Row 2: [30] → 1 row = 3 total
        assert_eq!(total_rows, 3);
    }
}