1#[cfg(feature = "debug_pane")]
2mod debug_pane;
3pub mod sysinfo;
4mod tui_nodes;
5
6use cu29::prelude::ctrlc;
7
8use crate::tui_nodes::{Connection, NodeGraph, NodeLayout};
9use ansi_to_tui::IntoText;
10use color_eyre::config::HookBuilder;
11use compact_str::{CompactString, ToCompactString};
12use cu29::clock::{CuDuration, RobotClock};
13use cu29::config::CuConfig;
14use cu29::cutask::CuMsgMetadata;
15use cu29::monitoring::{
16 ComponentKind, CuDurationStatistics, CuMonitor, CuTaskState, Decision, MonitorTopology,
17};
18use cu29::prelude::{pool, CuCompactString, CuTime};
19use cu29::{CuError, CuResult};
20#[cfg(feature = "debug_pane")]
21use debug_pane::UIExt;
22use ratatui::backend::CrosstermBackend;
23use ratatui::buffer::Buffer;
24use ratatui::crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode};
25use ratatui::crossterm::terminal::{
26 disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
27};
28use ratatui::crossterm::{event, execute};
29use ratatui::layout::{Alignment, Constraint, Direction, Layout, Size};
30use ratatui::prelude::{Backend, Rect};
31use ratatui::prelude::{Stylize, Widget};
32use ratatui::style::{Color, Modifier, Style};
33use ratatui::text::{Line, Span, Text};
34use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, StatefulWidget, Table};
35use ratatui::{Frame, Terminal};
36use std::fmt::{Display, Formatter};
37use std::io::stdout;
38use std::marker::PhantomData;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::{Arc, Mutex, OnceLock};
41use std::thread::JoinHandle;
42use std::time::{Duration, Instant};
43use std::{collections::HashMap, io, panic, thread};
44use tui_widgets::scrollview::{ScrollView, ScrollViewState};
45
/// Text of the menu bar drawn at the top of the UI (build with the debug pane:
/// includes the `[5] Debug Output` entry).
#[cfg(feature = "debug_pane")]
const MENU_CONTENT: &str =
    " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [5] Debug Output [q] Quit | Scroll: hjkl or ↑↓←→ ";
/// Text of the menu bar drawn at the top of the UI (build without the debug pane).
#[cfg(not(feature = "debug_pane"))]
const MENU_CONTENT: &str =
    " [1] SysInfo [2] DAG [3] Latencies [4] Memory Pools [q] Quit | Scroll: hjkl or ↑↓←→ ";
52
/// The pages the UI can show; `UI::active_screen` holds the one currently drawn.
#[derive(PartialEq)]
enum Screen {
    /// System information page (drawn under the " System Info " title).
    Neofetch,
    /// Task graph view.
    Dag,
    /// Latency statistics table.
    Latency,
    /// Debug output pane, only compiled in with the `debug_pane` feature.
    #[cfg(feature = "debug_pane")]
    DebugOutput,
    /// Memory pool usage table.
    MemoryPools,
}
62
63struct TaskStats {
64 stats: Vec<CuDurationStatistics>,
65 end2end: CuDurationStatistics,
66}
67
68impl TaskStats {
69 fn new(num_tasks: usize, max_duration: CuDuration) -> Self {
70 let stats = vec![CuDurationStatistics::new(max_duration); num_tasks];
71 TaskStats {
72 stats,
73 end2end: CuDurationStatistics::new(max_duration),
74 }
75 }
76
77 fn update(&mut self, msgs: &[&CuMsgMetadata]) {
78 for (i, &msg) in msgs.iter().enumerate() {
79 let (before, after) = (
80 msg.process_time.start.unwrap(),
81 msg.process_time.end.unwrap(),
82 );
83 self.stats[i].record(after - before);
84 }
85 self.end2end.record(compute_end_to_end_latency(msgs));
86 }
87
88 fn reset(&mut self) {
89 for s in &mut self.stats {
90 s.reset();
91 }
92 self.end2end.reset();
93 }
94}
95struct PoolStats {
96 id: CompactString,
97 space_left: usize,
98 total_size: usize,
99 buffer_size: usize,
100 handles_in_use: usize,
101 handles_per_second: usize,
102 last_update: Instant,
103 prev_handles_in_use: usize,
104}
105
106impl PoolStats {
107 fn new(
108 id: impl ToCompactString,
109 space_left: usize,
110 total_size: usize,
111 buffer_size: usize,
112 ) -> Self {
113 Self {
114 id: id.to_compact_string(),
115 space_left,
116 total_size,
117 buffer_size,
118 handles_in_use: total_size - space_left,
119 handles_per_second: 0,
120 last_update: Instant::now(),
121 prev_handles_in_use: 0,
122 }
123 }
124
125 fn update(&mut self, space_left: usize, total_size: usize) {
126 let now = Instant::now();
127 let handles_in_use = total_size - space_left;
128 let elapsed = now.duration_since(self.last_update).as_secs_f32();
129
130 if elapsed >= 1.0 {
131 self.handles_per_second =
132 ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
133 self.prev_handles_in_use = self.handles_in_use;
134 self.last_update = now;
135 }
136
137 self.handles_in_use = handles_in_use;
138 self.space_left = space_left;
139 self.total_size = total_size;
140 }
141}
142
143fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
144 let start = msgs.first().map(|m| m.process_time.start);
145 let end = msgs.last().map(|m| m.process_time.end);
146
147 match (start, end) {
148 (Some(s), Some(e)) => match (Option::<CuTime>::from(s), Option::<CuTime>::from(e)) {
149 (Some(s), Some(e)) if e >= s => e - s,
150 (Some(_), Some(_)) => CuDuration::MIN,
151 _ => CuDuration::MIN,
152 },
153 _ => CuDuration::MIN,
154 }
155}
156
/// Role of a node in the displayed graph; it selects the node's glyph and colour.
#[derive(Copy, Clone)]
enum NodeType {
    /// Role not determined (a task with neither inputs nor outputs).
    Unknown,
    /// Node that only has outgoing connections.
    Source,
    /// Node that only has incoming connections.
    Sink,
    /// Node with both incoming and outgoing connections.
    Task,
    /// Bridge component.
    Bridge,
}

impl Display for NodeType {
    /// Writes the single-glyph symbol used in node titles.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let glyph = match *self {
            NodeType::Unknown => "?",
            NodeType::Source => "◈",
            NodeType::Task => "⚙",
            NodeType::Sink => "⭳",
            NodeType::Bridge => "⇆",
        };
        f.write_str(glyph)
    }
}
178
179impl NodeType {
180 fn add_incoming(self) -> NodeType {
181 match self {
182 Self::Unknown => Self::Sink,
183 Self::Source => Self::Task,
184 Self::Sink => Self::Sink,
185 Self::Task => Self::Task,
186 Self::Bridge => Self::Bridge,
187 }
188 }
189
190 fn add_outgoing(self) -> NodeType {
191 match self {
192 Self::Unknown => Self::Source,
193 Self::Source => Self::Source,
194 Self::Sink => Self::Task,
195 Self::Task => Self::Task,
196 Self::Bridge => Self::Bridge,
197 }
198 }
199
200 fn color(self) -> Color {
201 match self {
202 Self::Unknown => Color::Gray,
203 Self::Source => Color::Rgb(255, 191, 0),
204 Self::Sink => Color::Rgb(255, 102, 204),
205 Self::Task => Color::White,
206 Self::Bridge => Color::Rgb(204, 153, 255),
207 }
208 }
209}
210
/// Latest status of one task, shared between the monitor callbacks and the UI.
#[derive(Default, Clone)]
struct TaskStatus {
    /// Set by `process_error`; cleared by the graph renderer once displayed.
    is_error: bool,
    /// Status text copied from the task's message metadata.
    status_txt: CompactString,
    /// Text of the last error reported for the task.
    error: CompactString,
}
217
/// One node of the graph view, derived from a topology node.
#[derive(Clone)]
struct DisplayNode {
    /// Identifier of the node, shown in the node title.
    id: String,
    /// Type name of the node, or "unknown" when the topology gives none.
    type_label: String,
    /// Role used for the title glyph and its colour.
    node_type: NodeType,
    /// Names of the input ports, in display order.
    inputs: Vec<String>,
    /// Names of the output ports, in display order.
    outputs: Vec<String>,
}
226
/// State of the scrollable graph view.
struct NodesScrollableWidgetState {
    /// Nodes to draw, in topology order.
    display_nodes: Vec<DisplayNode>,
    /// Connections between nodes, expressed with indices into `display_nodes`.
    connections: Vec<Connection>,
    /// Task statuses shared with the monitor callbacks.
    statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// For each display node, the index of its entry in `statuses`, if it has one.
    status_index_map: Vec<Option<usize>>,
    /// Number of task ids known to the monitor.
    task_count: usize,
    /// Scroll position of the view.
    nodes_scrollable_state: ScrollViewState,
}
235
236impl NodesScrollableWidgetState {
237 fn new(
238 config: &CuConfig,
239 errors: Arc<Mutex<Vec<TaskStatus>>>,
240 mission: Option<&str>,
241 task_ids: &'static [&'static str],
242 topology: Option<MonitorTopology>,
243 ) -> Self {
244 let topology = topology
245 .or_else(|| cu29::monitoring::build_monitor_topology(config, mission).ok())
246 .unwrap_or_default();
247
248 let mut display_nodes: Vec<DisplayNode> = Vec::new();
249 let mut status_index_map = Vec::new();
250 let mut node_lookup = HashMap::new();
251 let task_index_lookup: HashMap<&str, usize> = task_ids
252 .iter()
253 .enumerate()
254 .map(|(i, id)| (*id, i))
255 .collect();
256
257 for node in topology.nodes.iter() {
258 let node_type = match node.kind {
259 ComponentKind::Bridge => NodeType::Bridge,
260 ComponentKind::Task => {
261 let mut role = NodeType::Unknown;
262 if !node.inputs.is_empty() {
263 role = role.add_incoming();
264 }
265 if !node.outputs.is_empty() {
266 role = role.add_outgoing();
267 }
268 role
269 }
270 };
271
272 display_nodes.push(DisplayNode {
273 id: node.id.clone(),
274 type_label: node
275 .type_name
276 .clone()
277 .unwrap_or_else(|| "unknown".to_string()),
278 node_type,
279 inputs: node.inputs.clone(),
280 outputs: node.outputs.clone(),
281 });
282 let idx = display_nodes.len() - 1;
283 node_lookup.insert(node.id.clone(), idx);
284
285 let status_idx = match node.kind {
286 ComponentKind::Task => task_index_lookup.get(node.id.as_str()).cloned(),
287 ComponentKind::Bridge => None,
288 };
289 status_index_map.push(status_idx);
290 }
291
292 let mut connections: Vec<Connection> = Vec::with_capacity(topology.connections.len());
293 for cnx in topology.connections.iter() {
294 let Some(&src_idx) = node_lookup.get(&cnx.src) else {
295 continue;
296 };
297 let Some(&dst_idx) = node_lookup.get(&cnx.dst) else {
298 continue;
299 };
300 let src_node = &display_nodes[src_idx];
301 let dst_node = &display_nodes[dst_idx];
302 let src_port = cnx
303 .src_port
304 .as_ref()
305 .and_then(|p| src_node.outputs.iter().position(|name| name == p))
306 .unwrap_or(0);
307 let dst_port = cnx
308 .dst_port
309 .as_ref()
310 .and_then(|p| dst_node.inputs.iter().position(|name| name == p))
311 .unwrap_or(0);
312
313 connections.push(Connection::new(
314 src_idx,
315 src_port + NODE_PORT_ROW_OFFSET,
316 dst_idx,
317 dst_port + NODE_PORT_ROW_OFFSET,
318 ));
319 }
320
321 if !display_nodes.is_empty() {
324 let mut from_set = std::collections::HashSet::new();
325 for conn in &connections {
326 from_set.insert(conn.from_node);
327 }
328 if from_set.len() == display_nodes.len() {
329 let root_idx = 0;
330 connections.retain(|c| c.from_node != root_idx);
331 }
332 }
333
334 NodesScrollableWidgetState {
335 display_nodes,
336 connections,
337 nodes_scrollable_state: ScrollViewState::default(),
338 statuses: errors,
339 status_index_map,
340 task_count: task_ids.len(),
341 }
342 }
343}
344
/// Stateless widget for the graph view; everything it draws comes from the
/// `NodesScrollableWidgetState` passed to `render`.
struct NodesScrollableWidget<'a> {
    /// Carries the `'a` lifetime only; the struct stores no data.
    _marker: PhantomData<&'a ()>,
}
348
/// Adapter that lets a `NodeGraph` be rendered where a plain `Widget` is expected.
struct GraphWrapper<'a> {
    /// The graph to draw.
    inner: NodeGraph<'a>,
}

impl Widget for GraphWrapper<'_> {
    /// Renders the wrapped graph into `area`, passing a unit value as its state.
    fn render(self, area: Rect, buf: &mut Buffer)
    where
        Self: Sized,
    {
        self.inner.render(area, buf, &mut ())
    }
}
361
/// Width of every node box in the graph view.
const NODE_WIDTH: u16 = 29;
/// Node width minus 2 (presumably the two border columns — confirm).
const NODE_WIDTH_CONTENT: u16 = NODE_WIDTH - 2;

/// Minimum height of a node box.
const NODE_HEIGHT: u16 = 5;
/// Number of content lines drawn before the port rows (type label and status line).
const NODE_META_LINES: usize = 2;
/// Row offset added to a port index to get the row its connection attaches to.
const NODE_PORT_ROW_OFFSET: usize = NODE_META_LINES;
368
/// Returns the last `max_chars` characters of `value`.
///
/// Counts Unicode scalar values, not bytes. Returns an empty string when
/// `max_chars` is 0 and the whole string when it already fits.
fn clip_tail(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    let total_chars = value.chars().count();
    match total_chars.checked_sub(max_chars) {
        // Nothing to cut: the string is no longer than the limit.
        None | Some(0) => value.to_owned(),
        // Drop the leading `excess` characters and keep the tail.
        Some(excess) => value.chars().skip(excess).collect(),
    }
}
385
/// Node height minus 2; currently unused, hence the `dead_code` allowance.
#[allow(dead_code)]
const NODE_HEIGHT_CONTENT: u16 = NODE_HEIGHT - 2;
/// Extra width added to the graph's content bounds when sizing the scroll area.
const GRAPH_WIDTH_PADDING: u16 = NODE_WIDTH;
/// Extra height added to the graph's content bounds when sizing the scroll area.
const GRAPH_HEIGHT_PADDING: u16 = NODE_HEIGHT;
390
impl StatefulWidget for NodesScrollableWidget<'_> {
    type State = NodesScrollableWidgetState;

    /// Draws the task graph into a scroll view and renders that view into `area`.
    ///
    /// The graph is laid out once with an estimated size, then rebuilt with the
    /// size derived from its content bounds (or with the size of `area` when
    /// there are no nodes). Each node body shows the type label, the status or
    /// error line, and one row per port. An error flag is cleared once it has
    /// been drawn.
    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
        // Builds one layout per node: fixed width, height grown to fit the port rows.
        let build_node_layouts = || {
            state
                .display_nodes
                .iter()
                .map(|node| {
                    let ports = node.inputs.len().max(node.outputs.len());
                    let content_rows = ports + NODE_PORT_ROW_OFFSET;
                    let height = (content_rows as u16).saturating_add(2).max(NODE_HEIGHT);
                    let mut title_line = Line::default();
                    title_line.spans.push(Span::styled(
                        format!(" {}", node.node_type),
                        Style::default().fg(node.node_type.color()),
                    ));
                    title_line.spans.push(Span::styled(
                        format!(" {} ", node.id),
                        Style::default().fg(Color::White),
                    ));
                    NodeLayout::new((NODE_WIDTH, height)).with_title_line(title_line)
                })
                .collect::<Vec<_>>()
        };

        // First estimate of the canvas size, used for the initial layout pass.
        let node_count = state.display_nodes.len().max(1);
        let content_width = (node_count as u16)
            .saturating_mul(NODE_WIDTH + 12)
            .max(NODE_WIDTH);
        let max_ports = state
            .display_nodes
            .iter()
            .map(|node| node.inputs.len().max(node.outputs.len()))
            .max()
            .unwrap_or_default();
        let content_height = (((max_ports + NODE_PORT_ROW_OFFSET) as u16) * 4).max(NODE_HEIGHT);
        let connections = state.connections.clone();
        let build_graph = |width: u16, height: u16| {
            NodeGraph::new(
                build_node_layouts(),
                connections.clone(),
                width as usize,
                height as usize,
            )
        };
        let mut graph = build_graph(content_width, content_height);
        graph.calculate();
        let mut content_size = Size::new(content_width, content_height);
        if state.display_nodes.is_empty() {
            // Nothing to lay out: size the canvas from the visible area.
            content_size = Size::new(area.width.max(NODE_WIDTH), area.height.max(NODE_HEIGHT));
            graph = build_graph(content_size.width, content_size.height);
            graph.calculate();
        } else {
            // Second pass: resize the canvas to the computed bounds plus padding.
            let bounds = graph.content_bounds();
            let desired_width = bounds
                .width
                .saturating_add(GRAPH_WIDTH_PADDING)
                .max(NODE_WIDTH);
            let desired_height = bounds
                .height
                .saturating_add(GRAPH_HEIGHT_PADDING)
                .max(NODE_HEIGHT);
            if desired_width != content_size.width || desired_height != content_size.height {
                content_size = Size::new(desired_width, desired_height);
                graph = build_graph(content_size.width, content_size.height);
                graph.calculate();
            }
        }
        let mut scroll_view = ScrollView::new(content_size);
        let zones = graph.split(scroll_view.area());

        {
            let mut statuses = state.statuses.lock().unwrap();
            // Keep one extra slot at index `task_count`; nodes without a status
            // index of their own use it as a fallback.
            if statuses.len() <= state.task_count {
                statuses.resize(state.task_count + 1, TaskStatus::default());
            }
            for (idx, ea_zone) in zones.into_iter().enumerate() {
                let fallback_idx = state.task_count;
                let status_idx = state
                    .status_index_map
                    .get(idx)
                    .and_then(|opt| *opt)
                    .unwrap_or(fallback_idx);
                // Clamp to the last slot so the lookup below cannot go out of bounds.
                let safe_index = if status_idx < statuses.len() {
                    status_idx
                } else {
                    statuses.len() - 1
                };
                let status = &mut statuses[safe_index];
                let s = &state.display_nodes[idx].type_label;
                let status_line = if status.is_error {
                    format!("❌ {}", status.error)
                } else {
                    format!("✓ {}", status.status_txt)
                };

                // Both labels are clipped from the left so their tail stays visible.
                let label_width = (NODE_WIDTH_CONTENT as usize).saturating_sub(2);
                let type_label = clip_tail(s, label_width);
                let status_text = clip_tail(&status_line, label_width);
                let base_style = if status.is_error {
                    Style::default().fg(Color::Red)
                } else {
                    Style::default().fg(Color::Green)
                };
                let mut lines: Vec<Line> = Vec::new();
                lines.push(Line::styled(format!(" {}", type_label), base_style));
                lines.push(Line::styled(format!(" {}", status_text), base_style));

                let max_ports = state.display_nodes[idx]
                    .inputs
                    .len()
                    .max(state.display_nodes[idx].outputs.len());
                if max_ports > 0 {
                    // One row per port: inputs left-aligned, outputs right-aligned,
                    // separated by a dotted divider.
                    let left_width = (NODE_WIDTH_CONTENT as usize - 2) / 2;
                    let right_width = NODE_WIDTH_CONTENT as usize - 2 - left_width;
                    let input_style = Style::default().fg(Color::Yellow);
                    let output_style = Style::default().fg(Color::Cyan);
                    let dotted_style = Style::default().fg(Color::DarkGray);
                    for port_idx in 0..max_ports {
                        let input = state.display_nodes[idx]
                            .inputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, left_width))
                            .unwrap_or_default();
                        let output = state.display_nodes[idx]
                            .outputs
                            .get(port_idx)
                            .map(|label| clip_tail(label, right_width))
                            .unwrap_or_default();
                        let mut port_line = Line::default();
                        port_line.spans.push(Span::styled(
                            format!(" {:<left_width$}", input, left_width = left_width),
                            input_style,
                        ));
                        port_line.spans.push(Span::styled("┆", dotted_style));
                        port_line.spans.push(Span::styled(
                            format!("{:>right_width$}", output, right_width = right_width),
                            output_style,
                        ));
                        lines.push(port_line);
                    }
                }

                let txt = Text::from(lines);
                let paragraph = Paragraph::new(txt);
                // The error has now been shown once; clear the flag.
                status.is_error = false;
                scroll_view.render_widget(paragraph, ea_zone);
            }
        }

        // Draw the graph itself over the whole canvas, then blit the visible
        // part of the canvas into `area`.
        scroll_view.render_widget(
            GraphWrapper { inner: graph },
            Rect {
                x: 0,
                y: 0,
                width: content_size.width,
                height: content_size.height,
            },
        );
        scroll_view.render(area, buf, &mut state.nodes_scrollable_state);
    }
}
554
/// Console monitor: a `CuMonitor` implementation that shows task statuses,
/// latencies and memory pool usage in a terminal UI running on its own thread.
pub struct CuConsoleMon {
    /// Copy of the configuration, handed to the UI thread on `start`.
    config: CuConfig,
    /// Identifiers of the monitored tasks.
    taskids: &'static [&'static str],
    /// Latency statistics shared with the UI thread.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Per-task status and error text shared with the UI thread.
    task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
    /// Handle of the UI thread while it is running.
    ui_handle: Option<JoinHandle<()>>,
    /// Memory pool statistics shared with the UI thread.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
    /// Set when the UI or the monitor asks to stop.
    quitting: Arc<AtomicBool>,
    /// Topology given through `set_topology`, if any.
    topology: Option<MonitorTopology>,
}
566
567impl Drop for CuConsoleMon {
568 fn drop(&mut self) {
569 self.quitting.store(true, Ordering::SeqCst);
570 let _ = restore_terminal();
571 if let Some(handle) = self.ui_handle.take() {
572 let _ = handle.join();
573 }
574 }
575}
576
/// State owned by the UI thread.
struct UI {
    /// Task identifiers, used as row labels in the latency table.
    task_ids: &'static [&'static str],
    /// Screen currently displayed.
    active_screen: Screen,
    /// System information text, gathered once when the UI is created.
    sysinfo: String,
    /// Latency statistics shared with the monitor callbacks.
    task_stats: Arc<Mutex<TaskStats>>,
    /// Checked on every loop iteration; the UI loop exits once it is set.
    quitting: Arc<AtomicBool>,
    /// State of the graph view (nodes, connections, scroll position).
    nodes_scrollable_widget_state: NodesScrollableWidgetState,
    /// Buffered redirection of stderr supplied by the caller.
    #[cfg(feature = "debug_pane")]
    error_redirect: gag::BufferRedirect,
    /// Log buffer shown in the debug pane, when one has been installed.
    #[cfg(feature = "debug_pane")]
    debug_output: Option<debug_pane::DebugLog>,
    /// Memory pool statistics shared with the monitor callbacks.
    pool_stats: Arc<Mutex<Vec<PoolStats>>>,
}
590
591impl UI {
592 #[cfg(feature = "debug_pane")]
593 #[allow(clippy::too_many_arguments)]
594 fn new(
595 config: CuConfig,
596 mission: Option<&str>,
597 task_ids: &'static [&'static str],
598 task_stats: Arc<Mutex<TaskStats>>,
599 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
600 quitting: Arc<AtomicBool>,
601 error_redirect: gag::BufferRedirect,
602 debug_output: Option<debug_pane::DebugLog>,
603 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
604 topology: Option<MonitorTopology>,
605 ) -> UI {
606 init_error_hooks();
607 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
608 &config,
609 task_statuses.clone(),
610 mission,
611 task_ids,
612 topology.clone(),
613 );
614
615 Self {
616 task_ids,
617 active_screen: Screen::Neofetch,
618 sysinfo: sysinfo::pfetch_info(),
619 task_stats,
620 quitting,
621 nodes_scrollable_widget_state,
622 error_redirect,
623 debug_output,
624 pool_stats,
625 }
626 }
627
628 #[cfg(not(feature = "debug_pane"))]
629 fn new(
630 config: CuConfig,
631 task_ids: &'static [&'static str],
632 task_stats: Arc<Mutex<TaskStats>>,
633 task_statuses: Arc<Mutex<Vec<TaskStatus>>>,
634 quitting: Arc<AtomicBool>,
635 pool_stats: Arc<Mutex<Vec<PoolStats>>>,
636 topology: Option<MonitorTopology>,
637 ) -> UI {
638 init_error_hooks();
639 let nodes_scrollable_widget_state = NodesScrollableWidgetState::new(
640 &config,
641 task_statuses.clone(),
642 None,
643 task_ids,
644 topology.clone(),
645 );
646
647 Self {
648 task_ids,
649 active_screen: Screen::Neofetch,
650 sysinfo: sysinfo::pfetch_info(),
651 task_stats,
652 quitting,
653 nodes_scrollable_widget_state,
654 pool_stats,
655 }
656 }
657
658 fn draw_latency_table(&self, f: &mut Frame, area: Rect) {
659 let header_cells = [
660 "🛠 Task",
661 "⬇ Min",
662 "⬆ Max",
663 "∅ Mean",
664 "σ Stddev",
665 "⧖∅ Jitter",
666 "⧗⬆ Jitter",
667 ]
668 .iter()
669 .map(|h| {
670 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
671 Style::default()
672 .fg(Color::Yellow)
673 .add_modifier(Modifier::BOLD),
674 )
675 });
676
677 let header = Row::new(header_cells)
678 .style(Style::default().fg(Color::Yellow))
679 .bottom_margin(1)
680 .top_margin(1);
681
682 let task_stats = self.task_stats.lock().unwrap(); let mut rows = task_stats
684 .stats
685 .iter()
686 .enumerate()
687 .map(|(i, stat)| {
688 let cells = vec![
689 Cell::from(Line::from(self.task_ids[i]).alignment(Alignment::Right))
690 .light_blue(),
691 Cell::from(Line::from(stat.min().to_string()).alignment(Alignment::Right))
692 .style(Style::default()),
693 Cell::from(Line::from(stat.max().to_string()).alignment(Alignment::Right))
694 .style(Style::default()),
695 Cell::from(Line::from(stat.mean().to_string()).alignment(Alignment::Right))
696 .style(Style::default()),
697 Cell::from(Line::from(stat.stddev().to_string()).alignment(Alignment::Right))
698 .style(Style::default()),
699 Cell::from(
700 Line::from(stat.jitter_mean().to_string()).alignment(Alignment::Right),
701 )
702 .style(Style::default()),
703 Cell::from(
704 Line::from(stat.jitter_max().to_string()).alignment(Alignment::Right),
705 )
706 .style(Style::default()),
707 ];
708 Row::new(cells)
709 })
710 .collect::<Vec<Row>>();
711
712 let cells = vec![
713 Cell::from(
714 Line::from("End2End")
715 .light_red()
716 .alignment(Alignment::Right),
717 ),
718 Cell::from(
719 Line::from(task_stats.end2end.min().to_string())
720 .light_red()
721 .alignment(Alignment::Right),
722 )
723 .style(Style::default()),
724 Cell::from(
725 Line::from(task_stats.end2end.max().to_string())
726 .light_red()
727 .alignment(Alignment::Right),
728 )
729 .style(Style::default()),
730 Cell::from(
731 Line::from(task_stats.end2end.mean().to_string())
732 .light_red()
733 .alignment(Alignment::Right),
734 )
735 .style(Style::default()),
736 Cell::from(
737 Line::from(task_stats.end2end.stddev().to_string())
738 .light_red()
739 .alignment(Alignment::Right),
740 )
741 .style(Style::default()),
742 Cell::from(
743 Line::from(task_stats.end2end.jitter_mean().to_string())
744 .light_red()
745 .alignment(Alignment::Right),
746 )
747 .style(Style::default()),
748 Cell::from(
749 Line::from(task_stats.end2end.jitter_max().to_string())
750 .light_red()
751 .alignment(Alignment::Right),
752 )
753 .style(Style::default()),
754 ];
755 rows.push(Row::new(cells).top_margin(1));
756
757 let table = Table::new(
758 rows,
759 &[
760 Constraint::Length(10),
761 Constraint::Length(10),
762 Constraint::Length(12),
763 Constraint::Length(12),
764 Constraint::Length(10),
765 Constraint::Length(12),
766 Constraint::Length(13),
767 ],
768 )
769 .header(header)
770 .block(Block::default().borders(Borders::ALL).title(" Latencies "));
771
772 f.render_widget(table, area);
773 }
774
775 fn draw_memory_pools(&self, f: &mut Frame, area: Rect) {
776 let header_cells = [
777 "Pool ID",
778 "Used/Total",
779 "Buffer Size",
780 "Handles in Use",
781 "Handles/sec",
782 ]
783 .iter()
784 .map(|h| {
785 Cell::from(Line::from(*h).alignment(Alignment::Right)).style(
786 Style::default()
787 .fg(Color::Yellow)
788 .add_modifier(Modifier::BOLD),
789 )
790 });
791
792 let header = Row::new(header_cells)
793 .style(Style::default().fg(Color::Yellow))
794 .bottom_margin(1);
795
796 let pool_stats = self.pool_stats.lock().unwrap();
797 let rows = pool_stats
798 .iter()
799 .map(|stat| {
800 let used = stat.total_size - stat.space_left;
801 let percent = if stat.total_size > 0 {
802 100.0 * used as f64 / stat.total_size as f64
803 } else {
804 0.0
805 };
806 let buffer_size = stat.buffer_size;
807 let mb_unit = 1024.0 * 1024.0;
808
809 let cells = vec![
810 Cell::from(Line::from(stat.id.to_string()).alignment(Alignment::Right))
811 .light_blue(),
812 Cell::from(
813 Line::from(format!(
814 "{:.2} MB / {:.2} MB ({:.1}%)",
815 used as f64 * buffer_size as f64 / mb_unit,
816 stat.total_size as f64 * buffer_size as f64 / mb_unit,
817 percent
818 ))
819 .alignment(Alignment::Right),
820 ),
821 Cell::from(
822 Line::from(format!("{} KB", stat.buffer_size / 1024))
823 .alignment(Alignment::Right),
824 ),
825 Cell::from(
826 Line::from(format!("{}", stat.handles_in_use)).alignment(Alignment::Right),
827 ),
828 Cell::from(
829 Line::from(format!("{}/s", stat.handles_per_second))
830 .alignment(Alignment::Right),
831 ),
832 ];
833 Row::new(cells)
834 })
835 .collect::<Vec<Row>>();
836
837 let table = Table::new(
838 rows,
839 &[
840 Constraint::Percentage(30),
841 Constraint::Percentage(20),
842 Constraint::Percentage(15),
843 Constraint::Percentage(15),
844 Constraint::Percentage(20),
845 ],
846 )
847 .header(header)
848 .block(
849 Block::default()
850 .borders(Borders::ALL)
851 .title(" Memory Pools "),
852 );
853
854 f.render_widget(table, area);
855 }
856
857 fn draw_nodes(&mut self, f: &mut Frame, space: Rect) {
858 NodesScrollableWidget {
859 _marker: Default::default(),
860 }
861 .render(
862 space,
863 f.buffer_mut(),
864 &mut self.nodes_scrollable_widget_state,
865 )
866 }
867
868 fn draw(&mut self, f: &mut Frame) {
869 let layout = Layout::default()
870 .direction(Direction::Vertical)
871 .constraints(
872 [
873 Constraint::Length(3), Constraint::Min(0), ]
876 .as_ref(),
877 )
878 .split(f.area());
879
880 let menu = Paragraph::new(MENU_CONTENT)
881 .style(
882 Style::default()
883 .fg(Color::Yellow)
884 .add_modifier(Modifier::ITALIC),
885 )
886 .block(Block::default().borders(Borders::BOTTOM));
887 f.render_widget(menu, layout[0]);
888
889 match self.active_screen {
890 Screen::Neofetch => {
891 const VERSION: &str = env!("CARGO_PKG_VERSION");
892 let text: Text = format!("\n -> Copper v{}\n\n{}\n\n ", VERSION, self.sysinfo)
893 .into_text()
894 .unwrap();
895 let p = Paragraph::new::<Text>(text).block(
896 Block::default()
897 .title(" System Info ")
898 .borders(Borders::ALL),
899 );
900 f.render_widget(p, layout[1]);
901 }
902 Screen::Dag => {
903 self.draw_nodes(f, layout[1]);
904 }
905 Screen::Latency => self.draw_latency_table(f, layout[1]),
906 Screen::MemoryPools => self.draw_memory_pools(f, layout[1]),
907 #[cfg(feature = "debug_pane")]
908 Screen::DebugOutput => self.draw_debug_output(f, layout[1]),
909 };
910 }
911
912 fn run_app<B: Backend<Error = io::Error>>(
913 &mut self,
914 terminal: &mut Terminal<B>,
915 ) -> io::Result<()> {
916 loop {
917 if self.quitting.load(Ordering::SeqCst) {
918 break;
919 }
920 #[cfg(feature = "debug_pane")]
921 self.update_debug_output();
922
923 terminal.draw(|f| {
924 self.draw(f);
925 })?;
926
927 if event::poll(Duration::from_millis(50))? {
928 let event = event::read()?;
929
930 match event {
931 Event::Key(key) => match key.code {
932 KeyCode::Char('1') => self.active_screen = Screen::Neofetch,
933 KeyCode::Char('2') => self.active_screen = Screen::Dag,
934 KeyCode::Char('3') => self.active_screen = Screen::Latency,
935 KeyCode::Char('4') => self.active_screen = Screen::MemoryPools,
936 #[cfg(feature = "debug_pane")]
937 KeyCode::Char('5') => self.active_screen = Screen::DebugOutput,
938 KeyCode::Char('r') => {
939 if self.active_screen == Screen::Latency {
940 self.task_stats.lock().unwrap().reset()
941 }
942 }
943 KeyCode::Char('j') | KeyCode::Down => {
944 if self.active_screen == Screen::Dag {
945 for _ in 0..1 {
946 self.nodes_scrollable_widget_state
947 .nodes_scrollable_state
948 .scroll_down();
949 }
950 }
951 }
952 KeyCode::Char('k') | KeyCode::Up => {
953 if self.active_screen == Screen::Dag {
954 for _ in 0..1 {
955 self.nodes_scrollable_widget_state
956 .nodes_scrollable_state
957 .scroll_up();
958 }
959 }
960 }
961 KeyCode::Char('h') | KeyCode::Left => {
962 if self.active_screen == Screen::Dag {
963 for _ in 0..5 {
964 self.nodes_scrollable_widget_state
965 .nodes_scrollable_state
966 .scroll_left();
967 }
968 }
969 }
970 KeyCode::Char('l') | KeyCode::Right => {
971 if self.active_screen == Screen::Dag {
972 for _ in 0..5 {
973 self.nodes_scrollable_widget_state
974 .nodes_scrollable_state
975 .scroll_right();
976 }
977 }
978 }
979 KeyCode::Char('q') => {
980 break;
981 }
982 _ => {}
983 },
984
985 #[cfg(feature = "debug_pane")]
986 Event::Resize(_columns, rows) => {
987 if let Some(debug_output) = self.debug_output.as_mut() {
988 debug_output.max_rows.store(rows, Ordering::SeqCst)
989 }
990 }
991 _ => {}
992 }
993 }
994 }
995 Ok(())
996 }
997}
998
999impl CuMonitor for CuConsoleMon {
1000 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
1001 where
1002 Self: Sized,
1003 {
1004 let task_stats = Arc::new(Mutex::new(TaskStats::new(
1005 taskids.len(),
1006 CuDuration::from(Duration::from_secs(5)),
1007 )));
1008
1009 Ok(Self {
1010 config: config.clone(),
1011 taskids,
1012 task_stats,
1013 task_statuses: Arc::new(Mutex::new(vec![TaskStatus::default(); taskids.len()])),
1014 ui_handle: None,
1015 quitting: Arc::new(AtomicBool::new(false)),
1016 pool_stats: Arc::new(Mutex::new(Vec::new())),
1017 topology: None,
1018 })
1019 }
1020 fn set_topology(&mut self, topology: MonitorTopology) {
1021 self.topology = Some(topology);
1022 }
1023
1024 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
1025 let config_dup = self.config.clone();
1026 let taskids = self.taskids;
1027
1028 let task_stats_ui = self.task_stats.clone();
1029 let error_states = self.task_statuses.clone();
1030 let pool_stats_ui = self.pool_stats.clone();
1031 let quitting = self.quitting.clone();
1032 let topology = self.topology.clone();
1033
1034 let handle = thread::spawn(move || {
1036 let backend = CrosstermBackend::new(stdout());
1037 let _terminal_guard = TerminalRestoreGuard;
1038
1039 let prev_hook = panic::take_hook();
1041 panic::set_hook(Box::new(move |info| {
1042 let _ = restore_terminal();
1043 prev_hook(info);
1044 }));
1045
1046 install_signal_handlers(quitting.clone());
1047
1048 if let Err(err) = setup_terminal() {
1049 eprintln!("Failed to prepare terminal UI: {err}");
1050 return;
1051 }
1052
1053 let mut terminal = match Terminal::new(backend) {
1054 Ok(terminal) => terminal,
1055 Err(err) => {
1056 eprintln!("Failed to initialize terminal backend: {err}");
1057 return;
1058 }
1059 };
1060
1061 #[cfg(feature = "debug_pane")]
1062 {
1063 let error_redirect = gag::BufferRedirect::stderr().unwrap();
1065
1066 let mut ui = UI::new(
1067 config_dup,
1068 None, taskids,
1070 task_stats_ui,
1071 error_states,
1072 quitting.clone(),
1073 error_redirect,
1074 None,
1075 pool_stats_ui,
1076 topology.clone(),
1077 );
1078
1079 #[cfg(debug_assertions)]
1081 if cu29_log_runtime::EXTRA_TEXT_LOGGER
1082 .read()
1083 .unwrap()
1084 .is_some()
1085 {
1086 let max_lines = terminal.size().unwrap().height - 5;
1087 let (debug_log, tx) = debug_pane::DebugLog::new(max_lines);
1088
1089 let log_subscriber = debug_pane::LogSubscriber::new(tx);
1090
1091 *cu29_log_runtime::EXTRA_TEXT_LOGGER.write().unwrap() =
1092 Some(Box::new(log_subscriber) as Box<dyn log::Log>);
1093
1094 if let Err(err) = setup_terminal() {
1096 eprintln!("Failed to reinitialize terminal after log redirect: {err}");
1097 }
1098
1099 ui.debug_output = Some(debug_log);
1100 } else {
1101 println!("EXTRA_TEXT_LOGGER is none");
1102 }
1103 ui.run_app(&mut terminal).expect("Failed to run app");
1104 }
1105
1106 #[cfg(not(feature = "debug_pane"))]
1107 {
1108 let stderr_gag = gag::Gag::stderr().unwrap();
1109
1110 let mut ui = UI::new(
1111 config_dup,
1112 taskids,
1113 task_stats_ui,
1114 error_states,
1115 quitting,
1116 pool_stats_ui,
1117 topology,
1118 );
1119 ui.run_app(&mut terminal).expect("Failed to run app");
1120
1121 drop(stderr_gag);
1122 }
1123
1124 quitting.store(true, Ordering::SeqCst);
1125 let _ = restore_terminal();
1127 });
1128
1129 self.ui_handle = Some(handle);
1130 Ok(())
1131 }
1132
1133 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1134 {
1135 let mut task_stats = self.task_stats.lock().unwrap();
1136 task_stats.update(msgs);
1137 }
1138 {
1139 let mut task_statuses = self.task_statuses.lock().unwrap();
1140 for (i, msg) in msgs.iter().enumerate() {
1141 let CuCompactString(status_txt) = &msg.status_txt;
1142 task_statuses[i].status_txt = status_txt.clone();
1143 }
1144 }
1145
1146 {
1148 let pool_stats_data = pool::pools_statistics();
1149 let mut pool_stats = self.pool_stats.lock().unwrap();
1150
1151 for (id, space_left, total_size, buffer_size) in pool_stats_data {
1153 let id_str = id.to_string();
1154 if let Some(existing) = pool_stats.iter_mut().find(|p| p.id == id_str) {
1155 existing.update(space_left, total_size);
1156 } else {
1157 pool_stats.push(PoolStats::new(id_str, space_left, total_size, buffer_size));
1158 }
1159 }
1160 }
1161
1162 if self.quitting.load(Ordering::SeqCst) {
1163 return Err("Exiting...".into());
1164 }
1165 Ok(())
1166 }
1167
1168 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
1169 {
1170 let status = &mut self.task_statuses.lock().unwrap()[taskid];
1171 status.is_error = true;
1172 status.error = error.to_compact_string();
1173 }
1174 match step {
1175 CuTaskState::Start => Decision::Shutdown,
1176 CuTaskState::Preprocess => Decision::Abort,
1177 CuTaskState::Process => Decision::Ignore,
1178 CuTaskState::Postprocess => Decision::Ignore,
1179 CuTaskState::Stop => Decision::Shutdown,
1180 }
1181 }
1182
1183 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
1184 self.quitting.store(true, Ordering::SeqCst);
1185 let _ = restore_terminal();
1186
1187 if let Some(handle) = self.ui_handle.take() {
1188 let _ = handle.join();
1189 }
1190
1191 self.task_stats
1192 .lock()
1193 .unwrap()
1194 .stats
1195 .iter_mut()
1196 .for_each(|s| s.reset());
1197 Ok(())
1198 }
1199}
1200
/// Guard that restores the terminal when it goes out of scope.
struct TerminalRestoreGuard;

impl Drop for TerminalRestoreGuard {
    /// Restores the terminal; a failure is ignored.
    fn drop(&mut self) {
        let _ = restore_terminal();
    }
}
1208
1209fn install_signal_handlers(quitting: Arc<AtomicBool>) {
1210 static SIGNAL_HANDLER: OnceLock<()> = OnceLock::new();
1211
1212 let _ = SIGNAL_HANDLER.get_or_init(|| {
1213 let quitting = quitting.clone();
1214 if let Err(err) = ctrlc::set_handler(move || {
1215 quitting.store(true, Ordering::SeqCst);
1216 let _ = restore_terminal();
1217 std::process::exit(130);
1219 }) {
1220 eprintln!("Failed to install Ctrl-C handler: {err}");
1221 }
1222 });
1223}
1224
1225fn init_error_hooks() {
1226 let (panic, error) = HookBuilder::default().into_hooks();
1227 let panic = panic.into_panic_hook();
1228 let error = error.into_eyre_hook();
1229 color_eyre::eyre::set_hook(Box::new(move |e| {
1230 let _ = restore_terminal();
1231 error(e)
1232 }))
1233 .unwrap();
1234 std::panic::set_hook(Box::new(move |info| {
1235 let _ = restore_terminal();
1236 panic(info)
1237 }));
1238}
1239
1240fn setup_terminal() -> io::Result<()> {
1241 enable_raw_mode()?;
1242 execute!(stdout(), EnterAlternateScreen, EnableMouseCapture)?;
1243 Ok(())
1244}
1245
1246fn restore_terminal() -> io::Result<()> {
1247 execute!(stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
1248 disable_raw_mode()
1249}