Skip to main content

twinleaf_tools/tools/
health.rs

1// tio health
2//
3// Live timing & rate diagnostics by device route.
4// Uses DeviceTree for automatic metadata handling.
5//
6// Build: cargo run --release -- <tio-url> [route] [options]
7// Quit:  q / Ctrl-C
8
9use crate::tui::rpc_palette::{PaletteEvent, RpcPalette, RpcPaletteStatus, RpcReq};
10use crate::tui::rpc_state::RouteRpcState;
11use crate::tui::rpc_worker::{spawn_rpc_worker, RpcWorkerReq, RpcWorkerResp};
12use crate::tui::tree_worker::spawn_tree_worker;
13use crate::{HealthCli, ProxyHelp};
14use chrono::{DateTime, Local};
15use crossbeam::channel::{self, Sender};
16use ratatui::{
17    crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
18    layout::{Constraint, Direction, Layout},
19    style::{Color, Modifier, Style},
20    text::{Line, Span},
21    widgets::{Block, Borders, Cell, Paragraph, Row, Table, TableState},
22    Terminal,
23};
24use std::{
25    collections::{BTreeMap, HashMap, VecDeque},
26    io,
27    time::{Duration, Instant, SystemTime},
28};
29use twinleaf::{
30    data::{BoundaryReason, StreamKey},
31    device::{DeviceEvent, DeviceRoute, DeviceTree, RpcClient, RpcList, TreeEvent, TreeItem},
32    tio,
33};
34
35pub fn run_health(config: HealthConfig) -> eyre::Result<()> {
36    run_health_app(config)
37}
38
39#[derive(Debug, Clone)]
40pub struct HealthConfig {
41    tio: crate::TioOpts,
42    jitter_window: u64,
43    event_log_size: usize,
44    event_display_lines: u16,
45    warnings_only: bool,
46    stale_dur: Duration,
47    ppm_warn: f64,
48    ppm_err: f64,
49    streams: Option<Vec<u8>>,
50    quiet: bool,
51    fps: u64,
52}
53
54impl From<HealthCli> for HealthConfig {
55    fn from(cli: HealthCli) -> Self {
56        let stale_dur = cli.stale_dur();
57        Self {
58            tio: cli.tio,
59            jitter_window: cli.jitter_window,
60            event_log_size: cli.event_log_size as usize,
61            event_display_lines: cli.event_display_lines,
62            warnings_only: cli.warnings_only,
63            stale_dur,
64            ppm_warn: cli.ppm_warn,
65            ppm_err: cli.ppm_err,
66            streams: cli.streams,
67            quiet: cli.quiet,
68            fps: cli.fps,
69        }
70    }
71}
72
/// Heartbeat bookkeeping for a single device route.
#[derive(Default)]
struct DeviceState {
    /// Host time of the most recent heartbeat, if any was received.
    last_heartbeat: Option<Instant>,
    /// Flipped on every heartbeat so the indicator alternates between glyphs.
    heartbeat_toggle: bool,
}

impl DeviceState {
    /// Returns the glyph to draw for this device at time `now`.
    ///
    /// The filled heart is shown only when the last heartbeat is less than
    /// 500 ms old and the toggle is currently set; every other case yields
    /// the empty heart.
    fn heartbeat_char(&self, now: Instant) -> char {
        const FRESH_FOR: Duration = Duration::from_millis(500);

        let is_fresh = matches!(
            self.last_heartbeat,
            Some(seen) if now.duration_since(seen) < FRESH_FOR
        );

        match (is_fresh, self.heartbeat_toggle) {
            (true, true) => '♥',
            _ => '♡',
        }
    }

    /// Records a heartbeat received at `now` and flips the indicator toggle.
    fn on_heartbeat(&mut self, now: Instant) {
        self.heartbeat_toggle ^= true;
        self.last_heartbeat = Some(now);
    }
}
100
/// Fixed-capacity ring buffer of `f64` values with a standard-deviation query.
struct TimeWindow {
    /// Backing storage; always `cap` elements long.
    buf: Vec<f64>,
    /// Number of slots in the ring.
    cap: usize,
    /// Slot that the next `push` writes to.
    idx: usize,
    /// Set once the write position has wrapped around at least once.
    filled: bool,
}

impl TimeWindow {
    /// Creates a window sized for `seconds` of data at roughly `hz_guess`
    /// samples per second, with a minimum of 16 slots.
    fn new(seconds: u64, hz_guess: f64) -> Self {
        const MIN_SLOTS: usize = 16;

        let estimated = (seconds as f64 * hz_guess).round() as usize;
        let cap = std::cmp::max(estimated, MIN_SLOTS);
        TimeWindow {
            buf: vec![0.0; cap],
            cap,
            idx: 0,
            filled: false,
        }
    }

    /// Stores `v`, overwriting the oldest value once the ring is full.
    fn push(&mut self, v: f64) {
        self.buf[self.idx] = v;
        self.idx += 1;
        if self.idx == self.cap {
            // Wrapped: from now on every slot holds a real value.
            self.idx = 0;
            self.filled = true;
        }
    }

    /// Population standard deviation of the stored values (0.0 when empty).
    ///
    /// The result has the unit of the pushed values.
    fn std_ms(&self) -> f64 {
        let len = if self.filled { self.cap } else { self.idx };
        if len == 0 {
            return 0.0;
        }

        let values = &self.buf[..len];
        let count = len as f64;
        let mean = values.iter().sum::<f64>() / count;
        let squared_dev_sum = values
            .iter()
            .map(|value| (value - mean).powi(2))
            .sum::<f64>();
        (squared_dev_sum / count).sqrt()
    }
}
141
/// Incremental ordinary-least-squares fit of `y` against `x`.
///
/// Only running sums are stored, so memory use is constant. Every point is
/// shifted by the first point seen (`x0`, `y0`) before it is accumulated;
/// the slope is unaffected by that shift.
#[derive(Default)]
struct OnlineSlope {
    /// Number of points accumulated since construction or the last reset.
    n: u64,
    /// Sum of shifted `x` values.
    sum_x: f64,
    /// Sum of shifted `y` values.
    sum_y: f64,
    /// Sum of squared shifted `x` values.
    sum_xx: f64,
    /// Sum of products of shifted `x` and `y` values.
    sum_xy: f64,
    /// `x` of the first point; subtracted from every later `x`.
    x0: f64,
    /// `y` of the first point; subtracted from every later `y`.
    y0: f64,
}

impl OnlineSlope {
    /// Adds the point `(x, y)` to the fit.
    fn push(&mut self, x: f64, y: f64) {
        if self.n == 0 {
            // The first point defines the origin for all later points.
            self.x0 = x;
            self.y0 = y;
        }
        let dx = x - self.x0;
        let dy = y - self.y0;
        self.n += 1;
        self.sum_x += dx;
        self.sum_y += dy;
        self.sum_xx += dx * dx;
        self.sum_xy += dx * dy;
    }

    /// Returns the fitted slope, or `None` when it is undefined.
    ///
    /// The slope is undefined with fewer than two points, or when the `x`
    /// values carry (almost) no spread so the denominator is below
    /// `f64::EPSILON` in magnitude.
    fn slope(&self) -> Option<f64> {
        if self.n < 2 {
            return None;
        }
        let n = self.n as f64;
        let denom = n * self.sum_xx - self.sum_x * self.sum_x;
        if denom.abs() < f64::EPSILON {
            return None;
        }
        Some((n * self.sum_xy - self.sum_x * self.sum_y) / denom)
    }

    /// Discards all accumulated points.
    fn reset(&mut self) {
        *self = Self::default();
    }
}
196
197const MIN_DRIFT_SAMPLES: u64 = 50;
198
199#[derive(Default)]
200struct StreamStats {
201    host_epoch: Option<Instant>,
202
203    drift_slope: OnlineSlope,
204    drift_s: f64,
205    ppm: f64,
206
207    last_host: Option<Instant>,
208    last_data: Option<f64>,
209    jitter_ms: f64,
210    jitter_window: Option<TimeWindow>,
211
212    last_n: Option<u32>,
213    samples_dropped: u64,
214    current_session_id: Option<u32>,
215
216    rate_slope: OnlineSlope,
217    received_count: u64,
218    rate_smps: f64,
219
220    name: String,
221    last_seen: Option<Instant>,
222}
223
224impl StreamStats {
225    fn on_sample(&mut self, sample_n: u32, t_data: f64, now: Instant, jitter_window_s: u64) {
226        if self.host_epoch.is_none() {
227            self.host_epoch = Some(now);
228        }
229        let host_time = now.duration_since(self.host_epoch.unwrap()).as_secs_f64();
230
231        // Jitter (unchanged)
232        if self.jitter_window.is_none() {
233            self.jitter_window = Some(TimeWindow::new(jitter_window_s, 100.0));
234        }
235        if let (Some(lh), Some(ld)) = (self.last_host, self.last_data) {
236            let dh = now.duration_since(lh).as_secs_f64();
237            let dd = t_data - ld;
238            if let Some(w) = &mut self.jitter_window {
239                w.push((dd - dh) * 1000.0);
240                self.jitter_ms = w.std_ms();
241            }
242        }
243        self.last_host = Some(now);
244        self.last_data = Some(t_data);
245
246        // Drift / PPM via incremental OLS
247        self.drift_slope.push(host_time, t_data);
248        if self.drift_slope.n >= MIN_DRIFT_SAMPLES {
249            if let Some(beta) = self.drift_slope.slope() {
250                let host_elapsed = host_time - self.drift_slope.x0;
251                self.drift_s = (beta - 1.0) * host_elapsed;
252                self.ppm = (beta - 1.0) * 1e6;
253            }
254        }
255
256        // Rate via incremental OLS
257        self.received_count += 1;
258        self.rate_slope.push(host_time, self.received_count as f64);
259        if let Some(slope) = self.rate_slope.slope() {
260            self.rate_smps = slope;
261        }
262
263        self.last_n = Some(sample_n);
264    }
265
266    fn reset_timing(&mut self) {
267        self.drift_slope.reset();
268        self.drift_s = 0.0;
269        self.ppm = 0.0;
270        self.last_host = None;
271        self.last_data = None;
272        self.jitter_ms = 0.0;
273        self.jitter_window = None;
274        self.last_n = None;
275    }
276
277    fn reset_for_new_session(&mut self, session_id: u32) {
278        self.reset_timing();
279        self.samples_dropped = 0;
280        self.current_session_id = Some(session_id);
281    }
282
283    fn reset_all(&mut self) {
284        self.reset_timing();
285        self.rate_slope.reset();
286        self.received_count = 0;
287        self.rate_smps = 0.0;
288        self.host_epoch = None;
289        self.samples_dropped = 0;
290    }
291
292    // floor of stale_dur
293    fn stale_threshold(&self, floor: Duration) -> Duration {
294        if self.rate_slope.n >= 2 && self.rate_smps > 0.0 {
295            let period = Duration::from_secs_f64(2.0 / self.rate_smps);
296            std::cmp::max(floor, period)
297        } else {
298            floor
299        }
300    }
301
302    fn is_stale(&self, now: Instant, floor: Duration) -> bool {
303        let threshold = self.stale_threshold(floor);
304        self.last_seen
305            .map(|t| now.duration_since(t) > threshold)
306            .unwrap_or(true)
307    }
308}
309
310#[derive(Clone)]
311struct LoggedEvent {
312    timestamp: SystemTime,
313    event: String,
314    color: Color,
315}
316
/// Input mode of the health view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Default mode; the RPC palette is closed.
    Normal,
    /// The RPC palette is open.
    Command,
}
322
323enum Action {
324    Quit,
325    ToggleHeartbeat,
326    TogglePpm,
327    ToggleSampleTime,
328    ScrollUp,
329    ScrollDown,
330    PageUp,
331    PageDown,
332    ScrollHome,
333    ScrollEnd,
334    SetMode(Mode),
335    ExecuteRpc(RpcReq),
336    SelectRoute(DeviceRoute),
337    ResetStats,
338    ClearLog,
339}
340
341struct HealthState {
342    stats: BTreeMap<StreamKey, StreamStats>,
343    device_states: HashMap<DeviceRoute, DeviceState>,
344    event_log: VecDeque<LoggedEvent>,
345    event_scroll_offset: usize,
346    show_heartbeat: bool,
347    show_ppm: bool,
348    show_sample_time: bool,
349    streams_filter: Option<Vec<u8>>,
350    jitter_window_s: u64,
351    event_log_cap: usize,
352    event_display_lines: usize,
353    warnings_only: bool,
354    stale_dur: Duration,
355    ppm_warn: f64,
356    ppm_err: f64,
357    quiet: bool,
358    mode: Mode,
359    palette: RpcPalette,
360    palette_route: Option<DeviceRoute>,
361    rpc_routes: HashMap<DeviceRoute, RouteRpcState>,
362    footer_height: u16,
363    session_start: Instant,
364}
365
366impl HealthState {
367    fn new(config: &HealthConfig) -> Self {
368        Self {
369            stats: BTreeMap::new(),
370            device_states: HashMap::new(),
371            event_log: VecDeque::new(),
372            event_scroll_offset: 0,
373            show_heartbeat: false,
374            show_ppm: true,
375            show_sample_time: true,
376            streams_filter: config.streams.clone(),
377            jitter_window_s: config.jitter_window,
378            event_log_cap: config.event_log_size,
379            event_display_lines: config.event_display_lines as usize,
380            warnings_only: config.warnings_only,
381            stale_dur: config.stale_dur,
382            ppm_warn: config.ppm_warn,
383            ppm_err: config.ppm_err,
384            quiet: config.quiet,
385            mode: Mode::Normal,
386            palette: RpcPalette::default(),
387            palette_route: None,
388            rpc_routes: HashMap::new(),
389            footer_height: 0,
390            session_start: Instant::now(),
391        }
392    }
393
394    fn active_route<'a>(&'a self, root_route: &'a DeviceRoute) -> &'a DeviceRoute {
395        self.palette_route.as_ref().unwrap_or(root_route)
396    }
397
398    fn available_routes(&self, root_route: &DeviceRoute) -> Vec<DeviceRoute> {
399        let mut routes: Vec<DeviceRoute> = self.device_states.keys().cloned().collect();
400        if !routes.contains(root_route) {
401            routes.push(root_route.clone());
402        }
403        routes.sort();
404        routes
405    }
406
407    fn rpc_palette_status(&self, route: &DeviceRoute) -> RpcPaletteStatus {
408        self.rpc_routes
409            .get(route)
410            .map(RouteRpcState::palette_status)
411            .unwrap_or(RpcPaletteStatus::WaitingForRpc)
412    }
413
414    fn update_palette_suggestions_for(&mut self, route: &DeviceRoute, root_route: &DeviceRoute) {
415        if self.mode == Mode::Command && self.active_route(root_route) == route {
416            let registry = self.rpc_routes.get(route).and_then(RouteRpcState::registry);
417            self.palette.update_suggestions(registry);
418        }
419    }
420
421    fn log_event(&mut self, msg: String, color: Color) {
422        self.event_log.push_front(LoggedEvent {
423            timestamp: SystemTime::now(),
424            event: msg,
425            color,
426        });
427        if self.event_log.len() > self.event_log_cap {
428            self.event_log.pop_back();
429        }
430    }
431
432    fn filtered_event_count(&self) -> usize {
433        self.event_log
434            .iter()
435            .filter(|e| !self.warnings_only || matches!(e.color, Color::Red | Color::Yellow))
436            .count()
437    }
438
439    fn handle_sample(&mut self, sample: twinleaf::data::Sample, route: DeviceRoute, now: Instant) {
440        let sid = sample.stream.stream_id;
441
442        if let Some(filter) = &self.streams_filter {
443            if !filter.contains(&sid) {
444                return;
445            }
446        }
447
448        let key = StreamKey::new(route.clone(), sid);
449        let st = self.stats.entry(key).or_insert_with(|| StreamStats {
450            name: sample.stream.name.clone(),
451            current_session_id: Some(sample.device.session_id),
452            ..Default::default()
453        });
454
455        st.name = sample.stream.name.clone();
456
457        if let Some(boundary) = &sample.boundary {
458            self.handle_boundary(&boundary.reason, &route, &sample.stream.name, sid);
459        }
460
461        let st = self.stats.get_mut(&StreamKey::new(route, sid)).unwrap();
462        if st.last_n.map(|n| sample.n != n).unwrap_or(true) {
463            st.last_seen = Some(now);
464        }
465
466        st.on_sample(sample.n, sample.timestamp_end(), now, self.jitter_window_s);
467    }
468
469    fn handle_boundary(
470        &mut self,
471        reason: &BoundaryReason,
472        route: &DeviceRoute,
473        stream_name: &str,
474        stream_id: u8,
475    ) {
476        let key = StreamKey::new(route.clone(), stream_id);
477        match reason {
478            BoundaryReason::Initial => {
479                self.log_event(
480                    format!("[{}/{}] STREAM STARTED", route, stream_name),
481                    Color::Green,
482                );
483            }
484            BoundaryReason::SessionChanged { old, new } => {
485                self.log_event(
486                    format!("[{}/{}] SESSION: {} → {}", route, stream_name, old, new),
487                    Color::Green,
488                );
489                if let Some(st) = self.stats.get_mut(&key) {
490                    st.reset_for_new_session(*new);
491                }
492            }
493            BoundaryReason::SamplesLost { expected, received } => {
494                let count = received.wrapping_sub(*expected);
495                self.log_event(
496                    format!("[{}/{}] DROPPED: {} samples", route, stream_name, count),
497                    Color::Red,
498                );
499                if let Some(st) = self.stats.get_mut(&key) {
500                    st.samples_dropped += count as u64;
501                }
502            }
503            BoundaryReason::TimeBackward { gap_seconds } => {
504                self.log_event(
505                    format!(
506                        "[{}/{}] TIME BACKWARD: {:.3}s",
507                        route, stream_name, gap_seconds
508                    ),
509                    Color::Yellow,
510                );
511            }
512            BoundaryReason::RateChanged { old_rate, new_rate } => {
513                self.log_event(
514                    format!(
515                        "[{}/{}] RATE: {:.1} → {:.1} Hz",
516                        route, stream_name, old_rate, new_rate
517                    ),
518                    Color::Yellow,
519                );
520                if let Some(st) = self.stats.get_mut(&key) {
521                    st.reset_timing();
522                    st.rate_slope.reset();
523                    st.received_count = 0;
524                    st.rate_smps = 0.0;
525                }
526            }
527            BoundaryReason::TimeRefSessionChanged { old, new } => {
528                self.log_event(
529                    format!("[{}/{}] TIME REF: {} → {}", route, stream_name, old, new),
530                    Color::Yellow,
531                );
532                if let Some(st) = self.stats.get_mut(&key) {
533                    st.reset_timing();
534                }
535            }
536            BoundaryReason::SegmentRollover { old_id, new_id } => {
537                self.log_event(
538                    format!(
539                        "[{}/{}] SEGMENT: {} → {}",
540                        route, stream_name, old_id, new_id
541                    ),
542                    Color::Green,
543                );
544            }
545            BoundaryReason::SegmentChanged { old_id, new_id } => {
546                self.log_event(
547                    format!(
548                        "[{}/{}] SEGMENT CHANGED: {} → {}",
549                        route, stream_name, old_id, new_id
550                    ),
551                    Color::Yellow,
552                );
553                if let Some(st) = self.stats.get_mut(&key) {
554                    st.reset_timing();
555                }
556            }
557        }
558    }
559
560    fn handle_event(&mut self, event: TreeEvent, now: Instant, rpc_tx: &Sender<RpcWorkerReq>) {
561        match event {
562            TreeEvent::RouteDiscovered(route) => {
563                self.device_states.entry(route.clone()).or_default();
564                self.log_event(format!("[{}] ROUTE DISCOVERED", route), Color::Green);
565                if self
566                    .rpc_routes
567                    .entry(route.clone())
568                    .or_default()
569                    .on_route_discovered()
570                {
571                    let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
572                }
573            }
574            TreeEvent::Device {
575                route,
576                event: DeviceEvent::Heartbeat { session_id },
577            } => {
578                if self
579                    .rpc_routes
580                    .entry(route.clone())
581                    .or_default()
582                    .on_heartbeat(session_id)
583                {
584                    let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
585                }
586                self.device_states
587                    .entry(route)
588                    .or_default()
589                    .on_heartbeat(now);
590            }
591            TreeEvent::Device {
592                route,
593                event: DeviceEvent::Status(status),
594            } => {
595                self.log_event(format!("[{}] STATUS: {:?}", route, status), Color::Yellow);
596                if self
597                    .rpc_routes
598                    .entry(route.clone())
599                    .or_default()
600                    .on_status(status)
601                {
602                    let _ = rpc_tx.send(RpcWorkerReq::FetchList(route.clone()));
603                }
604                if matches!(status, tio::proto::ProxyStatus::SensorDisconnected) {
605                    for (key, st) in self.stats.iter_mut() {
606                        if key.route == route {
607                            st.reset_timing();
608                            st.rate_slope.reset();
609                            st.received_count = 0;
610                            st.rate_smps = 0.0;
611                            st.host_epoch = None;
612                        }
613                    }
614                }
615            }
616            TreeEvent::Device {
617                route,
618                event: DeviceEvent::RpcInvalidated(method),
619            } => {
620                self.log_event(
621                    format!("[{}] RPC INVALIDATED: {:?}", route, method),
622                    Color::Cyan,
623                );
624            }
625            TreeEvent::Device {
626                route,
627                event: DeviceEvent::MetadataReady(metadata),
628            } => {
629                self.log_event(
630                    format!("[{}] METADATA READY: {}", route, metadata.device.name),
631                    Color::Green,
632                );
633            }
634            TreeEvent::Device {
635                route,
636                event: DeviceEvent::NewHash(hash),
637            } => {
638                self.log_event(format!("[{}] NEW HASH: {:?}", route, hash), Color::Green);
639                if self
640                    .rpc_routes
641                    .entry(route.clone())
642                    .or_default()
643                    .on_new_hash(hash)
644                {
645                    let _ = rpc_tx.send(RpcWorkerReq::FetchList(route));
646                }
647            }
648        }
649    }
650
651    fn tick(&mut self, now: Instant) {
652        for (_key, st) in self.stats.iter_mut() {
653            if st.is_stale(now, self.stale_dur) && st.rate_slope.n >= 2 {
654                st.reset_timing();
655                st.rate_slope.reset();
656                st.received_count = 0;
657                st.rate_smps = 0.0;
658                st.host_epoch = None;
659            }
660        }
661    }
662
663    fn update(
664        &mut self,
665        action: Action,
666        root_route: &DeviceRoute,
667        rpc_tx: &Sender<RpcWorkerReq>,
668    ) -> bool {
669        let total = self.filtered_event_count();
670        let display_count = self.event_display_lines;
671        match action {
672            Action::Quit => return true,
673            Action::ToggleHeartbeat => self.show_heartbeat = !self.show_heartbeat,
674            Action::TogglePpm => self.show_ppm = !self.show_ppm,
675            Action::ToggleSampleTime => self.show_sample_time = !self.show_sample_time,
676            Action::ScrollUp => {
677                self.event_scroll_offset = self.event_scroll_offset.saturating_sub(1);
678            }
679            Action::ScrollDown => {
680                if total > display_count {
681                    self.event_scroll_offset =
682                        (self.event_scroll_offset + 1).min(total.saturating_sub(display_count));
683                }
684            }
685            Action::PageUp => {
686                self.event_scroll_offset = self.event_scroll_offset.saturating_sub(display_count);
687            }
688            Action::PageDown => {
689                if total > display_count {
690                    self.event_scroll_offset = (self.event_scroll_offset + display_count)
691                        .min(total.saturating_sub(display_count));
692                }
693            }
694            Action::ScrollHome => {
695                self.event_scroll_offset = 0;
696            }
697            Action::ScrollEnd => {
698                if total > display_count {
699                    self.event_scroll_offset = total.saturating_sub(display_count);
700                }
701            }
702            Action::SetMode(Mode::Command) => {
703                let active = self.active_route(root_route).clone();
704                let registry = self
705                    .rpc_routes
706                    .get(&active)
707                    .and_then(RouteRpcState::registry);
708                self.palette.enter(registry);
709                self.mode = Mode::Command;
710            }
711            Action::SetMode(Mode::Normal) => {
712                self.mode = Mode::Normal;
713                self.palette.exit();
714            }
715            Action::ExecuteRpc(req) => {
716                let _ = rpc_tx.send(RpcWorkerReq::Execute(req));
717            }
718            Action::SelectRoute(route) => {
719                self.palette_route = Some(route.clone());
720                let registry = self
721                    .rpc_routes
722                    .get(&route)
723                    .and_then(RouteRpcState::registry);
724                self.palette.update_suggestions(registry);
725            }
726            Action::ResetStats => {
727                for st in self.stats.values_mut() {
728                    st.reset_all();
729                }
730            }
731            Action::ClearLog => {
732                self.event_log.clear();
733                self.event_scroll_offset = 0;
734            }
735        }
736        false
737    }
738
739    fn update_rpclists(&mut self, list: RpcList, root_route: &DeviceRoute) {
740        let route = list.route.clone();
741        self.rpc_routes
742            .entry(route.clone())
743            .or_default()
744            .on_fetch_success(&list);
745        self.update_palette_suggestions_for(&route, root_route);
746    }
747
748    fn update_rpclist_error(
749        &mut self,
750        route: DeviceRoute,
751        error: String,
752        root_route: &DeviceRoute,
753    ) {
754        self.rpc_routes
755            .entry(route.clone())
756            .or_default()
757            .on_fetch_error(error);
758        self.update_palette_suggestions_for(&route, root_route);
759    }
760}
761
762struct DisplayRow {
763    route: String,
764    stream_id: u8,
765    name: String,
766    rate_smps: f64,
767    drift_s: f64,
768    ppm: f64,
769    jitter_ms: f64,
770    samples_dropped: u64,
771    last_n: Option<u32>,
772    last_data: Option<f64>,
773    elapsed_time: Option<f64>,
774    status: &'static str,
775    color: Color,
776}
777
778impl DisplayRow {
779    fn from_stats(
780        route: String,
781        stream_id: u8,
782        st: &StreamStats,
783        now: Instant,
784        stale_dur: Duration,
785        ppm_warn: f64,
786        ppm_err: f64,
787    ) -> Self {
788        let stale = st.is_stale(now, stale_dur);
789        let (color, status) = if stale {
790            (Color::DarkGray, "STALLED")
791        } else if st.ppm.abs() >= ppm_err {
792            (Color::Red, "ERROR")
793        } else if st.ppm.abs() >= ppm_warn {
794            (Color::Yellow, "WARN")
795        } else {
796            (Color::Green, "OK")
797        };
798
799        let elapsed_time = st
800            .host_epoch
801            .map(|epoch| now.duration_since(epoch).as_secs_f64());
802
803        DisplayRow {
804            route,
805            stream_id,
806            name: st.name.clone(),
807            rate_smps: st.rate_smps,
808            drift_s: st.drift_s,
809            ppm: st.ppm,
810            jitter_ms: st.jitter_ms,
811            samples_dropped: st.samples_dropped,
812            last_n: st.last_n,
813            last_data: st.last_data,
814            elapsed_time,
815            status,
816            color,
817        }
818    }
819
820    fn to_table_row(&self, show_ppm: bool, show_sample_time: bool) -> Row<'static> {
821        let style = Style::default().fg(self.color);
822        let drift_cell = if show_ppm {
823            Cell::from(format!("{:.2}", self.ppm))
824        } else {
825            Cell::from(format!("{:.4}", self.drift_s))
826        };
827        let time_cell = if show_sample_time {
828            Cell::from(format!("{:.3}", self.last_data.unwrap_or(0.0)))
829        } else {
830            Cell::from(format!("{:.1}", self.elapsed_time.unwrap_or(0.0)))
831        };
832        Row::new(vec![
833            Cell::from(self.route.clone()).style(style),
834            Cell::from(format!("{}", self.stream_id)).style(style),
835            Cell::from(self.name.clone()).style(style),
836            Cell::from(format!("{:.1}", self.rate_smps)).style(style),
837            drift_cell.style(style),
838            Cell::from(format!("{:.2}", self.jitter_ms)).style(style),
839            Cell::from(format!("{}", self.samples_dropped)).style(style),
840            Cell::from(format!("{}", self.last_n.unwrap_or(0))).style(style),
841            time_cell.style(style),
842            Cell::from(self.status).style(style),
843        ])
844    }
845}
846
847fn draw_ui(
848    terminal: &mut Terminal<ratatui::backend::CrosstermBackend<io::Stdout>>,
849    app: &mut HealthState,
850    config: &HealthConfig,
851    root_route: &DeviceRoute,
852) -> io::Result<()> {
853    let now = Instant::now();
854
855    let mut rows: Vec<DisplayRow> = app
856        .stats
857        .iter()
858        .map(|(key, st)| {
859            DisplayRow::from_stats(
860                key.route.to_string(),
861                key.stream_id,
862                st,
863                now,
864                app.stale_dur,
865                app.ppm_warn,
866                app.ppm_err,
867            )
868        })
869        .collect();
870
871    rows.sort_by(|a, b| a.route.cmp(&b.route).then(a.stream_id.cmp(&b.stream_id)));
872
873    let mut heartbeat_entries: Vec<_> = app
874        .device_states
875        .iter()
876        .map(|(route, state)| (route.to_string(), state.heartbeat_char(now)))
877        .collect();
878    heartbeat_entries.sort_by(|a, b| a.0.cmp(&b.0));
879    let heartbeat_display: String = heartbeat_entries
880        .iter()
881        .map(|(route, ch)| format!("{}: {}", route, ch))
882        .collect::<Vec<_>>()
883        .join("  ");
884
885    let show_heartbeat = app.show_heartbeat;
886    let show_ppm = app.show_ppm;
887    let show_sample_time = app.show_sample_time;
888    let event_scroll_offset = app.event_scroll_offset;
889    let warnings_only = app.warnings_only;
890    let event_display_lines = app.event_display_lines as u16;
891    let quiet = app.quiet;
892
893    let in_command = app.mode == Mode::Command;
894    let palette_rows = app.palette.suggestion_rows();
895    let active_route = app.active_route(root_route).clone();
896    let palette_status = app.rpc_palette_status(&active_route);
897    let session_str = indicatif::FormattedDuration(app.session_start.elapsed()).to_string();
898
899    terminal.draw(|f| {
900        let size = f.area();
901        let event_block_height = if app.event_log.is_empty() {
902            0
903        } else {
904            event_display_lines + 2
905        };
906        let footer_height = if in_command {
907            // palette footer: suggestion block (rows + 2 borders) + 1 result + 2 input
908            palette_rows + 5
909        } else if quiet {
910            0
911        } else {
912            1
913        };
914        app.footer_height = footer_height;
915        let heartbeat_height = if show_heartbeat { 1 } else { 0 };
916
917        let chunks = Layout::default()
918            .direction(Direction::Vertical)
919            .constraints([
920                Constraint::Length(3),
921                Constraint::Length(heartbeat_height),
922                Constraint::Min(10),
923                Constraint::Length(event_block_height),
924                Constraint::Length(footer_height),
925            ])
926            .split(size);
927
928        // Header
929        let header_text = format!(
930            "tio health ({}) — jitter={}s  warn/err={}/{}ppm  fps={}  stale={}ms",
931            session_str,
932            config.jitter_window,
933            config.ppm_warn,
934            config.ppm_err,
935            config.fps,
936            config.stale_dur.as_millis()
937        );
938        f.render_widget(
939            Paragraph::new(header_text).style(Style::default().add_modifier(Modifier::BOLD)),
940            chunks[0],
941        );
942
943        // Heartbeat (conditional)
944        if show_heartbeat {
945            f.render_widget(
946                Paragraph::new(heartbeat_display.clone()).style(Style::default().fg(Color::Cyan)),
947                chunks[1],
948            );
949        }
950
951        // Table
952        let drift_header = if show_ppm { "ppm" } else { "drift(s)" };
953        let time_header = if show_sample_time {
954            "sample_time"
955        } else {
956            "elapsed(s)"
957        };
958
959        let header_cells = [
960            "route",
961            "sid",
962            "stream",
963            "smps/s",
964            drift_header,
965            "jitter(ms)",
966            "dropped",
967            "last_n",
968            time_header,
969            "status",
970        ]
971        .into_iter()
972        .map(|h| Cell::from(h).style(Style::default().add_modifier(Modifier::BOLD)));
973
974        let widths = [
975            Constraint::Length(10),
976            Constraint::Length(4),
977            Constraint::Length(20),
978            Constraint::Length(9),
979            Constraint::Length(9),
980            Constraint::Length(11),
981            Constraint::Length(8),
982            Constraint::Length(10),
983            Constraint::Length(12),
984            Constraint::Length(8),
985        ];
986
987        let table = Table::new(
988            rows.iter()
989                .map(|r| r.to_table_row(show_ppm, show_sample_time))
990                .collect::<Vec<_>>(),
991            widths,
992        )
993        .header(Row::new(header_cells).height(1))
994        .column_spacing(2);
995
996        f.render_stateful_widget(table, chunks[2], &mut TableState::default());
997
998        // Event log
999        if !app.event_log.is_empty() {
1000            let events_to_show: Vec<&LoggedEvent> = app
1001                .event_log
1002                .iter()
1003                .filter(|e| !warnings_only || matches!(e.color, Color::Red | Color::Yellow))
1004                .collect();
1005
1006            let total = events_to_show.len();
1007            let display_count = event_display_lines as usize;
1008            let start = event_scroll_offset.min(total.saturating_sub(1));
1009            let end = (start + display_count).min(total);
1010
1011            let visible: Vec<Line> = events_to_show[start..end]
1012                .iter()
1013                .map(|e| {
1014                    let dt: DateTime<Local> = e.timestamp.into();
1015                    Line::from(vec![
1016                        Span::styled(
1017                            format!("[{}] ", dt.format("%H:%M:%S%.3f")),
1018                            Style::default().fg(e.color),
1019                        ),
1020                        Span::styled(e.event.clone(), Style::default().fg(e.color)),
1021                    ])
1022                })
1023                .collect();
1024
1025            let title = if total > display_count {
1026                format!("Events [{}-{}/{}] (↑/↓)", start + 1, end, total)
1027            } else {
1028                "Events".to_string()
1029            };
1030
1031            f.render_widget(
1032                Paragraph::new(visible).block(
1033                    Block::default()
1034                        .title(title)
1035                        .borders(Borders::ALL)
1036                        .border_style(Style::default().fg(Color::Gray)),
1037                ),
1038                chunks[3],
1039            );
1040        }
1041
1042        // Footer: RPC palette when in Command mode, keybind hints otherwise
1043        if in_command {
1044            let registry = app
1045                .rpc_routes
1046                .get(&active_route)
1047                .and_then(RouteRpcState::registry);
1048            app.palette
1049                .render(f, chunks[4], &active_route, registry, palette_status, false);
1050        } else if !quiet {
1051            let heartbeat_hint = if show_heartbeat {
1052                "h:hide heartbeat"
1053            } else {
1054                "h:show heartbeat"
1055            };
1056            let drift_hint = if show_ppm { "p:drift" } else { "p:ppm" };
1057            let time_hint = if show_sample_time {
1058                "s:elapsed"
1059            } else {
1060                "s:sample"
1061            };
1062            f.render_widget(
1063                Paragraph::new(format!(
1064                    "q to quit  |  : RPC  |  {}  {}  {}  |  r:reset  c:clear log  |  ↑/↓/PgUp/PgDn",
1065                    heartbeat_hint, drift_hint, time_hint
1066                ))
1067                .style(Style::default().fg(Color::Gray)),
1068                chunks[4],
1069            );
1070        }
1071    })?;
1072    Ok(())
1073}
1074
1075fn get_action(ev: Event, app: &mut HealthState, root_route: &DeviceRoute) -> Option<Action> {
1076    let Event::Key(k) = ev else { return None };
1077    if k.kind != KeyEventKind::Press {
1078        return None;
1079    }
1080    if k.code == KeyCode::Char('c')
1081        && k.modifiers == KeyModifiers::CONTROL
1082        && app.mode == Mode::Normal
1083    {
1084        return Some(Action::Quit);
1085    }
1086    match app.mode {
1087        Mode::Command => {
1088            let active = app.active_route(root_route).clone();
1089            let registry = app
1090                .rpc_routes
1091                .get(&active)
1092                .and_then(RouteRpcState::registry);
1093            let routes = app.available_routes(root_route);
1094            match app
1095                .palette
1096                .handle_key(k, registry, &active, &routes, app.footer_height)
1097            {
1098                PaletteEvent::Submit(req) => Some(Action::ExecuteRpc(req)),
1099                PaletteEvent::SelectRoute(r) => Some(Action::SelectRoute(r)),
1100                PaletteEvent::Exit => Some(Action::SetMode(Mode::Normal)),
1101                PaletteEvent::Consumed => None,
1102            }
1103        }
1104        Mode::Normal => match k.code {
1105            KeyCode::Char(':') => Some(Action::SetMode(Mode::Command)),
1106            KeyCode::Char('q') => Some(Action::Quit),
1107            KeyCode::Char('h') => Some(Action::ToggleHeartbeat),
1108            KeyCode::Char('p') => Some(Action::TogglePpm),
1109            KeyCode::Char('s') => Some(Action::ToggleSampleTime),
1110            KeyCode::Char('r') => Some(Action::ResetStats),
1111            KeyCode::Char('c') => Some(Action::ClearLog),
1112            KeyCode::Up => Some(Action::ScrollUp),
1113            KeyCode::Down => Some(Action::ScrollDown),
1114            KeyCode::PageUp => Some(Action::PageUp),
1115            KeyCode::PageDown => Some(Action::PageDown),
1116            KeyCode::Home => Some(Action::ScrollHome),
1117            KeyCode::End => Some(Action::ScrollEnd),
1118            _ => None,
1119        },
1120    }
1121}
1122
1123fn run_health_app(config: HealthConfig) -> eyre::Result<()> {
1124    use eyre::WrapErr;
1125
1126    let mut terminal = ratatui::init();
1127
1128    let proxy = tio::proxy::Interface::new(&config.tio.root);
1129    let root_route = config.tio.route.clone();
1130
1131    let tree = DeviceTree::open(&proxy, root_route.clone())
1132        .map_err(|e| {
1133            ratatui::restore();
1134            eyre::Report::new(e)
1135        })
1136        .wrap_err_with(|| format!("could not open device tree on {}", config.tio.root))
1137        .with_proxy_help()?;
1138
1139    let rpc_client = RpcClient::open(&proxy, root_route.clone())
1140        .map_err(|e| {
1141            ratatui::restore();
1142            eyre::Report::new(e)
1143        })
1144        .wrap_err_with(|| format!("could not open RPC client on {}", config.tio.root))
1145        .with_proxy_help()?;
1146    let (rpc_tx, rpc_resp_rx) = spawn_rpc_worker(rpc_client);
1147
1148    let data_rx = spawn_tree_worker(tree);
1149
1150    // Key thread
1151    let (key_tx, key_rx) = channel::unbounded();
1152    std::thread::spawn(move || loop {
1153        if let Ok(ev) = event::read() {
1154            if key_tx.send(ev).is_err() {
1155                return;
1156            }
1157        }
1158    });
1159
1160    let mut app = HealthState::new(&config);
1161    let ui_tick = channel::tick(Duration::from_millis(1000 / config.fps));
1162    let mut stream_error = None;
1163
1164    'main: loop {
1165        crossbeam::select! {
1166            recv(data_rx) -> item => {
1167                let now = Instant::now();
1168                match item {
1169                    Ok(Ok(TreeItem::Sample(sample, route))) => {
1170                        app.handle_sample(sample, route, now);
1171                    }
1172                    Ok(Ok(TreeItem::Event(event))) => {
1173                        app.handle_event(event, now, &rpc_tx);
1174                    }
1175                    Ok(Err(e)) => {
1176                        stream_error = Some(e);
1177                        break 'main;
1178                    }
1179                    Err(_) => break 'main,
1180                }
1181            }
1182
1183            recv(key_rx) -> ev => {
1184                if let Ok(ev) = ev {
1185                    if let Some(action) = get_action(ev, &mut app, &root_route) {
1186                        if app.update(action, &root_route, &rpc_tx) {
1187                            break 'main;
1188                        }
1189                    }
1190                }
1191            }
1192
1193            recv(rpc_resp_rx) -> resp => {
1194                if let Ok(resp) = resp {
1195                    match resp {
1196                        RpcWorkerResp::List(list) => app.update_rpclists(list, &root_route),
1197                        RpcWorkerResp::ListErr { route, error } => {
1198                            app.update_rpclist_error(route, error, &root_route);
1199                        }
1200                        RpcWorkerResp::RpcResult(res) => {
1201                            let (msg, col) = match res.result {
1202                                Ok(s) => (
1203                                    format!("{}: {}", app.palette.last_rpc_command(), s),
1204                                    Color::Green,
1205                                ),
1206                                Err(s) => (format!("ERR: {}", s), Color::Red),
1207                            };
1208                            app.palette.set_rpc_result(msg, col);
1209                        }
1210                    }
1211                }
1212            }
1213
1214            recv(ui_tick) -> _ => {
1215                app.tick(Instant::now());
1216                if draw_ui(&mut terminal, &mut app, &config, &root_route).is_err() {
1217                    break 'main;
1218                }
1219            }
1220        }
1221    }
1222
1223    ratatui::restore();
1224    if let Some(e) = stream_error {
1225        use color_eyre::Help;
1226        return Err(eyre::Report::new(e))
1227            .wrap_err("lost connection to data source")
1228            .suggestion("the data source went away (proxy exited or device disconnected); restart it and re-run this command");
1229    }
1230    Ok(())
1231}