// laminar_sql/datafusion/execute.rs
use datafusion::execution::SendableRecordBatchStream;
26use datafusion::prelude::SessionContext;
27
28use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
29use crate::parser::parse_streaming_sql;
30use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
31use crate::Error;
32
/// Outcome of executing one streaming SQL statement via [`execute_streaming_sql`].
#[derive(Debug)]
pub enum StreamingSqlResult {
    /// A DDL statement (e.g. `CREATE SOURCE` / `CREATE SINK`): carries the
    /// produced plan, no rows are executed.
    Ddl(DdlResult),
    /// A query: carries a live record-batch stream (and, for streaming
    /// queries, the plan it was built from).
    Query(QueryResult),
}
41
/// Result of a DDL statement: the streaming plan the planner produced.
#[derive(Debug)]
pub struct DdlResult {
    /// The plan as returned by `StreamingPlanner::plan` (e.g.
    /// `RegisterSource`, `RegisterSink`, lookup-table registration).
    pub plan: StreamingPlan,
}
48
/// Result of a query statement: the executing DataFusion stream plus the
/// streaming query plan, when one exists.
pub struct QueryResult {
    /// Record batches produced by DataFusion's `execute_stream`.
    pub stream: SendableRecordBatchStream,
    /// `Some` for streaming queries (`StreamingPlan::Query`); `None` for
    /// standard SQL passed straight through to DataFusion.
    pub query_plan: Option<QueryPlan>,
}
64
65impl std::fmt::Debug for QueryResult {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("QueryResult")
68 .field("query_plan", &self.query_plan)
69 .field("stream", &"<SendableRecordBatchStream>")
70 .finish()
71 }
72}
73
74pub async fn execute_streaming_sql(
94 sql: &str,
95 ctx: &SessionContext,
96 planner: &mut StreamingPlanner,
97) -> std::result::Result<StreamingSqlResult, Error> {
98 let statements = parse_streaming_sql(sql)?;
99
100 if statements.is_empty() {
101 return Err(Error::ParseError(
102 crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
103 ));
104 }
105
106 let statement = &statements[0];
108 let plan = planner.plan(statement)?;
109
110 match plan {
111 StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
112 Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
113 }
114 StreamingPlan::Query(mut query_plan) => {
115 rewrite_interval_arithmetic(&mut query_plan.statement);
117 let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
118 let df = ctx.execute_logical_plan(logical_plan).await?;
119 let stream = df.execute_stream().await?;
120
121 Ok(StreamingSqlResult::Query(QueryResult {
122 stream,
123 query_plan: Some(query_plan),
124 }))
125 }
126 StreamingPlan::Standard(mut stmt) => {
127 rewrite_interval_arithmetic(&mut stmt);
129 let sql_str = stmt.to_string();
130 let df = ctx.sql(&sql_str).await?;
131 let stream = df.execute_stream().await?;
132
133 Ok(StreamingSqlResult::Query(QueryResult {
134 stream,
135 query_plan: None,
136 }))
137 }
138 StreamingPlan::DagExplain(_)
139 | StreamingPlan::RegisterLookupTable(_)
140 | StreamingPlan::DropLookupTable { .. } => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    // CREATE SOURCE must be classified as DDL (no stream is produced).
    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    // CREATE SINK (after its source exists) is also DDL.
    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // Register the source first so the sink's FROM clause resolves.
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    // Empty input must surface as an error, not a silent no-op.
    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(result.is_err());
    }

    // A plain SELECT takes the Standard passthrough path: query_plan is None
    // and the stream yields the computed rows.
    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                // Passthrough queries carry no streaming plan.
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    // End-to-end: register a channel-backed streaming table, push one batch,
    // close the channel, then SELECT from it and count the rows received.
    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        // Channel-backed source feeding the "users" streaming table.
        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        // Dropping the sender closes the channel so the stream terminates.
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let mut total = 0;
                // Drain the whole stream; row counts may be split across batches.
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}