1use std::{
4 collections::{BTreeMap, HashMap, HashSet},
5 fs::File,
6 io::{self, Read},
7 str::FromStr,
8 time::{Duration, Instant},
9};
10
11use crate::tui::{
12 rpc_palette::{PaletteEvent, RpcPalette, RpcPaletteStatus, RpcReq},
13 rpc_state::RouteRpcState,
14 rpc_worker::{spawn_rpc_worker, RpcWorkerReq, RpcWorkerResp},
15 tree_worker::spawn_tree_worker,
16};
17use crate::{MonitorCli, ProxyHelp, TioOpts};
18use crossbeam::channel::{self, Sender};
19use ratatui::{
20 crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
21 layout::{Constraint, Direction, Layout, Rect},
22 style::{Color, Modifier, Style},
23 symbols,
24 text::{Line, Span},
25 widgets::{Axis, Block, Borders, Chart, Dataset, GraphType, Paragraph},
26 DefaultTerminal, Frame,
27};
28use toml_edit::{DocumentMut, InlineTable, Value};
29use twinleaf::{
30 data::{
31 AlignedWindow, Buffer, ColumnBatch, ColumnData, ColumnKey, DeviceFullMetadata, Sample,
32 StreamKey,
33 },
34 device::{DeviceEvent, DeviceRoute, DeviceTree, RpcClient, RpcList, TreeEvent, TreeItem},
35 tio::{self, proto::ProxyStatus},
36};
37use welch_sde::{Build, SpectralDensity};
38
/// Public entry point for the monitor: forwards `config` to `run_monitor_app`
/// and returns its result unchanged.
pub fn run_monitor(config: MonitorConfig) -> eyre::Result<()> {
    run_monitor_app(config)
}
42
/// Settings for one monitor session; built from [`MonitorCli`] through `From`.
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Copied from `MonitorCli::tio`.
    pub tio: TioOpts,
    /// Copied from `MonitorCli::fps` (presumably the redraw rate — confirm at the use site).
    pub fps: u32,
    /// Copied from `MonitorCli::colors` (presumably a color/theme specification — confirm).
    pub colors: Option<String>,
    /// Copied from `MonitorCli::depth` (presumably the route depth limit — confirm).
    pub depth: Option<usize>,
}
50
51impl From<MonitorCli> for MonitorConfig {
52 fn from(cli: MonitorCli) -> Self {
53 Self {
54 tio: cli.tio,
55 fps: cli.fps,
56 colors: cli.colors,
57 depth: cli.depth,
58 }
59 }
60}
61
/// Lower bound applied when the plot window length is adjusted.
const MIN_PLOT_WINDOW_SECONDS: f64 = 0.5;
/// Upper bound applied when the plot window length is adjusted.
const MAX_PLOT_WINDOW_SECONDS: f64 = 60.0;
/// Window adjustment bound to the `=` / `-` keys.
const PLOT_WINDOW_FINE_STEP_SECONDS: f64 = 0.5;
/// Window adjustment bound to the `+` / `_` keys.
const PLOT_WINDOW_COARSE_STEP_SECONDS: f64 = 5.0;
/// Fewer samples than this in the plot window yields `FftStatus::TooFewSamples`.
const MIN_FFT_SAMPLES: usize = 60;
// NOTE(review): the constants below are not referenced in this part of the file;
// the descriptions are inferred from their names — confirm against their users.
/// Presumably the sample capacity of the monitor's data buffer.
const MONITOR_BUFFER_CAPACITY_SAMPLES: usize = 2_000_000;
/// Presumably the default number of Welch segments.
const WELCH_DEFAULT_SEGMENTS: usize = 4;
/// Presumably the default fractional overlap between Welch segments.
const WELCH_DEFAULT_OVERLAP: f64 = 0.5;
/// Presumably the largest DFT size used for a Welch segment.
const WELCH_DFT_MAX_SIZE: usize = 4096;
/// One selectable entry in the monitor's navigation list.
#[derive(Debug, Clone)]
pub enum NavPos {
    /// Placeholder entry for a device that has no streams yet.
    EmptyDevice {
        /// Position of the device among the visible routes.
        device_idx: usize,
        /// Route of the device.
        route: DeviceRoute,
    },
    /// A single column of a stream on a device.
    Column {
        /// Position of the device among the visible routes.
        device_idx: usize,
        /// Position of the stream within the device's sorted stream ids.
        stream_idx: usize,
        /// Full key (route, stream id, column id) of the column.
        spec: ColumnKey,
    },
}
84
85impl NavPos {
86 pub fn device_idx(&self) -> usize {
87 match self {
88 NavPos::EmptyDevice { device_idx, .. } => *device_idx,
89 NavPos::Column { device_idx, .. } => *device_idx,
90 }
91 }
92
93 pub fn route(&self) -> &DeviceRoute {
94 match self {
95 NavPos::EmptyDevice { route, .. } => route,
96 NavPos::Column { spec, .. } => &spec.route,
97 }
98 }
99
100 pub fn stream_idx(&self) -> Option<usize> {
101 match self {
102 NavPos::EmptyDevice { .. } => None,
103 NavPos::Column { stream_idx, .. } => Some(*stream_idx),
104 }
105 }
106
107 pub fn column_idx(&self) -> Option<usize> {
108 match self {
109 NavPos::EmptyDevice { .. } => None,
110 NavPos::Column { spec, .. } => Some(spec.column_id),
111 }
112 }
113
114 pub fn spec(&self) -> Option<&ColumnKey> {
115 match self {
116 NavPos::EmptyDevice { .. } => None,
117 NavPos::Column { spec, .. } => Some(spec),
118 }
119 }
120}
121
/// Cursor into a list of [`NavPos`] entries.
#[derive(Debug, Clone, Default)]
pub struct Nav {
    /// Index of the currently selected entry in the list passed to the step methods.
    pub idx: usize,
}
126
127impl Nav {
128 pub fn step_linear(&mut self, items: &[NavPos], backward: bool) {
130 if items.is_empty() {
131 return;
132 }
133 let len = items.len();
134 self.idx = if backward {
135 (self.idx + len - 1) % len
136 } else {
137 (self.idx + 1) % len
138 };
139 }
140
141 pub fn step_between_streams(&mut self, items: &[NavPos], backward: bool) {
143 if items.is_empty() {
144 return;
145 }
146 let cur = &items[self.idx];
147 let (cur_dev, cur_stream, cur_column) = match cur {
148 NavPos::EmptyDevice { .. } => return,
149 NavPos::Column {
150 device_idx,
151 stream_idx,
152 spec,
153 } => (*device_idx, *stream_idx, spec.column_id),
154 };
155
156 let mut streams: Vec<usize> = items
158 .iter()
159 .filter_map(|pos| match pos {
160 NavPos::Column {
161 device_idx,
162 stream_idx,
163 ..
164 } if *device_idx == cur_dev => Some(*stream_idx),
165 _ => None,
166 })
167 .collect();
168 streams.dedup();
169
170 if streams.len() <= 1 {
171 return;
172 }
173
174 let pos = streams.iter().position(|&s| s == cur_stream).unwrap_or(0);
175 let len = streams.len();
176 let target_stream = streams[if backward {
177 (pos + len - 1) % len
178 } else {
179 (pos + 1) % len
180 }];
181
182 self.idx = items
184 .iter()
185 .enumerate()
186 .filter(|(_, pos)| {
187 matches!(pos,
188 NavPos::Column { device_idx, stream_idx, .. }
189 if *device_idx == cur_dev && *stream_idx == target_stream)
190 })
191 .min_by_key(|(_, pos)| {
192 (pos.column_idx().unwrap_or(0) as isize - cur_column as isize).abs()
193 })
194 .map(|(i, _)| i)
195 .unwrap_or(self.idx);
196 }
197
198 pub fn step_device(&mut self, items: &[NavPos], backward: bool) {
200 if items.is_empty() {
201 return;
202 }
203
204 let cur = &items[self.idx];
205 let cur_device = cur.device_idx();
206 let cur_stream = cur.stream_idx().unwrap_or(0);
207 let cur_column = cur.column_idx().unwrap_or(0);
208
209 let mut device_indices: Vec<usize> = items.iter().map(|p| p.device_idx()).collect();
211 device_indices.sort();
212 device_indices.dedup();
213
214 if device_indices.len() <= 1 {
215 return;
216 }
217
218 let dev_pos = device_indices
220 .iter()
221 .position(|&d| d == cur_device)
222 .unwrap_or(0);
223 let len = device_indices.len();
224 let new_dev_pos = if backward {
225 (dev_pos + len - 1) % len
226 } else {
227 (dev_pos + 1) % len
228 };
229 let target_device = device_indices[new_dev_pos];
230
231 self.idx = items
233 .iter()
234 .enumerate()
235 .filter(|(_, pos)| pos.device_idx() == target_device)
236 .map(|(i, pos)| {
237 let dist = match pos {
238 NavPos::EmptyDevice { .. } => (0, 0),
239 NavPos::Column {
240 stream_idx, spec, ..
241 } => {
242 let s = (*stream_idx as isize - cur_stream as isize).abs();
243 let c = (spec.column_id as isize - cur_column as isize).abs();
244 (s, c)
245 }
246 };
247 (i, dist)
248 })
249 .min_by_key(|&(_, dist)| dist)
250 .map(|(i, _)| i)
251 .unwrap_or(self.idx);
252 }
253
254 pub fn home(&mut self, items: &[NavPos]) {
255 if !items.is_empty() {
256 self.idx = 0;
257 }
258 }
259
260 pub fn end(&mut self, items: &[NavPos]) {
261 if !items.is_empty() {
262 self.idx = items.len() - 1;
263 }
264 }
265}
266
/// Value-coloring rules for displayed columns.
#[derive(Debug, Clone, Default)]
pub struct Theme {
    /// Acceptable range per column, keyed by `"<stream>.<column>"`.
    /// The flag marks the entry as a temperature: values below the range are then
    /// shown in blue instead of red (see `get_value_color`).
    pub value_bounds: HashMap<String, (std::ops::RangeInclusive<f64>, bool)>,
}
271
272impl Theme {
273 pub fn get_value_color(&self, stream: &str, col: &str, val: f64) -> Option<Color> {
274 if val.is_nan() {
275 return Some(Color::Yellow);
276 }
277 let key = format!("{}.{}", stream, col);
278 if let Some((range, is_temp)) = self.value_bounds.get(&key) {
279 if val < *range.start() {
280 Some(if *is_temp { Color::Blue } else { Color::Red })
281 } else if val > *range.end() {
282 Some(Color::Red)
283 } else {
284 Some(Color::Green)
285 }
286 } else {
287 None
288 }
289 }
290}
291
/// Inputs from which the text style of a row is derived (see `resolve`).
pub struct StyleContext {
    /// Row is the current selection (rendered bold).
    pub is_selected: bool,
    /// Row's data is stale (rendered dim).
    pub is_stale: bool,
    /// Plot is visible; suppresses blinking of the selected row.
    pub in_plot_mode: bool,
    /// Foreground color of the row.
    pub base_color: Color,
}
298
/// Default context: not selected, not stale, not in plot mode, terminal default color.
impl Default for StyleContext {
    fn default() -> Self {
        Self {
            is_selected: false,
            is_stale: false,
            in_plot_mode: false,
            base_color: Color::Reset,
        }
    }
}
309
310impl StyleContext {
311 pub fn new() -> Self {
312 Self::default()
313 }
314 pub fn selected(mut self, yes: bool) -> Self {
315 self.is_selected = yes;
316 self
317 }
318 pub fn stale(mut self, yes: bool) -> Self {
319 self.is_stale = yes;
320 self
321 }
322 pub fn plot_mode(mut self, yes: bool) -> Self {
323 self.in_plot_mode = yes;
324 self
325 }
326 pub fn color(mut self, c: Color) -> Self {
327 self.base_color = c;
328 self
329 }
330
331 pub fn resolve(&self) -> Style {
332 let mut s = Style::default().fg(self.base_color);
333 if self.is_stale {
334 s = s.add_modifier(Modifier::DIM);
335 }
336 if self.is_selected {
337 s = s.add_modifier(Modifier::BOLD);
338 if !self.in_plot_mode {
339 s = s.add_modifier(Modifier::RAPID_BLINK);
340 }
341 }
342 s
343 }
344}
345
/// Liveness bookkeeping for one device.
#[derive(Debug, Clone, Default)]
pub struct DeviceStatus {
    /// When the most recent heartbeat was recorded, if any.
    pub last_heartbeat: Option<Instant>,
    /// Whether the device is currently considered connected.
    pub connected: bool,
}

impl DeviceStatus {
    /// Records a heartbeat: marks the device connected and stamps the current time.
    pub fn on_heartbeat(&mut self) {
        self.connected = true;
        self.last_heartbeat = Some(Instant::now());
    }

    /// Returns `true` when a heartbeat was recorded less than `timeout` ago;
    /// `false` when none has been recorded yet.
    pub fn is_alive(&self, timeout: Duration) -> bool {
        match self.last_heartbeat {
            Some(seen) => seen.elapsed() < timeout,
            None => false,
        }
    }
}
364
/// Input mode of the monitor.
#[derive(Debug, Clone, PartialEq)]
pub enum Mode {
    /// Key presses are mapped to navigation and view actions.
    Normal,
    /// Key presses are handed to the RPC palette.
    Command,
}
370
/// State transitions produced from input events and applied by `MonitorState::update`.
#[derive(Debug, Clone)]
pub enum Action {
    /// Ask the caller to stop (`update` returns `true`).
    Quit,
    /// Switch input mode; entering `Command` opens the palette, `Normal` closes it.
    SetMode(Mode),
    /// Forward an RPC request to the RPC worker.
    ExecuteRpc(RpcReq),
    /// Move the cursor to the first entry on this route and refresh palette suggestions.
    SelectRoute(DeviceRoute),
    /// Previous entry (wraps).
    NavUp,
    /// Next entry (wraps).
    NavDown,
    /// Previous stream on the current device.
    NavLeft,
    /// Next stream on the current device.
    NavRight,
    /// Next device.
    NavTabNext,
    /// Previous device.
    NavTabPrev,
    /// Scroll the list by this many lines (negative = up); stops following the selection.
    NavScroll(i16),
    /// First entry.
    NavHome,
    /// Last entry.
    NavEnd,
    /// Toggle the plot; only takes effect while a column is selected.
    TogglePlot,
    /// Hide the plot.
    ClosePlot,
    /// Toggle the FFT view; only takes effect while the plot is shown.
    ToggleFft,
    /// Toggle the footer.
    ToggleFooter,
    /// Toggle display of routes in device headers.
    ToggleRoutes,
    /// Change the plot window length by this many seconds (result is clamped).
    AdjustWindow(f64),
    /// Change the plot width by this many percent (result is clamped to 20..=90).
    AdjustPlotWidth(i16),
    /// Change the axis precision by this amount (result is clamped to 0..=5).
    AdjustPrecision(i8),
}
395
/// Presentation settings and scroll state of the monitor.
#[derive(Debug, Clone)]
pub struct ViewConfig {
    /// Plot panel is visible.
    pub show_plot: bool,
    /// Full footer is shown (a shorter footer area is used otherwise).
    pub show_footer: bool,
    /// Device headers include the route.
    pub show_routes: bool,
    /// FFT view is enabled (only togglable while the plot is shown).
    pub show_fft: bool,
    /// Length of the plotted window, in seconds.
    pub plot_window_seconds: f64,
    /// Share of the main area given to the plot panel, in percent.
    pub plot_width_percent: u16,
    /// Axis label precision (presumably decimal digits — confirm at the use site).
    pub axis_precision: usize,
    /// When set, the list scrolls to keep the selected entry visible.
    pub follow_selection: bool,
    /// Vertical scroll offset of the list, in lines.
    pub scroll: u16,
    /// Longest column description among the latest samples; recomputed while building the list.
    pub desc_width: usize,
    /// Longest units string among the latest samples; recomputed while building the list.
    pub units_width: usize,
    /// Value-coloring rules.
    pub theme: Theme,
}
411
/// Initial view: list only, footer shown, following the selection,
/// 5 s plot window and 70 % plot width once the plot is opened.
impl Default for ViewConfig {
    fn default() -> Self {
        Self {
            show_plot: false,
            show_footer: true,
            show_routes: false,
            show_fft: false,
            plot_window_seconds: 5.0,
            plot_width_percent: 70,
            axis_precision: 3,
            follow_selection: true,
            scroll: 0,
            // Both widths are recomputed from the data when the list is built.
            desc_width: 0,
            units_width: 0,
            theme: Theme::default(),
        }
    }
}
430
/// Result of a successful spectral-density computation.
pub struct FftReadyData {
    /// `(frequency, amplitude spectral density)` pairs; only bins with positive
    /// frequency and finite, positive density are kept.
    pub points: Vec<(f64, f64)>,
    /// Median of the amplitude values in `points`.
    pub median_asd: f64,
    /// Number of samples actually fed to the estimator.
    pub sample_count: usize,
    /// Number of samples available in the plot window.
    pub total_sample_count: usize,
    /// Effective sampling rate (`sampling_rate / decimation`).
    pub sampling_hz: f64,
    /// Segment size reported by `latest_complete_welch_signal`.
    pub segment_size: usize,
    /// Hop size reported by `latest_complete_welch_signal`.
    pub hop_size: usize,
}
440
/// Outcome of `MonitorState::get_spectral_density_data`.
pub enum FftStatus {
    /// Spectrum computed.
    Ready(FftReadyData),
    /// No column is selected.
    WaitingForSelection,
    /// No window, segment metadata or column data is available yet.
    WaitingForSamples,
    /// Sampling rate or decimation is zero, or their ratio is not a positive finite number.
    InvalidSampleRate {
        sampling_rate: u32,
        decimation: u32,
    },
    /// The window holds fewer samples (`have`) than required (`need`).
    TooFewSamples {
        have: usize,
        need: usize,
        sampling_hz: f64,
        window_seconds: f64,
    },
    /// The estimator produced no bin with positive frequency and finite, positive density.
    NoValidFrequencyBins {
        sample_count: usize,
        sampling_hz: f64,
    },
}
460
/// Complete UI and data state of the monitor.
pub struct MonitorState {
    /// Maximum relative route length shown below `parent_route`; `None` = unlimited.
    pub depth_limit: Option<usize>,
    /// Route against which discovered routes are made relative for filtering.
    pub parent_route: DeviceRoute,
    /// Current input mode.
    pub mode: Mode,
    /// Presentation settings and scroll state.
    pub view: ViewConfig,

    /// Cursor into `nav_items`.
    pub nav: Nav,
    /// Selectable entries, rebuilt by `rebuild_nav_items`.
    pub nav_items: Vec<NavPos>,

    /// Every route announced by a `RouteDiscovered` event.
    pub discovered_routes: HashSet<DeviceRoute>,
    /// Heartbeat/connection bookkeeping per route.
    pub device_status: HashMap<DeviceRoute, DeviceStatus>,
    /// Latest sample per stream together with the time it was received.
    pub last: BTreeMap<StreamKey, (Sample, Instant)>,
    /// Device metadata per route, filled in on `MetadataReady`.
    pub device_metadata: HashMap<DeviceRoute, DeviceFullMetadata>,
    /// Data window for the selected column; `None` while the plot is hidden.
    pub window_aligned: Option<AlignedWindow>,

    /// Passed to the palette's key handler (presumably set when the footer is rendered — confirm).
    pub footer_height: u16,
    /// RPC list state per route.
    pub rpc_routes: HashMap<DeviceRoute, RouteRpcState>,
    /// Command palette used in `Mode::Command`.
    pub palette: RpcPalette,
    /// Current phase of the blink toggle; flipped by `tick_blink`.
    pub blink_state: bool,
    /// Time of the last blink flip.
    pub last_blink: Instant,
}
482
483impl MonitorState {
    /// Creates an empty state in `Mode::Normal` with default view settings.
    ///
    /// `parent_route` is cloned and, together with `depth_limit`, is used by
    /// `visible_routes` to filter discovered routes.
    pub fn new(depth_limit: Option<usize>, parent_route: &DeviceRoute) -> Self {
        Self {
            depth_limit,
            parent_route: parent_route.clone(),
            mode: Mode::Normal,
            view: ViewConfig::default(),
            nav: Nav::default(),
            nav_items: Vec::new(),
            discovered_routes: HashSet::new(),
            device_status: HashMap::new(),
            last: BTreeMap::new(),
            device_metadata: HashMap::new(),
            window_aligned: None,
            footer_height: 0,
            rpc_routes: HashMap::new(),
            palette: RpcPalette::default(),
            blink_state: true,
            last_blink: Instant::now(),
        }
    }
504
505 fn rpc_palette_status(&self, route: &DeviceRoute) -> RpcPaletteStatus {
506 self.rpc_routes
507 .get(route)
508 .map(RouteRpcState::palette_status)
509 .unwrap_or(RpcPaletteStatus::WaitingForRpc)
510 }
511
512 fn update_palette_suggestions_for(&mut self, route: &DeviceRoute) {
513 if self.mode == Mode::Command && self.current_route() == *route {
514 let registry = self.rpc_routes.get(route).and_then(RouteRpcState::registry);
515 self.palette.update_suggestions(registry);
516 }
517 }
518
519 fn update(&mut self, action: Action, rpc_tx: &Sender<RpcWorkerReq>) -> bool {
520 match action {
521 Action::Quit => return true,
522 Action::SetMode(Mode::Command) => {
523 let route = self.current_route();
524 let registry = self
525 .rpc_routes
526 .get(&route)
527 .and_then(RouteRpcState::registry);
528 self.palette.enter(registry);
529 self.mode = Mode::Command;
530 }
531 Action::SetMode(Mode::Normal) => {
532 self.mode = Mode::Normal;
533 self.palette.exit();
534 }
535 Action::ExecuteRpc(req) => {
536 let _ = rpc_tx.send(RpcWorkerReq::Execute(req));
537 }
538 Action::SelectRoute(route) => {
539 self.view.follow_selection = true;
540 if let Some(idx) = self.nav_items.iter().position(|p| p.route() == &route) {
541 self.nav.idx = idx;
542 }
543 let registry = self
544 .rpc_routes
545 .get(&route)
546 .and_then(RouteRpcState::registry);
547 self.palette.update_suggestions(registry);
548 }
549 Action::NavUp => {
550 self.view.follow_selection = true;
551 self.nav.step_linear(&self.nav_items, true);
552 }
553 Action::NavDown => {
554 self.view.follow_selection = true;
555 self.nav.step_linear(&self.nav_items, false);
556 }
557 Action::NavLeft => {
558 self.view.follow_selection = true;
559 self.nav.step_between_streams(&self.nav_items, true);
560 }
561 Action::NavRight => {
562 self.view.follow_selection = true;
563 self.nav.step_between_streams(&self.nav_items, false);
564 }
565 Action::NavTabNext => {
566 self.view.follow_selection = true;
567 self.nav.step_device(&self.nav_items, false);
568 }
569 Action::NavTabPrev => {
570 self.view.follow_selection = true;
571 self.nav.step_device(&self.nav_items, true);
572 }
573 Action::NavScroll(delta) => {
574 self.view.follow_selection = false;
575 self.view.scroll = if delta < 0 {
576 self.view.scroll.saturating_sub(delta.abs() as u16)
577 } else {
578 self.view.scroll.saturating_add(delta as u16)
579 };
580 }
581 Action::NavHome => {
582 self.view.follow_selection = true;
583 self.nav.home(&self.nav_items);
584 }
585 Action::NavEnd => {
586 self.view.follow_selection = true;
587 self.nav.end(&self.nav_items);
588 }
589 Action::TogglePlot => {
590 if self.current_selection().is_some() {
591 self.view.show_plot = !self.view.show_plot;
592 }
593 }
594 Action::ClosePlot => {
595 self.view.show_plot = false;
596 }
597 Action::ToggleFft => {
598 if self.view.show_plot {
599 self.view.show_fft = !self.view.show_fft;
600 }
601 }
602 Action::ToggleFooter => self.view.show_footer = !self.view.show_footer,
603 Action::ToggleRoutes => self.view.show_routes = !self.view.show_routes,
604 Action::AdjustWindow(d) => {
605 self.view.plot_window_seconds = (self.view.plot_window_seconds + d)
606 .clamp(MIN_PLOT_WINDOW_SECONDS, MAX_PLOT_WINDOW_SECONDS)
607 }
608 Action::AdjustPlotWidth(d) => {
609 self.view.plot_width_percent =
610 (self.view.plot_width_percent as i16 + d).clamp(20, 90) as u16
611 }
612 Action::AdjustPrecision(delta) => {
613 let new_p = self.view.axis_precision as i16 + delta as i16;
614 self.view.axis_precision = new_p.clamp(0, 5) as usize;
615 }
616 }
617 false
618 }
619
620 fn update_rpclists(&mut self, list: RpcList) {
621 let route = list.route.clone();
622 self.rpc_routes
623 .entry(route.clone())
624 .or_default()
625 .on_fetch_success(&list);
626 self.update_palette_suggestions_for(&route);
627 }
628
629 fn update_rpclist_error(&mut self, route: DeviceRoute, error: String) {
630 self.rpc_routes
631 .entry(route.clone())
632 .or_default()
633 .on_fetch_error(error);
634 self.update_palette_suggestions_for(&route);
635 }
636
637 pub fn visible_routes(&self) -> Vec<DeviceRoute> {
638 let mut routes: Vec<_> = self
639 .discovered_routes
640 .iter()
641 .filter(|r| match self.parent_route.relative_route(r) {
642 Ok(rel) => self.depth_limit.map_or(true, |max| rel.len() <= max),
643 Err(_) => false,
644 })
645 .cloned()
646 .collect();
647 routes.sort();
648 routes
649 }
650
651 pub fn rebuild_nav_items(&mut self) {
652 let prev_selection = self.nav_items.get(self.nav.idx).cloned();
653
654 let routes = self.visible_routes();
655 let mut new_items = Vec::new();
656
657 for (dev_idx, route) in routes.iter().enumerate() {
658 let mut stream_ids: Vec<_> = self
659 .last
660 .keys()
661 .filter(|k| &k.route == route)
662 .map(|k| k.stream_id)
663 .collect();
664 stream_ids.sort();
665 stream_ids.dedup();
666
667 if stream_ids.is_empty() {
668 new_items.push(NavPos::EmptyDevice {
669 device_idx: dev_idx,
670 route: route.clone(),
671 });
672 } else {
673 for (stream_idx, sid) in stream_ids.iter().enumerate() {
674 let key = StreamKey::new(route.clone(), *sid);
675 if let Some((sample, _)) = self.last.get(&key) {
676 for (column_idx, _) in sample.columns.iter().enumerate() {
677 new_items.push(NavPos::Column {
678 device_idx: dev_idx,
679 stream_idx,
680 spec: ColumnKey {
681 route: route.clone(),
682 stream_id: *sid,
683 column_id: column_idx,
684 },
685 });
686 }
687 }
688 }
689 }
690 }
691
692 self.nav_items = new_items;
693
694 if self.nav_items.is_empty() {
695 self.nav.idx = 0;
696 return;
697 }
698
699 self.nav.idx = prev_selection
702 .and_then(|prev| {
703 let prev_spec = prev.spec().cloned();
704 let prev_route = prev.route().clone();
705 self.nav_items
706 .iter()
707 .position(|pos| match (&prev_spec, pos.spec()) {
708 (Some(a), Some(b)) => a == b,
709 (None, _) => pos.route() == &prev_route,
710 _ => false,
711 })
712 })
713 .unwrap_or_else(|| self.nav.idx.min(self.nav_items.len() - 1));
714 }
715
    /// Entry under the cursor, or `None` when the cursor is outside `nav_items`
    /// (for example when the list is empty).
    pub fn current_pos(&self) -> Option<&NavPos> {
        self.nav_items.get(self.nav.idx)
    }
719
720 pub fn current_selection(&self) -> Option<ColumnKey> {
721 self.current_pos().and_then(|p| p.spec().cloned())
722 }
723
724 pub fn current_route(&self) -> DeviceRoute {
725 self.current_pos()
726 .map(|p| p.route().clone())
727 .unwrap_or_else(|| self.parent_route.clone())
728 }
729
730 pub fn current_device_index(&self) -> usize {
731 self.current_pos().map(|p| p.device_idx()).unwrap_or(0)
732 }
733
    /// Number of currently visible devices (recomputes `visible_routes` on every call).
    pub fn device_count(&self) -> usize {
        self.visible_routes().len()
    }
737
738 fn handle_event(&mut self, event: TreeEvent, rpc_tx: &Sender<RpcWorkerReq>) {
739 match event {
740 TreeEvent::RouteDiscovered(route) => {
741 self.discovered_routes.insert(route.clone());
742 if self
743 .rpc_routes
744 .entry(route.clone())
745 .or_default()
746 .on_route_discovered()
747 {
748 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
749 }
750 self.device_status.entry(route).or_default();
751 }
752 TreeEvent::Device {
753 route,
754 event: DeviceEvent::NewHash(hash),
755 } => {
756 if self
757 .rpc_routes
758 .entry(route.clone())
759 .or_default()
760 .on_new_hash(hash)
761 {
762 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
763 }
764 }
765 TreeEvent::Device {
766 route,
767 event: DeviceEvent::Heartbeat { session_id },
768 } => {
769 if self
770 .rpc_routes
771 .entry(route.clone())
772 .or_default()
773 .on_heartbeat(session_id)
774 {
775 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
776 }
777 self.device_status.entry(route).or_default().on_heartbeat();
778 }
779 TreeEvent::Device {
780 route,
781 event: DeviceEvent::Status(status),
782 } => {
783 if self
784 .rpc_routes
785 .entry(route.clone())
786 .or_default()
787 .on_status(status)
788 {
789 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
790 }
791 let dev_status = self.device_status.entry(route.clone()).or_default();
792 match status {
793 ProxyStatus::SensorDisconnected => dev_status.connected = false,
794 ProxyStatus::SensorReconnected => dev_status.connected = true,
795 _ => {}
796 }
797 }
798 TreeEvent::Device {
799 route,
800 event: DeviceEvent::MetadataReady(metadata),
801 } => {
802 self.device_metadata.insert(route, metadata);
803 }
804 TreeEvent::Device {
805 event: DeviceEvent::RpcInvalidated(_),
806 ..
807 } => {}
808 }
809 }
810
811 pub fn handle_sample(&mut self, sample: Sample, route: DeviceRoute, buffer: &mut Buffer) {
812 let stream_key = StreamKey::new(route.clone(), sample.stream.stream_id);
813 buffer.process_sample(sample.clone(), stream_key.clone());
814 self.last.insert(stream_key, (sample, Instant::now()));
815 }
816
817 pub fn update_plot_window(&mut self, buffer: &Buffer) {
818 if !self.view.show_plot {
819 self.window_aligned = None;
820 return;
821 }
822
823 self.window_aligned = self.current_selection().and_then(|col| {
824 let stream_key = col.stream_key();
825 let run = buffer.get_run(&stream_key)?;
826 let n_samples = (self.view.plot_window_seconds * run.effective_rate)
827 .ceil()
828 .max(10.0) as usize;
829 buffer.read_aligned_window(&[col], n_samples).ok()
830 });
831 }
832
833 pub fn get_plot_data(&self) -> Option<(Vec<(f64, f64)>, f64, f64)> {
834 let spec = self.current_selection()?;
835 let win = self.window_aligned.as_ref()?;
836 let batch = win.columns.get(&spec)?;
837 if win.timestamps.is_empty() {
838 return None;
839 }
840 let data: Vec<(f64, f64)> = match batch {
841 ColumnBatch::F64(v) => win
842 .timestamps
843 .iter()
844 .copied()
845 .zip(v.iter().copied())
846 .collect(),
847 ColumnBatch::I64(v) => win
848 .timestamps
849 .iter()
850 .copied()
851 .zip(v.iter().map(|&x| x as f64))
852 .collect(),
853 ColumnBatch::U64(v) => win
854 .timestamps
855 .iter()
856 .copied()
857 .zip(v.iter().map(|&x| x as f64))
858 .collect(),
859 };
860 if data.is_empty() {
861 return None;
862 }
863 let (cur_t, cur_v) = *data.last().unwrap();
864 Some((data, cur_v, cur_t))
865 }
866
867 pub fn get_spectral_density_data(&self) -> FftStatus {
868 let Some(spec) = self.current_selection() else {
869 return FftStatus::WaitingForSelection;
870 };
871 let Some(win) = self.window_aligned.as_ref() else {
872 return FftStatus::WaitingForSamples;
873 };
874 let stream_key = spec.stream_key();
875 let Some(md) = win.segment_metadata.get(&stream_key) else {
876 return FftStatus::WaitingForSamples;
877 };
878 if md.sampling_rate == 0 || md.decimation == 0 {
879 return FftStatus::InvalidSampleRate {
880 sampling_rate: md.sampling_rate,
881 decimation: md.decimation,
882 };
883 }
884 let sampling_hz = md.sampling_rate as f64 / md.decimation as f64;
885 if !sampling_hz.is_finite() || sampling_hz <= 0.0 {
886 return FftStatus::InvalidSampleRate {
887 sampling_rate: md.sampling_rate,
888 decimation: md.decimation,
889 };
890 }
891 let Some(batch) = win.columns.get(&spec) else {
892 return FftStatus::WaitingForSamples;
893 };
894
895 let signal: Vec<f64> = match batch {
896 ColumnBatch::F64(v) => v.clone(),
897 ColumnBatch::I64(v) => v.iter().map(|&x| x as f64).collect(),
898 ColumnBatch::U64(v) => v.iter().map(|&x| x as f64).collect(),
899 };
900
901 if signal.len() < MIN_FFT_SAMPLES {
902 return FftStatus::TooFewSamples {
903 have: signal.len(),
904 need: MIN_FFT_SAMPLES,
905 sampling_hz,
906 window_seconds: self.view.plot_window_seconds,
907 };
908 }
909
910 let (fft_signal, segment_size, hop_size) = latest_complete_welch_signal(&signal);
911
912 let mean_val = fft_signal.iter().sum::<f64>() / fft_signal.len() as f64;
913 let detrended: Vec<f64> = fft_signal.iter().map(|x| x - mean_val).collect();
914
915 let welch: SpectralDensity<f64> = SpectralDensity::builder(&detrended, sampling_hz).build();
916 let sd = welch.periodogram();
917 let pts: Vec<(f64, f64)> = sd
918 .frequency()
919 .into_iter()
920 .zip(sd.iter().copied())
921 .filter_map(|(f, d)| {
922 if f > 0.0 && d.is_finite() && d > 0.0 {
923 Some((f, d.sqrt()))
924 } else {
925 None
926 }
927 })
928 .collect();
929
930 if pts.is_empty() {
931 return FftStatus::NoValidFrequencyBins {
932 sample_count: fft_signal.len(),
933 sampling_hz,
934 };
935 }
936
937 let mut asd_values: Vec<f64> = pts.iter().map(|(_, d)| *d).collect();
938 asd_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
939 let median_asd = if asd_values.len() % 2 == 0 {
940 (asd_values[asd_values.len() / 2 - 1] + asd_values[asd_values.len() / 2]) / 2.0
941 } else {
942 asd_values[asd_values.len() / 2]
943 };
944
945 FftStatus::Ready(FftReadyData {
946 points: pts,
947 median_asd,
948 sample_count: fft_signal.len(),
949 total_sample_count: signal.len(),
950 sampling_hz,
951 segment_size,
952 hop_size,
953 })
954 }
955
956 pub fn get_focused_channel_info(&self) -> Option<(String, String)> {
957 let spec = self.current_selection()?;
958 let win = self.window_aligned.as_ref()?;
959 let meta = win.column_metadata.get(&spec)?;
960 Some((meta.description.clone(), meta.units.clone()))
961 }
962
963 pub fn tick_blink(&mut self) {
964 if self.last_blink.elapsed() >= Duration::from_millis(500) {
965 self.blink_state = !self.blink_state;
966 self.last_blink = Instant::now();
967 }
968 }
969}
970
971fn get_action(ev: Event, app: &mut MonitorState) -> Option<Action> {
972 if let Event::Key(k) = ev {
973 if k.kind != KeyEventKind::Press {
974 return None;
975 }
976 match app.mode {
977 Mode::Command => {
978 let route = app.current_route();
979 let registry = app.rpc_routes.get(&route).and_then(RouteRpcState::registry);
980 let routes = app.visible_routes();
981 match app
982 .palette
983 .handle_key(k, registry, &route, &routes, app.footer_height)
984 {
985 PaletteEvent::Submit(req) => Some(Action::ExecuteRpc(req)),
986 PaletteEvent::SelectRoute(r) => Some(Action::SelectRoute(r)),
987 PaletteEvent::Exit => Some(Action::SetMode(Mode::Normal)),
988 PaletteEvent::Consumed => None,
989 }
990 }
991 Mode::Normal => match k.code {
992 KeyCode::Char(':') => Some(Action::SetMode(Mode::Command)),
993 KeyCode::Char('q') => Some(Action::Quit),
994 KeyCode::Char('c') if k.modifiers == KeyModifiers::CONTROL => Some(Action::Quit),
995 KeyCode::Esc => Some(Action::ClosePlot),
996 KeyCode::Up => Some(Action::NavUp),
997 KeyCode::Down => Some(Action::NavDown),
998 KeyCode::Left => Some(Action::NavLeft),
999 KeyCode::Right => Some(Action::NavRight),
1000 KeyCode::BackTab => Some(Action::NavTabPrev),
1001 KeyCode::Tab => Some(Action::NavTabNext),
1002 KeyCode::PageUp => Some(Action::NavScroll(-10)),
1003 KeyCode::PageDown => Some(Action::NavScroll(10)),
1004 KeyCode::Home => Some(Action::NavHome),
1005 KeyCode::End => Some(Action::NavEnd),
1006 KeyCode::Enter => Some(Action::TogglePlot),
1007 KeyCode::Char('f') => Some(Action::ToggleFft),
1008 KeyCode::Char('h') => Some(Action::ToggleFooter),
1009 KeyCode::Char('r') => Some(Action::ToggleRoutes),
1010 KeyCode::Char('=') => Some(Action::AdjustWindow(PLOT_WINDOW_FINE_STEP_SECONDS)),
1011 KeyCode::Char('-') => Some(Action::AdjustWindow(-PLOT_WINDOW_FINE_STEP_SECONDS)),
1012 KeyCode::Char('+') => Some(Action::AdjustWindow(PLOT_WINDOW_COARSE_STEP_SECONDS)),
1013 KeyCode::Char('_') => Some(Action::AdjustWindow(-PLOT_WINDOW_COARSE_STEP_SECONDS)),
1014 KeyCode::Char('[') => Some(Action::AdjustPlotWidth(5)),
1015 KeyCode::Char(']') => Some(Action::AdjustPlotWidth(-5)),
1016 KeyCode::Char(',') | KeyCode::Char('<') => Some(Action::AdjustPrecision(-1)),
1017 KeyCode::Char('.') | KeyCode::Char('>') => Some(Action::AdjustPrecision(1)),
1018 _ => None,
1019 },
1020 }
1021 } else {
1022 None
1023 }
1024}
1025
1026fn draw_ui(terminal: &mut DefaultTerminal, app: &mut MonitorState) -> Result<(), io::Error> {
1027 terminal.draw(|f| {
1028 let size = f.area();
1029 let height = size.height;
1030
1031 let (main_area, footer_area) = {
1032 let (main_constraint, footer_constraint) = if app.mode == Mode::Command {
1033 if height >= 18 {
1034 (
1035 Constraint::Min(10),
1036 Constraint::Length(5 + app.palette.suggestion_rows()),
1037 )
1038 } else if height >= 12 {
1039 (Constraint::Min(2), Constraint::Length(8))
1040 } else if height >= 5 {
1041 (Constraint::Min(2), Constraint::Length(3))
1042 } else {
1043 (Constraint::Min(0), Constraint::Length(2))
1044 }
1045 } else if app.view.show_footer {
1046 (Constraint::Min(10), Constraint::Length(6))
1047 } else {
1048 (Constraint::Min(10), Constraint::Length(2))
1049 };
1050 let chunks = Layout::default()
1051 .direction(Direction::Vertical)
1052 .constraints([main_constraint, footer_constraint])
1053 .split(size);
1054 (chunks[0], Some(chunks[1]))
1055 };
1056
1057 let (left, right) = if app.mode == Mode::Command && height < 3 {
1058 (None, None)
1059 } else if app.view.show_plot {
1060 let chunks = Layout::default()
1061 .direction(Direction::Horizontal)
1062 .constraints([
1063 Constraint::Percentage(100 - app.view.plot_width_percent),
1064 Constraint::Percentage(app.view.plot_width_percent),
1065 ])
1066 .split(main_area);
1067 (Some(chunks[0]), Some(chunks[1]))
1068 } else {
1069 (Some(main_area), None)
1070 };
1071
1072 if let Some(l) = left {
1073 render_monitor_panel(f, app, l, Instant::now());
1074 }
1075 if let Some(r) = right {
1076 render_graphics_panel(f, app, r);
1077 }
1078 if let Some(foot) = footer_area {
1079 render_footer(f, app, foot);
1080 }
1081 })?;
1082 Ok(())
1083}
1084
1085fn render_monitor_panel(f: &mut Frame, app: &mut MonitorState, area: Rect, now: Instant) {
1086 let inner = Rect {
1087 x: area.x,
1088 y: area.y,
1089 width: area.width.saturating_sub(1),
1090 height: area.height,
1091 };
1092 let (lines, col_map) = build_left_lines(app, now);
1093 let total = lines.len();
1094 let view_h = inner.height as usize;
1095
1096 if app.view.follow_selection {
1097 if let Some(&line_idx) = col_map.get(&app.nav.idx) {
1098 if view_h > 0 && total > view_h {
1099 let cur = app.view.scroll as usize;
1100 if line_idx < cur || line_idx >= cur + view_h {
1101 app.view.scroll = line_idx
1102 .saturating_sub(view_h / 2)
1103 .min(total.saturating_sub(view_h))
1104 as u16;
1105 }
1106 } else {
1107 app.view.scroll = 0;
1108 }
1109 }
1110 }
1111 app.view.scroll = (app.view.scroll as usize).min(total.saturating_sub(view_h)) as u16;
1112 f.render_widget(Paragraph::new(lines).scroll((app.view.scroll, 0)), inner);
1113
1114 if total > view_h {
1115 let sb_area = Rect {
1116 x: area.x + area.width - 1,
1117 y: area.y,
1118 width: 1,
1119 height: area.height,
1120 };
1121 let track_len = view_h;
1122 let thumb_len = (track_len * track_len / total).max(1);
1123 let max_thumb_pos = track_len - thumb_len;
1124 let scroll_max = total - track_len;
1125 let thumb_pos = (app.view.scroll as usize * max_thumb_pos) / scroll_max;
1126
1127 for i in 0..track_len {
1128 let ch = if i >= thumb_pos && i < thumb_pos + thumb_len {
1129 "█"
1130 } else {
1131 "│"
1132 };
1133 f.render_widget(
1134 Paragraph::new(ch).style(Style::default().fg(Color::DarkGray)),
1135 Rect {
1136 x: sb_area.x,
1137 y: sb_area.y + i as u16,
1138 width: 1,
1139 height: 1,
1140 },
1141 );
1142 }
1143 }
1144}
1145
1146fn stale_threshold(sample: &Sample) -> Duration {
1147 let rate = sample.segment.sampling_rate as f64 / sample.segment.decimation.max(1) as f64;
1148 let period_ms = if rate > 0.0 { 1000.0 / rate } else { 0.0 };
1149 Duration::from_millis((period_ms * 2.0).max(1200.0) as u64)
1151}
1152
1153fn build_left_lines(
1154 app: &mut MonitorState,
1155 now: Instant,
1156) -> (Vec<Line<'static>>, HashMap<usize, usize>) {
1157 let mut lines = Vec::new();
1158 let mut map = HashMap::new();
1159
1160 let routes = app.visible_routes();
1161
1162 if routes.is_empty() {
1163 lines.push(Line::from("Waiting for data..."));
1164 return (lines, map);
1165 }
1166
1167 let mut global_idx = 0;
1168 app.view.desc_width = app
1169 .last
1170 .values()
1171 .flat_map(|(s, _)| s.columns.iter())
1172 .map(|c| c.desc.description.len())
1173 .max()
1174 .unwrap_or(0);
1175 app.view.units_width = app
1176 .last
1177 .values()
1178 .flat_map(|(s, _)| s.columns.iter())
1179 .map(|c| c.desc.units.len())
1180 .max()
1181 .unwrap_or(0);
1182
1183 for (dev_idx, route) in routes.iter().enumerate() {
1184 let dev = app.device_metadata.get(route).map(|m| m.device.as_ref());
1185
1186 let status = app.device_status.get(route);
1187 let is_alive = status
1188 .map(|s| s.is_alive(Duration::from_millis(300)))
1189 .unwrap_or(false);
1190
1191 let head_style = if dev_idx == app.current_device_index() {
1192 Style::default().add_modifier(Modifier::BOLD | Modifier::UNDERLINED)
1193 } else {
1194 Style::default().add_modifier(Modifier::BOLD)
1195 };
1196
1197 let header_text = if let Some(d) = dev {
1198 if d.serial_number.is_empty() {
1199 d.name.clone()
1200 } else {
1201 format!("{} Serial: {}", d.name, d.serial_number)
1202 }
1203 } else {
1204 format!("<{}>", route)
1205 };
1206
1207 let status_indicator = if is_alive { "●" } else { "○" };
1208 let status_color = if is_alive {
1209 Color::Green
1210 } else {
1211 Color::DarkGray
1212 };
1213
1214 let mut header_spans = vec![
1215 Span::styled(
1216 format!("{} ", status_indicator),
1217 Style::default().fg(status_color),
1218 ),
1219 Span::styled(header_text, head_style),
1220 ];
1221 if app.view.show_routes {
1222 header_spans.push(Span::raw(format!(" [{}]", route)));
1223 }
1224
1225 lines.push(Line::from(header_spans));
1226
1227 let mut stream_ids: Vec<_> = app
1228 .last
1229 .keys()
1230 .filter(|k| &k.route == route)
1231 .map(|k| k.stream_id)
1232 .collect();
1233 stream_ids.sort();
1234
1235 if stream_ids.is_empty() {
1236 map.insert(global_idx, lines.len());
1237 global_idx += 1;
1238
1239 lines.push(Line::from(Span::styled(
1240 " (no streams yet)",
1241 Style::default().fg(Color::DarkGray),
1242 )));
1243 }
1244
1245 for sid in stream_ids {
1246 let key = StreamKey::new(route.clone(), sid);
1247 if let Some((sample, seen)) = app.last.get(&key) {
1248 let is_stale = now.saturating_duration_since(*seen) > stale_threshold(sample);
1249 for col in &sample.columns {
1250 let nav_idx = global_idx;
1251 global_idx += 1;
1252 map.insert(nav_idx, lines.len());
1253
1254 let is_sel = app.nav.idx == nav_idx;
1255 let ctx = StyleContext::new()
1256 .stale(is_stale)
1257 .selected(is_sel)
1258 .plot_mode(app.view.show_plot);
1259
1260 let label_style = ctx.resolve();
1261 let (val_str, val_f64) = fmt_value(&col.value);
1262 let val_col = app
1263 .view
1264 .theme
1265 .get_value_color(&sample.stream.name, &col.desc.name, val_f64)
1266 .unwrap_or(Color::Reset);
1267 let val_style = ctx.color(val_col).resolve();
1268
1269 let mut desc = col.desc.description.clone();
1270 if desc.len() < app.view.desc_width {
1271 desc.push_str(&" ".repeat(app.view.desc_width - desc.len()));
1272 }
1273
1274 let units = col.desc.units.clone();
1275 let padded_units = if app.view.units_width > 0 && !units.is_empty() {
1276 format!("{:>width$}", units, width = app.view.units_width)
1277 } else if app.view.units_width > 0 {
1278 " ".repeat(app.view.units_width)
1279 } else {
1280 String::new()
1281 };
1282
1283 lines.push(Line::from(vec![
1284 Span::styled(desc, label_style),
1285 Span::raw(" "),
1286 Span::styled(val_str, val_style),
1287 Span::raw(" "),
1288 Span::styled(padded_units, val_style),
1289 ]));
1290 }
1291 }
1292 }
1293 lines.push(Line::from(""));
1294 }
1295 (lines, map)
1296}
1297
1298fn render_footer(f: &mut Frame, app: &mut MonitorState, area: Rect) {
1299 app.footer_height = area.height; if app.mode == Mode::Command {
1301 let route = app.current_route();
1302 let status = app.rpc_palette_status(&route);
1303 let registry = app.rpc_routes.get(&route).and_then(RouteRpcState::registry);
1304 app.palette
1305 .render(f, area, &route, registry, status, app.blink_state);
1306 return;
1307 }
1308
1309 if !app.view.show_footer {
1310 let minimal = Line::from(vec![
1311 Span::raw(" "),
1312 key_span("h"),
1313 Span::raw(" Toggle Footer"),
1314 ]);
1315 f.render_widget(
1316 Paragraph::new(vec![minimal]).block(
1317 Block::default()
1318 .borders(Borders::TOP)
1319 .border_style(Style::default().fg(Color::DarkGray)),
1320 ),
1321 area,
1322 );
1323 return;
1324 }
1325
1326 let mut navigation_spans = vec![
1327 Span::styled(
1328 " Navigation ",
1329 Style::default()
1330 .fg(Color::Cyan)
1331 .add_modifier(Modifier::BOLD),
1332 ),
1333 key_span("↑"),
1334 key_sep(),
1335 key_span("↓"),
1336 Span::raw(" All "),
1337 key_span("←"),
1338 key_sep(),
1339 key_span("→"),
1340 Span::raw(" Streams"),
1341 ];
1342
1343 if app.device_count() > 1 {
1344 navigation_spans.push(Span::raw(" "));
1345 navigation_spans.push(key_span("Tab"));
1346 navigation_spans.push(key_sep());
1347 navigation_spans.push(key_span("Shift+Tab"));
1348 navigation_spans.push(Span::raw(" Devices"));
1349 }
1350 let navigation_line = Line::from(navigation_spans);
1351
1352 let toggle_line = Line::from(vec![
1353 Span::styled(
1354 " Toggle ",
1355 Style::default()
1356 .fg(Color::Green)
1357 .add_modifier(Modifier::BOLD),
1358 ),
1359 key_span("Enter"),
1360 Span::raw(" Plot "),
1361 key_span("f"),
1362 Span::raw(" FFT "),
1363 key_span("h"),
1364 Span::raw(" Footer "),
1365 key_span("r"),
1366 Span::raw(" Routes "),
1367 key_span(":"),
1368 Span::raw(" Cmd"),
1369 ]);
1370
1371 let window_line = Line::from(vec![
1372 Span::styled(
1373 " Plot ",
1374 Style::default()
1375 .fg(Color::Yellow)
1376 .add_modifier(Modifier::BOLD),
1377 ),
1378 key_span("+"),
1379 key_sep(),
1380 key_span("-"),
1381 Span::raw(" Window (0.5s, Shift 5.0s) "),
1382 key_span("["),
1383 key_sep(),
1384 key_span("]"),
1385 Span::raw(" Plot Width "),
1386 key_span("<"),
1387 key_sep(),
1388 key_span(">"),
1389 Span::raw(" Plot Precision"),
1390 ]);
1391
1392 let scroll_line = Line::from(vec![
1393 Span::styled(
1394 " Scroll ",
1395 Style::default()
1396 .fg(Color::Magenta)
1397 .add_modifier(Modifier::BOLD),
1398 ),
1399 key_span("Home"),
1400 key_sep(),
1401 key_span("End"),
1402 key_sep(),
1403 key_span("PgUp"),
1404 key_sep(),
1405 key_span("PgDn"),
1406 ]);
1407
1408 let quit_line = Line::from(vec![
1409 Span::styled(
1410 " Quit ",
1411 Style::default().fg(Color::Red).add_modifier(Modifier::BOLD),
1412 ),
1413 key_span("q"),
1414 Span::raw(" / "),
1415 key_span("Ctrl+C"),
1416 Span::raw(" Quit"),
1417 ]);
1418
1419 let lines = vec![
1420 navigation_line,
1421 toggle_line,
1422 window_line,
1423 scroll_line,
1424 quit_line,
1425 ];
1426
1427 let block = Block::default()
1428 .borders(Borders::TOP)
1429 .border_style(Style::default().fg(Color::DarkGray))
1430 .title(Span::styled(
1431 " Controls ",
1432 Style::default().add_modifier(Modifier::BOLD),
1433 ));
1434
1435 f.render_widget(Paragraph::new(lines).block(block), area);
1436}
1437
1438fn key_span(text: &str) -> Span<'static> {
1439 Span::styled(
1440 format!(" {} ", text),
1441 Style::default()
1442 .fg(Color::White)
1443 .bg(Color::DarkGray)
1444 .add_modifier(Modifier::BOLD),
1445 )
1446}
1447
1448fn key_sep() -> Span<'static> {
1449 Span::raw(" ")
1450}
1451
1452fn latest_complete_welch_signal(signal: &[f64]) -> (&[f64], usize, usize) {
1453 let denominator =
1454 WELCH_DEFAULT_SEGMENTS as f64 * (1.0 - WELCH_DEFAULT_OVERLAP) + WELCH_DEFAULT_OVERLAP;
1455 let default_segment_size = (signal.len() as f64 / denominator).trunc().max(1.0) as usize;
1456
1457 if default_segment_size.next_power_of_two() <= WELCH_DFT_MAX_SIZE {
1458 let hop_size = default_segment_size
1459 - (default_segment_size as f64 * WELCH_DEFAULT_OVERLAP).round() as usize;
1460 return (signal, default_segment_size, hop_size.max(1));
1461 }
1462
1463 let segment_size = WELCH_DFT_MAX_SIZE;
1464 let hop_size = segment_size - (segment_size as f64 * WELCH_DEFAULT_OVERLAP).round() as usize;
1465 let segment_count = (signal.len() - segment_size) / hop_size + 1;
1466 let used_len = (segment_count - 1) * hop_size + segment_size;
1467
1468 (
1469 &signal[signal.len() - used_len..],
1470 segment_size,
1471 hop_size.max(1),
1472 )
1473}
1474
1475fn render_graphics_panel(f: &mut Frame, app: &MonitorState, area: Rect) {
1476 if let (Some(pos), Some((desc, units))) = (app.current_pos(), app.get_focused_channel_info()) {
1477 let route = pos.route();
1478 if app.view.show_fft {
1479 match app.get_spectral_density_data() {
1480 FftStatus::Ready(sd) => {
1481 let title = format!(
1482 "{} — {} ({:.1}s, FFT {} of {} samples, seg {}, hop {}) | Median ASD: {:.3e} {}/√Hz",
1483 route,
1484 desc,
1485 app.view.plot_window_seconds,
1486 sd.sample_count,
1487 sd.total_sample_count,
1488 sd.segment_size,
1489 sd.hop_size,
1490 sd.median_asd,
1491 units
1492 );
1493 let block = Block::default().title(title).borders(Borders::ALL);
1494
1495 if !sd.points.is_empty() {
1496 let log_data: Vec<(f64, f64)> = sd
1497 .points
1498 .iter()
1499 .map(|(freq, val)| (freq.log10(), val.log10()))
1500 .collect();
1501
1502 let min_f = log_data.first().map(|(f, _)| *f).unwrap_or(0.0);
1503 let max_f = log_data.last().map(|(f, _)| *f).unwrap_or(1.0);
1504 let ds: Vec<f64> = log_data.iter().map(|(_, d)| *d).collect();
1505 let min_d = ds.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1506 let max_d = ds.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1507
1508 let y_pad = if (max_d - min_d) > 0.1 {
1509 (max_d - min_d) * 0.1
1510 } else {
1511 0.5
1512 };
1513
1514 let dataset = Dataset::default()
1515 .name(desc.as_str())
1516 .marker(symbols::Marker::Braille)
1517 .style(Style::default().fg(Color::Cyan))
1518 .graph_type(GraphType::Line)
1519 .data(&log_data);
1520
1521 let chart = Chart::new(vec![dataset])
1522 .block(block)
1523 .x_axis(
1524 Axis::default()
1525 .title("Freq [Hz] (log)")
1526 .bounds([min_f, max_f])
1527 .labels(generate_log_labels(
1528 min_f,
1529 max_f,
1530 5,
1531 app.view.axis_precision,
1532 )),
1533 )
1534 .y_axis(
1535 Axis::default()
1536 .title(format!("Val [{}/√Hz]", units))
1537 .bounds([min_d - y_pad, max_d + y_pad])
1538 .labels(generate_log_labels(
1539 min_d - y_pad,
1540 max_d + y_pad,
1541 5,
1542 app.view.axis_precision,
1543 )),
1544 );
1545 f.render_widget(chart, area);
1546 } else {
1547 f.render_widget(Paragraph::new("No valid FFT data").block(block), area);
1548 }
1549 }
1550 FftStatus::TooFewSamples {
1551 have,
1552 need,
1553 sampling_hz,
1554 window_seconds,
1555 } => {
1556 let block = Block::default()
1557 .title(format!("FFT unavailable - {} samples needed", need))
1558 .borders(Borders::ALL);
1559 let message = format!(
1560 "Current FFT buffer has {} of {} samples. At {:.3} Hz, current window is {:.1}s.",
1561 have, need, sampling_hz, window_seconds
1562 );
1563 f.render_widget(Paragraph::new(message).block(block), area);
1564 }
1565 FftStatus::InvalidSampleRate {
1566 sampling_rate,
1567 decimation,
1568 } => {
1569 let block = Block::default()
1570 .title("FFT unavailable - invalid sample rate")
1571 .borders(Borders::ALL);
1572 let message = format!(
1573 "Stream metadata reports sampling_rate={} and decimation={}.",
1574 sampling_rate, decimation
1575 );
1576 f.render_widget(Paragraph::new(message).block(block), area);
1577 }
1578 FftStatus::NoValidFrequencyBins {
1579 sample_count,
1580 sampling_hz,
1581 } => {
1582 let block = Block::default()
1583 .title("FFT unavailable - no valid frequency bins")
1584 .borders(Borders::ALL);
1585 let message = format!(
1586 "Welch produced no positive finite bins from {} samples at {:.3} Hz.",
1587 sample_count, sampling_hz
1588 );
1589 f.render_widget(Paragraph::new(message).block(block), area);
1590 }
1591 FftStatus::WaitingForSelection => {
1592 let block = Block::default()
1593 .title("FFT unavailable - no channel selected")
1594 .borders(Borders::ALL);
1595 f.render_widget(
1596 Paragraph::new("Select a stream column to plot FFT.").block(block),
1597 area,
1598 );
1599 }
1600 FftStatus::WaitingForSamples => {
1601 let block = Block::default()
1602 .title("Buffering FFT...")
1603 .borders(Borders::ALL);
1604 f.render_widget(
1605 Paragraph::new("Waiting for samples in the selected window.").block(block),
1606 area,
1607 );
1608 }
1609 }
1610 } else {
1611 let title = format!(
1612 "{} — {} ({:.1}s)",
1613 route, desc, app.view.plot_window_seconds
1614 );
1615 let block = Block::default().title(title).borders(Borders::ALL);
1616
1617 if let Some((data, _, _)) = app.get_plot_data() {
1618 let min_t = data.first().map(|(t, _)| *t).unwrap_or(0.0);
1619 let max_t = data.last().map(|(t, _)| *t).unwrap_or(1.0);
1620 let vs: Vec<f64> = data.iter().map(|(_, v)| *v).collect();
1621 let min_v = vs.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1622 let max_v = vs.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1623
1624 let pad = if (max_v - min_v).abs() > 1e-10 {
1625 (max_v - min_v) * 0.4
1626 } else {
1627 1.0
1628 };
1629
1630 let dataset = Dataset::default()
1631 .name(desc.as_str())
1632 .marker(symbols::Marker::Braille)
1633 .style(Style::default().fg(Color::Green))
1634 .graph_type(GraphType::Line)
1635 .data(&data);
1636
1637 let chart = Chart::new(vec![dataset])
1638 .block(block)
1639 .x_axis(
1640 Axis::default()
1641 .title("Time [s]")
1642 .bounds([min_t, max_t])
1643 .labels(generate_linear_labels(
1644 min_t,
1645 max_t,
1646 3,
1647 app.view.axis_precision,
1648 )),
1649 )
1650 .y_axis(
1651 Axis::default()
1652 .title(format!("Value [{}]", units))
1653 .bounds([min_v - pad, max_v + pad])
1654 .labels(generate_linear_labels(
1655 min_v - pad,
1656 max_v + pad,
1657 5,
1658 app.view.axis_precision,
1659 )),
1660 );
1661 f.render_widget(chart, area);
1662 } else {
1663 f.render_widget(Paragraph::new("Buffering...").block(block), area);
1664 }
1665 }
1666 } else {
1667 f.render_widget(
1668 Block::default()
1669 .title("Channel Detail")
1670 .borders(Borders::ALL),
1671 area,
1672 );
1673 }
1674}
1675
1676fn generate_linear_labels(
1677 min: f64,
1678 max: f64,
1679 count: usize,
1680 precision: usize,
1681) -> Vec<Span<'static>> {
1682 if count < 2 {
1683 return vec![];
1684 }
1685 let step = (max - min) / ((count - 1) as f64);
1686 (0..count)
1687 .map(|i| {
1688 let v = min + (i as f64 * step);
1689 Span::from(format!("{:>10.p$}", v, p = precision))
1690 })
1691 .collect()
1692}
1693
1694fn generate_log_labels(
1695 min_log: f64,
1696 max_log: f64,
1697 count: usize,
1698 precision: usize,
1699) -> Vec<Span<'static>> {
1700 if count < 2 {
1701 return vec![];
1702 }
1703 let step = (max_log - min_log) / ((count - 1) as f64);
1704 let max_val = 10f64.powf(max_log.max(min_log)).abs();
1705 let use_scientific = max_val < 0.01 || max_val >= 1000.0;
1706
1707 (0..count)
1708 .map(|i| {
1709 let log_val = min_log + (i as f64 * step);
1710 let real_val = 10f64.powf(log_val);
1711
1712 let s = if use_scientific {
1713 format!("{:.p$e}", real_val, p = precision)
1714 } else {
1715 format!("{:.p$}", real_val, p = precision)
1716 };
1717 Span::from(format!("{:>10}", s))
1718 })
1719 .collect()
1720}
1721
1722fn fmt_value(v: &ColumnData) -> (String, f64) {
1723 match v {
1724 ColumnData::Float(x) => (format!("{:15.4}", x), *x as f64),
1725 ColumnData::Int(x) => (format!("{:15}", x), *x as f64),
1726 ColumnData::UInt(x) => (format!("{:15}", x), *x as f64),
1727 _ => (" type?".to_string(), f64::NAN),
1728 }
1729}
1730
1731fn load_theme(path: &str) -> io::Result<Theme> {
1732 let mut s = String::new();
1733 File::open(path)?.read_to_string(&mut s)?;
1734 let doc =
1735 DocumentMut::from_str(&s).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1736 let mut bounds = HashMap::new();
1737 for (k, v) in doc.get_values() {
1738 let col = k.iter().map(|k| k.get()).collect::<Vec<_>>().join(".");
1739 if let Value::InlineTable(it) = v {
1740 let (t, min) = if let Some(v) = get_num(it, "cold") {
1741 (true, v)
1742 } else {
1743 (false, get_num(it, "min").unwrap_or(f64::NEG_INFINITY))
1744 };
1745 let max = if let Some(v) = get_num(it, "hot") {
1746 v
1747 } else {
1748 get_num(it, "max").unwrap_or(f64::INFINITY)
1749 };
1750 bounds.insert(col, (min..=max, t));
1751 }
1752 }
1753 Ok(Theme {
1754 value_bounds: bounds,
1755 })
1756}
1757
1758fn get_num(it: &InlineTable, k: &str) -> Option<f64> {
1759 it.get(k)
1760 .and_then(|v| v.as_float().or(v.as_integer().map(|i| i as f64)))
1761}
1762fn run_monitor_app(config: MonitorConfig) -> eyre::Result<()> {
1763 use eyre::WrapErr;
1764
1765 let MonitorConfig {
1766 tio,
1767 fps,
1768 colors,
1769 depth,
1770 } = config;
1771
1772 let proxy = tio::proxy::Interface::new(&tio.root);
1773 let parent_route: DeviceRoute = tio.route.clone();
1774
1775 let tree = DeviceTree::open(&proxy, parent_route.clone())
1776 .wrap_err_with(|| format!("could not open device tree on {}", tio.root))
1777 .with_proxy_help()?;
1778 let data_rx = spawn_tree_worker(tree);
1779
1780 let rpc_client = RpcClient::open(&proxy, parent_route.clone())
1781 .wrap_err_with(|| format!("could not open RPC client on {}", tio.root))
1782 .with_proxy_help()?;
1783 let (rpc_tx, rpc_resp_rx) = spawn_rpc_worker(rpc_client);
1784
1785 let (key_tx, key_rx) = channel::unbounded();
1787 std::thread::spawn(move || loop {
1788 if let Ok(ev) = event::read() {
1789 if key_tx.send(ev).is_err() {
1790 return;
1791 }
1792 }
1793 });
1794
1795 let mut app = MonitorState::new(depth, &parent_route);
1797 if let Some(path) = &colors {
1798 if let Ok(theme) = load_theme(path) {
1799 app.view.theme = theme;
1800 } else {
1801 eprintln!("Failed to load theme");
1802 }
1803 }
1804
1805 let mut buffer = Buffer::new(MONITOR_BUFFER_CAPACITY_SAMPLES);
1806
1807 let mut term = ratatui::init();
1809 let _ = term.hide_cursor();
1810 let ui_tick = channel::tick(Duration::from_millis(1000 / fps as u64));
1811 let mut stream_error = None;
1812
1813 'main: loop {
1814 crossbeam::select! {
1815 recv(data_rx) -> item => {
1816 match item {
1817 Ok(Ok(TreeItem::Sample(sample, route))) => {
1818 app.handle_sample(sample, route, &mut buffer);
1819 }
1820 Ok(Ok(TreeItem::Event(event))) => {
1821 app.handle_event(event, &rpc_tx);
1822 }
1823 Ok(Err(e)) => {
1824 stream_error = Some(e);
1825 break 'main;
1826 }
1827 Err(_) => break 'main,
1828 }
1829 }
1830
1831 recv(key_rx) -> ev => {
1832 if let Ok(ev) = ev {
1833 if let Some(act) = get_action(ev, &mut app) {
1834 if app.update(act, &rpc_tx) {
1835 break 'main;
1836 }
1837 }
1838 }
1839 }
1840
1841 recv(rpc_resp_rx) -> resp => {
1842 if let Ok(resp) = resp {
1843 match resp {
1844 RpcWorkerResp::List(list) => {
1845 app.update_rpclists(list);
1846 }
1847 RpcWorkerResp::ListErr { route, error } => {
1848 app.update_rpclist_error(route, error);
1849 }
1850 RpcWorkerResp::RpcResult(res) => {
1851 let (msg, col) = match res.result {
1852 Ok(s) => (
1853 format!("{}: {}", app.palette.last_rpc_command(), s),
1854 Color::Green,
1855 ),
1856 Err(s) => (format!("ERR: {}", s), Color::Red),
1857 };
1858 app.palette.set_rpc_result(msg, col);
1859 }
1860 }
1861 }
1862 }
1863
1864 recv(ui_tick) -> _ => {
1865 app.update_plot_window(&buffer);
1866 app.rebuild_nav_items();
1867 app.tick_blink();
1868
1869 if draw_ui(&mut term, &mut app).is_err() {
1870 break 'main;
1871 }
1872 }
1873 }
1874 }
1875
1876 ratatui::restore();
1877 if let Some(e) = stream_error {
1878 use color_eyre::Help;
1879 return Err(eyre::Report::new(e))
1880 .wrap_err("lost connection to data source")
1881 .suggestion("the data source went away (proxy exited or device disconnected); restart it and re-run this command");
1882 }
1883 Ok(())
1884}