Skip to main content

cu_tuimon/
model.rs

1use compact_str::{CompactString, ToCompactString};
2use cu29::clock::CuDuration;
3use cu29::cutask::CuMsgMetadata;
4use cu29::monitoring::{
5    ComponentId, CopperListInfo, CopperListIoStats, CopperListView, CuDurationStatistics,
6    CuMonitoringMetadata, MonitorComponentMetadata, MonitorTopology,
7};
8use cu29::prelude::{CuCompactString, CuTime, pools_statistics};
9use std::sync::{Arc, Mutex};
10
11#[cfg(feature = "log_pane")]
12use crate::log_pane::StyledLine;
13#[cfg(feature = "log_pane")]
14use std::collections::VecDeque;
15
16#[cfg(native)]
17use std::time::{Duration, Instant};
18#[cfg(browser)]
19use web_time::{Duration, Instant};
20
/// Minimum time that must elapse between two recomputations of the CopperList rate
/// (see `CopperListStats::update_rate`).
const COPPERLIST_RATE_WINDOW: Duration = Duration::from_millis(1_000);

/// Initial capacity of the log line buffer and the maximum number of lines it retains.
#[cfg(feature = "log_pane")]
const DEFAULT_LOG_CAPACITY: usize = 1024;
25
26#[derive(Clone)]
27pub struct MonitorModel {
28    pub(crate) inner: Arc<MonitorModelInner>,
29}
30
31pub(crate) struct MonitorModelInner {
32    pub(crate) components: &'static [MonitorComponentMetadata],
33    pub(crate) topology: MonitorTopology,
34    pub(crate) system_name: CompactString,
35    pub(crate) subsystem_name: Option<CompactString>,
36    pub(crate) mission_name: CompactString,
37    pub(crate) instance_id: u32,
38    pub(crate) component_stats: Mutex<ComponentStats>,
39    pub(crate) component_statuses: Mutex<Vec<ComponentStatus>>,
40    pub(crate) pool_stats: Mutex<Vec<PoolStats>>,
41    pub(crate) copperlist_stats: Mutex<CopperListStats>,
42
43    #[cfg(feature = "log_pane")]
44    pub(crate) log_lines: Mutex<VecDeque<StyledLine>>,
45}
46
47impl MonitorModel {
48    pub fn from_metadata(metadata: &CuMonitoringMetadata) -> Self {
49        Self::from_parts_with_identity(
50            metadata.components(),
51            metadata.copperlist_info(),
52            metadata.topology().clone(),
53            metadata.mission_id(),
54            metadata.subsystem_id(),
55            metadata.instance_id(),
56        )
57    }
58
59    pub fn from_parts(
60        components: &'static [MonitorComponentMetadata],
61        copperlist_info: CopperListInfo,
62        topology: MonitorTopology,
63    ) -> Self {
64        Self::from_parts_with_identity(components, copperlist_info, topology, "default", None, 0)
65    }
66
67    fn from_parts_with_identity(
68        components: &'static [MonitorComponentMetadata],
69        copperlist_info: CopperListInfo,
70        topology: MonitorTopology,
71        mission_name: &str,
72        subsystem_name: Option<&str>,
73        instance_id: u32,
74    ) -> Self {
75        let component_count = components.len();
76        let mut copperlist_stats = CopperListStats::new();
77        copperlist_stats.set_info(copperlist_info);
78
79        Self {
80            inner: Arc::new(MonitorModelInner {
81                components,
82                topology,
83                system_name: cached_system_name(),
84                subsystem_name: subsystem_name
85                    .filter(|name| !name.trim().is_empty())
86                    .map(CompactString::from),
87                mission_name: CompactString::from(mission_name),
88                instance_id,
89                component_stats: Mutex::new(ComponentStats::new(
90                    component_count,
91                    CuDuration::from(Duration::from_secs(5)),
92                )),
93                component_statuses: Mutex::new(vec![ComponentStatus::default(); component_count]),
94                pool_stats: Mutex::new(Vec::new()),
95                copperlist_stats: Mutex::new(copperlist_stats),
96
97                #[cfg(feature = "log_pane")]
98                log_lines: Mutex::new(VecDeque::with_capacity(DEFAULT_LOG_CAPACITY)),
99            }),
100        }
101    }
102
103    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
104        self.inner.components
105    }
106
107    pub fn topology(&self) -> &MonitorTopology {
108        &self.inner.topology
109    }
110
111    pub fn component_count(&self) -> usize {
112        self.inner.components.len()
113    }
114
115    pub(crate) fn footer_identity(&self) -> MonitorFooterIdentity {
116        MonitorFooterIdentity {
117            system_name: self.inner.system_name.clone(),
118            subsystem_name: self.inner.subsystem_name.clone(),
119            mission_name: self.inner.mission_name.clone(),
120            instance_id: self.inner.instance_id,
121        }
122    }
123
124    pub fn set_copperlist_info(&self, info: CopperListInfo) {
125        self.inner.copperlist_stats.lock().unwrap().set_info(info);
126    }
127
128    pub fn last_seen_copperlist_id(&self) -> Option<u64> {
129        self.inner.copperlist_stats.lock().unwrap().last_seen_clid
130    }
131
132    pub fn record_component_latency(&self, component_id: ComponentId, duration: CuDuration) {
133        let mut component_stats = self.inner.component_stats.lock().unwrap();
134        if let Some(stat) = component_stats.stats.get_mut(component_id.index()) {
135            stat.record(duration);
136        }
137    }
138
139    pub fn record_end_to_end_latency(&self, duration: CuDuration) {
140        self.inner
141            .component_stats
142            .lock()
143            .unwrap()
144            .end2end
145            .record(duration);
146    }
147
148    pub fn reset_latency(&self) {
149        self.inner.component_stats.lock().unwrap().reset();
150    }
151
152    pub fn set_component_status(
153        &self,
154        component_id: ComponentId,
155        status_txt: impl ToCompactString,
156    ) {
157        if let Some(status) = self
158            .inner
159            .component_statuses
160            .lock()
161            .unwrap()
162            .get_mut(component_id.index())
163        {
164            status.status_txt = status_txt.to_compact_string();
165        }
166    }
167
168    pub fn set_component_error(&self, component_id: ComponentId, error_txt: impl ToCompactString) {
169        if let Some(status) = self
170            .inner
171            .component_statuses
172            .lock()
173            .unwrap()
174            .get_mut(component_id.index())
175        {
176            status.is_error = true;
177            status.error = error_txt.to_compact_string();
178        }
179    }
180
181    pub fn clear_component_error(&self, component_id: ComponentId) {
182        if let Some(status) = self
183            .inner
184            .component_statuses
185            .lock()
186            .unwrap()
187            .get_mut(component_id.index())
188        {
189            status.is_error = false;
190            status.error.clear();
191        }
192    }
193
194    pub fn observe_copperlist_io(&self, stats: CopperListIoStats) {
195        self.inner.copperlist_stats.lock().unwrap().update_io(stats);
196    }
197
198    pub fn update_copperlist_rate(&self, clid: u64) {
199        self.inner
200            .copperlist_stats
201            .lock()
202            .unwrap()
203            .update_rate(clid);
204    }
205
206    pub fn upsert_pool_stat(
207        &self,
208        id: impl ToCompactString,
209        space_left: usize,
210        total_size: usize,
211        buffer_size: usize,
212    ) {
213        let id = id.to_compact_string();
214        let mut pool_stats = self.inner.pool_stats.lock().unwrap();
215        if let Some(existing) = pool_stats.iter_mut().find(|pool| pool.id == id) {
216            existing.buffer_size = buffer_size;
217            existing.update(space_left, total_size);
218        } else {
219            pool_stats.push(PoolStats::new(id, space_left, total_size, buffer_size));
220        }
221    }
222
223    pub fn refresh_pool_stats_from_runtime(&self) {
224        let pool_stats_data = pools_statistics();
225        for (id, space_left, total_size, buffer_size) in pool_stats_data {
226            self.upsert_pool_stat(id.to_string(), space_left, total_size, buffer_size);
227        }
228    }
229}
230
231#[cfg(feature = "log_pane")]
232impl MonitorModel {
233    pub fn push_log_line(&self, line: StyledLine) {
234        if line.text.is_empty() {
235            return;
236        }
237
238        let mut log_lines = self.inner.log_lines.lock().unwrap();
239        log_lines.push_back(line);
240        while log_lines.len() > DEFAULT_LOG_CAPACITY {
241            log_lines.pop_front();
242        }
243    }
244
245    pub fn log_lines(&self) -> Vec<StyledLine> {
246        self.inner
247            .log_lines
248            .lock()
249            .unwrap()
250            .iter()
251            .cloned()
252            .collect()
253    }
254
255    pub fn log_line_count(&self) -> usize {
256        self.inner.log_lines.lock().unwrap().len()
257    }
258
259    pub fn process_copperlist(&self, copperlist_id: u64, view: CopperListView<'_>) {
260        self.inner.component_stats.lock().unwrap().update(view);
261        self.update_copperlist_rate(copperlist_id);
262
263        let mut component_statuses = self.inner.component_statuses.lock().unwrap();
264        for entry in view.entries() {
265            let component_index = entry.component_id.index();
266            assert!(
267                component_index < component_statuses.len(),
268                "cu_tuimon: mapped component index {} out of component_statuses bounds {}",
269                component_index,
270                component_statuses.len()
271            );
272            let CuCompactString(status_txt) = &entry.msg.status_txt;
273            component_statuses[component_index].status_txt = status_txt.clone();
274        }
275        drop(component_statuses);
276
277        self.refresh_pool_stats_from_runtime();
278    }
279}
280
281pub(crate) struct ComponentStats {
282    pub(crate) stats: Vec<CuDurationStatistics>,
283    pub(crate) end2end: CuDurationStatistics,
284}
285
286impl ComponentStats {
287    fn new(component_count: usize, max_duration: CuDuration) -> Self {
288        let stats = vec![CuDurationStatistics::new(max_duration); component_count];
289        Self {
290            stats,
291            end2end: CuDurationStatistics::new(max_duration),
292        }
293    }
294
295    fn update(&mut self, view: CopperListView<'_>) {
296        for entry in view.entries() {
297            let component_index = entry.component_id.index();
298            assert!(
299                component_index < self.stats.len(),
300                "cu_tuimon: mapped component index {} out of stats bounds {}",
301                component_index,
302                self.stats.len()
303            );
304            let msg = entry.msg;
305            let before = Option::<CuTime>::from(msg.process_time.start);
306            let after = Option::<CuTime>::from(msg.process_time.end);
307            if let (Some(before), Some(after)) = (before, after)
308                && after >= before
309            {
310                self.stats[component_index].record(after - before);
311            }
312        }
313        self.end2end.record(compute_end_to_end_latency(view.msgs()));
314    }
315
316    fn reset(&mut self) {
317        for stat in &mut self.stats {
318            stat.reset();
319        }
320        self.end2end.reset();
321    }
322}
323
324#[derive(Default, Clone)]
325pub(crate) struct ComponentStatus {
326    pub(crate) is_error: bool,
327    pub(crate) status_txt: CompactString,
328    pub(crate) error: CompactString,
329}
330
331pub(crate) struct PoolStats {
332    pub(crate) id: CompactString,
333    pub(crate) space_left: usize,
334    pub(crate) total_size: usize,
335    pub(crate) buffer_size: usize,
336    pub(crate) handles_in_use: usize,
337    pub(crate) handles_per_second: usize,
338    last_update: Instant,
339}
340
341#[derive(Clone, Debug, PartialEq, Eq)]
342pub(crate) struct MonitorFooterIdentity {
343    pub(crate) system_name: CompactString,
344    pub(crate) subsystem_name: Option<CompactString>,
345    pub(crate) mission_name: CompactString,
346    pub(crate) instance_id: u32,
347}
348
/// Returns the machine's host name, or `"unknown-host"` when it cannot be obtained, is not
/// valid UTF-8, or is blank after trimming.
///
/// Despite the name, this function performs the lookup on every call; callers that want
/// caching must store the result themselves.
#[cfg(native)]
fn cached_system_name() -> CompactString {
    const FALLBACK: &str = "unknown-host";

    let mut raw = [0u8; 256];
    // SAFETY: `raw` is a live, writable buffer and the length passed is exactly its size,
    // so `gethostname` cannot write past its end.
    let status = unsafe { libc::gethostname(raw.as_mut_ptr().cast(), raw.len()) };
    if status != 0 {
        return CompactString::from(FALLBACK);
    }

    // A truncated name may come back without a terminator; use the whole buffer then.
    let len = raw
        .iter()
        .position(|&byte| byte == 0)
        .unwrap_or(raw.len());

    match core::str::from_utf8(&raw[..len]).map(str::trim) {
        Ok(name) if !name.is_empty() => CompactString::from(name),
        _ => CompactString::from(FALLBACK),
    }
}
364
/// Returns the trimmed host name of the current page's location, or `"browser"` when no
/// window is available, the host name cannot be read, or it is blank.
///
/// Despite the name, this function performs the lookup on every call.
#[cfg(browser)]
fn cached_system_name() -> CompactString {
    let raw = web_sys::window().and_then(|window| window.location().hostname().ok());
    match raw.as_deref().map(str::trim) {
        Some(name) if !name.is_empty() => CompactString::from(name),
        _ => CompactString::from("browser"),
    }
}
374
375impl PoolStats {
376    fn new(
377        id: impl ToCompactString,
378        space_left: usize,
379        total_size: usize,
380        buffer_size: usize,
381    ) -> Self {
382        Self {
383            id: id.to_compact_string(),
384            space_left,
385            total_size,
386            buffer_size,
387            handles_in_use: total_size.saturating_sub(space_left),
388            handles_per_second: 0,
389            last_update: Instant::now(),
390        }
391    }
392
393    fn update(&mut self, space_left: usize, total_size: usize) {
394        let now = Instant::now();
395        let handles_in_use = total_size.saturating_sub(space_left);
396        let elapsed = now.duration_since(self.last_update).as_secs_f32();
397
398        if elapsed >= 1.0 {
399            self.handles_per_second =
400                ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
401            self.last_update = now;
402        }
403
404        self.handles_in_use = handles_in_use;
405        self.space_left = space_left;
406        self.total_size = total_size;
407    }
408}
409
/// Size, I/O and rate statistics about CopperLists.
pub(crate) struct CopperListStats {
    /// CopperList size in bytes, taken from `CopperListInfo`.
    pub(crate) size_bytes: usize,
    /// Latest `raw_culist_bytes` value from the I/O statistics.
    pub(crate) raw_culist_bytes: u64,
    /// Latest `handle_bytes` value from the I/O statistics.
    pub(crate) handle_bytes: u64,
    /// Latest `encoded_culist_bytes` value from the I/O statistics.
    pub(crate) encoded_bytes: u64,
    /// Latest `keyframe_bytes` value from the I/O statistics.
    pub(crate) keyframe_bytes: u64,
    /// Latest `structured_log_bytes_total` value from the I/O statistics.
    pub(crate) structured_total_bytes: u64,
    /// Growth of the structured log total between the last two I/O updates.
    pub(crate) structured_bytes_per_cl: u64,
    /// Running count of CopperLists derived from the observed ids.
    pub(crate) total_copperlists: u64,
    /// CopperLists counted since the rate was last recomputed.
    pub(crate) window_copperlists: u64,
    /// Id passed to the most recent rate update, if any.
    pub(crate) last_seen_clid: Option<u64>,
    /// When the rate was last recomputed (or when the stats were created).
    last_rate_at: Instant,
    /// Most recently computed CopperList rate, in CopperLists per second.
    pub(crate) rate_hz: f64,
}
424
425impl CopperListStats {
426    fn new() -> Self {
427        Self {
428            size_bytes: 0,
429            raw_culist_bytes: 0,
430            handle_bytes: 0,
431            encoded_bytes: 0,
432            keyframe_bytes: 0,
433            structured_total_bytes: 0,
434            structured_bytes_per_cl: 0,
435            total_copperlists: 0,
436            window_copperlists: 0,
437            last_seen_clid: None,
438            last_rate_at: Instant::now(),
439            rate_hz: 0.0,
440        }
441    }
442
443    fn set_info(&mut self, info: CopperListInfo) {
444        self.size_bytes = info.size_bytes;
445    }
446
447    fn update_io(&mut self, stats: CopperListIoStats) {
448        self.raw_culist_bytes = stats.raw_culist_bytes;
449        self.handle_bytes = stats.handle_bytes;
450        self.encoded_bytes = stats.encoded_culist_bytes;
451        self.keyframe_bytes = stats.keyframe_bytes;
452        let total = stats.structured_log_bytes_total;
453        self.structured_bytes_per_cl = total.saturating_sub(self.structured_total_bytes);
454        self.structured_total_bytes = total;
455    }
456
457    fn update_rate(&mut self, clid: u64) {
458        let newly_seen = self
459            .last_seen_clid
460            .map_or(1, |prev| clid.wrapping_sub(prev));
461        self.last_seen_clid = Some(clid);
462        self.total_copperlists = self.total_copperlists.saturating_add(newly_seen);
463        self.window_copperlists = self.window_copperlists.saturating_add(newly_seen);
464
465        let now = Instant::now();
466        let elapsed = now.duration_since(self.last_rate_at);
467        if elapsed >= COPPERLIST_RATE_WINDOW {
468            let elapsed_secs = elapsed.as_secs_f64();
469            self.rate_hz = if elapsed_secs > 0.0 {
470                self.window_copperlists as f64 / elapsed_secs
471            } else {
472                0.0
473            };
474            self.window_copperlists = 0;
475            self.last_rate_at = now;
476        }
477    }
478}
479
480fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
481    let start = msgs.first().map(|msg| msg.process_time.start);
482    let end = msgs.last().map(|msg| msg.process_time.end);
483
484    if let (Some(start), Some(end)) = (start, end)
485        && let (Some(start), Some(end)) =
486            (Option::<CuTime>::from(start), Option::<CuTime>::from(end))
487        && end >= start
488    {
489        end - start
490    } else {
491        CuDuration::MIN
492    }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use cu29::monitoring::{ComponentType, MonitorTopology};

    /// Component table with a single task, shared by the tests below.
    static SINGLE_TASK: &[MonitorComponentMetadata] = &[MonitorComponentMetadata::new(
        "task",
        ComponentType::Task,
        None,
    )];

    /// A model built from metadata exposes the metadata's mission, subsystem and instance.
    #[test]
    fn monitor_model_footer_identity_uses_metadata_and_runtime_instance() {
        static CULIST_COMPONENT_MAPPING: &[ComponentId] = &[ComponentId::new(0)];

        let metadata = CuMonitoringMetadata::new(
            CompactString::from("autonomous"),
            SINGLE_TASK,
            CULIST_COMPONENT_MAPPING,
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("valid monitoring metadata")
        .with_subsystem_id(Some("balancebot"))
        .with_instance_id(7);

        let footer = MonitorModel::from_metadata(&metadata).footer_identity();

        assert_eq!(footer.subsystem_name.as_deref(), Some("balancebot"));
        assert_eq!(footer.mission_name.as_str(), "autonomous");
        assert_eq!(footer.instance_id, 7);
        assert!(!footer.system_name.is_empty());
    }

    /// A model built from bare parts has no subsystem and uses the default identity.
    #[test]
    fn monitor_model_footer_identity_omits_missing_subsystem() {
        let footer = MonitorModel::from_parts(
            SINGLE_TASK,
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
        )
        .footer_identity();

        assert!(footer.subsystem_name.is_none());
        assert_eq!(footer.mission_name.as_str(), "default");
        assert_eq!(footer.instance_id, 0);
    }
}