cu_consolemon/
lib.rs

1#[cfg(feature = "debug_pane")]
2mod debug_pane;
3pub mod sysinfo;
4mod tui_nodes;
5
6use crate::tui_nodes::{Connection, NodeGraph, NodeLayout};
7use ansi_to_tui::IntoText;
8use color_eyre::config::HookBuilder;
9use compact_str::{CompactString, ToCompactString};
10use cu29::clock::{CuDuration, RobotClock};
11use cu29::config::CuConfig;
12use cu29::cutask::CuMsgMetadata;
13use cu29::monitoring::{
14    ComponentKind, CuDurationStatistics, CuMonitor, CuTaskState, Decision, MonitorTopology,
15};
16use cu29::prelude::{pool, CuCompactString, CuTime};
17use cu29::{CuError, CuResult};
18#[cfg(feature = "debug_pane")]
19use debug_pane::UIExt;
20use ratatui::backend::CrosstermBackend;
21use ratatui::buffer::Buffer;
22use ratatui::crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode};
23use ratatui::crossterm::terminal::{
24    disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
25};
26use ratatui::crossterm::{event, execute};
27use ratatui::layout::{Alignment, Constraint, Direction, Layout, Size};
28use ratatui::prelude::{Backend, Rect};
29use ratatui::prelude::{Stylize, Widget};
30use ratatui::style::{Color, Modifier, Style};
31use ratatui::text::{Line, Span, Text};
32use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, StatefulWidget, Table};
33use ratatui::{Frame, Terminal};
34use std::fmt::{Display, Formatter};
35use std::io::stdout;
36use std::marker::PhantomData;
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, Mutex, OnceLock};
39use std::thread::JoinHandle;
40use std::time::{Duration, Instant};
41use std::{collections::HashMap, io, panic, thread};
42use tui_widgets::scrollview::{ScrollView, ScrollViewState};
43
/// Text of the top menu bar when the `debug_pane` feature is enabled
/// (includes the `[5] Debug Output` entry).
#[cfg(feature = "debug_pane")]
const MENU_CONTENT: &str =
    "   [1] SysInfo  [2] DAG  [3] Latencies  [4] Memory Pools [5] Debug Output  [q] Quit | Scroll: hjkl or ↑↓←→   ";
/// Text of the top menu bar when the `debug_pane` feature is disabled
/// (no Debug Output entry).
#[cfg(not(feature = "debug_pane"))]
const MENU_CONTENT: &str =
    "   [1] SysInfo  [2] DAG  [3] Latencies  [4] Memory Pools [q] Quit | Scroll: hjkl or ↑↓←→   ";
50
/// The page currently shown in the main content area below the menu bar.
#[derive(PartialEq)]
enum Screen {
    /// System information page (rendered under the " System Info " title).
    Neofetch,
    /// Node graph view.
    Dag,
    /// Latency statistics table.
    Latency,
    /// Debug output page; only exists when the `debug_pane` feature is enabled.
    #[cfg(feature = "debug_pane")]
    DebugOutput,
    /// Memory pool usage table.
    MemoryPools,
}
60
61struct TaskStats {
62    stats: Vec<CuDurationStatistics>,
63    end2end: CuDurationStatistics,
64}
65
66impl TaskStats {
67    fn new(num_tasks: usize, max_duration: CuDuration) -> Self {
68        let stats = vec![CuDurationStatistics::new(max_duration); num_tasks];
69        TaskStats {
70            stats,
71            end2end: CuDurationStatistics::new(max_duration),
72        }
73    }
74
75    fn update(&mut self, msgs: &[&CuMsgMetadata]) {
76        for (i, &msg) in msgs.iter().enumerate() {
77            let (before, after) = (
78                msg.process_time.start.unwrap(),
79                msg.process_time.end.unwrap(),
80            );
81            self.stats[i].record(after - before);
82        }
83        self.end2end.record(compute_end_to_end_latency(msgs));
84    }
85
86    fn reset(&mut self) {
87        for s in &mut self.stats {
88            s.reset();
89        }
90        self.end2end.reset();
91    }
92}
93struct PoolStats {
94    id: CompactString,
95    space_left: usize,
96    total_size: usize,
97    buffer_size: usize,
98    handles_in_use: usize,
99    handles_per_second: usize,
100    last_update: Instant,
101    prev_handles_in_use: usize,
102}
103
104impl PoolStats {
105    fn new(
106        id: impl ToCompactString,
107        space_left: usize,
108        total_size: usize,
109        buffer_size: usize,
110    ) -> Self {
111        Self {
112            id: id.to_compact_string(),
113            space_left,
114            total_size,
115            buffer_size,
116            handles_in_use: total_size - space_left,
117            handles_per_second: 0,
118            last_update: Instant::now(),
119            prev_handles_in_use: 0,
120        }
121    }
122
123    fn update(&mut self, space_left: usize, total_size: usize) {
124        let now = Instant::now();
125        let handles_in_use = total_size - space_left;
126        let elapsed = now.duration_since(self.last_update).as_secs_f32();
127
128        if elapsed >= 1.0 {
129            self.handles_per_second =
130                ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
131            self.prev_handles_in_use = self.handles_in_use;
132            self.last_update = now;
133        }
134
135        self.handles_in_use = handles_in_use;
136        self.space_left = space_left;
137        self.total_size = total_size;
138    }
139}
140
141fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
142    let start = msgs.first().map(|m| m.process_time.start);
143    let end = msgs.last().map(|m| m.process_time.end);
144
145    match (start, end) {
146        (Some(s), Some(e)) => match (Option::<CuTime>::from(s), Option::<CuTime>::from(e)) {
147            (Some(s), Some(e)) if e >= s => e - s,
148            (Some(_), Some(_)) => CuDuration::MIN,
149            _ => CuDuration::MIN,
150        },
151        _ => CuDuration::MIN,
152    }
153}
154
// This is kind of terrible.
/// Role of a node in the displayed graph; selects its icon and color.
#[derive(Copy, Clone)]
enum NodeType {
    /// Starting state: no incoming or outgoing connection recorded yet.
    Unknown,
    /// Node with only outgoing connections.
    Source,
    /// Node with only incoming connections.
    Sink,
    /// Node with both incoming and outgoing connections.
    Task,
    /// Bridge component; keeps this role whatever its connections are.
    Bridge,
}
164
165impl Display for NodeType {
166    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167        match self {
168            Self::Unknown => write!(f, "?"),
169            Self::Source => write!(f, "◈"),
170            Self::Task => write!(f, "⚙"),
171            Self::Sink => write!(f, "⭳"),
172            Self::Bridge => write!(f, "⇆"),
173        }
174    }
175}
176
177impl NodeType {
178    fn add_incoming(self) -> NodeType {
179        match self {
180            Self::Unknown => Self::Sink,
181            Self::Source => Self::Task,
182            Self::Sink => Self::Sink,
183            Self::Task => Self::Task,
184            Self::Bridge => Self::Bridge,
185        }
186    }
187
188    fn add_outgoing(self) -> NodeType {
189        match self {
190            Self::Unknown => Self::Source,
191            Self::Source => Self::Source,
192            Self::Sink => Self::Task,
193            Self::Task => Self::Task,
194            Self::Bridge => Self::Bridge,
195        }
196    }
197
198    fn color(self) -> Color {
199        match self {
200            Self::Unknown => Color::Gray,
201            Self::Source => Color::Rgb(255, 191, 0),
202            Self::Sink => Color::Rgb(255, 102, 204),
203            Self::Task => Color::White,
204            Self::Bridge => Color::Rgb(204, 153, 255),
205        }
206    }
207}
208
/// Latest status of one task as shown inside its node on the graph screen.
#[derive(Default, Clone)]
struct TaskStatus {
    /// When set, the node shows `error` (in red) instead of `status_txt`.
    is_error: bool,
    /// Text shown after the check mark when there is no error.
    status_txt: CompactString,
    /// Text shown after the cross mark when `is_error` is set.
    error: CompactString,
}
215
/// Everything needed to draw one node of the graph.
#[derive(Clone)]
struct DisplayNode {
    /// Node identifier, shown in the node title.
    id: String,
    /// Type name of the component ("unknown" when the topology has none).
    type_label: String,
    /// Role of the node; selects the title icon and its color.
    node_type: NodeType,
    /// Names of the input ports, in display order.
    inputs: Vec<String>,
    /// Names of the output ports, in display order.
    outputs: Vec<String>,
}
224
/// State backing the scrollable node graph screen.
struct NodesScrollableWidgetState {
    /// Nodes to draw, in topology order.
    display_nodes: Vec<DisplayNode>,
    /// Edges between `display_nodes`, expressed with node indices and port rows.
    connections: Vec<Connection>,
    /// Shared per-task statuses read (and partially reset) while rendering.
    statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// For each display node, the index of its entry in `statuses`
    /// (`None` for bridges and for tasks not found in the task id list).
    status_index_map: Vec<Option<usize>>,
    /// Number of task ids; also the index of the fallback status slot.
    task_count: usize,
    /// Scroll position of the graph view.
    nodes_scrollable_state: ScrollViewState,
}
233
234impl NodesScrollableWidgetState {
235    fn new(
236        config: &CuConfig,
237        errors: Arc<Mutex<Vec<TaskStatus>>>,
238        mission: Option<&str>,
239        task_ids: &'static [&'static str],
240        topology: Option<MonitorTopology>,
241    ) -> Self {
242        let topology = topology
243            .or_else(|| cu29::monitoring::build_monitor_topology(config, mission).ok())
244            .unwrap_or_default();
245
246        let mut display_nodes: Vec<DisplayNode> = Vec::new();
247        let mut status_index_map = Vec::new();
248        let mut node_lookup = HashMap::new();
249        let task_index_lookup: HashMap<&str, usize> = task_ids
250            .iter()
251            .enumerate()
252            .map(|(i, id)| (*id, i))
253            .collect();
254
255        for node in topology.nodes.iter() {
256            let node_type = match node.kind {
257                ComponentKind::Bridge => NodeType::Bridge,
258                ComponentKind::Task => {
259                    let mut role = NodeType::Unknown;
260                    if !node.inputs.is_empty() {
261                        role = role.add_incoming();
262                    }
263                    if !node.outputs.is_empty() {
264                        role = role.add_outgoing();
265                    }
266                    role
267                }
268            };
269
270            display_nodes.push(DisplayNode {
271                id: node.id.clone(),
272                type_label: node
273                    .type_name
274                    .clone()
275                    .unwrap_or_else(|| "unknown".to_string()),
276                node_type,
277                inputs: node.inputs.clone(),
278                outputs: node.outputs.clone(),
279            });
280            let idx = display_nodes.len() - 1;
281            node_lookup.insert(node.id.clone(), idx);
282
283            let status_idx = match node.kind {
284                ComponentKind::Task => task_index_lookup.get(node.id.as_str()).cloned(),
285                ComponentKind::Bridge => None,
286            };
287            status_index_map.push(status_idx);
288        }
289
290        let mut connections: Vec<Connection> = Vec::with_capacity(topology.connections.len());
291        for cnx in topology.connections.iter() {
292            let Some(&src_idx) = node_lookup.get(&cnx.src) else {
293                continue;
294            };
295            let Some(&dst_idx) = node_lookup.get(&cnx.dst) else {
296                continue;
297            };
298            let src_node = &display_nodes[src_idx];
299            let dst_node = &display_nodes[dst_idx];
300            let src_port = cnx
301                .src_port
302                .as_ref()
303                .and_then(|p| src_node.outputs.iter().position(|name| name == p))
304                .unwrap_or(0);
305            let dst_port = cnx
306                .dst_port
307                .as_ref()
308                .and_then(|p| dst_node.inputs.iter().position(|name| name == p))
309                .unwrap_or(0);
310
311            connections.push(Connection::new(
312                src_idx,
313                src_port + NODE_PORT_ROW_OFFSET,
314                dst_idx,
315                dst_port + NODE_PORT_ROW_OFFSET,
316            ));
317        }
318
319        // tui-nodes drops all nodes when every node has an outgoing edge (no roots).
320        // If that happens, drop the outgoing edges for the first node so at least one root exists.
321        if !display_nodes.is_empty() {
322            let mut from_set = std::collections::HashSet::new();
323            for conn in &connections {
324                from_set.insert(conn.from_node);
325            }
326            if from_set.len() == display_nodes.len() {
327                let root_idx = 0;
328                connections.retain(|c| c.from_node != root_idx);
329            }
330        }
331
332        NodesScrollableWidgetState {
333            display_nodes,
334            connections,
335            nodes_scrollable_state: ScrollViewState::default(),
336            statuses: errors,
337            status_index_map,
338            task_count: task_ids.len(),
339        }
340    }
341}
342
/// Stateless handle used to render the node graph; all data lives in
/// [`NodesScrollableWidgetState`].
struct NodesScrollableWidget<'a> {
    // Carries the lifetime parameter only; no data is stored.
    _marker: PhantomData<&'a ()>,
}
346
347struct GraphWrapper<'a> {
348    inner: NodeGraph<'a>,
349}
350
351impl Widget for GraphWrapper<'_> {
352    fn render(self, area: Rect, buf: &mut Buffer)
353    where
354        Self: Sized,
355    {
356        self.inner.render(area, buf, &mut ())
357    }
358}
359
/// Width, in cells, given to every node box.
const NODE_WIDTH: u16 = 29;
/// Node width minus two cells (presumably the left and right borders).
const NODE_WIDTH_CONTENT: u16 = NODE_WIDTH - 2;

/// Minimum height, in cells, of a node box.
const NODE_HEIGHT: u16 = 5;
/// Number of text lines (type label and status) written before the port rows.
const NODE_META_LINES: usize = 2;
/// Row offset of the first port inside a node: ports come after the meta lines.
const NODE_PORT_ROW_OFFSET: usize = NODE_META_LINES;
366
/// Returns the last `max_chars` characters of `value` (the whole string when
/// it is short enough, an empty string when `max_chars` is zero).
///
/// Counting is done in `char`s, so the cut never lands inside a multi-byte
/// character.
fn clip_tail(value: &str, max_chars: usize) -> String {
    let Some(steps_back) = max_chars.checked_sub(1) else {
        return String::new();
    };
    // Walking from the end, the `max_chars`-th character marks where the
    // kept tail begins; running out of characters means nothing is clipped.
    match value.char_indices().rev().nth(steps_back) {
        Some((tail_start, _)) => value[tail_start..].to_string(),
        None => value.to_string(),
    }
}
383
/// Node height minus two cells (presumably the top and bottom borders).
// Not referenced in the visible code; kept alongside the other geometry constants.
#[allow(dead_code)]
const NODE_HEIGHT_CONTENT: u16 = NODE_HEIGHT - 2;
/// Extra width added to the graph bounds when sizing the scrollable canvas.
const GRAPH_WIDTH_PADDING: u16 = NODE_WIDTH;
/// Extra height added to the graph bounds when sizing the scrollable canvas.
const GRAPH_HEIGHT_PADDING: u16 = NODE_HEIGHT;
388
impl StatefulWidget for NodesScrollableWidget<'_> {
    type State = NodesScrollableWidgetState;

    /// Lays out the node graph, fills each node with its type label, status
    /// line and port names, and renders the result inside a scroll view.
    ///
    /// Side effect: the `is_error` flag of every status that gets displayed
    /// is cleared.
    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
        // One layout per display node: fixed width, height grown to fit the
        // port rows (never below NODE_HEIGHT), title made of icon + node id.
        let build_node_layouts = || {
            state
                .display_nodes
                .iter()
                .map(|node| {
                    let ports = node.inputs.len().max(node.outputs.len());
                    let content_rows = ports + NODE_PORT_ROW_OFFSET;
                    let height = (content_rows as u16).saturating_add(2).max(NODE_HEIGHT);
                    let mut title_line = Line::default();
                    title_line.spans.push(Span::styled(
                        format!(" {}", node.node_type),
                        Style::default().fg(node.node_type.color()),
                    ));
                    title_line.spans.push(Span::styled(
                        format!(" {} ", node.id),
                        Style::default().fg(Color::White),
                    ));
                    NodeLayout::new((NODE_WIDTH, height)).with_title_line(title_line)
                })
                .collect::<Vec<_>>()
        };

        // First guess for the canvas size, derived from the node count and
        // the largest port count.
        let node_count = state.display_nodes.len().max(1);
        let content_width = (node_count as u16)
            .saturating_mul(NODE_WIDTH + 12)
            .max(NODE_WIDTH);
        let max_ports = state
            .display_nodes
            .iter()
            .map(|node| node.inputs.len().max(node.outputs.len()))
            .max()
            .unwrap_or_default();
        let content_height = (((max_ports + NODE_PORT_ROW_OFFSET) as u16) * 4).max(NODE_HEIGHT);
        let connections = state.connections.clone();
        let build_graph = |width: u16, height: u16| {
            NodeGraph::new(
                build_node_layouts(),
                connections.clone(),
                width as usize,
                height as usize,
            )
        };
        let mut graph = build_graph(content_width, content_height);
        graph.calculate();
        let mut content_size = Size::new(content_width, content_height);
        if state.display_nodes.is_empty() {
            // Nothing to lay out: size the canvas from the visible area.
            content_size = Size::new(area.width.max(NODE_WIDTH), area.height.max(NODE_HEIGHT));
            graph = build_graph(content_size.width, content_size.height);
            graph.calculate();
        } else {
            // Resize the canvas to the computed bounds plus padding and
            // rebuild the graph if that differs from the first guess.
            let bounds = graph.content_bounds();
            let desired_width = bounds
                .width
                .saturating_add(GRAPH_WIDTH_PADDING)
                .max(NODE_WIDTH);
            let desired_height = bounds
                .height
                .saturating_add(GRAPH_HEIGHT_PADDING)
                .max(NODE_HEIGHT);
            if desired_width != content_size.width || desired_height != content_size.height {
                content_size = Size::new(desired_width, desired_height);
                graph = build_graph(content_size.width, content_size.height);
                graph.calculate();
            }
        }
        let mut scroll_view = ScrollView::new(content_size);
        let zones = graph.split(scroll_view.area());

        {
            let mut statuses = state.statuses.lock().unwrap();
            // Guarantee an extra slot at index `task_count`, used as the
            // fallback status for nodes without a status index.
            if statuses.len() <= state.task_count {
                statuses.resize(state.task_count + 1, TaskStatus::default());
            }
            for (idx, ea_zone) in zones.into_iter().enumerate() {
                let fallback_idx = state.task_count;
                let status_idx = state
                    .status_index_map
                    .get(idx)
                    .and_then(|opt| *opt)
                    .unwrap_or(fallback_idx);
                // Clamp to the last slot if the index is out of range.
                let safe_index = if status_idx < statuses.len() {
                    status_idx
                } else {
                    statuses.len() - 1
                };
                let status = &mut statuses[safe_index];
                let s = &state.display_nodes[idx].type_label;
                let status_line = if status.is_error {
                    format!("❌ {}", status.error)
                } else {
                    format!("✓ {}", status.status_txt)
                };

                // Keep the tail of over-long labels so they fit in the node.
                let label_width = (NODE_WIDTH_CONTENT as usize).saturating_sub(2);
                let type_label = clip_tail(s, label_width);
                let status_text = clip_tail(&status_line, label_width);
                let base_style = if status.is_error {
                    Style::default().fg(Color::Red)
                } else {
                    Style::default().fg(Color::Green)
                };
                // The two meta lines: type label, then status.
                let mut lines: Vec<Line> = Vec::new();
                lines.push(Line::styled(format!(" {}", type_label), base_style));
                lines.push(Line::styled(format!(" {}", status_text), base_style));

                let max_ports = state.display_nodes[idx]
                    .inputs
                    .len()
                    .max(state.display_nodes[idx].outputs.len());
                if max_ports > 0 {
                    // One row per port index: input name left-aligned,
                    // a dotted separator, output name right-aligned.
                    let left_width = (NODE_WIDTH_CONTENT as usize - 2) / 2;
                    let right_width = NODE_WIDTH_CONTENT as usize - 2 - left_width;
                    let input_style = Style::default().fg(Color::Yellow);
                    let output_style = Style::default().fg(Color::Cyan);
                    let dotted_style = Style::default().fg(Color::DarkGray);
                    for port_idx in 0..max_ports {
                        let input = state.display_nodes[idx]
                            .inputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, left_width))
                            .unwrap_or_default();
                        let output = state.display_nodes[idx]
                            .outputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, right_width))
                            .unwrap_or_default();
                        let mut port_line = Line::default();
                        port_line.spans.push(Span::styled(
                            format!(" {:<left_width$}", input, left_width = left_width),
                            input_style,
                        ));
                        port_line.spans.push(Span::styled("┆", dotted_style));
                        port_line.spans.push(Span::styled(
                            format!("{:>right_width$}", output, right_width = right_width),
                            output_style,
                        ));
                        lines.push(port_line);
                    }
                }

                let txt = Text::from(lines);
                let paragraph = Paragraph::new(txt);
                status.is_error = false; // reset if it was displayed
                scroll_view.render_widget(paragraph, ea_zone);
            }
        }

        // Draw the graph itself over the whole canvas, then blit the visible
        // part of the canvas into `area`.
        scroll_view.render_widget(
            GraphWrapper { inner: graph },
            Rect {
                x: 0,
                y: 0,
                width: content_size.width,
                height: content_size.height,
            },
        );
        scroll_view.render(area, buf, &mut state.nodes_scrollable_state);
    }
}
552
/// A TUI based realtime console for Copper.
pub struct CuConsoleMon {
    /// Configuration the monitor was created with.
    config: CuConfig,
    /// Identifiers of the monitored tasks.
    taskids: &'static [&'static str],
    /// Latency statistics shared with the UI.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Per-task statuses shared with the UI.
    task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// Handle of the UI thread, joined on drop.
    ui_handle: Option<JoinHandle<()>>,
    /// Memory pool statistics shared with the UI.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
    /// Set to `true` to ask the UI loop to stop.
    quitting: Arc<AtomicBool>,
    /// Pre-computed topology, if one was provided.
    topology: Option<MonitorTopology>,
}
564
565impl Drop for CuConsoleMon {
566    fn drop(&mut self) {
567        self.quitting.store(true, Ordering::SeqCst);
568        let _ = restore_terminal();
569        if let Some(handle) = self.ui_handle.take() {
570            let _ = handle.join();
571        }
572    }
573}
574
/// State owned by the UI: the active screen plus handles to the data it shows.
struct UI {
    /// Identifiers of the monitored tasks (row labels of the latency table).
    task_ids: &'static [&'static str],
    /// Screen currently displayed in the main content area.
    active_screen: Screen,
    /// Pre-rendered system information text shown on the SysInfo screen.
    sysinfo: String,
    /// Latency statistics shared with the monitor.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Checked by the UI loop; when `true` the loop exits.
    quitting: Arc<AtomicBool>,
    /// State of the node graph screen.
    nodes_scrollable_widget_state: NodesScrollableWidgetState,
    /// Buffer redirect handle; only with the `debug_pane` feature.
    #[cfg(feature = "debug_pane")]
    error_redirect: gag::BufferRedirect,
    /// Optional debug log shown on the Debug Output screen; only with the
    /// `debug_pane` feature.
    #[cfg(feature = "debug_pane")]
    debug_output: Option<debug_pane::DebugLog>,
    /// Memory pool statistics shared with the monitor.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
}
588
589impl UI {
590    #[cfg(feature = "debug_pane")]
591    #[allow(clippy::too_many_arguments)]
592    fn new(
593        config: CuConfig,
594        mission: Option<&str>,
595        task_ids: &'static [&'static str],
596        task_stats: Arc<Mutex<TaskStats>>,
597        task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
598        quitting: Arc<AtomicBool>,
599        error_redirect: gag::BufferRedirect,
600        debug_output: Option<debug_pane::DebugLog>,
601        pool_stats: Arc<Mutex<Vec<PoolStats>>>,
602        topology: Option<MonitorTopology>,
603    ) -> UI {
604        init_error_hooks();
605        let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
606            &config,
607            task_statuses.clone(),
608            mission,
609            task_ids,
610            topology.clone(),
611        );
612
613        Self {
614            task_ids,
615            active_screen: Screen::Neofetch,
616            sysinfo: sysinfo::pfetch_info(),
617            task_stats,
618            quitting,
619            nodes_scrollable_widget_state,
620            error_redirect,
621            debug_output,
622            pool_stats,
623        }
624    }
625
626    #[cfg(not(feature = "debug_pane"))]
627    fn new(
628        config: CuConfig,
629        task_ids: &'static [&'static str],
630        task_stats: Arc<Mutex<TaskStats>>,
631        task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
632        quitting: Arc<AtomicBool>,
633        pool_stats: Arc<Mutex<Vec<PoolStats>>>,
634        topology: Option<MonitorTopology>,
635    ) -> UI {
636        init_error_hooks();
637        let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
638            &config,
639            task_statuses.clone(),
640            None,
641            task_ids,
642            topology.clone(),
643        );
644
645        Self {
646            task_ids,
647            active_screen: Screen::Neofetch,
648            sysinfo: sysinfo::pfetch_info(),
649            task_stats,
650            quitting,
651            nodes_scrollable_widget_state,
652            pool_stats,
653        }
654    }
655
656    fn draw_latency_table(&self, f: &mut Frame, area: Rect) {
657        let header_cells = [
658            "🛠 Task",
659            "⬇ Min",
660            "⬆ Max",
661            "∅ Mean",
662            "σ Stddev",
663            "⧖∅ Jitter",
664            "⧗⬆ Jitter",
665        ]
666        .iter()
667        .map(|h| {
668            Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
669                Style::default()
670                    .fg(Color::Yellow)
671                    .add_modifier(Modifier::BOLD),
672            )
673        });
674
675        let header = Row::new(header_cells)
676            .style(Style::default().fg(Color::Yellow))
677            .bottom_margin(1)
678            .top_margin(1);
679
680        let task_stats = self.task_stats.lock().unwrap(); // Acquire lock to read task_stats
681        let mut rows = task_stats
682            .stats
683            .iter()
684            .enumerate()
685            .map(|(i, stat)| {
686                let cells = vec![
687                    Cell::from(Line::from(self.task_ids[i]).alignment(Alignment::Right))
688                        .light_blue(),
689                    Cell::from(Line::from(stat.min().to_string()).alignment(Alignment::Right))
690                        .style(Style::default()),
691                    Cell::from(Line::from(stat.max().to_string()).alignment(Alignment::Right))
692                        .style(Style::default()),
693                    Cell::from(Line::from(stat.mean().to_string()).alignment(Alignment::Right))
694                        .style(Style::default()),
695                    Cell::from(Line::from(stat.stddev().to_string()).alignment(Alignment::Right))
696                        .style(Style::default()),
697                    Cell::from(
698                        Line::from(stat.jitter_mean().to_string()).alignment(Alignment::Right),
699                    )
700                    .style(Style::default()),
701                    Cell::from(
702                        Line::from(stat.jitter_max().to_string()).alignment(Alignment::Right),
703                    )
704                    .style(Style::default()),
705                ];
706                Row::new(cells)
707            })
708            .collect::<Vec<Row>>();
709
710        let cells = vec![
711            Cell::from(
712                Line::from("End2End")
713                    .light_red()
714                    .alignment(Alignment::Right),
715            ),
716            Cell::from(
717                Line::from(task_stats.end2end.min().to_string())
718                    .light_red()
719                    .alignment(Alignment::Right),
720            )
721            .style(Style::default()),
722            Cell::from(
723                Line::from(task_stats.end2end.max().to_string())
724                    .light_red()
725                    .alignment(Alignment::Right),
726            )
727            .style(Style::default()),
728            Cell::from(
729                Line::from(task_stats.end2end.mean().to_string())
730                    .light_red()
731                    .alignment(Alignment::Right),
732            )
733            .style(Style::default()),
734            Cell::from(
735                Line::from(task_stats.end2end.stddev().to_string())
736                    .light_red()
737                    .alignment(Alignment::Right),
738            )
739            .style(Style::default()),
740            Cell::from(
741                Line::from(task_stats.end2end.jitter_mean().to_string())
742                    .light_red()
743                    .alignment(Alignment::Right),
744            )
745            .style(Style::default()),
746            Cell::from(
747                Line::from(task_stats.end2end.jitter_max().to_string())
748                    .light_red()
749                    .alignment(Alignment::Right),
750            )
751            .style(Style::default()),
752        ];
753        rows.push(Row::new(cells).top_margin(1));
754
755        let table = Table::new(
756            rows,
757            &[
758                Constraint::Length(10),
759                Constraint::Length(10),
760                Constraint::Length(12),
761                Constraint::Length(12),
762                Constraint::Length(10),
763                Constraint::Length(12),
764                Constraint::Length(13),
765            ],
766        )
767        .header(header)
768        .block(Block::default().borders(Borders::ALL).title(" Latencies "));
769
770        f.render_widget(table, area);
771    }
772
773    fn draw_memory_pools(&self, f: &mut Frame, area: Rect) {
774        let header_cells = [
775            "Pool ID",
776            "Used/Total",
777            "Buffer Size",
778            "Handles in Use",
779            "Handles/sec",
780        ]
781        .iter()
782        .map(|h| {
783            Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
784                Style::default()
785                    .fg(Color::Yellow)
786                    .add_modifier(Modifier::BOLD),
787            )
788        });
789
790        let header = Row::new(header_cells)
791            .style(Style::default().fg(Color::Yellow))
792            .bottom_margin(1);
793
794        let pool_stats = self.pool_stats.lock().unwrap();
795        let rows = pool_stats
796            .iter()
797            .map(|stat| {
798                let used = stat.total_size - stat.space_left;
799                let percent = if stat.total_size > 0 {
800                    100.0 * used as f64 / stat.total_size as f64
801                } else {
802                    0.0
803                };
804                let buffer_size = stat.buffer_size;
805                let mb_unit = 1024.0 * 1024.0;
806
807                let cells = vec![
808                    Cell::from(Line::from(stat.id.to_string()).alignment(Alignment::Right))
809                        .light_blue(),
810                    Cell::from(
811                        Line::from(format!(
812                            "{:.2} MB / {:.2} MB ({:.1}%)",
813                            used as f64 * buffer_size as f64 / mb_unit,
814                            stat.total_size as f64 * buffer_size as f64 / mb_unit,
815                            percent
816                        ))
817                        .alignment(Alignment::Right),
818                    ),
819                    Cell::from(
820                        Line::from(format!("{} KB", stat.buffer_size / 1024))
821                            .alignment(Alignment::Right),
822                    ),
823                    Cell::from(
824                        Line::from(format!("{}", stat.handles_in_use)).alignment(Alignment::Right),
825                    ),
826                    Cell::from(
827                        Line::from(format!("{}/s", stat.handles_per_second))
828                            .alignment(Alignment::Right),
829                    ),
830                ];
831                Row::new(cells)
832            })
833            .collect::<Vec<Row>>();
834
835        let table = Table::new(
836            rows,
837            &[
838                Constraint::Percentage(30),
839                Constraint::Percentage(20),
840                Constraint::Percentage(15),
841                Constraint::Percentage(15),
842                Constraint::Percentage(20),
843            ],
844        )
845        .header(header)
846        .block(
847            Block::default()
848                .borders(Borders::ALL)
849                .title(" Memory Pools "),
850        );
851
852        f.render_widget(table, area);
853    }
854
855    fn draw_nodes(&mut self, f: &mut Frame, space: Rect) {
856        NodesScrollableWidget {
857            _marker: Default::default(),
858        }
859        .render(
860            space,
861            f.buffer_mut(),
862            &mut self.nodes_scrollable_widget_state,
863        )
864    }
865
866    fn draw(&mut self, f: &mut Frame) {
867        let layout = Layout::default()
868            .direction(Direction::Vertical)
869            .constraints(
870                [
871                    Constraint::Length(3), // For the top menu bar
872                    Constraint::Min(0),    // For the main content
873                ]
874                .as_ref(),
875            )
876            .split(f.area());
877
878        let menu = Paragraph::new(MENU_CONTENT)
879            .style(
880                Style::default()
881                    .fg(Color::Yellow)
882                    .add_modifier(Modifier::ITALIC),
883            )
884            .block(Block::default().borders(Borders::BOTTOM));
885        f.render_widget(menu, layout[0]);
886
887        match self.active_screen {
888            Screen::Neofetch => {
889                const VERSION: &str = env!("CARGO_PKG_VERSION");
890                let text: Text = format!("\n   -> Copper v{}\n\n{}\n\n ", VERSION, self.sysinfo)
891                    .into_text()
892                    .unwrap();
893                let p = Paragraph::new::<Text>(text).block(
894                    Block::default()
895                        .title(" System Info ")
896                        .borders(Borders::ALL),
897                );
898                f.render_widget(p, layout[1]);
899            }
900            Screen::Dag => {
901                self.draw_nodes(f, layout[1]);
902            }
903            Screen::Latency => self.draw_latency_table(f, layout[1]),
904            Screen::MemoryPools => self.draw_memory_pools(f, layout[1]),
905            #[cfg(feature = "debug_pane")]
906            Screen::DebugOutput => self.draw_debug_output(f, layout[1]),
907        };
908    }
909
910    fn run_app<B: Backend>(&mut self, terminal: &mut Terminal<B>) -> io::Result<()> {
911        loop {
912            if self.quitting.load(Ordering::SeqCst) {
913                break;
914            }
915            #[cfg(feature = "debug_pane")]
916            self.update_debug_output();
917
918            terminal.draw(|f| {
919                self.draw(f);
920            })?;
921
922            if event::poll(Duration::from_millis(50))? {
923                let event = event::read()?;
924
925                match event {
926                    Event::Key(key) => match key.code {
927                        KeyCode::Char('1') => self.active_screen = Screen::Neofetch,
928                        KeyCode::Char('2') => self.active_screen = Screen::Dag,
929                        KeyCode::Char('3') => self.active_screen = Screen::Latency,
930                        KeyCode::Char('4') => self.active_screen = Screen::MemoryPools,
931                        #[cfg(feature = "debug_pane")]
932                        KeyCode::Char('5') => self.active_screen = Screen::DebugOutput,
933                        KeyCode::Char('r') => {
934                            if self.active_screen == Screen::Latency {
935                                self.task_stats.lock().unwrap().reset()
936                            }
937                        }
938                        KeyCode::Char('j') | KeyCode::Down => {
939                            if self.active_screen == Screen::Dag {
940                                for _ in 0..1 {
941                                    self.nodes_scrollable_widget_state
942                                        .nodes_scrollable_state
943                                        .scroll_down();
944                                }
945                            }
946                        }
947                        KeyCode::Char('k') | KeyCode::Up => {
948                            if self.active_screen == Screen::Dag {
949                                for _ in 0..1 {
950                                    self.nodes_scrollable_widget_state
951                                        .nodes_scrollable_state
952                                        .scroll_up();
953                                }
954                            }
955                        }
956                        KeyCode::Char('h') | KeyCode::Left => {
957                            if self.active_screen == Screen::Dag {
958                                for _ in 0..5 {
959                                    self.nodes_scrollable_widget_state
960                                        .nodes_scrollable_state
961                                        .scroll_left();
962                                }
963                            }
964                        }
965                        KeyCode::Char('l') | KeyCode::Right => {
966                            if self.active_screen == Screen::Dag {
967                                for _ in 0..5 {
968                                    self.nodes_scrollable_widget_state
969                                        .nodes_scrollable_state
970                                        .scroll_right();
971                                }
972                            }
973                        }
974                        KeyCode::Char('q') => {
975                            break;
976                        }
977                        _ => {}
978                    },
979
980                    #[cfg(feature = "debug_pane")]
981                    Event::Resize(_columns, rows) => {
982                        if let Some(debug_output) = self.debug_output.as_mut() {
983                            debug_output.max_rows.store(rows, Ordering::SeqCst)
984                        }
985                    }
986                    _ => {}
987                }
988            }
989        }
990        Ok(())
991    }
992}
993
994impl CuMonitor for CuConsoleMon {
995    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
996    where
997        Self: Sized,
998    {
999        let task_stats = Arc::new(Mutex::new(TaskStats::new(
1000            taskids.len(),
1001            CuDuration::from(Duration::from_secs(5)),
1002        )));
1003
1004        Ok(Self {
1005            config: config.clone(),
1006            taskids,
1007            task_stats,
1008            task_statuses: Arc::new(Mutex::new(vec![TaskStatus::default(); taskids.len()])),
1009            ui_handle: None,
1010            quitting: Arc::new(AtomicBool::new(false)),
1011            pool_stats: Arc::new(Mutex::new(Vec::new())),
1012            topology: None,
1013        })
1014    }
1015    fn set_topology(&mut self, topology: MonitorTopology) {
1016        self.topology = Some(topology);
1017    }
1018
1019    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
1020        let config_dup = self.config.clone();
1021        let taskids = self.taskids;
1022
1023        let task_stats_ui = self.task_stats.clone();
1024        let error_states = self.task_statuses.clone();
1025        let pool_stats_ui = self.pool_stats.clone();
1026        let quitting = self.quitting.clone();
1027        let topology = self.topology.clone();
1028
1029        // Start the main UI loop
1030        let handle = thread::spawn(move || {
1031            let backend = CrosstermBackend::new(stdout());
1032            let _terminal_guard = TerminalRestoreGuard;
1033
1034            // Ensure the terminal is restored if a panic occurs while the TUI is active.
1035            let prev_hook = panic::take_hook();
1036            panic::set_hook(Box::new(move |info| {
1037                let _ = restore_terminal();
1038                prev_hook(info);
1039            }));
1040
1041            install_signal_handlers(quitting.clone());
1042
1043            if let Err(err) = setup_terminal() {
1044                eprintln!("Failed to prepare terminal UI: {err}");
1045                return;
1046            }
1047
1048            let mut terminal = match Terminal::new(backend) {
1049                Ok(terminal) => terminal,
1050                Err(err) => {
1051                    eprintln!("Failed to initialize terminal backend: {err}");
1052                    return;
1053                }
1054            };
1055
1056            #[cfg(feature = "debug_pane")]
1057            {
1058                // redirect stderr, so it doesn't pop in the terminal
1059                let error_redirect = gag::BufferRedirect::stderr().unwrap();
1060
1061                let mut ui = UI::new(
1062                    config_dup,
1063                    None, // FIXME(gbin): Allow somethere an API to get the current mission running
1064                    taskids,
1065                    task_stats_ui,
1066                    error_states,
1067                    quitting.clone(),
1068                    error_redirect,
1069                    None,
1070                    pool_stats_ui,
1071                    topology.clone(),
1072                );
1073
1074                // Override the cu29-log-runtime Log Subscriber
1075                #[cfg(debug_assertions)]
1076                if cu29_log_runtime::EXTRA_TEXT_LOGGER
1077                    .read()
1078                    .unwrap()
1079                    .is_some()
1080                {
1081                    let max_lines = terminal.size().unwrap().height - 5;
1082                    let (debug_log, tx) = debug_pane::DebugLog::new(max_lines);
1083
1084                    let log_subscriber = debug_pane::LogSubscriber::new(tx);
1085
1086                    *cu29_log_runtime::EXTRA_TEXT_LOGGER.write().unwrap() =
1087                        Some(Box::new(log_subscriber) as Box<dyn log::Log>);
1088
1089                    // Set up the terminal again, as there might be some logs which in the console before updating `EXTRA_TEXT_LOGGER`
1090                    if let Err(err) = setup_terminal() {
1091                        eprintln!("Failed to reinitialize terminal after log redirect: {err}");
1092                    }
1093
1094                    ui.debug_output = Some(debug_log);
1095                } else {
1096                    println!("EXTRA_TEXT_LOGGER is none");
1097                }
1098                ui.run_app(&mut terminal).expect("Failed to run app");
1099            }
1100
1101            #[cfg(not(feature = "debug_pane"))]
1102            {
1103                let stderr_gag = gag::Gag::stderr().unwrap();
1104
1105                let mut ui = UI::new(
1106                    config_dup,
1107                    taskids,
1108                    task_stats_ui,
1109                    error_states,
1110                    quitting,
1111                    pool_stats_ui,
1112                    topology,
1113                );
1114                ui.run_app(&mut terminal).expect("Failed to run app");
1115
1116                drop(stderr_gag);
1117            }
1118
1119            quitting.store(true, Ordering::SeqCst);
1120            // restoring the terminal
1121            let _ = restore_terminal();
1122        });
1123
1124        self.ui_handle = Some(handle);
1125        Ok(())
1126    }
1127
1128    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1129        {
1130            let mut task_stats = self.task_stats.lock().unwrap();
1131            task_stats.update(msgs);
1132        }
1133        {
1134            let mut task_statuses = self.task_statuses.lock().unwrap();
1135            for (i, msg) in msgs.iter().enumerate() {
1136                let CuCompactString(status_txt) = &msg.status_txt;
1137                task_statuses[i].status_txt = status_txt.clone();
1138            }
1139        }
1140
1141        // Update pool statistics
1142        {
1143            let pool_stats_data = pool::pools_statistics();
1144            let mut pool_stats = self.pool_stats.lock().unwrap();
1145
1146            // Update existing pools or add new ones
1147            for (id, space_left, total_size, buffer_size) in pool_stats_data {
1148                let id_str = id.to_string();
1149                if let Some(existing) = pool_stats.iter_mut().find(|p| p.id == id_str) {
1150                    existing.update(space_left, total_size);
1151                } else {
1152                    pool_stats.push(PoolStats::new(id_str, space_left, total_size, buffer_size));
1153                }
1154            }
1155        }
1156
1157        if self.quitting.load(Ordering::SeqCst) {
1158            return Err("Exiting...".into());
1159        }
1160        Ok(())
1161    }
1162
1163    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
1164        {
1165            let status = &mut self.task_statuses.lock().unwrap()[taskid];
1166            status.is_error = true;
1167            status.error = error.to_compact_string();
1168        }
1169        match step {
1170            CuTaskState::Start => Decision::Shutdown,
1171            CuTaskState::Preprocess => Decision::Abort,
1172            CuTaskState::Process => Decision::Ignore,
1173            CuTaskState::Postprocess => Decision::Ignore,
1174            CuTaskState::Stop => Decision::Shutdown,
1175        }
1176    }
1177
1178    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
1179        self.quitting.store(true, Ordering::SeqCst);
1180        let _ = restore_terminal();
1181
1182        if let Some(handle) = self.ui_handle.take() {
1183            let _ = handle.join();
1184        }
1185
1186        self.task_stats
1187            .lock()
1188            .unwrap()
1189            .stats
1190            .iter_mut()
1191            .for_each(|s| s.reset());
1192        Ok(())
1193    }
1194}
1195
1196struct TerminalRestoreGuard;
1197
1198impl Drop for TerminalRestoreGuard {
1199    fn drop(&mut self) {
1200        let _ = restore_terminal();
1201    }
1202}
1203
1204fn install_signal_handlers(quitting: Arc<AtomicBool>) {
1205    static SIGNAL_HANDLER: OnceLock<()> = OnceLock::new();
1206
1207    let _ = SIGNAL_HANDLER.get_or_init(|| {
1208        let quitting = quitting.clone();
1209        if let Err(err) = ctrlc::set_handler(move || {
1210            quitting.store(true, Ordering::SeqCst);
1211            let _ = restore_terminal();
1212            // Match the conventional 130 exit code for Ctrl-C.
1213            std::process::exit(130);
1214        }) {
1215            eprintln!("Failed to install Ctrl-C handler: {err}");
1216        }
1217    });
1218}
1219
1220fn init_error_hooks() {
1221    let (panic, error) = HookBuilder::default().into_hooks();
1222    let panic = panic.into_panic_hook();
1223    let error = error.into_eyre_hook();
1224    color_eyre::eyre::set_hook(Box::new(move |e| {
1225        let _ = restore_terminal();
1226        error(e)
1227    }))
1228    .unwrap();
1229    std::panic::set_hook(Box::new(move |info| {
1230        let _ = restore_terminal();
1231        panic(info)
1232    }));
1233}
1234
1235fn setup_terminal() -> io::Result<()> {
1236    enable_raw_mode()?;
1237    execute!(stdout(), EnterAlternateScreen, EnableMouseCapture)?;
1238    Ok(())
1239}
1240
1241fn restore_terminal() -> io::Result<()> {
1242    execute!(stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
1243    disable_raw_mode()
1244}