use crate::context::TaskStorage;
use crate::context::docker_client::DockerClient;
use crate::task::{Task, TaskStatus};
use crate::tui::app::TuiApp;
use crate::tui::events::ServerEvent;
use crate::tui::input::{TuiAction, handle_event};
use crate::tui::ui::render;
use crossterm::event::{DisableMouseCapture, EnableMouseCapture};
use crossterm::terminal::{
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
};
use ratatui::Terminal;
use ratatui::backend::CrosstermBackend;
use std::io::Stdout;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::{Duration, Instant, interval};
/// Owns the terminal handle used by the TUI.
///
/// The `Drop` implementation for this type undoes the terminal setup (raw mode, alternate
/// screen, mouse capture, hidden cursor), so the terminal is restored on every exit path of the
/// code that holds the guard.
struct TerminalGuard {
/// Terminal the UI is drawn to; also used by the cleanup that runs on drop.
terminal: Terminal<CrosstermBackend<Stdout>>,
}
impl Drop for TerminalGuard {
    /// Puts the terminal back into its normal state.
    ///
    /// In order: disables raw mode, leaves the alternate screen and disables mouse capture,
    /// then makes the cursor visible again. Every step is attempted even if an earlier one
    /// fails, and all errors are discarded because `drop` has no way to report them.
    fn drop(&mut self) {
        let Self { terminal } = self;

        let _ = disable_raw_mode();
        let _ = crossterm::execute!(
            terminal.backend_mut(),
            LeaveAlternateScreen,
            DisableMouseCapture
        );
        let _ = terminal.show_cursor();
    }
}
pub async fn run_tui(
mut event_receiver: UnboundedReceiver<ServerEvent>,
storage: Arc<TaskStorage>,
docker_client: Arc<dyn DockerClient>,
data_dir: PathBuf,
workers_total: usize,
shutdown_notify: Arc<tokio::sync::Notify>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
enable_raw_mode()?;
let mut stdout = std::io::stdout();
crossterm::execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let terminal = Terminal::new(backend)?;
let mut guard = TerminalGuard { terminal };
let mut app = TuiApp::new(workers_total);
refresh_task_list(&mut app, &storage).await;
app.load_logs_for_selected_task(&data_dir);
let mut tick = interval(Duration::from_secs(1));
tick.tick().await;
let (input_tx, mut input_rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || {
loop {
if crossterm::event::poll(std::time::Duration::from_millis(50)).unwrap_or(false)
&& let Ok(event) = crossterm::event::read()
&& input_tx.send(event).is_err()
{
break;
}
}
});
let mut last_log_refresh = Instant::now();
loop {
guard.terminal.draw(|frame| {
render(&mut app, frame);
})?;
if app.should_quit {
shutdown_notify.notify_one();
break;
}
tokio::select! {
Some(event) = input_rx.recv() => {
if let Some(action) = handle_event(&mut app, &event, &data_dir) {
match action {
TuiAction::CancelTask { task_id, was_running, is_interactive } => {
match storage.mark_cancelled(&task_id).await {
Ok(_) => {
app.server_messages.push((chrono::Local::now(), format!("Cancelled {task_id}")));
if was_running {
let container_name = if is_interactive {
format!("tsk-interactive-{task_id}")
} else {
format!("tsk-{task_id}")
};
if let Err(e) = docker_client.kill_container(&container_name).await {
app.server_messages.push((chrono::Local::now(), format!("Warning: could not kill container: {e}")));
}
}
}
Err(e) => {
app.server_messages.push((chrono::Local::now(), format!("Failed to cancel {task_id}: {e}")));
}
}
refresh_task_list(&mut app, &storage).await;
}
TuiAction::DeleteTask { task_id, task_dir } => {
match storage.delete_task(&task_id).await {
Ok(_) => {
if let Some(dir) = task_dir
&& crate::file_system::exists(&dir).await.unwrap_or(false)
&& let Err(e) = crate::file_system::remove_dir(&dir).await
{
app.server_messages.push((chrono::Local::now(), format!("Failed to delete task directory: {e}")));
}
app.server_messages.push((chrono::Local::now(), format!("Deleted {task_id}")));
}
Err(e) => {
app.server_messages.push((chrono::Local::now(), format!("Failed to delete {task_id}: {e}")));
}
}
refresh_task_list(&mut app, &storage).await;
}
}
}
}
event = event_receiver.recv() => {
match event {
Some(server_event) => {
process_server_event(&mut app, &server_event, &storage).await;
}
None => {
break;
}
}
}
_ = tick.tick() => {
refresh_task_list(&mut app, &storage).await;
app.workers_active = app.tasks.iter()
.filter(|t| t.status == crate::task::TaskStatus::Running)
.count();
}
}
if last_log_refresh.elapsed() >= Duration::from_millis(500) {
app.refresh_logs(&data_dir);
last_log_refresh = Instant::now();
}
}
Ok(())
}
/// Reloads the task list from `storage` and hands it to the app for display.
///
/// The tasks are taken in the reverse of the order `storage.list_tasks()` returns them and then
/// arranged by [`sort_tasks_for_display`]. If listing fails, the app's current list is left
/// untouched.
async fn refresh_task_list(app: &mut TuiApp, storage: &TaskStorage) {
    let Ok(stored) = storage.list_tasks().await else {
        return;
    };

    let mut ordered = stored.into_iter().rev().collect::<Vec<_>>();
    sort_tasks_for_display(&mut ordered);
    app.update_tasks(ordered);
}
/// Records a [`ServerEvent`] as a timestamped line in `app.server_messages`, trims the history
/// to the most recent 100 entries, and reloads the task list.
async fn process_server_event(app: &mut TuiApp, event: &ServerEvent, storage: &TaskStorage) {
    /// Maximum number of server messages kept for display.
    const MAX_SERVER_MESSAGES: usize = 100;

    let message = match event {
        ServerEvent::TaskScheduled { task_name, .. } => format!("Scheduling {task_name}"),
        ServerEvent::TaskCompleted { task_name, .. } => {
            format!("Task completed: {task_name}")
        }
        ServerEvent::TaskFailed {
            task_name, error, ..
        } => format!("Task failed: {task_name} - {error}"),
        ServerEvent::StatusMessage(msg) => msg.clone(),
        // U+26A0 WARNING SIGN, written as an escape so the prefix cannot be corrupted by a
        // wrong source-file encoding (the literal had degraded to mojibake).
        ServerEvent::WarningMessage(msg) => format!("\u{26A0} {msg}"),
    };
    app.server_messages.push((chrono::Local::now(), message));

    // Drop the oldest entries once the history exceeds the cap.
    let excess = app.server_messages.len().saturating_sub(MAX_SERVER_MESSAGES);
    if excess > 0 {
        app.server_messages.drain(..excess);
    }

    refresh_task_list(app, storage).await;
}
/// Reorders `tasks` in place so that unfinished tasks come before finished ones.
///
/// A task counts as finished when its status is `Complete`, `Failed` or `Cancelled`. Within each
/// of the two groups the tasks keep the relative order they had on entry.
pub(crate) fn sort_tasks_for_display(tasks: &mut Vec<Task>) {
    // `sort_by_key` is stable and `false < true`, so sorting on the "is finished" flag moves the
    // finished tasks to the end without disturbing the order inside either group.
    tasks.sort_by_key(|task| {
        matches!(
            task.status,
            TaskStatus::Complete | TaskStatus::Failed | TaskStatus::Cancelled
        )
    });
}
// Unit tests for `sort_tasks_for_display`: unfinished tasks must come first and the relative
// order inside each group must be preserved.
#[cfg(test)]
mod tests {
use super::*;
/// Builds a `Task` with the given id, status and parent ids; every other field comes from
/// `Task::test_default()`.
fn task(id: &str, status: TaskStatus, parent_ids: Vec<&str>) -> Task {
Task {
id: id.to_string(),
status,
parent_ids: parent_ids.into_iter().map(String::from).collect(),
..Task::test_default()
}
}
/// Returns the task ids in slice order, for compact ordering assertions.
fn ids(tasks: &[Task]) -> Vec<&str> {
tasks.iter().map(|t| t.id.as_str()).collect()
}
/// A cancelled task is grouped with completed tasks, after running and queued ones.
#[test]
fn cancelled_tasks_are_terminal() {
let mut tasks = vec![
task("running", TaskStatus::Running, vec![]),
task("cancelled", TaskStatus::Cancelled, vec![]),
task("queued", TaskStatus::Queued, vec![]),
task("complete", TaskStatus::Complete, vec![]),
];
sort_tasks_for_display(&mut tasks);
assert_eq!(
ids(&tasks),
vec!["running", "queued", "cancelled", "complete"]
);
}
/// Sorting an already sorted list leaves the order unchanged.
#[test]
fn sort_is_idempotent() {
let mut tasks = vec![
task("running", TaskStatus::Running, vec![]),
task("complete1", TaskStatus::Complete, vec![]),
task("queued", TaskStatus::Queued, vec![]),
task("failed", TaskStatus::Failed, vec![]),
task("cancelled", TaskStatus::Cancelled, vec![]),
];
sort_tasks_for_display(&mut tasks);
let first_pass: Vec<String> = tasks.iter().map(|t| t.id.clone()).collect();
sort_tasks_for_display(&mut tasks);
let second_pass: Vec<String> = tasks.iter().map(|t| t.id.clone()).collect();
assert_eq!(first_pass, second_pass);
}
/// Parent/child links play no part in the ordering; only the status matters.
#[test]
fn parent_child_sorted_same_as_other_tasks() {
let mut tasks = vec![
task("parent-run", TaskStatus::Running, vec![]),
task("child-run", TaskStatus::Running, vec!["parent-run"]),
task("parent-done", TaskStatus::Complete, vec![]),
task("child-done", TaskStatus::Complete, vec!["parent-done"]),
];
sort_tasks_for_display(&mut tasks);
assert_eq!(
ids(&tasks),
vec!["parent-run", "child-run", "parent-done", "child-done"]
);
}
/// A cancelled parent and its cancelled child both move behind the running task and keep
/// their relative order.
#[test]
fn cancelled_parent_with_cancelled_children() {
let mut tasks = vec![
task("parent", TaskStatus::Cancelled, vec![]),
task("child", TaskStatus::Cancelled, vec!["parent"]),
task("running", TaskStatus::Running, vec![]),
];
sort_tasks_for_display(&mut tasks);
assert_eq!(ids(&tasks), vec!["running", "parent", "child"]);
}
}