use crossterm::event::{KeyCode, KeyEvent};
use ratatui::{
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
widgets::{Block, Borders, Cell, Row, Table, TableState},
Frame,
};
use sqlx::PgPool;
use crate::tui::app::AppAction;
/// Per-queue job counts; one value backs one row of the "Queue Stats" table.
///
/// The counters are filled from `COUNT(*) FILTER (WHERE status = ...)` aggregates over the
/// `jobs` table, grouped by queue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueStats {
    /// Queue name (the `queue` column the counts are grouped by).
    pub queue: String,
    /// Number of jobs whose status is `pending`.
    pub pending: i64,
    /// Number of jobs whose status is `running`.
    pub running: i64,
    /// Number of jobs whose status is `failed`.
    pub failed: i64,
    /// Number of jobs whose status is `done`.
    pub done: i64,
}
/// A failed job, already formatted for display as one row of the "Failed Jobs" table.
///
/// Every field except `attempts` is kept as display text so rendering needs no further
/// conversion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailedJob {
    /// Job identifier rendered as text (selected as `id::text`).
    pub id: String,
    /// Name of the queue the job belongs to.
    pub queue: String,
    /// Job type label (the `job_type` column).
    pub job_type: String,
    /// Value of the `attempts` column.
    pub attempts: i32,
    /// Error text; empty when the database column is NULL.
    pub error: String,
    /// Failure time formatted as `YYYY-MM-DD HH:MM:SS`, or `"-"` when unavailable.
    pub failed_at: String,
}
/// State of the queue tab: two tables (queue statistics and failed jobs), the selection
/// cursor of each, which of the two has keyboard focus, and a status line.
pub struct QueueTab {
    /// Rows of the queue statistics table.
    pub stats: Vec<QueueStats>,
    /// Selection/scroll state of the queue statistics table.
    pub stats_state: TableState,
    /// Rows of the failed jobs table.
    pub failed: Vec<FailedJob>,
    /// Selection/scroll state of the failed jobs table.
    pub failed_state: TableState,
    /// `true` when the failed jobs table has focus, `false` when the statistics table does.
    pub focus_failed: bool,
    /// Status text shown in the title of the statistics table.
    pub status: String,
}
impl Default for QueueTab {
    /// Builds an empty tab: no rows, no selection, focus on the statistics table and a
    /// `"Loading..."` status.
    fn default() -> Self {
        let status = String::from("Loading...");
        Self {
            status,
            focus_failed: false,
            stats: Vec::new(),
            stats_state: TableState::default(),
            failed: Vec::new(),
            failed_state: TableState::default(),
        }
    }
}
impl QueueTab {
pub async fn load(&mut self, pool: &PgPool) {
match fetch_queue_data(pool).await {
Ok((stats, failed)) => {
let total_failed: i64 = stats.iter().map(|s| s.failed).sum();
self.status = format!("{} queues {} failed", stats.len(), total_failed);
self.stats = stats;
self.failed = failed;
if self.stats_state.selected().is_none() && !self.stats.is_empty() {
self.stats_state.select(Some(0));
}
if self.failed_state.selected().is_none() && !self.failed.is_empty() {
self.failed_state.select(Some(0));
}
}
Err(e) => {
self.status = format!("Error: {e}");
self.stats.clear();
self.failed.clear();
}
}
}
pub fn render(&mut self, frame: &mut Frame, area: Rect) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(10), Constraint::Min(5)])
.split(area);
self.render_stats(frame, chunks[0]);
self.render_failed(frame, chunks[1]);
}
fn render_stats(&mut self, frame: &mut Frame, area: Rect) {
let focused = !self.focus_failed;
let border_style = if focused {
Style::default().fg(Color::Cyan)
} else {
Style::default()
};
let header = Row::new(vec![
Cell::from("Queue").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Pending").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Running").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Failed").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Done").style(Style::default().add_modifier(Modifier::BOLD)),
])
.height(1);
let rows: Vec<Row> = self
.stats
.iter()
.map(|s| {
Row::new(vec![
Cell::from(s.queue.as_str()),
Cell::from(s.pending.to_string()).style(Style::default().fg(Color::Yellow)),
Cell::from(s.running.to_string()).style(Style::default().fg(Color::Cyan)),
Cell::from(s.failed.to_string()).style(Style::default().fg(if s.failed > 0 {
Color::Red
} else {
Color::DarkGray
})),
Cell::from(s.done.to_string()).style(Style::default().fg(Color::Green)),
])
})
.collect();
let widths = [
Constraint::Min(20),
Constraint::Length(9),
Constraint::Length(9),
Constraint::Length(9),
Constraint::Length(9),
];
let title = format!(" Queue Stats — {} ", self.status);
let table = Table::new(rows, widths)
.header(header)
.block(
Block::default()
.title(title)
.borders(Borders::ALL)
.border_style(border_style),
)
.highlight_style(Style::default().bg(Color::DarkGray))
.highlight_symbol("▶ ");
frame.render_stateful_widget(table, area, &mut self.stats_state);
}
fn render_failed(&mut self, frame: &mut Frame, area: Rect) {
let focused = self.focus_failed;
let border_style = if focused {
Style::default().fg(Color::Red)
} else {
Style::default()
};
let header = Row::new(vec![
Cell::from("ID").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Queue").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Job Type").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Att").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Error").style(Style::default().add_modifier(Modifier::BOLD)),
Cell::from("Failed At").style(Style::default().add_modifier(Modifier::BOLD)),
])
.height(1);
let rows: Vec<Row> = self
.failed
.iter()
.map(|j| {
Row::new(vec![
Cell::from(j.id.as_str()),
Cell::from(j.queue.as_str()),
Cell::from(j.job_type.as_str()),
Cell::from(j.attempts.to_string()),
Cell::from(j.error.as_str()).style(Style::default().fg(Color::Red)),
Cell::from(j.failed_at.as_str()).style(Style::default().fg(Color::DarkGray)),
])
})
.collect();
let widths = [
Constraint::Length(8),
Constraint::Length(12),
Constraint::Min(20),
Constraint::Length(4),
Constraint::Min(30),
Constraint::Length(20),
];
let title = format!(" Failed Jobs ({}) ", self.failed.len());
let table = Table::new(rows, widths)
.header(header)
.block(
Block::default()
.title(title)
.borders(Borders::ALL)
.border_style(border_style),
)
.highlight_style(Style::default().bg(Color::DarkGray))
.highlight_symbol("▶ ");
frame.render_stateful_widget(table, area, &mut self.failed_state);
}
pub fn handle_key(&mut self, key: KeyEvent) -> AppAction {
match key.code {
KeyCode::Tab => {
self.focus_failed = !self.focus_failed;
AppAction::None
}
KeyCode::Up => {
if self.focus_failed {
let i = self.failed_state.selected().unwrap_or(0).saturating_sub(1);
self.failed_state.select(Some(i));
} else {
let i = self.stats_state.selected().unwrap_or(0).saturating_sub(1);
self.stats_state.select(Some(i));
}
AppAction::None
}
KeyCode::Down => {
if self.focus_failed {
let max = self.failed.len().saturating_sub(1);
let i = (self.failed_state.selected().unwrap_or(0) + 1).min(max);
self.failed_state.select(Some(i));
} else {
let max = self.stats.len().saturating_sub(1);
let i = (self.stats_state.selected().unwrap_or(0) + 1).min(max);
self.stats_state.select(Some(i));
}
AppAction::None
}
KeyCode::F(5) | KeyCode::Char('r') | KeyCode::Char('R') => AppAction::Reload,
_ => AppAction::None,
}
}
}
/// Loads per-queue job counts and the most recent failed jobs from the `jobs` table.
///
/// Returns `(stats, failed)`: one [`QueueStats`] per queue ordered by queue name, and up to
/// 100 [`FailedJob`]s ordered by `failed_at` descending with NULL timestamps last.
///
/// When the `jobs` table does not exist (PostgreSQL SQLSTATE `42P01`, `undefined_table`),
/// a single placeholder statistics row asking for migrations is returned together with an
/// empty failed list, instead of an error.
///
/// Individual columns that cannot be decoded fall back to an empty string, `0`, or `"-"`
/// (for the timestamp) rather than failing the whole load.
///
/// # Errors
///
/// Returns any other database error from either query (for example a lost connection or
/// missing permissions), so the caller can show the real cause instead of a misleading
/// "run migrations" hint or a silently empty failed list.
async fn fetch_queue_data(pool: &PgPool) -> anyhow::Result<(Vec<QueueStats>, Vec<FailedJob>)> {
    use sqlx::Row as _;

    const STATS_SQL: &str = "SELECT \
        queue, \
        COUNT(*) FILTER (WHERE status = 'pending') AS pending, \
        COUNT(*) FILTER (WHERE status = 'running') AS running, \
        COUNT(*) FILTER (WHERE status = 'failed') AS failed, \
        COUNT(*) FILTER (WHERE status = 'done') AS done \
        FROM jobs \
        GROUP BY queue \
        ORDER BY queue";
    const FAILED_SQL: &str = "SELECT \
        id::text AS id_str, \
        queue, \
        job_type, \
        attempts, \
        COALESCE(error, '') AS error, \
        failed_at \
        FROM jobs \
        WHERE status = 'failed' \
        ORDER BY failed_at DESC NULLS LAST \
        LIMIT 100";
    // PostgreSQL SQLSTATE reported when a referenced table does not exist.
    const UNDEFINED_TABLE: &str = "42P01";

    let stats_rows = match sqlx::query(STATS_SQL).fetch_all(pool).await {
        Ok(rows) => rows,
        Err(err) => {
            let table_missing = err
                .as_database_error()
                .and_then(|db_err| db_err.code())
                .is_some_and(|code| code == UNDEFINED_TABLE);
            if !table_missing {
                // Anything other than a missing table is a real failure; report it as such.
                return Err(err.into());
            }
            return Ok((
                vec![QueueStats {
                    queue: "(no jobs table — run migrations first)".into(),
                    pending: 0,
                    running: 0,
                    failed: 0,
                    done: 0,
                }],
                Vec::new(),
            ));
        }
    };

    let stats: Vec<QueueStats> = stats_rows
        .iter()
        .map(|row| QueueStats {
            queue: row.try_get("queue").unwrap_or_default(),
            pending: row.try_get("pending").unwrap_or(0),
            running: row.try_get("running").unwrap_or(0),
            failed: row.try_get("failed").unwrap_or(0),
            done: row.try_get("done").unwrap_or(0),
        })
        .collect();

    let failed_rows = sqlx::query(FAILED_SQL).fetch_all(pool).await?;

    let failed: Vec<FailedJob> = failed_rows
        .iter()
        .map(|row| {
            // A NULL or undecodable timestamp is displayed as "-".
            let failed_at = row
                .try_get::<chrono::DateTime<chrono::Utc>, _>("failed_at")
                .ok()
                .map_or_else(
                    || "-".to_owned(),
                    |ts| ts.format("%Y-%m-%d %H:%M:%S").to_string(),
                );
            FailedJob {
                id: row.try_get("id_str").unwrap_or_default(),
                queue: row.try_get("queue").unwrap_or_default(),
                job_type: row.try_get("job_type").unwrap_or_default(),
                attempts: row.try_get("attempts").unwrap_or(0),
                error: row.try_get("error").unwrap_or_default(),
                failed_at,
            }
        })
        .collect();

    Ok((stats, failed))
}