//! Streaming SQL execution via DataFusion.

use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;

11/// Result of executing a streaming SQL statement.
12#[derive(Debug)]
13pub enum StreamingSqlResult {
14    /// DDL statement result (CREATE SOURCE, CREATE SINK)
15    Ddl(DdlResult),
16    /// Query execution result with optional streaming metadata
17    Query(QueryResult),
18}
19
20/// Result of a DDL statement execution.
21#[derive(Debug)]
22pub struct DdlResult {
23    /// The streaming plan describing what was created or registered
24    pub plan: StreamingPlan,
25}
26
27/// Result of a query execution.
28///
29/// Contains both the `DataFusion` record batch stream and optional
30/// streaming metadata (window config, join config, emit clause) from
31/// the `QueryPlan`. Ring 0 operators use the `query_plan` to configure
32/// windowing and join behavior.
33pub struct QueryResult {
34    /// Record batch stream from `DataFusion` execution
35    pub stream: SendableRecordBatchStream,
36    /// Streaming query metadata (window config, join config, etc.)
37    ///
38    /// `None` for standard SQL pass-through queries.
39    /// `Some` for queries with streaming features (windows, joins).
40    pub query_plan: Option<QueryPlan>,
41}
42
43impl std::fmt::Debug for QueryResult {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("QueryResult")
46            .field("query_plan", &self.query_plan)
47            .field("stream", &"<SendableRecordBatchStream>")
48            .finish()
49    }
50}
51
52/// Executes a streaming SQL statement end-to-end.
53///
54/// This function performs the full pipeline:
55/// 1. Parse SQL with streaming extensions (CREATE SOURCE/SINK, windows, etc.)
56/// 2. Plan via [`StreamingPlanner`]
57/// 3. For DDL: return the streaming plan as [`DdlResult`]
58/// 4. For queries with streaming features: create `LogicalPlan` via
59///    `DataFusion`, execute, and return stream + [`QueryPlan`] metadata
60/// 5. For standard SQL: pass through to `DataFusion` directly
61///
62/// # Arguments
63///
64/// * `sql` - The SQL statement to execute
65/// * `ctx` - `DataFusion` session context (should have streaming functions registered)
66/// * `planner` - Streaming planner with registered sources/sinks
67///
68/// # Errors
69///
70/// Returns [`Error`] if parsing, planning, or execution fails.
71pub async fn execute_streaming_sql(
72    sql: &str,
73    ctx: &SessionContext,
74    planner: &mut StreamingPlanner,
75) -> std::result::Result<StreamingSqlResult, Error> {
76    let statements = parse_streaming_sql(sql)?;
77
78    if statements.is_empty() {
79        return Err(Error::ParseError(
80            crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
81        ));
82    }
83
84    // Process the first statement
85    let statement = &statements[0];
86    let plan = planner.plan(statement)?;
87
88    match plan {
89        StreamingPlan::Query(mut query_plan) => {
90            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
91            rewrite_interval_arithmetic(&mut query_plan.statement);
92            let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
93            let df = ctx.execute_logical_plan(logical_plan).await?;
94            let stream = df.execute_stream().await?;
95
96            Ok(StreamingSqlResult::Query(QueryResult {
97                stream,
98                query_plan: Some(query_plan),
99            }))
100        }
101        StreamingPlan::Standard(mut stmt) => {
102            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
103            rewrite_interval_arithmetic(&mut stmt);
104            let sql_str = stmt.to_string();
105            let df = ctx.sql(&sql_str).await?;
106            let stream = df.execute_stream().await?;
107
108            Ok(StreamingSqlResult::Query(QueryResult {
109                stream,
110                query_plan: None,
111            }))
112        }
113        StreamingPlan::RegisterSource(_)
114        | StreamingPlan::RegisterSink(_)
115        | StreamingPlan::RegisterLookupTable(_)
116        | StreamingPlan::DropLookupTable { .. } => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // A sink references an existing source, so register one first.
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Simple SELECT 1 goes through DataFusion directly
        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                // Pass-through queries carry no streaming metadata.
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Send data, then drop the sender so the stream terminates.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none()); // Standard query
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}