Skip to main content

cu_tuimon/
model.rs

1#[cfg(feature = "log_pane")]
2use crate::logpane::StyledLine;
3use compact_str::{CompactString, ToCompactString};
4use cu29::clock::CuDuration;
5use cu29::cutask::CuMsgMetadata;
6use cu29::monitoring::{
7    ComponentId, CopperListInfo, CopperListIoStats, CopperListView, CuDurationStatistics,
8    CuMonitoringMetadata, MonitorComponentMetadata, MonitorTopology,
9};
10use cu29::prelude::{CuCompactString, CuTime, pool};
11#[cfg(feature = "log_pane")]
12use std::collections::VecDeque;
13use std::sync::{Arc, Mutex};
14#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
15use std::time::{Duration, Instant};
16#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
17use web_time::{Duration, Instant};
18
19const COPPERLIST_RATE_WINDOW: Duration = Duration::from_secs(1);
20#[cfg(feature = "log_pane")]
21const DEFAULT_LOG_CAPACITY: usize = 1_024;
22
23#[derive(Clone)]
24pub struct MonitorModel {
25    pub(crate) inner: Arc<MonitorModelInner>,
26}
27
28pub(crate) struct MonitorModelInner {
29    pub(crate) components: &'static [MonitorComponentMetadata],
30    pub(crate) topology: MonitorTopology,
31    pub(crate) component_stats: Mutex<ComponentStats>,
32    pub(crate) component_statuses: Mutex<Vec<ComponentStatus>>,
33    pub(crate) pool_stats: Mutex<Vec<PoolStats>>,
34    pub(crate) copperlist_stats: Mutex<CopperListStats>,
35    #[cfg(feature = "log_pane")]
36    pub(crate) log_lines: Mutex<VecDeque<StyledLine>>,
37}
38
39impl MonitorModel {
40    pub fn from_metadata(metadata: &CuMonitoringMetadata) -> Self {
41        Self::from_parts(
42            metadata.components(),
43            metadata.copperlist_info(),
44            metadata.topology().clone(),
45        )
46    }
47
48    pub fn from_parts(
49        components: &'static [MonitorComponentMetadata],
50        copperlist_info: CopperListInfo,
51        topology: MonitorTopology,
52    ) -> Self {
53        let component_count = components.len();
54        let mut copperlist_stats = CopperListStats::new();
55        copperlist_stats.set_info(copperlist_info);
56
57        Self {
58            inner: Arc::new(MonitorModelInner {
59                components,
60                topology,
61                component_stats: Mutex::new(ComponentStats::new(
62                    component_count,
63                    CuDuration::from(Duration::from_secs(5)),
64                )),
65                component_statuses: Mutex::new(vec![ComponentStatus::default(); component_count]),
66                pool_stats: Mutex::new(Vec::new()),
67                copperlist_stats: Mutex::new(copperlist_stats),
68                #[cfg(feature = "log_pane")]
69                log_lines: Mutex::new(VecDeque::with_capacity(DEFAULT_LOG_CAPACITY)),
70            }),
71        }
72    }
73
74    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
75        self.inner.components
76    }
77
78    pub fn topology(&self) -> &MonitorTopology {
79        &self.inner.topology
80    }
81
82    pub fn component_count(&self) -> usize {
83        self.inner.components.len()
84    }
85
86    pub fn set_copperlist_info(&self, info: CopperListInfo) {
87        self.inner.copperlist_stats.lock().unwrap().set_info(info);
88    }
89
90    pub fn last_seen_copperlist_id(&self) -> Option<u64> {
91        self.inner.copperlist_stats.lock().unwrap().last_seen_clid
92    }
93
94    pub fn record_component_latency(&self, component_id: ComponentId, duration: CuDuration) {
95        let mut component_stats = self.inner.component_stats.lock().unwrap();
96        if let Some(stat) = component_stats.stats.get_mut(component_id.index()) {
97            stat.record(duration);
98        }
99    }
100
101    pub fn record_end_to_end_latency(&self, duration: CuDuration) {
102        self.inner
103            .component_stats
104            .lock()
105            .unwrap()
106            .end2end
107            .record(duration);
108    }
109
110    pub fn reset_latency(&self) {
111        self.inner.component_stats.lock().unwrap().reset();
112    }
113
114    pub fn set_component_status(
115        &self,
116        component_id: ComponentId,
117        status_txt: impl ToCompactString,
118    ) {
119        if let Some(status) = self
120            .inner
121            .component_statuses
122            .lock()
123            .unwrap()
124            .get_mut(component_id.index())
125        {
126            status.status_txt = status_txt.to_compact_string();
127        }
128    }
129
130    pub fn set_component_error(&self, component_id: ComponentId, error_txt: impl ToCompactString) {
131        if let Some(status) = self
132            .inner
133            .component_statuses
134            .lock()
135            .unwrap()
136            .get_mut(component_id.index())
137        {
138            status.is_error = true;
139            status.error = error_txt.to_compact_string();
140        }
141    }
142
143    pub fn clear_component_error(&self, component_id: ComponentId) {
144        if let Some(status) = self
145            .inner
146            .component_statuses
147            .lock()
148            .unwrap()
149            .get_mut(component_id.index())
150        {
151            status.is_error = false;
152            status.error.clear();
153        }
154    }
155
156    pub fn observe_copperlist_io(&self, stats: CopperListIoStats) {
157        self.inner.copperlist_stats.lock().unwrap().update_io(stats);
158    }
159
160    pub fn update_copperlist_rate(&self, clid: u64) {
161        self.inner
162            .copperlist_stats
163            .lock()
164            .unwrap()
165            .update_rate(clid);
166    }
167
168    pub fn upsert_pool_stat(
169        &self,
170        id: impl ToCompactString,
171        space_left: usize,
172        total_size: usize,
173        buffer_size: usize,
174    ) {
175        let id = id.to_compact_string();
176        let mut pool_stats = self.inner.pool_stats.lock().unwrap();
177        if let Some(existing) = pool_stats.iter_mut().find(|pool| pool.id == id) {
178            existing.buffer_size = buffer_size;
179            existing.update(space_left, total_size);
180        } else {
181            pool_stats.push(PoolStats::new(id, space_left, total_size, buffer_size));
182        }
183    }
184
185    pub fn refresh_pool_stats_from_runtime(&self) {
186        let pool_stats_data = pool::pools_statistics();
187        for (id, space_left, total_size, buffer_size) in pool_stats_data {
188            self.upsert_pool_stat(id.to_string(), space_left, total_size, buffer_size);
189        }
190    }
191
192    #[cfg(feature = "log_pane")]
193    pub fn push_log_line(&self, line: StyledLine) {
194        if line.text.is_empty() {
195            return;
196        }
197
198        let mut log_lines = self.inner.log_lines.lock().unwrap();
199        log_lines.push_back(line);
200        while log_lines.len() > DEFAULT_LOG_CAPACITY {
201            log_lines.pop_front();
202        }
203    }
204
205    #[cfg(feature = "log_pane")]
206    pub fn log_lines(&self) -> Vec<StyledLine> {
207        self.inner
208            .log_lines
209            .lock()
210            .unwrap()
211            .iter()
212            .cloned()
213            .collect()
214    }
215
216    #[cfg(feature = "log_pane")]
217    pub fn log_line_count(&self) -> usize {
218        self.inner.log_lines.lock().unwrap().len()
219    }
220
221    pub fn process_copperlist(&self, copperlist_id: u64, view: CopperListView<'_>) {
222        self.inner.component_stats.lock().unwrap().update(view);
223        self.update_copperlist_rate(copperlist_id);
224
225        let mut component_statuses = self.inner.component_statuses.lock().unwrap();
226        for entry in view.entries() {
227            let component_index = entry.component_id.index();
228            assert!(
229                component_index < component_statuses.len(),
230                "cu_tuimon: mapped component index {} out of component_statuses bounds {}",
231                component_index,
232                component_statuses.len()
233            );
234            let CuCompactString(status_txt) = &entry.msg.status_txt;
235            component_statuses[component_index].status_txt = status_txt.clone();
236        }
237        drop(component_statuses);
238
239        self.refresh_pool_stats_from_runtime();
240    }
241}
242
243pub(crate) struct ComponentStats {
244    pub(crate) stats: Vec<CuDurationStatistics>,
245    pub(crate) end2end: CuDurationStatistics,
246}
247
248impl ComponentStats {
249    fn new(component_count: usize, max_duration: CuDuration) -> Self {
250        let stats = vec![CuDurationStatistics::new(max_duration); component_count];
251        Self {
252            stats,
253            end2end: CuDurationStatistics::new(max_duration),
254        }
255    }
256
257    fn update(&mut self, view: CopperListView<'_>) {
258        for entry in view.entries() {
259            let component_index = entry.component_id.index();
260            assert!(
261                component_index < self.stats.len(),
262                "cu_tuimon: mapped component index {} out of stats bounds {}",
263                component_index,
264                self.stats.len()
265            );
266            let msg = entry.msg;
267            let before = Option::<CuTime>::from(msg.process_time.start);
268            let after = Option::<CuTime>::from(msg.process_time.end);
269            if let (Some(before), Some(after)) = (before, after)
270                && after >= before
271            {
272                self.stats[component_index].record(after - before);
273            }
274        }
275        self.end2end.record(compute_end_to_end_latency(view.msgs()));
276    }
277
278    fn reset(&mut self) {
279        for stat in &mut self.stats {
280            stat.reset();
281        }
282        self.end2end.reset();
283    }
284}
285
286#[derive(Default, Clone)]
287pub(crate) struct ComponentStatus {
288    pub(crate) is_error: bool,
289    pub(crate) status_txt: CompactString,
290    pub(crate) error: CompactString,
291}
292
293pub(crate) struct PoolStats {
294    pub(crate) id: CompactString,
295    pub(crate) space_left: usize,
296    pub(crate) total_size: usize,
297    pub(crate) buffer_size: usize,
298    pub(crate) handles_in_use: usize,
299    pub(crate) handles_per_second: usize,
300    last_update: Instant,
301}
302
303impl PoolStats {
304    fn new(
305        id: impl ToCompactString,
306        space_left: usize,
307        total_size: usize,
308        buffer_size: usize,
309    ) -> Self {
310        Self {
311            id: id.to_compact_string(),
312            space_left,
313            total_size,
314            buffer_size,
315            handles_in_use: total_size.saturating_sub(space_left),
316            handles_per_second: 0,
317            last_update: Instant::now(),
318        }
319    }
320
321    fn update(&mut self, space_left: usize, total_size: usize) {
322        let now = Instant::now();
323        let handles_in_use = total_size.saturating_sub(space_left);
324        let elapsed = now.duration_since(self.last_update).as_secs_f32();
325
326        if elapsed >= 1.0 {
327            self.handles_per_second =
328                ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
329            self.last_update = now;
330        }
331
332        self.handles_in_use = handles_in_use;
333        self.space_left = space_left;
334        self.total_size = total_size;
335    }
336}
337
338pub(crate) struct CopperListStats {
339    pub(crate) size_bytes: usize,
340    pub(crate) raw_culist_bytes: u64,
341    pub(crate) handle_bytes: u64,
342    pub(crate) encoded_bytes: u64,
343    pub(crate) keyframe_bytes: u64,
344    pub(crate) structured_total_bytes: u64,
345    pub(crate) structured_bytes_per_cl: u64,
346    pub(crate) total_copperlists: u64,
347    pub(crate) window_copperlists: u64,
348    pub(crate) last_seen_clid: Option<u64>,
349    last_rate_at: Instant,
350    pub(crate) rate_hz: f64,
351}
352
353impl CopperListStats {
354    fn new() -> Self {
355        Self {
356            size_bytes: 0,
357            raw_culist_bytes: 0,
358            handle_bytes: 0,
359            encoded_bytes: 0,
360            keyframe_bytes: 0,
361            structured_total_bytes: 0,
362            structured_bytes_per_cl: 0,
363            total_copperlists: 0,
364            window_copperlists: 0,
365            last_seen_clid: None,
366            last_rate_at: Instant::now(),
367            rate_hz: 0.0,
368        }
369    }
370
371    fn set_info(&mut self, info: CopperListInfo) {
372        self.size_bytes = info.size_bytes;
373    }
374
375    fn update_io(&mut self, stats: CopperListIoStats) {
376        self.raw_culist_bytes = stats.raw_culist_bytes;
377        self.handle_bytes = stats.handle_bytes;
378        self.encoded_bytes = stats.encoded_culist_bytes;
379        self.keyframe_bytes = stats.keyframe_bytes;
380        let total = stats.structured_log_bytes_total;
381        self.structured_bytes_per_cl = total.saturating_sub(self.structured_total_bytes);
382        self.structured_total_bytes = total;
383    }
384
385    fn update_rate(&mut self, clid: u64) {
386        let newly_seen = self
387            .last_seen_clid
388            .map_or(1, |prev| clid.wrapping_sub(prev));
389        self.last_seen_clid = Some(clid);
390        self.total_copperlists = self.total_copperlists.saturating_add(newly_seen);
391        self.window_copperlists = self.window_copperlists.saturating_add(newly_seen);
392
393        let now = Instant::now();
394        let elapsed = now.duration_since(self.last_rate_at);
395        if elapsed >= COPPERLIST_RATE_WINDOW {
396            let elapsed_secs = elapsed.as_secs_f64();
397            self.rate_hz = if elapsed_secs > 0.0 {
398                self.window_copperlists as f64 / elapsed_secs
399            } else {
400                0.0
401            };
402            self.window_copperlists = 0;
403            self.last_rate_at = now;
404        }
405    }
406}
407
408fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
409    let start = msgs.first().map(|msg| msg.process_time.start);
410    let end = msgs.last().map(|msg| msg.process_time.end);
411
412    if let (Some(start), Some(end)) = (start, end)
413        && let (Some(start), Some(end)) =
414            (Option::<CuTime>::from(start), Option::<CuTime>::from(end))
415        && end >= start
416    {
417        end - start
418    } else {
419        CuDuration::MIN
420    }
421}