1use std::sync::atomic::{AtomicU64, Ordering};
2
3use serde::{Deserialize, Serialize};
4
// Inclusive upper bounds (microseconds) of the sink-latency histogram
// buckets; the final `u64::MAX` bucket absorbs all outliers.
const SINK_LATENCY_BUCKET_UPPER_US: [u64; 8] =
    [100, 250, 500, 1_000, 2_500, 5_000, 10_000, u64::MAX];
const SINK_LATENCY_BUCKET_COUNT: usize = SINK_LATENCY_BUCKET_UPPER_US.len();
8
/// Point-in-time view of [`RuntimeMetrics`], produced by
/// [`RuntimeMetrics::snapshot`] and serialized in camelCase for
/// external consumers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeMetricsSnapshot {
    /// Milliseconds elapsed since the metrics start timestamp (0 if the
    /// snapshot time is at or before the start).
    pub uptime_millis: u64,
    /// Total ingress events recorded so far.
    pub ingress_total: u64,
    /// `ingress_total` divided by uptime in seconds; 0.0 when uptime is 0.
    pub ingress_rate_per_sec: f64,
    /// Current in-flight RPC gauge.
    pub pending_rpc_count: u64,
    /// Current in-flight server-request gauge.
    pub pending_server_request_count: u64,
    /// Current depth of the event-sink queue.
    pub event_sink_queue_depth: u64,
    /// Total events dropped from the event-sink queue.
    pub event_sink_queue_dropped: u64,
    /// Total failed broadcast sends.
    pub broadcast_send_failed: u64,
    /// Total sink writes (successful and failed).
    pub sink_write_count: u64,
    /// Subset of `sink_write_count` that were errors.
    pub sink_write_error_count: u64,
    /// Mean sink write latency in microseconds; 0.0 when no writes yet.
    pub sink_latency_avg_micros: f64,
    /// Approximate 95th-percentile sink write latency, derived from the
    /// latency histogram (bucket upper bound).
    pub sink_latency_p95_micros: u64,
    /// Largest sink write latency observed so far.
    pub sink_latency_max_micros: u64,
}
26
/// Lock-free runtime counters, updated with relaxed atomics by the hot
/// paths and read out as a [`RuntimeMetricsSnapshot`].
pub(crate) struct RuntimeMetrics {
    /// Unix epoch milliseconds at which this instance started; used to
    /// derive uptime and the ingress rate.
    start_unix_millis: i64,
    /// Monotonic count of ingress events.
    ingress_total: AtomicU64,
    /// Gauge: RPCs currently in flight (saturating at zero on decrement).
    pending_rpc_count: AtomicU64,
    /// Gauge: server requests currently in flight (saturating at zero).
    pending_server_request_count: AtomicU64,
    /// Gauge: current event-sink queue depth (saturating at zero).
    event_sink_queue_depth: AtomicU64,
    /// Monotonic count of events dropped from the event-sink queue.
    event_sink_queue_dropped: AtomicU64,
    /// Monotonic count of failed broadcast sends.
    broadcast_send_failed: AtomicU64,
    /// Monotonic count of sink writes (success + error).
    sink_write_count: AtomicU64,
    /// Monotonic count of sink writes that failed.
    sink_write_error_count: AtomicU64,
    /// Running sum of sink write latencies, for the average.
    sink_latency_total_micros: AtomicU64,
    /// Largest sink write latency seen so far.
    sink_latency_max_micros: AtomicU64,
    /// Latency histogram; bucket i counts writes whose latency is at
    /// most `SINK_LATENCY_BUCKET_UPPER_US[i]`.
    sink_latency_buckets: [AtomicU64; SINK_LATENCY_BUCKET_COUNT],
}
43
44impl RuntimeMetrics {
45 pub(crate) fn new(start_unix_millis: i64) -> Self {
48 Self {
49 start_unix_millis,
50 ingress_total: AtomicU64::new(0),
51 pending_rpc_count: AtomicU64::new(0),
52 pending_server_request_count: AtomicU64::new(0),
53 event_sink_queue_depth: AtomicU64::new(0),
54 event_sink_queue_dropped: AtomicU64::new(0),
55 broadcast_send_failed: AtomicU64::new(0),
56 sink_write_count: AtomicU64::new(0),
57 sink_write_error_count: AtomicU64::new(0),
58 sink_latency_total_micros: AtomicU64::new(0),
59 sink_latency_max_micros: AtomicU64::new(0),
60 sink_latency_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
61 }
62 }
63
64 pub(crate) fn record_ingress(&self) {
67 self.ingress_total.fetch_add(1, Ordering::Relaxed);
68 }
69
70 pub(crate) fn inc_pending_rpc(&self) {
73 self.pending_rpc_count.fetch_add(1, Ordering::Relaxed);
74 }
75
76 pub(crate) fn dec_pending_rpc(&self) {
79 saturating_dec(&self.pending_rpc_count);
80 }
81
82 pub(crate) fn set_pending_rpc_count(&self, count: u64) {
85 self.pending_rpc_count.store(count, Ordering::Relaxed);
86 }
87
88 pub(crate) fn inc_pending_server_request(&self) {
91 self.pending_server_request_count
92 .fetch_add(1, Ordering::Relaxed);
93 }
94
95 pub(crate) fn dec_pending_server_request(&self) {
98 saturating_dec(&self.pending_server_request_count);
99 }
100
101 pub(crate) fn set_pending_server_request_count(&self, count: u64) {
104 self.pending_server_request_count
105 .store(count, Ordering::Relaxed);
106 }
107
108 pub(crate) fn inc_event_sink_queue_depth(&self) {
111 self.event_sink_queue_depth.fetch_add(1, Ordering::Relaxed);
112 }
113
114 pub(crate) fn dec_event_sink_queue_depth(&self) {
117 saturating_dec(&self.event_sink_queue_depth);
118 }
119
120 pub(crate) fn record_event_sink_drop(&self) {
123 self.event_sink_queue_dropped
124 .fetch_add(1, Ordering::Relaxed);
125 }
126
127 pub(crate) fn record_broadcast_send_failed(&self) {
130 self.broadcast_send_failed.fetch_add(1, Ordering::Relaxed);
131 }
132
133 pub(crate) fn record_sink_write(&self, latency_micros: u64, is_error: bool) {
136 self.sink_write_count.fetch_add(1, Ordering::Relaxed);
137 if is_error {
138 self.sink_write_error_count.fetch_add(1, Ordering::Relaxed);
139 }
140 self.sink_latency_total_micros
141 .fetch_add(latency_micros, Ordering::Relaxed);
142 max_update(&self.sink_latency_max_micros, latency_micros);
143
144 let bucket_index = sink_latency_bucket_index(latency_micros);
145 self.sink_latency_buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
146 }
147
148 pub(crate) fn snapshot(&self, now_unix_millis: i64) -> RuntimeMetricsSnapshot {
151 let uptime_millis = if now_unix_millis <= self.start_unix_millis {
152 0
153 } else {
154 (now_unix_millis - self.start_unix_millis) as u64
155 };
156 let ingress_total = self.ingress_total.load(Ordering::Relaxed);
157 let ingress_rate_per_sec = if uptime_millis == 0 {
158 0.0
159 } else {
160 (ingress_total as f64) / ((uptime_millis as f64) / 1_000.0)
161 };
162
163 let sink_write_count = self.sink_write_count.load(Ordering::Relaxed);
164 let sink_latency_total_micros = self.sink_latency_total_micros.load(Ordering::Relaxed);
165 let sink_latency_avg_micros = if sink_write_count == 0 {
166 0.0
167 } else {
168 (sink_latency_total_micros as f64) / (sink_write_count as f64)
169 };
170
171 RuntimeMetricsSnapshot {
172 uptime_millis,
173 ingress_total,
174 ingress_rate_per_sec,
175 pending_rpc_count: self.pending_rpc_count.load(Ordering::Relaxed),
176 pending_server_request_count: self.pending_server_request_count.load(Ordering::Relaxed),
177 event_sink_queue_depth: self.event_sink_queue_depth.load(Ordering::Relaxed),
178 event_sink_queue_dropped: self.event_sink_queue_dropped.load(Ordering::Relaxed),
179 broadcast_send_failed: self.broadcast_send_failed.load(Ordering::Relaxed),
180 sink_write_count,
181 sink_write_error_count: self.sink_write_error_count.load(Ordering::Relaxed),
182 sink_latency_avg_micros,
183 sink_latency_p95_micros: self.sink_latency_p95_micros(),
184 sink_latency_max_micros: self.sink_latency_max_micros.load(Ordering::Relaxed),
185 }
186 }
187
188 fn sink_latency_p95_micros(&self) -> u64 {
189 let total = self.sink_write_count.load(Ordering::Relaxed);
190 if total == 0 {
191 return 0;
192 }
193 let threshold = total.saturating_mul(95).div_ceil(100);
194 let mut cumulative = 0u64;
195 for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
196 cumulative =
197 cumulative.saturating_add(self.sink_latency_buckets[i].load(Ordering::Relaxed));
198 if cumulative >= threshold {
199 return *upper;
200 }
201 }
202 u64::MAX
203 }
204}
205
206fn sink_latency_bucket_index(latency_micros: u64) -> usize {
207 for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
208 if latency_micros <= *upper {
209 return i;
210 }
211 }
212 SINK_LATENCY_BUCKET_UPPER_US.len().saturating_sub(1)
213}
214
/// Decrements `v` by one unless it is already zero, so unsigned gauges
/// never wrap around on a spurious extra decrement.
fn saturating_dec(v: &AtomicU64) {
    // `checked_sub` returns `None` at zero, which makes `fetch_update`
    // leave the value untouched; the resulting `Err` is expected and
    // deliberately ignored. This replaces a hand-rolled CAS loop.
    let _ = v.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        current.checked_sub(1)
    });
}
227
/// Raises `v` to `candidate` if the candidate is larger. Uses the
/// dedicated atomic `fetch_max` instead of a hand-rolled CAS loop.
fn max_update(v: &AtomicU64, candidate: u64) {
    v.fetch_max(candidate, Ordering::Relaxed);
}
237
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn snapshot_computes_p95_from_histogram() {
        let metrics = RuntimeMetrics::new(0);
        // 95 fast writes land in the first bucket; 5 slow ones sit near
        // the top of the histogram, so the p95 is the first bucket bound.
        (0..95).for_each(|_| metrics.record_sink_write(80, false));
        (0..5).for_each(|_| metrics.record_sink_write(8_000, false));

        let snapshot = metrics.snapshot(2_000);
        assert_eq!(snapshot.sink_write_count, 100);
        assert_eq!(snapshot.sink_latency_p95_micros, 100);
        assert_eq!(snapshot.sink_latency_max_micros, 8_000);
    }

    #[test]
    fn pending_counters_do_not_underflow() {
        let metrics = RuntimeMetrics::new(0);
        // Decrementing fresh gauges must clamp at zero, not wrap.
        metrics.dec_pending_rpc();
        metrics.dec_pending_server_request();

        let snapshot = metrics.snapshot(1_000);
        assert_eq!(snapshot.pending_rpc_count, 0);
        assert_eq!(snapshot.pending_server_request_count, 0);
    }
}