datafusion_tui/app/datafusion/
context.rs1use arrow::record_batch::RecordBatch;
21use datafusion::dataframe::DataFrame;
22use datafusion::error::{DataFusionError, Result};
23use datafusion::execution::context::{SessionConfig, SessionContext};
24
25use log::{debug, error, info};
26use std::fs::File;
27use std::io::{BufRead, BufReader};
28use std::sync::Arc;
29
30use crate::app::ui::Scroll;
31
/// Summary of one query run, independent of the returned data.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryResultsMeta {
    /// SQL text of the query this record describes.
    pub query: String,
    /// `true` when the query completed without an error.
    pub succeeded: bool,
    /// Error text for a failed query; `None` when there was no error.
    pub error: Option<String>,
    /// Row count reported for the result set.
    pub rows: usize,
    /// Elapsed query time; `QueryResults::format_timing_info` prints it as seconds.
    pub query_duration: f64,
}
40
41pub struct QueryResults {
42 pub batches: Vec<RecordBatch>,
43 pub pretty_batches: String,
44 pub meta: QueryResultsMeta,
45 pub scroll: Scroll,
46}
47
48impl QueryResults {
49 pub fn format_timing_info(&self) -> String {
50 format!(
51 "[ {} {} in set. Query took {:.3} seconds ] ",
52 self.meta.rows,
53 if self.meta.rows == 1 { "row" } else { "rows" },
54 self.meta.query_duration
55 )
56 }
57}
58
59pub enum Context {
61 Local(SessionContext),
63 Remote(BallistaContext),
65}
66
67impl Context {
68 pub async fn new_remote(host: &str, port: u16) -> Result<Context> {
70 debug!("Created BallistaContext @ {:?}:{:?}", host, port);
71 Ok(Context::Remote(BallistaContext::try_new(host, port).await?))
72 }
73
74 pub async fn new_local(config: &SessionConfig) -> Context {
76 debug!("Created ExecutionContext");
77 let ctx = SessionContext::with_config(config.clone());
78
79 #[cfg(feature = "s3")]
80 use crate::app::datafusion::object_stores::register_s3;
81 #[cfg(feature = "s3")]
82 let ctx = register_s3(ctx).await;
83
84 #[cfg(feature = "bigtable")]
85 use crate::app::datafusion::table_providers::register_bigtable;
86 #[cfg(feature = "bigtable")]
87 let ctx = register_bigtable(ctx).await;
88
89 Context::Local(ctx)
90 }
91
92 pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
94 info!("Executing SQL: {:?}", sql);
95 match self {
96 Context::Local(datafusion) => datafusion.sql(sql).await,
97 Context::Remote(ballista) => ballista.sql(sql).await,
98 }
99 }
100
101 pub async fn exec_files(&mut self, files: Vec<String>) {
102 let files = files
103 .into_iter()
104 .map(|file_path| File::open(file_path).unwrap())
105 .collect::<Vec<_>>();
106 for file in files {
107 let mut reader = BufReader::new(file);
108 exec_from_lines(self, &mut reader).await;
109 }
110 }
111
112 pub fn format_execution_config(&self) -> Option<Vec<String>> {
113 match self {
114 Context::Local(ctx) => {
115 let mut config = Vec::new();
116 let cfg = ctx.copied_config();
117 debug!("Extracting ExecutionConfig attributes");
118 config.push(format!("Target Partitions: {}", cfg.target_partitions));
119 config.push(format!("Repartition Joins: {}", cfg.repartition_joins));
120 config.push(format!(
121 "Repartition Aggregations: {}",
122 cfg.repartition_aggregations
123 ));
124 config.push(format!("Repartition Windows: {}", cfg.repartition_windows));
125 Some(config)
126 }
127 Context::Remote(_) => None,
128 }
129 }
130
131 pub fn format_physical_optimizers(&self) -> Option<Vec<String>> {
132 match self {
133 Context::Local(ctx) => {
134 let physical_opts = ctx.state.read().physical_optimizers.clone();
135 debug!("Extracting physical optimizer rules");
136 let opts = physical_opts
137 .iter()
138 .map(|opt| opt.name().to_string())
139 .collect();
140 Some(opts)
141 }
142 Context::Remote(_) => None,
143 }
144 }
145}
146
147async fn exec_from_lines(ctx: &mut Context, reader: &mut BufReader<File>) {
148 let mut query = "".to_owned();
149
150 for line in reader.lines() {
151 match line {
152 Ok(line) if line.starts_with("--") => {
153 continue;
154 }
155 Ok(line) => {
156 let line = line.trim_end();
157 query.push_str(line);
158 if line.ends_with(';') {
159 match exec_and_print(ctx, query).await {
160 Ok(_) => {}
161 Err(err) => error!("{:?}", err),
162 }
163 query = "".to_owned();
164 } else {
165 query.push('\n');
166 }
167 }
168 _ => {
169 break;
170 }
171 }
172 }
173
174 if !query.is_empty() {
176 match exec_and_print(ctx, query).await {
177 Ok(_) => {}
178 Err(err) => error!("{:?}", err),
179 }
180 }
181}
182
183async fn exec_and_print(ctx: &mut Context, sql: String) -> Result<()> {
184 let _df = ctx.sql(&sql).await?;
185 Ok(())
186}
187
188#[cfg(feature = "ballista")]
192use ballista;
/// Wrapper around the Ballista client context; this definition is compiled
/// only when the `ballista` feature is enabled.
#[cfg(feature = "ballista")]
pub struct BallistaContext(ballista::context::BallistaContext);
#[cfg(feature = "ballista")]
impl BallistaContext {
    /// Builds a remote Ballista context for `host:port` with the
    /// `ballista.with_information_schema` setting set to `"true"`.
    ///
    /// # Errors
    /// Configuration and connection failures are both converted into
    /// `DataFusionError::Execution` carrying the debug text of the cause.
    pub async fn try_new(host: &str, port: u16) -> Result<Self> {
        use ballista::prelude::BallistaConfig;

        let config = BallistaConfig::builder()
            .set("ballista.with_information_schema", "true")
            .build()
            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

        // Fully qualified so the wrapped type is not confused with `Self`.
        let remote_ctx = ballista::context::BallistaContext::remote(host, port, &config)
            .await
            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

        Ok(Self(remote_ctx))
    }

    /// Forwards `sql` to the wrapped Ballista context.
    pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
        let Self(inner) = self;
        inner.sql(sql).await
    }
}
213
/// Placeholder used when the `ballista` feature is disabled, so code that
/// names `BallistaContext` still compiles.
#[cfg(not(feature = "ballista"))]
pub struct BallistaContext();
216#[cfg(not(feature = "ballista"))]
217impl BallistaContext {
218 pub async fn try_new(_host: &str, _port: u16) -> Result<Self> {
219 Err(DataFusionError::NotImplemented(
220 "Remote execution not supported. Compile with feature 'ballista' to enable".to_string(),
221 ))
222 }
223 pub async fn sql(&mut self, _sql: &str) -> Result<Arc<DataFrame>> {
224 unreachable!()
225 }
226}
227
#[cfg(test)]
mod test {
    use crate::app::core::{App, TabItem};
    use crate::app::datafusion::context::{QueryResults, QueryResultsMeta};
    use crate::app::handlers::execute_query;
    use crate::app::ui::Scroll;
    use crate::cli::args::mock_standard_args;
    use crate::utils::test_util::assert_results_eq;

    /// Creates an app from the mock arguments, types `query` into its editor
    /// one character at a time and executes it.
    async fn run_in_new_app(query: &str) -> App {
        let mut app = App::new(mock_standard_args()).await;
        for c in query.chars() {
            app.editor.input.append_char(c).unwrap();
        }
        execute_query(&mut app).await.unwrap();
        app
    }

    /// Expected results for a successful single-row query whose recorded
    /// text is `query`; batches, pretty text and scroll are left empty/zero.
    fn expected_single_row(query: &str) -> QueryResults {
        QueryResults {
            batches: Vec::new(),
            pretty_batches: String::new(),
            meta: QueryResultsMeta {
                query: query.to_string(),
                succeeded: true,
                error: None,
                rows: 1,
                query_duration: 0f64,
            },
            scroll: Scroll { x: 0, y: 0 },
        }
    }

    /// Keys '1'..='4' map to the four tabs; '0' and '5' are rejected.
    #[test]
    fn test_tab_item_from_char() {
        assert!(TabItem::try_from('0').is_err());

        let valid = [
            ('1', TabItem::Editor),
            ('2', TabItem::QueryHistory),
            ('3', TabItem::Context),
            ('4', TabItem::Logs),
        ];
        for (key, tab) in valid.iter() {
            assert_eq!(*tab, TabItem::try_from(*key).unwrap());
        }

        assert!(TabItem::try_from('5').is_err());
    }

    /// Each tab's position in `all_values` matches both its 1-based key and
    /// its `list_index`.
    #[test]
    fn test_tab_item_to_usize() {
        for (i, item) in TabItem::all_values().iter().enumerate() {
            let key = (i + 1).to_string().chars().next().unwrap();
            assert_eq!(*item, TabItem::try_from(key).unwrap());
            assert_eq!(item.list_index(), i);
        }
    }

    /// A plain `SELECT 1` succeeds and reports one row.
    #[tokio::test]
    async fn test_select() {
        let query = "SELECT 1";
        let app = run_in_new_app(query).await;

        let results = app.query_results.unwrap();

        assert_results_eq(Some(results), Some(expected_single_row(query)));
    }

    /// A misspelled keyword is recorded in the history as a failed query
    /// carrying the parser error.
    #[tokio::test]
    async fn test_select_with_typo() {
        let query = "SELE 1";
        let mut app = run_in_new_app(query).await;

        let actual = app.editor.history.pop();

        let expected_meta = QueryResultsMeta {
            query: query.to_string(),
            succeeded: false,
            error: Some(
                "SQL error: ParserError(\"Expected an SQL statement, found: SELE\")".to_string(),
            ),
            rows: 0,
            query_duration: 0f64,
        };

        assert_eq!(actual, Some(expected_meta));
    }

    /// `CREATE TABLE ... AS VALUES` succeeds; the expected row count is 1.
    #[tokio::test]
    async fn test_create_table() {
        let query = "CREATE TABLE abc AS VALUES (1,2,3)";
        let app = run_in_new_app(query).await;

        let results = app.query_results;

        assert_results_eq(results, Some(expected_single_row(query)));
    }

    /// With two `;`-separated statements the stored results belong to the
    /// last one, recorded without its trailing semicolon.
    #[tokio::test]
    async fn test_multiple_queries() {
        let app = run_in_new_app("SELECT 1;SELECT 2;").await;

        let results = app.query_results.unwrap();

        assert_results_eq(Some(results), Some(expected_single_row("SELECT 2")));
    }
}