datafusion_tui/app/handlers/
mod.rs1pub mod edit;
19pub mod normal;
20pub mod rc;
21
22use arrow::util::pretty::pretty_format_batches;
23use datafusion::error::DataFusionError;
24use datafusion::prelude::DataFrame;
25use log::{debug, error};
26use std::sync::Arc;
27use std::time::Instant;
28
29use crate::app::core::{App, AppReturn, InputMode};
30use crate::app::datafusion::context::{QueryResults, QueryResultsMeta};
31use crate::app::error::{DftError, Result};
32use crate::app::ui::Scroll;
33use crate::events::Key;
34
35pub async fn key_event_handler<'a>(app: &mut App, key: Key) -> Result<AppReturn> {
36 match app.input_mode {
37 InputMode::Normal => normal::normal_mode_handler(app, key).await,
38 InputMode::Editing => edit::edit_mode_handler(app, key).await,
39 InputMode::Rc => rc::rc_mode_handler(app, key).await,
40 }
41}
42
43pub async fn execute_query(app: &mut App) -> Result<AppReturn> {
44 let sql: String = app.editor.input.combine_lines();
45 handle_queries(app, sql).await?;
46 Ok(AppReturn::Continue)
47}
48
49async fn handle_queries(app: &mut App, sql: String) -> Result<()> {
50 let start = Instant::now();
51 let queries = sql.split(';');
52 for query in queries {
53 if !query.is_empty() {
54 let df = app.context.sql(query).await;
55 match df {
56 Ok(df) => handle_successful_query(app, start, query.to_string(), df).await?,
57 Err(err) => {
58 handle_failed_query(app, query.to_string(), err)?;
59 break;
60 }
61 };
62 }
63 }
64 Ok(())
65}
66
67async fn handle_successful_query(
68 app: &mut App,
69 start: Instant,
70 sql: String,
71 df: Arc<DataFrame>,
72) -> Result<()> {
73 debug!("Successfully executed query");
74 let batches = df.collect().await.map_err(DftError::DataFusionError)?;
75 let query_duration = start.elapsed().as_secs_f64();
76 let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
77 let query_meta = QueryResultsMeta {
78 query: sql,
79 succeeded: true,
80 error: None,
81 rows,
82 query_duration,
83 };
84 app.editor.history.push(query_meta.clone());
85 let pretty_batches = pretty_format_batches(&batches).unwrap().to_string();
86 app.query_results = Some(QueryResults {
87 batches,
88 pretty_batches,
89 meta: query_meta,
90 scroll: Scroll { x: 0, y: 0 },
91 });
92 Ok(())
93}
94
95fn handle_failed_query(app: &mut App, sql: String, error: DataFusionError) -> Result<()> {
96 error!("{}", error);
97 let err_msg = format!("{}", error);
98 app.query_results = None;
99 let query_meta = QueryResultsMeta {
100 query: sql,
101 succeeded: false,
102 error: Some(err_msg),
103 rows: 0,
104 query_duration: 0.0,
105 };
106 app.editor.history.push(query_meta);
107 Ok(())
108}