datafusion_tui/app/datafusion/
context.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Context (remote or local)
19
20use arrow::record_batch::RecordBatch;
21use datafusion::dataframe::DataFrame;
22use datafusion::error::{DataFusionError, Result};
23use datafusion::execution::context::{SessionConfig, SessionContext};
24
25use log::{debug, error, info};
26use std::fs::File;
27use std::io::{BufRead, BufReader};
28use std::sync::Arc;
29
30use crate::app::ui::Scroll;
31
/// Summary of one query run: the SQL text, whether it succeeded, the error
/// message on failure, how many rows it produced and how long it took.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryResultsMeta {
    /// The SQL text that was run.
    pub query: String,
    /// `true` when the query completed without an error.
    pub succeeded: bool,
    /// Error message when the query failed, `None` otherwise.
    pub error: Option<String>,
    /// Number of rows in the result set.
    pub rows: usize,
    /// Duration of the query in seconds.
    pub query_duration: f64,
}
40
41pub struct QueryResults {
42    pub batches: Vec<RecordBatch>,
43    pub pretty_batches: String,
44    pub meta: QueryResultsMeta,
45    pub scroll: Scroll,
46}
47
48impl QueryResults {
49    pub fn format_timing_info(&self) -> String {
50        format!(
51            "[ {} {} in set. Query took {:.3} seconds ] ",
52            self.meta.rows,
53            if self.meta.rows == 1 { "row" } else { "rows" },
54            self.meta.query_duration
55        )
56    }
57}
58
59/// The CLI supports using a local DataFusion context or a distributed BallistaContext
60pub enum Context {
61    /// In-process execution with DataFusion
62    Local(SessionContext),
63    /// Distributed execution with Ballista (if available)
64    Remote(BallistaContext),
65}
66
67impl Context {
68    /// create a new remote context with given host and port
69    pub async fn new_remote(host: &str, port: u16) -> Result<Context> {
70        debug!("Created BallistaContext @ {:?}:{:?}", host, port);
71        Ok(Context::Remote(BallistaContext::try_new(host, port).await?))
72    }
73
74    /// create a local context using the given config
75    pub async fn new_local(config: &SessionConfig) -> Context {
76        debug!("Created ExecutionContext");
77        let ctx = SessionContext::with_config(config.clone());
78
79        #[cfg(feature = "s3")]
80        use crate::app::datafusion::object_stores::register_s3;
81        #[cfg(feature = "s3")]
82        let ctx = register_s3(ctx).await;
83
84        #[cfg(feature = "bigtable")]
85        use crate::app::datafusion::table_providers::register_bigtable;
86        #[cfg(feature = "bigtable")]
87        let ctx = register_bigtable(ctx).await;
88
89        Context::Local(ctx)
90    }
91
92    /// execute an SQL statement against the context
93    pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
94        info!("Executing SQL: {:?}", sql);
95        match self {
96            Context::Local(datafusion) => datafusion.sql(sql).await,
97            Context::Remote(ballista) => ballista.sql(sql).await,
98        }
99    }
100
101    pub async fn exec_files(&mut self, files: Vec<String>) {
102        let files = files
103            .into_iter()
104            .map(|file_path| File::open(file_path).unwrap())
105            .collect::<Vec<_>>();
106        for file in files {
107            let mut reader = BufReader::new(file);
108            exec_from_lines(self, &mut reader).await;
109        }
110    }
111
112    pub fn format_execution_config(&self) -> Option<Vec<String>> {
113        match self {
114            Context::Local(ctx) => {
115                let mut config = Vec::new();
116                let cfg = ctx.copied_config();
117                debug!("Extracting ExecutionConfig attributes");
118                config.push(format!("Target Partitions: {}", cfg.target_partitions));
119                config.push(format!("Repartition Joins: {}", cfg.repartition_joins));
120                config.push(format!(
121                    "Repartition Aggregations: {}",
122                    cfg.repartition_aggregations
123                ));
124                config.push(format!("Repartition Windows: {}", cfg.repartition_windows));
125                Some(config)
126            }
127            Context::Remote(_) => None,
128        }
129    }
130
131    pub fn format_physical_optimizers(&self) -> Option<Vec<String>> {
132        match self {
133            Context::Local(ctx) => {
134                let physical_opts = ctx.state.read().physical_optimizers.clone();
135                debug!("Extracting physical optimizer rules");
136                let opts = physical_opts
137                    .iter()
138                    .map(|opt| opt.name().to_string())
139                    .collect();
140                Some(opts)
141            }
142            Context::Remote(_) => None,
143        }
144    }
145}
146
147async fn exec_from_lines(ctx: &mut Context, reader: &mut BufReader<File>) {
148    let mut query = "".to_owned();
149
150    for line in reader.lines() {
151        match line {
152            Ok(line) if line.starts_with("--") => {
153                continue;
154            }
155            Ok(line) => {
156                let line = line.trim_end();
157                query.push_str(line);
158                if line.ends_with(';') {
159                    match exec_and_print(ctx, query).await {
160                        Ok(_) => {}
161                        Err(err) => error!("{:?}", err),
162                    }
163                    query = "".to_owned();
164                } else {
165                    query.push('\n');
166                }
167            }
168            _ => {
169                break;
170            }
171        }
172    }
173
174    // run the left over query if the last statement doesn't contain ‘;’
175    if !query.is_empty() {
176        match exec_and_print(ctx, query).await {
177            Ok(_) => {}
178            Err(err) => error!("{:?}", err),
179        }
180    }
181}
182
183async fn exec_and_print(ctx: &mut Context, sql: String) -> Result<()> {
184    let _df = ctx.sql(&sql).await?;
185    Ok(())
186}
187
// Wrappers around BallistaContext so the CLI also builds and runs without the `ballista` feature.

// NOTE: the `ballista` feature is untested because the ballista crate could not be installed from crates.io.
191#[cfg(feature = "ballista")]
192use ballista;
/// Thin wrapper around Ballista's remote context; only compiled with the
/// `ballista` feature.
#[cfg(feature = "ballista")]
pub struct BallistaContext(ballista::context::BallistaContext);

#[cfg(feature = "ballista")]
impl BallistaContext {
    /// Connect to a Ballista scheduler at `host`:`port` with
    /// `ballista.with_information_schema` enabled.
    ///
    /// Ballista errors are converted to `DataFusionError::Execution` using
    /// their `Debug` representation.
    pub async fn try_new(host: &str, port: u16) -> Result<Self> {
        use ballista::context::BallistaContext as RemoteContext;
        use ballista::prelude::BallistaConfig;

        fn execution_error<E: std::fmt::Debug>(err: E) -> DataFusionError {
            DataFusionError::Execution(format!("{:?}", err))
        }

        let config = BallistaConfig::builder()
            .set("ballista.with_information_schema", "true")
            .build()
            .map_err(execution_error)?;
        let remote = RemoteContext::remote(host, port, &config)
            .await
            .map_err(execution_error)?;
        Ok(Self(remote))
    }

    /// Forward `sql` to the wrapped Ballista context.
    pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
        self.0.sql(sql).await
    }
}
213
214#[cfg(not(feature = "ballista"))]
215pub struct BallistaContext();
216#[cfg(not(feature = "ballista"))]
217impl BallistaContext {
218    pub async fn try_new(_host: &str, _port: u16) -> Result<Self> {
219        Err(DataFusionError::NotImplemented(
220            "Remote execution not supported. Compile with feature 'ballista' to enable".to_string(),
221        ))
222    }
223    pub async fn sql(&mut self, _sql: &str) -> Result<Arc<DataFrame>> {
224        unreachable!()
225    }
226}
227
#[cfg(test)]
mod test {
    use crate::app::core::{App, TabItem};
    use crate::app::datafusion::context::{QueryResults, QueryResultsMeta};
    use crate::app::handlers::execute_query;
    use crate::app::ui::Scroll;
    use crate::cli::args::mock_standard_args;
    use crate::utils::test_util::assert_results_eq;

    /// Create an `App` from the mock CLI arguments and type `query` into its editor.
    async fn app_with_input(query: &str) -> App {
        let mut app = App::new(mock_standard_args()).await;
        for ch in query.chars() {
            app.editor.input.append_char(ch).unwrap();
        }
        app
    }

    /// Expected results for a successful `query` returning `rows` rows.
    ///
    /// Batches and pretty output are empty and the duration is zero.
    /// NOTE(review): presumably `assert_results_eq` ignores those fields; confirm.
    fn expected_success(query: &str, rows: usize) -> QueryResults {
        let meta = QueryResultsMeta {
            query: query.to_string(),
            succeeded: true,
            error: None,
            rows,
            query_duration: 0f64,
        };
        QueryResults {
            batches: Vec::new(),
            pretty_batches: String::new(),
            meta,
            scroll: Scroll { x: 0, y: 0 },
        }
    }

    #[test]
    fn test_tab_item_from_char() {
        let mapping = [
            ('1', TabItem::Editor),
            ('2', TabItem::QueryHistory),
            ('3', TabItem::Context),
            ('4', TabItem::Logs),
        ];
        assert!(TabItem::try_from('0').is_err());
        for (key, expected) in mapping {
            assert_eq!(expected, TabItem::try_from(key).unwrap());
        }
        assert!(TabItem::try_from('5').is_err());
    }

    #[test]
    fn test_tab_item_to_usize() {
        let all = TabItem::all_values();
        for (idx, item) in all.iter().enumerate() {
            let key = (idx + 1).to_string().chars().next().unwrap();
            assert_eq!(*item, TabItem::try_from(key).unwrap());
            assert_eq!(item.list_index(), idx);
        }
    }

    #[tokio::test]
    async fn test_select() {
        let query = "SELECT 1";
        let mut app = app_with_input(query).await;

        execute_query(&mut app).await.unwrap();

        let actual = app.query_results.unwrap();
        assert_results_eq(Some(actual), Some(expected_success(query, 1)));
    }

    #[tokio::test]
    async fn test_select_with_typo() {
        let query = "SELE 1";
        let mut app = app_with_input(query).await;

        execute_query(&mut app).await.unwrap();

        let actual = app.editor.history.pop();
        let expected = QueryResultsMeta {
            query: query.to_string(),
            succeeded: false,
            error: Some(
                "SQL error: ParserError(\"Expected an SQL statement, found: SELE\")".to_string(),
            ),
            rows: 0,
            query_duration: 0f64,
        };
        assert_eq!(actual, Some(expected));
    }

    #[tokio::test]
    async fn test_create_table() {
        let query = "CREATE TABLE abc AS VALUES (1,2,3)";
        let mut app = app_with_input(query).await;

        execute_query(&mut app).await.unwrap();

        // Need to look into rows here. Originally, creating a table seemed to
        // return no rows, but now it appears to return the created table.
        // This seems to come from the DataFusion side, but that is not certain.
        assert_results_eq(app.query_results, Some(expected_success(query, 1)));
    }

    #[tokio::test]
    async fn test_multiple_queries() {
        let mut app = app_with_input("SELECT 1;SELECT 2;").await;

        execute_query(&mut app).await.unwrap();

        // The expected results are those of the last statement, with its `;` stripped.
        let actual = app.query_results.unwrap();
        assert_results_eq(Some(actual), Some(expected_success("SELECT 2", 1)));
    }
}