1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//! Lock-free metrics for Cell notifications.
//!
//! Provides observability into cell performance including notification counts,
//! timing, and subscriber tracking.
use std::sync::atomic::{AtomicU64, Ordering};
/// Lock-free metrics for Cell notifications.
///
/// Every field is an [`AtomicU64`], so all updates go through `&self` and never take a lock.
/// All operations use `Ordering::Relaxed`: each counter is individually consistent, but no
/// ordering is established *between* fields. A reader racing with a writer may therefore see
/// one field updated and another not yet — acceptable for monitoring, not for synchronization.
#[derive(Debug, Default)]
pub struct CellMetrics {
    /// Total number of notify() calls
    notify_count: AtomicU64,
    /// Total time spent in notify() calls (nanoseconds)
    total_notify_time_ns: AtomicU64,
    /// Slowest single subscriber callback (nanoseconds)
    slowest_subscriber_ns: AtomicU64,
    /// Current number of subscribers
    subscriber_count: AtomicU64,
    /// Duration of the last notify() call (nanoseconds)
    last_notify_time_ns: AtomicU64,
}

impl CellMetrics {
    /// Create a new metrics instance with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a notify call with its duration.
    ///
    /// Increments the call count, adds `duration_ns` to the running total, and overwrites
    /// the "last notify" value. The three updates are independent atomic operations, so a
    /// concurrent reader can observe them partially applied. The running total wraps on
    /// `u64` overflow.
    #[inline]
    pub fn record_notify(&self, duration_ns: u64) {
        self.notify_count.fetch_add(1, Ordering::Relaxed);
        self.total_notify_time_ns
            .fetch_add(duration_ns, Ordering::Relaxed);
        self.last_notify_time_ns
            .store(duration_ns, Ordering::Relaxed);
    }

    /// Raise the recorded slowest-subscriber time to `duration_ns` if it is larger.
    ///
    /// The stored value only ever grows (until [`reset_timing`](Self::reset_timing)).
    #[inline]
    pub fn update_slowest_subscriber(&self, duration_ns: u64) {
        // Plain load first: when the sample is not a new maximum no read-modify-write is
        // issued at all. `fetch_max` then resolves any race between competing writers.
        if duration_ns > self.slowest_subscriber_ns.load(Ordering::Relaxed) {
            self.slowest_subscriber_ns
                .fetch_max(duration_ns, Ordering::Relaxed);
        }
    }

    /// Record that a subscriber was added.
    #[inline]
    pub fn record_subscriber_added(&self) {
        self.subscriber_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record that a subscriber was removed.
    ///
    /// Saturates at zero: a removal with no matching add leaves the count at 0 instead of
    /// wrapping around to `u64::MAX`.
    #[inline]
    pub fn record_subscriber_removed(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update` leave the value
        // untouched and return `Err`; that case is deliberately ignored.
        let _ = self.subscriber_count.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |count| count.checked_sub(1),
        );
    }

    /// Get the total number of notify() calls.
    pub fn notify_count(&self) -> u64 {
        self.notify_count.load(Ordering::Relaxed)
    }

    /// Get the total time spent in notify() calls (nanoseconds).
    pub fn total_notify_time_ns(&self) -> u64 {
        self.total_notify_time_ns.load(Ordering::Relaxed)
    }

    /// Get the average time per notify() call (nanoseconds).
    ///
    /// Returns 0 when no calls have been recorded. The count and the total are read with
    /// two separate loads, so under concurrent updates the result is an approximation.
    /// Integer division truncates toward zero.
    pub fn avg_notify_time_ns(&self) -> u64 {
        let calls = self.notify_count();
        self.total_notify_time_ns()
            .checked_div(calls)
            .unwrap_or(0)
    }

    /// Get the slowest subscriber callback time (nanoseconds).
    pub fn slowest_subscriber_ns(&self) -> u64 {
        self.slowest_subscriber_ns.load(Ordering::Relaxed)
    }

    /// Get the current number of subscribers.
    pub fn subscriber_count(&self) -> u64 {
        self.subscriber_count.load(Ordering::Relaxed)
    }

    /// Get the duration of the last notify() call (nanoseconds).
    pub fn last_notify_time_ns(&self) -> u64 {
        self.last_notify_time_ns.load(Ordering::Relaxed)
    }

    /// Reset timing metrics (notify_count, total_notify_time_ns, slowest_subscriber_ns, last_notify_time_ns).
    ///
    /// Note: subscriber_count is NOT reset as it tracks current state.
    ///
    /// The four fields are zeroed by separate stores, so the reset is not atomic as a
    /// whole: a recording that runs concurrently may survive in some fields and not others.
    pub fn reset_timing(&self) {
        self.notify_count.store(0, Ordering::Relaxed);
        self.total_notify_time_ns.store(0, Ordering::Relaxed);
        self.slowest_subscriber_ns.store(0, Ordering::Relaxed);
        self.last_notify_time_ns.store(0, Ordering::Relaxed);
    }
}