use std::{
collections::{BTreeMap, HashMap, HashSet},
fs::File,
io::{self, Read},
str::FromStr,
time::{Duration, Instant},
};
use crate::tui::{
rpc_palette::{PaletteEvent, RpcPalette, RpcPaletteStatus, RpcReq},
rpc_state::RouteRpcState,
rpc_worker::{spawn_rpc_worker, RpcWorkerReq, RpcWorkerResp},
tree_worker::spawn_tree_worker,
};
use crate::{MonitorCli, ProxyHelp, TioOpts};
use crossbeam::channel::{self, Sender};
use ratatui::{
crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
symbols,
text::{Line, Span},
widgets::{Axis, Block, Borders, Chart, Dataset, GraphType, Paragraph},
DefaultTerminal, Frame,
};
use toml_edit::{DocumentMut, InlineTable, Value};
use twinleaf::{
data::{
AlignedWindow, Buffer, ColumnBatch, ColumnData, ColumnKey, DeviceFullMetadata, Sample,
StreamKey,
},
device::{DeviceEvent, DeviceRoute, DeviceTree, RpcClient, RpcList, TreeEvent, TreeItem},
tio::{self, proto::ProxyStatus},
};
use welch_sde::{Build, SpectralDensity};
/// Public entry point for the monitor: hands `config` to `run_monitor_app`
/// and returns whatever that call returns.
pub fn run_monitor(config: MonitorConfig) -> eyre::Result<()> {
run_monitor_app(config)
}
/// Settings accepted by [`run_monitor`]. Can be built from `MonitorCli` via `From`.
#[derive(Debug, Clone)]
pub struct MonitorConfig {
/// Connection options, taken verbatim from the CLI arguments.
pub tio: TioOpts,
/// Taken from the CLI; presumably the UI refresh rate in frames per second — TODO confirm.
pub fps: u32,
/// Taken from the CLI; presumably a colour/theme specification or path — TODO confirm.
pub colors: Option<String>,
/// Taken from the CLI; presumably the route depth limit passed to `MonitorState` — TODO confirm.
pub depth: Option<usize>,
}
impl From<MonitorCli> for MonitorConfig {
fn from(cli: MonitorCli) -> Self {
Self {
tio: cli.tio,
fps: cli.fps,
colors: cli.colors,
depth: cli.depth,
}
}
}
// Lower and upper bounds (seconds) the plot window is clamped to when adjusted.
const MIN_PLOT_WINDOW_SECONDS: f64 = 0.5;
const MAX_PLOT_WINDOW_SECONDS: f64 = 60.0;
// Plot-window step sizes (seconds): `=`/`-` apply the fine step, `+`/`_` the coarse step.
const PLOT_WINDOW_FINE_STEP_SECONDS: f64 = 0.5;
const PLOT_WINDOW_COARSE_STEP_SECONDS: f64 = 5.0;
// Minimum number of samples in the plot window before a spectrum is computed.
const MIN_FFT_SAMPLES: usize = 60;
// NOTE(review): not referenced in the visible part of the file; presumably the
// capacity of the sample `Buffer` — confirm at the construction site.
const MONITOR_BUFFER_CAPACITY_SAMPLES: usize = 2_000_000;
// Parameters `latest_complete_welch_signal` uses to predict segment and hop sizes.
// NOTE(review): these look intended to mirror the welch_sde builder defaults — confirm.
const WELCH_DEFAULT_SEGMENTS: usize = 4;
const WELCH_DEFAULT_OVERLAP: f64 = 0.5;
const WELCH_DFT_MAX_SIZE: usize = 4096;
/// One selectable entry in the flat navigation list.
#[derive(Debug, Clone)]
pub enum NavPos {
/// Placeholder entry for a device that has no streams yet.
EmptyDevice {
/// Position of the device within the sorted list of visible routes.
device_idx: usize,
/// Route of the device.
route: DeviceRoute,
},
/// A single column of a stream.
Column {
/// Position of the device within the sorted list of visible routes.
device_idx: usize,
/// Position of the stream among the device's sorted stream ids.
stream_idx: usize,
/// Full identity of the column (route, stream id, column id).
spec: ColumnKey,
},
}
impl NavPos {
    /// Index of the device this entry belongs to.
    pub fn device_idx(&self) -> usize {
        let (NavPos::EmptyDevice { device_idx, .. } | NavPos::Column { device_idx, .. }) = self;
        *device_idx
    }

    /// Route of the device this entry belongs to.
    pub fn route(&self) -> &DeviceRoute {
        match self {
            NavPos::Column { spec, .. } => &spec.route,
            NavPos::EmptyDevice { route, .. } => route,
        }
    }

    /// Stream index for column entries, `None` for an empty device.
    pub fn stream_idx(&self) -> Option<usize> {
        if let NavPos::Column { stream_idx, .. } = self {
            Some(*stream_idx)
        } else {
            None
        }
    }

    /// Column id for column entries, `None` for an empty device.
    pub fn column_idx(&self) -> Option<usize> {
        self.spec().map(|key| key.column_id)
    }

    /// Column key for column entries, `None` for an empty device.
    pub fn spec(&self) -> Option<&ColumnKey> {
        if let NavPos::Column { spec, .. } = self {
            Some(spec)
        } else {
            None
        }
    }
}
/// Cursor into a flat list of [`NavPos`] entries.
#[derive(Debug, Clone, Default)]
pub struct Nav {
    /// Index of the selected entry in the list handed to the `step_*` methods.
    pub idx: usize,
}

impl Nav {
    /// Pulls `self.idx` back inside `items`. Returns `false` when `items` is
    /// empty, in which case there is nothing to select and `idx` is untouched.
    ///
    /// `idx` is public and the list is supplied by the caller, so a stale index
    /// must not be allowed to reach a slice-index expression.
    fn clamp_to(&mut self, items: &[NavPos]) -> bool {
        match items.len().checked_sub(1) {
            Some(last) => {
                self.idx = self.idx.min(last);
                true
            }
            None => false,
        }
    }

    /// Moves one slot forward or backward inside a ring of `len` slots.
    fn wrapped(pos: usize, len: usize, backward: bool) -> usize {
        if backward {
            (pos + len - 1) % len
        } else {
            (pos + 1) % len
        }
    }

    /// Moves the selection to the next (or previous) entry, wrapping around.
    pub fn step_linear(&mut self, items: &[NavPos], backward: bool) {
        if !self.clamp_to(items) {
            return;
        }
        self.idx = Self::wrapped(self.idx, items.len(), backward);
    }

    /// Moves to the neighbouring stream of the current device, landing on the
    /// column whose id is closest to the current one. Does nothing when the
    /// current entry is an empty device or the device has a single stream.
    pub fn step_between_streams(&mut self, items: &[NavPos], backward: bool) {
        if !self.clamp_to(items) {
            return;
        }
        let NavPos::Column {
            device_idx,
            stream_idx,
            spec,
        } = &items[self.idx]
        else {
            return;
        };
        let (cur_dev, cur_stream, cur_column) = (*device_idx, *stream_idx, spec.column_id);

        let mut streams: Vec<usize> = items
            .iter()
            .filter_map(|pos| match pos {
                NavPos::Column {
                    device_idx,
                    stream_idx,
                    ..
                } if *device_idx == cur_dev => Some(*stream_idx),
                _ => None,
            })
            .collect();
        // `dedup` only removes adjacent duplicates, so sort first.
        streams.sort_unstable();
        streams.dedup();
        if streams.len() <= 1 {
            return;
        }

        let pos = streams.iter().position(|&s| s == cur_stream).unwrap_or(0);
        let target_stream = streams[Self::wrapped(pos, streams.len(), backward)];

        let nearest = items
            .iter()
            .enumerate()
            .filter(|(_, pos)| {
                matches!(pos,
                    NavPos::Column { device_idx, stream_idx, .. }
                        if *device_idx == cur_dev && *stream_idx == target_stream)
            })
            .min_by_key(|(_, pos)| pos.column_idx().unwrap_or(0).abs_diff(cur_column))
            .map(|(i, _)| i);
        if let Some(i) = nearest {
            self.idx = i;
        }
    }

    /// Moves to the next (or previous) device, landing on the entry whose
    /// (stream index, column id) is closest to the current one.
    pub fn step_device(&mut self, items: &[NavPos], backward: bool) {
        if !self.clamp_to(items) {
            return;
        }
        let cur = &items[self.idx];
        let cur_device = cur.device_idx();
        let cur_stream = cur.stream_idx().unwrap_or(0);
        let cur_column = cur.column_idx().unwrap_or(0);

        let mut device_indices: Vec<usize> = items.iter().map(|p| p.device_idx()).collect();
        device_indices.sort_unstable();
        device_indices.dedup();
        if device_indices.len() <= 1 {
            return;
        }

        let dev_pos = device_indices
            .iter()
            .position(|&d| d == cur_device)
            .unwrap_or(0);
        let target_device = device_indices[Self::wrapped(dev_pos, device_indices.len(), backward)];

        let nearest = items
            .iter()
            .enumerate()
            .filter(|(_, pos)| pos.device_idx() == target_device)
            .min_by_key(|(_, pos)| match pos {
                // An empty device has a single entry; give it the best score.
                NavPos::EmptyDevice { .. } => (0, 0),
                NavPos::Column {
                    stream_idx, spec, ..
                } => (
                    stream_idx.abs_diff(cur_stream),
                    spec.column_id.abs_diff(cur_column),
                ),
            })
            .map(|(i, _)| i);
        if let Some(i) = nearest {
            self.idx = i;
        }
    }

    /// Selects the first entry, if any.
    pub fn home(&mut self, items: &[NavPos]) {
        if !items.is_empty() {
            self.idx = 0;
        }
    }

    /// Selects the last entry, if any.
    pub fn end(&mut self, items: &[NavPos]) {
        if let Some(last) = items.len().checked_sub(1) {
            self.idx = last;
        }
    }
}
/// Colour rules for displayed values.
#[derive(Debug, Clone, Default)]
pub struct Theme {
    /// Keyed by `"<stream>.<column>"`; each entry holds the acceptable range and
    /// a flag that selects blue instead of red for values below the range.
    pub value_bounds: HashMap<String, (std::ops::RangeInclusive<f64>, bool)>,
}

impl Theme {
    /// Picks a colour for `val` of column `col` in stream `stream`.
    ///
    /// NaN is always yellow. Without a configured range the result is `None`.
    /// Otherwise: green inside the range, red above it, and below it blue when
    /// the entry's flag is set, red when it is not.
    pub fn get_value_color(&self, stream: &str, col: &str, val: f64) -> Option<Color> {
        if val.is_nan() {
            return Some(Color::Yellow);
        }
        let (range, is_temp) = self.value_bounds.get(&format!("{}.{}", stream, col))?;
        let color = if val < *range.start() {
            if *is_temp {
                Color::Blue
            } else {
                Color::Red
            }
        } else if val > *range.end() {
            Color::Red
        } else {
            Color::Green
        };
        Some(color)
    }
}
/// Inputs that decide how a row of the value list is styled.
pub struct StyleContext {
    /// The row is the current selection.
    pub is_selected: bool,
    /// The row's data is considered stale.
    pub is_stale: bool,
    /// The plot panel is currently shown.
    pub in_plot_mode: bool,
    /// Foreground colour applied before any modifiers.
    pub base_color: Color,
}

impl Default for StyleContext {
    /// All flags off, foreground `Color::Reset`.
    fn default() -> Self {
        Self {
            base_color: Color::Reset,
            is_selected: false,
            is_stale: false,
            in_plot_mode: false,
        }
    }
}

impl StyleContext {
    /// Same as [`StyleContext::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the context with the selection flag set to `yes`.
    pub fn selected(self, yes: bool) -> Self {
        Self {
            is_selected: yes,
            ..self
        }
    }

    /// Returns the context with the stale flag set to `yes`.
    pub fn stale(self, yes: bool) -> Self {
        Self {
            is_stale: yes,
            ..self
        }
    }

    /// Returns the context with the plot-mode flag set to `yes`.
    pub fn plot_mode(self, yes: bool) -> Self {
        Self {
            in_plot_mode: yes,
            ..self
        }
    }

    /// Returns the context with the foreground colour replaced by `c`.
    pub fn color(self, c: Color) -> Self {
        Self {
            base_color: c,
            ..self
        }
    }

    /// Produces the final style: base colour, dimmed when stale, bold when
    /// selected, and additionally rapid-blinking when selected outside plot mode.
    pub fn resolve(&self) -> Style {
        let base = Style::default().fg(self.base_color);
        let dimmed = if self.is_stale {
            base.add_modifier(Modifier::DIM)
        } else {
            base
        };
        if !self.is_selected {
            return dimmed;
        }
        let bold = dimmed.add_modifier(Modifier::BOLD);
        if self.in_plot_mode {
            bold
        } else {
            bold.add_modifier(Modifier::RAPID_BLINK)
        }
    }
}
/// Liveness information tracked per device.
#[derive(Debug, Clone, Default)]
pub struct DeviceStatus {
    /// When the most recent heartbeat was recorded; `None` until the first one.
    pub last_heartbeat: Option<Instant>,
    /// Connection flag; set by a heartbeat.
    pub connected: bool,
}

impl DeviceStatus {
    /// Records a heartbeat at the current instant and marks the device connected.
    pub fn on_heartbeat(&mut self) {
        *self = Self {
            last_heartbeat: Some(Instant::now()),
            connected: true,
        };
    }

    /// `true` when a heartbeat has been seen and it is strictly younger than `timeout`.
    pub fn is_alive(&self, timeout: Duration) -> bool {
        matches!(self.last_heartbeat, Some(seen) if seen.elapsed() < timeout)
    }
}
/// Input mode of the monitor.
#[derive(Debug, Clone, PartialEq)]
pub enum Mode {
/// Keys are interpreted as navigation / view shortcuts.
Normal,
/// Keys are forwarded to the RPC palette.
Command,
}
/// State changes produced from input events and applied by `MonitorState::update`.
#[derive(Debug, Clone)]
pub enum Action {
/// Leave the application.
Quit,
/// Switch input mode (entering or leaving the RPC palette).
SetMode(Mode),
/// Send an RPC request to the RPC worker.
ExecuteRpc(RpcReq),
/// Select the first navigation entry belonging to the given route.
SelectRoute(DeviceRoute),
/// Previous entry in the flat list (wraps).
NavUp,
/// Next entry in the flat list (wraps).
NavDown,
/// Previous stream of the current device.
NavLeft,
/// Next stream of the current device.
NavRight,
/// Next device.
NavTabNext,
/// Previous device.
NavTabPrev,
/// Scroll the list by the given number of lines (negative scrolls up) and stop following the selection.
NavScroll(i16),
/// First entry.
NavHome,
/// Last entry.
NavEnd,
/// Show or hide the plot (only toggles when a column is selected).
TogglePlot,
/// Hide the plot.
ClosePlot,
/// Toggle the spectrum view (only while the plot is shown).
ToggleFft,
/// Toggle the full footer.
ToggleFooter,
/// Toggle display of routes next to device headers.
ToggleRoutes,
/// Change the plot window length by the given number of seconds.
AdjustWindow(f64),
/// Change the plot width by the given number of percentage points.
AdjustPlotWidth(i16),
/// Change the axis precision by the given amount.
AdjustPrecision(i8),
}
/// View-related settings and layout state of the monitor.
#[derive(Debug, Clone)]
pub struct ViewConfig {
/// Whether the plot panel is shown next to the value list.
pub show_plot: bool,
/// Whether the full footer (instead of the minimal one) is shown.
pub show_footer: bool,
/// Whether device headers also print the route.
pub show_routes: bool,
/// Whether the spectrum view is selected for the plot panel.
pub show_fft: bool,
/// Length of the plot window in seconds.
pub plot_window_seconds: f64,
/// Share of the main area's width given to the plot panel, in percent.
pub plot_width_percent: u16,
/// Kept within 0..=5 by `Action::AdjustPrecision`; presumably decimal places on plot axes — TODO confirm.
pub axis_precision: usize,
/// When set, the list scroll offset is adjusted to keep the selection visible.
pub follow_selection: bool,
/// Vertical scroll offset (in lines) of the value list.
pub scroll: u16,
/// Width of the longest column description, recomputed when the value list is built.
pub desc_width: usize,
/// Width of the longest units string, recomputed when the value list is built.
pub units_width: usize,
/// Colour rules for displayed values.
pub theme: Theme,
}
/// Defaults: plot, routes and FFT hidden; footer shown; 5 s window; plot takes
/// 70 % of the width; precision 3; selection followed; no scroll.
impl Default for ViewConfig {
fn default() -> Self {
Self {
show_plot: false,
show_footer: true,
show_routes: false,
show_fft: false,
plot_window_seconds: 5.0,
plot_width_percent: 70,
axis_precision: 3,
follow_selection: true,
scroll: 0,
desc_width: 0,
units_width: 0,
theme: Theme::default(),
}
}
}
/// Result of a successful spectral-density computation.
pub struct FftReadyData {
/// `(frequency, amplitude spectral density)` pairs; only positive frequencies with finite, positive density.
pub points: Vec<(f64, f64)>,
/// Median of the amplitude values in `points`.
pub median_asd: f64,
/// Number of samples actually fed to the estimator.
pub sample_count: usize,
/// Number of samples available in the plot window.
pub total_sample_count: usize,
/// Sampling rate divided by decimation, in Hz.
pub sampling_hz: f64,
/// Segment size reported by `latest_complete_welch_signal`.
pub segment_size: usize,
/// Hop size reported by `latest_complete_welch_signal`.
pub hop_size: usize,
}
/// Outcome of `MonitorState::get_spectral_density_data`.
pub enum FftStatus {
/// A spectrum was computed.
Ready(FftReadyData),
/// No column is currently selected.
WaitingForSelection,
/// No aligned window, segment metadata or column data is available yet.
WaitingForSamples,
/// The segment metadata yields no usable sampling frequency (zero rate or decimation).
InvalidSampleRate {
sampling_rate: u32,
decimation: u32,
},
/// The window holds fewer samples (`have`) than required (`need`).
TooFewSamples {
have: usize,
need: usize,
sampling_hz: f64,
window_seconds: f64,
},
/// The estimator produced no bin with positive frequency and finite, positive density.
NoValidFrequencyBins {
sample_count: usize,
sampling_hz: f64,
},
}
/// Complete UI and data state of the monitor.
pub struct MonitorState {
/// Maximum relative route length that is displayed; `None` means unlimited.
pub depth_limit: Option<usize>,
/// Route that discovered routes are taken relative to; also the fallback "current route".
pub parent_route: DeviceRoute,
/// Current input mode.
pub mode: Mode,
/// View settings and layout state.
pub view: ViewConfig,
/// Selection cursor into `nav_items`.
pub nav: Nav,
/// Flat list of selectable entries, rebuilt by `rebuild_nav_items`.
pub nav_items: Vec<NavPos>,
/// Every route reported by a `RouteDiscovered` event.
pub discovered_routes: HashSet<DeviceRoute>,
/// Heartbeat / connection status per route.
pub device_status: HashMap<DeviceRoute, DeviceStatus>,
/// Most recent sample per stream together with the instant it was received.
pub last: BTreeMap<StreamKey, (Sample, Instant)>,
/// Metadata per route, stored when a `MetadataReady` event arrives.
pub device_metadata: HashMap<DeviceRoute, DeviceFullMetadata>,
/// Data window for the selected column; `None` while the plot is hidden or no data is available.
pub window_aligned: Option<AlignedWindow>,
/// Height of the footer area as of the last footer render.
pub footer_height: u16,
/// RPC bookkeeping per route.
pub rpc_routes: HashMap<DeviceRoute, RouteRpcState>,
/// The RPC command palette.
pub palette: RpcPalette,
/// Current blink phase, toggled by `tick_blink`.
pub blink_state: bool,
/// Instant of the last blink toggle.
pub last_blink: Instant,
}
impl MonitorState {
/// Creates the initial state: normal mode, default view, empty collections,
/// blink phase on and the blink timer started now. `parent_route` is cloned.
pub fn new(depth_limit: Option<usize>, parent_route: &DeviceRoute) -> Self {
Self {
depth_limit,
parent_route: parent_route.clone(),
mode: Mode::Normal,
view: ViewConfig::default(),
nav: Nav::default(),
nav_items: Vec::new(),
discovered_routes: HashSet::new(),
device_status: HashMap::new(),
last: BTreeMap::new(),
device_metadata: HashMap::new(),
window_aligned: None,
footer_height: 0,
rpc_routes: HashMap::new(),
palette: RpcPalette::default(),
blink_state: true,
last_blink: Instant::now(),
}
}
/// Palette status for `route`; `WaitingForRpc` when nothing is tracked for it yet.
fn rpc_palette_status(&self, route: &DeviceRoute) -> RpcPaletteStatus {
    match self.rpc_routes.get(route) {
        Some(state) => state.palette_status(),
        None => RpcPaletteStatus::WaitingForRpc,
    }
}
/// Refreshes the palette suggestions from `route`'s registry, but only while
/// the palette is open and `route` is the currently selected route.
fn update_palette_suggestions_for(&mut self, route: &DeviceRoute) {
    let palette_shows_route = self.mode == Mode::Command && self.current_route() == *route;
    if !palette_shows_route {
        return;
    }
    let registry = self.rpc_routes.get(route).and_then(RouteRpcState::registry);
    self.palette.update_suggestions(registry);
}
/// Applies `action` to the state.
///
/// RPC requests are forwarded on `rpc_tx`; a failed send is ignored.
/// Returns `true` only for [`Action::Quit`], signalling the caller to exit.
fn update(&mut self, action: Action, rpc_tx: &Sender<RpcWorkerReq>) -> bool {
    match action {
        Action::Quit => return true,
        Action::SetMode(Mode::Command) => {
            let route = self.current_route();
            let registry = self
                .rpc_routes
                .get(&route)
                .and_then(RouteRpcState::registry);
            self.palette.enter(registry);
            self.mode = Mode::Command;
        }
        Action::SetMode(Mode::Normal) => {
            self.mode = Mode::Normal;
            self.palette.exit();
        }
        Action::ExecuteRpc(req) => {
            // Best effort: if the worker is gone there is nothing to do here.
            let _ = rpc_tx.send(RpcWorkerReq::Execute(req));
        }
        Action::SelectRoute(route) => {
            self.view.follow_selection = true;
            if let Some(idx) = self.nav_items.iter().position(|p| p.route() == &route) {
                self.nav.idx = idx;
            }
            let registry = self
                .rpc_routes
                .get(&route)
                .and_then(RouteRpcState::registry);
            self.palette.update_suggestions(registry);
        }
        Action::NavUp => {
            self.view.follow_selection = true;
            self.nav.step_linear(&self.nav_items, true);
        }
        Action::NavDown => {
            self.view.follow_selection = true;
            self.nav.step_linear(&self.nav_items, false);
        }
        Action::NavLeft => {
            self.view.follow_selection = true;
            self.nav.step_between_streams(&self.nav_items, true);
        }
        Action::NavRight => {
            self.view.follow_selection = true;
            self.nav.step_between_streams(&self.nav_items, false);
        }
        Action::NavTabNext => {
            self.view.follow_selection = true;
            self.nav.step_device(&self.nav_items, false);
        }
        Action::NavTabPrev => {
            self.view.follow_selection = true;
            self.nav.step_device(&self.nav_items, true);
        }
        Action::NavScroll(delta) => {
            // Manual scrolling detaches the view from the selection.
            self.view.follow_selection = false;
            // `unsigned_abs` is defined for `i16::MIN`, where `abs()` overflows.
            let amount = delta.unsigned_abs();
            self.view.scroll = if delta < 0 {
                self.view.scroll.saturating_sub(amount)
            } else {
                self.view.scroll.saturating_add(amount)
            };
        }
        Action::NavHome => {
            self.view.follow_selection = true;
            self.nav.home(&self.nav_items);
        }
        Action::NavEnd => {
            self.view.follow_selection = true;
            self.nav.end(&self.nav_items);
        }
        Action::TogglePlot => {
            // A plot needs a selected column.
            if self.current_selection().is_some() {
                self.view.show_plot = !self.view.show_plot;
            }
        }
        Action::ClosePlot => {
            self.view.show_plot = false;
        }
        Action::ToggleFft => {
            if self.view.show_plot {
                self.view.show_fft = !self.view.show_fft;
            }
        }
        Action::ToggleFooter => self.view.show_footer = !self.view.show_footer,
        Action::ToggleRoutes => self.view.show_routes = !self.view.show_routes,
        Action::AdjustWindow(d) => {
            self.view.plot_window_seconds = (self.view.plot_window_seconds + d)
                .clamp(MIN_PLOT_WINDOW_SECONDS, MAX_PLOT_WINDOW_SECONDS)
        }
        Action::AdjustPlotWidth(d) => {
            // Widen to i32 so no combination of inputs can overflow; the
            // clamped result always fits in u16.
            let widened = i32::from(self.view.plot_width_percent) + i32::from(d);
            self.view.plot_width_percent = widened.clamp(20, 90) as u16;
        }
        Action::AdjustPrecision(delta) => {
            let new_p = self.view.axis_precision as i16 + i16::from(delta);
            self.view.axis_precision = new_p.clamp(0, 5) as usize;
        }
    }
    false
}
/// Stores a successfully fetched RPC list for its route and refreshes the
/// palette suggestions if that route is the one the palette is showing.
fn update_rpclists(&mut self, list: RpcList) {
    let state = self.rpc_routes.entry(list.route.clone()).or_default();
    state.on_fetch_success(&list);
    self.update_palette_suggestions_for(&list.route);
}
/// Records a failed RPC-list fetch for `route` and refreshes the palette
/// suggestions if that route is the one the palette is showing.
fn update_rpclist_error(&mut self, route: DeviceRoute, error: String) {
    let state = self.rpc_routes.entry(route.clone()).or_default();
    state.on_fetch_error(error);
    self.update_palette_suggestions_for(&route);
}
/// Discovered routes that can be expressed relative to `parent_route` and
/// whose relative length does not exceed `depth_limit` (when set), sorted.
pub fn visible_routes(&self) -> Vec<DeviceRoute> {
    let mut visible: Vec<DeviceRoute> = Vec::new();
    for candidate in &self.discovered_routes {
        let Ok(rel) = self.parent_route.relative_route(candidate) else {
            continue;
        };
        let within_depth = match self.depth_limit {
            Some(max) => rel.len() <= max,
            None => true,
        };
        if within_depth {
            visible.push(candidate.clone());
        }
    }
    visible.sort();
    visible
}
/// Rebuilds `nav_items` from the visible routes and the latest samples, then
/// tries to keep the previous selection.
///
/// Each visible route contributes either one `EmptyDevice` entry (no streams
/// seen yet) or one `Column` entry per column of each of its streams, with
/// streams in ascending id order. The old selection is restored by column key,
/// or by route when it was an empty device. Failing that, the old index is
/// clamped to the new list.
pub fn rebuild_nav_items(&mut self) {
    let previous = self.nav_items.get(self.nav.idx).cloned();

    let mut rebuilt = Vec::new();
    for (device_idx, route) in self.visible_routes().into_iter().enumerate() {
        let mut stream_ids: Vec<_> = self
            .last
            .keys()
            .filter(|key| key.route == route)
            .map(|key| key.stream_id)
            .collect();
        stream_ids.sort();
        stream_ids.dedup();

        if stream_ids.is_empty() {
            rebuilt.push(NavPos::EmptyDevice { device_idx, route });
            continue;
        }

        for (stream_idx, &stream_id) in stream_ids.iter().enumerate() {
            let key = StreamKey::new(route.clone(), stream_id);
            let Some((sample, _)) = self.last.get(&key) else {
                continue;
            };
            rebuilt.extend(
                sample
                    .columns
                    .iter()
                    .enumerate()
                    .map(|(column_id, _)| NavPos::Column {
                        device_idx,
                        stream_idx,
                        spec: ColumnKey {
                            route: route.clone(),
                            stream_id,
                            column_id,
                        },
                    }),
            );
        }
    }
    self.nav_items = rebuilt;

    if self.nav_items.is_empty() {
        self.nav.idx = 0;
        return;
    }

    let restored = previous.and_then(|prev| {
        self.nav_items
            .iter()
            .position(|pos| match (prev.spec(), pos.spec()) {
                (Some(old), Some(new)) => old == new,
                (None, _) => pos.route() == prev.route(),
                _ => false,
            })
    });
    self.nav.idx = match restored {
        Some(idx) => idx,
        None => self.nav.idx.min(self.nav_items.len() - 1),
    };
}
/// The selected navigation entry, or `None` when the index is out of range
/// (for example while the list is empty).
pub fn current_pos(&self) -> Option<&NavPos> {
self.nav_items.get(self.nav.idx)
}
/// Key of the selected column; `None` when nothing or an empty device is selected.
pub fn current_selection(&self) -> Option<ColumnKey> {
    self.current_pos()?.spec().cloned()
}
/// Route of the selected entry, falling back to `parent_route` when nothing is selected.
pub fn current_route(&self) -> DeviceRoute {
    match self.current_pos() {
        Some(pos) => pos.route().clone(),
        None => self.parent_route.clone(),
    }
}
pub fn current_device_index(&self) -> usize {
self.current_pos().map(|p| p.device_idx()).unwrap_or(0)
}
/// Number of currently visible routes (recomputed via `visible_routes` on every call).
pub fn device_count(&self) -> usize {
self.visible_routes().len()
}
/// Applies a device-tree event to the state.
///
/// For route discovery, new hashes, heartbeats and status changes the route's
/// `RouteRpcState` is notified first; whenever it returns `true` a `FetchList`
/// request for that route is sent on `rpc_tx` (send failures are ignored).
fn handle_event(&mut self, event: TreeEvent, rpc_tx: &Sender<RpcWorkerReq>) {
match event {
TreeEvent::RouteDiscovered(route) => {
self.discovered_routes.insert(route.clone());
if self
.rpc_routes
.entry(route.clone())
.or_default()
.on_route_discovered()
{
let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
}
// Make sure a status entry exists even before the first heartbeat.
self.device_status.entry(route).or_default();
}
TreeEvent::Device {
route,
event: DeviceEvent::NewHash(hash),
} => {
if self
.rpc_routes
.entry(route.clone())
.or_default()
.on_new_hash(hash)
{
let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
}
}
TreeEvent::Device {
route,
event: DeviceEvent::Heartbeat { session_id },
} => {
if self
.rpc_routes
.entry(route.clone())
.or_default()
.on_heartbeat(session_id)
{
let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
}
// Record the heartbeat time and mark the device connected.
self.device_status.entry(route).or_default().on_heartbeat();
}
TreeEvent::Device {
route,
event: DeviceEvent::Status(status),
} => {
if self
.rpc_routes
.entry(route.clone())
.or_default()
.on_status(status)
{
let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
}
let dev_status = self.device_status.entry(route.clone()).or_default();
// Only sensor disconnect/reconnect affect the connection flag.
match status {
ProxyStatus::SensorDisconnected => dev_status.connected = false,
ProxyStatus::SensorReconnected => dev_status.connected = true,
_ => {}
}
}
TreeEvent::Device {
route,
event: DeviceEvent::MetadataReady(metadata),
} => {
self.device_metadata.insert(route, metadata);
}
// Nothing to do here for RPC invalidation.
TreeEvent::Device {
event: DeviceEvent::RpcInvalidated(_),
..
} => {}
}
}
/// Feeds a copy of `sample` into `buffer` and records it, with the current
/// instant, as the latest sample of its stream.
pub fn handle_sample(&mut self, sample: Sample, route: DeviceRoute, buffer: &mut Buffer) {
    // `route` is owned and not used afterwards, so it is moved into the key
    // rather than cloned.
    let stream_key = StreamKey::new(route, sample.stream.stream_id);
    buffer.process_sample(sample.clone(), stream_key.clone());
    self.last.insert(stream_key, (sample, Instant::now()));
}
/// Refreshes `window_aligned` for the selected column.
///
/// The window is cleared when the plot is hidden, nothing is selected, the
/// buffer has no run for the stream, or reading the window fails. Otherwise it
/// holds `ceil(plot_window_seconds * effective_rate)` samples, at least 10.
pub fn update_plot_window(&mut self, buffer: &Buffer) {
    let mut window = None;
    if self.view.show_plot {
        if let Some(col) = self.current_selection() {
            let stream_key = col.stream_key();
            if let Some(run) = buffer.get_run(&stream_key) {
                let wanted = (self.view.plot_window_seconds * run.effective_rate)
                    .ceil()
                    .max(10.0);
                window = buffer.read_aligned_window(&[col], wanted as usize).ok();
            }
        }
    }
    self.window_aligned = window;
}
/// Time series of the selected column from the current window.
///
/// Returns `(points, latest_value, latest_timestamp)` where `points` pairs
/// each timestamp with the column value converted to `f64`. `None` when
/// nothing is selected, no window exists, the column is absent, or there are
/// no points.
pub fn get_plot_data(&self) -> Option<(Vec<(f64, f64)>, f64, f64)> {
    let spec = self.current_selection()?;
    let win = self.window_aligned.as_ref()?;
    let batch = win.columns.get(&spec)?;

    let times = win.timestamps.iter().copied();
    let data: Vec<(f64, f64)> = match batch {
        ColumnBatch::F64(values) => times.zip(values.iter().copied()).collect(),
        ColumnBatch::I64(values) => times.zip(values.iter().map(|&x| x as f64)).collect(),
        ColumnBatch::U64(values) => times.zip(values.iter().map(|&x| x as f64)).collect(),
    };

    // An empty timestamp list or an empty column both end up here as "no points".
    let &(latest_t, latest_v) = data.last()?;
    Some((data, latest_v, latest_t))
}
/// Computes the amplitude spectral density of the selected column from the
/// current window, or reports why that is not possible.
///
/// The signal is mean-removed, passed to the `welch_sde` estimator with its
/// default settings, and the square root of each density bin is returned for
/// bins with positive frequency and finite, positive density.
pub fn get_spectral_density_data(&self) -> FftStatus {
let Some(spec) = self.current_selection() else {
return FftStatus::WaitingForSelection;
};
let Some(win) = self.window_aligned.as_ref() else {
return FftStatus::WaitingForSamples;
};
let stream_key = spec.stream_key();
let Some(md) = win.segment_metadata.get(&stream_key) else {
return FftStatus::WaitingForSamples;
};
// A zero rate or decimation cannot yield a sampling frequency.
if md.sampling_rate == 0 || md.decimation == 0 {
return FftStatus::InvalidSampleRate {
sampling_rate: md.sampling_rate,
decimation: md.decimation,
};
}
let sampling_hz = md.sampling_rate as f64 / md.decimation as f64;
if !sampling_hz.is_finite() || sampling_hz <= 0.0 {
return FftStatus::InvalidSampleRate {
sampling_rate: md.sampling_rate,
decimation: md.decimation,
};
}
let Some(batch) = win.columns.get(&spec) else {
return FftStatus::WaitingForSamples;
};
// Convert whatever numeric column type is stored into f64.
let signal: Vec<f64> = match batch {
ColumnBatch::F64(v) => v.clone(),
ColumnBatch::I64(v) => v.iter().map(|&x| x as f64).collect(),
ColumnBatch::U64(v) => v.iter().map(|&x| x as f64).collect(),
};
if signal.len() < MIN_FFT_SAMPLES {
return FftStatus::TooFewSamples {
have: signal.len(),
need: MIN_FFT_SAMPLES,
sampling_hz,
window_seconds: self.view.plot_window_seconds,
};
}
// Possibly trim to the most recent samples; segment/hop sizes are reported to the caller.
let (fft_signal, segment_size, hop_size) = latest_complete_welch_signal(&signal);
// Remove the mean so the DC component does not dominate the spectrum.
let mean_val = fft_signal.iter().sum::<f64>() / fft_signal.len() as f64;
let detrended: Vec<f64> = fft_signal.iter().map(|x| x - mean_val).collect();
let welch: SpectralDensity<f64> = SpectralDensity::builder(&detrended, sampling_hz).build();
let sd = welch.periodogram();
// Keep usable bins only and convert density to amplitude (square root).
let pts: Vec<(f64, f64)> = sd
.frequency()
.into_iter()
.zip(sd.iter().copied())
.filter_map(|(f, d)| {
if f > 0.0 && d.is_finite() && d > 0.0 {
Some((f, d.sqrt()))
} else {
None
}
})
.collect();
if pts.is_empty() {
return FftStatus::NoValidFrequencyBins {
sample_count: fft_signal.len(),
sampling_hz,
};
}
// Median of the amplitudes: middle element, or mean of the two middle elements.
let mut asd_values: Vec<f64> = pts.iter().map(|(_, d)| *d).collect();
asd_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let median_asd = if asd_values.len() % 2 == 0 {
(asd_values[asd_values.len() / 2 - 1] + asd_values[asd_values.len() / 2]) / 2.0
} else {
asd_values[asd_values.len() / 2]
};
FftStatus::Ready(FftReadyData {
points: pts,
median_asd,
sample_count: fft_signal.len(),
total_sample_count: signal.len(),
sampling_hz,
segment_size,
hop_size,
})
}
/// `(description, units)` of the selected column, taken from the current
/// window's column metadata; `None` when any of those is missing.
pub fn get_focused_channel_info(&self) -> Option<(String, String)> {
    let spec = self.current_selection()?;
    let meta = self
        .window_aligned
        .as_ref()
        .and_then(|win| win.column_metadata.get(&spec))?;
    let description = meta.description.clone();
    let units = meta.units.clone();
    Some((description, units))
}
/// Flips the blink phase once at least 500 ms have passed since the last flip.
pub fn tick_blink(&mut self) {
    let half_period = Duration::from_millis(500);
    if self.last_blink.elapsed() < half_period {
        return;
    }
    self.blink_state = !self.blink_state;
    self.last_blink = Instant::now();
}
}
/// Translates a terminal event into an [`Action`].
///
/// Only key presses are considered. In command mode the key is handed to the
/// RPC palette and its outcome is mapped to an action. In normal mode the key
/// is looked up in the shortcut table. Returns `None` when nothing should happen.
fn get_action(ev: Event, app: &mut MonitorState) -> Option<Action> {
    let Event::Key(k) = ev else {
        return None;
    };
    if k.kind != KeyEventKind::Press {
        return None;
    }

    match app.mode {
        Mode::Command => {
            let route = app.current_route();
            let registry = app.rpc_routes.get(&route).and_then(RouteRpcState::registry);
            let routes = app.visible_routes();
            let outcome = app
                .palette
                .handle_key(k, registry, &route, &routes, app.footer_height);
            match outcome {
                PaletteEvent::Submit(req) => Some(Action::ExecuteRpc(req)),
                PaletteEvent::SelectRoute(r) => Some(Action::SelectRoute(r)),
                PaletteEvent::Exit => Some(Action::SetMode(Mode::Normal)),
                PaletteEvent::Consumed => None,
            }
        }
        Mode::Normal => {
            let action = match k.code {
                KeyCode::Char(':') => Action::SetMode(Mode::Command),
                KeyCode::Char('q') => Action::Quit,
                KeyCode::Char('c') if k.modifiers == KeyModifiers::CONTROL => Action::Quit,
                KeyCode::Esc => Action::ClosePlot,
                KeyCode::Up => Action::NavUp,
                KeyCode::Down => Action::NavDown,
                KeyCode::Left => Action::NavLeft,
                KeyCode::Right => Action::NavRight,
                KeyCode::BackTab => Action::NavTabPrev,
                KeyCode::Tab => Action::NavTabNext,
                KeyCode::PageUp => Action::NavScroll(-10),
                KeyCode::PageDown => Action::NavScroll(10),
                KeyCode::Home => Action::NavHome,
                KeyCode::End => Action::NavEnd,
                KeyCode::Enter => Action::TogglePlot,
                KeyCode::Char('f') => Action::ToggleFft,
                KeyCode::Char('h') => Action::ToggleFooter,
                KeyCode::Char('r') => Action::ToggleRoutes,
                KeyCode::Char('=') => Action::AdjustWindow(PLOT_WINDOW_FINE_STEP_SECONDS),
                KeyCode::Char('-') => Action::AdjustWindow(-PLOT_WINDOW_FINE_STEP_SECONDS),
                KeyCode::Char('+') => Action::AdjustWindow(PLOT_WINDOW_COARSE_STEP_SECONDS),
                KeyCode::Char('_') => Action::AdjustWindow(-PLOT_WINDOW_COARSE_STEP_SECONDS),
                KeyCode::Char('[') => Action::AdjustPlotWidth(5),
                KeyCode::Char(']') => Action::AdjustPlotWidth(-5),
                KeyCode::Char(',' | '<') => Action::AdjustPrecision(-1),
                KeyCode::Char('.' | '>') => Action::AdjustPrecision(1),
                _ => return None,
            };
            Some(action)
        }
    }
}
/// Draws one frame: the value list, optionally the plot panel next to it, and
/// the footer (or the RPC palette in command mode) underneath.
///
/// The footer height depends on the mode and, in command mode, on the terminal
/// height. In command mode on terminals shorter than 3 rows only the footer
/// is rendered.
fn draw_ui(terminal: &mut DefaultTerminal, app: &mut MonitorState) -> Result<(), io::Error> {
    terminal.draw(|f| {
        let size = f.area();
        let height = size.height;
        let command_mode = app.mode == Mode::Command;

        let (main_constraint, footer_constraint) = match (command_mode, height) {
            (true, h) if h >= 18 => (
                Constraint::Min(10),
                Constraint::Length(5 + app.palette.suggestion_rows()),
            ),
            (true, h) if h >= 12 => (Constraint::Min(2), Constraint::Length(8)),
            (true, h) if h >= 5 => (Constraint::Min(2), Constraint::Length(3)),
            (true, _) => (Constraint::Min(0), Constraint::Length(2)),
            (false, _) if app.view.show_footer => (Constraint::Min(10), Constraint::Length(6)),
            (false, _) => (Constraint::Min(10), Constraint::Length(2)),
        };
        let rows = Layout::default()
            .direction(Direction::Vertical)
            .constraints([main_constraint, footer_constraint])
            .split(size);
        let (main_area, footer_area) = (rows[0], rows[1]);

        let footer_only = command_mode && height < 3;
        if !footer_only {
            if app.view.show_plot {
                let plot_pct = app.view.plot_width_percent;
                let cols = Layout::default()
                    .direction(Direction::Horizontal)
                    .constraints([
                        Constraint::Percentage(100 - plot_pct),
                        Constraint::Percentage(plot_pct),
                    ])
                    .split(main_area);
                let (list_area, plot_area) = (cols[0], cols[1]);
                render_monitor_panel(f, app, list_area, Instant::now());
                render_graphics_panel(f, app, plot_area);
            } else {
                render_monitor_panel(f, app, main_area, Instant::now());
            }
        }
        render_footer(f, app, footer_area);
    })?;
    Ok(())
}
/// Renders the scrollable value list into `area`, with a one-column scrollbar
/// on the right edge when the content is taller than the viewport.
///
/// While `follow_selection` is set, the scroll offset is moved (centring the
/// selection) whenever the selected line would fall outside the viewport. The
/// offset is always clamped so the view never scrolls past the last line.
/// `now` is forwarded to `build_left_lines`.
fn render_monitor_panel(f: &mut Frame, app: &mut MonitorState, area: Rect, now: Instant) {
    // Scroll offsets are stored as u16; saturate rather than wrap on conversion.
    fn to_scroll(lines: usize) -> u16 {
        u16::try_from(lines).unwrap_or(u16::MAX)
    }

    // The rightmost column is reserved for the scrollbar.
    let inner = Rect {
        width: area.width.saturating_sub(1),
        ..area
    };
    let (lines, col_map) = build_left_lines(app, now);
    let total = lines.len();
    let view_h = usize::from(inner.height);
    let max_scroll = total.saturating_sub(view_h);

    if app.view.follow_selection {
        if let Some(&line_idx) = col_map.get(&app.nav.idx) {
            if view_h > 0 && total > view_h {
                let cur = usize::from(app.view.scroll);
                if line_idx < cur || line_idx >= cur + view_h {
                    app.view.scroll =
                        to_scroll(line_idx.saturating_sub(view_h / 2).min(max_scroll));
                }
            } else {
                app.view.scroll = 0;
            }
        }
    }
    app.view.scroll = to_scroll(usize::from(app.view.scroll).min(max_scroll));
    f.render_widget(Paragraph::new(lines).scroll((app.view.scroll, 0)), inner);

    // No scrollbar when everything fits. A degenerate area (zero height or
    // width) has no room for one, and the arithmetic below would underflow.
    if view_h == 0 || area.width == 0 || total <= view_h {
        return;
    }

    let sb_x = area.x + area.width - 1;
    let track_len = view_h;
    // total > track_len >= 1, so 1 <= thumb_len <= track_len and scroll_max >= 1.
    let thumb_len = (track_len * track_len / total).max(1);
    let max_thumb_pos = track_len - thumb_len;
    let scroll_max = total - track_len;
    let thumb_pos = (usize::from(app.view.scroll) * max_thumb_pos) / scroll_max;
    let thumb = thumb_pos..thumb_pos + thumb_len;

    for i in 0..track_len {
        let ch = if thumb.contains(&i) { "█" } else { "│" };
        f.render_widget(
            Paragraph::new(ch).style(Style::default().fg(Color::DarkGray)),
            Rect {
                x: sb_x,
                y: area.y + i as u16,
                width: 1,
                height: 1,
            },
        );
    }
}
/// Age after which a stream's last sample counts as stale: two sample periods,
/// but never less than 1200 ms. A decimation of 0 is treated as 1, and a
/// non-positive rate yields the 1200 ms floor.
fn stale_threshold(sample: &Sample) -> Duration {
    let segment = &sample.segment;
    let decimation = segment.decimation.max(1);
    let rate_hz = segment.sampling_rate as f64 / decimation as f64;
    let two_periods_ms = if rate_hz > 0.0 {
        (1000.0 / rate_hz) * 2.0
    } else {
        0.0
    };
    Duration::from_millis(two_periods_ms.max(1200.0) as u64)
}
fn build_left_lines(
app: &mut MonitorState,
now: Instant,
) -> (Vec<Line<'static>>, HashMap<usize, usize>) {
let mut lines = Vec::new();
let mut map = HashMap::new();
let routes = app.visible_routes();
if routes.is_empty() {
lines.push(Line::from("Waiting for data..."));
return (lines, map);
}
let mut global_idx = 0;
app.view.desc_width = app
.last
.values()
.flat_map(|(s, _)| s.columns.iter())
.map(|c| c.desc.description.len())
.max()
.unwrap_or(0);
app.view.units_width = app
.last
.values()
.flat_map(|(s, _)| s.columns.iter())
.map(|c| c.desc.units.len())
.max()
.unwrap_or(0);
for (dev_idx, route) in routes.iter().enumerate() {
let dev = app.device_metadata.get(route).map(|m| m.device.as_ref());
let status = app.device_status.get(route);
let is_alive = status
.map(|s| s.is_alive(Duration::from_millis(300)))
.unwrap_or(false);
let head_style = if dev_idx == app.current_device_index() {
Style::default().add_modifier(Modifier::BOLD | Modifier::UNDERLINED)
} else {
Style::default().add_modifier(Modifier::BOLD)
};
let header_text = if let Some(d) = dev {
if d.serial_number.is_empty() {
d.name.clone()
} else {
format!("{} Serial: {}", d.name, d.serial_number)
}
} else {
format!("<{}>", route)
};
let status_indicator = if is_alive { "●" } else { "○" };
let status_color = if is_alive {
Color::Green
} else {
Color::DarkGray
};
let mut header_spans = vec![
Span::styled(
format!("{} ", status_indicator),
Style::default().fg(status_color),
),
Span::styled(header_text, head_style),
];
if app.view.show_routes {
header_spans.push(Span::raw(format!(" [{}]", route)));
}
lines.push(Line::from(header_spans));
let mut stream_ids: Vec<_> = app
.last
.keys()
.filter(|k| &k.route == route)
.map(|k| k.stream_id)
.collect();
stream_ids.sort();
if stream_ids.is_empty() {
map.insert(global_idx, lines.len());
global_idx += 1;
lines.push(Line::from(Span::styled(
" (no streams yet)",
Style::default().fg(Color::DarkGray),
)));
}
for sid in stream_ids {
let key = StreamKey::new(route.clone(), sid);
if let Some((sample, seen)) = app.last.get(&key) {
let is_stale = now.saturating_duration_since(*seen) > stale_threshold(sample);
for col in &sample.columns {
let nav_idx = global_idx;
global_idx += 1;
map.insert(nav_idx, lines.len());
let is_sel = app.nav.idx == nav_idx;
let ctx = StyleContext::new()
.stale(is_stale)
.selected(is_sel)
.plot_mode(app.view.show_plot);
let label_style = ctx.resolve();
let (val_str, val_f64) = fmt_value(&col.value);
let val_col = app
.view
.theme
.get_value_color(&sample.stream.name, &col.desc.name, val_f64)
.unwrap_or(Color::Reset);
let val_style = ctx.color(val_col).resolve();
let mut desc = col.desc.description.clone();
if desc.len() < app.view.desc_width {
desc.push_str(&" ".repeat(app.view.desc_width - desc.len()));
}
let units = col.desc.units.clone();
let padded_units = if app.view.units_width > 0 && !units.is_empty() {
format!("{:>width$}", units, width = app.view.units_width)
} else if app.view.units_width > 0 {
" ".repeat(app.view.units_width)
} else {
String::new()
};
lines.push(Line::from(vec![
Span::styled(desc, label_style),
Span::raw(" "),
Span::styled(val_str, val_style),
Span::raw(" "),
Span::styled(padded_units, val_style),
]));
}
}
}
lines.push(Line::from(""));
}
(lines, map)
}
fn render_footer(f: &mut Frame, app: &mut MonitorState, area: Rect) {
app.footer_height = area.height; if app.mode == Mode::Command {
let route = app.current_route();
let status = app.rpc_palette_status(&route);
let registry = app.rpc_routes.get(&route).and_then(RouteRpcState::registry);
app.palette
.render(f, area, &route, registry, status, app.blink_state);
return;
}
if !app.view.show_footer {
let minimal = Line::from(vec![
Span::raw(" "),
key_span("h"),
Span::raw(" Toggle Footer"),
]);
f.render_widget(
Paragraph::new(vec![minimal]).block(
Block::default()
.borders(Borders::TOP)
.border_style(Style::default().fg(Color::DarkGray)),
),
area,
);
return;
}
let mut navigation_spans = vec![
Span::styled(
" Navigation ",
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
),
key_span("↑"),
key_sep(),
key_span("↓"),
Span::raw(" All "),
key_span("←"),
key_sep(),
key_span("→"),
Span::raw(" Streams"),
];
if app.device_count() > 1 {
navigation_spans.push(Span::raw(" "));
navigation_spans.push(key_span("Tab"));
navigation_spans.push(key_sep());
navigation_spans.push(key_span("Shift+Tab"));
navigation_spans.push(Span::raw(" Devices"));
}
let navigation_line = Line::from(navigation_spans);
let toggle_line = Line::from(vec![
Span::styled(
" Toggle ",
Style::default()
.fg(Color::Green)
.add_modifier(Modifier::BOLD),
),
key_span("Enter"),
Span::raw(" Plot "),
key_span("f"),
Span::raw(" FFT "),
key_span("h"),
Span::raw(" Footer "),
key_span("r"),
Span::raw(" Routes "),
key_span(":"),
Span::raw(" Cmd"),
]);
let window_line = Line::from(vec![
Span::styled(
" Plot ",
Style::default()
.fg(Color::Yellow)
.add_modifier(Modifier::BOLD),
),
key_span("+"),
key_sep(),
key_span("-"),
Span::raw(" Window (0.5s, Shift 5.0s) "),
key_span("["),
key_sep(),
key_span("]"),
Span::raw(" Plot Width "),
key_span("<"),
key_sep(),
key_span(">"),
Span::raw(" Plot Precision"),
]);
let scroll_line = Line::from(vec![
Span::styled(
" Scroll ",
Style::default()
.fg(Color::Magenta)
.add_modifier(Modifier::BOLD),
),
key_span("Home"),
key_sep(),
key_span("End"),
key_sep(),
key_span("PgUp"),
key_sep(),
key_span("PgDn"),
]);
let quit_line = Line::from(vec![
Span::styled(
" Quit ",
Style::default().fg(Color::Red).add_modifier(Modifier::BOLD),
),
key_span("q"),
Span::raw(" / "),
key_span("Ctrl+C"),
Span::raw(" Quit"),
]);
let lines = vec![
navigation_line,
toggle_line,
window_line,
scroll_line,
quit_line,
];
let block = Block::default()
.borders(Borders::TOP)
.border_style(Style::default().fg(Color::DarkGray))
.title(Span::styled(
" Controls ",
Style::default().add_modifier(Modifier::BOLD),
));
f.render_widget(Paragraph::new(lines).block(block), area);
}
/// Builds the highlighted "key cap" span used in the footer: `text` padded with
/// one space on each side, bold white on a dark-gray background.
fn key_span(text: &str) -> Span<'static> {
    let key_cap_style = Style::default()
        .fg(Color::White)
        .bg(Color::DarkGray)
        .add_modifier(Modifier::BOLD);
    let label = format!(" {} ", text);
    Span::styled(label, key_cap_style)
}
/// Returns the unstyled separator placed between two adjacent key caps.
fn key_sep() -> Span<'static> {
    const SEPARATOR: &str = " ";
    Span::raw(SEPARATOR)
}
/// Chooses the portion of `signal` and the Welch segmentation to analyse.
///
/// Returns `(samples, segment_size, hop_size)`.
///
/// * When the default segmentation (`WELCH_DEFAULT_SEGMENTS` segments with
///   `WELCH_DEFAULT_OVERLAP` overlap) yields a segment whose next power of two
///   fits in `WELCH_DFT_MAX_SIZE`, the whole signal is returned with that
///   segment size.
/// * Otherwise the segment size is capped at `WELCH_DFT_MAX_SIZE` and only the
///   most recent samples that fill a whole number of overlapping segments are
///   returned.
///
/// The returned hop size is always at least 1.
fn latest_complete_welch_signal(signal: &[f64]) -> (&[f64], usize, usize) {
    // Number of samples shared by two consecutive segments of length `segment`.
    let overlap_len = |segment: usize| (segment as f64 * WELCH_DEFAULT_OVERLAP).round() as usize;

    let total = signal.len();
    // Signal length expressed in units of one segment for the default layout.
    let segments_span =
        WELCH_DEFAULT_SEGMENTS as f64 * (1.0 - WELCH_DEFAULT_OVERLAP) + WELCH_DEFAULT_OVERLAP;
    let natural_segment = (total as f64 / segments_span).trunc().max(1.0) as usize;

    if natural_segment.next_power_of_two() > WELCH_DFT_MAX_SIZE {
        // Too long for the default layout: use fixed-size segments and keep only
        // the newest samples that make up complete segments.
        let segment = WELCH_DFT_MAX_SIZE;
        let hop = segment - overlap_len(segment);
        let complete_segments = (total - segment) / hop + 1;
        let used = (complete_segments - 1) * hop + segment;
        return (&signal[total - used..], segment, hop.max(1));
    }

    let hop = natural_segment - overlap_len(natural_segment);
    (signal, natural_segment, hop.max(1))
}
fn render_graphics_panel(f: &mut Frame, app: &MonitorState, area: Rect) {
if let (Some(pos), Some((desc, units))) = (app.current_pos(), app.get_focused_channel_info()) {
let route = pos.route();
if app.view.show_fft {
match app.get_spectral_density_data() {
FftStatus::Ready(sd) => {
let title = format!(
"{} — {} ({:.1}s, FFT {} of {} samples, seg {}, hop {}) | Median ASD: {:.3e} {}/√Hz",
route,
desc,
app.view.plot_window_seconds,
sd.sample_count,
sd.total_sample_count,
sd.segment_size,
sd.hop_size,
sd.median_asd,
units
);
let block = Block::default().title(title).borders(Borders::ALL);
if !sd.points.is_empty() {
let log_data: Vec<(f64, f64)> = sd
.points
.iter()
.map(|(freq, val)| (freq.log10(), val.log10()))
.collect();
let min_f = log_data.first().map(|(f, _)| *f).unwrap_or(0.0);
let max_f = log_data.last().map(|(f, _)| *f).unwrap_or(1.0);
let ds: Vec<f64> = log_data.iter().map(|(_, d)| *d).collect();
let min_d = ds.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max_d = ds.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
let y_pad = if (max_d - min_d) > 0.1 {
(max_d - min_d) * 0.1
} else {
0.5
};
let dataset = Dataset::default()
.name(desc.as_str())
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.graph_type(GraphType::Line)
.data(&log_data);
let chart = Chart::new(vec![dataset])
.block(block)
.x_axis(
Axis::default()
.title("Freq [Hz] (log)")
.bounds([min_f, max_f])
.labels(generate_log_labels(
min_f,
max_f,
5,
app.view.axis_precision,
)),
)
.y_axis(
Axis::default()
.title(format!("Val [{}/√Hz]", units))
.bounds([min_d - y_pad, max_d + y_pad])
.labels(generate_log_labels(
min_d - y_pad,
max_d + y_pad,
5,
app.view.axis_precision,
)),
);
f.render_widget(chart, area);
} else {
f.render_widget(Paragraph::new("No valid FFT data").block(block), area);
}
}
FftStatus::TooFewSamples {
have,
need,
sampling_hz,
window_seconds,
} => {
let block = Block::default()
.title(format!("FFT unavailable - {} samples needed", need))
.borders(Borders::ALL);
let message = format!(
"Current FFT buffer has {} of {} samples. At {:.3} Hz, current window is {:.1}s.",
have, need, sampling_hz, window_seconds
);
f.render_widget(Paragraph::new(message).block(block), area);
}
FftStatus::InvalidSampleRate {
sampling_rate,
decimation,
} => {
let block = Block::default()
.title("FFT unavailable - invalid sample rate")
.borders(Borders::ALL);
let message = format!(
"Stream metadata reports sampling_rate={} and decimation={}.",
sampling_rate, decimation
);
f.render_widget(Paragraph::new(message).block(block), area);
}
FftStatus::NoValidFrequencyBins {
sample_count,
sampling_hz,
} => {
let block = Block::default()
.title("FFT unavailable - no valid frequency bins")
.borders(Borders::ALL);
let message = format!(
"Welch produced no positive finite bins from {} samples at {:.3} Hz.",
sample_count, sampling_hz
);
f.render_widget(Paragraph::new(message).block(block), area);
}
FftStatus::WaitingForSelection => {
let block = Block::default()
.title("FFT unavailable - no channel selected")
.borders(Borders::ALL);
f.render_widget(
Paragraph::new("Select a stream column to plot FFT.").block(block),
area,
);
}
FftStatus::WaitingForSamples => {
let block = Block::default()
.title("Buffering FFT...")
.borders(Borders::ALL);
f.render_widget(
Paragraph::new("Waiting for samples in the selected window.").block(block),
area,
);
}
}
} else {
let title = format!(
"{} — {} ({:.1}s)",
route, desc, app.view.plot_window_seconds
);
let block = Block::default().title(title).borders(Borders::ALL);
if let Some((data, _, _)) = app.get_plot_data() {
let min_t = data.first().map(|(t, _)| *t).unwrap_or(0.0);
let max_t = data.last().map(|(t, _)| *t).unwrap_or(1.0);
let vs: Vec<f64> = data.iter().map(|(_, v)| *v).collect();
let min_v = vs.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max_v = vs.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
let pad = if (max_v - min_v).abs() > 1e-10 {
(max_v - min_v) * 0.4
} else {
1.0
};
let dataset = Dataset::default()
.name(desc.as_str())
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Green))
.graph_type(GraphType::Line)
.data(&data);
let chart = Chart::new(vec![dataset])
.block(block)
.x_axis(
Axis::default()
.title("Time [s]")
.bounds([min_t, max_t])
.labels(generate_linear_labels(
min_t,
max_t,
3,
app.view.axis_precision,
)),
)
.y_axis(
Axis::default()
.title(format!("Value [{}]", units))
.bounds([min_v - pad, max_v + pad])
.labels(generate_linear_labels(
min_v - pad,
max_v + pad,
5,
app.view.axis_precision,
)),
);
f.render_widget(chart, area);
} else {
f.render_widget(Paragraph::new("Buffering...").block(block), area);
}
}
} else {
f.render_widget(
Block::default()
.title("Channel Detail")
.borders(Borders::ALL),
area,
);
}
}
/// Produces `count` evenly spaced axis labels covering `min..=max`.
///
/// Each label is the value formatted with `precision` decimals and
/// right-aligned in a 10-character field. Fewer than two labels cannot span a
/// range, so `count < 2` yields an empty list.
fn generate_linear_labels(
    min: f64,
    max: f64,
    count: usize,
    precision: usize,
) -> Vec<Span<'static>> {
    if count < 2 {
        return Vec::new();
    }
    let step = (max - min) / ((count - 1) as f64);
    let mut labels = Vec::with_capacity(count);
    for index in 0..count {
        let tick = min + (index as f64 * step);
        labels.push(Span::from(format!("{:>10.p$}", tick, p = precision)));
    }
    labels
}
/// Produces `count` axis labels for an axis whose bounds are given as log10
/// exponents (`min_log..=max_log`).
///
/// Ticks are evenly spaced in log space and printed as the real value
/// `10^tick`, right-aligned in a 10-character field. Scientific notation is
/// used for every label when the largest real bound is below 0.01 or at least
/// 1000. `count < 2` yields an empty list.
fn generate_log_labels(
    min_log: f64,
    max_log: f64,
    count: usize,
    precision: usize,
) -> Vec<Span<'static>> {
    if count < 2 {
        return Vec::new();
    }
    let step = (max_log - min_log) / ((count - 1) as f64);
    // The notation is chosen once, from the larger bound, so all labels agree.
    let largest = 10f64.powf(max_log.max(min_log)).abs();
    let use_scientific = largest < 0.01 || largest >= 1000.0;

    let mut labels = Vec::with_capacity(count);
    for index in 0..count {
        let exponent = min_log + (index as f64 * step);
        let real_val = 10f64.powf(exponent);
        let text = match use_scientific {
            true => format!("{:.p$e}", real_val, p = precision),
            false => format!("{:.p$}", real_val, p = precision),
        };
        labels.push(Span::from(format!("{:>10}", text)));
    }
    labels
}
/// Converts a column value into its display text and an `f64` copy of it.
///
/// Floats are printed with four decimals and integers as-is, all in a
/// 15-character field. Any other variant yields a placeholder text and `NaN`.
fn fmt_value(v: &ColumnData) -> (String, f64) {
    let (text, numeric) = match v {
        ColumnData::Float(float) => (format!("{:15.4}", float), *float as f64),
        ColumnData::Int(signed) => (format!("{:15}", signed), *signed as f64),
        ColumnData::UInt(unsigned) => (format!("{:15}", unsigned), *unsigned as f64),
        _ => (String::from(" type?"), f64::NAN),
    };
    (text, numeric)
}
/// Loads a colour theme from the TOML file at `path`.
///
/// Every inline-table value in the document becomes one entry keyed by its
/// dotted key path. The lower bound comes from `cold`, else `min`, else
/// negative infinity; the upper bound from `hot`, else `max`, else positive
/// infinity. The stored flag records whether `cold` was present.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or read, and
/// an `InvalidData` error if the contents are not valid TOML.
fn load_theme(path: &str) -> io::Result<Theme> {
    let mut contents = String::new();
    let mut file = File::open(path)?;
    file.read_to_string(&mut contents)?;

    let doc = match DocumentMut::from_str(&contents) {
        Ok(doc) => doc,
        Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
    };

    let mut value_bounds = HashMap::new();
    for (key_path, value) in doc.get_values() {
        // Only inline tables describe bounds; other values are ignored.
        let Value::InlineTable(table) = value else {
            continue;
        };
        let column = key_path
            .iter()
            .map(|key| key.get())
            .collect::<Vec<_>>()
            .join(".");

        let cold = get_num(table, "cold");
        let has_cold = cold.is_some();
        let lower =
            cold.unwrap_or_else(|| get_num(table, "min").unwrap_or(f64::NEG_INFINITY));
        let upper = get_num(table, "hot")
            .unwrap_or_else(|| get_num(table, "max").unwrap_or(f64::INFINITY));

        value_bounds.insert(column, (lower..=upper, has_cold));
    }

    Ok(Theme { value_bounds })
}
/// Reads key `k` from the inline table as an `f64`, accepting either a TOML
/// float or a TOML integer. Returns `None` if the key is missing or holds any
/// other kind of value.
fn get_num(it: &InlineTable, k: &str) -> Option<f64> {
    let value = it.get(k)?;
    match value.as_float() {
        Some(float) => Some(float),
        None => value.as_integer().map(|int| int as f64),
    }
}
/// Runs the interactive monitor until the user quits or the data stream ends.
///
/// Opens the device tree and an RPC client through the proxy at `tio.root`,
/// hands each to a worker, starts a thread that forwards terminal events, and
/// then multiplexes four sources in one loop: streamed samples/events, key
/// events, RPC responses, and a redraw tick derived from `fps`.
///
/// `fps` is clamped so the redraw period is between 1 ms and 1000 ms: a value
/// of 0 is treated as 1 frame per second and values above 1000 as 1000.
///
/// # Errors
///
/// Returns an error if the device tree or the RPC client cannot be opened, or
/// if the data stream reports an error while the monitor is running.
fn run_monitor_app(config: MonitorConfig) -> eyre::Result<()> {
    use eyre::WrapErr;
    let MonitorConfig {
        tio,
        fps,
        colors,
        depth,
    } = config;
    let proxy = tio::proxy::Interface::new(&tio.root);
    let parent_route: DeviceRoute = tio.route.clone();
    let tree = DeviceTree::open(&proxy, parent_route.clone())
        .wrap_err_with(|| format!("could not open device tree on {}", tio.root))
        .with_proxy_help()?;
    let data_rx = spawn_tree_worker(tree);
    let rpc_client = RpcClient::open(&proxy, parent_route.clone())
        .wrap_err_with(|| format!("could not open RPC client on {}", tio.root))
        .with_proxy_help()?;
    let (rpc_tx, rpc_resp_rx) = spawn_rpc_worker(rpc_client);

    // Forward terminal events from a dedicated thread so the main loop can
    // select over them together with the other channels. The thread ends once
    // the receiving side has been dropped.
    let (key_tx, key_rx) = channel::unbounded();
    std::thread::spawn(move || loop {
        if let Ok(ev) = event::read() {
            if key_tx.send(ev).is_err() {
                return;
            }
        }
    });

    let mut app = MonitorState::new(depth, &parent_route);
    if let Some(path) = &colors {
        match load_theme(path) {
            Ok(theme) => app.view.theme = theme,
            // A broken theme is not fatal: keep the default theme, but say
            // which file failed and why.
            Err(err) => eprintln!("Failed to load theme from {}: {}", path, err),
        }
    }

    let mut buffer = Buffer::new(MONITOR_BUFFER_CAPACITY_SAMPLES);
    let mut term = ratatui::init();
    let _ = term.hide_cursor();

    // `fps` is caller-supplied: 0 would divide by zero and anything above 1000
    // would truncate to a zero-length period, so clamp the period to 1..=1000 ms.
    let frame_period_ms = (1000 / u64::from(fps.max(1))).max(1);
    let ui_tick = channel::tick(Duration::from_millis(frame_period_ms));

    let mut stream_error = None;
    'main: loop {
        crossbeam::select! {
            recv(data_rx) -> item => {
                match item {
                    Ok(Ok(TreeItem::Sample(sample, route))) => {
                        app.handle_sample(sample, route, &mut buffer);
                    }
                    Ok(Ok(TreeItem::Event(event))) => {
                        app.handle_event(event, &rpc_tx);
                    }
                    Ok(Err(e)) => {
                        // Remember the error; it is reported after the terminal is restored.
                        stream_error = Some(e);
                        break 'main;
                    }
                    // The tree worker dropped its sender: nothing more will arrive.
                    Err(_) => break 'main,
                }
            }
            recv(key_rx) -> ev => {
                if let Ok(ev) = ev {
                    if let Some(act) = get_action(ev, &mut app) {
                        // `update` returning true ends the session.
                        if app.update(act, &rpc_tx) {
                            break 'main;
                        }
                    }
                }
            }
            recv(rpc_resp_rx) -> resp => {
                if let Ok(resp) = resp {
                    match resp {
                        RpcWorkerResp::List(list) => {
                            app.update_rpclists(list);
                        }
                        RpcWorkerResp::ListErr { route, error } => {
                            app.update_rpclist_error(route, error);
                        }
                        RpcWorkerResp::RpcResult(res) => {
                            let (msg, col) = match res.result {
                                Ok(s) => (
                                    format!("{}: {}", app.palette.last_rpc_command(), s),
                                    Color::Green,
                                ),
                                Err(s) => (format!("ERR: {}", s), Color::Red),
                            };
                            app.palette.set_rpc_result(msg, col);
                        }
                    }
                }
            }
            recv(ui_tick) -> _ => {
                app.update_plot_window(&buffer);
                app.rebuild_nav_items();
                app.tick_blink();
                if draw_ui(&mut term, &mut app).is_err() {
                    break 'main;
                }
            }
        }
    }

    // Restore the terminal before reporting anything to the user.
    ratatui::restore();
    if let Some(e) = stream_error {
        use color_eyre::Help;
        return Err(eyre::Report::new(e))
            .wrap_err("lost connection to data source")
            .suggestion("the data source went away (proxy exited or device disconnected); restart it and re-run this command");
    }
    Ok(())
}