mod app;
mod panels;
mod replay;
mod theme;
mod widgets;
#[cfg(test)]
mod snapshot_tests;
use anyhow::Result;
use crossterm::{
ExecutableCommand,
event::{EventStream, KeyEvent, KeyEventKind},
terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
};
use futures::StreamExt;
use ratatui::prelude::*;
use std::io::stderr;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::daemon::client::{DaemonClient, DaemonEvent};
use crate::daemon::protocol::Method;
/// Events delivered to the UI loop through the `Tui` event channel.
pub enum TuiEvent {
/// A key event read from the terminal.
Key(KeyEvent),
/// Periodic tick emitted by the background task at the configured tick rate.
Tick,
/// Periodic signal emitted at the configured frame rate; the run loops in this
/// file redraw the terminal when they receive it.
Render,
/// An event that originated from the daemon (or from a replay) and was
/// forwarded into the UI event channel.
DaemonEvent(DaemonEvent),
}
/// Terminal handle plus the event plumbing that drives the UI loop.
pub struct Tui {
// ratatui terminal that renders to stderr through the crossterm backend.
terminal: Terminal<CrosstermBackend<std::io::Stderr>>,
// Sending half of the UI event channel; cloned into background tasks.
event_tx: mpsc::Sender<TuiEvent>,
// Receiving half of the UI event channel, drained by `next`.
event_rx: mpsc::Receiver<TuiEvent>,
// Cancelled by `exit` to make the background event task leave its loop.
cancellation_token: CancellationToken,
// Handle of the background event task spawned by `start`, if any.
task: Option<tokio::task::JoinHandle<()>>,
// Ticks per second; `start` uses `1.0 / tick_rate` as the tick period.
tick_rate: f64,
// Frames per second; `start` uses `1.0 / frame_rate` as the render period.
frame_rate: f64,
}
impl Tui {
    /// Builds a terminal over stderr together with a bounded event channel
    /// (capacity 256), a tick rate of 2 per second and a frame rate of 30 per
    /// second.
    ///
    /// Raw mode and the alternate screen are not entered here; call
    /// [`Tui::enter`] for that.
    ///
    /// # Errors
    /// Returns an error if the terminal cannot be constructed.
    pub fn new() -> Result<Self> {
        let terminal = Terminal::new(CrosstermBackend::new(stderr()))?;
        let (event_tx, event_rx) = mpsc::channel(256);
        let cancellation_token = CancellationToken::new();
        Ok(Self {
            terminal,
            event_tx,
            event_rx,
            cancellation_token,
            task: None,
            tick_rate: 2.0,
            frame_rate: 30.0,
        })
    }

    /// Spawns the background task that feeds the event channel.
    ///
    /// The task multiplexes three sources: the tick timer, the render timer
    /// and the crossterm event stream. Only key events whose kind is
    /// `KeyEventKind::Press` are forwarded. Events are sent with `try_send`,
    /// so they are dropped when the channel is full. The task leaves its loop
    /// once the cancellation token is cancelled.
    pub fn start(&mut self) {
        // A second call must not leave the previous task running, otherwise
        // two tasks would feed duplicate ticks, renders and key events.
        if let Some(previous) = self.task.take() {
            previous.abort();
        }

        let tick_delay = Duration::from_secs_f64(1.0 / self.tick_rate);
        let render_delay = Duration::from_secs_f64(1.0 / self.frame_rate);
        let event_tx = self.event_tx.clone();
        let cancel = self.cancellation_token.clone();
        self.task = Some(tokio::spawn(async move {
            let reader = EventStream::new();
            tokio::pin!(reader);
            let mut tick_interval = tokio::time::interval(tick_delay);
            let mut render_interval = tokio::time::interval(render_delay);
            loop {
                let tick = tick_interval.tick();
                let render = render_interval.tick();
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    _ = tick => {
                        let _ = event_tx.try_send(TuiEvent::Tick);
                    }
                    _ = render => {
                        let _ = event_tx.try_send(TuiEvent::Render);
                    }
                    Some(Ok(event)) = reader.next() => {
                        if let crossterm::event::Event::Key(key) = event
                            && key.kind == KeyEventKind::Press {
                            let _ = event_tx.try_send(TuiEvent::Key(key));
                        }
                    }
                }
            }
        }));
    }

    /// Enables raw mode, switches to the alternate screen, clears the
    /// terminal and starts the background event task.
    ///
    /// # Errors
    /// Returns the first error reported by any of those terminal operations.
    pub fn enter(&mut self) -> Result<()> {
        enable_raw_mode()?;
        stderr().execute(EnterAlternateScreen)?;
        self.terminal.clear()?;
        self.start();
        Ok(())
    }

    /// Stops the background event task and restores the terminal.
    ///
    /// Every restore step (leaving raw mode, leaving the alternate screen,
    /// showing the cursor) is attempted even if an earlier one fails, so a
    /// single failure does not leave the remaining terminal state unrestored.
    ///
    /// # Errors
    /// Returns the first error encountered among the restore steps.
    pub fn exit(&mut self) -> Result<()> {
        self.cancellation_token.cancel();
        if let Some(task) = self.task.take() {
            task.abort();
        }

        // Run all steps first, then report: bailing out on the first failure
        // would skip the later restore steps.
        let raw_mode = disable_raw_mode();
        let screen = stderr().execute(LeaveAlternateScreen).map(|_| ());
        let cursor = self.terminal.show_cursor();
        raw_mode?;
        screen?;
        cursor?;
        Ok(())
    }

    /// Waits for the next event on the channel.
    ///
    /// Returns `None` once the channel is closed.
    pub async fn next(&mut self) -> Option<TuiEvent> {
        self.event_rx.recv().await
    }
}
impl Drop for Tui {
    /// Restores the terminal when the `Tui` goes out of scope.
    fn drop(&mut self) {
        // Best effort: there is no caller to report a restore failure to.
        self.exit().ok();
    }
}
/// Populates `app` with the daemon's current run list, then refreshes the
/// daemon statistics.
///
/// A failed request, a response without a result, or a result without a
/// `"runs"` array leaves the run list untouched. Entries that do not
/// deserialize into a run are skipped.
async fn load_initial_data(client: &mut DaemonClient, app: &mut app::App) {
    'run_list: {
        let Ok(response) = client.send(Method::RunList, serde_json::json!({})).await else {
            break 'run_list;
        };
        let Some(result) = DaemonClient::extract_result(&response) else {
            break 'run_list;
        };
        let Some(entries) = result.get("runs").and_then(|v| v.as_array()) else {
            break 'run_list;
        };

        let mut runs: Vec<darq_core::types::Run> = Vec::new();
        for entry in entries {
            if let Ok(run) = serde_json::from_value(entry.clone()) {
                runs.push(run);
            }
        }
        app.update_from_runs(runs);
    }

    // Statistics are refreshed regardless of how the run list request went.
    refresh_stats(client, app).await;
}
/// Requests daemon statistics and copies the uptime and blueprint count into
/// `app`.
///
/// Nothing is changed when the call fails or the response has no result; each
/// field is updated only if its key is present as an unsigned integer.
async fn refresh_stats(client: &mut DaemonClient, app: &mut app::App) {
    let Ok(response) = client.call(Method::Stats).await else {
        return;
    };
    let Some(result) = DaemonClient::extract_result(&response) else {
        return;
    };

    let read_u64 = |key: &str| result.get(key).and_then(|v| v.as_u64());

    if let Some(uptime) = read_u64("uptime_secs") {
        app.daemon_uptime_secs = uptime;
    }
    if let Some(count) = read_u64("blueprint_count") {
        app.daemon_blueprint_count = count;
    }
}
pub async fn run_replay(path: std::path::PathBuf, speed: f64) -> Result<()> {
let mut tui = Tui::new()?;
tui.enter()?;
let mut app = app::App::new();
app.is_replay = true;
let (daemon_tx, mut daemon_rx) = mpsc::unbounded_channel::<DaemonEvent>();
let tui_tx = tui.event_tx.clone();
let forward_task = tokio::spawn(async move {
while let Some(event) = daemon_rx.recv().await {
let _ = tui_tx.try_send(TuiEvent::DaemonEvent(event));
}
});
let replay_task = tokio::spawn(async move {
if let Err(e) = replay::run(path, speed, daemon_tx).await {
tracing::error!(error = %e, "replay task failed");
}
});
app.event_status = format!("Replay: speed {speed}×");
tui.terminal.draw(|frame| app.render(frame))?;
loop {
if let Some(event) = tui.next().await {
let is_render = matches!(event, TuiEvent::Render);
let action = app.handle_event(event);
if is_render {
tui.terminal.draw(|frame| app.render(frame))?;
}
if let Some(action) = action {
match action {
app::Action::Quit => break,
app::Action::ApproveRun(_) | app::Action::TriggerNext => {}
}
}
}
}
forward_task.abort();
replay_task.abort();
tui.exit()?;
Ok(())
}
pub async fn run(mut client: DaemonClient) -> Result<()> {
let mut tui = Tui::new()?;
tui.enter()?;
let mut app = app::App::new();
load_initial_data(&mut client, &mut app).await;
let mut event_tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
{
let (daemon_tx, mut daemon_rx) = mpsc::unbounded_channel::<DaemonEvent>();
let tui_tx = tui.event_tx.clone();
event_tasks.push(tokio::spawn(async move {
while let Some(event) = daemon_rx.recv().await {
let _ = tui_tx.try_send(TuiEvent::DaemonEvent(event));
}
}));
match DaemonClient::subscribe_events().await {
Ok(mut event_stream) => {
app.event_status = "Events: connected!".into();
event_tasks.push(tokio::spawn(async move {
while let Some(event) = event_stream.recv().await {
if daemon_tx.send(event).is_err() {
break;
}
}
}));
}
Err(e) => {
app.event_status = format!("Events: FAILED: {e}");
}
}
}
tui.terminal.draw(|frame| app.render(frame))?;
loop {
if let Some(event) = tui.next().await {
let is_render = matches!(event, TuiEvent::Render);
let action = app.handle_event(event);
if is_render {
tui.terminal.draw(|frame| app.render(frame))?;
}
if let Some(action) = action {
match action {
app::Action::Quit => break,
app::Action::ApproveRun(id) => {
let params = serde_json::json!({ "id": id });
let _ = client.send(Method::RunApprove, params).await;
}
app::Action::TriggerNext => {}
}
}
}
}
for task in event_tasks {
task.abort();
}
tui.exit()?;
Ok(())
}