1use crate::tui::rpc_palette::{PaletteEvent, RpcPalette, RpcPaletteStatus, RpcReq};
10use crate::tui::rpc_state::RouteRpcState;
11use crate::tui::rpc_worker::{spawn_rpc_worker, RpcWorkerReq, RpcWorkerResp};
12use crate::tui::tree_worker::spawn_tree_worker;
13use crate::{HealthCli, ProxyHelp};
14use chrono::{DateTime, Local};
15use crossbeam::channel::{self, Sender};
16use ratatui::{
17 crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
18 layout::{Constraint, Direction, Layout},
19 style::{Color, Modifier, Style},
20 text::{Line, Span},
21 widgets::{Block, Borders, Cell, Paragraph, Row, Table, TableState},
22 Terminal,
23};
24use std::{
25 collections::{BTreeMap, HashMap, VecDeque},
26 io,
27 time::{Duration, Instant, SystemTime},
28};
29use twinleaf::{
30 data::{BoundaryReason, StreamKey},
31 device::{DeviceEvent, DeviceRoute, DeviceTree, RpcClient, RpcList, TreeEvent, TreeItem},
32 tio,
33};
34
35pub fn run_health(config: HealthConfig) -> eyre::Result<()> {
36 run_health_app(config)
37}
38
39#[derive(Debug, Clone)]
40pub struct HealthConfig {
41 tio: crate::TioOpts,
42 jitter_window: u64,
43 event_log_size: usize,
44 event_display_lines: u16,
45 warnings_only: bool,
46 stale_dur: Duration,
47 ppm_warn: f64,
48 ppm_err: f64,
49 streams: Option<Vec<u8>>,
50 quiet: bool,
51 fps: u64,
52}
53
54impl From<HealthCli> for HealthConfig {
55 fn from(cli: HealthCli) -> Self {
56 let stale_dur = cli.stale_dur();
57 Self {
58 tio: cli.tio,
59 jitter_window: cli.jitter_window,
60 event_log_size: cli.event_log_size as usize,
61 event_display_lines: cli.event_display_lines,
62 warnings_only: cli.warnings_only,
63 stale_dur,
64 ppm_warn: cli.ppm_warn,
65 ppm_err: cli.ppm_err,
66 streams: cli.streams,
67 quiet: cli.quiet,
68 fps: cli.fps,
69 }
70 }
71}
72
/// Heartbeat bookkeeping for a single device route.
#[derive(Default)]
struct DeviceState {
    /// Host time at which the most recent heartbeat was recorded.
    last_heartbeat: Option<Instant>,
    /// Flips on every heartbeat so consecutive beats alternate the glyph.
    heartbeat_toggle: bool,
}

impl DeviceState {
    /// Returns the glyph to show for this device at time `now`.
    ///
    /// The filled heart is shown only when a heartbeat was recorded less than
    /// 500 ms before `now` and the toggle is currently set; in every other
    /// case the hollow heart is returned.
    fn heartbeat_char(&self, now: Instant) -> char {
        const FRESH_FOR: Duration = Duration::from_millis(500);

        match self.last_heartbeat {
            Some(seen) if now.duration_since(seen) < FRESH_FOR && self.heartbeat_toggle => '♥',
            _ => '♡',
        }
    }

    /// Records a heartbeat observed at `now` and flips the toggle.
    fn on_heartbeat(&mut self, now: Instant) {
        self.heartbeat_toggle = !self.heartbeat_toggle;
        self.last_heartbeat = Some(now);
    }
}
100
/// Fixed-capacity ring buffer of `f64` values with a standard-deviation query.
struct TimeWindow {
    /// Backing storage, always `cap` elements long.
    buf: Vec<f64>,
    /// Number of slots in the ring.
    cap: usize,
    /// Slot the next pushed value will be written to.
    idx: usize,
    /// Set once the write position has wrapped around at least once.
    filled: bool,
}

impl TimeWindow {
    /// Creates a window sized for `seconds` of data at `hz_guess` values per
    /// second, with a minimum capacity of 16 slots.
    fn new(seconds: u64, hz_guess: f64) -> Self {
        let wanted = (seconds as f64 * hz_guess).round() as usize;
        let cap = wanted.max(16);
        Self {
            buf: vec![0.0; cap],
            cap,
            idx: 0,
            filled: false,
        }
    }

    /// Stores `v` in the next slot, overwriting the oldest value once full.
    fn push(&mut self, v: f64) {
        self.buf[self.idx] = v;
        self.idx += 1;
        if self.idx == self.cap {
            // Wrapped around: from now on every slot holds a real value.
            self.idx = 0;
            self.filled = true;
        }
    }

    /// Population standard deviation of the stored values, or `0.0` when the
    /// window is still empty.
    fn std_ms(&self) -> f64 {
        let count = if self.filled { self.cap } else { self.idx };
        if count == 0 {
            return 0.0;
        }

        let values = &self.buf[..count];
        let len = count as f64;
        let mean: f64 = values.iter().sum::<f64>() / len;
        let variance: f64 = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / len;
        variance.sqrt()
    }
}
141
/// Incremental least-squares slope estimator.
///
/// Points are accumulated relative to the first point pushed (`x0`, `y0`) so
/// the running sums stay small even when the raw coordinates are large.
/// The all-zero default is the empty estimator, so `Default` is derived
/// rather than written by hand.
#[derive(Default)]
struct OnlineSlope {
    /// Number of points pushed so far.
    n: u64,
    /// Sum of `x - x0`.
    sum_x: f64,
    /// Sum of `y - y0`.
    sum_y: f64,
    /// Sum of `(x - x0)^2`.
    sum_xx: f64,
    /// Sum of `(x - x0) * (y - y0)`.
    sum_xy: f64,
    /// `x` of the first point; origin for all later points.
    x0: f64,
    /// `y` of the first point; origin for all later points.
    y0: f64,
}

impl OnlineSlope {
    /// Adds the point `(x, y)` to the fit. The first point becomes the origin.
    fn push(&mut self, x: f64, y: f64) {
        if self.n == 0 {
            self.x0 = x;
            self.y0 = y;
        }
        let dx = x - self.x0;
        let dy = y - self.y0;
        self.n += 1;
        self.sum_x += dx;
        self.sum_y += dy;
        self.sum_xx += dx * dx;
        self.sum_xy += dx * dy;
    }

    /// Least-squares slope of `y` over `x`.
    ///
    /// Returns `None` with fewer than two points, or when the `x` values are
    /// (numerically) all identical so the slope is undefined.
    fn slope(&self) -> Option<f64> {
        if self.n < 2 {
            return None;
        }
        let n = self.n as f64;
        let denom = n * self.sum_xx - self.sum_x * self.sum_x;
        if denom.abs() < f64::EPSILON {
            return None;
        }
        Some((n * self.sum_xy - self.sum_x * self.sum_y) / denom)
    }

    /// Discards all accumulated points.
    fn reset(&mut self) {
        *self = Self::default();
    }
}
196
197const MIN_DRIFT_SAMPLES: u64 = 50;
198
199#[derive(Default)]
200struct StreamStats {
201 host_epoch: Option<Instant>,
202
203 drift_slope: OnlineSlope,
204 drift_s: f64,
205 ppm: f64,
206
207 last_host: Option<Instant>,
208 last_data: Option<f64>,
209 jitter_ms: f64,
210 jitter_window: Option<TimeWindow>,
211
212 last_n: Option<u32>,
213 samples_dropped: u64,
214 current_session_id: Option<u32>,
215
216 rate_slope: OnlineSlope,
217 received_count: u64,
218 rate_smps: f64,
219
220 name: String,
221 last_seen: Option<Instant>,
222}
223
224impl StreamStats {
225 fn on_sample(&mut self, sample_n: u32, t_data: f64, now: Instant, jitter_window_s: u64) {
226 if self.host_epoch.is_none() {
227 self.host_epoch = Some(now);
228 }
229 let host_time = now.duration_since(self.host_epoch.unwrap()).as_secs_f64();
230
231 if self.jitter_window.is_none() {
233 self.jitter_window = Some(TimeWindow::new(jitter_window_s, 100.0));
234 }
235 if let (Some(lh), Some(ld)) = (self.last_host, self.last_data) {
236 let dh = now.duration_since(lh).as_secs_f64();
237 let dd = t_data - ld;
238 if let Some(w) = &mut self.jitter_window {
239 w.push((dd - dh) * 1000.0);
240 self.jitter_ms = w.std_ms();
241 }
242 }
243 self.last_host = Some(now);
244 self.last_data = Some(t_data);
245
246 self.drift_slope.push(host_time, t_data);
248 if self.drift_slope.n >= MIN_DRIFT_SAMPLES {
249 if let Some(beta) = self.drift_slope.slope() {
250 let host_elapsed = host_time - self.drift_slope.x0;
251 self.drift_s = (beta - 1.0) * host_elapsed;
252 self.ppm = (beta - 1.0) * 1e6;
253 }
254 }
255
256 self.received_count += 1;
258 self.rate_slope.push(host_time, self.received_count as f64);
259 if let Some(slope) = self.rate_slope.slope() {
260 self.rate_smps = slope;
261 }
262
263 self.last_n = Some(sample_n);
264 }
265
266 fn reset_timing(&mut self) {
267 self.drift_slope.reset();
268 self.drift_s = 0.0;
269 self.ppm = 0.0;
270 self.last_host = None;
271 self.last_data = None;
272 self.jitter_ms = 0.0;
273 self.jitter_window = None;
274 self.last_n = None;
275 }
276
277 fn reset_for_new_session(&mut self, session_id: u32) {
278 self.reset_timing();
279 self.samples_dropped = 0;
280 self.current_session_id = Some(session_id);
281 }
282
283 fn reset_all(&mut self) {
284 self.reset_timing();
285 self.rate_slope.reset();
286 self.received_count = 0;
287 self.rate_smps = 0.0;
288 self.host_epoch = None;
289 self.samples_dropped = 0;
290 }
291
292 fn stale_threshold(&self, floor: Duration) -> Duration {
294 if self.rate_slope.n >= 2 && self.rate_smps > 0.0 {
295 let period = Duration::from_secs_f64(2.0 / self.rate_smps);
296 std::cmp::max(floor, period)
297 } else {
298 floor
299 }
300 }
301
302 fn is_stale(&self, now: Instant, floor: Duration) -> bool {
303 let threshold = self.stale_threshold(floor);
304 self.last_seen
305 .map(|t| now.duration_since(t) > threshold)
306 .unwrap_or(true)
307 }
308}
309
310#[derive(Clone)]
311struct LoggedEvent {
312 timestamp: SystemTime,
313 event: String,
314 color: Color,
315}
316
/// Input mode of the UI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Keys are interpreted as monitor shortcuts.
    Normal,
    /// Keys are forwarded to the RPC palette.
    Command,
}
322
323enum Action {
324 Quit,
325 ToggleHeartbeat,
326 TogglePpm,
327 ToggleSampleTime,
328 ScrollUp,
329 ScrollDown,
330 PageUp,
331 PageDown,
332 ScrollHome,
333 ScrollEnd,
334 SetMode(Mode),
335 ExecuteRpc(RpcReq),
336 SelectRoute(DeviceRoute),
337 ResetStats,
338 ClearLog,
339}
340
341struct HealthState {
342 stats: BTreeMap<StreamKey, StreamStats>,
343 device_states: HashMap<DeviceRoute, DeviceState>,
344 event_log: VecDeque<LoggedEvent>,
345 event_scroll_offset: usize,
346 show_heartbeat: bool,
347 show_ppm: bool,
348 show_sample_time: bool,
349 streams_filter: Option<Vec<u8>>,
350 jitter_window_s: u64,
351 event_log_cap: usize,
352 event_display_lines: usize,
353 warnings_only: bool,
354 stale_dur: Duration,
355 ppm_warn: f64,
356 ppm_err: f64,
357 quiet: bool,
358 mode: Mode,
359 palette: RpcPalette,
360 palette_route: Option<DeviceRoute>,
361 rpc_routes: HashMap<DeviceRoute, RouteRpcState>,
362 footer_height: u16,
363 session_start: Instant,
364}
365
366impl HealthState {
367 fn new(config: &HealthConfig) -> Self {
368 Self {
369 stats: BTreeMap::new(),
370 device_states: HashMap::new(),
371 event_log: VecDeque::new(),
372 event_scroll_offset: 0,
373 show_heartbeat: false,
374 show_ppm: true,
375 show_sample_time: true,
376 streams_filter: config.streams.clone(),
377 jitter_window_s: config.jitter_window,
378 event_log_cap: config.event_log_size,
379 event_display_lines: config.event_display_lines as usize,
380 warnings_only: config.warnings_only,
381 stale_dur: config.stale_dur,
382 ppm_warn: config.ppm_warn,
383 ppm_err: config.ppm_err,
384 quiet: config.quiet,
385 mode: Mode::Normal,
386 palette: RpcPalette::default(),
387 palette_route: None,
388 rpc_routes: HashMap::new(),
389 footer_height: 0,
390 session_start: Instant::now(),
391 }
392 }
393
394 fn active_route<'a>(&'a self, root_route: &'a DeviceRoute) -> &'a DeviceRoute {
395 self.palette_route.as_ref().unwrap_or(root_route)
396 }
397
398 fn available_routes(&self, root_route: &DeviceRoute) -> Vec<DeviceRoute> {
399 let mut routes: Vec<DeviceRoute> = self.device_states.keys().cloned().collect();
400 if !routes.contains(root_route) {
401 routes.push(root_route.clone());
402 }
403 routes.sort();
404 routes
405 }
406
407 fn rpc_palette_status(&self, route: &DeviceRoute) -> RpcPaletteStatus {
408 self.rpc_routes
409 .get(route)
410 .map(RouteRpcState::palette_status)
411 .unwrap_or(RpcPaletteStatus::WaitingForRpc)
412 }
413
414 fn update_palette_suggestions_for(&mut self, route: &DeviceRoute, root_route: &DeviceRoute) {
415 if self.mode == Mode::Command && self.active_route(root_route) == route {
416 let registry = self.rpc_routes.get(route).and_then(RouteRpcState::registry);
417 self.palette.update_suggestions(registry);
418 }
419 }
420
421 fn log_event(&mut self, msg: String, color: Color) {
422 self.event_log.push_front(LoggedEvent {
423 timestamp: SystemTime::now(),
424 event: msg,
425 color,
426 });
427 if self.event_log.len() > self.event_log_cap {
428 self.event_log.pop_back();
429 }
430 }
431
432 fn filtered_event_count(&self) -> usize {
433 self.event_log
434 .iter()
435 .filter(|e| !self.warnings_only || matches!(e.color, Color::Red | Color::Yellow))
436 .count()
437 }
438
439 fn handle_sample(&mut self, sample: twinleaf::data::Sample, route: DeviceRoute, now: Instant) {
440 let sid = sample.stream.stream_id;
441
442 if let Some(filter) = &self.streams_filter {
443 if !filter.contains(&sid) {
444 return;
445 }
446 }
447
448 let key = StreamKey::new(route.clone(), sid);
449 let st = self.stats.entry(key).or_insert_with(|| StreamStats {
450 name: sample.stream.name.clone(),
451 current_session_id: Some(sample.device.session_id),
452 ..Default::default()
453 });
454
455 st.name = sample.stream.name.clone();
456
457 if let Some(boundary) = &sample.boundary {
458 self.handle_boundary(&boundary.reason, &route, &sample.stream.name, sid);
459 }
460
461 let st = self.stats.get_mut(&StreamKey::new(route, sid)).unwrap();
462 if st.last_n.map(|n| sample.n != n).unwrap_or(true) {
463 st.last_seen = Some(now);
464 }
465
466 st.on_sample(sample.n, sample.timestamp_end(), now, self.jitter_window_s);
467 }
468
469 fn handle_boundary(
470 &mut self,
471 reason: &BoundaryReason,
472 route: &DeviceRoute,
473 stream_name: &str,
474 stream_id: u8,
475 ) {
476 let key = StreamKey::new(route.clone(), stream_id);
477 match reason {
478 BoundaryReason::Initial => {
479 self.log_event(
480 format!("[{}/{}] STREAM STARTED", route, stream_name),
481 Color::Green,
482 );
483 }
484 BoundaryReason::SessionChanged { old, new } => {
485 self.log_event(
486 format!("[{}/{}] SESSION: {} → {}", route, stream_name, old, new),
487 Color::Green,
488 );
489 if let Some(st) = self.stats.get_mut(&key) {
490 st.reset_for_new_session(*new);
491 }
492 }
493 BoundaryReason::SamplesLost { expected, received } => {
494 let count = received.wrapping_sub(*expected);
495 self.log_event(
496 format!("[{}/{}] DROPPED: {} samples", route, stream_name, count),
497 Color::Red,
498 );
499 if let Some(st) = self.stats.get_mut(&key) {
500 st.samples_dropped += count as u64;
501 }
502 }
503 BoundaryReason::TimeBackward { gap_seconds } => {
504 self.log_event(
505 format!(
506 "[{}/{}] TIME BACKWARD: {:.3}s",
507 route, stream_name, gap_seconds
508 ),
509 Color::Yellow,
510 );
511 }
512 BoundaryReason::RateChanged { old_rate, new_rate } => {
513 self.log_event(
514 format!(
515 "[{}/{}] RATE: {:.1} → {:.1} Hz",
516 route, stream_name, old_rate, new_rate
517 ),
518 Color::Yellow,
519 );
520 if let Some(st) = self.stats.get_mut(&key) {
521 st.reset_timing();
522 st.rate_slope.reset();
523 st.received_count = 0;
524 st.rate_smps = 0.0;
525 }
526 }
527 BoundaryReason::TimeRefSessionChanged { old, new } => {
528 self.log_event(
529 format!("[{}/{}] TIME REF: {} → {}", route, stream_name, old, new),
530 Color::Yellow,
531 );
532 if let Some(st) = self.stats.get_mut(&key) {
533 st.reset_timing();
534 }
535 }
536 BoundaryReason::SegmentRollover { old_id, new_id } => {
537 self.log_event(
538 format!(
539 "[{}/{}] SEGMENT: {} → {}",
540 route, stream_name, old_id, new_id
541 ),
542 Color::Green,
543 );
544 }
545 BoundaryReason::SegmentChanged { old_id, new_id } => {
546 self.log_event(
547 format!(
548 "[{}/{}] SEGMENT CHANGED: {} → {}",
549 route, stream_name, old_id, new_id
550 ),
551 Color::Yellow,
552 );
553 if let Some(st) = self.stats.get_mut(&key) {
554 st.reset_timing();
555 }
556 }
557 }
558 }
559
560 fn handle_event(&mut self, event: TreeEvent, now: Instant, rpc_tx: &Sender<RpcWorkerReq>) {
561 match event {
562 TreeEvent::RouteDiscovered(route) => {
563 self.device_states.entry(route.clone()).or_default();
564 self.log_event(format!("[{}] ROUTE DISCOVERED", route), Color::Green);
565 if self
566 .rpc_routes
567 .entry(route.clone())
568 .or_default()
569 .on_route_discovered()
570 {
571 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
572 }
573 }
574 TreeEvent::Device {
575 route,
576 event: DeviceEvent::Heartbeat { session_id },
577 } => {
578 if self
579 .rpc_routes
580 .entry(route.clone())
581 .or_default()
582 .on_heartbeat(session_id)
583 {
584 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
585 }
586 self.device_states
587 .entry(route)
588 .or_default()
589 .on_heartbeat(now);
590 }
591 TreeEvent::Device {
592 route,
593 event: DeviceEvent::Status(status),
594 } => {
595 self.log_event(format!("[{}] STATUS: {:?}", route, status), Color::Yellow);
596 if self
597 .rpc_routes
598 .entry(route.clone())
599 .or_default()
600 .on_status(status)
601 {
602 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
603 }
604 if matches!(status, tio::proto::ProxyStatus::SensorDisconnected) {
605 for (key, st) in self.stats.iter_mut() {
606 if key.route == route {
607 st.reset_timing();
608 st.rate_slope.reset();
609 st.received_count = 0;
610 st.rate_smps = 0.0;
611 st.host_epoch = None;
612 }
613 }
614 }
615 }
616 TreeEvent::Device {
617 route,
618 event: DeviceEvent::RpcInvalidated(method),
619 } => {
620 self.log_event(
621 format!("[{}] RPC INVALIDATED: {:?}", route, method),
622 Color::Cyan,
623 );
624 }
625 TreeEvent::Device {
626 route,
627 event: DeviceEvent::MetadataReady(metadata),
628 } => {
629 self.log_event(
630 format!("[{}] METADATA READY: {}", route, metadata.device.name),
631 Color::Green,
632 );
633 }
634 TreeEvent::Device {
635 route,
636 event: DeviceEvent::NewHash(hash),
637 } => {
638 self.log_event(format!("[{}] NEW HASH: {:?}", route, hash), Color::Green);
639 if self
640 .rpc_routes
641 .entry(route.clone())
642 .or_default()
643 .on_new_hash(hash)
644 {
645 let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
646 }
647 }
648 }
649 }
650
651 fn tick(&mut self, now: Instant) {
652 for (_key, st) in self.stats.iter_mut() {
653 if st.is_stale(now, self.stale_dur) && st.rate_slope.n >= 2 {
654 st.reset_timing();
655 st.rate_slope.reset();
656 st.received_count = 0;
657 st.rate_smps = 0.0;
658 st.host_epoch = None;
659 }
660 }
661 }
662
663 fn update(
664 &mut self,
665 action: Action,
666 root_route: &DeviceRoute,
667 rpc_tx: &Sender<RpcWorkerReq>,
668 ) -> bool {
669 let total = self.filtered_event_count();
670 let display_count = self.event_display_lines;
671 match action {
672 Action::Quit => return true,
673 Action::ToggleHeartbeat => self.show_heartbeat = !self.show_heartbeat,
674 Action::TogglePpm => self.show_ppm = !self.show_ppm,
675 Action::ToggleSampleTime => self.show_sample_time = !self.show_sample_time,
676 Action::ScrollUp => {
677 self.event_scroll_offset = self.event_scroll_offset.saturating_sub(1);
678 }
679 Action::ScrollDown => {
680 if total > display_count {
681 self.event_scroll_offset =
682 (self.event_scroll_offset + 1).min(total.saturating_sub(display_count));
683 }
684 }
685 Action::PageUp => {
686 self.event_scroll_offset = self.event_scroll_offset.saturating_sub(display_count);
687 }
688 Action::PageDown => {
689 if total > display_count {
690 self.event_scroll_offset = (self.event_scroll_offset + display_count)
691 .min(total.saturating_sub(display_count));
692 }
693 }
694 Action::ScrollHome => {
695 self.event_scroll_offset = 0;
696 }
697 Action::ScrollEnd => {
698 if total > display_count {
699 self.event_scroll_offset = total.saturating_sub(display_count);
700 }
701 }
702 Action::SetMode(Mode::Command) => {
703 let active = self.active_route(root_route).clone();
704 let registry = self
705 .rpc_routes
706 .get(&active)
707 .and_then(RouteRpcState::registry);
708 self.palette.enter(registry);
709 self.mode = Mode::Command;
710 }
711 Action::SetMode(Mode::Normal) => {
712 self.mode = Mode::Normal;
713 self.palette.exit();
714 }
715 Action::ExecuteRpc(req) => {
716 let _ = rpc_tx.send(RpcWorkerReq::Execute(req));
717 }
718 Action::SelectRoute(route) => {
719 self.palette_route = Some(route.clone());
720 let registry = self
721 .rpc_routes
722 .get(&route)
723 .and_then(RouteRpcState::registry);
724 self.palette.update_suggestions(registry);
725 }
726 Action::ResetStats => {
727 for st in self.stats.values_mut() {
728 st.reset_all();
729 }
730 }
731 Action::ClearLog => {
732 self.event_log.clear();
733 self.event_scroll_offset = 0;
734 }
735 }
736 false
737 }
738
739 fn update_rpclists(&mut self, list: RpcList, root_route: &DeviceRoute) {
740 let route = list.route.clone();
741 self.rpc_routes
742 .entry(route.clone())
743 .or_default()
744 .on_fetch_success(&list);
745 self.update_palette_suggestions_for(&route, root_route);
746 }
747
748 fn update_rpclist_error(
749 &mut self,
750 route: DeviceRoute,
751 error: String,
752 root_route: &DeviceRoute,
753 ) {
754 self.rpc_routes
755 .entry(route.clone())
756 .or_default()
757 .on_fetch_error(error);
758 self.update_palette_suggestions_for(&route, root_route);
759 }
760}
761
762struct DisplayRow {
763 route: String,
764 stream_id: u8,
765 name: String,
766 rate_smps: f64,
767 drift_s: f64,
768 ppm: f64,
769 jitter_ms: f64,
770 samples_dropped: u64,
771 last_n: Option<u32>,
772 last_data: Option<f64>,
773 elapsed_time: Option<f64>,
774 status: &'static str,
775 color: Color,
776}
777
778impl DisplayRow {
779 fn from_stats(
780 route: String,
781 stream_id: u8,
782 st: &StreamStats,
783 now: Instant,
784 stale_dur: Duration,
785 ppm_warn: f64,
786 ppm_err: f64,
787 ) -> Self {
788 let stale = st.is_stale(now, stale_dur);
789 let (color, status) = if stale {
790 (Color::DarkGray, "STALLED")
791 } else if st.ppm.abs() >= ppm_err {
792 (Color::Red, "ERROR")
793 } else if st.ppm.abs() >= ppm_warn {
794 (Color::Yellow, "WARN")
795 } else {
796 (Color::Green, "OK")
797 };
798
799 let elapsed_time = st
800 .host_epoch
801 .map(|epoch| now.duration_since(epoch).as_secs_f64());
802
803 DisplayRow {
804 route,
805 stream_id,
806 name: st.name.clone(),
807 rate_smps: st.rate_smps,
808 drift_s: st.drift_s,
809 ppm: st.ppm,
810 jitter_ms: st.jitter_ms,
811 samples_dropped: st.samples_dropped,
812 last_n: st.last_n,
813 last_data: st.last_data,
814 elapsed_time,
815 status,
816 color,
817 }
818 }
819
820 fn to_table_row(&self, show_ppm: bool, show_sample_time: bool) -> Row<'static> {
821 let style = Style::default().fg(self.color);
822 let drift_cell = if show_ppm {
823 Cell::from(format!("{:.2}", self.ppm))
824 } else {
825 Cell::from(format!("{:.4}", self.drift_s))
826 };
827 let time_cell = if show_sample_time {
828 Cell::from(format!("{:.3}", self.last_data.unwrap_or(0.0)))
829 } else {
830 Cell::from(format!("{:.1}", self.elapsed_time.unwrap_or(0.0)))
831 };
832 Row::new(vec![
833 Cell::from(self.route.clone()).style(style),
834 Cell::from(format!("{}", self.stream_id)).style(style),
835 Cell::from(self.name.clone()).style(style),
836 Cell::from(format!("{:.1}", self.rate_smps)).style(style),
837 drift_cell.style(style),
838 Cell::from(format!("{:.2}", self.jitter_ms)).style(style),
839 Cell::from(format!("{}", self.samples_dropped)).style(style),
840 Cell::from(format!("{}", self.last_n.unwrap_or(0))).style(style),
841 time_cell.style(style),
842 Cell::from(self.status).style(style),
843 ])
844 }
845}
846
847fn draw_ui(
848 terminal: &mut Terminal<ratatui::backend::CrosstermBackend<io::Stdout>>,
849 app: &mut HealthState,
850 config: &HealthConfig,
851 root_route: &DeviceRoute,
852) -> io::Result<()> {
853 let now = Instant::now();
854
855 let mut rows: Vec<DisplayRow> = app
856 .stats
857 .iter()
858 .map(|(key, st)| {
859 DisplayRow::from_stats(
860 key.route.to_string(),
861 key.stream_id,
862 st,
863 now,
864 app.stale_dur,
865 app.ppm_warn,
866 app.ppm_err,
867 )
868 })
869 .collect();
870
871 rows.sort_by(|a, b| a.route.cmp(&b.route).then(a.stream_id.cmp(&b.stream_id)));
872
873 let mut heartbeat_entries: Vec<_> = app
874 .device_states
875 .iter()
876 .map(|(route, state)| (route.to_string(), state.heartbeat_char(now)))
877 .collect();
878 heartbeat_entries.sort_by(|a, b| a.0.cmp(&b.0));
879 let heartbeat_display: String = heartbeat_entries
880 .iter()
881 .map(|(route, ch)| format!("{}: {}", route, ch))
882 .collect::<Vec<_>>()
883 .join(" ");
884
885 let show_heartbeat = app.show_heartbeat;
886 let show_ppm = app.show_ppm;
887 let show_sample_time = app.show_sample_time;
888 let event_scroll_offset = app.event_scroll_offset;
889 let warnings_only = app.warnings_only;
890 let event_display_lines = app.event_display_lines as u16;
891 let quiet = app.quiet;
892
893 let in_command = app.mode == Mode::Command;
894 let palette_rows = app.palette.suggestion_rows();
895 let active_route = app.active_route(root_route).clone();
896 let palette_status = app.rpc_palette_status(&active_route);
897 let session_str = indicatif::FormattedDuration(app.session_start.elapsed()).to_string();
898
899 terminal.draw(|f| {
900 let size = f.area();
901 let event_block_height = if app.event_log.is_empty() {
902 0
903 } else {
904 event_display_lines + 2
905 };
906 let footer_height = if in_command {
907 palette_rows + 5
909 } else if quiet {
910 0
911 } else {
912 1
913 };
914 app.footer_height = footer_height;
915 let heartbeat_height = if show_heartbeat { 1 } else { 0 };
916
917 let chunks = Layout::default()
918 .direction(Direction::Vertical)
919 .constraints([
920 Constraint::Length(3),
921 Constraint::Length(heartbeat_height),
922 Constraint::Min(10),
923 Constraint::Length(event_block_height),
924 Constraint::Length(footer_height),
925 ])
926 .split(size);
927
928 let header_text = format!(
930 "tio health ({}) — jitter={}s warn/err={}/{}ppm fps={} stale={}ms",
931 session_str,
932 config.jitter_window,
933 config.ppm_warn,
934 config.ppm_err,
935 config.fps,
936 config.stale_dur.as_millis()
937 );
938 f.render_widget(
939 Paragraph::new(header_text).style(Style::default().add_modifier(Modifier::BOLD)),
940 chunks[0],
941 );
942
943 if show_heartbeat {
945 f.render_widget(
946 Paragraph::new(heartbeat_display.clone()).style(Style::default().fg(Color::Cyan)),
947 chunks[1],
948 );
949 }
950
951 let drift_header = if show_ppm { "ppm" } else { "drift(s)" };
953 let time_header = if show_sample_time {
954 "sample_time"
955 } else {
956 "elapsed(s)"
957 };
958
959 let header_cells = [
960 "route",
961 "sid",
962 "stream",
963 "smps/s",
964 drift_header,
965 "jitter(ms)",
966 "dropped",
967 "last_n",
968 time_header,
969 "status",
970 ]
971 .into_iter()
972 .map(|h| Cell::from(h).style(Style::default().add_modifier(Modifier::BOLD)));
973
974 let widths = [
975 Constraint::Length(10),
976 Constraint::Length(4),
977 Constraint::Length(20),
978 Constraint::Length(9),
979 Constraint::Length(9),
980 Constraint::Length(11),
981 Constraint::Length(8),
982 Constraint::Length(10),
983 Constraint::Length(12),
984 Constraint::Length(8),
985 ];
986
987 let table = Table::new(
988 rows.iter()
989 .map(|r| r.to_table_row(show_ppm, show_sample_time))
990 .collect::<Vec<_>>(),
991 widths,
992 )
993 .header(Row::new(header_cells).height(1))
994 .column_spacing(2);
995
996 f.render_stateful_widget(table, chunks[2], &mut TableState::default());
997
998 if !app.event_log.is_empty() {
1000 let events_to_show: Vec<&LoggedEvent> = app
1001 .event_log
1002 .iter()
1003 .filter(|e| !warnings_only || matches!(e.color, Color::Red | Color::Yellow))
1004 .collect();
1005
1006 let total = events_to_show.len();
1007 let display_count = event_display_lines as usize;
1008 let start = event_scroll_offset.min(total.saturating_sub(1));
1009 let end = (start + display_count).min(total);
1010
1011 let visible: Vec<Line> = events_to_show[start..end]
1012 .iter()
1013 .map(|e| {
1014 let dt: DateTime<Local> = e.timestamp.into();
1015 Line::from(vec![
1016 Span::styled(
1017 format!("[{}] ", dt.format("%H:%M:%S%.3f")),
1018 Style::default().fg(e.color),
1019 ),
1020 Span::styled(e.event.clone(), Style::default().fg(e.color)),
1021 ])
1022 })
1023 .collect();
1024
1025 let title = if total > display_count {
1026 format!("Events [{}-{}/{}] (↑/↓)", start + 1, end, total)
1027 } else {
1028 "Events".to_string()
1029 };
1030
1031 f.render_widget(
1032 Paragraph::new(visible).block(
1033 Block::default()
1034 .title(title)
1035 .borders(Borders::ALL)
1036 .border_style(Style::default().fg(Color::Gray)),
1037 ),
1038 chunks[3],
1039 );
1040 }
1041
1042 if in_command {
1044 let registry = app
1045 .rpc_routes
1046 .get(&active_route)
1047 .and_then(RouteRpcState::registry);
1048 app.palette
1049 .render(f, chunks[4], &active_route, registry, palette_status, false);
1050 } else if !quiet {
1051 let heartbeat_hint = if show_heartbeat {
1052 "h:hide heartbeat"
1053 } else {
1054 "h:show heartbeat"
1055 };
1056 let drift_hint = if show_ppm { "p:drift" } else { "p:ppm" };
1057 let time_hint = if show_sample_time {
1058 "s:elapsed"
1059 } else {
1060 "s:sample"
1061 };
1062 f.render_widget(
1063 Paragraph::new(format!(
1064 "q to quit | : RPC | {} {} {} | r:reset c:clear log | ↑/↓/PgUp/PgDn",
1065 heartbeat_hint, drift_hint, time_hint
1066 ))
1067 .style(Style::default().fg(Color::Gray)),
1068 chunks[4],
1069 );
1070 }
1071 })?;
1072 Ok(())
1073}
1074
1075fn get_action(ev: Event, app: &mut HealthState, root_route: &DeviceRoute) -> Option<Action> {
1076 let Event::Key(k) = ev else { return None };
1077 if k.kind != KeyEventKind::Press {
1078 return None;
1079 }
1080 if k.code == KeyCode::Char('c')
1081 && k.modifiers == KeyModifiers::CONTROL
1082 && app.mode == Mode::Normal
1083 {
1084 return Some(Action::Quit);
1085 }
1086 match app.mode {
1087 Mode::Command => {
1088 let active = app.active_route(root_route).clone();
1089 let registry = app
1090 .rpc_routes
1091 .get(&active)
1092 .and_then(RouteRpcState::registry);
1093 let routes = app.available_routes(root_route);
1094 match app
1095 .palette
1096 .handle_key(k, registry, &active, &routes, app.footer_height)
1097 {
1098 PaletteEvent::Submit(req) => Some(Action::ExecuteRpc(req)),
1099 PaletteEvent::SelectRoute(r) => Some(Action::SelectRoute(r)),
1100 PaletteEvent::Exit => Some(Action::SetMode(Mode::Normal)),
1101 PaletteEvent::Consumed => None,
1102 }
1103 }
1104 Mode::Normal => match k.code {
1105 KeyCode::Char(':') => Some(Action::SetMode(Mode::Command)),
1106 KeyCode::Char('q') => Some(Action::Quit),
1107 KeyCode::Char('h') => Some(Action::ToggleHeartbeat),
1108 KeyCode::Char('p') => Some(Action::TogglePpm),
1109 KeyCode::Char('s') => Some(Action::ToggleSampleTime),
1110 KeyCode::Char('r') => Some(Action::ResetStats),
1111 KeyCode::Char('c') => Some(Action::ClearLog),
1112 KeyCode::Up => Some(Action::ScrollUp),
1113 KeyCode::Down => Some(Action::ScrollDown),
1114 KeyCode::PageUp => Some(Action::PageUp),
1115 KeyCode::PageDown => Some(Action::PageDown),
1116 KeyCode::Home => Some(Action::ScrollHome),
1117 KeyCode::End => Some(Action::ScrollEnd),
1118 _ => None,
1119 },
1120 }
1121}
1122
1123fn run_health_app(config: HealthConfig) -> eyre::Result<()> {
1124 use eyre::WrapErr;
1125
1126 let mut terminal = ratatui::init();
1127
1128 let proxy = tio::proxy::Interface::new(&config.tio.root);
1129 let root_route = config.tio.route.clone();
1130
1131 let tree = DeviceTree::open(&proxy, root_route.clone())
1132 .map_err(|e| {
1133 ratatui::restore();
1134 eyre::Report::new(e)
1135 })
1136 .wrap_err_with(|| format!("could not open device tree on {}", config.tio.root))
1137 .with_proxy_help()?;
1138
1139 let rpc_client = RpcClient::open(&proxy, root_route.clone())
1140 .map_err(|e| {
1141 ratatui::restore();
1142 eyre::Report::new(e)
1143 })
1144 .wrap_err_with(|| format!("could not open RPC client on {}", config.tio.root))
1145 .with_proxy_help()?;
1146 let (rpc_tx, rpc_resp_rx) = spawn_rpc_worker(rpc_client);
1147
1148 let data_rx = spawn_tree_worker(tree);
1149
1150 let (key_tx, key_rx) = channel::unbounded();
1152 std::thread::spawn(move || loop {
1153 if let Ok(ev) = event::read() {
1154 if key_tx.send(ev).is_err() {
1155 return;
1156 }
1157 }
1158 });
1159
1160 let mut app = HealthState::new(&config);
1161 let ui_tick = channel::tick(Duration::from_millis(1000 / config.fps));
1162 let mut stream_error = None;
1163
1164 'main: loop {
1165 crossbeam::select! {
1166 recv(data_rx) -> item => {
1167 let now = Instant::now();
1168 match item {
1169 Ok(Ok(TreeItem::Sample(sample, route))) => {
1170 app.handle_sample(sample, route, now);
1171 }
1172 Ok(Ok(TreeItem::Event(event))) => {
1173 app.handle_event(event, now, &rpc_tx);
1174 }
1175 Ok(Err(e)) => {
1176 stream_error = Some(e);
1177 break 'main;
1178 }
1179 Err(_) => break 'main,
1180 }
1181 }
1182
1183 recv(key_rx) -> ev => {
1184 if let Ok(ev) = ev {
1185 if let Some(action) = get_action(ev, &mut app, &root_route) {
1186 if app.update(action, &root_route, &rpc_tx) {
1187 break 'main;
1188 }
1189 }
1190 }
1191 }
1192
1193 recv(rpc_resp_rx) -> resp => {
1194 if let Ok(resp) = resp {
1195 match resp {
1196 RpcWorkerResp::List(list) => app.update_rpclists(list, &root_route),
1197 RpcWorkerResp::ListErr { route, error } => {
1198 app.update_rpclist_error(route, error, &root_route);
1199 }
1200 RpcWorkerResp::RpcResult(res) => {
1201 let (msg, col) = match res.result {
1202 Ok(s) => (
1203 format!("{}: {}", app.palette.last_rpc_command(), s),
1204 Color::Green,
1205 ),
1206 Err(s) => (format!("ERR: {}", s), Color::Red),
1207 };
1208 app.palette.set_rpc_result(msg, col);
1209 }
1210 }
1211 }
1212 }
1213
1214 recv(ui_tick) -> _ => {
1215 app.tick(Instant::now());
1216 if draw_ui(&mut terminal, &mut app, &config, &root_route).is_err() {
1217 break 'main;
1218 }
1219 }
1220 }
1221 }
1222
1223 ratatui::restore();
1224 if let Some(e) = stream_error {
1225 use color_eyre::Help;
1226 return Err(eyre::Report::new(e))
1227 .wrap_err("lost connection to data source")
1228 .suggestion("the data source went away (proxy exited or device disconnected); restart it and re-run this command");
1229 }
1230 Ok(())
1231}