use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;
33#[derive(Debug)]
35pub enum StreamingSqlResult {
36 Ddl(DdlResult),
38 Query(QueryResult),
40}
41
42#[derive(Debug)]
44pub struct DdlResult {
45 pub plan: StreamingPlan,
47}
48
49pub struct QueryResult {
56 pub stream: SendableRecordBatchStream,
58 pub query_plan: Option<QueryPlan>,
63}
64
65impl std::fmt::Debug for QueryResult {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("QueryResult")
68 .field("query_plan", &self.query_plan)
69 .field("stream", &"<SendableRecordBatchStream>")
70 .finish()
71 }
72}
73
74pub async fn execute_streaming_sql(
94 sql: &str,
95 ctx: &SessionContext,
96 planner: &mut StreamingPlanner,
97) -> std::result::Result<StreamingSqlResult, Error> {
98 let statements = parse_streaming_sql(sql)?;
99
100 if statements.is_empty() {
101 return Err(Error::ParseError(
102 crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
103 ));
104 }
105
106 let statement = &statements[0];
108 let plan = planner.plan(statement)?;
109
110 match plan {
111 StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
112 Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
113 }
114 StreamingPlan::Query(mut query_plan) => {
115 rewrite_interval_arithmetic(&mut query_plan.statement);
117 let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
118 let df = ctx.execute_logical_plan(logical_plan).await?;
119 let stream = df.execute_stream().await?;
120
121 Ok(StreamingSqlResult::Query(QueryResult {
122 stream,
123 query_plan: Some(query_plan),
124 }))
125 }
126 StreamingPlan::Standard(mut stmt) => {
127 rewrite_interval_arithmetic(&mut stmt);
129 let sql_str = stmt.to_string();
130 let df = ctx.sql(&sql_str).await?;
131 let stream = df.execute_stream().await?;
132
133 Ok(StreamingSqlResult::Query(QueryResult {
134 stream,
135 query_plan: None,
136 }))
137 }
138 StreamingPlan::DagExplain(_) => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    /// CREATE SOURCE should be answered as a DDL result.
    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    /// CREATE SINK (after its source exists) should be answered as DDL.
    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    /// Empty input must surface a parse error, not a panic or silent no-op.
    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(result.is_err());
    }

    /// A plain SELECT is passed through to DataFusion: no streaming plan,
    /// and the stream yields the computed row.
    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    /// End-to-end: register a channel-backed streaming table, push one
    /// batch, and verify a standard query drains all rows from it.
    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        // Dropping the sender closes the channel so the query stream ends.
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}