1#[cfg(feature = "debug_pane")]
2mod debug_pane;
3pub mod sysinfo;
4mod tui_nodes;
5
6use crate::tui_nodes::{Connection, NodeGraph, NodeLayout};
7use ansi_to_tui::IntoText;
8use color_eyre::config::HookBuilder;
9use compact_str::{CompactString, ToCompactString};
10use cu29::clock::{CuDuration, RobotClock};
11use cu29::config::CuConfig;
12use cu29::cutask::CuMsgMetadata;
13use cu29::monitoring::{
14 ComponentKind, CuDurationStatistics, CuMonitor, CuTaskState, Decision, MonitorTopology,
15};
16use cu29::prelude::{pool, CuCompactString, CuTime};
17use cu29::{CuError, CuResult};
18#[cfg(feature = "debug_pane")]
19use debug_pane::UIExt;
20use ratatui::backend::CrosstermBackend;
21use ratatui::buffer::Buffer;
22use ratatui::crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode};
23use ratatui::crossterm::terminal::{
24 disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
25};
26use ratatui::crossterm::{event, execute};
27use ratatui::layout::{Alignment, Constraint, Direction, Layout, Size};
28use ratatui::prelude::{Backend, Rect};
29use ratatui::prelude::{Stylize, Widget};
30use ratatui::style::{Color, Modifier, Style};
31use ratatui::text::{Line, Span, Text};
32use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, StatefulWidget, Table};
33use ratatui::{Frame, Terminal};
34use std::fmt::{Display, Formatter};
35use std::io::stdout;
36use std::marker::PhantomData;
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, Mutex, OnceLock};
39use std::thread::JoinHandle;
40use std::time::{Duration, Instant};
41use std::{collections::HashMap, io, panic, thread};
42use tui_widgets::scrollview::{ScrollView, ScrollViewState};
43
/// Text of the menu bar rendered in the top strip of the UI (variant that lists
/// the `[5] Debug Output` entry, compiled when the `debug_pane` feature is on).
#[cfg(feature = "debug_pane")]
const MENU_CONTENT: &str =
    " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [5] Debug Output [q] Quit | Scroll: hjkl or ↑↓←→ ";
/// Text of the menu bar rendered in the top strip of the UI (variant without
/// the debug output entry, compiled when the `debug_pane` feature is off).
#[cfg(not(feature = "debug_pane"))]
const MENU_CONTENT: &str =
    " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [q] Quit | Scroll: hjkl or ↑↓←→ ";
50
/// The screens the UI can display; exactly one is active at a time and the
/// key handler switches between them.
#[derive(PartialEq)]
enum Screen {
    /// System information page (the initial screen).
    Neofetch,
    /// Task graph view; the only screen that reacts to the scroll keys.
    Dag,
    /// Per-task latency table; the only screen where `r` resets the statistics.
    Latency,
    /// Debug output page, only compiled with the `debug_pane` feature.
    #[cfg(feature = "debug_pane")]
    DebugOutput,
    /// Memory pool usage table.
    MemoryPools,
}
60
/// Latency statistics shared between the monitor callbacks and the UI thread.
struct TaskStats {
    /// One accumulator per task, indexed by the task's position in the message list.
    stats: Vec<CuDurationStatistics>,
    /// Accumulator for the latency from the first task's start to the last task's end.
    end2end: CuDurationStatistics,
}
65
66impl TaskStats {
67 fn new(num_tasks: usize, max_duration: CuDuration) -> Self {
68 let stats = vec![CuDurationStatistics::new(max_duration); num_tasks];
69 TaskStats {
70 stats,
71 end2end: CuDurationStatistics::new(max_duration),
72 }
73 }
74
75 fn update(&mut self, msgs: &[&CuMsgMetadata]) {
76 for (i, &msg) in msgs.iter().enumerate() {
77 let (before, after) = (
78 msg.process_time.start.unwrap(),
79 msg.process_time.end.unwrap(),
80 );
81 self.stats[i].record(after - before);
82 }
83 self.end2end.record(compute_end_to_end_latency(msgs));
84 }
85
86 fn reset(&mut self) {
87 for s in &mut self.stats {
88 s.reset();
89 }
90 self.end2end.reset();
91 }
92}
/// Usage snapshot of a single memory pool, as shown in the "Memory Pools" table.
struct PoolStats {
    /// Pool identifier used to match incoming statistics to this entry.
    id: CompactString,
    /// Number of buffers still available in the pool.
    space_left: usize,
    /// Total number of buffers in the pool.
    total_size: usize,
    /// Size of one buffer; the table divides it by 1024 to print KB, so
    /// presumably bytes — confirm against `pool::pools_statistics`.
    buffer_size: usize,
    /// `total_size - space_left` as of the latest update.
    handles_in_use: usize,
    /// Change in `handles_in_use` per second, refreshed at most once per second.
    handles_per_second: usize,
    /// When `handles_per_second` was last refreshed.
    last_update: Instant,
    /// Handle count stored whenever `handles_per_second` is refreshed.
    prev_handles_in_use: usize,
}
103
104impl PoolStats {
105 fn new(
106 id: impl ToCompactString,
107 space_left: usize,
108 total_size: usize,
109 buffer_size: usize,
110 ) -> Self {
111 Self {
112 id: id.to_compact_string(),
113 space_left,
114 total_size,
115 buffer_size,
116 handles_in_use: total_size - space_left,
117 handles_per_second: 0,
118 last_update: Instant::now(),
119 prev_handles_in_use: 0,
120 }
121 }
122
123 fn update(&mut self, space_left: usize, total_size: usize) {
124 let now = Instant::now();
125 let handles_in_use = total_size - space_left;
126 let elapsed = now.duration_since(self.last_update).as_secs_f32();
127
128 if elapsed >= 1.0 {
129 self.handles_per_second =
130 ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
131 self.prev_handles_in_use = self.handles_in_use;
132 self.last_update = now;
133 }
134
135 self.handles_in_use = handles_in_use;
136 self.space_left = space_left;
137 self.total_size = total_size;
138 }
139}
140
141fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
142 let start = msgs.first().map(|m| m.process_time.start);
143 let end = msgs.last().map(|m| m.process_time.end);
144
145 match (start, end) {
146 (Some(s), Some(e)) => match (Option::<CuTime>::from(s), Option::<CuTime>::from(e)) {
147 (Some(s), Some(e)) if e >= s => e - s,
148 (Some(_), Some(_)) => CuDuration::MIN,
149 _ => CuDuration::MIN,
150 },
151 _ => CuDuration::MIN,
152 }
153}
154
/// Role of a node in the displayed graph; it selects the glyph and the colour
/// used in the node title.
#[derive(Copy, Clone)]
enum NodeType {
    /// No connection information has been applied yet.
    Unknown,
    /// Only outgoing connections.
    Source,
    /// Only incoming connections.
    Sink,
    /// Both incoming and outgoing connections.
    Task,
    /// Bridge component; its role never changes with connections.
    Bridge,
}

impl Display for NodeType {
    /// Writes the single-glyph symbol of the node type.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let glyph = match *self {
            NodeType::Unknown => "?",
            NodeType::Source => "◈",
            NodeType::Task => "⚙",
            NodeType::Sink => "⭳",
            NodeType::Bridge => "⇆",
        };
        f.write_str(glyph)
    }
}
176
177impl NodeType {
178 fn add_incoming(self) -> NodeType {
179 match self {
180 Self::Unknown => Self::Sink,
181 Self::Source => Self::Task,
182 Self::Sink => Self::Sink,
183 Self::Task => Self::Task,
184 Self::Bridge => Self::Bridge,
185 }
186 }
187
188 fn add_outgoing(self) -> NodeType {
189 match self {
190 Self::Unknown => Self::Source,
191 Self::Source => Self::Source,
192 Self::Sink => Self::Task,
193 Self::Task => Self::Task,
194 Self::Bridge => Self::Bridge,
195 }
196 }
197
198 fn color(self) -> Color {
199 match self {
200 Self::Unknown => Color::Gray,
201 Self::Source => Color::Rgb(255, 191, 0),
202 Self::Sink => Color::Rgb(255, 102, 204),
203 Self::Task => Color::White,
204 Self::Bridge => Color::Rgb(204, 153, 255),
205 }
206 }
207}
208
/// Latest status of one task as shown inside its node on the DAG screen.
#[derive(Default, Clone)]
struct TaskStatus {
    /// Set by `process_error`; cleared again once the node has been rendered.
    is_error: bool,
    /// Status text copied from the task's message metadata.
    status_txt: CompactString,
    /// Text of the last reported error; shown while `is_error` is set.
    error: CompactString,
}
215
/// Display-side description of one topology node.
#[derive(Clone)]
struct DisplayNode {
    /// Node identifier, shown in the node title.
    id: String,
    /// Type name of the component, or `"unknown"` when the topology has none.
    type_label: String,
    /// Role used to pick the title glyph and colour.
    node_type: NodeType,
    /// Input port names, in port order.
    inputs: Vec<String>,
    /// Output port names, in port order.
    outputs: Vec<String>,
}
224
/// State of the scrollable DAG view.
struct NodesScrollableWidgetState {
    /// Nodes to draw, in topology order.
    display_nodes: Vec<DisplayNode>,
    /// Edges between `display_nodes`, addressed by node index and port row.
    connections: Vec<Connection>,
    /// Task statuses shared with the monitor callbacks.
    statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// For each display node, the index of its entry in `statuses`, if it has one.
    status_index_map: Vec<Option<usize>>,
    /// Number of task ids; also the index of the fallback status slot used when rendering.
    task_count: usize,
    /// Scroll position of the view.
    nodes_scrollable_state: ScrollViewState,
}
233
234impl NodesScrollableWidgetState {
235 fn new(
236 config: &CuConfig,
237 errors: Arc<Mutex<Vec<TaskStatus>>>,
238 mission: Option<&str>,
239 task_ids: &'static [&'static str],
240 topology: Option<MonitorTopology>,
241 ) -> Self {
242 let topology = topology
243 .or_else(|| cu29::monitoring::build_monitor_topology(config, mission).ok())
244 .unwrap_or_default();
245
246 let mut display_nodes: Vec<DisplayNode> = Vec::new();
247 let mut status_index_map = Vec::new();
248 let mut node_lookup = HashMap::new();
249 let task_index_lookup: HashMap<&str, usize> = task_ids
250 .iter()
251 .enumerate()
252 .map(|(i, id)| (*id, i))
253 .collect();
254
255 for node in topology.nodes.iter() {
256 let node_type = match node.kind {
257 ComponentKind::Bridge => NodeType::Bridge,
258 ComponentKind::Task => {
259 let mut role = NodeType::Unknown;
260 if !node.inputs.is_empty() {
261 role = role.add_incoming();
262 }
263 if !node.outputs.is_empty() {
264 role = role.add_outgoing();
265 }
266 role
267 }
268 };
269
270 display_nodes.push(DisplayNode {
271 id: node.id.clone(),
272 type_label: node
273 .type_name
274 .clone()
275 .unwrap_or_else(|| "unknown".to_string()),
276 node_type,
277 inputs: node.inputs.clone(),
278 outputs: node.outputs.clone(),
279 });
280 let idx = display_nodes.len() - 1;
281 node_lookup.insert(node.id.clone(), idx);
282
283 let status_idx = match node.kind {
284 ComponentKind::Task => task_index_lookup.get(node.id.as_str()).cloned(),
285 ComponentKind::Bridge => None,
286 };
287 status_index_map.push(status_idx);
288 }
289
290 let mut connections: Vec<Connection> = Vec::with_capacity(topology.connections.len());
291 for cnx in topology.connections.iter() {
292 let Some(&src_idx) = node_lookup.get(&cnx.src) else {
293 continue;
294 };
295 let Some(&dst_idx) = node_lookup.get(&cnx.dst) else {
296 continue;
297 };
298 let src_node = &display_nodes[src_idx];
299 let dst_node = &display_nodes[dst_idx];
300 let src_port = cnx
301 .src_port
302 .as_ref()
303 .and_then(|p| src_node.outputs.iter().position(|name| name == p))
304 .unwrap_or(0);
305 let dst_port = cnx
306 .dst_port
307 .as_ref()
308 .and_then(|p| dst_node.inputs.iter().position(|name| name == p))
309 .unwrap_or(0);
310
311 connections.push(Connection::new(
312 src_idx,
313 src_port + NODE_PORT_ROW_OFFSET,
314 dst_idx,
315 dst_port + NODE_PORT_ROW_OFFSET,
316 ));
317 }
318
319 if !display_nodes.is_empty() {
322 let mut from_set = std::collections::HashSet::new();
323 for conn in &connections {
324 from_set.insert(conn.from_node);
325 }
326 if from_set.len() == display_nodes.len() {
327 let root_idx = 0;
328 connections.retain(|c| c.from_node != root_idx);
329 }
330 }
331
332 NodesScrollableWidgetState {
333 display_nodes,
334 connections,
335 nodes_scrollable_state: ScrollViewState::default(),
336 statuses: errors,
337 status_index_map,
338 task_count: task_ids.len(),
339 }
340 }
341}
342
/// Stateless widget that draws the DAG; everything it needs lives in
/// `NodesScrollableWidgetState`.
struct NodesScrollableWidget<'a> {
    /// Carries the `'a` lifetime parameter; holds no data.
    _marker: PhantomData<&'a ()>,
}
346
/// Adapter that lets a `NodeGraph` be passed where a plain `Widget` is expected.
struct GraphWrapper<'a> {
    /// The graph to draw.
    inner: NodeGraph<'a>,
}

impl Widget for GraphWrapper<'_> {
    /// Draws the wrapped graph, passing `&mut ()` as its state argument.
    fn render(self, area: Rect, buf: &mut Buffer)
    where
        Self: Sized,
    {
        self.inner.render(area, buf, &mut ())
    }
}
359
/// Width of a node box, in terminal cells.
const NODE_WIDTH: u16 = 29;
/// Node width minus 2 (presumably the left and right borders — confirm with `NodeLayout`).
const NODE_WIDTH_CONTENT: u16 = NODE_WIDTH - 2;

/// Minimum height of a node box, in terminal rows.
const NODE_HEIGHT: u16 = 5;
/// Number of text lines (type label and status) placed before the port rows.
const NODE_META_LINES: usize = 2;
/// Row offset added to a port index to get its row inside the node.
const NODE_PORT_ROW_OFFSET: usize = NODE_META_LINES;
366
/// Returns the last `max_chars` characters of `value`.
///
/// Counts Unicode scalar values, not bytes, so multi-byte characters are never
/// split. Returns the whole string when it is short enough and an empty string
/// when `max_chars` is 0.
fn clip_tail(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // Walking backwards, the `max_chars`-th character marks where the tail
    // begins; if the string runs out first, it already fits.
    match value.char_indices().rev().nth(max_chars - 1) {
        Some((tail_start, _)) => value[tail_start..].to_string(),
        None => value.to_string(),
    }
}
383
/// Node height minus 2; not referenced in the visible code, hence the `allow`.
#[allow(dead_code)]
const NODE_HEIGHT_CONTENT: u16 = NODE_HEIGHT - 2;
/// Extra width added to the graph's content bounds when sizing the scroll area.
const GRAPH_WIDTH_PADDING: u16 = NODE_WIDTH;
/// Extra height added to the graph's content bounds when sizing the scroll area.
const GRAPH_HEIGHT_PADDING: u16 = NODE_HEIGHT;
388
impl StatefulWidget for NodesScrollableWidget<'_> {
    type State = NodesScrollableWidgetState;

    /// Lays out the node graph, fills each node with its type label, status
    /// line and port names, and renders everything through a scroll view.
    ///
    /// Side effect: every status that gets displayed has its `is_error` flag
    /// cleared, and the shared status vector is grown to `task_count + 1`
    /// entries so that a fallback slot exists.
    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
        // One layout per display node: fixed width, height grown to fit the
        // port rows, title made of the role glyph followed by the node id.
        let build_node_layouts = || {
            state
                .display_nodes
                .iter()
                .map(|node| {
                    let ports = node.inputs.len().max(node.outputs.len());
                    let content_rows = ports + NODE_PORT_ROW_OFFSET;
                    // +2 presumably accounts for the top and bottom borders — confirm.
                    let height = (content_rows as u16).saturating_add(2).max(NODE_HEIGHT);
                    let mut title_line = Line::default();
                    title_line.spans.push(Span::styled(
                        format!(" {}", node.node_type),
                        Style::default().fg(node.node_type.color()),
                    ));
                    title_line.spans.push(Span::styled(
                        format!(" {} ", node.id),
                        Style::default().fg(Color::White),
                    ));
                    NodeLayout::new((NODE_WIDTH, height)).with_title_line(title_line)
                })
                .collect::<Vec<_>>()
        };

        // First guess at the canvas size; refined below from the computed layout.
        let node_count = state.display_nodes.len().max(1);
        let content_width = (node_count as u16)
            .saturating_mul(NODE_WIDTH + 12)
            .max(NODE_WIDTH);
        let max_ports = state
            .display_nodes
            .iter()
            .map(|node| node.inputs.len().max(node.outputs.len()))
            .max()
            .unwrap_or_default();
        let content_height = (((max_ports + NODE_PORT_ROW_OFFSET) as u16) * 4).max(NODE_HEIGHT);
        let connections = state.connections.clone();
        let build_graph = |width: u16, height: u16| {
            NodeGraph::new(
                build_node_layouts(),
                connections.clone(),
                width as usize,
                height as usize,
            )
        };
        let mut graph = build_graph(content_width, content_height);
        graph.calculate();
        let mut content_size = Size::new(content_width, content_height);
        if state.display_nodes.is_empty() {
            // Nothing to lay out: size the canvas to the visible area.
            content_size = Size::new(area.width.max(NODE_WIDTH), area.height.max(NODE_HEIGHT));
            graph = build_graph(content_size.width, content_size.height);
            graph.calculate();
        } else {
            // Resize the canvas to the laid-out bounds plus padding and redo
            // the layout if the size changed.
            let bounds = graph.content_bounds();
            let desired_width = bounds
                .width
                .saturating_add(GRAPH_WIDTH_PADDING)
                .max(NODE_WIDTH);
            let desired_height = bounds
                .height
                .saturating_add(GRAPH_HEIGHT_PADDING)
                .max(NODE_HEIGHT);
            if desired_width != content_size.width || desired_height != content_size.height {
                content_size = Size::new(desired_width, desired_height);
                graph = build_graph(content_size.width, content_size.height);
                graph.calculate();
            }
        }
        let mut scroll_view = ScrollView::new(content_size);
        let zones = graph.split(scroll_view.area());

        {
            let mut statuses = state.statuses.lock().unwrap();
            // Guarantee a slot at index `task_count`, used for nodes that have
            // no status index of their own.
            if statuses.len() <= state.task_count {
                statuses.resize(state.task_count + 1, TaskStatus::default());
            }
            for (idx, ea_zone) in zones.into_iter().enumerate() {
                let fallback_idx = state.task_count;
                let status_idx = state
                    .status_index_map
                    .get(idx)
                    .and_then(|opt| *opt)
                    .unwrap_or(fallback_idx);
                // Clamp to the last entry in case the index is out of range.
                let safe_index = if status_idx < statuses.len() {
                    status_idx
                } else {
                    statuses.len() - 1
                };
                let status = &mut statuses[safe_index];
                let s = &state.display_nodes[idx].type_label;
                let status_line = if status.is_error {
                    format!("❌ {}", status.error)
                } else {
                    format!("✓ {}", status.status_txt)
                };

                // Long labels keep their tail, the rest is cut off.
                let label_width = (NODE_WIDTH_CONTENT as usize).saturating_sub(2);
                let type_label = clip_tail(s, label_width);
                let status_text = clip_tail(&status_line, label_width);
                let base_style = if status.is_error {
                    Style::default().fg(Color::Red)
                } else {
                    Style::default().fg(Color::Green)
                };
                let mut lines: Vec<Line> = Vec::new();
                lines.push(Line::styled(format!(" {}", type_label), base_style));
                lines.push(Line::styled(format!(" {}", status_text), base_style));

                let max_ports = state.display_nodes[idx]
                    .inputs
                    .len()
                    .max(state.display_nodes[idx].outputs.len());
                if max_ports > 0 {
                    // One row per port index: input name left-aligned, a
                    // dotted separator, output name right-aligned.
                    let left_width = (NODE_WIDTH_CONTENT as usize - 2) / 2;
                    let right_width = NODE_WIDTH_CONTENT as usize - 2 - left_width;
                    let input_style = Style::default().fg(Color::Yellow);
                    let output_style = Style::default().fg(Color::Cyan);
                    let dotted_style = Style::default().fg(Color::DarkGray);
                    for port_idx in 0..max_ports {
                        let input = state.display_nodes[idx]
                            .inputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, left_width))
                            .unwrap_or_default();
                        let output = state.display_nodes[idx]
                            .outputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, right_width))
                            .unwrap_or_default();
                        let mut port_line = Line::default();
                        port_line.spans.push(Span::styled(
                            format!(" {:<left_width$}", input, left_width = left_width),
                            input_style,
                        ));
                        port_line.spans.push(Span::styled("┆", dotted_style));
                        port_line.spans.push(Span::styled(
                            format!("{:>right_width$}", output, right_width = right_width),
                            output_style,
                        ));
                        lines.push(port_line);
                    }
                }

                let txt = Text::from(lines);
                let paragraph = Paragraph::new(txt);
                // The error has now been shown once; clear it until it is reported again.
                status.is_error = false;
                scroll_view.render_widget(paragraph, ea_zone);
            }
        }

        // Draw the graph itself over the whole canvas, then blit the visible
        // part of the scroll view into `buf`.
        scroll_view.render_widget(
            GraphWrapper { inner: graph },
            Rect {
                x: 0,
                y: 0,
                width: content_size.width,
                height: content_size.height,
            },
        );
        scroll_view.render(area, buf, &mut state.nodes_scrollable_state);
    }
}
552
/// Console (TUI) monitor: collects task statistics from the runtime callbacks
/// and displays them from a dedicated UI thread.
pub struct CuConsoleMon {
    /// Copy of the configuration, handed to the UI thread on start.
    config: CuConfig,
    /// Identifiers of the monitored tasks.
    taskids: &'static [&'static str],
    /// Latency statistics shared with the UI thread.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Per-task status and error text shared with the UI thread.
    task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// Handle of the UI thread while it is running.
    ui_handle: Option<JoinHandle<()>>,
    /// Memory pool statistics shared with the UI thread.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
    /// Set when the UI or the monitor wants everything to shut down.
    quitting: Arc<AtomicBool>,
    /// Topology supplied through `set_topology`, if any.
    topology: Option<MonitorTopology>,
}
564
565impl Drop for CuConsoleMon {
566 fn drop(&mut self) {
567 self.quitting.store(true, Ordering::SeqCst);
568 let _ = restore_terminal();
569 if let Some(handle) = self.ui_handle.take() {
570 let _ = handle.join();
571 }
572 }
573}
574
/// State owned by the UI thread.
struct UI {
    /// Task identifiers, used as row labels in the latency table.
    task_ids: &'static [&'static str],
    /// Screen currently displayed.
    active_screen: Screen,
    /// System information text gathered once at construction.
    sysinfo: String,
    /// Latency statistics shared with the monitor callbacks.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Shutdown flag; the event loop exits when it is set.
    quitting: Arc<AtomicBool>,
    /// State of the DAG screen.
    nodes_scrollable_widget_state: NodesScrollableWidgetState,
    /// Redirect capturing stderr (only with the `debug_pane` feature).
    #[cfg(feature = "debug_pane")]
    error_redirect: gag::BufferRedirect,
    /// Debug log shown on the debug screen, when one is attached.
    #[cfg(feature = "debug_pane")]
    debug_output: Option<debug_pane::DebugLog>,
    /// Memory pool statistics shared with the monitor callbacks.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
}
588
589impl UI {
590 #[cfg(feature = "debug_pane")]
591 #[allow(clippy::too_many_arguments)]
592 fn new(
593 config: CuConfig,
594 mission: Option<&str>,
595 task_ids: &'static [&'static str],
596 task_stats: Arc<Mutex<TaskStats>>,
597 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
598 quitting: Arc<AtomicBool>,
599 error_redirect: gag::BufferRedirect,
600 debug_output: Option<debug_pane::DebugLog>,
601 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
602 topology: Option<MonitorTopology>,
603 ) -> UI {
604 init_error_hooks();
605 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
606 &config,
607 task_statuses.clone(),
608 mission,
609 task_ids,
610 topology.clone(),
611 );
612
613 Self {
614 task_ids,
615 active_screen: Screen::Neofetch,
616 sysinfo: sysinfo::pfetch_info(),
617 task_stats,
618 quitting,
619 nodes_scrollable_widget_state,
620 error_redirect,
621 debug_output,
622 pool_stats,
623 }
624 }
625
626 #[cfg(not(feature = "debug_pane"))]
627 fn new(
628 config: CuConfig,
629 task_ids: &'static [&'static str],
630 task_stats: Arc<Mutex<TaskStats>>,
631 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
632 quitting: Arc<AtomicBool>,
633 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
634 topology: Option<MonitorTopology>,
635 ) -> UI {
636 init_error_hooks();
637 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
638 &config,
639 task_statuses.clone(),
640 None,
641 task_ids,
642 topology.clone(),
643 );
644
645 Self {
646 task_ids,
647 active_screen: Screen::Neofetch,
648 sysinfo: sysinfo::pfetch_info(),
649 task_stats,
650 quitting,
651 nodes_scrollable_widget_state,
652 pool_stats,
653 }
654 }
655
656 fn draw_latency_table(&self, f: &mut Frame, area: Rect) {
657 let header_cells = [
658 "🛠 Task",
659 "⬇ Min",
660 "⬆ Max",
661 "∅ Mean",
662 "σ Stddev",
663 "⧖∅ Jitter",
664 "⧗⬆ Jitter",
665 ]
666 .iter()
667 .map(|h| {
668 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
669 Style::default()
670 .fg(Color::Yellow)
671 .add_modifier(Modifier::BOLD),
672 )
673 });
674
675 let header = Row::new(header_cells)
676 .style(Style::default().fg(Color::Yellow))
677 .bottom_margin(1)
678 .top_margin(1);
679
680 let task_stats = self.task_stats.lock().unwrap(); let mut rows = task_stats
682 .stats
683 .iter()
684 .enumerate()
685 .map(|(i, stat)| {
686 let cells = vec![
687 Cell::from(Line::from(self.task_ids[i]).alignment(Alignment::Right))
688 .light_blue(),
689 Cell::from(Line::from(stat.min().to_string()).alignment(Alignment::Right))
690 .style(Style::default()),
691 Cell::from(Line::from(stat.max().to_string()).alignment(Alignment::Right))
692 .style(Style::default()),
693 Cell::from(Line::from(stat.mean().to_string()).alignment(Alignment::Right))
694 .style(Style::default()),
695 Cell::from(Line::from(stat.stddev().to_string()).alignment(Alignment::Right))
696 .style(Style::default()),
697 Cell::from(
698 Line::from(stat.jitter_mean().to_string()).alignment(Alignment::Right),
699 )
700 .style(Style::default()),
701 Cell::from(
702 Line::from(stat.jitter_max().to_string()).alignment(Alignment::Right),
703 )
704 .style(Style::default()),
705 ];
706 Row::new(cells)
707 })
708 .collect::<Vec<Row>>();
709
710 let cells = vec![
711 Cell::from(
712 Line::from("End2End")
713 .light_red()
714 .alignment(Alignment::Right),
715 ),
716 Cell::from(
717 Line::from(task_stats.end2end.min().to_string())
718 .light_red()
719 .alignment(Alignment::Right),
720 )
721 .style(Style::default()),
722 Cell::from(
723 Line::from(task_stats.end2end.max().to_string())
724 .light_red()
725 .alignment(Alignment::Right),
726 )
727 .style(Style::default()),
728 Cell::from(
729 Line::from(task_stats.end2end.mean().to_string())
730 .light_red()
731 .alignment(Alignment::Right),
732 )
733 .style(Style::default()),
734 Cell::from(
735 Line::from(task_stats.end2end.stddev().to_string())
736 .light_red()
737 .alignment(Alignment::Right),
738 )
739 .style(Style::default()),
740 Cell::from(
741 Line::from(task_stats.end2end.jitter_mean().to_string())
742 .light_red()
743 .alignment(Alignment::Right),
744 )
745 .style(Style::default()),
746 Cell::from(
747 Line::from(task_stats.end2end.jitter_max().to_string())
748 .light_red()
749 .alignment(Alignment::Right),
750 )
751 .style(Style::default()),
752 ];
753 rows.push(Row::new(cells).top_margin(1));
754
755 let table = Table::new(
756 rows,
757 &[
758 Constraint::Length(10),
759 Constraint::Length(10),
760 Constraint::Length(12),
761 Constraint::Length(12),
762 Constraint::Length(10),
763 Constraint::Length(12),
764 Constraint::Length(13),
765 ],
766 )
767 .header(header)
768 .block(Block::default().borders(Borders::ALL).title(" Latencies "));
769
770 f.render_widget(table, area);
771 }
772
773 fn draw_memory_pools(&self, f: &mut Frame, area: Rect) {
774 let header_cells = [
775 "Pool ID",
776 "Used/Total",
777 "Buffer Size",
778 "Handles in Use",
779 "Handles/sec",
780 ]
781 .iter()
782 .map(|h| {
783 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
784 Style::default()
785 .fg(Color::Yellow)
786 .add_modifier(Modifier::BOLD),
787 )
788 });
789
790 let header = Row::new(header_cells)
791 .style(Style::default().fg(Color::Yellow))
792 .bottom_margin(1);
793
794 let pool_stats = self.pool_stats.lock().unwrap();
795 let rows = pool_stats
796 .iter()
797 .map(|stat| {
798 let used = stat.total_size - stat.space_left;
799 let percent = if stat.total_size > 0 {
800 100.0 * used as f64 / stat.total_size as f64
801 } else {
802 0.0
803 };
804 let buffer_size = stat.buffer_size;
805 let mb_unit = 1024.0 * 1024.0;
806
807 let cells = vec![
808 Cell::from(Line::from(stat.id.to_string()).alignment(Alignment::Right))
809 .light_blue(),
810 Cell::from(
811 Line::from(format!(
812 "{:.2} MB / {:.2} MB ({:.1}%)",
813 used as f64 * buffer_size as f64 / mb_unit,
814 stat.total_size as f64 * buffer_size as f64 / mb_unit,
815 percent
816 ))
817 .alignment(Alignment::Right),
818 ),
819 Cell::from(
820 Line::from(format!("{} KB", stat.buffer_size / 1024))
821 .alignment(Alignment::Right),
822 ),
823 Cell::from(
824 Line::from(format!("{}", stat.handles_in_use)).alignment(Alignment::Right),
825 ),
826 Cell::from(
827 Line::from(format!("{}/s", stat.handles_per_second))
828 .alignment(Alignment::Right),
829 ),
830 ];
831 Row::new(cells)
832 })
833 .collect::<Vec<Row>>();
834
835 let table = Table::new(
836 rows,
837 &[
838 Constraint::Percentage(30),
839 Constraint::Percentage(20),
840 Constraint::Percentage(15),
841 Constraint::Percentage(15),
842 Constraint::Percentage(20),
843 ],
844 )
845 .header(header)
846 .block(
847 Block::default()
848 .borders(Borders::ALL)
849 .title(" Memory Pools "),
850 );
851
852 f.render_widget(table, area);
853 }
854
855 fn draw_nodes(&mut self, f: &mut Frame, space: Rect) {
856 NodesScrollableWidget {
857 _marker: Default::default(),
858 }
859 .render(
860 space,
861 f.buffer_mut(),
862 &mut self.nodes_scrollable_widget_state,
863 )
864 }
865
866 fn draw(&mut self, f: &mut Frame) {
867 let layout = Layout::default()
868 .direction(Direction::Vertical)
869 .constraints(
870 [
871 Constraint::Length(3), Constraint::Min(0), ]
874 .as_ref(),
875 )
876 .split(f.area());
877
878 let menu = Paragraph::new(MENU_CONTENT)
879 .style(
880 Style::default()
881 .fg(Color::Yellow)
882 .add_modifier(Modifier::ITALIC),
883 )
884 .block(Block::default().borders(Borders::BOTTOM));
885 f.render_widget(menu, layout[0]);
886
887 match self.active_screen {
888 Screen::Neofetch => {
889 const VERSION: &str = env!("CARGO_PKG_VERSION");
890 let text: Text = format!("\n -> Copper v{}\n\n{}\n\n ", VERSION, self.sysinfo)
891 .into_text()
892 .unwrap();
893 let p = Paragraph::new::<Text>(text).block(
894 Block::default()
895 .title(" System Info ")
896 .borders(Borders::ALL),
897 );
898 f.render_widget(p, layout[1]);
899 }
900 Screen::Dag => {
901 self.draw_nodes(f, layout[1]);
902 }
903 Screen::Latency => self.draw_latency_table(f, layout[1]),
904 Screen::MemoryPools => self.draw_memory_pools(f, layout[1]),
905 #[cfg(feature = "debug_pane")]
906 Screen::DebugOutput => self.draw_debug_output(f, layout[1]),
907 };
908 }
909
910 fn run_app<B: Backend>(&mut self, terminal: &mut Terminal<B>) -> io::Result<()> {
911 loop {
912 if self.quitting.load(Ordering::SeqCst) {
913 break;
914 }
915 #[cfg(feature = "debug_pane")]
916 self.update_debug_output();
917
918 terminal.draw(|f| {
919 self.draw(f);
920 })?;
921
922 if event::poll(Duration::from_millis(50))? {
923 let event = event::read()?;
924
925 match event {
926 Event::Key(key) => match key.code {
927 KeyCode::Char('1') => self.active_screen = Screen::Neofetch,
928 KeyCode::Char('2') => self.active_screen = Screen::Dag,
929 KeyCode::Char('3') => self.active_screen = Screen::Latency,
930 KeyCode::Char('4') => self.active_screen = Screen::MemoryPools,
931 #[cfg(feature = "debug_pane")]
932 KeyCode::Char('5') => self.active_screen = Screen::DebugOutput,
933 KeyCode::Char('r') => {
934 if self.active_screen == Screen::Latency {
935 self.task_stats.lock().unwrap().reset()
936 }
937 }
938 KeyCode::Char('j') | KeyCode::Down => {
939 if self.active_screen == Screen::Dag {
940 for _ in 0..1 {
941 self.nodes_scrollable_widget_state
942 .nodes_scrollable_state
943 .scroll_down();
944 }
945 }
946 }
947 KeyCode::Char('k') | KeyCode::Up => {
948 if self.active_screen == Screen::Dag {
949 for _ in 0..1 {
950 self.nodes_scrollable_widget_state
951 .nodes_scrollable_state
952 .scroll_up();
953 }
954 }
955 }
956 KeyCode::Char('h') | KeyCode::Left => {
957 if self.active_screen == Screen::Dag {
958 for _ in 0..5 {
959 self.nodes_scrollable_widget_state
960 .nodes_scrollable_state
961 .scroll_left();
962 }
963 }
964 }
965 KeyCode::Char('l') | KeyCode::Right => {
966 if self.active_screen == Screen::Dag {
967 for _ in 0..5 {
968 self.nodes_scrollable_widget_state
969 .nodes_scrollable_state
970 .scroll_right();
971 }
972 }
973 }
974 KeyCode::Char('q') => {
975 break;
976 }
977 _ => {}
978 },
979
980 #[cfg(feature = "debug_pane")]
981 Event::Resize(_columns, rows) => {
982 if let Some(debug_output) = self.debug_output.as_mut() {
983 debug_output.max_rows.store(rows, Ordering::SeqCst)
984 }
985 }
986 _ => {}
987 }
988 }
989 }
990 Ok(())
991 }
992}
993
994impl CuMonitor for CuConsoleMon {
995 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
996 where
997 Self: Sized,
998 {
999 let task_stats = Arc::new(Mutex::new(TaskStats::new(
1000 taskids.len(),
1001 CuDuration::from(Duration::from_secs(5)),
1002 )));
1003
1004 Ok(Self {
1005 config: config.clone(),
1006 taskids,
1007 task_stats,
1008 task_statuses: Arc::new(Mutex::new(vec![TaskStatus::default(); taskids.len()])),
1009 ui_handle: None,
1010 quitting: Arc::new(AtomicBool::new(false)),
1011 pool_stats: Arc::new(Mutex::new(Vec::new())),
1012 topology: None,
1013 })
1014 }
    /// Stores the topology to display; it is handed to the UI thread by `start`.
    fn set_topology(&mut self, topology: MonitorTopology) {
        self.topology = Some(topology);
    }
1018
1019 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
1020 let config_dup = self.config.clone();
1021 let taskids = self.taskids;
1022
1023 let task_stats_ui = self.task_stats.clone();
1024 let error_states = self.task_statuses.clone();
1025 let pool_stats_ui = self.pool_stats.clone();
1026 let quitting = self.quitting.clone();
1027 let topology = self.topology.clone();
1028
1029 let handle = thread::spawn(move || {
1031 let backend = CrosstermBackend::new(stdout());
1032 let _terminal_guard = TerminalRestoreGuard;
1033
1034 let prev_hook = panic::take_hook();
1036 panic::set_hook(Box::new(move |info| {
1037 let _ = restore_terminal();
1038 prev_hook(info);
1039 }));
1040
1041 install_signal_handlers(quitting.clone());
1042
1043 if let Err(err) = setup_terminal() {
1044 eprintln!("Failed to prepare terminal UI: {err}");
1045 return;
1046 }
1047
1048 let mut terminal = match Terminal::new(backend) {
1049 Ok(terminal) => terminal,
1050 Err(err) => {
1051 eprintln!("Failed to initialize terminal backend: {err}");
1052 return;
1053 }
1054 };
1055
1056 #[cfg(feature = "debug_pane")]
1057 {
1058 let error_redirect = gag::BufferRedirect::stderr().unwrap();
1060
1061 let mut ui = UI::new(
1062 config_dup,
1063 None, taskids,
1065 task_stats_ui,
1066 error_states,
1067 quitting.clone(),
1068 error_redirect,
1069 None,
1070 pool_stats_ui,
1071 topology.clone(),
1072 );
1073
1074 #[cfg(debug_assertions)]
1076 if cu29_log_runtime::EXTRA_TEXT_LOGGER
1077 .read()
1078 .unwrap()
1079 .is_some()
1080 {
1081 let max_lines = terminal.size().unwrap().height - 5;
1082 let (debug_log, tx) = debug_pane::DebugLog::new(max_lines);
1083
1084 let log_subscriber = debug_pane::LogSubscriber::new(tx);
1085
1086 *cu29_log_runtime::EXTRA_TEXT_LOGGER.write().unwrap() =
1087 Some(Box::new(log_subscriber) as Box<dyn log::Log>);
1088
1089 if let Err(err) = setup_terminal() {
1091 eprintln!("Failed to reinitialize terminal after log redirect: {err}");
1092 }
1093
1094 ui.debug_output = Some(debug_log);
1095 } else {
1096 println!("EXTRA_TEXT_LOGGER is none");
1097 }
1098 ui.run_app(&mut terminal).expect("Failed to run app");
1099 }
1100
1101 #[cfg(not(feature = "debug_pane"))]
1102 {
1103 let stderr_gag = gag::Gag::stderr().unwrap();
1104
1105 let mut ui = UI::new(
1106 config_dup,
1107 taskids,
1108 task_stats_ui,
1109 error_states,
1110 quitting,
1111 pool_stats_ui,
1112 topology,
1113 );
1114 ui.run_app(&mut terminal).expect("Failed to run app");
1115
1116 drop(stderr_gag);
1117 }
1118
1119 quitting.store(true, Ordering::SeqCst);
1120 let _ = restore_terminal();
1122 });
1123
1124 self.ui_handle = Some(handle);
1125 Ok(())
1126 }
1127
1128 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1129 {
1130 let mut task_stats = self.task_stats.lock().unwrap();
1131 task_stats.update(msgs);
1132 }
1133 {
1134 let mut task_statuses = self.task_statuses.lock().unwrap();
1135 for (i, msg) in msgs.iter().enumerate() {
1136 let CuCompactString(status_txt) = &msg.status_txt;
1137 task_statuses[i].status_txt = status_txt.clone();
1138 }
1139 }
1140
1141 {
1143 let pool_stats_data = pool::pools_statistics();
1144 let mut pool_stats = self.pool_stats.lock().unwrap();
1145
1146 for (id, space_left, total_size, buffer_size) in pool_stats_data {
1148 let id_str = id.to_string();
1149 if let Some(existing) = pool_stats.iter_mut().find(|p| p.id == id_str) {
1150 existing.update(space_left, total_size);
1151 } else {
1152 pool_stats.push(PoolStats::new(id_str, space_left, total_size, buffer_size));
1153 }
1154 }
1155 }
1156
1157 if self.quitting.load(Ordering::SeqCst) {
1158 return Err("Exiting...".into());
1159 }
1160 Ok(())
1161 }
1162
1163 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
1164 {
1165 let status = &mut self.task_statuses.lock().unwrap()[taskid];
1166 status.is_error = true;
1167 status.error = error.to_compact_string();
1168 }
1169 match step {
1170 CuTaskState::Start => Decision::Shutdown,
1171 CuTaskState::Preprocess => Decision::Abort,
1172 CuTaskState::Process => Decision::Ignore,
1173 CuTaskState::Postprocess => Decision::Ignore,
1174 CuTaskState::Stop => Decision::Shutdown,
1175 }
1176 }
1177
1178 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
1179 self.quitting.store(true, Ordering::SeqCst);
1180 let _ = restore_terminal();
1181
1182 if let Some(handle) = self.ui_handle.take() {
1183 let _ = handle.join();
1184 }
1185
1186 self.task_stats
1187 .lock()
1188 .unwrap()
1189 .stats
1190 .iter_mut()
1191 .for_each(|s| s.reset());
1192 Ok(())
1193 }
1194}
1195
/// Guard whose drop restores the terminal, so that every exit path of the
/// scope holding it leaves the terminal usable.
struct TerminalRestoreGuard;

impl Drop for TerminalRestoreGuard {
    /// Restores the terminal; any failure is ignored.
    fn drop(&mut self) {
        let _ = restore_terminal();
    }
}
1203
1204fn install_signal_handlers(quitting: Arc<AtomicBool>) {
1205 static SIGNAL_HANDLER: OnceLock<()> = OnceLock::new();
1206
1207 let _ = SIGNAL_HANDLER.get_or_init(|| {
1208 let quitting = quitting.clone();
1209 if let Err(err) = ctrlc::set_handler(move || {
1210 quitting.store(true, Ordering::SeqCst);
1211 let _ = restore_terminal();
1212 std::process::exit(130);
1214 }) {
1215 eprintln!("Failed to install Ctrl-C handler: {err}");
1216 }
1217 });
1218}
1219
1220fn init_error_hooks() {
1221 let (panic, error) = HookBuilder::default().into_hooks();
1222 let panic = panic.into_panic_hook();
1223 let error = error.into_eyre_hook();
1224 color_eyre::eyre::set_hook(Box::new(move |e| {
1225 let _ = restore_terminal();
1226 error(e)
1227 }))
1228 .unwrap();
1229 std::panic::set_hook(Box::new(move |info| {
1230 let _ = restore_terminal();
1231 panic(info)
1232 }));
1233}
1234
1235fn setup_terminal() -> io::Result<()> {
1236 enable_raw_mode()?;
1237 execute!(stdout(), EnterAlternateScreen, EnableMouseCapture)?;
1238 Ok(())
1239}
1240
1241fn restore_terminal() -> io::Result<()> {
1242 execute!(stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
1243 disable_raw_mode()
1244}