// datafusion_dft/execution/local.rs
use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use color_eyre::eyre::eyre;
25use datafusion::logical_expr::LogicalPlan;
26use futures::TryFutureExt;
27use log::{debug, error, info};
28
29use crate::config::ExecutionConfig;
30use color_eyre::eyre::{self, Result};
31use datafusion::execution::{SendableRecordBatchStream, SessionState};
32use datafusion::physical_plan::{execute_stream, ExecutionPlan};
33use datafusion::prelude::*;
34use datafusion::sql::parser::{DFParser, Statement};
35use tokio_stream::StreamExt;
36
37use super::executor::dedicated::DedicatedExecutor;
38use super::local_benchmarks::LocalBenchmarkStats;
39use super::stats::{ExecutionDurationStats, ExecutionStats};
40use super::AppType;
41
/// Shared execution environment for planning and running SQL: owns the
/// DataFusion session plus app-level configuration and an optional
/// dedicated runtime for CPU-bound work.
#[derive(Clone)]
pub struct ExecutionContext {
    // Execution configuration this context was built from (cloned in `try_new`)
    config: ExecutionConfig,
    // DataFusion session used to plan and execute queries
    session_ctx: SessionContext,
    // Optional path to a DDL file read by `load_ddl` / written by `save_ddl`
    ddl_path: Option<PathBuf>,
    // Dedicated executor for query work; only created for the FlightSQL
    // server app type when enabled in config (see `try_new`)
    executor: Option<DedicatedExecutor>,
}
65
66impl std::fmt::Debug for ExecutionContext {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ExecutionContext").finish()
69 }
70}
71
72impl ExecutionContext {
    /// Build a new `ExecutionContext` from configuration and a pre-built
    /// DataFusion [`SessionState`].
    ///
    /// A [`DedicatedExecutor`] is only created for the FlightSQL server app
    /// type, and only when `dedicated_executor_enabled` is set; all other
    /// app types run queries on the caller's runtime.
    pub fn try_new(
        config: &ExecutionConfig,
        session_state: SessionState,
        app_type: AppType,
    ) -> Result<Self> {
        let mut executor = None;
        if let AppType::FlightSQLServer = app_type {
            if config.dedicated_executor_enabled {
                // CPU-bound planning/execution gets its own multi-threaded
                // runtime so it cannot starve the server's I/O runtime.
                let runtime_builder = tokio::runtime::Builder::new_multi_thread();
                let dedicated_executor =
                    DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder);
                executor = Some(dedicated_executor)
            }
        }

        // `mut` is only used when the `functions-json` feature is enabled.
        #[allow(unused_mut)]
        let mut session_ctx = SessionContext::new_with_state(session_state);

        #[cfg(feature = "functions-json")]
        datafusion_functions_json::register_all(&mut session_ctx)?;

        // Allow querying file paths / URLs directly as tables.
        let session_ctx = session_ctx.enable_url_table();

        // Table function for inspecting Parquet file metadata via SQL.
        session_ctx.register_udtf(
            "parquet_metadata",
            Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}),
        );

        Ok(Self {
            config: config.clone(),
            session_ctx,
            ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
            executor,
        })
    }
114
    /// The execution configuration this context was built with.
    pub fn config(&self) -> &ExecutionConfig {
        &self.config
    }
118
    /// Register initial tables. Currently a no-op placeholder that always
    /// succeeds.
    pub fn create_tables(&mut self) -> Result<()> {
        Ok(())
    }
122
    /// The underlying DataFusion [`SessionContext`].
    pub fn session_ctx(&self) -> &SessionContext {
        &self.session_ctx
    }
127
    /// The dedicated executor, if one was created (FlightSQL server app
    /// type with `dedicated_executor_enabled`); `None` otherwise.
    pub fn executor(&self) -> &Option<DedicatedExecutor> {
        &self.executor
    }
132
133 pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
135 let ctx = self.session_ctx.clone();
136 let task = async move { ctx.state().statement_to_plan(statement).await };
137 if let Some(executor) = &self.executor {
138 let job = executor.spawn(task).map_err(|e| eyre::eyre!(e));
139 let job_res = job.await?;
140 job_res.map_err(|e| eyre!(e))
141 } else {
142 task.await.map_err(|e| eyre!(e))
143 }
144 }
145
146 pub async fn execute_logical_plan(
148 &self,
149 logical_plan: LogicalPlan,
150 ) -> Result<SendableRecordBatchStream> {
151 let ctx = self.session_ctx.clone();
152 let task = async move {
153 let df = ctx.execute_logical_plan(logical_plan).await?;
154 df.execute_stream().await
155 };
156 if let Some(executor) = &self.executor {
157 let job = executor.spawn(task).map_err(|e| eyre!(e));
158 let job_res = job.await?;
159 job_res.map_err(|e| eyre!(e))
160 } else {
161 task.await.map_err(|e| eyre!(e))
162 }
163 }
164
165 pub async fn execute_sql_and_discard_results(
167 &self,
168 sql: &str,
169 ) -> datafusion::error::Result<()> {
170 let mut stream = self.execute_sql(sql).await?;
171 while let Some(maybe_batch) = stream.next().await {
173 maybe_batch?; }
175 Ok(())
176 }
177
178 pub async fn create_physical_plan(
181 &self,
182 sql: &str,
183 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
184 let df = self.session_ctx.sql(sql).await?;
185 df.create_physical_plan().await
186 }
187
188 pub async fn execute_sql(
191 &self,
192 sql: &str,
193 ) -> datafusion::error::Result<SendableRecordBatchStream> {
194 self.session_ctx.sql(sql).await?.execute_stream().await
195 }
196
197 pub async fn execute_statement(
200 &self,
201 statement: Statement,
202 ) -> datafusion::error::Result<SendableRecordBatchStream> {
203 let plan = self
204 .session_ctx
205 .state()
206 .statement_to_plan(statement)
207 .await?;
208 self.session_ctx
209 .execute_logical_plan(plan)
210 .await?
211 .execute_stream()
212 .await
213 }
214
215 pub fn load_ddl(&self) -> Option<String> {
217 info!("Loading DDL from: {:?}", &self.ddl_path);
218 if let Some(ddl_path) = &self.ddl_path {
219 if ddl_path.exists() {
220 let maybe_ddl = std::fs::read_to_string(ddl_path);
221 match maybe_ddl {
222 Ok(ddl) => Some(ddl),
223 Err(err) => {
224 error!("Error reading DDL: {:?}", err);
225 None
226 }
227 }
228 } else {
229 info!("DDL path ({:?}) does not exist", ddl_path);
230 None
231 }
232 } else {
233 info!("No DDL file configured");
234 None
235 }
236 }
237
238 pub fn save_ddl(&self, ddl: String) {
240 info!("Loading DDL from: {:?}", &self.ddl_path);
241 if let Some(ddl_path) = &self.ddl_path {
242 match std::fs::File::create(ddl_path) {
243 Ok(mut f) => match f.write_all(ddl.as_bytes()) {
244 Ok(_) => {
245 info!("Saved DDL file")
246 }
247 Err(e) => {
248 error!("Error writing DDL file: {e}")
249 }
250 },
251 Err(e) => {
252 error!("Error creating or opening DDL file: {e}")
253 }
254 }
255 } else {
256 info!("No DDL file configured");
257 }
258 }
259
    /// Load the configured DDL file and execute each `;`-separated statement,
    /// logging (but not propagating) per-statement failures.
    pub async fn execute_ddl(&self) {
        match self.load_ddl() {
            Some(ddl) => {
                // NOTE(review): naive split — a ';' inside a string literal
                // would incorrectly split one statement into two. Confirm DDL
                // files never contain embedded semicolons.
                let ddl_statements = ddl.split(';').collect::<Vec<&str>>();
                for statement in ddl_statements {
                    // Skip empty fragments (e.g. after a trailing ';').
                    if statement.trim().is_empty() {
                        continue;
                    }
                    // NOTE(review): this skips the WHOLE fragment when it
                    // begins with a `--` comment line, even if real SQL
                    // follows on later lines — confirm that's intended.
                    if statement.trim().starts_with("--") {
                        continue;
                    }

                    debug!("Executing DDL statement: {:?}", statement);
                    // Best-effort: a failing statement is logged and the
                    // remaining statements still run.
                    match self.execute_sql_and_discard_results(statement).await {
                        Ok(_) => {
                            info!("DDL statement executed");
                        }
                        Err(e) => {
                            error!("Error executing DDL statement: {e}");
                        }
                    }
                }
            }
            None => {
                info!("No DDL to execute");
            }
        }
    }
289
    /// Run `query` `iterations` times, recording per-phase durations and row
    /// counts for each run.
    ///
    /// `cli_iterations` overrides the configured `benchmark_iterations` when
    /// provided. Errors if `query` parses to anything other than exactly one
    /// statement.
    pub async fn benchmark_query(
        &self,
        query: &str,
        cli_iterations: Option<usize>,
    ) -> Result<LocalBenchmarkStats> {
        let iterations = cli_iterations.unwrap_or(self.config.benchmark_iterations);
        info!("Benchmarking query with {} iterations", iterations);
        let mut rows_returned = Vec::with_capacity(iterations);
        let mut logical_planning_durations = Vec::with_capacity(iterations);
        let mut physical_planning_durations = Vec::with_capacity(iterations);
        let mut execution_durations = Vec::with_capacity(iterations);
        let mut total_durations = Vec::with_capacity(iterations);
        let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
        let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
        if statements.len() == 1 {
            for _ in 0..iterations {
                let statement = statements[0].clone();
                let start = std::time::Instant::now();
                let logical_plan = self
                    .session_ctx()
                    .state()
                    .statement_to_plan(statement)
                    .await?;
                let logical_planning_duration = start.elapsed();
                let physical_plan = self
                    .session_ctx()
                    .state()
                    .create_physical_plan(&logical_plan)
                    .await?;
                let physical_planning_duration = start.elapsed();
                // Drain the stream, counting rows; batches are discarded.
                let task_ctx = self.session_ctx().task_ctx();
                let mut stream = execute_stream(physical_plan, task_ctx)?;
                let mut rows = 0;
                while let Some(b) = stream.next().await {
                    rows += b?.num_rows();
                }
                rows_returned.push(rows);
                let execution_duration = start.elapsed();
                let total_duration = start.elapsed();
                // The elapsed() readings above are cumulative timestamps from
                // `start`; subtracting adjacent readings yields per-phase
                // durations. (total is read immediately after execution, so
                // it is effectively equal to the execution timestamp.)
                logical_planning_durations.push(logical_planning_duration);
                physical_planning_durations
                    .push(physical_planning_duration - logical_planning_duration);
                execution_durations.push(execution_duration - physical_planning_duration);
                total_durations.push(total_duration);
            }
        } else {
            return Err(eyre::eyre!("Only a single statement can be benchmarked"));
        }

        Ok(LocalBenchmarkStats::new(
            query.to_string(),
            rows_returned,
            logical_planning_durations,
            physical_planning_durations,
            execution_durations,
            total_durations,
        ))
    }
349
350 pub async fn analyze_query(&self, query: &str) -> Result<ExecutionStats> {
351 let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
352 let start = std::time::Instant::now();
353 let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
354 let parsing_duration = start.elapsed();
355 if statements.len() == 1 {
356 let statement = statements[0].clone();
357 let logical_plan = self
358 .session_ctx()
359 .state()
360 .statement_to_plan(statement.clone())
361 .await?;
362 let logical_planning_duration = start.elapsed();
363 let physical_plan = self
364 .session_ctx()
365 .state()
366 .create_physical_plan(&logical_plan)
367 .await?;
368 let physical_planning_duration = start.elapsed();
369 let task_ctx = self.session_ctx().task_ctx();
370 let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx)?;
371 let mut rows = 0;
372 let mut batches = 0;
373 let mut bytes = 0;
374 while let Some(b) = stream.next().await {
375 let batch = b?;
376 rows += batch.num_rows();
377 batches += 1;
378 bytes += batch.get_array_memory_size();
379 }
380 let execution_duration = start.elapsed();
381 let durations = ExecutionDurationStats::new(
382 parsing_duration,
383 logical_planning_duration - parsing_duration,
384 physical_planning_duration - logical_planning_duration,
385 execution_duration - physical_planning_duration,
386 start.elapsed(),
387 );
388 ExecutionStats::try_new(
389 query.to_string(),
390 durations,
391 rows,
392 batches,
393 bytes,
394 physical_plan,
395 )
396 } else {
397 Err(eyre::eyre!("Only a single statement can be benchmarked"))
398 }
399
400 }
402}