//! Streaming SQL execution entry point (laminar_sql/datafusion/execute.rs).

use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
use crate::parser::parse_streaming_sql;
use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
use crate::Error;

/// Outcome of executing a single streaming SQL statement.
#[derive(Debug)]
pub enum StreamingSqlResult {
    /// A DDL statement was planned (source/sink/lookup-table registration
    /// or a lookup-table drop); no data stream is produced.
    Ddl(DdlResult),
    /// A query was executed and is producing record batches.
    Query(QueryResult),
}
19
/// Result of a DDL statement: the [`StreamingPlan`] the planner produced,
/// returned so the caller can act on the registration/drop it describes.
#[derive(Debug)]
pub struct DdlResult {
    // The plan produced by `StreamingPlanner::plan` for this DDL statement.
    pub plan: StreamingPlan,
}
26
/// Result of a query statement: the executing record-batch stream, plus the
/// streaming query plan when one was used.
pub struct QueryResult {
    // Stream of result record batches from DataFusion execution.
    pub stream: SendableRecordBatchStream,
    // `Some` when the statement was planned as a streaming query
    // (`StreamingPlan::Query`); `None` for standard pass-through SQL.
    pub query_plan: Option<QueryPlan>,
}
42
43impl std::fmt::Debug for QueryResult {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("QueryResult")
46 .field("query_plan", &self.query_plan)
47 .field("stream", &"<SendableRecordBatchStream>")
48 .finish()
49 }
50}
51
52pub async fn execute_streaming_sql(
72 sql: &str,
73 ctx: &SessionContext,
74 planner: &mut StreamingPlanner,
75) -> std::result::Result<StreamingSqlResult, Error> {
76 let statements = parse_streaming_sql(sql)?;
77
78 if statements.is_empty() {
79 return Err(Error::ParseError(
80 crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
81 ));
82 }
83
84 let statement = &statements[0];
86 let plan = planner.plan(statement)?;
87
88 match plan {
89 StreamingPlan::Query(mut query_plan) => {
90 rewrite_interval_arithmetic(&mut query_plan.statement);
92 let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
93 let df = ctx.execute_logical_plan(logical_plan).await?;
94 let stream = df.execute_stream().await?;
95
96 Ok(StreamingSqlResult::Query(QueryResult {
97 stream,
98 query_plan: Some(query_plan),
99 }))
100 }
101 StreamingPlan::Standard(mut stmt) => {
102 rewrite_interval_arithmetic(&mut stmt);
104 let sql_str = stmt.to_string();
105 let df = ctx.sql(&sql_str).await?;
106 let stream = df.execute_stream().await?;
107
108 Ok(StreamingSqlResult::Query(QueryResult {
109 stream,
110 query_plan: None,
111 }))
112 }
113 StreamingPlan::RegisterSource(_)
114 | StreamingPlan::RegisterSink(_)
115 | StreamingPlan::RegisterLookupTable(_)
116 | StreamingPlan::DropLookupTable { .. } => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    /// `CREATE SOURCE` must come back as a DDL result, not a query.
    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let outcome = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        assert!(matches!(outcome, StreamingSqlResult::Ddl(_)));
    }

    /// `CREATE SINK` (once its source exists) is also a DDL result.
    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .unwrap();

        let outcome = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .unwrap();

        assert!(matches!(outcome, StreamingSqlResult::Ddl(_)));
    }

    /// An empty SQL string is rejected with an error.
    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        let outcome = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(outcome.is_err());
    }

    /// A plain `SELECT` is passed straight through to DataFusion:
    /// no streaming query plan, and exactly one row in the first batch.
    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let outcome = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        let StreamingSqlResult::Query(query_result) = outcome else {
            panic!("Expected Query result");
        };
        assert!(query_result.query_plan.is_none());
        let mut stream = query_result.stream;
        let first_batch = stream.next().await.unwrap().unwrap();
        assert_eq!(first_batch.num_rows(), 1);
    }

    /// Rows pushed through a registered channel-backed table are visible
    /// to a standard `SELECT` over that table.
    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Two rows in a single batch; closing the sender ends the stream.
        let payload = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(payload).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let outcome = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        let StreamingSqlResult::Query(query_result) = outcome else {
            panic!("Expected Query result");
        };
        assert!(query_result.query_plan.is_none());
        let mut stream = query_result.stream;
        let mut row_count = 0;
        while let Some(batch) = stream.next().await {
            row_count += batch.unwrap().num_rows();
        }
        assert_eq!(row_count, 2);
    }
}