laminar_sql/datafusion/mod.rs

//! `DataFusion` integration for SQL processing
//!
//! This module provides the integration layer between `LaminarDB`'s push-based
//! streaming engine and `DataFusion`'s pull-based SQL query execution.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                    Ring 2: Query Planning                       │
//! │  SQL Query → SessionContext → LogicalPlan → ExecutionPlan       │
//! │                                      │                          │
//! │                            StreamingScanExec                    │
//! │                                      │                          │
//! │                              ┌───────▼──────┐                   │
//! │                              │ StreamBridge │ (tokio channel)   │
//! │                              └───────▲──────┘                   │
//! ├──────────────────────────────────────┼──────────────────────────┤
//! │                    Ring 0: Hot Path  │                          │
//! │                                      │                          │
//! │  Source → Reactor.poll() ────────────┘                          │
//! │              (Events with RecordBatch data)                     │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Components
//!
//! - [`StreamSource`]: Trait for streaming data sources
//! - [`StreamBridge`]: Channel-based push-to-pull bridge
//! - [`StreamingScanExec`]: `DataFusion` execution plan for streaming scans
//! - [`StreamingTableProvider`]: `DataFusion` table provider for streaming sources
//! - [`ChannelStreamSource`]: Concrete source using channels
//!
//! # Usage
//!
//! ```rust,ignore
//! use arrow_schema::{DataType, Field, Schema};
//! use laminar_sql::datafusion::{
//!     create_streaming_context, ChannelStreamSource, StreamingTableProvider,
//! };
//! use std::sync::Arc;
//!
//! // Create a streaming context
//! let ctx = create_streaming_context();
//!
//! // Create a channel source
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("value", DataType::Float64, true),
//! ]));
//! let source = Arc::new(ChannelStreamSource::new(schema));
//! let sender = source.take_sender().expect("sender already taken");
//!
//! // Register as a table
//! let provider = StreamingTableProvider::new("events", source);
//! ctx.register_table("events", Arc::new(provider))?;
//!
//! // Push data from the Reactor
//! sender.send(batch).await?;
//!
//! // Execute SQL queries
//! let df = ctx.sql("SELECT * FROM events WHERE value > 100").await?;
//! ```

/// DataFusion aggregate bridge for streaming aggregation.
///
/// Bridges DataFusion's `Accumulator` trait with `laminar-core`'s
/// `DynAccumulator` / `DynAggregatorFactory` traits. This avoids
/// duplicating aggregation logic.
pub mod aggregate_bridge;
mod bridge;
mod channel_source;
/// Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3)
pub mod complex_type_lambda;
/// Array, Struct, and Map scalar UDFs (F-SCHEMA-015)
pub mod complex_type_udf;
mod exec;
/// End-to-end streaming SQL execution
pub mod execute;
/// Format bridge UDFs for inline format conversion
pub mod format_bridge_udf;
/// LaminarDB streaming JSON extension UDFs (F-SCHEMA-013)
pub mod json_extensions;
/// SQL/JSON path query compiler and scalar UDFs
pub mod json_path;
/// JSON table-valued functions (array/object expansion)
pub mod json_tvf;
/// JSONB binary format types for JSON UDF evaluation
pub mod json_types;
/// PostgreSQL-compatible JSON aggregate UDAFs
pub mod json_udaf;
/// PostgreSQL-compatible JSON scalar UDFs
pub mod json_udf;
/// Lookup join plan node for DataFusion.
pub mod lookup_join;
/// Physical execution plan and extension planner for lookup joins.
pub mod lookup_join_exec;
/// Processing-time UDF for `PROCTIME()` support
pub mod proctime_udf;
mod source;
mod table_provider;
/// Dynamic watermark filter for scan-level late-data pruning
pub mod watermark_filter;
/// Watermark UDF for current watermark access
pub mod watermark_udf;
/// Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE)
pub mod window_udf;

pub use aggregate_bridge::{
    create_aggregate_factory, lookup_aggregate_udf, result_to_scalar_value, scalar_value_to_result,
    DataFusionAccumulatorAdapter, DataFusionAggregateFactory,
};
pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
pub use channel_source::ChannelStreamSource;
pub use complex_type_lambda::{
    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
    MapTransformValues,
};
pub use complex_type_udf::{
    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
    StructExtract, StructMerge, StructRename, StructSet,
};
pub use exec::StreamingScanExec;
pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
pub use json_extensions::{
    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
};
pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
pub use json_tvf::{
    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
};
pub use json_udaf::{JsonAgg, JsonObjectAgg};
pub use json_udf::{
    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
    JsonbGetText, JsonbGetTextIdx, ToJsonb,
};
pub use lookup_join_exec::{
    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
    VersionedLookupState,
};
pub use proctime_udf::ProcTimeUdf;
pub use source::{SortColumn, StreamSource, StreamSourceRef};
pub use table_provider::StreamingTableProvider;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::{CumulateWindowStart, HopWindowStart, SessionWindowStart, TumbleWindowStart};

use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_expr::ScalarUDF;

use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};

/// Returns a base `SessionConfig` with identifier normalization disabled.
///
/// DataFusion's default behaviour lowercases all unquoted SQL identifiers
/// (following PostgreSQL convention). LaminarDB disables this so that
/// mixed-case column names from external sources (Kafka, CDC, WebSocket)
/// can be referenced without double-quoting.
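/// # Example
///
/// A minimal sketch (the table `t` and its mixed-case column `userId` are
/// assumptions):
///
/// ```rust,ignore
/// let ctx = SessionContext::new_with_config(base_session_config());
/// // With normalization disabled, `userId` resolves as written; the
/// // default behaviour would fold it to `userid` and fail to resolve.
/// let df = ctx.sql("SELECT userId FROM t").await?;
/// ```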
#[must_use]
pub fn base_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    config.options_mut().sql_parser.enable_ident_normalization = false;
    config
}

/// Creates a `DataFusion` session context with identifier normalization
/// disabled.
///
/// Suitable for ad-hoc / non-streaming queries (filters, lookups).
/// For streaming workloads prefer [`create_streaming_context`].
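/// # Example
///
/// A minimal sketch of an ad-hoc query:
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// // Bounded, one-off query; no streaming UDFs are registered here.
/// let df = ctx.sql("SELECT 1").await?;
/// ```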
#[must_use]
pub fn create_session_context() -> SessionContext {
    SessionContext::new_with_config(base_session_config())
}

/// Creates a `DataFusion` session context configured for streaming queries.
///
/// The context is configured with:
/// - Batch size of 8192 (balanced for streaming throughput)
/// - Single partition (streaming sources are typically not partitioned)
/// - Identifier normalization disabled (mixed-case columns work unquoted)
/// - All streaming UDFs registered (TUMBLE, HOP, SESSION, WATERMARK)
/// - `StreamingPhysicalValidator` in `Reject` mode (blocks unsafe plans)
///
/// The watermark UDF is initialized with no watermark set (returns NULL).
/// Use [`register_streaming_functions_with_watermark`] to provide a live
/// watermark source.
///
/// # Example
///
/// ```rust,ignore
/// let ctx = create_streaming_context();
/// ctx.register_table("events", provider)?;
/// let df = ctx.sql("SELECT * FROM events").await?;
/// ```
#[must_use]
pub fn create_streaming_context() -> SessionContext {
    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
}

/// Creates a streaming context with a configurable validator mode.
///
/// Same as [`create_streaming_context`] but allows choosing how the
/// [`StreamingPhysicalValidator`] handles plan violations.
///
/// Use [`StreamingValidatorMode::Off`] to get the previous behaviour
/// (no plan-time validation).
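/// # Example
///
/// A sketch of opting out of plan-time validation:
///
/// ```rust,ignore
/// let ctx = create_streaming_context_with_validator(StreamingValidatorMode::Off);
/// ```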
#[must_use]
pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
    let config = base_session_config()
        .with_batch_size(8192)
        .with_target_partitions(1); // Single partition for streaming

    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
        SessionContext::new_with_config(config)
    } else {
        // Build a default state to get the standard optimizer rules, then
        // prepend our streaming validator so it fires before DataFusion's
        // built-in SanityCheckPlan (which produces generic error messages).
        let default_state = SessionStateBuilder::new()
            .with_config(config.clone())
            .with_default_features()
            .build();
        let mut rules: Vec<
            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
        rules.extend(default_state.physical_optimizers().iter().cloned());

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_physical_optimizer_rules(rules)
            .build();
        SessionContext::new_with_state(state)
    };

    register_streaming_functions(&ctx);
    ctx
}

/// Registers `LaminarDB` streaming UDFs with a session context.
///
/// Registers the following scalar functions:
/// - `tumble(timestamp, interval)` — tumbling window start
/// - `hop(timestamp, slide, size)` — hopping window start
/// - `session(timestamp, gap)` — session window pass-through
/// - `cumulate` — cumulative window start (see [`CumulateWindowStart`])
/// - `watermark()` — current watermark (returns NULL, no live source)
/// - `proctime()` — current processing time (see [`ProcTimeUdf`])
///
/// JSON, complex-type, and lambda functions are registered as well.
/// Use [`register_streaming_functions_with_watermark`] to provide a
/// live watermark source from Ring 0.
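/// # Example
///
/// A sketch of calling a window UDF from SQL (the `events` table and its
/// `event_time` column are assumptions):
///
/// ```rust,ignore
/// let ctx = create_session_context();
/// register_streaming_functions(&ctx);
/// let df = ctx
///     .sql("SELECT tumble(event_time, INTERVAL '5' MINUTE) AS w, value FROM events")
///     .await?;
/// ```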
pub fn register_streaming_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers streaming UDFs with a live watermark source.
///
/// Same as [`register_streaming_functions`] but connects the `watermark()`
/// UDF to a shared atomic value that Ring 0 updates in real time.
///
/// # Arguments
///
/// * `ctx` - `DataFusion` session context
/// * `watermark_ms` - Shared atomic holding the current watermark in
///   milliseconds since epoch. Values < 0 mean "no watermark" (returns NULL).
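/// # Example
///
/// A sketch of wiring a live watermark; where Ring 0 gets the value from
/// is an assumption here:
///
/// ```rust,ignore
/// use std::sync::atomic::{AtomicI64, Ordering};
///
/// let watermark_ms = Arc::new(AtomicI64::new(-1)); // -1 = no watermark yet
/// register_streaming_functions_with_watermark(&ctx, Arc::clone(&watermark_ms));
///
/// // Ring 0 advances the watermark as event time progresses:
/// watermark_ms.store(200_000, Ordering::Relaxed);
/// // `watermark()` in SQL now observes 200s instead of NULL.
/// ```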
pub fn register_streaming_functions_with_watermark(
    ctx: &SessionContext,
    watermark_ms: Arc<AtomicI64>,
) {
    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
    register_json_functions(ctx);
    register_json_extensions(ctx);
    register_complex_type_functions(ctx);
    register_lambda_functions(ctx);
}

/// Registers all PostgreSQL-compatible JSON UDFs and UDAFs
/// with the given `SessionContext`.
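/// # Example
///
/// A minimal sketch (the table `t`, its JSONB `payload` column, and the
/// snake_case SQL name `jsonb_get_text` are assumptions):
///
/// ```rust,ignore
/// register_json_functions(&ctx);
/// // Rough equivalent of PostgreSQL's `payload ->> 'user'`:
/// let df = ctx.sql("SELECT jsonb_get_text(payload, 'user') FROM t").await?;
/// ```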
pub fn register_json_functions(ctx: &SessionContext) {
    // Extraction operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));

    // Existence operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));

    // Containment operators
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));

    // Interrogation / construction
    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));

    // Aggregates
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
        JsonObjectAgg::new(),
    ));

    // Format bridge functions
    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));

    // JSON path query functions (scalar)
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));

    // JSON table-valued functions
    register_json_table_functions(ctx);
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Take the sender from a `ChannelStreamSource`, panicking if already taken.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        // Create source and take the sender (important for channel closure)
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send test data
        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender); // Close the channel

        // Execute query
        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify results
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        // Query only the id column
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        // Filter for value > 25
        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3); // 30, 40, 50
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // Unwindowed aggregations on unbounded streams should be rejected by `DataFusion`.
        // Streaming aggregations require windows, which are handled by Ring 0.
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        // Aggregate query on unbounded stream should fail at execution
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // Execution should fail because we can't aggregate an infinite stream
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    #[tokio::test]
    async fn test_query_with_order_by() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        // The query omits ORDER BY: sorting an unbounded stream is a
        // pipeline breaker (ORDER BY on unbounded streams is handled by Ring 0)
        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify we got results (ordering may vary due to streaming nature)
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        // Benchmark-style test for bridge performance
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Spawn sender task
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        // Receive all batches
        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    // ── Integration Tests ──────────────────────────────────────────

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        // Verify the core streaming UDFs (tumble, hop, session, watermark) are registered
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        // Create schema with timestamp and value columns
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send events across two 5-minute windows:
        // Window [0, 300_000): timestamps 60_000, 120_000
        // Window [300_000, 600_000): timestamp 360_000
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Verify the tumble UDF computes correct window starts via DataFusion
        // (GROUP BY aggregation and ORDER BY on unbounded streams are handled by Ring 0)
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // Verify the window_start values (single batch, order preserved)
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("window_start should be TimestampMillisecond");
        // 60_000 and 120_000 → window [0, 300_000), start = 0
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        // 360_000 → window [300_000, 600_000), start = 300_000
        assert_eq!(ws_col.value(2), 300_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a LogicalPlan for a windowed query
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        // Should succeed in creating the logical plan (UDFs are registered)
        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2); // id=2, id=3
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Create context with a specific watermark value
        let config = base_session_config()
            .with_batch_size(8192)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000)); // watermark at 200s
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events: 100s, 200s, 300s; watermark is at 200s
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Filter events after watermark
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event at 300s is after watermark (200s)
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        // Register a table with an array column
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Row 1: [10,20] → 2 rows, Row 2: [30] → 1 row = 3 total
        assert_eq!(total_rows, 3);
    }
}
841}