// laminar_sql/datafusion/execute.rs
//! End-to-end streaming SQL execution (F005B)
//!
//! Provides [`execute_streaming_sql`] for parsing, planning, and executing
//! streaming SQL statements through the `DataFusion` engine.
//!
//! # Architecture
//!
//! ```text
//! SQL string
//!     │
//!     ▼
//! parse_streaming_sql()  →  StreamingStatement
//!     │
//!     ▼
//! StreamingPlanner::plan()  →  StreamingPlan
//!     │
//!     ├─ DDL (CREATE SOURCE/SINK)  →  DdlResult
//!     │
//!     ├─ Query (windows/joins)     →  LogicalPlan → DataFrame → stream
//!     │                                + QueryPlan metadata
//!     │
//!     └─ Standard SQL              →  DataFusion ctx.sql() → stream
//! ```

use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;

33/// Result of executing a streaming SQL statement.
34#[derive(Debug)]
35pub enum StreamingSqlResult {
36    /// DDL statement result (CREATE SOURCE, CREATE SINK)
37    Ddl(DdlResult),
38    /// Query execution result with optional streaming metadata
39    Query(QueryResult),
40}
41
42/// Result of a DDL statement execution.
43#[derive(Debug)]
44pub struct DdlResult {
45    /// The streaming plan describing what was created or registered
46    pub plan: StreamingPlan,
47}
48
49/// Result of a query execution.
50///
51/// Contains both the `DataFusion` record batch stream and optional
52/// streaming metadata (window config, join config, emit clause) from
53/// the `QueryPlan`. Ring 0 operators use the `query_plan` to configure
54/// windowing and join behavior.
55pub struct QueryResult {
56    /// Record batch stream from `DataFusion` execution
57    pub stream: SendableRecordBatchStream,
58    /// Streaming query metadata (window config, join config, etc.)
59    ///
60    /// `None` for standard SQL pass-through queries.
61    /// `Some` for queries with streaming features (windows, joins).
62    pub query_plan: Option<QueryPlan>,
63}
64
65impl std::fmt::Debug for QueryResult {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("QueryResult")
68            .field("query_plan", &self.query_plan)
69            .field("stream", &"<SendableRecordBatchStream>")
70            .finish()
71    }
72}
73
74/// Executes a streaming SQL statement end-to-end.
75///
76/// This function performs the full pipeline:
77/// 1. Parse SQL with streaming extensions (CREATE SOURCE/SINK, windows, etc.)
78/// 2. Plan via [`StreamingPlanner`]
79/// 3. For DDL: return the streaming plan as [`DdlResult`]
80/// 4. For queries with streaming features: create `LogicalPlan` via
81///    `DataFusion`, execute, and return stream + [`QueryPlan`] metadata
82/// 5. For standard SQL: pass through to `DataFusion` directly
83///
84/// # Arguments
85///
86/// * `sql` - The SQL statement to execute
87/// * `ctx` - `DataFusion` session context (should have streaming functions registered)
88/// * `planner` - Streaming planner with registered sources/sinks
89///
90/// # Errors
91///
92/// Returns [`Error`] if parsing, planning, or execution fails.
93pub async fn execute_streaming_sql(
94    sql: &str,
95    ctx: &SessionContext,
96    planner: &mut StreamingPlanner,
97) -> std::result::Result<StreamingSqlResult, Error> {
98    let statements = parse_streaming_sql(sql)?;
99
100    if statements.is_empty() {
101        return Err(Error::ParseError(
102            crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
103        ));
104    }
105
106    // Process the first statement
107    let statement = &statements[0];
108    let plan = planner.plan(statement)?;
109
110    match plan {
111        StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
112            Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
113        }
114        StreamingPlan::Query(mut query_plan) => {
115            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
116            rewrite_interval_arithmetic(&mut query_plan.statement);
117            let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
118            let df = ctx.execute_logical_plan(logical_plan).await?;
119            let stream = df.execute_stream().await?;
120
121            Ok(StreamingSqlResult::Query(QueryResult {
122                stream,
123                query_plan: Some(query_plan),
124            }))
125        }
126        StreamingPlan::Standard(mut stmt) => {
127            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
128            rewrite_interval_arithmetic(&mut stmt);
129            let sql_str = stmt.to_string();
130            let df = ctx.sql(&sql_str).await?;
131            let stream = df.execute_stream().await?;
132
133            Ok(StreamingSqlResult::Query(QueryResult {
134                stream,
135                query_plan: None,
136            }))
137        }
138        StreamingPlan::DagExplain(_) => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    /// CREATE SOURCE should return a DDL result.
    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    /// CREATE SINK referencing an existing source should return a DDL result.
    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Register source first so the sink has something to read from.
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    /// Empty input must be rejected with a parse error.
    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(result.is_err());
    }

    /// Standard SQL with no streaming features passes straight through to
    /// DataFusion and carries no `query_plan` metadata.
    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Simple SELECT 1 goes through DataFusion directly
        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    /// Standard SQL over a registered streaming table drains all batches
    /// sent through the channel source.
    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Send data and close channel so the stream terminates.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none()); // Standard query
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}