datafusion_tui/app/handlers/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod edit;
19pub mod normal;
20pub mod rc;
21
22use arrow::util::pretty::pretty_format_batches;
23use datafusion::error::DataFusionError;
24use datafusion::prelude::DataFrame;
25use log::{debug, error};
26use std::sync::Arc;
27use std::time::Instant;
28
29use crate::app::core::{App, AppReturn, InputMode};
30use crate::app::datafusion::context::{QueryResults, QueryResultsMeta};
31use crate::app::error::{DftError, Result};
32use crate::app::ui::Scroll;
33use crate::events::Key;
34
35pub async fn key_event_handler<'a>(app: &mut App, key: Key) -> Result<AppReturn> {
36    match app.input_mode {
37        InputMode::Normal => normal::normal_mode_handler(app, key).await,
38        InputMode::Editing => edit::edit_mode_handler(app, key).await,
39        InputMode::Rc => rc::rc_mode_handler(app, key).await,
40    }
41}
42
43pub async fn execute_query(app: &mut App) -> Result<AppReturn> {
44    let sql: String = app.editor.input.combine_lines();
45    handle_queries(app, sql).await?;
46    Ok(AppReturn::Continue)
47}
48
49async fn handle_queries(app: &mut App, sql: String) -> Result<()> {
50    let start = Instant::now();
51    let queries = sql.split(';');
52    for query in queries {
53        if !query.is_empty() {
54            let df = app.context.sql(query).await;
55            match df {
56                Ok(df) => handle_successful_query(app, start, query.to_string(), df).await?,
57                Err(err) => {
58                    handle_failed_query(app, query.to_string(), err)?;
59                    break;
60                }
61            };
62        }
63    }
64    Ok(())
65}
66
67async fn handle_successful_query(
68    app: &mut App,
69    start: Instant,
70    sql: String,
71    df: Arc<DataFrame>,
72) -> Result<()> {
73    debug!("Successfully executed query");
74    let batches = df.collect().await.map_err(DftError::DataFusionError)?;
75    let query_duration = start.elapsed().as_secs_f64();
76    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
77    let query_meta = QueryResultsMeta {
78        query: sql,
79        succeeded: true,
80        error: None,
81        rows,
82        query_duration,
83    };
84    app.editor.history.push(query_meta.clone());
85    let pretty_batches = pretty_format_batches(&batches).unwrap().to_string();
86    app.query_results = Some(QueryResults {
87        batches,
88        pretty_batches,
89        meta: query_meta,
90        scroll: Scroll { x: 0, y: 0 },
91    });
92    Ok(())
93}
94
95fn handle_failed_query(app: &mut App, sql: String, error: DataFusionError) -> Result<()> {
96    error!("{}", error);
97    let err_msg = format!("{}", error);
98    app.query_results = None;
99    let query_meta = QueryResultsMeta {
100        query: sql,
101        succeeded: false,
102        error: Some(err_msg),
103        rows: 0,
104        query_duration: 0.0,
105    };
106    app.editor.history.push(query_meta);
107    Ok(())
108}