Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10#[cfg(all(feature = "std", debug_assertions))]
11use cu29_log_runtime::{
12    format_message_only, register_live_log_listener, unregister_live_log_listener,
13};
14use cu29_traits::{CuError, CuResult};
15use serde_derive::{Deserialize, Serialize};
16
17#[cfg(not(feature = "std"))]
18extern crate alloc;
19
20#[cfg(feature = "std")]
21use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
22
23#[cfg(not(feature = "std"))]
24use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
25
26#[cfg(not(feature = "std"))]
27mod imp {
28    pub use alloc::alloc::{GlobalAlloc, Layout};
29    pub use core::sync::atomic::{AtomicUsize, Ordering};
30    pub use libm::sqrt;
31}
32
33#[cfg(feature = "std")]
34mod imp {
35    #[cfg(feature = "memory_monitoring")]
36    use super::CountingAlloc;
37    #[cfg(feature = "memory_monitoring")]
38    pub use std::alloc::System;
39    pub use std::alloc::{GlobalAlloc, Layout};
40    pub use std::sync::atomic::{AtomicUsize, Ordering};
41    #[cfg(feature = "memory_monitoring")]
42    #[global_allocator]
43    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
44}
45
46use imp::*;
47
48#[cfg(all(feature = "std", debug_assertions))]
49fn format_timestamp(time: CuDuration) -> String {
50    // Render CuTime/CuDuration as HH:mm:ss.xxxx (4 fractional digits of a second).
51    let nanos = time.as_nanos();
52    let total_seconds = nanos / 1_000_000_000;
53    let hours = total_seconds / 3600;
54    let minutes = (total_seconds / 60) % 60;
55    let seconds = total_seconds % 60;
56    let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
57    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
58}
59
60/// The state of a task.
61#[derive(Debug, Serialize, Deserialize)]
62pub enum CuTaskState {
63    Start,
64    Preprocess,
65    Process,
66    Postprocess,
67    Stop,
68}
69
70/// Monitor decision to be taken when a task errored out.
71#[derive(Debug)]
72pub enum Decision {
73    Abort,    // for a step (stop, start) or a copperlist, just stop trying to process it.
74    Ignore, // Ignore this error and try to continue, ie calling the other tasks steps, setting a None return value and continue a copperlist.
75    Shutdown, // This is a fatal error, shutdown the copper as cleanly as possible.
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum ComponentKind {
80    Task,
81    Bridge,
82}
83
84#[derive(Debug, Clone)]
85pub struct MonitorNode {
86    pub id: String,
87    pub type_name: Option<String>,
88    pub kind: ComponentKind,
89    /// Ordered list of input port identifiers.
90    pub inputs: Vec<String>,
91    /// Ordered list of output port identifiers.
92    pub outputs: Vec<String>,
93}
94
95#[derive(Debug, Clone)]
96pub struct MonitorConnection {
97    pub src: String,
98    pub src_port: Option<String>,
99    pub dst: String,
100    pub dst_port: Option<String>,
101    pub msg: String,
102}
103
104#[derive(Debug, Clone, Default)]
105pub struct MonitorTopology {
106    pub nodes: Vec<MonitorNode>,
107    pub connections: Vec<MonitorConnection>,
108}
109
110#[derive(Debug, Clone, Copy, Default)]
111pub struct CopperListInfo {
112    pub size_bytes: usize,
113    pub count: usize,
114}
115
116impl CopperListInfo {
117    pub const fn new(size_bytes: usize, count: usize) -> Self {
118        Self { size_bytes, count }
119    }
120}
121
122/// Reported data about CopperList IO for a single iteration.
123#[derive(Debug, Clone, Copy, Default)]
124pub struct CopperListIoStats {
125    /// CopperList struct size in RAM (excluding dynamic payloads/handles)
126    pub raw_culist_bytes: u64,
127    /// Bytes held by payloads that will be serialized (currently: pooled handles, vecs, slices)
128    pub handle_bytes: u64,
129    /// Bytes produced by bincode serialization of the CopperList
130    pub encoded_culist_bytes: u64,
131    /// Bytes produced by bincode serialization of the KeyFrame (0 if none)
132    pub keyframe_bytes: u64,
133    /// Cumulative bytes written to the structured log stream so far
134    pub structured_log_bytes_total: u64,
135    /// CopperList identifier for reference in monitors
136    pub culistid: u32,
137}
138
139/// Lightweight trait to estimate the amount of data a payload will contribute when serialized.
140/// Default implementations return the stack size; specific types override to report dynamic data.
141pub trait CuPayloadSize {
142    /// Total bytes represented by the payload in memory (stack + heap backing).
143    fn raw_bytes(&self) -> usize {
144        core::mem::size_of_val(self)
145    }
146
147    /// Bytes that correspond to reusable/pooled handles (used for IO budgeting).
148    fn handle_bytes(&self) -> usize {
149        0
150    }
151}
152
153impl<T> CuPayloadSize for T
154where
155    T: crate::cutask::CuMsgPayload,
156{
157    fn raw_bytes(&self) -> usize {
158        core::mem::size_of::<T>()
159    }
160}
161
162#[derive(Default, Debug, Clone, Copy)]
163struct NodeIoUsage {
164    has_incoming: bool,
165    has_outgoing: bool,
166}
167
168fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
169    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
170    edge_ids.sort();
171
172    let mut outputs = Vec::new();
173    let mut seen = Vec::new();
174    let mut port_idx = 0usize;
175    for edge_id in edge_ids {
176        let Some(edge) = graph.edge(edge_id) else {
177            continue;
178        };
179        if seen.iter().any(|msg| msg == &edge.msg) {
180            continue;
181        }
182        seen.push(edge.msg.clone());
183        let mut port_label = String::from("out");
184        port_label.push_str(&port_idx.to_string());
185        port_label.push_str(": ");
186        port_label.push_str(edge.msg.as_str());
187        outputs.push((edge.msg.clone(), port_label));
188        port_idx += 1;
189    }
190    outputs
191}
192
193/// Derive a monitor-friendly topology from the runtime configuration.
194pub fn build_monitor_topology(
195    config: &CuConfig,
196    mission: Option<&str>,
197) -> CuResult<MonitorTopology> {
198    let graph = config.get_graph(mission)?;
199    let mut nodes: Map<String, MonitorNode> = Map::new();
200    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
201    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
202
203    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
204    for bridge in &config.bridges {
205        bridge_lookup.insert(bridge.id.as_str(), bridge);
206    }
207
208    for cnx in graph.edges() {
209        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
210        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
211    }
212
213    for (_, node) in graph.get_all_nodes() {
214        let kind = match node.get_flavor() {
215            Flavor::Bridge => ComponentKind::Bridge,
216            _ => ComponentKind::Task,
217        };
218        let node_id = node.get_id();
219
220        let mut inputs = Vec::new();
221        let mut outputs = Vec::new();
222        if kind == ComponentKind::Bridge {
223            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
224                for ch in &bridge.channels {
225                    match ch {
226                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
227                            outputs.push(id.clone())
228                        }
229                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
230                    }
231                }
232            }
233        } else {
234            let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
235            if usage.has_incoming || !usage.has_outgoing {
236                inputs.push("in".to_string());
237            }
238            if usage.has_outgoing {
239                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
240                    let ports = collect_output_ports(graph, node_idx);
241                    let mut port_map: Map<String, String> = Map::new();
242                    for (msg_type, label) in ports {
243                        port_map.insert(msg_type, label.clone());
244                        outputs.push(label);
245                    }
246                    output_port_lookup.insert(node_id.clone(), port_map);
247                }
248            } else if !usage.has_incoming {
249                outputs.push("out".to_string());
250            }
251        }
252
253        nodes.insert(
254            node_id.clone(),
255            MonitorNode {
256                id: node_id,
257                type_name: Some(node.get_type().to_string()),
258                kind,
259                inputs,
260                outputs,
261            },
262        );
263    }
264
265    let mut connections = Vec::new();
266    for cnx in graph.edges() {
267        let src = cnx.src.clone();
268        let dst = cnx.dst.clone();
269
270        let src_port = cnx.src_channel.clone().or_else(|| {
271            output_port_lookup
272                .get(&src)
273                .and_then(|ports| ports.get(&cnx.msg).cloned())
274                .or_else(|| {
275                    nodes
276                        .get(&src)
277                        .and_then(|node| node.outputs.first().cloned())
278                })
279        });
280        let dst_port = cnx.dst_channel.clone().or_else(|| {
281            nodes
282                .get(&dst)
283                .and_then(|node| node.inputs.first().cloned())
284        });
285
286        connections.push(MonitorConnection {
287            src,
288            src_port,
289            dst,
290            dst_port,
291            msg: cnx.msg.clone(),
292        });
293    }
294
295    Ok(MonitorTopology {
296        nodes: nodes.into_values().collect(),
297        connections,
298    })
299}
300
301/// Trait to implement a monitoring task.
302pub trait CuMonitor: Sized {
303    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
304    where
305        Self: Sized;
306
307    fn set_topology(&mut self, _topology: MonitorTopology) {}
308
309    fn set_copperlist_info(&mut self, _info: CopperListInfo) {}
310
311    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
312        Ok(())
313    }
314
315    /// Callback that will be trigger at the end of every copperlist (before, on or after the serialization).
316    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
317
318    /// Called when the runtime finishes serializing a CopperList, giving IO accounting data.
319    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
320
321    /// Callbacked when a Task errored out. The runtime requires an immediate decision.
322    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
323
324    /// Callbacked when copper is stopping.
325    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
326        Ok(())
327    }
328}
329
330/// A do nothing monitor if no monitor is provided.
331/// This is basically defining the default behavior of Copper in case of error.
332pub struct NoMonitor {}
333impl CuMonitor for NoMonitor {
334    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
335        Ok(NoMonitor {})
336    }
337
338    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
339        #[cfg(all(feature = "std", debug_assertions))]
340        register_live_log_listener(|entry, format_str, param_names| {
341            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
342            let named: Map<String, String> = param_names
343                .iter()
344                .zip(params.iter())
345                .map(|(k, v)| (k.to_string(), v.clone()))
346                .collect();
347
348            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
349                let ts = format_timestamp(entry.time);
350                println!("{} [{:?}] {}", ts, entry.level, msg);
351            }
352        });
353        Ok(())
354    }
355
356    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
357        // By default, do nothing.
358        Ok(())
359    }
360
361    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
362        // By default, just try to continue.
363        Decision::Ignore
364    }
365
366    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
367        #[cfg(all(feature = "std", debug_assertions))]
368        unregister_live_log_listener();
369        Ok(())
370    }
371}
372
373/// A simple allocator that counts the number of bytes allocated and deallocated.
374pub struct CountingAlloc<A: GlobalAlloc> {
375    inner: A,
376    allocated: AtomicUsize,
377    deallocated: AtomicUsize,
378}
379
380impl<A: GlobalAlloc> CountingAlloc<A> {
381    pub const fn new(inner: A) -> Self {
382        CountingAlloc {
383            inner,
384            allocated: AtomicUsize::new(0),
385            deallocated: AtomicUsize::new(0),
386        }
387    }
388
389    pub fn allocated(&self) -> usize {
390        self.allocated.load(Ordering::SeqCst)
391    }
392
393    pub fn deallocated(&self) -> usize {
394        self.deallocated.load(Ordering::SeqCst)
395    }
396
397    pub fn reset(&self) {
398        self.allocated.store(0, Ordering::SeqCst);
399        self.deallocated.store(0, Ordering::SeqCst);
400    }
401}
402
403// SAFETY: Delegates allocation/deallocation to the inner allocator while tracking sizes.
404unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
405    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
406    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
407        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
408        let p = unsafe { self.inner.alloc(layout) };
409        if !p.is_null() {
410            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
411        }
412        p
413    }
414
415    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
416    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
417        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
418        unsafe { self.inner.dealloc(ptr, layout) }
419        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
420    }
421}
422
423/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
424#[cfg(feature = "memory_monitoring")]
425pub struct ScopedAllocCounter {
426    bf_allocated: usize,
427    bf_deallocated: usize,
428}
429
430#[cfg(feature = "memory_monitoring")]
431impl Default for ScopedAllocCounter {
432    fn default() -> Self {
433        Self::new()
434    }
435}
436
437#[cfg(feature = "memory_monitoring")]
438impl ScopedAllocCounter {
439    pub fn new() -> Self {
440        ScopedAllocCounter {
441            bf_allocated: GLOBAL.allocated(),
442            bf_deallocated: GLOBAL.deallocated(),
443        }
444    }
445
446    /// Returns the total number of bytes allocated in the current scope
447    /// since the creation of this `ScopedAllocCounter`.
448    ///
449    /// # Example
450    /// ```
451    /// use cu29_runtime::monitoring::ScopedAllocCounter;
452    ///
453    /// let counter = ScopedAllocCounter::new();
454    /// let _vec = vec![0u8; 1024];
455    /// println!("Bytes allocated: {}", counter.get_allocated());
456    /// ```
457    pub fn allocated(&self) -> usize {
458        GLOBAL.allocated() - self.bf_allocated
459    }
460
461    /// Returns the total number of bytes deallocated in the current scope
462    /// since the creation of this `ScopedAllocCounter`.
463    ///
464    /// # Example
465    /// ```
466    /// use cu29_runtime::monitoring::ScopedAllocCounter;
467    ///
468    /// let counter = ScopedAllocCounter::new();
469    /// let _vec = vec![0u8; 1024];
470    /// drop(_vec);
471    /// println!("Bytes deallocated: {}", counter.get_deallocated());
472    /// ```
473    pub fn deallocated(&self) -> usize {
474        GLOBAL.deallocated() - self.bf_deallocated
475    }
476}
477
478/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
479#[cfg(feature = "memory_monitoring")]
480impl Drop for ScopedAllocCounter {
481    fn drop(&mut self) {
482        let _allocated = GLOBAL.allocated() - self.bf_allocated;
483        let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
484        // TODO(gbin): Fix this when the logger is ready.
485        // debug!(
486        //     "Allocations: +{}B -{}B",
487        //     allocated = allocated,
488        //     deallocated = deallocated,
489        // );
490    }
491}
492
493#[cfg(feature = "std")]
494const BUCKET_COUNT: usize = 1024;
495#[cfg(not(feature = "std"))]
496const BUCKET_COUNT: usize = 256;
497
498/// Accumulative stat object that can give your some real time statistics.
499/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
500#[derive(Debug, Clone)]
501pub struct LiveStatistics {
502    buckets: [u64; BUCKET_COUNT],
503    min_val: u64,
504    max_val: u64,
505    sum: u64,
506    sum_sq: u64,
507    count: u64,
508    max_value: u64,
509}
510
511impl LiveStatistics {
512    /// Creates a new `LiveStatistics` instance with a specified maximum value.
513    ///
514    /// This function initializes a `LiveStatistics` structure with default values
515    /// for tracking statistical data, while setting an upper limit for the data
516    /// points that the structure tracks.
517    ///
518    /// # Parameters
519    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
520    ///
521    /// # Returns
522    /// A new instance of `LiveStatistics` with:
523    /// - `buckets`: An array pre-filled with zeros to categorize data points.
524    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
525    /// - `max_val`: Initialized to zero.
526    /// - `sum`: The sum of all data points, initialized to zero.
527    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
528    /// - `count`: The total number of data points, initialized to zero.
529    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
530    ///
531    pub fn new_with_max(max_value: u64) -> Self {
532        LiveStatistics {
533            buckets: [0; BUCKET_COUNT],
534            min_val: u64::MAX,
535            max_val: 0,
536            sum: 0,
537            sum_sq: 0,
538            count: 0,
539            max_value,
540        }
541    }
542
543    #[inline]
544    fn value_to_bucket(&self, value: u64) -> usize {
545        if value >= self.max_value {
546            BUCKET_COUNT - 1
547        } else {
548            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
549        }
550    }
551
552    #[inline]
553    pub fn min(&self) -> u64 {
554        if self.count == 0 { 0 } else { self.min_val }
555    }
556
557    #[inline]
558    pub fn max(&self) -> u64 {
559        self.max_val
560    }
561
562    #[inline]
563    pub fn mean(&self) -> f64 {
564        if self.count == 0 {
565            0.0
566        } else {
567            self.sum as f64 / self.count as f64
568        }
569    }
570
571    #[inline]
572    pub fn stdev(&self) -> f64 {
573        if self.count == 0 {
574            return 0.0;
575        }
576        let mean = self.mean();
577        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
578        if variance < 0.0 {
579            return 0.0;
580        }
581        #[cfg(feature = "std")]
582        return variance.sqrt();
583        #[cfg(not(feature = "std"))]
584        return sqrt(variance);
585    }
586
587    #[inline]
588    pub fn percentile(&self, percentile: f64) -> u64 {
589        if self.count == 0 {
590            return 0;
591        }
592
593        let target_count = (self.count as f64 * percentile) as u64;
594        let mut accumulated = 0u64;
595
596        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
597            accumulated += bucket_count;
598            if accumulated >= target_count {
599                // Linear interpolation within the bucket
600                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
601                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
602                let bucket_fraction = if bucket_count > 0 {
603                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
604                } else {
605                    0.5
606                };
607                return bucket_start
608                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
609            }
610        }
611
612        self.max_val
613    }
614
615    /// Adds a value to the statistics.
616    #[inline]
617    pub fn record(&mut self, value: u64) {
618        if value < self.min_val {
619            self.min_val = value;
620        }
621        if value > self.max_val {
622            self.max_val = value;
623        }
624        self.sum += value;
625        self.sum_sq += value * value;
626        self.count += 1;
627
628        let bucket = self.value_to_bucket(value);
629        self.buckets[bucket] += 1;
630    }
631
632    #[inline]
633    pub fn len(&self) -> u64 {
634        self.count
635    }
636
637    #[inline]
638    pub fn is_empty(&self) -> bool {
639        self.count == 0
640    }
641
642    #[inline]
643    pub fn reset(&mut self) {
644        self.buckets.fill(0);
645        self.min_val = u64::MAX;
646        self.max_val = 0;
647        self.sum = 0;
648        self.sum_sq = 0;
649        self.count = 0;
650    }
651}
652
653/// A Specialized statistics object for CuDuration.
654/// It will also keep track of the jitter between the values.
655#[derive(Debug, Clone)]
656pub struct CuDurationStatistics {
657    bare: LiveStatistics,
658    jitter: LiveStatistics,
659    last_value: CuDuration,
660}
661
662impl CuDurationStatistics {
663    pub fn new(max: CuDuration) -> Self {
664        let CuDuration(max) = max;
665        CuDurationStatistics {
666            bare: LiveStatistics::new_with_max(max),
667            jitter: LiveStatistics::new_with_max(max),
668            last_value: CuDuration::default(),
669        }
670    }
671
672    #[inline]
673    pub fn min(&self) -> CuDuration {
674        CuDuration(self.bare.min())
675    }
676
677    #[inline]
678    pub fn max(&self) -> CuDuration {
679        CuDuration(self.bare.max())
680    }
681
682    #[inline]
683    pub fn mean(&self) -> CuDuration {
684        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
685    }
686
687    #[inline]
688    pub fn percentile(&self, percentile: f64) -> CuDuration {
689        CuDuration(self.bare.percentile(percentile))
690    }
691
692    #[inline]
693    pub fn stddev(&self) -> CuDuration {
694        CuDuration(self.bare.stdev() as u64)
695    }
696
697    #[inline]
698    pub fn len(&self) -> u64 {
699        self.bare.len()
700    }
701
702    #[inline]
703    pub fn is_empty(&self) -> bool {
704        self.bare.len() == 0
705    }
706
707    #[inline]
708    pub fn jitter_min(&self) -> CuDuration {
709        CuDuration(self.jitter.min())
710    }
711
712    #[inline]
713    pub fn jitter_max(&self) -> CuDuration {
714        CuDuration(self.jitter.max())
715    }
716
717    #[inline]
718    pub fn jitter_mean(&self) -> CuDuration {
719        CuDuration(self.jitter.mean() as u64)
720    }
721
722    #[inline]
723    pub fn jitter_stddev(&self) -> CuDuration {
724        CuDuration(self.jitter.stdev() as u64)
725    }
726
727    #[inline]
728    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
729        CuDuration(self.jitter.percentile(percentile))
730    }
731
732    #[inline]
733    pub fn record(&mut self, value: CuDuration) {
734        let CuDuration(nanos) = value;
735        if self.bare.is_empty() {
736            self.bare.record(nanos);
737            self.last_value = value;
738            return;
739        }
740        self.bare.record(nanos);
741        let CuDuration(last_nanos) = self.last_value;
742        self.jitter.record(nanos.abs_diff(last_nanos));
743        self.last_value = value;
744    }
745
746    #[inline]
747    pub fn reset(&mut self) {
748        self.bare.reset();
749        self.jitter.reset();
750    }
751}
752
753#[cfg(test)]
754mod tests {
755    use super::*;
756
757    #[test]
758    fn test_live_statistics_percentiles() {
759        let mut stats = LiveStatistics::new_with_max(1000);
760
761        // Record 100 values from 0 to 99
762        for i in 0..100 {
763            stats.record(i);
764        }
765
766        assert_eq!(stats.len(), 100);
767        assert_eq!(stats.min(), 0);
768        assert_eq!(stats.max(), 99);
769        assert_eq!(stats.mean() as u64, 49); // Average of 0..99
770
771        // Test percentiles - should be approximately correct
772        let p50 = stats.percentile(0.5);
773        let p90 = stats.percentile(0.90);
774        let p95 = stats.percentile(0.95);
775        let p99 = stats.percentile(0.99);
776
777        // With 100 samples from 0-99, percentiles should be close to their index
778        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
779        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
780        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
781        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
782    }
783
784    #[test]
785    fn test_duration_stats() {
786        let mut stats = CuDurationStatistics::new(CuDuration(1000));
787        stats.record(CuDuration(100));
788        stats.record(CuDuration(200));
789        stats.record(CuDuration(500));
790        stats.record(CuDuration(400));
791        assert_eq!(stats.min(), CuDuration(100));
792        assert_eq!(stats.max(), CuDuration(500));
793        assert_eq!(stats.mean(), CuDuration(300));
794        assert_eq!(stats.len(), 4);
795        assert_eq!(stats.jitter.len(), 3);
796        assert_eq!(stats.jitter_min(), CuDuration(100));
797        assert_eq!(stats.jitter_max(), CuDuration(300));
798        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
799        stats.reset();
800        assert_eq!(stats.len(), 0);
801    }
802}