use std::collections::BTreeMap;
use crossterm::event::{Event, EventStream, KeyCode, KeyModifiers};
use futures::StreamExt;
use ratatui::{backend::Backend, Terminal};
use std::sync::Arc;
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use crate::config::{Config, ConnectionProfile};
use crate::db::{connectors, traits::{KvClient, SqlClient}, types::{DbQueryResult, Value}};
use crate::ui::screens::connection::{ConnectionAction, ConnectionScreen};
use crate::ui::screens::data_grid::{DataGridAction, DataGridScreen, PAGE_SIZE};
use crate::ui::screens::sql_editor::{SqlEditorAction, SqlEditorScreen};
use crate::ui::screens::table_list::{TableListAction, TableListScreen};
/// Handle to the database client of the current connection.
///
/// Each variant wraps its client in an `Arc` so that background tasks can hold
/// their own reference while a request is in flight.
pub enum ActiveClient {
/// Client for a SQL database; used for table listing, data-grid pages and the SQL editor.
Sql(Arc<dyn SqlClient>),
/// Client for a key-value store; selected when the connection's `db_type` is `"redis"`.
Kv(Arc<dyn KvClient>),
}
/// Result message sent from a background database task back to the UI loop.
pub enum DbEvent {
    /// A SQL connection was established.
    SqlConnected {
        client: Arc<dyn SqlClient>,
        url: String,
        db_type: String,
    },
    /// A key-value connection was established.
    KvConnected {
        client: Arc<dyn KvClient>,
        url: String,
        db_type: String,
    },
    /// Connecting failed; carries the error text.
    ConnectionFailed(String),
    /// Table names (or, for key-value stores, the keys matching `*`) were fetched.
    TablesLoaded(Vec<String>),
    /// Fetching the table list failed; carries the error text.
    TablesLoadFailed(String),
    /// First page of rows for a freshly opened or re-filtered table.
    DataLoaded(DbQueryResult),
    /// A follow-up page of rows to append to the grid.
    DataPageLoaded(DbQueryResult),
    /// Row count reported by the count query for the current table and filters.
    DataCountLoaded(u64),
    /// A data page query failed; carries the error text.
    DataLoadFailed(String),
    /// Rows returned by a statement run from the SQL editor.
    QueryRows(DbQueryResult),
    /// Number returned by `execute` for a statement run from the SQL editor.
    QueryExecuted(u64),
    /// A statement run from the SQL editor failed; carries the error text.
    QueryFailed(String),
}
/// Which screen currently owns keyboard input.
///
/// The enum carries no data, so it is `Copy` and `Eq` in addition to the
/// previously derived traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppState {
    /// Connection/profile selection screen.
    Connection,
    /// List of tables (or keys) of the active connection.
    TableList,
    /// Paged view of a single table's rows.
    DataGrid,
    /// Free-form SQL editor.
    SqlEditor,
    /// Terminal state; pressing `q` here requests shutdown.
    Quit,
}
/// Top-level application state: the current screen, the state of every screen,
/// the active database client and the channel background tasks report through.
pub struct App {
/// Screen that currently receives key events.
pub state: AppState,
/// When `true`, the main loop in `run` exits after the next draw.
pub should_quit: bool,
/// State of the connection/profile screen.
pub connection_screen: ConnectionScreen,
/// State of the table list screen.
pub table_list_screen: TableListScreen,
/// State of the data grid screen.
pub data_grid_screen: DataGridScreen,
/// State of the SQL editor screen.
pub sql_editor_screen: SqlEditorScreen,
/// Client of the current connection; `None` while disconnected.
pub active_client: Option<ActiveClient>,
/// Sender cloned into every spawned database task.
db_tx: mpsc::Sender<DbEvent>,
/// Receiver drained by the main loop before each draw.
db_rx: mpsc::Receiver<DbEvent>,
}
impl App {
pub fn new() -> Self {
let profiles = Config::load().unwrap_or_default().connections;
let (db_tx, db_rx) = mpsc::channel(8);
Self {
state: AppState::Connection,
should_quit: false,
connection_screen: ConnectionScreen::new(profiles),
table_list_screen: TableListScreen::new(),
data_grid_screen: DataGridScreen::new(String::new()),
sql_editor_screen: SqlEditorScreen::new(String::new()),
active_client: None,
db_tx,
db_rx,
}
}
pub async fn run<B: Backend>(
&mut self,
terminal: &mut Terminal<B>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut events = EventStream::new();
loop {
while let Ok(ev) = self.db_rx.try_recv() {
self.handle_db_event(ev);
}
terminal.draw(|f| crate::ui::layout::draw(f, self))?;
if self.should_quit {
break;
}
if let Ok(Some(Ok(Event::Key(key)))) =
timeout(Duration::from_millis(50), events.next()).await
{
if key.code == KeyCode::Char('c')
&& key.modifiers.contains(KeyModifiers::CONTROL)
{
self.should_quit = true;
} else {
self.handle_key(key);
}
}
}
Ok(())
}
fn handle_key(&mut self, key: crossterm::event::KeyEvent) {
match self.state {
AppState::Connection => {
match self.connection_screen.handle_key(key) {
ConnectionAction::Quit => self.should_quit = true,
ConnectionAction::Connect { url, db_type } => {
self.spawn_connect(url, db_type);
}
ConnectionAction::DeleteProfile { idx, persist } => {
if idx < self.connection_screen.profiles.len() {
let profile = self.connection_screen.profiles.remove(idx);
let len = self.connection_screen.profiles.len();
if len == 0 {
self.connection_screen.list_state.select(None);
} else {
self.connection_screen.list_state.select(Some(idx.min(len - 1)));
}
if persist {
match Config::delete_profile(&profile.url) {
Ok(()) => {
self.connection_screen.status =
Some(format!("Deleted \"{}\"", profile.name));
}
Err(e) => {
self.connection_screen.status =
Some(format!("Error deleting: {e}"));
}
}
}
}
}
ConnectionAction::SaveProfile { name, url, db_type } => {
let profile = ConnectionProfile { name: name.clone(), db_type, url };
match Config::save_profile(profile) {
Ok(()) => {
let profiles = Config::load().unwrap_or_default().connections;
let idx = profiles.iter().position(|p| p.name == name);
self.connection_screen.profiles = profiles;
if let Some(i) = idx {
self.connection_screen.list_state.select(Some(i));
}
self.connection_screen.status =
Some(format!("Saved \"{name}\""));
}
Err(e) => {
self.connection_screen.status =
Some(format!("Error saving: {e}"));
}
}
}
ConnectionAction::None => {}
}
}
AppState::TableList => {
match self.table_list_screen.handle_key(key) {
TableListAction::OpenTable(name) => self.spawn_load_data(name),
TableListAction::OpenEditor => self.open_sql_editor(),
TableListAction::Disconnect => {
self.active_client = None;
self.table_list_screen = TableListScreen::new();
self.connection_screen.reset_input();
self.state = AppState::Connection;
}
TableListAction::None => {}
}
}
AppState::DataGrid => {
match self.data_grid_screen.handle_key(key) {
DataGridAction::Back => self.state = AppState::TableList,
DataGridAction::ApplyFilter => self.spawn_reload_filters(),
DataGridAction::LoadMore => self.spawn_load_more(),
DataGridAction::None => {}
}
}
AppState::SqlEditor => {
match self.sql_editor_screen.handle_key(key) {
SqlEditorAction::Execute(sql) => self.spawn_execute_query(sql),
SqlEditorAction::Back => self.state = AppState::TableList,
SqlEditorAction::None => {}
}
}
_ => {
if key.code == KeyCode::Char('q') {
self.should_quit = true;
}
}
}
}
fn open_sql_editor(&mut self) {
let db_info = self.table_list_screen.db_info.clone();
self.sql_editor_screen = SqlEditorScreen::new(db_info);
self.state = AppState::SqlEditor;
}
fn spawn_connect(&mut self, url: String, db_type: String) {
self.connection_screen.status =
Some(format!("Connecting [{db_type}] {url} …"));
let tx = self.db_tx.clone();
tokio::spawn(async move {
let event = if db_type == "redis" {
match connectors::connect_kv(&db_type, &url).await {
Ok(c) => DbEvent::KvConnected { client: Arc::from(c), url, db_type },
Err(e) => DbEvent::ConnectionFailed(e.to_string()),
}
} else {
match connectors::connect_sql(&db_type, &url).await {
Ok(c) => DbEvent::SqlConnected { client: Arc::from(c), url, db_type },
Err(e) => DbEvent::ConnectionFailed(e.to_string()),
}
};
let _ = tx.send(event).await;
});
}
fn spawn_load_tables(&mut self) {
let tx = self.db_tx.clone();
match &self.active_client {
Some(ActiveClient::Sql(c)) => {
let c = Arc::clone(c);
tokio::spawn(async move {
let ev = match c.get_tables().await {
Ok(t) => DbEvent::TablesLoaded(t),
Err(e) => DbEvent::TablesLoadFailed(e.to_string()),
};
let _ = tx.send(ev).await;
});
}
Some(ActiveClient::Kv(c)) => {
let c = Arc::clone(c);
tokio::spawn(async move {
let ev = match c.keys("*").await {
Ok(k) => DbEvent::TablesLoaded(k),
Err(e) => DbEvent::TablesLoadFailed(e.to_string()),
};
let _ = tx.send(ev).await;
});
}
None => {}
}
}
fn spawn_load_data(&mut self, table_name: String) {
self.data_grid_screen = DataGridScreen::new(table_name.clone());
self.state = AppState::DataGrid;
if let Some(ActiveClient::Sql(c)) = &self.active_client {
let empty = BTreeMap::new();
spawn_sql_page(Arc::clone(c), &table_name, &empty, 0, true, self.db_tx.clone());
spawn_sql_count(Arc::clone(c), &table_name, &empty, self.db_tx.clone());
} else if let Some(ActiveClient::Kv(_)) = &self.active_client {
self.data_grid_screen.set_error(
"Data Grid not available for key-value stores. Use the SQL Editor.".into(),
);
} else {
self.data_grid_screen.set_error("Not connected".into());
}
}
fn spawn_load_more(&mut self) {
if self.data_grid_screen.loading { return; }
self.data_grid_screen.loading = true;
let table = self.data_grid_screen.table_name.clone();
let filters = self.data_grid_screen.filters.clone();
let offset = self.data_grid_screen.loaded_count;
if let Some(ActiveClient::Sql(c)) = &self.active_client {
spawn_sql_page(Arc::clone(c), &table, &filters, offset, false, self.db_tx.clone());
}
}
fn spawn_reload_filters(&mut self) {
let table = self.data_grid_screen.table_name.clone();
let filters = self.data_grid_screen.filters.clone();
self.data_grid_screen.reset_data(); self.data_grid_screen.total_count = None;
if let Some(ActiveClient::Sql(c)) = &self.active_client {
spawn_sql_page(Arc::clone(c), &table, &filters, 0, true, self.db_tx.clone());
spawn_sql_count(Arc::clone(c), &table, &filters, self.db_tx.clone());
} else {
self.data_grid_screen.set_error("Not connected to a SQL database".into());
}
}
fn spawn_execute_query(&mut self, sql: String) {
self.sql_editor_screen.set_running();
let tx = self.db_tx.clone();
match &self.active_client {
Some(ActiveClient::Sql(c)) => {
let c = Arc::clone(c);
tokio::spawn(async move {
let ev = if is_select_query(&sql) {
match c.fetch_all(&sql).await {
Ok(r) => DbEvent::QueryRows(r),
Err(e) => DbEvent::QueryFailed(e.to_string()),
}
} else {
match c.execute(&sql).await {
Ok(n) => DbEvent::QueryExecuted(n),
Err(e) => DbEvent::QueryFailed(e.to_string()),
}
};
let _ = tx.send(ev).await;
});
}
Some(ActiveClient::Kv(_)) => {
self.sql_editor_screen.set_error(
"SQL editor requires a SQL connection (not Redis)".into(),
);
}
None => {
self.sql_editor_screen.set_error("Not connected".into());
}
}
}
fn handle_db_event(&mut self, event: DbEvent) {
match event {
DbEvent::SqlConnected { client, url, db_type } => {
self.active_client = Some(ActiveClient::Sql(client));
self.connection_screen.status = None;
self.table_list_screen = TableListScreen::new();
self.table_list_screen.db_info = format!("[{db_type}] {url}");
self.state = AppState::TableList;
self.spawn_load_tables();
}
DbEvent::KvConnected { client, url, db_type } => {
self.active_client = Some(ActiveClient::Kv(client));
self.connection_screen.status = None;
self.table_list_screen = TableListScreen::new();
self.table_list_screen.db_info = format!("[{db_type}] {url}");
self.state = AppState::TableList;
self.spawn_load_tables();
}
DbEvent::ConnectionFailed(msg) => {
self.connection_screen.status = Some(format!("Error: {msg}"));
}
DbEvent::TablesLoaded(tables) => {
self.table_list_screen.set_tables(tables);
}
DbEvent::TablesLoadFailed(msg) => {
self.table_list_screen.set_error(format!("Error: {msg}"));
}
DbEvent::DataLoaded(result) => {
self.data_grid_screen.set_result(result);
}
DbEvent::DataPageLoaded(result) => {
self.data_grid_screen.append_rows(result);
}
DbEvent::DataCountLoaded(n) => {
self.data_grid_screen.set_total(n);
}
DbEvent::DataLoadFailed(msg) => {
self.data_grid_screen.set_error(msg);
}
DbEvent::QueryRows(result) => {
self.sql_editor_screen.set_rows(result);
}
DbEvent::QueryExecuted(n) => {
self.sql_editor_screen.set_affected(n);
}
DbEvent::QueryFailed(msg) => {
self.sql_editor_screen.set_error(msg);
}
}
}
}
/// Returns `true` when `sql`, ignoring leading whitespace and letter case,
/// begins with one of the keywords treated as producing a result set.
fn is_select_query(sql: &str) -> bool {
    const ROW_RETURNING_PREFIXES: [&str; 6] =
        ["SELECT", "WITH", "EXPLAIN", "SHOW", "DESCRIBE", "PRAGMA"];

    let normalized = sql.trim_start().to_uppercase();
    ROW_RETURNING_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(*prefix))
}
/// Builds a ` WHERE …` clause (with a leading space) that requires every
/// filtered column to contain its filter text, or an empty string when there
/// are no filters.
///
/// Each entry becomes `"column" LIKE '%text%'`; entries are joined with `AND`
/// in the map's key order. Double quotes in the column name and single quotes
/// in the text are doubled so neither can terminate its quoted token early.
/// `%` and `_` in the text are left as they are and keep their `LIKE`
/// wildcard meaning.
fn build_where(filters: &BTreeMap<String, String>) -> String {
    if filters.is_empty() {
        return String::new();
    }
    let clauses: Vec<String> = filters
        .iter()
        .map(|(col, val)| {
            let ident = col.replace('"', "\"\"");
            let literal = val.replace('\'', "''");
            format!("\"{ident}\" LIKE '%{literal}%'")
        })
        .collect();
    format!(" WHERE {}", clauses.join(" AND "))
}
/// Builds the `SELECT *` statement for one page of `table`: `PAGE_SIZE` rows
/// starting at `offset`, restricted by `filters`.
///
/// Double quotes inside `table` are doubled so the name cannot break out of
/// its quoted identifier.
fn build_data_query(table: &str, filters: &BTreeMap<String, String>, offset: usize) -> String {
    let ident = table.replace('"', "\"\"");
    let wh = build_where(filters);
    format!("SELECT * FROM \"{ident}\"{wh} LIMIT {PAGE_SIZE} OFFSET {offset}")
}
/// Builds the `SELECT COUNT(*) AS _count` statement for `table`, restricted by
/// `filters`.
///
/// Double quotes inside `table` are doubled so the name cannot break out of
/// its quoted identifier.
fn build_count_query(table: &str, filters: &BTreeMap<String, String>) -> String {
    let ident = table.replace('"', "\"\"");
    let wh = build_where(filters);
    format!("SELECT COUNT(*) AS _count FROM \"{ident}\"{wh}")
}
/// Reads a row count from the first value of the first row of `result`.
///
/// Integer values are converted with a range check and text values are parsed
/// as `u64`. Anything else — no rows, no values, another value kind,
/// unparsable text, or an integer that does not fit in `u64` (for example a
/// negative one, which a plain `as` cast would wrap to a huge count) — yields 0.
fn parse_count(result: &DbQueryResult) -> u64 {
    result
        .rows
        .first()
        .and_then(|row| row.values.first())
        .map(|value| match value {
            Value::Int(n) => u64::try_from(*n).unwrap_or(0),
            Value::Text(s) => s.parse().unwrap_or(0),
            _ => 0,
        })
        .unwrap_or(0)
}
/// Spawns a background task that fetches one page of `table` and reports the
/// outcome on `tx`.
///
/// The statement is built before spawning, so `table` and `filters` are only
/// borrowed for the duration of this call. A successful fetch is sent as
/// `DataLoaded` when `initial` is `true` and as `DataPageLoaded` otherwise; a
/// failure is sent as `DataLoadFailed` with the error text. A failed send is
/// ignored.
fn spawn_sql_page(
    client: Arc<dyn SqlClient>,
    table: &str,
    filters: &BTreeMap<String, String>,
    offset: usize,
    initial: bool,
    tx: mpsc::Sender<DbEvent>,
) {
    let sql = build_data_query(table, filters, offset);
    tokio::spawn(async move {
        let event = match client.fetch_all(&sql).await {
            Ok(page) if initial => DbEvent::DataLoaded(page),
            Ok(page) => DbEvent::DataPageLoaded(page),
            Err(err) => DbEvent::DataLoadFailed(err.to_string()),
        };
        let _ = tx.send(event).await;
    });
}
/// Spawns a background task that counts the rows of `table` matching
/// `filters` and reports the total on `tx` as `DataCountLoaded`.
///
/// The count is best-effort: when the query fails nothing is sent, and a
/// failed send is ignored.
fn spawn_sql_count(
    client: Arc<dyn SqlClient>,
    table: &str,
    filters: &BTreeMap<String, String>,
    tx: mpsc::Sender<DbEvent>,
) {
    let sql = build_count_query(table, filters);
    tokio::spawn(async move {
        let total = client
            .fetch_all(&sql)
            .await
            .ok()
            .map(|result| parse_count(&result));
        if let Some(total) = total {
            let _ = tx.send(DbEvent::DataCountLoaded(total)).await;
        }
    });
}