// laminar_sql/datafusion/execute.rs
//! End-to-end streaming SQL execution (F005B)
//!
//! Provides [`execute_streaming_sql`] for parsing, planning, and executing
//! streaming SQL statements through the `DataFusion` engine.
//!
//! # Architecture
//!
//! ```text
//! SQL string
//!     │
//!     ▼
//! parse_streaming_sql()  →  StreamingStatement
//!     │
//!     ▼
//! StreamingPlanner::plan()  →  StreamingPlan
//!     │
//!     ├─ DDL (CREATE SOURCE/SINK)  →  DdlResult
//!     │
//!     ├─ Query (windows/joins)     →  LogicalPlan → DataFrame → stream
//!     │                                + QueryPlan metadata
//!     │
//!     └─ Standard SQL              →  DataFusion ctx.sql() → stream
//! ```

use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;

32/// Result of executing a streaming SQL statement.
33#[derive(Debug)]
34pub enum StreamingSqlResult {
35    /// DDL statement result (CREATE SOURCE, CREATE SINK)
36    Ddl(DdlResult),
37    /// Query execution result with optional streaming metadata
38    Query(QueryResult),
39}
40
41/// Result of a DDL statement execution.
42#[derive(Debug)]
43pub struct DdlResult {
44    /// The streaming plan describing what was created or registered
45    pub plan: StreamingPlan,
46}
47
48/// Result of a query execution.
49///
50/// Contains both the `DataFusion` record batch stream and optional
51/// streaming metadata (window config, join config, emit clause) from
52/// the `QueryPlan`. Ring 0 operators use the `query_plan` to configure
53/// windowing and join behavior.
54pub struct QueryResult {
55    /// Record batch stream from `DataFusion` execution
56    pub stream: SendableRecordBatchStream,
57    /// Streaming query metadata (window config, join config, etc.)
58    ///
59    /// `None` for standard SQL pass-through queries.
60    /// `Some` for queries with streaming features (windows, joins).
61    pub query_plan: Option<QueryPlan>,
62}
63
64impl std::fmt::Debug for QueryResult {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("QueryResult")
67            .field("query_plan", &self.query_plan)
68            .field("stream", &"<SendableRecordBatchStream>")
69            .finish()
70    }
71}
72
73/// Executes a streaming SQL statement end-to-end.
74///
75/// This function performs the full pipeline:
76/// 1. Parse SQL with streaming extensions (CREATE SOURCE/SINK, windows, etc.)
77/// 2. Plan via [`StreamingPlanner`]
78/// 3. For DDL: return the streaming plan as [`DdlResult`]
79/// 4. For queries with streaming features: create `LogicalPlan` via
80///    `DataFusion`, execute, and return stream + [`QueryPlan`] metadata
81/// 5. For standard SQL: pass through to `DataFusion` directly
82///
83/// # Arguments
84///
85/// * `sql` - The SQL statement to execute
86/// * `ctx` - `DataFusion` session context (should have streaming functions registered)
87/// * `planner` - Streaming planner with registered sources/sinks
88///
89/// # Errors
90///
91/// Returns [`Error`] if parsing, planning, or execution fails.
92pub async fn execute_streaming_sql(
93    sql: &str,
94    ctx: &SessionContext,
95    planner: &mut StreamingPlanner,
96) -> std::result::Result<StreamingSqlResult, Error> {
97    let statements = parse_streaming_sql(sql)?;
98
99    if statements.is_empty() {
100        return Err(Error::ParseError(
101            crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
102        ));
103    }
104
105    // Process the first statement
106    let statement = &statements[0];
107    let plan = planner.plan(statement)?;
108
109    match plan {
110        StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
111            Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
112        }
113        StreamingPlan::Query(query_plan) => {
114            let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
115            let df = ctx.execute_logical_plan(logical_plan).await?;
116            let stream = df.execute_stream().await?;
117
118            Ok(StreamingSqlResult::Query(QueryResult {
119                stream,
120                query_plan: Some(query_plan),
121            }))
122        }
123        StreamingPlan::Standard(stmt) => {
124            let sql_str = stmt.to_string();
125            let df = ctx.sql(&sql_str).await?;
126            let stream = df.execute_stream().await?;
127
128            Ok(StreamingSqlResult::Query(QueryResult {
129                stream,
130                query_plan: None,
131            }))
132        }
133        StreamingPlan::DagExplain(_) => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Register source first
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Simple SELECT 1 goes through DataFusion directly
        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Send data and close channel
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none()); // Standard query
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}