use std::{
io,
time::{Duration, Instant},
};
use clap::Args;
use crossterm::{
event::{
self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyEventKind,
},
execute,
terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
};
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use dora_message::{coordinator_to_cli::NodeInfo, id::NodeId, tarpc};
use eyre::Context;
use crate::common::{connect_and_check_version, rpc};
use ratatui::{
Frame, Terminal,
backend::{Backend, CrosstermBackend},
layout::{Constraint, Layout},
style::{Color, Modifier, Style},
widgets::{Block, Borders, Cell, Row, Table, TableState},
};
use uuid::Uuid;
use crate::LOCALHOST;
use super::super::{Executable, default_tracing};
/// Real-time monitor of per-node resource usage (similar to `top`).
///
/// Shows CPU, memory and disk I/O figures for every node reported by the
/// coordinator and refreshes them periodically.
#[derive(Debug, Args)]
pub struct Top {
    /// Address of the dora coordinator
    #[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
    pub coordinator_addr: std::net::IpAddr,
    /// Port number of the coordinator control server
    #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
    pub coordinator_port: u16,
    /// Refresh interval in seconds
    #[clap(long, value_name = "SECONDS", default_value_t = 2)]
    pub refresh_interval: u64,
}
impl Executable for Top {
    /// Sets up the terminal UI, runs the monitor until the user quits, and
    /// restores the terminal afterwards.
    ///
    /// An error from the monitor itself is printed to stderr after the
    /// terminal has been restored; it is not returned.
    ///
    /// # Errors
    ///
    /// Returns an error if tracing setup, terminal setup, or terminal
    /// restoration fails.
    async fn execute(self) -> eyre::Result<()> {
        /// Undoes the terminal setup. Every step is attempted even if an
        /// earlier one fails, so a single failure cannot leave the shell in
        /// raw mode or stuck on the alternate screen. Returns the first error.
        fn restore_terminal() -> io::Result<()> {
            let raw_mode = disable_raw_mode();
            let screen = execute!(io::stdout(), LeaveAlternateScreen, DisableMouseCapture);
            raw_mode.and(screen)
        }

        default_tracing()?;
        let refresh_duration = Duration::from_secs(self.refresh_interval);

        enable_raw_mode()?;
        // From here on the terminal is in a modified state: any early exit
        // has to restore it first.
        let mut stdout = io::stdout();
        if let Err(err) = execute!(stdout, EnterAlternateScreen, EnableMouseCapture) {
            let _ = restore_terminal();
            return Err(err.into());
        }
        let mut terminal = match Terminal::new(CrosstermBackend::new(stdout)) {
            Ok(terminal) => terminal,
            Err(err) => {
                let _ = restore_terminal();
                return Err(err.into());
            }
        };

        let res = run_app(
            &mut terminal,
            self.coordinator_addr,
            self.coordinator_port,
            refresh_duration,
        )
        .await;

        let restored = restore_terminal();
        let cursor = terminal.show_cursor();

        // Report the application error before propagating any restoration
        // failure, so it is never lost.
        if let Err(err) = res {
            eprintln!("Error: {err:?}");
        }
        restored?;
        cursor?;
        Ok(())
    }
}
/// Column by which the node table can be ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SortColumn {
    /// Order rows by node id.
    Node,
    /// Order rows by CPU usage.
    Cpu,
    /// Order rows by memory usage.
    Memory,
}
/// State of the interactive view: the rows to show, the current selection,
/// and the active sort order.
struct App {
    /// One entry per node; re-sorted whenever the data or the sort order changes.
    node_stats: Vec<NodeStats>,
    /// Selection state passed to the table widget on every draw.
    table_state: TableState,
    /// Column the rows are currently ordered by.
    sort_column: SortColumn,
    /// `true` for ascending order, `false` for descending.
    sort_ascending: bool,
}
/// Display-ready resource figures for a single node (one table row).
#[derive(Debug, Clone)]
struct NodeStats {
    // Stored but not read anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    dataflow_id: Uuid,
    /// Name of the dataflow the node belongs to, or `<unnamed>` if it has none.
    dataflow_name: String,
    node_id: NodeId,
    /// Process id; `None` when no metrics were reported for the node.
    pid: Option<u32>,
    /// CPU usage as shown in the `CPU%` column; `0.0` when no metrics were reported.
    cpu_usage: f32,
    /// Memory usage in MB; `0.0` when no metrics were reported.
    memory_mb: f64,
    /// Disk read rate in MB/s, if available.
    disk_read_mb_s: Option<f64>,
    /// Disk write rate in MB/s, if available.
    disk_write_mb_s: Option<f64>,
}
impl App {
    /// Creates an empty view with the first row selected and rows ordered by
    /// CPU usage, highest first.
    fn new() -> Self {
        let mut table_state = TableState::default();
        table_state.select(Some(0));
        Self {
            node_stats: Vec::new(),
            table_state,
            sort_column: SortColumn::Cpu,
            sort_ascending: false,
        }
    }

    /// Moves the selection one row down, wrapping around to the first row.
    /// Does nothing while the table is empty.
    fn next(&mut self) {
        let len = self.node_stats.len();
        if len == 0 {
            return;
        }
        let target = match self.table_state.selected() {
            Some(i) if i < len - 1 => i + 1,
            _ => 0,
        };
        self.table_state.select(Some(target));
    }

    /// Moves the selection one row up, wrapping around to the last row.
    /// Does nothing while the table is empty.
    fn previous(&mut self) {
        let len = self.node_stats.len();
        if len == 0 {
            return;
        }
        let last = len - 1;
        let target = match self.table_state.selected() {
            Some(0) => last,
            // `min` guards against a selection left beyond the end of the list.
            Some(i) => (i - 1).min(last),
            None => 0,
        };
        self.table_state.select(Some(target));
    }

    /// Selects `column` as the sort key and re-sorts the rows.
    ///
    /// Choosing the column that is already active flips the direction;
    /// choosing a different column starts in descending order.
    fn toggle_sort(&mut self, column: SortColumn) {
        if self.sort_column == column {
            self.sort_ascending = !self.sort_ascending;
        } else {
            self.sort_column = column;
            self.sort_ascending = false;
        }
        self.sort();
    }

    /// Sorts the rows by the active column and direction.
    ///
    /// Float columns are compared with `total_cmp`, so the comparison is a
    /// total order even if a NaN value is present (a positive NaN orders
    /// above every other value).
    fn sort(&mut self) {
        let column = self.sort_column;
        let ascending = self.sort_ascending;
        self.node_stats.sort_by(|a, b| {
            let ordering = match column {
                SortColumn::Node => a.node_id.as_ref().cmp(b.node_id.as_ref()),
                SortColumn::Cpu => a.cpu_usage.total_cmp(&b.cpu_usage),
                SortColumn::Memory => a.memory_mb.total_cmp(&b.memory_mb),
            };
            if ascending {
                ordering
            } else {
                ordering.reverse()
            }
        });
    }

    /// Replaces all rows with the given node infos and re-applies the
    /// current sort order.
    ///
    /// Nodes without metrics get no PID, zero CPU and memory usage, and no
    /// disk rates. Dataflows without a name are labelled `<unnamed>`.
    fn update_stats(&mut self, node_infos: Vec<NodeInfo>) {
        self.node_stats.clear();
        self.node_stats.extend(node_infos.into_iter().map(|node_info| {
            let (pid, cpu_usage, memory_mb, disk_read_mb_s, disk_write_mb_s) =
                match &node_info.metrics {
                    Some(metrics) => (
                        Some(metrics.pid),
                        metrics.cpu_usage,
                        metrics.memory_mb,
                        metrics.disk_read_mb_s,
                        metrics.disk_write_mb_s,
                    ),
                    None => (None, 0.0, 0.0, None, None),
                };
            NodeStats {
                dataflow_id: node_info.dataflow_id,
                dataflow_name: node_info
                    .dataflow_name
                    .unwrap_or_else(|| "<unnamed>".to_string()),
                node_id: node_info.node_id,
                pid,
                cpu_usage,
                memory_mb,
                disk_read_mb_s,
                disk_write_mb_s,
            }
        }));
        self.sort();

        // The node list may have shrunk: keep the selection on an existing row.
        if let Some(selected) = self.table_state.selected() {
            let last = self.node_stats.len().saturating_sub(1);
            if selected > last {
                self.table_state.select(Some(last));
            }
        }
    }
}
/// Runs the interactive event loop until the user quits with `q` or `Esc`.
///
/// Connects to the coordinator, fills the table with the current node list,
/// and then repeatedly draws the UI, waits for a key press, and re-fetches
/// the node info whenever `refresh_duration` has elapsed or the user pressed
/// `r`.
///
/// # Errors
///
/// Returns an error if connecting to the coordinator, any node-info request,
/// drawing, or reading terminal events fails.
async fn run_app<B: Backend>(
    terminal: &mut Terminal<B>,
    coordinator_addr: std::net::IpAddr,
    coordinator_port: u16,
    refresh_duration: Duration,
) -> eyre::Result<()> {
    let mut app = App::new();

    let client = connect_and_check_version(coordinator_addr, coordinator_port)
        .await
        .wrap_err("Failed to connect to coordinator")?;

    // Show the initial data on the very first frame instead of an empty
    // table until the first refresh interval has passed.
    let initial_infos: Vec<NodeInfo> = rpc(
        "get node info",
        client.get_node_info(tarpc::context::current()),
    )
    .await?;
    app.update_stats(initial_infos);
    let mut last_update = Instant::now();
    let mut refresh_requested = false;

    loop {
        terminal.draw(|f| ui(f, &mut app, refresh_duration))?;

        // Wait at most until the next refresh is due. If it is already due
        // (e.g. a zero interval), still wait briefly so the loop does not spin.
        let timeout = refresh_duration
            .checked_sub(last_update.elapsed())
            .unwrap_or(Duration::from_millis(100));

        // Polling blocks the thread, so it runs on the blocking pool.
        let key_event: Option<KeyEvent> =
            tokio::task::spawn_blocking(move || -> io::Result<Option<KeyEvent>> {
                if !event::poll(timeout)? {
                    return Ok(None);
                }
                Ok(match event::read()? {
                    Event::Key(key) => Some(key),
                    _ => None,
                })
            })
            .await??;

        if let Some(key) = key_event.filter(|key| key.kind == KeyEventKind::Press) {
            match key.code {
                KeyCode::Char('q') | KeyCode::Esc => return Ok(()),
                KeyCode::Down | KeyCode::Char('j') => app.next(),
                KeyCode::Up | KeyCode::Char('k') => app.previous(),
                KeyCode::Char('n') => app.toggle_sort(SortColumn::Node),
                KeyCode::Char('c') => app.toggle_sort(SortColumn::Cpu),
                KeyCode::Char('m') => app.toggle_sort(SortColumn::Memory),
                KeyCode::Char('r') => refresh_requested = true,
                _ => {}
            }
        }

        if refresh_requested || last_update.elapsed() >= refresh_duration {
            let node_infos: Vec<NodeInfo> = rpc(
                "refresh node info",
                client.get_node_info(tarpc::context::current()),
            )
            .await?;
            app.update_stats(node_infos);
            last_update = Instant::now();
            refresh_requested = false;
        }
    }
}
/// Draws the node table so that it fills the whole frame.
///
/// The header marks the active sort column with an arrow, each node becomes
/// one row, and the block title shows the refresh interval together with the
/// key bindings.
fn ui(f: &mut Frame, app: &mut App, refresh_duration: Duration) {
    let header_style = Style::default()
        .fg(Color::Yellow)
        .add_modifier(Modifier::BOLD);
    let selected_style = Style::default()
        .bg(Color::DarkGray)
        .add_modifier(Modifier::BOLD);

    let chunks = Layout::default()
        .constraints([Constraint::Min(0)])
        .split(f.area());

    // Arrow suffix for the column that is currently sorted, empty otherwise.
    let (active_column, ascending) = (app.sort_column, app.sort_ascending);
    let arrow_for = |column: SortColumn| match (column == active_column, ascending) {
        (false, _) => "",
        (true, true) => " ▲",
        (true, false) => " ▼",
    };

    let header_labels = [
        format!("NODE{}", arrow_for(SortColumn::Node)),
        String::from("DATAFLOW"),
        String::from("PID"),
        format!("CPU%{}", arrow_for(SortColumn::Cpu)),
        format!("MEMORY (MB){}", arrow_for(SortColumn::Memory)),
        String::from("I/O READ (MB/s)"),
        String::from("I/O WRITE (MB/s)"),
    ];
    let header = Row::new(
        header_labels
            .iter()
            .map(|label| Cell::from(label.as_str()).style(header_style)),
    )
    .height(1)
    .bottom_margin(1);

    // Optional rates are printed with one decimal, or as a placeholder.
    let rate_text = |rate: Option<f64>| match rate {
        Some(v) => format!("{:.1}", v),
        None => "N/A".to_string(),
    };

    let rows = app.node_stats.iter().map(|stats| {
        let pid_text = match stats.pid {
            Some(p) => p.to_string(),
            None => "N/A".to_string(),
        };
        Row::new([
            Cell::from(stats.node_id.as_ref()),
            Cell::from(stats.dataflow_name.as_str()),
            Cell::from(pid_text),
            Cell::from(format!("{:.1}%", stats.cpu_usage)),
            Cell::from(format!("{:.1}", stats.memory_mb)),
            Cell::from(rate_text(stats.disk_read_mb_s)),
            Cell::from(rate_text(stats.disk_write_mb_s)),
        ])
        .height(1)
    });

    let widths = [20, 20, 8, 12, 12, 14, 14].map(Constraint::Percentage);

    let title = format!(
        " Dora Inspect Top - Refreshing every {}s (q: quit, n/c/m: sort, r: refresh nodes) ",
        refresh_duration.as_secs()
    );
    let frame_block = Block::default().borders(Borders::ALL).title(title);

    let table = Table::new(rows, widths)
        .header(header)
        .block(frame_block)
        .row_highlight_style(selected_style)
        .highlight_symbol(">> ");

    f.render_stateful_widget(table, chunks[0], &mut app.table_state);
}