laminar_sql/datafusion/mod.rs

//! `DataFusion` integration for SQL processing
//!
//! This module provides the integration layer between `LaminarDB`'s push-based
//! streaming engine and `DataFusion`'s pull-based SQL query execution.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                    Ring 2: Query Planning                        │
//! │  SQL Query → SessionContext → LogicalPlan → ExecutionPlan       │
//! │                                      │                          │
//! │                            StreamingScanExec                    │
//! │                                      │                          │
//! │                              ┌───────▼──────┐                   │
//! │                              │ StreamBridge │ (tokio channel)   │
//! │                              └───────▲──────┘                   │
//! ├──────────────────────────────────────┼──────────────────────────┤
//! │                    Ring 0: Hot Path   │                          │
//! │                                      │                          │
//! │  Source → Reactor.poll() ────────────┘                          │
//! │              (Events with RecordBatch data)                     │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Components
//!
//! - [`StreamSource`]: Trait for streaming data sources
//! - [`StreamBridge`]: Channel-based push-to-pull bridge
//! - [`StreamingScanExec`]: `DataFusion` execution plan for streaming scans
//! - [`StreamingTableProvider`]: `DataFusion` table provider for streaming sources
//! - [`ChannelStreamSource`]: Concrete source using channels
//!
//! # Usage
//!
//! ```rust,ignore
//! use laminar_sql::datafusion::{
//!     create_streaming_context, ChannelStreamSource, StreamingTableProvider,
//! };
//! use arrow_schema::{DataType, Field, Schema};
//! use std::sync::Arc;
//!
//! // Create a streaming context
//! let ctx = create_streaming_context();
//!
//! // Create a channel source and take its sender
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("value", DataType::Float64, true),
//! ]));
//! let source = Arc::new(ChannelStreamSource::new(schema));
//! let sender = source.take_sender().expect("sender already taken");
//!
//! // Register as a table
//! let provider = StreamingTableProvider::new("events", source);
//! ctx.register_table("events", Arc::new(provider))?;
//!
//! // Push data from the Reactor (`batch` is an Arrow `RecordBatch` matching `schema`)
//! sender.send(batch).await?;
//!
//! // Execute SQL queries
//! let df = ctx.sql("SELECT * FROM events WHERE value > 100").await?;
//! ```
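//!
//! # Bridge mechanics
//!
//! A minimal sketch of the push-to-pull bridge on its own, following the
//! pattern used by this module's tests (the channel capacity of 10_000 is
//! illustrative):
//!
//! ```rust,ignore
//! // Ring 2 side: create the bridge, keep the push end, hand the pull end
//! // to DataFusion.
//! let bridge = StreamBridge::new(Arc::clone(&schema), 10_000);
//! let sender = bridge.sender();
//! let mut stream = bridge.into_stream();
//!
//! // Ring 0 side: push batches through the tokio channel.
//! sender.send(batch).await?;
//!
//! // Pull side: consume as an async stream of `RecordBatch` results.
//! while let Some(batch) = stream.next().await {
//!     let batch = batch?;
//!     // ... feed into query execution
//! }
//! ```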

/// DataFusion aggregate bridge for streaming aggregation.
///
/// Bridges DataFusion's `Accumulator` trait with `laminar-core`'s
/// `DynAccumulator` / `DynAggregatorFactory` traits. This avoids
/// duplicating aggregation logic.
pub mod aggregate_bridge;
mod bridge;
mod channel_source;
/// Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3)
pub mod complex_type_lambda;
/// Array, Struct, and Map scalar UDFs (F-SCHEMA-015)
pub mod complex_type_udf;
mod exec;
/// End-to-end streaming SQL execution
pub mod execute;
/// Format bridge UDFs for inline format conversion
pub mod format_bridge_udf;
/// LaminarDB streaming JSON extension UDFs (F-SCHEMA-013)
pub mod json_extensions;
/// SQL/JSON path query compiler and scalar UDFs
pub mod json_path;
/// JSON table-valued functions (array/object expansion)
pub mod json_tvf;
/// JSONB binary format types for JSON UDF evaluation
pub mod json_types;
/// PostgreSQL-compatible JSON aggregate UDAFs
pub mod json_udaf;
/// PostgreSQL-compatible JSON scalar UDFs
pub mod json_udf;
/// Lookup join plan node for DataFusion.
pub mod lookup_join;
/// Processing-time UDF for `PROCTIME()` support
pub mod proctime_udf;
mod source;
mod table_provider;
/// Dynamic watermark filter for scan-level late-data pruning
pub mod watermark_filter;
/// Watermark UDF for current watermark access
pub mod watermark_udf;
/// Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE)
pub mod window_udf;

pub use aggregate_bridge::{
    create_aggregate_factory, lookup_aggregate_udf, result_to_scalar_value, scalar_value_to_result,
    DataFusionAccumulatorAdapter, DataFusionAggregateFactory,
};
pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

/// Returns a base `SessionConfig` with identifier normalization disabled.
///
/// DataFusion's default behaviour lowercases all unquoted SQL identifiers
/// (following the PostgreSQL convention). LaminarDB disables this so that
/// mixed-case column names from external sources (Kafka, CDC, WebSocket)
/// can be referenced without double-quoting.
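///
/// # Example
///
/// A minimal sketch; the `userId` column and `events` table are illustrative,
/// not part of this crate:
///
/// ```rust,ignore
/// let ctx = SessionContext::new_with_config(base_session_config());
/// // With normalization off, a mixed-case column can be referenced unquoted:
/// let df = ctx.sql("SELECT userId FROM events").await?;
/// // Under the DataFusion default, `userId` would be folded to `userid`
/// // and fail to resolve against the source schema.
/// ```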
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config
}

/// Creates a `DataFusion` session context with identifier normalization
/// disabled.
///
/// Suitable for ad-hoc / non-streaming queries (filters, lookups).
/// For streaming workloads prefer [`create_streaming_context`].
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

/// Creates a `DataFusion` session context configured for streaming queries.
///
/// The context is configured with:
/// - Batch size of 8192 (balanced for streaming throughput)
/// - Single partition (streaming sources are typically not partitioned)
/// - Identifier normalization disabled (mixed-case columns work unquoted)
/// - All streaming UDFs registered (TUMBLE, HOP, SESSION, CUMULATE,
///   WATERMARK, PROCTIME, plus the JSON and complex-type function families)
/// - `StreamingPhysicalValidator` in `Reject` mode (blocks unsafe plans)
///
/// The watermark UDF is initialized with no watermark set (returns NULL).
/// Use [`register_streaming_functions_with_watermark`] to provide a live
/// watermark source.
///
/// # Example
///
/// ```rust,ignore
/// let ctx = create_streaming_context();
/// ctx.register_table("events", provider)?;
/// let df = ctx.sql("SELECT * FROM events").await?;
/// ```
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

/// Creates a streaming context with a configurable validator mode.
///
/// Same as [`create_streaming_context`] but allows choosing how the
/// [`StreamingPhysicalValidator`] handles plan violations.
///
/// Use [`StreamingValidatorMode::Off`] to skip plan-time validation
/// entirely (the behaviour before the validator was introduced).
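///
/// # Example
///
/// A minimal sketch of selecting a mode:
///
/// ```rust,ignore
/// // Strict (same as `create_streaming_context()`): unsafe plans are rejected.
/// let strict = create_streaming_context_with_validator(StreamingValidatorMode::Reject);
///
/// // No plan-time validation at all.
/// let permissive = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ```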
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config()
        .with_batch_size(8192)
        .with_target_partitions(1); // Single partition for streaming

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state to get the standard optimizer rules, then
        // prepend our streaming validator so it fires before DataFusion's
        // built-in SanityCheckPlan (which produces generic error messages).
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

/// Registers `LaminarDB` streaming UDFs with a session context.
///
/// Registers the following scalar functions:
/// - `tumble(timestamp, interval)` — tumbling window start
/// - `hop(timestamp, slide, size)` — hopping window start
/// - `session(timestamp, gap)` — session window pass-through
/// - `watermark()` — current watermark (returns NULL, no live source)
///
/// It also registers the CUMULATE window UDF, the processing-time
/// (`PROCTIME`) UDF, and the JSON, complex-type, and lambda function
/// families.
///
/// Use [`register_streaming_functions_with_watermark`] to provide a
/// live watermark source.
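///
/// # Example
///
/// A minimal sketch; the `events` table and its `event_time` column are
/// illustrative and must be registered separately:
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
///
/// // The window UDFs can now be used from SQL:
/// let df = ctx
///     .sql("SELECT tumble(event_time, INTERVAL '5' MINUTE) AS w, value FROM events")
///     .await?;
/// ```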
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers streaming UDFs with a live watermark source.
///
/// Same as [`register_streaming_functions`] but connects the `watermark()`
/// UDF to a shared atomic value that Ring 0 updates in real time.
///
/// # Arguments
///
/// * `ctx` - `DataFusion` session context
/// * `watermark_ms` - Shared atomic holding the current watermark in
///   milliseconds since epoch. Values < 0 mean "no watermark" (returns NULL).
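///
/// # Example
///
/// A minimal sketch of sharing a watermark; the direct `store` here stands
/// in for the Ring 0 update path:
///
/// ```rust,ignore
/// use std::sync::atomic::{AtomicI64, Ordering};
///
/// let ctx = create_session_context();
/// let watermark_ms = Arc::new(AtomicI64::new(-1)); // < 0 → watermark() returns NULL
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&watermark_ms));
///
/// // Ring 0 advances the watermark as events are processed:
/// watermark_ms.store(200_000, Ordering::Relaxed);
/// // `WHERE event_time > watermark()` now compares against 200s.
/// ```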
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers all PostgreSQL-compatible JSON UDFs and UDAFs with the given
/// `SessionContext`, along with the format-bridge, JSON path, and JSON
/// table-valued functions.
pub fn register_json_functions(ctx: &SessionContext) {
    // Extraction operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Existence operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Interrogation / construction
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format bridge functions
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // JSON path query functions (scalar)
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // JSON table-valued functions
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Take the sender from a `ChannelStreamSource`, panicking if already taken.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        // Create source and take the sender (important for channel closure)
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send test data
        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender); // Close the channel

        // Execute query
        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify results
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        // Query only the id column
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        // Filter for value > 25
        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3); // 30, 40, 50
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // Aggregations on unbounded streams should be rejected by `DataFusion`.
        // Streaming aggregations require windows; windowed aggregation is
        // handled by Ring 0.
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        // Aggregate query on unbounded stream should fail at execution
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // Execution should fail because we can't aggregate an infinite stream
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // The query omits ORDER BY (ordering on unbounded streams is handled by Ring 0)
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify we got results (ordering may vary due to streaming nature)
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        // Benchmark-style test for bridge performance
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Spawn sender task
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        // Receive all batches
        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    // ── Integration Tests ──────────────────────────────────────────

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        // Verify the core streaming UDFs are registered
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        // Create schema with timestamp and value columns
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send events across two 5-minute windows:
        // Window [0, 300_000): timestamps 60_000, 120_000
        // Window [300_000, 600_000): timestamp 360_000
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Verify the tumble UDF computes correct window starts via DataFusion
        // (GROUP BY aggregation and ORDER BY on unbounded streams are handled by Ring 0)
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // Verify the window_start values (single batch, order preserved)
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        // 60_000 and 120_000 → window [0, 300_000), start = 0
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        // 360_000 → window [300_000, 600_000), start = 300_000
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a LogicalPlan for a windowed query
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        // Should succeed in creating the logical plan (UDFs are registered)
        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2); // id=2, id=3
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Create context with a specific watermark value
        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000)); // watermark at 200s
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events: 100s, 200s, 300s - watermark is at 200s
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Filter events after watermark
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event at 300s is after watermark (200s)
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        // Register a table with an array column
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Row 1: [10,20] → 2 rows, Row 2: [30] → 1 row = 3 total
        assert_eq!(total_rows, 3);
    }
}