// laminar_sql/datafusion/execute.rs

use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;
32#[derive(Debug)]
34pub enum StreamingSqlResult {
35 Ddl(DdlResult),
37 Query(QueryResult),
39}
40
41#[derive(Debug)]
43pub struct DdlResult {
44 pub plan: StreamingPlan,
46}
47
48pub struct QueryResult {
55 pub stream: SendableRecordBatchStream,
57 pub query_plan: Option<QueryPlan>,
62}
63
64impl std::fmt::Debug for QueryResult {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("QueryResult")
67 .field("query_plan", &self.query_plan)
68 .field("stream", &"<SendableRecordBatchStream>")
69 .finish()
70 }
71}
72
73pub async fn execute_streaming_sql(
93 sql: &str,
94 ctx: &SessionContext,
95 planner: &mut StreamingPlanner,
96) -> std::result::Result<StreamingSqlResult, Error> {
97 let statements = parse_streaming_sql(sql)?;
98
99 if statements.is_empty() {
100 return Err(Error::ParseError(
101 crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
102 ));
103 }
104
105 let statement = &statements[0];
107 let plan = planner.plan(statement)?;
108
109 match plan {
110 StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
111 Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
112 }
113 StreamingPlan::Query(query_plan) => {
114 let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
115 let df = ctx.execute_logical_plan(logical_plan).await?;
116 let stream = df.execute_stream().await?;
117
118 Ok(StreamingSqlResult::Query(QueryResult {
119 stream,
120 query_plan: Some(query_plan),
121 }))
122 }
123 StreamingPlan::Standard(stmt) => {
124 let sql_str = stmt.to_string();
125 let df = ctx.sql(&sql_str).await?;
126 let stream = df.execute_stream().await?;
127
128 Ok(StreamingSqlResult::Query(QueryResult {
129 stream,
130 query_plan: None,
131 }))
132 }
133 StreamingPlan::DagExplain(_) => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    /// Build a context with streaming functions registered plus a fresh planner.
    fn setup() -> (SessionContext, StreamingPlanner) {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        (ctx, StreamingPlanner::new())
    }

    #[tokio::test]
    async fn test_execute_ddl_source() {
        let (ctx, mut planner) = setup();

        let result = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let (ctx, mut planner) = setup();

        // A sink requires its source to exist first.
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let result = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(result, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        // No function registration needed: parsing fails before planning.
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let outcome = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let (ctx, mut planner) = setup();

        let result = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                // Standard SQL carries no streaming query plan.
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let batch = stream.next().await.unwrap().unwrap();
                assert_eq!(batch.num_rows(), 1);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let (ctx, mut planner) = setup();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Feed a single two-row batch, then drop the sender so the
        // query stream terminates.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let result = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                assert!(qr.query_plan.is_none());
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2);
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }
}