datafusion_dft/tui/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod execution;
19pub mod handlers;
20pub mod state;
21pub mod ui;
22
23use color_eyre::eyre::eyre;
24use color_eyre::Result;
25use crossterm::event as ct;
26use futures::FutureExt;
27use log::{debug, error, info, trace};
28use ratatui::backend::CrosstermBackend;
29use ratatui::crossterm::{
30    self, cursor, event,
31    terminal::{EnterAlternateScreen, LeaveAlternateScreen},
32};
33use ratatui::{prelude::*, style::palette::tailwind, widgets::*};
34use std::sync::Arc;
35use strum::IntoEnumIterator;
36use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
37use tokio::task::JoinHandle;
38use tokio_stream::StreamExt;
39use tokio_util::sync::CancellationToken;
40
41use self::execution::{ExecutionError, ExecutionResultsBatch, TuiExecution};
42use self::handlers::{app_event_handler, crossterm_event_handler};
43use crate::args::DftArgs;
44use crate::execution::sql_utils::clean_sql;
45use crate::execution::AppExecution;
46
47#[derive(Debug)]
48pub enum AppEvent {
49    Key(event::KeyEvent),
50    Error,
51    Quit,
52    FocusLost,
53    FocusGained,
54    Render,
55    Closed,
56    Init,
57    Paste(String),
58    Mouse(event::MouseEvent),
59    Resize(u16, u16),
60    // DDL
61    ExecuteDDL(String),
62    DDLError,
63    DDLSuccess,
64    // Query Execution
65    NewExecution,
66    ExecutionResultsNextBatch(ExecutionResultsBatch),
67    ExecutionResultsPreviousPage,
68    ExecutionResultsError(ExecutionError),
69    // FlightSQL
70    #[cfg(feature = "flightsql")]
71    FlightSQLEstablishConnection,
72    #[cfg(feature = "flightsql")]
73    FlightSQLNewExecution,
74    #[cfg(feature = "flightsql")]
75    FlightSQLExecutionResultsNextBatch(ExecutionResultsBatch),
76    #[cfg(feature = "flightsql")]
77    FlightSQLExecutionResultsNextPage,
78    #[cfg(feature = "flightsql")]
79    FlightSQLExecutionResultsPreviousPage,
80    #[cfg(feature = "flightsql")]
81    FlightSQLExecutionResultsError(ExecutionError),
82    #[cfg(feature = "flightsql")]
83    FlightSQLFailedToConnect,
84    #[cfg(feature = "flightsql")]
85    FlightSQLConnected,
86}
87
88#[allow(dead_code)]
89pub struct App<'app> {
90    state: state::AppState<'app>,
91    execution: Arc<TuiExecution>,
92    event_tx: UnboundedSender<AppEvent>,
93    event_rx: UnboundedReceiver<AppEvent>,
94    cancellation_token: CancellationToken,
95    task: JoinHandle<()>,
96    ddl_task: Option<JoinHandle<()>>,
97    args: DftArgs,
98}
99
100impl<'app> App<'app> {
101    pub fn new(state: state::AppState<'app>, args: DftArgs, execution: AppExecution) -> Self {
102        let (event_tx, event_rx) = mpsc::unbounded_channel();
103        let cancellation_token = CancellationToken::new();
104        let task = tokio::spawn(async {});
105        let app_execution = Arc::new(TuiExecution::new(Arc::new(execution)));
106
107        Self {
108            state,
109            args,
110            task,
111            event_rx,
112            event_tx,
113            cancellation_token,
114            execution: app_execution,
115            ddl_task: None,
116        }
117    }
118
119    pub fn event_tx(&self) -> UnboundedSender<AppEvent> {
120        self.event_tx.clone()
121    }
122
123    pub fn ddl_task(&mut self) -> &mut Option<JoinHandle<()>> {
124        &mut self.ddl_task
125    }
126
127    pub fn event_rx(&mut self) -> &mut UnboundedReceiver<AppEvent> {
128        &mut self.event_rx
129    }
130
131    pub fn execution(&self) -> Arc<TuiExecution> {
132        Arc::clone(&self.execution)
133    }
134
135    pub fn cancellation_token(&self) -> CancellationToken {
136        self.cancellation_token.clone()
137    }
138
139    pub fn set_cancellation_token(&mut self, cancellation_token: CancellationToken) {
140        self.cancellation_token = cancellation_token;
141    }
142
143    pub fn state(&self) -> &state::AppState<'app> {
144        &self.state
145    }
146
147    pub fn state_mut(&mut self) -> &mut state::AppState<'app> {
148        &mut self.state
149    }
150
151    /// Enter app, optionally setup `crossterm` with UI settings such as alternative screen and
152    /// mouse capture, then start event loop.
153    pub fn enter(&mut self, ui: bool) -> Result<()> {
154        if ui {
155            ratatui::crossterm::terminal::enable_raw_mode()?;
156            ratatui::crossterm::execute!(std::io::stdout(), EnterAlternateScreen, cursor::Hide)?;
157            if self.state.config.interaction.mouse {
158                ratatui::crossterm::execute!(std::io::stdout(), event::EnableMouseCapture)?;
159            }
160            if self.state.config.interaction.paste {
161                ratatui::crossterm::execute!(std::io::stdout(), event::EnableBracketedPaste)?;
162            }
163        }
164        self.start_app_event_loop();
165        Ok(())
166    }
167
168    /// Stop event loop. Waits for task to finish for up to 100ms.
169    pub fn stop(&self) -> Result<()> {
170        self.cancel();
171        let mut counter = 0;
172        while !self.task.is_finished() {
173            std::thread::sleep(std::time::Duration::from_millis(1));
174            counter += 1;
175            if counter > 50 {
176                self.task.abort();
177            }
178            if counter > 100 {
179                error!("Failed to abort task in 100 milliseconds for unknown reason");
180                break;
181            }
182        }
183        Ok(())
184    }
185
186    /// Exit app, disabling UI settings such as alternative screen and mouse capture.
187    pub fn exit(&mut self) -> Result<()> {
188        self.stop()?;
189        if crossterm::terminal::is_raw_mode_enabled()? {
190            if self.state.config.interaction.paste {
191                crossterm::execute!(std::io::stdout(), event::DisableBracketedPaste)?;
192            }
193            if self.state.config.interaction.mouse {
194                crossterm::execute!(std::io::stdout(), event::DisableMouseCapture)?;
195            }
196            crossterm::execute!(std::io::stdout(), LeaveAlternateScreen, cursor::Show)?;
197            crossterm::terminal::disable_raw_mode()?;
198        }
199        Ok(())
200    }
201
202    pub fn cancel(&self) {
203        self.cancellation_token.cancel();
204    }
205
206    /// Convert `crossterm::Event` into an application Event. If `None` is returned then the
207    /// crossterm event is not yet supported by application
208    fn handle_crossterm_event(event: event::Event) -> Option<AppEvent> {
209        crossterm_event_handler(event)
210    }
211
212    pub fn send_app_event(app_event: AppEvent, tx: &UnboundedSender<AppEvent>) {
213        // TODO: Can maybe make tx optional, add a self param, and get tx from self
214        let res = tx.send(app_event);
215        match res {
216            Ok(_) => trace!("App event sent"),
217            Err(err) => error!("Error sending app event: {}", err),
218        };
219    }
220
221    /// Start tokio task which runs an event loop responsible for capturing
222    /// terminal events and triggering render events based on user configured rates.
223    fn start_app_event_loop(&mut self) {
224        let render_delay =
225            std::time::Duration::from_secs_f64(1.0 / self.state.config.display.frame_rate);
226        debug!("Render delay: {:?}", render_delay);
227        // TODO-V1: Add this to config
228        self.cancel();
229        self.set_cancellation_token(CancellationToken::new());
230        let _cancellation_token = self.cancellation_token();
231        let _event_tx = self.event_tx();
232
233        self.task = tokio::spawn(async move {
234            let mut reader = ct::EventStream::new();
235            let mut render_interval = tokio::time::interval(render_delay);
236            debug!("Render interval: {:?}", render_interval);
237            _event_tx.send(AppEvent::Init).unwrap();
238            loop {
239                let render_delay = render_interval.tick();
240                let crossterm_event = reader.next().fuse();
241                tokio::select! {
242                  _ = _cancellation_token.cancelled() => {
243                      break;
244                  }
245                  maybe_event = crossterm_event => {
246                      let maybe_app_event = match maybe_event {
247                            Some(Ok(event)) => {
248                                Self::handle_crossterm_event(event)
249                            }
250                            Some(Err(_)) => Some(AppEvent::Error),
251                            None => unimplemented!()
252                      };
253                      if let Some(app_event) = maybe_app_event {
254                          Self::send_app_event(app_event, &_event_tx);
255                      };
256                  },
257                  _ = render_delay => Self::send_app_event(AppEvent::Render, &_event_tx),
258                }
259            }
260        });
261    }
262
263    /// Execute DDL from users DDL file
264    pub fn execute_ddl(&mut self) {
265        let ddl = self.execution.load_ddl().unwrap_or_default();
266        info!("Loaded DDL: {:?}", ddl);
267        if !ddl.is_empty() {
268            self.state.sql_tab.add_ddl_to_editor(ddl.clone());
269        }
270        let _ = self.event_tx().send(AppEvent::ExecuteDDL(clean_sql(ddl)));
271    }
272
273    #[cfg(feature = "flightsql")]
274    pub fn establish_flightsql_connection(&self) {
275        let _ = self.event_tx().send(AppEvent::FlightSQLEstablishConnection);
276    }
277
278    /// Get the next event from event loop
279    pub async fn next(&mut self) -> Result<AppEvent> {
280        self.event_rx()
281            .recv()
282            .await
283            .ok_or(eyre!("Unable to get event"))
284    }
285
286    pub fn handle_app_event(&mut self, event: AppEvent) -> Result<()> {
287        app_event_handler(self, event)
288    }
289
290    fn render_tabs(&self, area: Rect, buf: &mut Buffer) {
291        let titles = ui::SelectedTab::iter().map(|t| ui::SelectedTab::title(t, self));
292        let highlight_style = (Color::default(), tailwind::ORANGE.c500);
293        let selected_tab_index = self.state.tabs.selected as usize;
294        Tabs::new(titles)
295            .highlight_style(highlight_style)
296            .select(selected_tab_index)
297            .padding("", "")
298            .divider(" ")
299            .render(area, buf);
300    }
301
302    pub async fn loop_without_render(&mut self) -> Result<()> {
303        self.enter(false)?;
304        // Main loop for handling events
305        loop {
306            let event = self.next().await?;
307            self.handle_app_event(event)?;
308            if self.state.should_quit {
309                break Ok(());
310            }
311        }
312    }
313}
314
315impl Widget for &App<'_> {
316    /// Note: Ratatui uses Immediate Mode rendering (i.e. the entire UI is redrawn)
317    /// on every frame based on application state. There is no permanent widget object
318    /// in memory.
319    fn render(self, area: Rect, buf: &mut Buffer) {
320        let vertical = Layout::vertical([Constraint::Length(1), Constraint::Min(0)]);
321        let [header_area, inner_area] = vertical.areas(area);
322
323        let horizontal = Layout::horizontal([Constraint::Min(0)]);
324        let [tabs_area] = horizontal.areas(header_area);
325        self.render_tabs(tabs_area, buf);
326        self.state.tabs.selected.render(inner_area, buf, self);
327    }
328}
329
330impl App<'_> {
331    /// Run the main event loop for the application
332    pub async fn run_app(self) -> Result<()> {
333        info!("Running app with state: {:?}", self.state);
334        let mut app = self;
335
336        app.execute_ddl();
337
338        #[cfg(feature = "flightsql")]
339        app.establish_flightsql_connection();
340
341        let mut terminal =
342            ratatui::Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap();
343        app.enter(true)?;
344        // Main loop for handling events
345        loop {
346            let event = app.next().await?;
347
348            if let AppEvent::Render = &event {
349                terminal.draw(|f| f.render_widget(&app, f.area()))?;
350            };
351
352            app.handle_app_event(event)?;
353
354            if app.state.should_quit {
355                break;
356            }
357        }
358        app.exit()
359    }
360}