Skip to main content

cu_tuimon/
model.rs

1#[cfg(feature = "log_pane")]
2use crate::logpane::StyledLine;
3use compact_str::{CompactString, ToCompactString};
4use cu29::clock::CuDuration;
5use cu29::cutask::CuMsgMetadata;
6use cu29::monitoring::{
7    ComponentId, CopperListInfo, CopperListIoStats, CopperListView, CuDurationStatistics,
8    CuMonitoringMetadata, MonitorComponentMetadata, MonitorTopology,
9};
10use cu29::prelude::{CuCompactString, CuTime, pool};
11#[cfg(feature = "log_pane")]
12use std::collections::VecDeque;
13use std::sync::{Arc, Mutex};
14#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
15use std::time::{Duration, Instant};
16#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
17use web_time::{Duration, Instant};
18
19const COPPERLIST_RATE_WINDOW: Duration = Duration::from_secs(1);
20#[cfg(feature = "log_pane")]
21const DEFAULT_LOG_CAPACITY: usize = 1_024;
22
23#[derive(Clone)]
24pub struct MonitorModel {
25    pub(crate) inner: Arc<MonitorModelInner>,
26}
27
28pub(crate) struct MonitorModelInner {
29    pub(crate) components: &'static [MonitorComponentMetadata],
30    pub(crate) topology: MonitorTopology,
31    pub(crate) system_name: CompactString,
32    pub(crate) subsystem_name: Option<CompactString>,
33    pub(crate) mission_name: CompactString,
34    pub(crate) instance_id: u32,
35    pub(crate) component_stats: Mutex<ComponentStats>,
36    pub(crate) component_statuses: Mutex<Vec<ComponentStatus>>,
37    pub(crate) pool_stats: Mutex<Vec<PoolStats>>,
38    pub(crate) copperlist_stats: Mutex<CopperListStats>,
39    #[cfg(feature = "log_pane")]
40    pub(crate) log_lines: Mutex<VecDeque<StyledLine>>,
41}
42
43impl MonitorModel {
44    pub fn from_metadata(metadata: &CuMonitoringMetadata) -> Self {
45        Self::from_parts_with_identity(
46            metadata.components(),
47            metadata.copperlist_info(),
48            metadata.topology().clone(),
49            metadata.mission_id(),
50            metadata.subsystem_id(),
51            metadata.instance_id(),
52        )
53    }
54
55    pub fn from_parts(
56        components: &'static [MonitorComponentMetadata],
57        copperlist_info: CopperListInfo,
58        topology: MonitorTopology,
59    ) -> Self {
60        Self::from_parts_with_identity(components, copperlist_info, topology, "default", None, 0)
61    }
62
63    fn from_parts_with_identity(
64        components: &'static [MonitorComponentMetadata],
65        copperlist_info: CopperListInfo,
66        topology: MonitorTopology,
67        mission_name: &str,
68        subsystem_name: Option<&str>,
69        instance_id: u32,
70    ) -> Self {
71        let component_count = components.len();
72        let mut copperlist_stats = CopperListStats::new();
73        copperlist_stats.set_info(copperlist_info);
74
75        Self {
76            inner: Arc::new(MonitorModelInner {
77                components,
78                topology,
79                system_name: cached_system_name(),
80                subsystem_name: subsystem_name
81                    .filter(|name| !name.trim().is_empty())
82                    .map(CompactString::from),
83                mission_name: CompactString::from(mission_name),
84                instance_id,
85                component_stats: Mutex::new(ComponentStats::new(
86                    component_count,
87                    CuDuration::from(Duration::from_secs(5)),
88                )),
89                component_statuses: Mutex::new(vec![ComponentStatus::default(); component_count]),
90                pool_stats: Mutex::new(Vec::new()),
91                copperlist_stats: Mutex::new(copperlist_stats),
92                #[cfg(feature = "log_pane")]
93                log_lines: Mutex::new(VecDeque::with_capacity(DEFAULT_LOG_CAPACITY)),
94            }),
95        }
96    }
97
98    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
99        self.inner.components
100    }
101
102    pub fn topology(&self) -> &MonitorTopology {
103        &self.inner.topology
104    }
105
106    pub fn component_count(&self) -> usize {
107        self.inner.components.len()
108    }
109
110    pub(crate) fn footer_identity(&self) -> MonitorFooterIdentity {
111        MonitorFooterIdentity {
112            system_name: self.inner.system_name.clone(),
113            subsystem_name: self.inner.subsystem_name.clone(),
114            mission_name: self.inner.mission_name.clone(),
115            instance_id: self.inner.instance_id,
116        }
117    }
118
119    pub fn set_copperlist_info(&self, info: CopperListInfo) {
120        self.inner.copperlist_stats.lock().unwrap().set_info(info);
121    }
122
123    pub fn last_seen_copperlist_id(&self) -> Option<u64> {
124        self.inner.copperlist_stats.lock().unwrap().last_seen_clid
125    }
126
127    pub fn record_component_latency(&self, component_id: ComponentId, duration: CuDuration) {
128        let mut component_stats = self.inner.component_stats.lock().unwrap();
129        if let Some(stat) = component_stats.stats.get_mut(component_id.index()) {
130            stat.record(duration);
131        }
132    }
133
134    pub fn record_end_to_end_latency(&self, duration: CuDuration) {
135        self.inner
136            .component_stats
137            .lock()
138            .unwrap()
139            .end2end
140            .record(duration);
141    }
142
143    pub fn reset_latency(&self) {
144        self.inner.component_stats.lock().unwrap().reset();
145    }
146
147    pub fn set_component_status(
148        &self,
149        component_id: ComponentId,
150        status_txt: impl ToCompactString,
151    ) {
152        if let Some(status) = self
153            .inner
154            .component_statuses
155            .lock()
156            .unwrap()
157            .get_mut(component_id.index())
158        {
159            status.status_txt = status_txt.to_compact_string();
160        }
161    }
162
163    pub fn set_component_error(&self, component_id: ComponentId, error_txt: impl ToCompactString) {
164        if let Some(status) = self
165            .inner
166            .component_statuses
167            .lock()
168            .unwrap()
169            .get_mut(component_id.index())
170        {
171            status.is_error = true;
172            status.error = error_txt.to_compact_string();
173        }
174    }
175
176    pub fn clear_component_error(&self, component_id: ComponentId) {
177        if let Some(status) = self
178            .inner
179            .component_statuses
180            .lock()
181            .unwrap()
182            .get_mut(component_id.index())
183        {
184            status.is_error = false;
185            status.error.clear();
186        }
187    }
188
189    pub fn observe_copperlist_io(&self, stats: CopperListIoStats) {
190        self.inner.copperlist_stats.lock().unwrap().update_io(stats);
191    }
192
193    pub fn update_copperlist_rate(&self, clid: u64) {
194        self.inner
195            .copperlist_stats
196            .lock()
197            .unwrap()
198            .update_rate(clid);
199    }
200
201    pub fn upsert_pool_stat(
202        &self,
203        id: impl ToCompactString,
204        space_left: usize,
205        total_size: usize,
206        buffer_size: usize,
207    ) {
208        let id = id.to_compact_string();
209        let mut pool_stats = self.inner.pool_stats.lock().unwrap();
210        if let Some(existing) = pool_stats.iter_mut().find(|pool| pool.id == id) {
211            existing.buffer_size = buffer_size;
212            existing.update(space_left, total_size);
213        } else {
214            pool_stats.push(PoolStats::new(id, space_left, total_size, buffer_size));
215        }
216    }
217
218    pub fn refresh_pool_stats_from_runtime(&self) {
219        let pool_stats_data = pool::pools_statistics();
220        for (id, space_left, total_size, buffer_size) in pool_stats_data {
221            self.upsert_pool_stat(id.to_string(), space_left, total_size, buffer_size);
222        }
223    }
224
225    #[cfg(feature = "log_pane")]
226    pub fn push_log_line(&self, line: StyledLine) {
227        if line.text.is_empty() {
228            return;
229        }
230
231        let mut log_lines = self.inner.log_lines.lock().unwrap();
232        log_lines.push_back(line);
233        while log_lines.len() > DEFAULT_LOG_CAPACITY {
234            log_lines.pop_front();
235        }
236    }
237
238    #[cfg(feature = "log_pane")]
239    pub fn log_lines(&self) -> Vec<StyledLine> {
240        self.inner
241            .log_lines
242            .lock()
243            .unwrap()
244            .iter()
245            .cloned()
246            .collect()
247    }
248
249    #[cfg(feature = "log_pane")]
250    pub fn log_line_count(&self) -> usize {
251        self.inner.log_lines.lock().unwrap().len()
252    }
253
254    pub fn process_copperlist(&self, copperlist_id: u64, view: CopperListView<'_>) {
255        self.inner.component_stats.lock().unwrap().update(view);
256        self.update_copperlist_rate(copperlist_id);
257
258        let mut component_statuses = self.inner.component_statuses.lock().unwrap();
259        for entry in view.entries() {
260            let component_index = entry.component_id.index();
261            assert!(
262                component_index < component_statuses.len(),
263                "cu_tuimon: mapped component index {} out of component_statuses bounds {}",
264                component_index,
265                component_statuses.len()
266            );
267            let CuCompactString(status_txt) = &entry.msg.status_txt;
268            component_statuses[component_index].status_txt = status_txt.clone();
269        }
270        drop(component_statuses);
271
272        self.refresh_pool_stats_from_runtime();
273    }
274}
275
276pub(crate) struct ComponentStats {
277    pub(crate) stats: Vec<CuDurationStatistics>,
278    pub(crate) end2end: CuDurationStatistics,
279}
280
281impl ComponentStats {
282    fn new(component_count: usize, max_duration: CuDuration) -> Self {
283        let stats = vec![CuDurationStatistics::new(max_duration); component_count];
284        Self {
285            stats,
286            end2end: CuDurationStatistics::new(max_duration),
287        }
288    }
289
290    fn update(&mut self, view: CopperListView<'_>) {
291        for entry in view.entries() {
292            let component_index = entry.component_id.index();
293            assert!(
294                component_index < self.stats.len(),
295                "cu_tuimon: mapped component index {} out of stats bounds {}",
296                component_index,
297                self.stats.len()
298            );
299            let msg = entry.msg;
300            let before = Option::<CuTime>::from(msg.process_time.start);
301            let after = Option::<CuTime>::from(msg.process_time.end);
302            if let (Some(before), Some(after)) = (before, after)
303                && after >= before
304            {
305                self.stats[component_index].record(after - before);
306            }
307        }
308        self.end2end.record(compute_end_to_end_latency(view.msgs()));
309    }
310
311    fn reset(&mut self) {
312        for stat in &mut self.stats {
313            stat.reset();
314        }
315        self.end2end.reset();
316    }
317}
318
319#[derive(Default, Clone)]
320pub(crate) struct ComponentStatus {
321    pub(crate) is_error: bool,
322    pub(crate) status_txt: CompactString,
323    pub(crate) error: CompactString,
324}
325
326pub(crate) struct PoolStats {
327    pub(crate) id: CompactString,
328    pub(crate) space_left: usize,
329    pub(crate) total_size: usize,
330    pub(crate) buffer_size: usize,
331    pub(crate) handles_in_use: usize,
332    pub(crate) handles_per_second: usize,
333    last_update: Instant,
334}
335
336#[derive(Clone, Debug, PartialEq, Eq)]
337pub(crate) struct MonitorFooterIdentity {
338    pub(crate) system_name: CompactString,
339    pub(crate) subsystem_name: Option<CompactString>,
340    pub(crate) mission_name: CompactString,
341    pub(crate) instance_id: u32,
342}
343
344#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
345fn cached_system_name() -> CompactString {
346    let mut buf = [0u8; 256];
347    let result = unsafe { libc::gethostname(buf.as_mut_ptr().cast(), buf.len()) };
348    if result == 0 {
349        let end = buf.iter().position(|byte| *byte == 0).unwrap_or(buf.len());
350        if let Ok(hostname) = core::str::from_utf8(&buf[..end]) {
351            let hostname = hostname.trim();
352            if !hostname.is_empty() {
353                return CompactString::from(hostname);
354            }
355        }
356    }
357    CompactString::from("unknown-host")
358}
359
360#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
361fn cached_system_name() -> CompactString {
362    web_sys::window()
363        .and_then(|window| window.location().hostname().ok())
364        .map(|hostname| hostname.trim().to_string())
365        .filter(|hostname| !hostname.is_empty())
366        .map(CompactString::from)
367        .unwrap_or_else(|| CompactString::from("browser"))
368}
369
370impl PoolStats {
371    fn new(
372        id: impl ToCompactString,
373        space_left: usize,
374        total_size: usize,
375        buffer_size: usize,
376    ) -> Self {
377        Self {
378            id: id.to_compact_string(),
379            space_left,
380            total_size,
381            buffer_size,
382            handles_in_use: total_size.saturating_sub(space_left),
383            handles_per_second: 0,
384            last_update: Instant::now(),
385        }
386    }
387
388    fn update(&mut self, space_left: usize, total_size: usize) {
389        let now = Instant::now();
390        let handles_in_use = total_size.saturating_sub(space_left);
391        let elapsed = now.duration_since(self.last_update).as_secs_f32();
392
393        if elapsed >= 1.0 {
394            self.handles_per_second =
395                ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
396            self.last_update = now;
397        }
398
399        self.handles_in_use = handles_in_use;
400        self.space_left = space_left;
401        self.total_size = total_size;
402    }
403}
404
405pub(crate) struct CopperListStats {
406    pub(crate) size_bytes: usize,
407    pub(crate) raw_culist_bytes: u64,
408    pub(crate) handle_bytes: u64,
409    pub(crate) encoded_bytes: u64,
410    pub(crate) keyframe_bytes: u64,
411    pub(crate) structured_total_bytes: u64,
412    pub(crate) structured_bytes_per_cl: u64,
413    pub(crate) total_copperlists: u64,
414    pub(crate) window_copperlists: u64,
415    pub(crate) last_seen_clid: Option<u64>,
416    last_rate_at: Instant,
417    pub(crate) rate_hz: f64,
418}
419
420impl CopperListStats {
421    fn new() -> Self {
422        Self {
423            size_bytes: 0,
424            raw_culist_bytes: 0,
425            handle_bytes: 0,
426            encoded_bytes: 0,
427            keyframe_bytes: 0,
428            structured_total_bytes: 0,
429            structured_bytes_per_cl: 0,
430            total_copperlists: 0,
431            window_copperlists: 0,
432            last_seen_clid: None,
433            last_rate_at: Instant::now(),
434            rate_hz: 0.0,
435        }
436    }
437
438    fn set_info(&mut self, info: CopperListInfo) {
439        self.size_bytes = info.size_bytes;
440    }
441
442    fn update_io(&mut self, stats: CopperListIoStats) {
443        self.raw_culist_bytes = stats.raw_culist_bytes;
444        self.handle_bytes = stats.handle_bytes;
445        self.encoded_bytes = stats.encoded_culist_bytes;
446        self.keyframe_bytes = stats.keyframe_bytes;
447        let total = stats.structured_log_bytes_total;
448        self.structured_bytes_per_cl = total.saturating_sub(self.structured_total_bytes);
449        self.structured_total_bytes = total;
450    }
451
452    fn update_rate(&mut self, clid: u64) {
453        let newly_seen = self
454            .last_seen_clid
455            .map_or(1, |prev| clid.wrapping_sub(prev));
456        self.last_seen_clid = Some(clid);
457        self.total_copperlists = self.total_copperlists.saturating_add(newly_seen);
458        self.window_copperlists = self.window_copperlists.saturating_add(newly_seen);
459
460        let now = Instant::now();
461        let elapsed = now.duration_since(self.last_rate_at);
462        if elapsed >= COPPERLIST_RATE_WINDOW {
463            let elapsed_secs = elapsed.as_secs_f64();
464            self.rate_hz = if elapsed_secs > 0.0 {
465                self.window_copperlists as f64 / elapsed_secs
466            } else {
467                0.0
468            };
469            self.window_copperlists = 0;
470            self.last_rate_at = now;
471        }
472    }
473}
474
475fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
476    let start = msgs.first().map(|msg| msg.process_time.start);
477    let end = msgs.last().map(|msg| msg.process_time.end);
478
479    if let (Some(start), Some(end)) = (start, end)
480        && let (Some(start), Some(end)) =
481            (Option::<CuTime>::from(start), Option::<CuTime>::from(end))
482        && end >= start
483    {
484        end - start
485    } else {
486        CuDuration::MIN
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493    use cu29::monitoring::{ComponentType, MonitorTopology};
494
495    #[test]
496    fn monitor_model_footer_identity_uses_metadata_and_runtime_instance() {
497        static COMPONENTS: &[MonitorComponentMetadata] = &[MonitorComponentMetadata::new(
498            "task",
499            ComponentType::Task,
500            None,
501        )];
502        static CULIST_COMPONENT_MAPPING: &[ComponentId] = &[ComponentId::new(0)];
503        let metadata = CuMonitoringMetadata::new(
504            CompactString::from("autonomous"),
505            COMPONENTS,
506            CULIST_COMPONENT_MAPPING,
507            CopperListInfo::new(0, 0),
508            MonitorTopology::default(),
509            None,
510        )
511        .expect("valid monitoring metadata")
512        .with_subsystem_id(Some("balancebot"))
513        .with_instance_id(7);
514
515        let model = MonitorModel::from_metadata(&metadata);
516
517        let identity = model.footer_identity();
518        assert_eq!(identity.subsystem_name.as_deref(), Some("balancebot"));
519        assert_eq!(identity.mission_name.as_str(), "autonomous");
520        assert_eq!(identity.instance_id, 7);
521        assert!(!identity.system_name.is_empty());
522    }
523
524    #[test]
525    fn monitor_model_footer_identity_omits_missing_subsystem() {
526        static COMPONENTS: &[MonitorComponentMetadata] = &[MonitorComponentMetadata::new(
527            "task",
528            ComponentType::Task,
529            None,
530        )];
531        let model = MonitorModel::from_parts(
532            COMPONENTS,
533            CopperListInfo::new(0, 0),
534            MonitorTopology::default(),
535        );
536
537        let identity = model.footer_identity();
538        assert!(identity.subsystem_name.is_none());
539        assert_eq!(identity.mission_name.as_str(), "default");
540        assert_eq!(identity.instance_id, 0);
541    }
542}