1#[cfg(feature = "log_pane")]
2use crate::logpane::StyledLine;
3use compact_str::{CompactString, ToCompactString};
4use cu29::clock::CuDuration;
5use cu29::cutask::CuMsgMetadata;
6use cu29::monitoring::{
7 ComponentId, CopperListInfo, CopperListIoStats, CopperListView, CuDurationStatistics,
8 CuMonitoringMetadata, MonitorComponentMetadata, MonitorTopology,
9};
10use cu29::prelude::{CuCompactString, CuTime, pool};
11#[cfg(feature = "log_pane")]
12use std::collections::VecDeque;
13use std::sync::{Arc, Mutex};
14#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
15use std::time::{Duration, Instant};
16#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
17use web_time::{Duration, Instant};
18
19const COPPERLIST_RATE_WINDOW: Duration = Duration::from_secs(1);
20#[cfg(feature = "log_pane")]
21const DEFAULT_LOG_CAPACITY: usize = 1_024;
22
23#[derive(Clone)]
24pub struct MonitorModel {
25 pub(crate) inner: Arc<MonitorModelInner>,
26}
27
28pub(crate) struct MonitorModelInner {
29 pub(crate) components: &'static [MonitorComponentMetadata],
30 pub(crate) topology: MonitorTopology,
31 pub(crate) component_stats: Mutex<ComponentStats>,
32 pub(crate) component_statuses: Mutex<Vec<ComponentStatus>>,
33 pub(crate) pool_stats: Mutex<Vec<PoolStats>>,
34 pub(crate) copperlist_stats: Mutex<CopperListStats>,
35 #[cfg(feature = "log_pane")]
36 pub(crate) log_lines: Mutex<VecDeque<StyledLine>>,
37}
38
39impl MonitorModel {
40 pub fn from_metadata(metadata: &CuMonitoringMetadata) -> Self {
41 Self::from_parts(
42 metadata.components(),
43 metadata.copperlist_info(),
44 metadata.topology().clone(),
45 )
46 }
47
48 pub fn from_parts(
49 components: &'static [MonitorComponentMetadata],
50 copperlist_info: CopperListInfo,
51 topology: MonitorTopology,
52 ) -> Self {
53 let component_count = components.len();
54 let mut copperlist_stats = CopperListStats::new();
55 copperlist_stats.set_info(copperlist_info);
56
57 Self {
58 inner: Arc::new(MonitorModelInner {
59 components,
60 topology,
61 component_stats: Mutex::new(ComponentStats::new(
62 component_count,
63 CuDuration::from(Duration::from_secs(5)),
64 )),
65 component_statuses: Mutex::new(vec![ComponentStatus::default(); component_count]),
66 pool_stats: Mutex::new(Vec::new()),
67 copperlist_stats: Mutex::new(copperlist_stats),
68 #[cfg(feature = "log_pane")]
69 log_lines: Mutex::new(VecDeque::with_capacity(DEFAULT_LOG_CAPACITY)),
70 }),
71 }
72 }
73
74 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
75 self.inner.components
76 }
77
78 pub fn topology(&self) -> &MonitorTopology {
79 &self.inner.topology
80 }
81
82 pub fn component_count(&self) -> usize {
83 self.inner.components.len()
84 }
85
86 pub fn set_copperlist_info(&self, info: CopperListInfo) {
87 self.inner.copperlist_stats.lock().unwrap().set_info(info);
88 }
89
90 pub fn last_seen_copperlist_id(&self) -> Option<u64> {
91 self.inner.copperlist_stats.lock().unwrap().last_seen_clid
92 }
93
94 pub fn record_component_latency(&self, component_id: ComponentId, duration: CuDuration) {
95 let mut component_stats = self.inner.component_stats.lock().unwrap();
96 if let Some(stat) = component_stats.stats.get_mut(component_id.index()) {
97 stat.record(duration);
98 }
99 }
100
101 pub fn record_end_to_end_latency(&self, duration: CuDuration) {
102 self.inner
103 .component_stats
104 .lock()
105 .unwrap()
106 .end2end
107 .record(duration);
108 }
109
110 pub fn reset_latency(&self) {
111 self.inner.component_stats.lock().unwrap().reset();
112 }
113
114 pub fn set_component_status(
115 &self,
116 component_id: ComponentId,
117 status_txt: impl ToCompactString,
118 ) {
119 if let Some(status) = self
120 .inner
121 .component_statuses
122 .lock()
123 .unwrap()
124 .get_mut(component_id.index())
125 {
126 status.status_txt = status_txt.to_compact_string();
127 }
128 }
129
130 pub fn set_component_error(&self, component_id: ComponentId, error_txt: impl ToCompactString) {
131 if let Some(status) = self
132 .inner
133 .component_statuses
134 .lock()
135 .unwrap()
136 .get_mut(component_id.index())
137 {
138 status.is_error = true;
139 status.error = error_txt.to_compact_string();
140 }
141 }
142
143 pub fn clear_component_error(&self, component_id: ComponentId) {
144 if let Some(status) = self
145 .inner
146 .component_statuses
147 .lock()
148 .unwrap()
149 .get_mut(component_id.index())
150 {
151 status.is_error = false;
152 status.error.clear();
153 }
154 }
155
156 pub fn observe_copperlist_io(&self, stats: CopperListIoStats) {
157 self.inner.copperlist_stats.lock().unwrap().update_io(stats);
158 }
159
160 pub fn update_copperlist_rate(&self, clid: u64) {
161 self.inner
162 .copperlist_stats
163 .lock()
164 .unwrap()
165 .update_rate(clid);
166 }
167
168 pub fn upsert_pool_stat(
169 &self,
170 id: impl ToCompactString,
171 space_left: usize,
172 total_size: usize,
173 buffer_size: usize,
174 ) {
175 let id = id.to_compact_string();
176 let mut pool_stats = self.inner.pool_stats.lock().unwrap();
177 if let Some(existing) = pool_stats.iter_mut().find(|pool| pool.id == id) {
178 existing.buffer_size = buffer_size;
179 existing.update(space_left, total_size);
180 } else {
181 pool_stats.push(PoolStats::new(id, space_left, total_size, buffer_size));
182 }
183 }
184
185 pub fn refresh_pool_stats_from_runtime(&self) {
186 let pool_stats_data = pool::pools_statistics();
187 for (id, space_left, total_size, buffer_size) in pool_stats_data {
188 self.upsert_pool_stat(id.to_string(), space_left, total_size, buffer_size);
189 }
190 }
191
192 #[cfg(feature = "log_pane")]
193 pub fn push_log_line(&self, line: StyledLine) {
194 if line.text.is_empty() {
195 return;
196 }
197
198 let mut log_lines = self.inner.log_lines.lock().unwrap();
199 log_lines.push_back(line);
200 while log_lines.len() > DEFAULT_LOG_CAPACITY {
201 log_lines.pop_front();
202 }
203 }
204
205 #[cfg(feature = "log_pane")]
206 pub fn log_lines(&self) -> Vec<StyledLine> {
207 self.inner
208 .log_lines
209 .lock()
210 .unwrap()
211 .iter()
212 .cloned()
213 .collect()
214 }
215
216 #[cfg(feature = "log_pane")]
217 pub fn log_line_count(&self) -> usize {
218 self.inner.log_lines.lock().unwrap().len()
219 }
220
221 pub fn process_copperlist(&self, copperlist_id: u64, view: CopperListView<'_>) {
222 self.inner.component_stats.lock().unwrap().update(view);
223 self.update_copperlist_rate(copperlist_id);
224
225 let mut component_statuses = self.inner.component_statuses.lock().unwrap();
226 for entry in view.entries() {
227 let component_index = entry.component_id.index();
228 assert!(
229 component_index < component_statuses.len(),
230 "cu_tuimon: mapped component index {} out of component_statuses bounds {}",
231 component_index,
232 component_statuses.len()
233 );
234 let CuCompactString(status_txt) = &entry.msg.status_txt;
235 component_statuses[component_index].status_txt = status_txt.clone();
236 }
237 drop(component_statuses);
238
239 self.refresh_pool_stats_from_runtime();
240 }
241}
242
243pub(crate) struct ComponentStats {
244 pub(crate) stats: Vec<CuDurationStatistics>,
245 pub(crate) end2end: CuDurationStatistics,
246}
247
248impl ComponentStats {
249 fn new(component_count: usize, max_duration: CuDuration) -> Self {
250 let stats = vec![CuDurationStatistics::new(max_duration); component_count];
251 Self {
252 stats,
253 end2end: CuDurationStatistics::new(max_duration),
254 }
255 }
256
257 fn update(&mut self, view: CopperListView<'_>) {
258 for entry in view.entries() {
259 let component_index = entry.component_id.index();
260 assert!(
261 component_index < self.stats.len(),
262 "cu_tuimon: mapped component index {} out of stats bounds {}",
263 component_index,
264 self.stats.len()
265 );
266 let msg = entry.msg;
267 let before = Option::<CuTime>::from(msg.process_time.start);
268 let after = Option::<CuTime>::from(msg.process_time.end);
269 if let (Some(before), Some(after)) = (before, after)
270 && after >= before
271 {
272 self.stats[component_index].record(after - before);
273 }
274 }
275 self.end2end.record(compute_end_to_end_latency(view.msgs()));
276 }
277
278 fn reset(&mut self) {
279 for stat in &mut self.stats {
280 stat.reset();
281 }
282 self.end2end.reset();
283 }
284}
285
286#[derive(Default, Clone)]
287pub(crate) struct ComponentStatus {
288 pub(crate) is_error: bool,
289 pub(crate) status_txt: CompactString,
290 pub(crate) error: CompactString,
291}
292
293pub(crate) struct PoolStats {
294 pub(crate) id: CompactString,
295 pub(crate) space_left: usize,
296 pub(crate) total_size: usize,
297 pub(crate) buffer_size: usize,
298 pub(crate) handles_in_use: usize,
299 pub(crate) handles_per_second: usize,
300 last_update: Instant,
301}
302
303impl PoolStats {
304 fn new(
305 id: impl ToCompactString,
306 space_left: usize,
307 total_size: usize,
308 buffer_size: usize,
309 ) -> Self {
310 Self {
311 id: id.to_compact_string(),
312 space_left,
313 total_size,
314 buffer_size,
315 handles_in_use: total_size.saturating_sub(space_left),
316 handles_per_second: 0,
317 last_update: Instant::now(),
318 }
319 }
320
321 fn update(&mut self, space_left: usize, total_size: usize) {
322 let now = Instant::now();
323 let handles_in_use = total_size.saturating_sub(space_left);
324 let elapsed = now.duration_since(self.last_update).as_secs_f32();
325
326 if elapsed >= 1.0 {
327 self.handles_per_second =
328 ((handles_in_use.abs_diff(self.handles_in_use)) as f32 / elapsed) as usize;
329 self.last_update = now;
330 }
331
332 self.handles_in_use = handles_in_use;
333 self.space_left = space_left;
334 self.total_size = total_size;
335 }
336}
337
338pub(crate) struct CopperListStats {
339 pub(crate) size_bytes: usize,
340 pub(crate) raw_culist_bytes: u64,
341 pub(crate) handle_bytes: u64,
342 pub(crate) encoded_bytes: u64,
343 pub(crate) keyframe_bytes: u64,
344 pub(crate) structured_total_bytes: u64,
345 pub(crate) structured_bytes_per_cl: u64,
346 pub(crate) total_copperlists: u64,
347 pub(crate) window_copperlists: u64,
348 pub(crate) last_seen_clid: Option<u64>,
349 last_rate_at: Instant,
350 pub(crate) rate_hz: f64,
351}
352
353impl CopperListStats {
354 fn new() -> Self {
355 Self {
356 size_bytes: 0,
357 raw_culist_bytes: 0,
358 handle_bytes: 0,
359 encoded_bytes: 0,
360 keyframe_bytes: 0,
361 structured_total_bytes: 0,
362 structured_bytes_per_cl: 0,
363 total_copperlists: 0,
364 window_copperlists: 0,
365 last_seen_clid: None,
366 last_rate_at: Instant::now(),
367 rate_hz: 0.0,
368 }
369 }
370
371 fn set_info(&mut self, info: CopperListInfo) {
372 self.size_bytes = info.size_bytes;
373 }
374
375 fn update_io(&mut self, stats: CopperListIoStats) {
376 self.raw_culist_bytes = stats.raw_culist_bytes;
377 self.handle_bytes = stats.handle_bytes;
378 self.encoded_bytes = stats.encoded_culist_bytes;
379 self.keyframe_bytes = stats.keyframe_bytes;
380 let total = stats.structured_log_bytes_total;
381 self.structured_bytes_per_cl = total.saturating_sub(self.structured_total_bytes);
382 self.structured_total_bytes = total;
383 }
384
385 fn update_rate(&mut self, clid: u64) {
386 let newly_seen = self
387 .last_seen_clid
388 .map_or(1, |prev| clid.wrapping_sub(prev));
389 self.last_seen_clid = Some(clid);
390 self.total_copperlists = self.total_copperlists.saturating_add(newly_seen);
391 self.window_copperlists = self.window_copperlists.saturating_add(newly_seen);
392
393 let now = Instant::now();
394 let elapsed = now.duration_since(self.last_rate_at);
395 if elapsed >= COPPERLIST_RATE_WINDOW {
396 let elapsed_secs = elapsed.as_secs_f64();
397 self.rate_hz = if elapsed_secs > 0.0 {
398 self.window_copperlists as f64 / elapsed_secs
399 } else {
400 0.0
401 };
402 self.window_copperlists = 0;
403 self.last_rate_at = now;
404 }
405 }
406}
407
408fn compute_end_to_end_latency(msgs: &[&CuMsgMetadata]) -> CuDuration {
409 let start = msgs.first().map(|msg| msg.process_time.start);
410 let end = msgs.last().map(|msg| msg.process_time.end);
411
412 if let (Some(start), Some(end)) = (start, end)
413 && let (Some(start), Some(end)) =
414 (Option::<CuTime>::from(start), Option::<CuTime>::from(end))
415 && end >= start
416 {
417 end - start
418 } else {
419 CuDuration::MIN
420 }
421}