1#[cfg(feature = "debug_pane")]
2mod debug_pane;
3pub mod sysinfo;
4mod tui_nodes;
5
6use crate::tui_nodes::{Connection, NodeGraph, NodeLayout};
7use ansi_to_tui::IntoText;
8use color_eyre::config::HookBuilder;
9use compact_str::{CompactString, ToCompactString};
10use cu29::clock::{CuDuration, RobotClock};
11use cu29::config::CuConfig;
12use cu29::cutask::CuMsgMetadata;
13use cu29::monitoring::{
14 ComponentKind, CuDurationStatistics, CuMonitor, CuTaskState, Decision, MonitorTopology,
15};
16use cu29::prelude::{CuCompactString, CuTime, pool};
17use cu29::{CuError, CuResult};
18#[cfg(feature = "debug_pane")]
19use debug_pane::UIExt;
20use ratatui::backend::CrosstermBackend;
21use ratatui::buffer::Buffer;
22use ratatui::crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode};
23use ratatui::crossterm::terminal::{
24 EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
25};
26use ratatui::crossterm::tty::IsTty;
27use ratatui::crossterm::{event, execute};
28use ratatui::layout::{Alignment, Constraint, Direction, Layout, Size};
29use ratatui::prelude::{Backend, Rect};
30use ratatui::prelude::{Stylize, Widget};
31use ratatui::style::{Color, Modifier, Style};
32use ratatui::text::{Line, Span, Text};
33use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, StatefulWidget, Table};
34use ratatui::{Frame, Terminal};
35use std::backtrace::Backtrace;
36use std::fmt::{Display, Formatter};
37use std::io::{Write, stdin, stdout};
38use std::marker::PhantomData;
39use std::process;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::{Arc, Mutex, OnceLock};
42use std::thread::JoinHandle;
43use std::time::{Duration, Instant};
44use std::{collections::HashMap, io, thread};
45use tui_widgets::scrollview::{ScrollView, ScrollViewState};
46
47#[cfg(feature = "debug_pane")]
48const MENU_CONTENT: &str = " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [5] Debug Output [q] Quit | Scroll: hjkl or ↑↓←→ ";
49#[cfg(not(feature = "debug_pane"))]
50const MENU_CONTENT: &str =
51 " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [q] Quit | Scroll: hjkl or ↑↓←→ ";
52
53#[derive(PartialEq)]
54enum Screen {
55 Neofetch,
56 Dag,
57 Latency,
58 #[cfg(feature = "debug_pane")]
59 DebugOutput,
60 MemoryPools,
61}
62
63struct TaskStats {
64 stats: Vec<CuDurationStatistics>,
65 end2end: CuDurationStatistics,
66}
67
68impl TaskStats {
69 fn new(num_tasks: usize, max_duration: CuDuration) -> Self {
70 let stats = vec![CuDurationStatistics::new(max_duration); num_tasks];
71 TaskStats {
72 stats,
73 end2end: CuDurationStatistics::new(max_duration),
74 }
75 }
76
77 fn update(&mut self, msgs: &[&CuMsgMetadata]) {
78 for (i, &msg) in msgs.iter().enumerate() {
79 let (before, after) = (
80 msg.process_time.start.unwrap(),
81 msg.process_time.end.unwrap(),
82 );
83 self.stats[i].record(after - before);
84 }
85 self.end2end.record(compute_end_to_end_latency(msgs));
86 }
87
88 fn reset(&mut self) {
89 for s in &mut self.stats {
90 s.reset();
91 }
92 self.end2end.reset();
93 }
94}
95struct PoolStats {
96 id: CompactString,
97 space_left: usize,
98 total_size: usize,
99 buffer_size: usize,
100 handles_in_use: usize,
101 handles_per_second: usize,
102 last_update: Instant,
103 prev_handles_in_use: usize,
104}
105
106impl PoolStats {
107 fn new(
108 id: impl ToCompactString,
109 space_left: usize,
110 total_size: usize,
111 buffer_size: usize,
112 ) -> Self {
113 Self {
114 id: id.to_compact_string(),
115 space_left,
116 total_size,
117 buffer_size,
118 handles_in_use: total_size - space_left,
119 handles_per_second: 0,
120 last_update: Instant::now(),
121 prev_handles_in_use: 0,
122 }
123 }
124
125 fn update(&mut self, space_left: usize, total_size: usize) {
126 let now = Instant::now();
127 let handles_in_use = total_size - space_left;
128 let elapsed = now.duration_since(self.last_update).as_secs_f32();
129
130 if elapsed >= 1.0 {
131 self.handles_per_second =
132 ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
133 self.prev_handles_in_use = self.handles_in_use;
134 self.last_update = now;
135 }
136
137 self.handles_in_use = handles_in_use;
138 self.space_left = space_left;
139 self.total_size = total_size;
140 }
141}
142
143fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
144 let start = msgs.first().map(|m| m.process_time.start);
145 let end = msgs.last().map(|m| m.process_time.end);
146
147 match (start, end) {
148 (Some(s), Some(e)) => match (Option::<CuTime>::from(s), Option::<CuTime>::from(e)) {
149 (Some(s), Some(e)) if e >= s => e - s,
150 (Some(_), Some(_)) => CuDuration::MIN,
151 _ => CuDuration::MIN,
152 },
153 _ => CuDuration::MIN,
154 }
155}
156
157#[derive(Copy, Clone)]
159enum NodeType {
160 Unknown,
161 Source,
162 Sink,
163 Task,
164 Bridge,
165}
166
167impl Display for NodeType {
168 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::Unknown => write!(f, "?"),
171 Self::Source => write!(f, "◈"),
172 Self::Task => write!(f, "⚙"),
173 Self::Sink => write!(f, "⭳"),
174 Self::Bridge => write!(f, "⇆"),
175 }
176 }
177}
178
179impl NodeType {
180 fn add_incoming(self) -> NodeType {
181 match self {
182 Self::Unknown => Self::Sink,
183 Self::Source => Self::Task,
184 Self::Sink => Self::Sink,
185 Self::Task => Self::Task,
186 Self::Bridge => Self::Bridge,
187 }
188 }
189
190 fn add_outgoing(self) -> NodeType {
191 match self {
192 Self::Unknown => Self::Source,
193 Self::Source => Self::Source,
194 Self::Sink => Self::Task,
195 Self::Task => Self::Task,
196 Self::Bridge => Self::Bridge,
197 }
198 }
199
200 fn color(self) -> Color {
201 match self {
202 Self::Unknown => Color::Gray,
203 Self::Source => Color::Rgb(255, 191, 0),
204 Self::Sink => Color::Rgb(255, 102, 204),
205 Self::Task => Color::White,
206 Self::Bridge => Color::Rgb(204, 153, 255),
207 }
208 }
209}
210
211#[derive(Default, Clone)]
212struct TaskStatus {
213 is_error: bool,
214 status_txt: CompactString,
215 error: CompactString,
216}
217
218#[derive(Clone)]
219struct DisplayNode {
220 id: String,
221 type_label: String,
222 node_type: NodeType,
223 inputs: Vec<String>,
224 outputs: Vec<String>,
225}
226
227struct NodesScrollableWidgetState {
228 display_nodes: Vec<DisplayNode>,
229 connections: Vec<Connection>,
230 statuses: Arc<Mutex<Vec<TaskStatus>>>,
231 status_index_map: Vec<Option<usize>>,
232 task_count: usize,
233 nodes_scrollable_state: ScrollViewState,
234}
235
236impl NodesScrollableWidgetState {
237 fn new(
238 config: &CuConfig,
239 errors: Arc<Mutex<Vec<TaskStatus>>>,
240 mission: Option<&str>,
241 task_ids: &'static [&'static str],
242 topology: Option<MonitorTopology>,
243 ) -> Self {
244 let topology = topology
245 .or_else(|| cu29::monitoring::build_monitor_topology(config, mission).ok())
246 .unwrap_or_default();
247
248 let mut display_nodes: Vec<DisplayNode> = Vec::new();
249 let mut status_index_map = Vec::new();
250 let mut node_lookup = HashMap::new();
251 let task_index_lookup: HashMap<&str, usize> = task_ids
252 .iter()
253 .enumerate()
254 .map(|(i, id)| (*id, i))
255 .collect();
256
257 for node in topology.nodes.iter() {
258 let node_type = match node.kind {
259 ComponentKind::Bridge => NodeType::Bridge,
260 ComponentKind::Task => {
261 let mut role = NodeType::Unknown;
262 if !node.inputs.is_empty() {
263 role = role.add_incoming();
264 }
265 if !node.outputs.is_empty() {
266 role = role.add_outgoing();
267 }
268 role
269 }
270 };
271
272 display_nodes.push(DisplayNode {
273 id: node.id.clone(),
274 type_label: node
275 .type_name
276 .clone()
277 .unwrap_or_else(|| "unknown".to_string()),
278 node_type,
279 inputs: node.inputs.clone(),
280 outputs: node.outputs.clone(),
281 });
282 let idx = display_nodes.len() - 1;
283 node_lookup.insert(node.id.clone(), idx);
284
285 let status_idx = match node.kind {
286 ComponentKind::Task => task_index_lookup.get(node.id.as_str()).cloned(),
287 ComponentKind::Bridge => None,
288 };
289 status_index_map.push(status_idx);
290 }
291
292 let mut connections: Vec<Connection> = Vec::with_capacity(topology.connections.len());
293 for cnx in topology.connections.iter() {
294 let Some(&src_idx) = node_lookup.get(&cnx.src) else {
295 continue;
296 };
297 let Some(&dst_idx) = node_lookup.get(&cnx.dst) else {
298 continue;
299 };
300 let src_node = &display_nodes[src_idx];
301 let dst_node = &display_nodes[dst_idx];
302 let src_port = cnx
303 .src_port
304 .as_ref()
305 .and_then(|p| src_node.outputs.iter().position(|name| name == p))
306 .unwrap_or(0);
307 let dst_port = cnx
308 .dst_port
309 .as_ref()
310 .and_then(|p| dst_node.inputs.iter().position(|name| name == p))
311 .unwrap_or(0);
312
313 connections.push(Connection::new(
314 src_idx,
315 src_port + NODE_PORT_ROW_OFFSET,
316 dst_idx,
317 dst_port + NODE_PORT_ROW_OFFSET,
318 ));
319 }
320
321 if !display_nodes.is_empty() {
324 let mut from_set = std::collections::HashSet::new();
325 for conn in &connections {
326 from_set.insert(conn.from_node);
327 }
328 if from_set.len() == display_nodes.len() {
329 let root_idx = 0;
330 connections.retain(|c| c.from_node != root_idx);
331 }
332 }
333
334 NodesScrollableWidgetState {
335 display_nodes,
336 connections,
337 nodes_scrollable_state: ScrollViewState::default(),
338 statuses: errors,
339 status_index_map,
340 task_count: task_ids.len(),
341 }
342 }
343}
344
345struct NodesScrollableWidget<'a> {
346 _marker: PhantomData<&'a ()>,
347}
348
349struct GraphWrapper<'a> {
350 inner: NodeGraph<'a>,
351}
352
353impl Widget for GraphWrapper<'_> {
354 fn render(self, area: Rect, buf: &mut Buffer)
355 where
356 Self: Sized,
357 {
358 self.inner.render(area, buf, &mut ())
359 }
360}
361
362const NODE_WIDTH: u16 = 29;
363const NODE_WIDTH_CONTENT: u16 = NODE_WIDTH - 2;
364
365const NODE_HEIGHT: u16 = 5;
366const NODE_META_LINES: usize = 2;
367const NODE_PORT_ROW_OFFSET: usize = NODE_META_LINES;
368
369fn clip_tail(value: &str, max_chars: usize) -> String {
370 if max_chars == 0 {
371 return String::new();
372 }
373 let char_count = value.chars().count();
374 if char_count <= max_chars {
375 return value.to_string();
376 }
377 let skip = char_count.saturating_sub(max_chars);
378 let start = value
379 .char_indices()
380 .nth(skip)
381 .map(|(idx, _)| idx)
382 .unwrap_or(value.len());
383 value[start..].to_string()
384}
385
386#[allow(dead_code)]
387const NODE_HEIGHT_CONTENT: u16 = NODE_HEIGHT - 2;
388const GRAPH_WIDTH_PADDING: u16 = NODE_WIDTH * 2;
389const GRAPH_HEIGHT_PADDING: u16 = NODE_HEIGHT * 4;
390
391impl StatefulWidget for NodesScrollableWidget<'_> {
392 type State = NodesScrollableWidgetState;
393
394 fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
395 let build_node_layouts = || {
396 state
397 .display_nodes
398 .iter()
399 .map(|node| {
400 let ports = node.inputs.len().max(node.outputs.len());
401 let content_rows = ports + NODE_PORT_ROW_OFFSET;
402 let height = (content_rows as u16).saturating_add(2).max(NODE_HEIGHT);
403 let mut title_line = Line::default();
404 title_line.spans.push(Span::styled(
405 format!(" {}", node.node_type),
406 Style::default().fg(node.node_type.color()),
407 ));
408 title_line.spans.push(Span::styled(
409 format!(" {} ", node.id),
410 Style::default().fg(Color::White),
411 ));
412 NodeLayout::new((NODE_WIDTH, height)).with_title_line(title_line)
413 })
414 .collect::<Vec<_>>()
415 };
416
417 let node_count = state.display_nodes.len().max(1);
418 let content_width = (node_count as u16)
419 .saturating_mul(NODE_WIDTH + 20)
420 .max(NODE_WIDTH);
421 let max_ports = state
422 .display_nodes
423 .iter()
424 .map(|node| node.inputs.len().max(node.outputs.len()))
425 .max()
426 .unwrap_or_default();
427 let content_height =
429 (((max_ports + NODE_PORT_ROW_OFFSET) as u16) * 12).max(NODE_HEIGHT * 6);
430 let connections = state.connections.clone();
431 let build_graph = |width: u16, height: u16| {
432 NodeGraph::new(
433 build_node_layouts(),
434 connections.clone(),
435 width as usize,
436 height as usize,
437 )
438 };
439 let mut graph = build_graph(content_width, content_height);
440 graph.calculate();
441 let mut content_size = Size::new(content_width, content_height);
442 if state.display_nodes.is_empty() {
443 content_size = Size::new(area.width.max(NODE_WIDTH), area.height.max(NODE_HEIGHT));
444 graph = build_graph(content_size.width, content_size.height);
445 graph.calculate();
446 } else {
447 let bounds = graph.content_bounds();
448 let desired_width = bounds
449 .width
450 .saturating_add(GRAPH_WIDTH_PADDING)
451 .max(NODE_WIDTH);
452 let desired_height = bounds
453 .height
454 .saturating_add(GRAPH_HEIGHT_PADDING)
455 .max(NODE_HEIGHT);
456 if desired_width != content_size.width || desired_height != content_size.height {
457 content_size = Size::new(desired_width, desired_height);
458 graph = build_graph(content_size.width, content_size.height);
459 graph.calculate();
460 }
461 }
462 let mut scroll_view = ScrollView::new(content_size);
463 let zones = graph.split(scroll_view.area());
464
465 {
466 let mut statuses = state.statuses.lock().unwrap();
467 if statuses.len() <= state.task_count {
468 statuses.resize(state.task_count + 1, TaskStatus::default());
469 }
470 for (idx, ea_zone) in zones.into_iter().enumerate() {
471 let fallback_idx = state.task_count;
472 let status_idx = state
473 .status_index_map
474 .get(idx)
475 .and_then(|opt| *opt)
476 .unwrap_or(fallback_idx);
477 let safe_index = if status_idx < statuses.len() {
478 status_idx
479 } else {
480 statuses.len() - 1
481 };
482 let status = &mut statuses[safe_index];
483 let s = &state.display_nodes[idx].type_label;
484 let status_line = if status.is_error {
485 format!("❌ {}", status.error)
486 } else {
487 format!("✓ {}", status.status_txt)
488 };
489
490 let label_width = (NODE_WIDTH_CONTENT as usize).saturating_sub(2);
491 let type_label = clip_tail(s, label_width);
492 let status_text = clip_tail(&status_line, label_width);
493 let base_style = if status.is_error {
494 Style::default().fg(Color::Red)
495 } else {
496 Style::default().fg(Color::Green)
497 };
498 let mut lines: Vec<Line> = Vec::new();
499 lines.push(Line::styled(format!(" {}", type_label), base_style));
500 lines.push(Line::styled(format!(" {}", status_text), base_style));
501
502 let max_ports = state.display_nodes[idx]
503 .inputs
504 .len()
505 .max(state.display_nodes[idx].outputs.len());
506 if max_ports > 0 {
507 let left_width = (NODE_WIDTH_CONTENT as usize - 2) / 2;
508 let right_width = NODE_WIDTH_CONTENT as usize - 2 - left_width;
509 let input_style = Style::default().fg(Color::Yellow);
510 let output_style = Style::default().fg(Color::Cyan);
511 let dotted_style = Style::default().fg(Color::DarkGray);
512 for port_idx in 0..max_ports {
513 let input = state.display_nodes[idx]
514 .inputs
515 .get(port_idx)
516 .map(|label| clip_tail(label, left_width))
517 .unwrap_or_default();
518 let output = state.display_nodes[idx]
519 .outputs
520 .get(port_idx)
521 .map(|label| clip_tail(label, right_width))
522 .unwrap_or_default();
523 let mut port_line = Line::default();
524 port_line.spans.push(Span::styled(
525 format!(" {:<left_width$}", input, left_width = left_width),
526 input_style,
527 ));
528 port_line.spans.push(Span::styled("┆", dotted_style));
529 port_line.spans.push(Span::styled(
530 format!("{:>right_width$}", output, right_width = right_width),
531 output_style,
532 ));
533 lines.push(port_line);
534 }
535 }
536
537 let txt = Text::from(lines);
538 let paragraph = Paragraph::new(txt);
539 status.is_error = false; scroll_view.render_widget(paragraph, ea_zone);
541 }
542 }
543
544 scroll_view.render_widget(
545 GraphWrapper { inner: graph },
546 Rect {
547 x: 0,
548 y: 0,
549 width: content_size.width,
550 height: content_size.height,
551 },
552 );
553 scroll_view.render(area, buf, &mut state.nodes_scrollable_state);
554 }
555}
556
557pub struct CuConsoleMon {
559 config: CuConfig,
560 taskids: &'static [&'static str],
561 task_stats: Arc<Mutex<TaskStats>>,
562 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
563 ui_handle: Option<JoinHandle<()>>,
564 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
565 quitting: Arc<AtomicBool>,
566 topology: Option<MonitorTopology>,
567}
568
569impl Drop for CuConsoleMon {
570 fn drop(&mut self) {
571 self.quitting.store(true, Ordering::SeqCst);
572 let _ = restore_terminal();
573 if let Some(handle) = self.ui_handle.take() {
574 let _ = handle.join();
575 }
576 }
577}
578
579struct UI {
580 task_ids: &'static [&'static str],
581 active_screen: Screen,
582 sysinfo: String,
583 task_stats: Arc<Mutex<TaskStats>>,
584 quitting: Arc<AtomicBool>,
585 nodes_scrollable_widget_state: NodesScrollableWidgetState,
586 #[cfg(feature = "debug_pane")]
587 error_redirect: gag::BufferRedirect,
588 #[cfg(feature = "debug_pane")]
589 debug_output: Option<debug_pane::DebugLog>,
590 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
591}
592
593impl UI {
594 #[cfg(feature = "debug_pane")]
595 #[allow(clippy::too_many_arguments)]
596 fn new(
597 config: CuConfig,
598 mission: Option<&str>,
599 task_ids: &'static [&'static str],
600 task_stats: Arc<Mutex<TaskStats>>,
601 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
602 quitting: Arc<AtomicBool>,
603 error_redirect: gag::BufferRedirect,
604 debug_output: Option<debug_pane::DebugLog>,
605 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
606 topology: Option<MonitorTopology>,
607 ) -> UI {
608 init_error_hooks();
609 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
610 &config,
611 task_statuses.clone(),
612 mission,
613 task_ids,
614 topology.clone(),
615 );
616
617 Self {
618 task_ids,
619 active_screen: Screen::Neofetch,
620 sysinfo: sysinfo::pfetch_info(),
621 task_stats,
622 quitting,
623 nodes_scrollable_widget_state,
624 error_redirect,
625 debug_output,
626 pool_stats,
627 }
628 }
629
630 #[cfg(not(feature = "debug_pane"))]
631 fn new(
632 config: CuConfig,
633 task_ids: &'static [&'static str],
634 task_stats: Arc<Mutex<TaskStats>>,
635 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
636 quitting: Arc<AtomicBool>,
637 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
638 topology: Option<MonitorTopology>,
639 ) -> UI {
640 init_error_hooks();
641 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
642 &config,
643 task_statuses.clone(),
644 None,
645 task_ids,
646 topology.clone(),
647 );
648
649 Self {
650 task_ids,
651 active_screen: Screen::Neofetch,
652 sysinfo: sysinfo::pfetch_info(),
653 task_stats,
654 quitting,
655 nodes_scrollable_widget_state,
656 pool_stats,
657 }
658 }
659
660 fn draw_latency_table(&self, f: &mut Frame, area: Rect) {
661 let header_cells = [
662 "🛠 Task",
663 "⬇ Min",
664 "⬆ Max",
665 "∅ Mean",
666 "σ Stddev",
667 "⧖∅ Jitter",
668 "⧗⬆ Jitter",
669 ]
670 .iter()
671 .map(|h| {
672 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
673 Style::default()
674 .fg(Color::Yellow)
675 .add_modifier(Modifier::BOLD),
676 )
677 });
678
679 let header = Row::new(header_cells)
680 .style(Style::default().fg(Color::Yellow))
681 .bottom_margin(1)
682 .top_margin(1);
683
684 let task_stats = self.task_stats.lock().unwrap(); let mut rows = task_stats
686 .stats
687 .iter()
688 .enumerate()
689 .map(|(i, stat)| {
690 let cells = vec![
691 Cell::from(Line::from(self.task_ids[i]).alignment(Alignment::Right))
692 .light_blue(),
693 Cell::from(Line::from(stat.min().to_string()).alignment(Alignment::Right))
694 .style(Style::default()),
695 Cell::from(Line::from(stat.max().to_string()).alignment(Alignment::Right))
696 .style(Style::default()),
697 Cell::from(Line::from(stat.mean().to_string()).alignment(Alignment::Right))
698 .style(Style::default()),
699 Cell::from(Line::from(stat.stddev().to_string()).alignment(Alignment::Right))
700 .style(Style::default()),
701 Cell::from(
702 Line::from(stat.jitter_mean().to_string()).alignment(Alignment::Right),
703 )
704 .style(Style::default()),
705 Cell::from(
706 Line::from(stat.jitter_max().to_string()).alignment(Alignment::Right),
707 )
708 .style(Style::default()),
709 ];
710 Row::new(cells)
711 })
712 .collect::<Vec<Row>>();
713
714 let cells = vec![
715 Cell::from(
716 Line::from("End2End")
717 .light_red()
718 .alignment(Alignment::Right),
719 ),
720 Cell::from(
721 Line::from(task_stats.end2end.min().to_string())
722 .light_red()
723 .alignment(Alignment::Right),
724 )
725 .style(Style::default()),
726 Cell::from(
727 Line::from(task_stats.end2end.max().to_string())
728 .light_red()
729 .alignment(Alignment::Right),
730 )
731 .style(Style::default()),
732 Cell::from(
733 Line::from(task_stats.end2end.mean().to_string())
734 .light_red()
735 .alignment(Alignment::Right),
736 )
737 .style(Style::default()),
738 Cell::from(
739 Line::from(task_stats.end2end.stddev().to_string())
740 .light_red()
741 .alignment(Alignment::Right),
742 )
743 .style(Style::default()),
744 Cell::from(
745 Line::from(task_stats.end2end.jitter_mean().to_string())
746 .light_red()
747 .alignment(Alignment::Right),
748 )
749 .style(Style::default()),
750 Cell::from(
751 Line::from(task_stats.end2end.jitter_max().to_string())
752 .light_red()
753 .alignment(Alignment::Right),
754 )
755 .style(Style::default()),
756 ];
757 rows.push(Row::new(cells).top_margin(1));
758
759 let table = Table::new(
760 rows,
761 &[
762 Constraint::Length(10),
763 Constraint::Length(10),
764 Constraint::Length(12),
765 Constraint::Length(12),
766 Constraint::Length(10),
767 Constraint::Length(12),
768 Constraint::Length(13),
769 ],
770 )
771 .header(header)
772 .block(Block::default().borders(Borders::ALL).title(" Latencies "));
773
774 f.render_widget(table, area);
775 }
776
777 fn draw_memory_pools(&self, f: &mut Frame, area: Rect) {
778 let header_cells = [
779 "Pool ID",
780 "Used/Total",
781 "Buffer Size",
782 "Handles in Use",
783 "Handles/sec",
784 ]
785 .iter()
786 .map(|h| {
787 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
788 Style::default()
789 .fg(Color::Yellow)
790 .add_modifier(Modifier::BOLD),
791 )
792 });
793
794 let header = Row::new(header_cells)
795 .style(Style::default().fg(Color::Yellow))
796 .bottom_margin(1);
797
798 let pool_stats = self.pool_stats.lock().unwrap();
799 let rows = pool_stats
800 .iter()
801 .map(|stat| {
802 let used = stat.total_size - stat.space_left;
803 let percent = if stat.total_size > 0 {
804 100.0 * used as f64 / stat.total_size as f64
805 } else {
806 0.0
807 };
808 let buffer_size = stat.buffer_size;
809 let mb_unit = 1024.0 * 1024.0;
810
811 let cells = vec![
812 Cell::from(Line::from(stat.id.to_string()).alignment(Alignment::Right))
813 .light_blue(),
814 Cell::from(
815 Line::from(format!(
816 "{:.2} MB / {:.2} MB ({:.1}%)",
817 used as f64 * buffer_size as f64 / mb_unit,
818 stat.total_size as f64 * buffer_size as f64 / mb_unit,
819 percent
820 ))
821 .alignment(Alignment::Right),
822 ),
823 Cell::from(
824 Line::from(format!("{} KB", stat.buffer_size / 1024))
825 .alignment(Alignment::Right),
826 ),
827 Cell::from(
828 Line::from(format!("{}", stat.handles_in_use)).alignment(Alignment::Right),
829 ),
830 Cell::from(
831 Line::from(format!("{}/s", stat.handles_per_second))
832 .alignment(Alignment::Right),
833 ),
834 ];
835 Row::new(cells)
836 })
837 .collect::<Vec<Row>>();
838
839 let table = Table::new(
840 rows,
841 &[
842 Constraint::Percentage(30),
843 Constraint::Percentage(20),
844 Constraint::Percentage(15),
845 Constraint::Percentage(15),
846 Constraint::Percentage(20),
847 ],
848 )
849 .header(header)
850 .block(
851 Block::default()
852 .borders(Borders::ALL)
853 .title(" Memory Pools "),
854 );
855
856 f.render_widget(table, area);
857 }
858
859 fn draw_nodes(&mut self, f: &mut Frame, space: Rect) {
860 NodesScrollableWidget {
861 _marker: Default::default(),
862 }
863 .render(
864 space,
865 f.buffer_mut(),
866 &mut self.nodes_scrollable_widget_state,
867 )
868 }
869
870 fn draw(&mut self, f: &mut Frame) {
871 let layout = Layout::default()
872 .direction(Direction::Vertical)
873 .constraints(
874 [
875 Constraint::Length(3), Constraint::Min(0), ]
878 .as_ref(),
879 )
880 .split(f.area());
881
882 let menu = Paragraph::new(MENU_CONTENT)
883 .style(
884 Style::default()
885 .fg(Color::Yellow)
886 .add_modifier(Modifier::ITALIC),
887 )
888 .block(Block::default().borders(Borders::BOTTOM));
889 f.render_widget(menu, layout[0]);
890
891 match self.active_screen {
892 Screen::Neofetch => {
893 const VERSION: &str = env!("CARGO_PKG_VERSION");
894 let text: Text = format!("\n -> Copper v{}\n\n{}\n\n ", VERSION, self.sysinfo)
895 .into_text()
896 .unwrap();
897 let p = Paragraph::new::<Text>(text).block(
898 Block::default()
899 .title(" System Info ")
900 .borders(Borders::ALL),
901 );
902 f.render_widget(p, layout[1]);
903 }
904 Screen::Dag => {
905 self.draw_nodes(f, layout[1]);
906 }
907 Screen::Latency => self.draw_latency_table(f, layout[1]),
908 Screen::MemoryPools => self.draw_memory_pools(f, layout[1]),
909 #[cfg(feature = "debug_pane")]
910 Screen::DebugOutput => self.draw_debug_output(f, layout[1]),
911 };
912 }
913
914 fn run_app<B: Backend<Error = io::Error>>(
915 &mut self,
916 terminal: &mut Terminal<B>,
917 ) -> io::Result<()> {
918 loop {
919 if self.quitting.load(Ordering::SeqCst) {
920 break;
921 }
922 #[cfg(feature = "debug_pane")]
923 self.update_debug_output();
924
925 terminal.draw(|f| {
926 self.draw(f);
927 })?;
928
929 if event::poll(Duration::from_millis(50))? {
930 let event = event::read()?;
931
932 match event {
933 Event::Key(key) => match key.code {
934 KeyCode::Char('1') => self.active_screen = Screen::Neofetch,
935 KeyCode::Char('2') => self.active_screen = Screen::Dag,
936 KeyCode::Char('3') => self.active_screen = Screen::Latency,
937 KeyCode::Char('4') => self.active_screen = Screen::MemoryPools,
938 #[cfg(feature = "debug_pane")]
939 KeyCode::Char('5') => self.active_screen = Screen::DebugOutput,
940 KeyCode::Char('r') => {
941 if self.active_screen == Screen::Latency {
942 self.task_stats.lock().unwrap().reset()
943 }
944 }
945 KeyCode::Char('j') | KeyCode::Down => {
946 if self.active_screen == Screen::Dag {
947 for _ in 0..1 {
948 self.nodes_scrollable_widget_state
949 .nodes_scrollable_state
950 .scroll_down();
951 }
952 }
953 }
954 KeyCode::Char('k') | KeyCode::Up => {
955 if self.active_screen == Screen::Dag {
956 for _ in 0..1 {
957 self.nodes_scrollable_widget_state
958 .nodes_scrollable_state
959 .scroll_up();
960 }
961 }
962 }
963 KeyCode::Char('h') | KeyCode::Left => {
964 if self.active_screen == Screen::Dag {
965 for _ in 0..5 {
966 self.nodes_scrollable_widget_state
967 .nodes_scrollable_state
968 .scroll_left();
969 }
970 }
971 }
972 KeyCode::Char('l') | KeyCode::Right => {
973 if self.active_screen == Screen::Dag {
974 for _ in 0..5 {
975 self.nodes_scrollable_widget_state
976 .nodes_scrollable_state
977 .scroll_right();
978 }
979 }
980 }
981 KeyCode::Char('q') => {
982 break;
983 }
984 _ => {}
985 },
986
987 #[cfg(feature = "debug_pane")]
988 Event::Resize(_columns, rows) => {
989 if let Some(debug_output) = self.debug_output.as_mut() {
990 debug_output.max_rows.store(rows, Ordering::SeqCst)
991 }
992 }
993 _ => {}
994 }
995 }
996 }
997 Ok(())
998 }
999}
1000
1001impl CuMonitor for CuConsoleMon {
1002 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
1003 where
1004 Self: Sized,
1005 {
1006 let task_stats = Arc::new(Mutex::new(TaskStats::new(
1007 taskids.len(),
1008 CuDuration::from(Duration::from_secs(5)),
1009 )));
1010
1011 Ok(Self {
1012 config: config.clone(),
1013 taskids,
1014 task_stats,
1015 task_statuses: Arc::new(Mutex::new(vec![TaskStatus::default(); taskids.len()])),
1016 ui_handle: None,
1017 quitting: Arc::new(AtomicBool::new(false)),
1018 pool_stats: Arc::new(Mutex::new(Vec::new())),
1019 topology: None,
1020 })
1021 }
1022 fn set_topology(&mut self, topology: MonitorTopology) {
1023 self.topology = Some(topology);
1024 }
1025
1026 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
1027 if !should_start_ui() {
1028 return Ok(());
1029 }
1030
1031 let config_dup = self.config.clone();
1032 let taskids = self.taskids;
1033
1034 let task_stats_ui = self.task_stats.clone();
1035 let error_states = self.task_statuses.clone();
1036 let pool_stats_ui = self.pool_stats.clone();
1037 let quitting = self.quitting.clone();
1038 let topology = self.topology.clone();
1039
1040 let handle = thread::spawn(move || {
1042 let backend = CrosstermBackend::new(stdout());
1043 let _terminal_guard = TerminalRestoreGuard;
1044
1045 if let Err(err) = setup_terminal() {
1046 eprintln!("Failed to prepare terminal UI: {err}");
1047 return;
1048 }
1049
1050 let mut terminal = match Terminal::new(backend) {
1051 Ok(terminal) => terminal,
1052 Err(err) => {
1053 eprintln!("Failed to initialize terminal backend: {err}");
1054 return;
1055 }
1056 };
1057
1058 #[cfg(feature = "debug_pane")]
1059 {
1060 let error_redirect = gag::BufferRedirect::stderr().unwrap();
1062
1063 let mut ui = UI::new(
1064 config_dup,
1065 None, taskids,
1067 task_stats_ui,
1068 error_states,
1069 quitting.clone(),
1070 error_redirect,
1071 None,
1072 pool_stats_ui,
1073 topology.clone(),
1074 );
1075
1076 #[cfg(debug_assertions)]
1078 if cu29_log_runtime::EXTRA_TEXT_LOGGER
1079 .read()
1080 .unwrap()
1081 .is_some()
1082 {
1083 let max_lines = terminal.size().unwrap().height - 5;
1084 let (debug_log, tx) = debug_pane::DebugLog::new(max_lines);
1085
1086 let log_subscriber = debug_pane::LogSubscriber::new(tx);
1087
1088 *cu29_log_runtime::EXTRA_TEXT_LOGGER.write().unwrap() =
1089 Some(Box::new(log_subscriber) as Box<dyn log::Log>);
1090
1091 if let Err(err) = setup_terminal() {
1093 eprintln!("Failed to reinitialize terminal after log redirect: {err}");
1094 }
1095
1096 ui.debug_output = Some(debug_log);
1097 } else {
1098 println!("EXTRA_TEXT_LOGGER is none");
1099 }
1100 if let Err(err) = ui.run_app(&mut terminal) {
1101 let _ = restore_terminal();
1102 eprintln!("CuConsoleMon UI exited with error: {err}");
1103 return;
1104 }
1105 }
1106
1107 #[cfg(not(feature = "debug_pane"))]
1108 {
1109 let stderr_gag = gag::Gag::stderr().unwrap();
1110
1111 let mut ui = UI::new(
1112 config_dup,
1113 taskids,
1114 task_stats_ui,
1115 error_states,
1116 quitting,
1117 pool_stats_ui,
1118 topology,
1119 );
1120 if let Err(err) = ui.run_app(&mut terminal) {
1121 let _ = restore_terminal();
1122 eprintln!("CuConsoleMon UI exited with error: {err}");
1123 return;
1124 }
1125
1126 drop(stderr_gag);
1127 }
1128
1129 quitting.store(true, Ordering::SeqCst);
1130 let _ = restore_terminal();
1132 });
1133
1134 self.ui_handle = Some(handle);
1135 Ok(())
1136 }
1137
1138 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1139 {
1140 let mut task_stats = self.task_stats.lock().unwrap();
1141 task_stats.update(msgs);
1142 }
1143 {
1144 let mut task_statuses = self.task_statuses.lock().unwrap();
1145 for (i, msg) in msgs.iter().enumerate() {
1146 let CuCompactString(status_txt) = &msg.status_txt;
1147 task_statuses[i].status_txt = status_txt.clone();
1148 }
1149 }
1150
1151 {
1153 let pool_stats_data = pool::pools_statistics();
1154 let mut pool_stats = self.pool_stats.lock().unwrap();
1155
1156 for (id, space_left, total_size, buffer_size) in pool_stats_data {
1158 let id_str = id.to_string();
1159 if let Some(existing) = pool_stats.iter_mut().find(|p| p.id == id_str) {
1160 existing.update(space_left, total_size);
1161 } else {
1162 pool_stats.push(PoolStats::new(id_str, space_left, total_size, buffer_size));
1163 }
1164 }
1165 }
1166
1167 if self.quitting.load(Ordering::SeqCst) {
1168 return Err("Exiting...".into());
1169 }
1170 Ok(())
1171 }
1172
1173 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
1174 {
1175 let status = &mut self.task_statuses.lock().unwrap()[taskid];
1176 status.is_error = true;
1177 status.error = error.to_compact_string();
1178 }
1179 match step {
1180 CuTaskState::Start => Decision::Shutdown,
1181 CuTaskState::Preprocess => Decision::Abort,
1182 CuTaskState::Process => Decision::Ignore,
1183 CuTaskState::Postprocess => Decision::Ignore,
1184 CuTaskState::Stop => Decision::Shutdown,
1185 }
1186 }
1187
1188 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
1189 self.quitting.store(true, Ordering::SeqCst);
1190 let _ = restore_terminal();
1191
1192 if let Some(handle) = self.ui_handle.take() {
1193 let _ = handle.join();
1194 }
1195
1196 self.task_stats
1197 .lock()
1198 .unwrap()
1199 .stats
1200 .iter_mut()
1201 .for_each(|s| s.reset());
1202 Ok(())
1203 }
1204}
1205
1206struct TerminalRestoreGuard;
1207
1208impl Drop for TerminalRestoreGuard {
1209 fn drop(&mut self) {
1210 let _ = restore_terminal();
1211 }
1212}
1213
1214fn init_error_hooks() {
1215 static ONCE: OnceLock<()> = OnceLock::new();
1216 if ONCE.get().is_some() {
1217 return;
1218 }
1219
1220 let (_panic_hook, error) = HookBuilder::default().into_hooks();
1221 let error = error.into_eyre_hook();
1222 color_eyre::eyre::set_hook(Box::new(move |e| {
1223 let _ = restore_terminal();
1224 error(e)
1225 }))
1226 .unwrap();
1227 std::panic::set_hook(Box::new(move |info| {
1228 let _ = restore_terminal();
1229 let bt = Backtrace::force_capture();
1230 println!("CuConsoleMon panic: {info}");
1232 println!("Backtrace:\n{bt}");
1233 let _ = stdout().flush();
1234 process::exit(1);
1236 }));
1237
1238 let _ = ONCE.set(());
1239}
1240
1241fn setup_terminal() -> io::Result<()> {
1242 enable_raw_mode()?;
1243 execute!(stdout(), EnterAlternateScreen, EnableMouseCapture)?;
1244 Ok(())
1245}
1246
1247fn restore_terminal() -> io::Result<()> {
1248 execute!(stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
1249 disable_raw_mode()
1250}
1251
1252fn should_start_ui() -> bool {
1253 if !stdout().is_tty() || !stdin().is_tty() {
1254 return false;
1255 }
1256
1257 #[cfg(unix)]
1258 {
1259 use std::os::unix::io::AsRawFd;
1260 let stdin_fd = stdin().as_raw_fd();
1261 let fg_pgrp = unsafe { libc::tcgetpgrp(stdin_fd) };
1262 if fg_pgrp == -1 {
1263 return false;
1264 }
1265 let pgrp = unsafe { libc::getpgrp() };
1266 if fg_pgrp != pgrp {
1267 return false;
1268 }
1269 }
1270
1271 true
1272}