1use std::sync::atomic::{AtomicU64, Ordering};
2
3use serde::{Deserialize, Serialize};
4
/// Inclusive upper bounds, in microseconds, for the sink-latency histogram
/// buckets. The final `u64::MAX` entry is a catch-all so every sample maps
/// to some bucket.
const SINK_LATENCY_BUCKET_UPPER_US: [u64; 8] =
    [100, 250, 500, 1_000, 2_500, 5_000, 10_000, u64::MAX];
const SINK_LATENCY_BUCKET_COUNT: usize = SINK_LATENCY_BUCKET_UPPER_US.len();
8
/// A point-in-time, serializable copy of the runtime metric counters,
/// produced by `RuntimeMetrics::snapshot`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeMetricsSnapshot {
    /// Milliseconds elapsed between the registry's start time and the
    /// `now_unix_millis` passed to `snapshot` (clamped to 0 if now <= start).
    pub uptime_millis: u64,
    /// Total inbound messages recorded so far.
    pub ingress_total: u64,
    /// `ingress_total` divided by uptime in seconds; 0.0 when uptime is 0.
    pub ingress_rate_per_sec: f64,
    /// Gauge: RPCs currently in flight.
    pub pending_rpc_count: u64,
    /// Gauge: server requests currently in flight.
    pub pending_server_request_count: u64,
    /// Counter: detached tasks that failed to initialize.
    pub detached_task_init_failed_count: u64,
    /// Gauge: current depth of the event-sink queue.
    pub event_sink_queue_depth: u64,
    /// Counter: events dropped from the event-sink queue.
    pub event_sink_queue_dropped: u64,
    /// Counter: broadcast sends that failed.
    pub broadcast_send_failed: u64,
    /// Counter: sink writes attempted (successes and errors).
    pub sink_write_count: u64,
    /// Counter: sink writes that reported an error.
    pub sink_write_error_count: u64,
    /// Mean sink-write latency in microseconds; 0.0 when no writes recorded.
    pub sink_latency_avg_micros: f64,
    /// Upper bound (microseconds) of the histogram bucket containing the
    /// 95th-percentile sink-write latency.
    pub sink_latency_p95_micros: u64,
    /// Largest single sink-write latency observed, in microseconds.
    pub sink_latency_max_micros: u64,
}
27
/// Process-wide runtime counters. All fields are updated with relaxed
/// atomics so hot paths record without locking; values are read out as a
/// `RuntimeMetricsSnapshot` via `snapshot`.
pub(crate) struct RuntimeMetrics {
    // Wall-clock anchor (unix milliseconds) for uptime / rate derivation.
    start_unix_millis: i64,
    // Monotonic count of inbound messages.
    ingress_total: AtomicU64,
    // Gauges for in-flight work; decremented saturating so they never wrap.
    pending_rpc_count: AtomicU64,
    pending_server_request_count: AtomicU64,
    // Counter: detached tasks that failed during initialization.
    detached_task_init_failed_count: AtomicU64,
    // Gauge: current event-sink queue depth; counter: dropped events.
    event_sink_queue_depth: AtomicU64,
    event_sink_queue_dropped: AtomicU64,
    // Counter: failed broadcast sends.
    broadcast_send_failed: AtomicU64,
    // Sink write totals plus latency aggregates (sum, max).
    sink_write_count: AtomicU64,
    sink_write_error_count: AtomicU64,
    sink_latency_total_micros: AtomicU64,
    sink_latency_max_micros: AtomicU64,
    // Per-bucket sample counts keyed by SINK_LATENCY_BUCKET_UPPER_US.
    sink_latency_buckets: [AtomicU64; SINK_LATENCY_BUCKET_COUNT],
}
45
46impl RuntimeMetrics {
47 pub(crate) fn new(start_unix_millis: i64) -> Self {
50 Self {
51 start_unix_millis,
52 ingress_total: AtomicU64::new(0),
53 pending_rpc_count: AtomicU64::new(0),
54 pending_server_request_count: AtomicU64::new(0),
55 detached_task_init_failed_count: AtomicU64::new(0),
56 event_sink_queue_depth: AtomicU64::new(0),
57 event_sink_queue_dropped: AtomicU64::new(0),
58 broadcast_send_failed: AtomicU64::new(0),
59 sink_write_count: AtomicU64::new(0),
60 sink_write_error_count: AtomicU64::new(0),
61 sink_latency_total_micros: AtomicU64::new(0),
62 sink_latency_max_micros: AtomicU64::new(0),
63 sink_latency_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
64 }
65 }
66
67 pub(crate) fn record_ingress(&self) {
70 self.ingress_total.fetch_add(1, Ordering::Relaxed);
71 }
72
73 pub(crate) fn inc_pending_rpc(&self) {
76 self.pending_rpc_count.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub(crate) fn dec_pending_rpc(&self) {
82 saturating_dec(&self.pending_rpc_count);
83 }
84
85 pub(crate) fn set_pending_rpc_count(&self, count: u64) {
88 self.pending_rpc_count.store(count, Ordering::Relaxed);
89 }
90
91 pub(crate) fn inc_pending_server_request(&self) {
94 self.pending_server_request_count
95 .fetch_add(1, Ordering::Relaxed);
96 }
97
98 pub(crate) fn dec_pending_server_request(&self) {
101 saturating_dec(&self.pending_server_request_count);
102 }
103
104 pub(crate) fn set_pending_server_request_count(&self, count: u64) {
107 self.pending_server_request_count
108 .store(count, Ordering::Relaxed);
109 }
110
111 pub(crate) fn record_detached_task_init_failed(&self) {
114 self.detached_task_init_failed_count
115 .fetch_add(1, Ordering::Relaxed);
116 }
117
118 pub(crate) fn inc_event_sink_queue_depth(&self) {
121 self.event_sink_queue_depth.fetch_add(1, Ordering::Relaxed);
122 }
123
124 pub(crate) fn dec_event_sink_queue_depth(&self) {
127 saturating_dec(&self.event_sink_queue_depth);
128 }
129
130 pub(crate) fn record_event_sink_drop(&self) {
133 self.event_sink_queue_dropped
134 .fetch_add(1, Ordering::Relaxed);
135 }
136
137 pub(crate) fn record_broadcast_send_failed(&self) {
140 self.broadcast_send_failed.fetch_add(1, Ordering::Relaxed);
141 }
142
143 pub(crate) fn record_sink_write(&self, latency_micros: u64, is_error: bool) {
146 self.sink_write_count.fetch_add(1, Ordering::Relaxed);
147 if is_error {
148 self.sink_write_error_count.fetch_add(1, Ordering::Relaxed);
149 }
150 self.sink_latency_total_micros
151 .fetch_add(latency_micros, Ordering::Relaxed);
152 max_update(&self.sink_latency_max_micros, latency_micros);
153
154 let bucket_index = sink_latency_bucket_index(latency_micros);
155 self.sink_latency_buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
156 }
157
158 pub(crate) fn snapshot(&self, now_unix_millis: i64) -> RuntimeMetricsSnapshot {
161 let uptime_millis = if now_unix_millis <= self.start_unix_millis {
162 0
163 } else {
164 (now_unix_millis - self.start_unix_millis) as u64
165 };
166 let ingress_total = self.ingress_total.load(Ordering::Relaxed);
167 let ingress_rate_per_sec = if uptime_millis == 0 {
168 0.0
169 } else {
170 (ingress_total as f64) / ((uptime_millis as f64) / 1_000.0)
171 };
172
173 let sink_write_count = self.sink_write_count.load(Ordering::Relaxed);
174 let sink_latency_total_micros = self.sink_latency_total_micros.load(Ordering::Relaxed);
175 let sink_latency_avg_micros = if sink_write_count == 0 {
176 0.0
177 } else {
178 (sink_latency_total_micros as f64) / (sink_write_count as f64)
179 };
180
181 RuntimeMetricsSnapshot {
182 uptime_millis,
183 ingress_total,
184 ingress_rate_per_sec,
185 pending_rpc_count: self.pending_rpc_count.load(Ordering::Relaxed),
186 pending_server_request_count: self.pending_server_request_count.load(Ordering::Relaxed),
187 detached_task_init_failed_count: self
188 .detached_task_init_failed_count
189 .load(Ordering::Relaxed),
190 event_sink_queue_depth: self.event_sink_queue_depth.load(Ordering::Relaxed),
191 event_sink_queue_dropped: self.event_sink_queue_dropped.load(Ordering::Relaxed),
192 broadcast_send_failed: self.broadcast_send_failed.load(Ordering::Relaxed),
193 sink_write_count,
194 sink_write_error_count: self.sink_write_error_count.load(Ordering::Relaxed),
195 sink_latency_avg_micros,
196 sink_latency_p95_micros: self.sink_latency_p95_micros(),
197 sink_latency_max_micros: self.sink_latency_max_micros.load(Ordering::Relaxed),
198 }
199 }
200
201 fn sink_latency_p95_micros(&self) -> u64 {
202 let total = self.sink_write_count.load(Ordering::Relaxed);
203 if total == 0 {
204 return 0;
205 }
206 let threshold = total.saturating_mul(95).div_ceil(100);
207 let mut cumulative = 0u64;
208 for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
209 cumulative =
210 cumulative.saturating_add(self.sink_latency_buckets[i].load(Ordering::Relaxed));
211 if cumulative >= threshold {
212 return *upper;
213 }
214 }
215 u64::MAX
216 }
217}
218
219fn sink_latency_bucket_index(latency_micros: u64) -> usize {
220 for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
221 if latency_micros <= *upper {
222 return i;
223 }
224 }
225 SINK_LATENCY_BUCKET_UPPER_US.len().saturating_sub(1)
226}
227
/// Decrements `v` by one unless it is already zero; never underflows.
///
/// `fetch_update` runs the compare-exchange retry loop internally; the
/// closure returning `None` (via `checked_sub` at zero) aborts the update
/// without writing, so the `Err` result is deliberately ignored.
fn saturating_dec(v: &AtomicU64) {
    let _ = v.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        current.checked_sub(1)
    });
}
240
/// Raises `v` to `candidate` if `candidate` is larger; otherwise a no-op.
/// `AtomicU64::fetch_max` performs the read-compare-store atomically, so the
/// hand-rolled compare-exchange loop is unnecessary.
fn max_update(v: &AtomicU64, candidate: u64) {
    v.fetch_max(candidate, Ordering::Relaxed);
}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives `count` successful sink writes of the same latency.
    fn write_n(metrics: &RuntimeMetrics, count: usize, latency_micros: u64) {
        for _ in 0..count {
            metrics.record_sink_write(latency_micros, false);
        }
    }

    #[test]
    fn snapshot_computes_p95_from_histogram() {
        let metrics = RuntimeMetrics::new(0);
        // 95 fast writes plus 5 slow outliers: the 95th-percentile sample
        // falls in the <=100us bucket, while the max tracks the outliers.
        write_n(&metrics, 95, 80);
        write_n(&metrics, 5, 8_000);

        let snapshot = metrics.snapshot(2_000);
        assert_eq!(snapshot.sink_write_count, 100);
        assert_eq!(snapshot.sink_latency_p95_micros, 100);
        assert_eq!(snapshot.sink_latency_max_micros, 8_000);
    }

    #[test]
    fn pending_counters_do_not_underflow() {
        let metrics = RuntimeMetrics::new(0);
        // Decrementing fresh (zero) gauges must leave them at zero.
        metrics.dec_pending_rpc();
        metrics.dec_pending_server_request();

        let snapshot = metrics.snapshot(1_000);
        assert_eq!(snapshot.pending_rpc_count, 0);
        assert_eq!(snapshot.pending_server_request_count, 0);
    }

    #[test]
    fn snapshot_tracks_detached_task_init_failures() {
        let metrics = RuntimeMetrics::new(0);
        for _ in 0..2 {
            metrics.record_detached_task_init_failed();
        }

        let snapshot = metrics.snapshot(1_000);
        assert_eq!(snapshot.detached_task_init_failed_count, 2);
    }
}