datafusion_dft/execution/
local.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
19
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use color_eyre::eyre::eyre;
25use datafusion::logical_expr::LogicalPlan;
26use futures::TryFutureExt;
27use log::{debug, error, info};
28
29use crate::config::ExecutionConfig;
30use color_eyre::eyre::{self, Result};
31use datafusion::execution::{SendableRecordBatchStream, SessionState};
32use datafusion::physical_plan::{execute_stream, ExecutionPlan};
33use datafusion::prelude::*;
34use datafusion::sql::parser::{DFParser, Statement};
35use tokio_stream::StreamExt;
36
37use super::executor::dedicated::DedicatedExecutor;
38use super::local_benchmarks::LocalBenchmarkStats;
39use super::stats::{ExecutionDurationStats, ExecutionStats};
40use super::AppType;
41
/// Structure for executing queries locally
///
/// This context includes both:
///
/// 1. The configuration of a [`SessionContext`] with various extensions enabled
///
/// 2. The code for running SQL queries
///
/// The design goals for this module are to serve as an example of how to integrate
/// DataFusion into an application and to provide a simple interface for running SQL queries
/// with the various extensions enabled.
///
/// Thus it is important (eventually) not to depend on the code in the app crate
#[derive(Clone)]
pub struct ExecutionContext {
    /// Configuration this context was constructed from (exposed via `config()`)
    config: ExecutionConfig,
    /// Underlying `SessionContext`
    session_ctx: SessionContext,
    /// Path to the configured DDL file
    ddl_path: Option<PathBuf>,
    /// Dedicated executor for running CPU intensive work
    executor: Option<DedicatedExecutor>,
}
65
66impl std::fmt::Debug for ExecutionContext {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ExecutionContext").finish()
69    }
70}
71
72impl ExecutionContext {
    /// Construct a new `ExecutionContext` with the specified configuration
    ///
    /// A [`DedicatedExecutor`] for CPU-bound work is only created when running as a
    /// FlightSQL server *and* the config enables it; all other app types run work
    /// inline on the current runtime.
    pub fn try_new(
        config: &ExecutionConfig,
        session_state: SessionState,
        app_type: AppType,
    ) -> Result<Self> {
        let mut executor = None;
        if let AppType::FlightSQLServer = app_type {
            if config.dedicated_executor_enabled {
                // Ideally we would only use `enable_time` but we are still doing
                // some network requests as part of planning / execution which require network
                // functionality.

                let runtime_builder = tokio::runtime::Builder::new_multi_thread();
                let dedicated_executor =
                    DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder);
                executor = Some(dedicated_executor)
            }
        }

        // `mut` is only needed when the `functions-json` feature is compiled in
        #[allow(unused_mut)]
        let mut session_ctx = SessionContext::new_with_state(session_state);

        #[cfg(feature = "functions-json")]
        datafusion_functions_json::register_all(&mut session_ctx)?;

        // Shadow the context with one that resolves URL-named tables directly
        let session_ctx = session_ctx.enable_url_table();

        // Register Parquet Metadata Function
        session_ctx.register_udtf(
            "parquet_metadata",
            Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}),
        );

        Ok(Self {
            config: config.clone(),
            session_ctx,
            ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
            executor,
        })
    }
114
    /// Return the [`ExecutionConfig`] this context was created with
    pub fn config(&self) -> &ExecutionConfig {
        &self.config
    }
118
    /// Create configured tables
    ///
    /// Currently a no-op placeholder — it always succeeds without creating anything.
    pub fn create_tables(&mut self) -> Result<()> {
        Ok(())
    }
122
    /// Return the inner DataFusion [`SessionContext`]
    ///
    /// Useful for callers that need direct access to session state or registration APIs.
    pub fn session_ctx(&self) -> &SessionContext {
        &self.session_ctx
    }
127
    /// Return the inner [`DedicatedExecutor`], which is `None` unless enabled at construction
    ///
    /// NOTE(review): `Option<&DedicatedExecutor>` via `.as_ref()` would be more
    /// idiomatic, but changing the return type would break existing callers.
    pub fn executor(&self) -> &Option<DedicatedExecutor> {
        &self.executor
    }
132
133    /// Convert the statement to a `LogicalPlan`.  Uses the [`DedicatedExecutor`] if it is available.
134    pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
135        let ctx = self.session_ctx.clone();
136        let task = async move { ctx.state().statement_to_plan(statement).await };
137        if let Some(executor) = &self.executor {
138            let job = executor.spawn(task).map_err(|e| eyre::eyre!(e));
139            let job_res = job.await?;
140            job_res.map_err(|e| eyre!(e))
141        } else {
142            task.await.map_err(|e| eyre!(e))
143        }
144    }
145
146    /// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`.  Uses the [`DedicatedExecutor`] if it is available.
147    pub async fn execute_logical_plan(
148        &self,
149        logical_plan: LogicalPlan,
150    ) -> Result<SendableRecordBatchStream> {
151        let ctx = self.session_ctx.clone();
152        let task = async move {
153            let df = ctx.execute_logical_plan(logical_plan).await?;
154            df.execute_stream().await
155        };
156        if let Some(executor) = &self.executor {
157            let job = executor.spawn(task).map_err(|e| eyre!(e));
158            let job_res = job.await?;
159            job_res.map_err(|e| eyre!(e))
160        } else {
161            task.await.map_err(|e| eyre!(e))
162        }
163    }
164
165    /// Executes the specified sql string, driving it to completion but discarding any results
166    pub async fn execute_sql_and_discard_results(
167        &self,
168        sql: &str,
169    ) -> datafusion::error::Result<()> {
170        let mut stream = self.execute_sql(sql).await?;
171        // note we don't call collect() to avoid buffering data
172        while let Some(maybe_batch) = stream.next().await {
173            maybe_batch?; // check for errors
174        }
175        Ok(())
176    }
177
178    /// Create a physical plan from the specified SQL string.  This is useful if you want to store
179    /// the plan and collect metrics from it.
180    pub async fn create_physical_plan(
181        &self,
182        sql: &str,
183    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
184        let df = self.session_ctx.sql(sql).await?;
185        df.create_physical_plan().await
186    }
187
188    /// Executes the specified sql string, returning the resulting
189    /// [`SendableRecordBatchStream`] of results
190    pub async fn execute_sql(
191        &self,
192        sql: &str,
193    ) -> datafusion::error::Result<SendableRecordBatchStream> {
194        self.session_ctx.sql(sql).await?.execute_stream().await
195    }
196
197    /// Executes the a pre-parsed DataFusion [`Statement`], returning the
198    /// resulting [`SendableRecordBatchStream`] of results
199    pub async fn execute_statement(
200        &self,
201        statement: Statement,
202    ) -> datafusion::error::Result<SendableRecordBatchStream> {
203        let plan = self
204            .session_ctx
205            .state()
206            .statement_to_plan(statement)
207            .await?;
208        self.session_ctx
209            .execute_logical_plan(plan)
210            .await?
211            .execute_stream()
212            .await
213    }
214
215    /// Load DDL from configured DDL path for execution (so strips out comments and empty lines)
216    pub fn load_ddl(&self) -> Option<String> {
217        info!("Loading DDL from: {:?}", &self.ddl_path);
218        if let Some(ddl_path) = &self.ddl_path {
219            if ddl_path.exists() {
220                let maybe_ddl = std::fs::read_to_string(ddl_path);
221                match maybe_ddl {
222                    Ok(ddl) => Some(ddl),
223                    Err(err) => {
224                        error!("Error reading DDL: {:?}", err);
225                        None
226                    }
227                }
228            } else {
229                info!("DDL path ({:?}) does not exist", ddl_path);
230                None
231            }
232        } else {
233            info!("No DDL file configured");
234            None
235        }
236    }
237
238    /// Save DDL to configured DDL path
239    pub fn save_ddl(&self, ddl: String) {
240        info!("Loading DDL from: {:?}", &self.ddl_path);
241        if let Some(ddl_path) = &self.ddl_path {
242            match std::fs::File::create(ddl_path) {
243                Ok(mut f) => match f.write_all(ddl.as_bytes()) {
244                    Ok(_) => {
245                        info!("Saved DDL file")
246                    }
247                    Err(e) => {
248                        error!("Error writing DDL file: {e}")
249                    }
250                },
251                Err(e) => {
252                    error!("Error creating or opening DDL file: {e}")
253                }
254            }
255        } else {
256            info!("No DDL file configured");
257        }
258    }
259
260    /// Execute DDL statements sequentially
261    pub async fn execute_ddl(&self) {
262        match self.load_ddl() {
263            Some(ddl) => {
264                let ddl_statements = ddl.split(';').collect::<Vec<&str>>();
265                for statement in ddl_statements {
266                    if statement.trim().is_empty() {
267                        continue;
268                    }
269                    if statement.trim().starts_with("--") {
270                        continue;
271                    }
272
273                    debug!("Executing DDL statement: {:?}", statement);
274                    match self.execute_sql_and_discard_results(statement).await {
275                        Ok(_) => {
276                            info!("DDL statement executed");
277                        }
278                        Err(e) => {
279                            error!("Error executing DDL statement: {e}");
280                        }
281                    }
282                }
283            }
284            None => {
285                info!("No DDL to execute");
286            }
287        }
288    }
289
290    /// Benchmark the provided query.  Currently, only a single statement can be benchmarked
291    pub async fn benchmark_query(
292        &self,
293        query: &str,
294        cli_iterations: Option<usize>,
295    ) -> Result<LocalBenchmarkStats> {
296        let iterations = cli_iterations.unwrap_or(self.config.benchmark_iterations);
297        info!("Benchmarking query with {} iterations", iterations);
298        let mut rows_returned = Vec::with_capacity(iterations);
299        let mut logical_planning_durations = Vec::with_capacity(iterations);
300        let mut physical_planning_durations = Vec::with_capacity(iterations);
301        let mut execution_durations = Vec::with_capacity(iterations);
302        let mut total_durations = Vec::with_capacity(iterations);
303        let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
304        let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
305        if statements.len() == 1 {
306            for _ in 0..iterations {
307                let statement = statements[0].clone();
308                let start = std::time::Instant::now();
309                let logical_plan = self
310                    .session_ctx()
311                    .state()
312                    .statement_to_plan(statement)
313                    .await?;
314                let logical_planning_duration = start.elapsed();
315                let physical_plan = self
316                    .session_ctx()
317                    .state()
318                    .create_physical_plan(&logical_plan)
319                    .await?;
320                let physical_planning_duration = start.elapsed();
321                let task_ctx = self.session_ctx().task_ctx();
322                let mut stream = execute_stream(physical_plan, task_ctx)?;
323                let mut rows = 0;
324                while let Some(b) = stream.next().await {
325                    rows += b?.num_rows();
326                }
327                rows_returned.push(rows);
328                let execution_duration = start.elapsed();
329                let total_duration = start.elapsed();
330                logical_planning_durations.push(logical_planning_duration);
331                physical_planning_durations
332                    .push(physical_planning_duration - logical_planning_duration);
333                execution_durations.push(execution_duration - physical_planning_duration);
334                total_durations.push(total_duration);
335            }
336        } else {
337            return Err(eyre::eyre!("Only a single statement can be benchmarked"));
338        }
339
340        Ok(LocalBenchmarkStats::new(
341            query.to_string(),
342            rows_returned,
343            logical_planning_durations,
344            physical_planning_durations,
345            execution_durations,
346            total_durations,
347        ))
348    }
349
350    pub async fn analyze_query(&self, query: &str) -> Result<ExecutionStats> {
351        let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
352        let start = std::time::Instant::now();
353        let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
354        let parsing_duration = start.elapsed();
355        if statements.len() == 1 {
356            let statement = statements[0].clone();
357            let logical_plan = self
358                .session_ctx()
359                .state()
360                .statement_to_plan(statement.clone())
361                .await?;
362            let logical_planning_duration = start.elapsed();
363            let physical_plan = self
364                .session_ctx()
365                .state()
366                .create_physical_plan(&logical_plan)
367                .await?;
368            let physical_planning_duration = start.elapsed();
369            let task_ctx = self.session_ctx().task_ctx();
370            let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx)?;
371            let mut rows = 0;
372            let mut batches = 0;
373            let mut bytes = 0;
374            while let Some(b) = stream.next().await {
375                let batch = b?;
376                rows += batch.num_rows();
377                batches += 1;
378                bytes += batch.get_array_memory_size();
379            }
380            let execution_duration = start.elapsed();
381            let durations = ExecutionDurationStats::new(
382                parsing_duration,
383                logical_planning_duration - parsing_duration,
384                physical_planning_duration - logical_planning_duration,
385                execution_duration - physical_planning_duration,
386                start.elapsed(),
387            );
388            ExecutionStats::try_new(
389                query.to_string(),
390                durations,
391                rows,
392                batches,
393                bytes,
394                physical_plan,
395            )
396        } else {
397            Err(eyre::eyre!("Only a single statement can be benchmarked"))
398        }
399
400        // Ok(())
401    }
402}