cu_consolemon/
lib.rs

1#[cfg(feature = "debug_pane")]
2mod debug_pane;
3pub mod sysinfo;
4mod tui_nodes;
5
6use crate::tui_nodes::{Connection, NodeGraph, NodeLayout};
7use ansi_to_tui::IntoText;
8use color_eyre::config::HookBuilder;
9use compact_str::{CompactString, ToCompactString};
10use cu29::clock::{CuDuration, RobotClock};
11use cu29::config::CuConfig;
12use cu29::cutask::CuMsgMetadata;
13use cu29::monitoring::{
14    ComponentKind, CuDurationStatistics, CuMonitor, CuTaskState, Decision, MonitorTopology,
15};
16use cu29::prelude::{CuCompactString, CuTime, pool};
17use cu29::{CuError, CuResult};
18#[cfg(feature = "debug_pane")]
19use debug_pane::UIExt;
20use ratatui::backend::CrosstermBackend;
21use ratatui::buffer::Buffer;
22use ratatui::crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode};
23use ratatui::crossterm::terminal::{
24    EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
25};
26use ratatui::crossterm::tty::IsTty;
27use ratatui::crossterm::{event, execute};
28use ratatui::layout::{Alignment, Constraint, Direction, Layout, Size};
29use ratatui::prelude::{Backend, Rect};
30use ratatui::prelude::{Stylize, Widget};
31use ratatui::style::{Color, Modifier, Style};
32use ratatui::text::{Line, Span, Text};
33use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, StatefulWidget, Table};
34use ratatui::{Frame, Terminal};
35use std::backtrace::Backtrace;
36use std::fmt::{Display, Formatter};
37use std::io::{Write, stdin, stdout};
38use std::marker::PhantomData;
39use std::process;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::{Arc, Mutex, OnceLock};
42use std::thread::JoinHandle;
43use std::time::{Duration, Instant};
44use std::{collections::HashMap, io, thread};
45use tui_widgets::scrollview::{ScrollView, ScrollViewState};
46
47#[cfg(feature = "debug_pane")]
48const MENU_CONTENT: &str = "   [1] SysInfo  [2] DAG  [3] Latencies  [4] Memory Pools [5] Debug Output  [q] Quit | Scroll: hjkl or ↑↓←→   ";
49#[cfg(not(feature = "debug_pane"))]
50const MENU_CONTENT: &str =
51    "   [1] SysInfo  [2] DAG  [3] Latencies  [4] Memory Pools [q] Quit | Scroll: hjkl or ↑↓←→   ";
52
53#[derive(PartialEq)]
54enum Screen {
55    Neofetch,
56    Dag,
57    Latency,
58    #[cfg(feature = "debug_pane")]
59    DebugOutput,
60    MemoryPools,
61}
62
63struct TaskStats {
64    stats: Vec<CuDurationStatistics>,
65    end2end: CuDurationStatistics,
66}
67
68impl TaskStats {
69    fn new(num_tasks: usize, max_duration: CuDuration) -> Self {
70        let stats = vec![CuDurationStatistics::new(max_duration); num_tasks];
71        TaskStats {
72            stats,
73            end2end: CuDurationStatistics::new(max_duration),
74        }
75    }
76
77    fn update(&mut self, msgs: &[&CuMsgMetadata]) {
78        for (i, &msg) in msgs.iter().enumerate() {
79            let (before, after) = (
80                msg.process_time.start.unwrap(),
81                msg.process_time.end.unwrap(),
82            );
83            self.stats[i].record(after - before);
84        }
85        self.end2end.record(compute_end_to_end_latency(msgs));
86    }
87
88    fn reset(&mut self) {
89        for s in &mut self.stats {
90            s.reset();
91        }
92        self.end2end.reset();
93    }
94}
95struct PoolStats {
96    id: CompactString,
97    space_left: usize,
98    total_size: usize,
99    buffer_size: usize,
100    handles_in_use: usize,
101    handles_per_second: usize,
102    last_update: Instant,
103    prev_handles_in_use: usize,
104}
105
106impl PoolStats {
107    fn new(
108        id: impl ToCompactString,
109        space_left: usize,
110        total_size: usize,
111        buffer_size: usize,
112    ) -> Self {
113        Self {
114            id: id.to_compact_string(),
115            space_left,
116            total_size,
117            buffer_size,
118            handles_in_use: total_size - space_left,
119            handles_per_second: 0,
120            last_update: Instant::now(),
121            prev_handles_in_use: 0,
122        }
123    }
124
125    fn update(&mut self, space_left: usize, total_size: usize) {
126        let now = Instant::now();
127        let handles_in_use = total_size - space_left;
128        let elapsed = now.duration_since(self.last_update).as_secs_f32();
129
130        if elapsed >= 1.0 {
131            self.handles_per_second =
132                ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
133            self.prev_handles_in_use = self.handles_in_use;
134            self.last_update = now;
135        }
136
137        self.handles_in_use = handles_in_use;
138        self.space_left = space_left;
139        self.total_size = total_size;
140    }
141}
142
143fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
144    let start = msgs.first().map(|m| m.process_time.start);
145    let end = msgs.last().map(|m| m.process_time.end);
146
147    match (start, end) {
148        (Some(s), Some(e)) => match (Option::<CuTime>::from(s), Option::<CuTime>::from(e)) {
149            (Some(s), Some(e)) if e >= s => e - s,
150            (Some(_), Some(_)) => CuDuration::MIN,
151            _ => CuDuration::MIN,
152        },
153        _ => CuDuration::MIN,
154    }
155}
156
157// This is kind of terrible.
158#[derive(Copy, Clone)]
159enum NodeType {
160    Unknown,
161    Source,
162    Sink,
163    Task,
164    Bridge,
165}
166
167impl Display for NodeType {
168    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::Unknown => write!(f, "?"),
171            Self::Source => write!(f, "◈"),
172            Self::Task => write!(f, "⚙"),
173            Self::Sink => write!(f, "⭳"),
174            Self::Bridge => write!(f, "⇆"),
175        }
176    }
177}
178
179impl NodeType {
180    fn add_incoming(self) -> NodeType {
181        match self {
182            Self::Unknown => Self::Sink,
183            Self::Source => Self::Task,
184            Self::Sink => Self::Sink,
185            Self::Task => Self::Task,
186            Self::Bridge => Self::Bridge,
187        }
188    }
189
190    fn add_outgoing(self) -> NodeType {
191        match self {
192            Self::Unknown => Self::Source,
193            Self::Source => Self::Source,
194            Self::Sink => Self::Task,
195            Self::Task => Self::Task,
196            Self::Bridge => Self::Bridge,
197        }
198    }
199
200    fn color(self) -> Color {
201        match self {
202            Self::Unknown => Color::Gray,
203            Self::Source => Color::Rgb(255, 191, 0),
204            Self::Sink => Color::Rgb(255, 102, 204),
205            Self::Task => Color::White,
206            Self::Bridge => Color::Rgb(204, 153, 255),
207        }
208    }
209}
210
211#[derive(Default, Clone)]
212struct TaskStatus {
213    is_error: bool,
214    status_txt: CompactString,
215    error: CompactString,
216}
217
218#[derive(Clone)]
219struct DisplayNode {
220    id: String,
221    type_label: String,
222    node_type: NodeType,
223    inputs: Vec<String>,
224    outputs: Vec<String>,
225}
226
227struct NodesScrollableWidgetState {
228    display_nodes: Vec<DisplayNode>,
229    connections: Vec<Connection>,
230    statuses: Arc<Mutex<Vec<TaskStatus>>>,
231    status_index_map: Vec<Option<usize>>,
232    task_count: usize,
233    nodes_scrollable_state: ScrollViewState,
234}
235
236impl NodesScrollableWidgetState {
237    fn new(
238        config: &CuConfig,
239        errors: Arc<Mutex<Vec<TaskStatus>>>,
240        mission: Option<&str>,
241        task_ids: &'static [&'static str],
242        topology: Option<MonitorTopology>,
243    ) -> Self {
244        let topology = topology
245            .or_else(|| cu29::monitoring::build_monitor_topology(config, mission).ok())
246            .unwrap_or_default();
247
248        let mut display_nodes: Vec<DisplayNode> = Vec::new();
249        let mut status_index_map = Vec::new();
250        let mut node_lookup = HashMap::new();
251        let task_index_lookup: HashMap<&str, usize> = task_ids
252            .iter()
253            .enumerate()
254            .map(|(i, id)| (*id, i))
255            .collect();
256
257        for node in topology.nodes.iter() {
258            let node_type = match node.kind {
259                ComponentKind::Bridge => NodeType::Bridge,
260                ComponentKind::Task => {
261                    let mut role = NodeType::Unknown;
262                    if !node.inputs.is_empty() {
263                        role = role.add_incoming();
264                    }
265                    if !node.outputs.is_empty() {
266                        role = role.add_outgoing();
267                    }
268                    role
269                }
270            };
271
272            display_nodes.push(DisplayNode {
273                id: node.id.clone(),
274                type_label: node
275                    .type_name
276                    .clone()
277                    .unwrap_or_else(|| "unknown".to_string()),
278                node_type,
279                inputs: node.inputs.clone(),
280                outputs: node.outputs.clone(),
281            });
282            let idx = display_nodes.len() - 1;
283            node_lookup.insert(node.id.clone(), idx);
284
285            let status_idx = match node.kind {
286                ComponentKind::Task => task_index_lookup.get(node.id.as_str()).cloned(),
287                ComponentKind::Bridge => None,
288            };
289            status_index_map.push(status_idx);
290        }
291
292        let mut connections: Vec<Connection> = Vec::with_capacity(topology.connections.len());
293        for cnx in topology.connections.iter() {
294            let Some(&src_idx) = node_lookup.get(&cnx.src) else {
295                continue;
296            };
297            let Some(&dst_idx) = node_lookup.get(&cnx.dst) else {
298                continue;
299            };
300            let src_node = &display_nodes[src_idx];
301            let dst_node = &display_nodes[dst_idx];
302            let src_port = cnx
303                .src_port
304                .as_ref()
305                .and_then(|p| src_node.outputs.iter().position(|name| name == p))
306                .unwrap_or(0);
307            let dst_port = cnx
308                .dst_port
309                .as_ref()
310                .and_then(|p| dst_node.inputs.iter().position(|name| name == p))
311                .unwrap_or(0);
312
313            connections.push(Connection::new(
314                src_idx,
315                src_port + NODE_PORT_ROW_OFFSET,
316                dst_idx,
317                dst_port + NODE_PORT_ROW_OFFSET,
318            ));
319        }
320
321        // tui-nodes drops all nodes when every node has an outgoing edge (no roots).
322        // If that happens, drop the outgoing edges for the first node so at least one root exists.
323        if !display_nodes.is_empty() {
324            let mut from_set = std::collections::HashSet::new();
325            for conn in &connections {
326                from_set.insert(conn.from_node);
327            }
328            if from_set.len() == display_nodes.len() {
329                let root_idx = 0;
330                connections.retain(|c| c.from_node != root_idx);
331            }
332        }
333
334        NodesScrollableWidgetState {
335            display_nodes,
336            connections,
337            nodes_scrollable_state: ScrollViewState::default(),
338            statuses: errors,
339            status_index_map,
340            task_count: task_ids.len(),
341        }
342    }
343}
344
345struct NodesScrollableWidget<'a> {
346    _marker: PhantomData<&'a ()>,
347}
348
349struct GraphWrapper<'a> {
350    inner: NodeGraph<'a>,
351}
352
353impl Widget for GraphWrapper<'_> {
354    fn render(self, area: Rect, buf: &mut Buffer)
355    where
356        Self: Sized,
357    {
358        self.inner.render(area, buf, &mut ())
359    }
360}
361
362const NODE_WIDTH: u16 = 29;
363const NODE_WIDTH_CONTENT: u16 = NODE_WIDTH - 2;
364
365const NODE_HEIGHT: u16 = 5;
366const NODE_META_LINES: usize = 2;
367const NODE_PORT_ROW_OFFSET: usize = NODE_META_LINES;
368
369fn clip_tail(value: &str, max_chars: usize) -> String {
370    if max_chars == 0 {
371        return String::new();
372    }
373    let char_count = value.chars().count();
374    if char_count <= max_chars {
375        return value.to_string();
376    }
377    let skip = char_count.saturating_sub(max_chars);
378    let start = value
379        .char_indices()
380        .nth(skip)
381        .map(|(idx, _)| idx)
382        .unwrap_or(value.len());
383    value[start..].to_string()
384}
385
386#[allow(dead_code)]
387const NODE_HEIGHT_CONTENT: u16 = NODE_HEIGHT - 2;
388const GRAPH_WIDTH_PADDING: u16 = NODE_WIDTH * 2;
389const GRAPH_HEIGHT_PADDING: u16 = NODE_HEIGHT * 4;
390
391impl StatefulWidget for NodesScrollableWidget<'_> {
392    type State = NodesScrollableWidgetState;
393
394    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
395        let build_node_layouts = || {
396            state
397                .display_nodes
398                .iter()
399                .map(|node| {
400                    let ports = node.inputs.len().max(node.outputs.len());
401                    let content_rows = ports + NODE_PORT_ROW_OFFSET;
402                    let height = (content_rows as u16).saturating_add(2).max(NODE_HEIGHT);
403                    let mut title_line = Line::default();
404                    title_line.spans.push(Span::styled(
405                        format!(" {}", node.node_type),
406                        Style::default().fg(node.node_type.color()),
407                    ));
408                    title_line.spans.push(Span::styled(
409                        format!(" {} ", node.id),
410                        Style::default().fg(Color::White),
411                    ));
412                    NodeLayout::new((NODE_WIDTH, height)).with_title_line(title_line)
413                })
414                .collect::<Vec<_>>()
415        };
416
417        let node_count = state.display_nodes.len().max(1);
418        let content_width = (node_count as u16)
419            .saturating_mul(NODE_WIDTH + 20)
420            .max(NODE_WIDTH);
421        let max_ports = state
422            .display_nodes
423            .iter()
424            .map(|node| node.inputs.len().max(node.outputs.len()))
425            .max()
426            .unwrap_or_default();
427        // Give extra vertical room so long connections can route without aliasing
428        let content_height =
429            (((max_ports + NODE_PORT_ROW_OFFSET) as u16) * 12).max(NODE_HEIGHT * 6);
430        let connections = state.connections.clone();
431        let build_graph = |width: u16, height: u16| {
432            NodeGraph::new(
433                build_node_layouts(),
434                connections.clone(),
435                width as usize,
436                height as usize,
437            )
438        };
439        let mut graph = build_graph(content_width, content_height);
440        graph.calculate();
441        let mut content_size = Size::new(content_width, content_height);
442        if state.display_nodes.is_empty() {
443            content_size = Size::new(area.width.max(NODE_WIDTH), area.height.max(NODE_HEIGHT));
444            graph = build_graph(content_size.width, content_size.height);
445            graph.calculate();
446        } else {
447            let bounds = graph.content_bounds();
448            let desired_width = bounds
449                .width
450                .saturating_add(GRAPH_WIDTH_PADDING)
451                .max(NODE_WIDTH);
452            let desired_height = bounds
453                .height
454                .saturating_add(GRAPH_HEIGHT_PADDING)
455                .max(NODE_HEIGHT);
456            if desired_width != content_size.width || desired_height != content_size.height {
457                content_size = Size::new(desired_width, desired_height);
458                graph = build_graph(content_size.width, content_size.height);
459                graph.calculate();
460            }
461        }
462        let mut scroll_view = ScrollView::new(content_size);
463        let zones = graph.split(scroll_view.area());
464
465        {
466            let mut statuses = state.statuses.lock().unwrap();
467            if statuses.len() <= state.task_count {
468                statuses.resize(state.task_count + 1, TaskStatus::default());
469            }
470            for (idx, ea_zone) in zones.into_iter().enumerate() {
471                let fallback_idx = state.task_count;
472                let status_idx = state
473                    .status_index_map
474                    .get(idx)
475                    .and_then(|opt| *opt)
476                    .unwrap_or(fallback_idx);
477                let safe_index = if status_idx < statuses.len() {
478                    status_idx
479                } else {
480                    statuses.len() - 1
481                };
482                let status = &mut statuses[safe_index];
483                let s = &state.display_nodes[idx].type_label;
484                let status_line = if status.is_error {
485                    format!("❌ {}", status.error)
486                } else {
487                    format!("✓ {}", status.status_txt)
488                };
489
490                let label_width = (NODE_WIDTH_CONTENT as usize).saturating_sub(2);
491                let type_label = clip_tail(s, label_width);
492                let status_text = clip_tail(&status_line, label_width);
493                let base_style = if status.is_error {
494                    Style::default().fg(Color::Red)
495                } else {
496                    Style::default().fg(Color::Green)
497                };
498                let mut lines: Vec<Line> = Vec::new();
499                lines.push(Line::styled(format!(" {}", type_label), base_style));
500                lines.push(Line::styled(format!(" {}", status_text), base_style));
501
502                let max_ports = state.display_nodes[idx]
503                    .inputs
504                    .len()
505                    .max(state.display_nodes[idx].outputs.len());
506                if max_ports > 0 {
507                    let left_width = (NODE_WIDTH_CONTENT as usize - 2) / 2;
508                    let right_width = NODE_WIDTH_CONTENT as usize - 2 - left_width;
509                    let input_style = Style::default().fg(Color::Yellow);
510                    let output_style = Style::default().fg(Color::Cyan);
511                    let dotted_style = Style::default().fg(Color::DarkGray);
512                    for port_idx in 0..max_ports {
513                        let input = state.display_nodes[idx]
514                            .inputs
515                            .get(port_idx)
516                            .map(|label| clip_tail(label, left_width))
517                            .unwrap_or_default();
518                        let output = state.display_nodes[idx]
519                            .outputs
520                            .get(port_idx)
521                            .map(|label| clip_tail(label, right_width))
522                            .unwrap_or_default();
523                        let mut port_line = Line::default();
524                        port_line.spans.push(Span::styled(
525                            format!(" {:<left_width$}", input, left_width = left_width),
526                            input_style,
527                        ));
528                        port_line.spans.push(Span::styled("┆", dotted_style));
529                        port_line.spans.push(Span::styled(
530                            format!("{:>right_width$}", output, right_width = right_width),
531                            output_style,
532                        ));
533                        lines.push(port_line);
534                    }
535                }
536
537                let txt = Text::from(lines);
538                let paragraph = Paragraph::new(txt);
539                status.is_error = false; // reset if it was displayed
540                scroll_view.render_widget(paragraph, ea_zone);
541            }
542        }
543
544        scroll_view.render_widget(
545            GraphWrapper { inner: graph },
546            Rect {
547                x: 0,
548                y: 0,
549                width: content_size.width,
550                height: content_size.height,
551            },
552        );
553        scroll_view.render(area, buf, &mut state.nodes_scrollable_state);
554    }
555}
556
557/// A TUI based realtime console for Copper.
558pub struct CuConsoleMon {
559    config: CuConfig,
560    taskids: &'static [&'static str],
561    task_stats: Arc<Mutex<TaskStats>>,
562    task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
563    ui_handle: Option<JoinHandle<()>>,
564    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
565    quitting: Arc<AtomicBool>,
566    topology: Option<MonitorTopology>,
567}
568
569impl Drop for CuConsoleMon {
570    fn drop(&mut self) {
571        self.quitting.store(true, Ordering::SeqCst);
572        let _ = restore_terminal();
573        if let Some(handle) = self.ui_handle.take() {
574            let _ = handle.join();
575        }
576    }
577}
578
579struct UI {
580    task_ids: &'static [&'static str],
581    active_screen: Screen,
582    sysinfo: String,
583    task_stats: Arc<Mutex<TaskStats>>,
584    quitting: Arc<AtomicBool>,
585    nodes_scrollable_widget_state: NodesScrollableWidgetState,
586    #[cfg(feature = "debug_pane")]
587    error_redirect: gag::BufferRedirect,
588    #[cfg(feature = "debug_pane")]
589    debug_output: Option<debug_pane::DebugLog>,
590    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
591}
592
593impl UI {
594    #[cfg(feature = "debug_pane")]
595    #[allow(clippy::too_many_arguments)]
596    fn new(
597        config: CuConfig,
598        mission: Option<&str>,
599        task_ids: &'static [&'static str],
600        task_stats: Arc<Mutex<TaskStats>>,
601        task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
602        quitting: Arc<AtomicBool>,
603        error_redirect: gag::BufferRedirect,
604        debug_output: Option<debug_pane::DebugLog>,
605        pool_stats: Arc<Mutex<Vec<PoolStats>>>,
606        topology: Option<MonitorTopology>,
607    ) -> UI {
608        init_error_hooks();
609        let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
610            &config,
611            task_statuses.clone(),
612            mission,
613            task_ids,
614            topology.clone(),
615        );
616
617        Self {
618            task_ids,
619            active_screen: Screen::Neofetch,
620            sysinfo: sysinfo::pfetch_info(),
621            task_stats,
622            quitting,
623            nodes_scrollable_widget_state,
624            error_redirect,
625            debug_output,
626            pool_stats,
627        }
628    }
629
630    #[cfg(not(feature = "debug_pane"))]
631    fn new(
632        config: CuConfig,
633        task_ids: &'static [&'static str],
634        task_stats: Arc<Mutex<TaskStats>>,
635        task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
636        quitting: Arc<AtomicBool>,
637        pool_stats: Arc<Mutex<Vec<PoolStats>>>,
638        topology: Option<MonitorTopology>,
639    ) -> UI {
640        init_error_hooks();
641        let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
642            &config,
643            task_statuses.clone(),
644            None,
645            task_ids,
646            topology.clone(),
647        );
648
649        Self {
650            task_ids,
651            active_screen: Screen::Neofetch,
652            sysinfo: sysinfo::pfetch_info(),
653            task_stats,
654            quitting,
655            nodes_scrollable_widget_state,
656            pool_stats,
657        }
658    }
659
660    fn draw_latency_table(&self, f: &mut Frame, area: Rect) {
661        let header_cells = [
662            "🛠 Task",
663            "⬇ Min",
664            "⬆ Max",
665            "∅ Mean",
666            "σ Stddev",
667            "⧖∅ Jitter",
668            "⧗⬆ Jitter",
669        ]
670        .iter()
671        .map(|h| {
672            Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
673                Style::default()
674                    .fg(Color::Yellow)
675                    .add_modifier(Modifier::BOLD),
676            )
677        });
678
679        let header = Row::new(header_cells)
680            .style(Style::default().fg(Color::Yellow))
681            .bottom_margin(1)
682            .top_margin(1);
683
684        let task_stats = self.task_stats.lock().unwrap(); // Acquire lock to read task_stats
685        let mut rows = task_stats
686            .stats
687            .iter()
688            .enumerate()
689            .map(|(i, stat)| {
690                let cells = vec![
691                    Cell::from(Line::from(self.task_ids[i]).alignment(Alignment::Right))
692                        .light_blue(),
693                    Cell::from(Line::from(stat.min().to_string()).alignment(Alignment::Right))
694                        .style(Style::default()),
695                    Cell::from(Line::from(stat.max().to_string()).alignment(Alignment::Right))
696                        .style(Style::default()),
697                    Cell::from(Line::from(stat.mean().to_string()).alignment(Alignment::Right))
698                        .style(Style::default()),
699                    Cell::from(Line::from(stat.stddev().to_string()).alignment(Alignment::Right))
700                        .style(Style::default()),
701                    Cell::from(
702                        Line::from(stat.jitter_mean().to_string()).alignment(Alignment::Right),
703                    )
704                    .style(Style::default()),
705                    Cell::from(
706                        Line::from(stat.jitter_max().to_string()).alignment(Alignment::Right),
707                    )
708                    .style(Style::default()),
709                ];
710                Row::new(cells)
711            })
712            .collect::<Vec<Row>>();
713
714        let cells = vec![
715            Cell::from(
716                Line::from("End2End")
717                    .light_red()
718                    .alignment(Alignment::Right),
719            ),
720            Cell::from(
721                Line::from(task_stats.end2end.min().to_string())
722                    .light_red()
723                    .alignment(Alignment::Right),
724            )
725            .style(Style::default()),
726            Cell::from(
727                Line::from(task_stats.end2end.max().to_string())
728                    .light_red()
729                    .alignment(Alignment::Right),
730            )
731            .style(Style::default()),
732            Cell::from(
733                Line::from(task_stats.end2end.mean().to_string())
734                    .light_red()
735                    .alignment(Alignment::Right),
736            )
737            .style(Style::default()),
738            Cell::from(
739                Line::from(task_stats.end2end.stddev().to_string())
740                    .light_red()
741                    .alignment(Alignment::Right),
742            )
743            .style(Style::default()),
744            Cell::from(
745                Line::from(task_stats.end2end.jitter_mean().to_string())
746                    .light_red()
747                    .alignment(Alignment::Right),
748            )
749            .style(Style::default()),
750            Cell::from(
751                Line::from(task_stats.end2end.jitter_max().to_string())
752                    .light_red()
753                    .alignment(Alignment::Right),
754            )
755            .style(Style::default()),
756        ];
757        rows.push(Row::new(cells).top_margin(1));
758
759        let table = Table::new(
760            rows,
761            &[
762                Constraint::Length(10),
763                Constraint::Length(10),
764                Constraint::Length(12),
765                Constraint::Length(12),
766                Constraint::Length(10),
767                Constraint::Length(12),
768                Constraint::Length(13),
769            ],
770        )
771        .header(header)
772        .block(Block::default().borders(Borders::ALL).title(" Latencies "));
773
774        f.render_widget(table, area);
775    }
776
777    fn draw_memory_pools(&self, f: &mut Frame, area: Rect) {
778        let header_cells = [
779            "Pool ID",
780            "Used/Total",
781            "Buffer Size",
782            "Handles in Use",
783            "Handles/sec",
784        ]
785        .iter()
786        .map(|h| {
787            Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
788                Style::default()
789                    .fg(Color::Yellow)
790                    .add_modifier(Modifier::BOLD),
791            )
792        });
793
794        let header = Row::new(header_cells)
795            .style(Style::default().fg(Color::Yellow))
796            .bottom_margin(1);
797
798        let pool_stats = self.pool_stats.lock().unwrap();
799        let rows = pool_stats
800            .iter()
801            .map(|stat| {
802                let used = stat.total_size - stat.space_left;
803                let percent = if stat.total_size > 0 {
804                    100.0 * used as f64 / stat.total_size as f64
805                } else {
806                    0.0
807                };
808                let buffer_size = stat.buffer_size;
809                let mb_unit = 1024.0 * 1024.0;
810
811                let cells = vec![
812                    Cell::from(Line::from(stat.id.to_string()).alignment(Alignment::Right))
813                        .light_blue(),
814                    Cell::from(
815                        Line::from(format!(
816                            "{:.2} MB / {:.2} MB ({:.1}%)",
817                            used as f64 * buffer_size as f64 / mb_unit,
818                            stat.total_size as f64 * buffer_size as f64 / mb_unit,
819                            percent
820                        ))
821                        .alignment(Alignment::Right),
822                    ),
823                    Cell::from(
824                        Line::from(format!("{} KB", stat.buffer_size / 1024))
825                            .alignment(Alignment::Right),
826                    ),
827                    Cell::from(
828                        Line::from(format!("{}", stat.handles_in_use)).alignment(Alignment::Right),
829                    ),
830                    Cell::from(
831                        Line::from(format!("{}/s", stat.handles_per_second))
832                            .alignment(Alignment::Right),
833                    ),
834                ];
835                Row::new(cells)
836            })
837            .collect::<Vec<Row>>();
838
839        let table = Table::new(
840            rows,
841            &[
842                Constraint::Percentage(30),
843                Constraint::Percentage(20),
844                Constraint::Percentage(15),
845                Constraint::Percentage(15),
846                Constraint::Percentage(20),
847            ],
848        )
849        .header(header)
850        .block(
851            Block::default()
852                .borders(Borders::ALL)
853                .title(" Memory Pools "),
854        );
855
856        f.render_widget(table, area);
857    }
858
859    fn draw_nodes(&mut self, f: &mut Frame, space: Rect) {
860        NodesScrollableWidget {
861            _marker: Default::default(),
862        }
863        .render(
864            space,
865            f.buffer_mut(),
866            &mut self.nodes_scrollable_widget_state,
867        )
868    }
869
870    fn draw(&mut self, f: &mut Frame) {
871        let layout = Layout::default()
872            .direction(Direction::Vertical)
873            .constraints(
874                [
875                    Constraint::Length(3), // For the top menu bar
876                    Constraint::Min(0),    // For the main content
877                ]
878                .as_ref(),
879            )
880            .split(f.area());
881
882        let menu = Paragraph::new(MENU_CONTENT)
883            .style(
884                Style::default()
885                    .fg(Color::Yellow)
886                    .add_modifier(Modifier::ITALIC),
887            )
888            .block(Block::default().borders(Borders::BOTTOM));
889        f.render_widget(menu, layout[0]);
890
891        match self.active_screen {
892            Screen::Neofetch => {
893                const VERSION: &str = env!("CARGO_PKG_VERSION");
894                let text: Text = format!("\n   -> Copper v{}\n\n{}\n\n ", VERSION, self.sysinfo)
895                    .into_text()
896                    .unwrap();
897                let p = Paragraph::new::<Text>(text).block(
898                    Block::default()
899                        .title(" System Info ")
900                        .borders(Borders::ALL),
901                );
902                f.render_widget(p, layout[1]);
903            }
904            Screen::Dag => {
905                self.draw_nodes(f, layout[1]);
906            }
907            Screen::Latency => self.draw_latency_table(f, layout[1]),
908            Screen::MemoryPools => self.draw_memory_pools(f, layout[1]),
909            #[cfg(feature = "debug_pane")]
910            Screen::DebugOutput => self.draw_debug_output(f, layout[1]),
911        };
912    }
913
914    fn run_app<B: Backend<Error = io::Error>>(
915        &mut self,
916        terminal: &mut Terminal<B>,
917    ) -> io::Result<()> {
918        loop {
919            if self.quitting.load(Ordering::SeqCst) {
920                break;
921            }
922            #[cfg(feature = "debug_pane")]
923            self.update_debug_output();
924
925            terminal.draw(|f| {
926                self.draw(f);
927            })?;
928
929            if event::poll(Duration::from_millis(50))? {
930                let event = event::read()?;
931
932                match event {
933                    Event::Key(key) => match key.code {
934                        KeyCode::Char('1') => self.active_screen = Screen::Neofetch,
935                        KeyCode::Char('2') => self.active_screen = Screen::Dag,
936                        KeyCode::Char('3') => self.active_screen = Screen::Latency,
937                        KeyCode::Char('4') => self.active_screen = Screen::MemoryPools,
938                        #[cfg(feature = "debug_pane")]
939                        KeyCode::Char('5') => self.active_screen = Screen::DebugOutput,
940                        KeyCode::Char('r') => {
941                            if self.active_screen == Screen::Latency {
942                                self.task_stats.lock().unwrap().reset()
943                            }
944                        }
945                        KeyCode::Char('j') | KeyCode::Down => {
946                            if self.active_screen == Screen::Dag {
947                                for _ in 0..1 {
948                                    self.nodes_scrollable_widget_state
949                                        .nodes_scrollable_state
950                                        .scroll_down();
951                                }
952                            }
953                        }
954                        KeyCode::Char('k') | KeyCode::Up => {
955                            if self.active_screen == Screen::Dag {
956                                for _ in 0..1 {
957                                    self.nodes_scrollable_widget_state
958                                        .nodes_scrollable_state
959                                        .scroll_up();
960                                }
961                            }
962                        }
963                        KeyCode::Char('h') | KeyCode::Left => {
964                            if self.active_screen == Screen::Dag {
965                                for _ in 0..5 {
966                                    self.nodes_scrollable_widget_state
967                                        .nodes_scrollable_state
968                                        .scroll_left();
969                                }
970                            }
971                        }
972                        KeyCode::Char('l') | KeyCode::Right => {
973                            if self.active_screen == Screen::Dag {
974                                for _ in 0..5 {
975                                    self.nodes_scrollable_widget_state
976                                        .nodes_scrollable_state
977                                        .scroll_right();
978                                }
979                            }
980                        }
981                        KeyCode::Char('q') => {
982                            break;
983                        }
984                        _ => {}
985                    },
986
987                    #[cfg(feature = "debug_pane")]
988                    Event::Resize(_columns, rows) => {
989                        if let Some(debug_output) = self.debug_output.as_mut() {
990                            debug_output.max_rows.store(rows, Ordering::SeqCst)
991                        }
992                    }
993                    _ => {}
994                }
995            }
996        }
997        Ok(())
998    }
999}
1000
1001impl CuMonitor for CuConsoleMon {
1002    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
1003    where
1004        Self: Sized,
1005    {
1006        let task_stats = Arc::new(Mutex::new(TaskStats::new(
1007            taskids.len(),
1008            CuDuration::from(Duration::from_secs(5)),
1009        )));
1010
1011        Ok(Self {
1012            config: config.clone(),
1013            taskids,
1014            task_stats,
1015            task_statuses: Arc::new(Mutex::new(vec![TaskStatus::default(); taskids.len()])),
1016            ui_handle: None,
1017            quitting: Arc::new(AtomicBool::new(false)),
1018            pool_stats: Arc::new(Mutex::new(Vec::new())),
1019            topology: None,
1020        })
1021    }
1022    fn set_topology(&mut self, topology: MonitorTopology) {
1023        self.topology = Some(topology);
1024    }
1025
1026    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
1027        if !should_start_ui() {
1028            return Ok(());
1029        }
1030
1031        let config_dup = self.config.clone();
1032        let taskids = self.taskids;
1033
1034        let task_stats_ui = self.task_stats.clone();
1035        let error_states = self.task_statuses.clone();
1036        let pool_stats_ui = self.pool_stats.clone();
1037        let quitting = self.quitting.clone();
1038        let topology = self.topology.clone();
1039
1040        // Start the main UI loop
1041        let handle = thread::spawn(move || {
1042            let backend = CrosstermBackend::new(stdout());
1043            let _terminal_guard = TerminalRestoreGuard;
1044
1045            if let Err(err) = setup_terminal() {
1046                eprintln!("Failed to prepare terminal UI: {err}");
1047                return;
1048            }
1049
1050            let mut terminal = match Terminal::new(backend) {
1051                Ok(terminal) => terminal,
1052                Err(err) => {
1053                    eprintln!("Failed to initialize terminal backend: {err}");
1054                    return;
1055                }
1056            };
1057
1058            #[cfg(feature = "debug_pane")]
1059            {
1060                // redirect stderr, so it doesn't pop in the terminal
1061                let error_redirect = gag::BufferRedirect::stderr().unwrap();
1062
1063                let mut ui = UI::new(
1064                    config_dup,
1065                    None, // FIXME(gbin): Allow somethere an API to get the current mission running
1066                    taskids,
1067                    task_stats_ui,
1068                    error_states,
1069                    quitting.clone(),
1070                    error_redirect,
1071                    None,
1072                    pool_stats_ui,
1073                    topology.clone(),
1074                );
1075
1076                // Override the cu29-log-runtime Log Subscriber
1077                #[cfg(debug_assertions)]
1078                if cu29_log_runtime::EXTRA_TEXT_LOGGER
1079                    .read()
1080                    .unwrap()
1081                    .is_some()
1082                {
1083                    let max_lines = terminal.size().unwrap().height - 5;
1084                    let (debug_log, tx) = debug_pane::DebugLog::new(max_lines);
1085
1086                    let log_subscriber = debug_pane::LogSubscriber::new(tx);
1087
1088                    *cu29_log_runtime::EXTRA_TEXT_LOGGER.write().unwrap() =
1089                        Some(Box::new(log_subscriber) as Box<dyn log::Log>);
1090
1091                    // Set up the terminal again, as there might be some logs which in the console before updating `EXTRA_TEXT_LOGGER`
1092                    if let Err(err) = setup_terminal() {
1093                        eprintln!("Failed to reinitialize terminal after log redirect: {err}");
1094                    }
1095
1096                    ui.debug_output = Some(debug_log);
1097                } else {
1098                    println!("EXTRA_TEXT_LOGGER is none");
1099                }
1100                if let Err(err) = ui.run_app(&mut terminal) {
1101                    let _ = restore_terminal();
1102                    eprintln!("CuConsoleMon UI exited with error: {err}");
1103                    return;
1104                }
1105            }
1106
1107            #[cfg(not(feature = "debug_pane"))]
1108            {
1109                let stderr_gag = gag::Gag::stderr().unwrap();
1110
1111                let mut ui = UI::new(
1112                    config_dup,
1113                    taskids,
1114                    task_stats_ui,
1115                    error_states,
1116                    quitting,
1117                    pool_stats_ui,
1118                    topology,
1119                );
1120                if let Err(err) = ui.run_app(&mut terminal) {
1121                    let _ = restore_terminal();
1122                    eprintln!("CuConsoleMon UI exited with error: {err}");
1123                    return;
1124                }
1125
1126                drop(stderr_gag);
1127            }
1128
1129            quitting.store(true, Ordering::SeqCst);
1130            // restoring the terminal
1131            let _ = restore_terminal();
1132        });
1133
1134        self.ui_handle = Some(handle);
1135        Ok(())
1136    }
1137
1138    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1139        {
1140            let mut task_stats = self.task_stats.lock().unwrap();
1141            task_stats.update(msgs);
1142        }
1143        {
1144            let mut task_statuses = self.task_statuses.lock().unwrap();
1145            for (i, msg) in msgs.iter().enumerate() {
1146                let CuCompactString(status_txt) = &msg.status_txt;
1147                task_statuses[i].status_txt = status_txt.clone();
1148            }
1149        }
1150
1151        // Update pool statistics
1152        {
1153            let pool_stats_data = pool::pools_statistics();
1154            let mut pool_stats = self.pool_stats.lock().unwrap();
1155
1156            // Update existing pools or add new ones
1157            for (id, space_left, total_size, buffer_size) in pool_stats_data {
1158                let id_str = id.to_string();
1159                if let Some(existing) = pool_stats.iter_mut().find(|p| p.id == id_str) {
1160                    existing.update(space_left, total_size);
1161                } else {
1162                    pool_stats.push(PoolStats::new(id_str, space_left, total_size, buffer_size));
1163                }
1164            }
1165        }
1166
1167        if self.quitting.load(Ordering::SeqCst) {
1168            return Err("Exiting...".into());
1169        }
1170        Ok(())
1171    }
1172
1173    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
1174        {
1175            let status = &mut self.task_statuses.lock().unwrap()[taskid];
1176            status.is_error = true;
1177            status.error = error.to_compact_string();
1178        }
1179        match step {
1180            CuTaskState::Start => Decision::Shutdown,
1181            CuTaskState::Preprocess => Decision::Abort,
1182            CuTaskState::Process => Decision::Ignore,
1183            CuTaskState::Postprocess => Decision::Ignore,
1184            CuTaskState::Stop => Decision::Shutdown,
1185        }
1186    }
1187
1188    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
1189        self.quitting.store(true, Ordering::SeqCst);
1190        let _ = restore_terminal();
1191
1192        if let Some(handle) = self.ui_handle.take() {
1193            let _ = handle.join();
1194        }
1195
1196        self.task_stats
1197            .lock()
1198            .unwrap()
1199            .stats
1200            .iter_mut()
1201            .for_each(|s| s.reset());
1202        Ok(())
1203    }
1204}
1205
1206struct TerminalRestoreGuard;
1207
1208impl Drop for TerminalRestoreGuard {
1209    fn drop(&mut self) {
1210        let _ = restore_terminal();
1211    }
1212}
1213
1214fn init_error_hooks() {
1215    static ONCE: OnceLock<()> = OnceLock::new();
1216    if ONCE.get().is_some() {
1217        return;
1218    }
1219
1220    let (_panic_hook, error) = HookBuilder::default().into_hooks();
1221    let error = error.into_eyre_hook();
1222    color_eyre::eyre::set_hook(Box::new(move |e| {
1223        let _ = restore_terminal();
1224        error(e)
1225    }))
1226    .unwrap();
1227    std::panic::set_hook(Box::new(move |info| {
1228        let _ = restore_terminal();
1229        let bt = Backtrace::force_capture();
1230        // stderr may be gagged; print to stdout so the panic is visible.
1231        println!("CuConsoleMon panic: {info}");
1232        println!("Backtrace:\n{bt}");
1233        let _ = stdout().flush();
1234        // Exit immediately so the process doesn't hang after the TUI restores.
1235        process::exit(1);
1236    }));
1237
1238    let _ = ONCE.set(());
1239}
1240
1241fn setup_terminal() -> io::Result<()> {
1242    enable_raw_mode()?;
1243    execute!(stdout(), EnterAlternateScreen, EnableMouseCapture)?;
1244    Ok(())
1245}
1246
1247fn restore_terminal() -> io::Result<()> {
1248    execute!(stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
1249    disable_raw_mode()
1250}
1251
1252fn should_start_ui() -> bool {
1253    if !stdout().is_tty() || !stdin().is_tty() {
1254        return false;
1255    }
1256
1257    #[cfg(unix)]
1258    {
1259        use std::os::unix::io::AsRawFd;
1260        let stdin_fd = stdin().as_raw_fd();
1261        let fg_pgrp = unsafe { libc::tcgetpgrp(stdin_fd) };
1262        if fg_pgrp == -1 {
1263            return false;
1264        }
1265        let pgrp = unsafe { libc::getpgrp() };
1266        if fg_pgrp != pgrp {
1267            return false;
1268        }
1269    }
1270
1271    true
1272}