//! codex_runtime/runtime/metrics.rs — lock-free runtime metrics counters
//! and immutable snapshots for observability and regression checks.

use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Deserialize, Serialize};
/// Inclusive upper bounds (microseconds) of the fixed sink-latency
/// histogram buckets; the final `u64::MAX` bound catches all outliers.
const SINK_LATENCY_BUCKET_UPPER_US: [u64; 8] =
    [100, 250, 500, 1_000, 2_500, 5_000, 10_000, u64::MAX];
/// Bucket count, derived from the bound table so the two cannot diverge.
const SINK_LATENCY_BUCKET_COUNT: usize = SINK_LATENCY_BUCKET_UPPER_US.len();
8
/// Immutable, serializable view of the runtime counters at one instant.
/// Serializes with camelCase field names for JSON consumers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeMetricsSnapshot {
    /// Milliseconds elapsed since the runtime's recorded start time.
    pub uptime_millis: u64,
    /// Total inbound messages recorded since start.
    pub ingress_total: u64,
    /// `ingress_total` averaged over uptime; 0.0 when uptime is zero.
    pub ingress_rate_per_sec: f64,
    /// RPCs currently in flight.
    pub pending_rpc_count: u64,
    /// Server-initiated requests currently awaiting completion.
    pub pending_server_request_count: u64,
    /// Detached-task helper runtime initialization failures.
    pub detached_task_init_failed_count: u64,
    /// Envelopes currently enqueued for the event sink.
    pub event_sink_queue_depth: u64,
    /// Envelopes dropped before sink processing.
    pub event_sink_queue_dropped: u64,
    /// Failed broadcast sends.
    pub broadcast_send_failed: u64,
    /// Sink write attempts, successes and errors combined.
    pub sink_write_count: u64,
    /// Sink write attempts flagged as errors.
    pub sink_write_error_count: u64,
    /// Mean sink write latency in microseconds; 0.0 with no writes.
    pub sink_latency_avg_micros: f64,
    /// Histogram-derived p95 latency, reported as a bucket upper bound.
    pub sink_latency_p95_micros: u64,
    /// Largest single sink write latency observed.
    pub sink_latency_max_micros: u64,
}
27
/// Runtime counters used for snapshots and long-run regression checks.
/// All counters are lock-free atomics; hot paths must remain O(1).
pub(crate) struct RuntimeMetrics {
    /// Unix timestamp (ms) captured at construction; basis for uptime.
    start_unix_millis: i64,
    /// Total inbound messages.
    ingress_total: AtomicU64,
    /// RPCs in flight; decremented with clamp-at-zero semantics.
    pending_rpc_count: AtomicU64,
    /// Server-initiated requests in flight; clamped at zero on decrement.
    pending_server_request_count: AtomicU64,
    /// Detached-task helper runtime init failures.
    detached_task_init_failed_count: AtomicU64,
    /// Current event-sink queue depth (enqueues minus dequeues).
    event_sink_queue_depth: AtomicU64,
    /// Envelopes dropped before sink processing.
    event_sink_queue_dropped: AtomicU64,
    /// Failed broadcast sends.
    broadcast_send_failed: AtomicU64,
    /// Sink write attempts (success + error).
    sink_write_count: AtomicU64,
    /// Sink write attempts flagged as errors.
    sink_write_error_count: AtomicU64,
    /// Sum of all write latencies; numerator for the snapshot average.
    sink_latency_total_micros: AtomicU64,
    /// Largest single latency sample seen so far.
    sink_latency_max_micros: AtomicU64,
    /// Fixed-bound histogram; bounds in `SINK_LATENCY_BUCKET_UPPER_US`.
    sink_latency_buckets: [AtomicU64; SINK_LATENCY_BUCKET_COUNT],
}
45
46impl RuntimeMetrics {
47    /// Create runtime metrics with fixed zeroed counters.
48    /// Allocation: none. Complexity: O(1).
49    pub(crate) fn new(start_unix_millis: i64) -> Self {
50        Self {
51            start_unix_millis,
52            ingress_total: AtomicU64::new(0),
53            pending_rpc_count: AtomicU64::new(0),
54            pending_server_request_count: AtomicU64::new(0),
55            detached_task_init_failed_count: AtomicU64::new(0),
56            event_sink_queue_depth: AtomicU64::new(0),
57            event_sink_queue_dropped: AtomicU64::new(0),
58            broadcast_send_failed: AtomicU64::new(0),
59            sink_write_count: AtomicU64::new(0),
60            sink_write_error_count: AtomicU64::new(0),
61            sink_latency_total_micros: AtomicU64::new(0),
62            sink_latency_max_micros: AtomicU64::new(0),
63            sink_latency_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
64        }
65    }
66
67    /// Record one inbound message.
68    /// Allocation: none. Complexity: O(1).
69    pub(crate) fn record_ingress(&self) {
70        self.ingress_total.fetch_add(1, Ordering::Relaxed);
71    }
72
73    /// Increment pending RPC count.
74    /// Allocation: none. Complexity: O(1).
75    pub(crate) fn inc_pending_rpc(&self) {
76        self.pending_rpc_count.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Decrement pending RPC count (saturating).
80    /// Allocation: none. Complexity: O(1).
81    pub(crate) fn dec_pending_rpc(&self) {
82        saturating_dec(&self.pending_rpc_count);
83    }
84
85    /// Force pending RPC count to known value.
86    /// Allocation: none. Complexity: O(1).
87    pub(crate) fn set_pending_rpc_count(&self, count: u64) {
88        self.pending_rpc_count.store(count, Ordering::Relaxed);
89    }
90
91    /// Increment pending server-request count.
92    /// Allocation: none. Complexity: O(1).
93    pub(crate) fn inc_pending_server_request(&self) {
94        self.pending_server_request_count
95            .fetch_add(1, Ordering::Relaxed);
96    }
97
98    /// Decrement pending server-request count (saturating).
99    /// Allocation: none. Complexity: O(1).
100    pub(crate) fn dec_pending_server_request(&self) {
101        saturating_dec(&self.pending_server_request_count);
102    }
103
104    /// Force pending server-request count to known value.
105    /// Allocation: none. Complexity: O(1).
106    pub(crate) fn set_pending_server_request_count(&self, count: u64) {
107        self.pending_server_request_count
108            .store(count, Ordering::Relaxed);
109    }
110
111    /// Record one detached-task helper runtime initialization failure.
112    /// Allocation: none. Complexity: O(1).
113    pub(crate) fn record_detached_task_init_failed(&self) {
114        self.detached_task_init_failed_count
115            .fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Record one successful sink queue enqueue.
119    /// Allocation: none. Complexity: O(1).
120    pub(crate) fn inc_event_sink_queue_depth(&self) {
121        self.event_sink_queue_depth.fetch_add(1, Ordering::Relaxed);
122    }
123
124    /// Record one sink queue dequeue.
125    /// Allocation: none. Complexity: O(1).
126    pub(crate) fn dec_event_sink_queue_depth(&self) {
127        saturating_dec(&self.event_sink_queue_depth);
128    }
129
130    /// Record one dropped envelope before sink processing.
131    /// Allocation: none. Complexity: O(1).
132    pub(crate) fn record_event_sink_drop(&self) {
133        self.event_sink_queue_dropped
134            .fetch_add(1, Ordering::Relaxed);
135    }
136
137    /// Record one failed broadcast send.
138    /// Allocation: none. Complexity: O(1).
139    pub(crate) fn record_broadcast_send_failed(&self) {
140        self.broadcast_send_failed.fetch_add(1, Ordering::Relaxed);
141    }
142
143    /// Record one sink write attempt with elapsed latency.
144    /// Allocation: none. Complexity: O(1).
145    pub(crate) fn record_sink_write(&self, latency_micros: u64, is_error: bool) {
146        self.sink_write_count.fetch_add(1, Ordering::Relaxed);
147        if is_error {
148            self.sink_write_error_count.fetch_add(1, Ordering::Relaxed);
149        }
150        self.sink_latency_total_micros
151            .fetch_add(latency_micros, Ordering::Relaxed);
152        max_update(&self.sink_latency_max_micros, latency_micros);
153
154        let bucket_index = sink_latency_bucket_index(latency_micros);
155        self.sink_latency_buckets[bucket_index].fetch_add(1, Ordering::Relaxed);
156    }
157
158    /// Build immutable metrics snapshot for observability/reporting.
159    /// Allocation: none. Complexity: O(bucket_count).
160    pub(crate) fn snapshot(&self, now_unix_millis: i64) -> RuntimeMetricsSnapshot {
161        let uptime_millis = if now_unix_millis <= self.start_unix_millis {
162            0
163        } else {
164            (now_unix_millis - self.start_unix_millis) as u64
165        };
166        let ingress_total = self.ingress_total.load(Ordering::Relaxed);
167        let ingress_rate_per_sec = if uptime_millis == 0 {
168            0.0
169        } else {
170            (ingress_total as f64) / ((uptime_millis as f64) / 1_000.0)
171        };
172
173        let sink_write_count = self.sink_write_count.load(Ordering::Relaxed);
174        let sink_latency_total_micros = self.sink_latency_total_micros.load(Ordering::Relaxed);
175        let sink_latency_avg_micros = if sink_write_count == 0 {
176            0.0
177        } else {
178            (sink_latency_total_micros as f64) / (sink_write_count as f64)
179        };
180
181        RuntimeMetricsSnapshot {
182            uptime_millis,
183            ingress_total,
184            ingress_rate_per_sec,
185            pending_rpc_count: self.pending_rpc_count.load(Ordering::Relaxed),
186            pending_server_request_count: self.pending_server_request_count.load(Ordering::Relaxed),
187            detached_task_init_failed_count: self
188                .detached_task_init_failed_count
189                .load(Ordering::Relaxed),
190            event_sink_queue_depth: self.event_sink_queue_depth.load(Ordering::Relaxed),
191            event_sink_queue_dropped: self.event_sink_queue_dropped.load(Ordering::Relaxed),
192            broadcast_send_failed: self.broadcast_send_failed.load(Ordering::Relaxed),
193            sink_write_count,
194            sink_write_error_count: self.sink_write_error_count.load(Ordering::Relaxed),
195            sink_latency_avg_micros,
196            sink_latency_p95_micros: self.sink_latency_p95_micros(),
197            sink_latency_max_micros: self.sink_latency_max_micros.load(Ordering::Relaxed),
198        }
199    }
200
201    fn sink_latency_p95_micros(&self) -> u64 {
202        let total = self.sink_write_count.load(Ordering::Relaxed);
203        if total == 0 {
204            return 0;
205        }
206        let threshold = total.saturating_mul(95).div_ceil(100);
207        let mut cumulative = 0u64;
208        for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
209            cumulative =
210                cumulative.saturating_add(self.sink_latency_buckets[i].load(Ordering::Relaxed));
211            if cumulative >= threshold {
212                return *upper;
213            }
214        }
215        u64::MAX
216    }
217}
218
219fn sink_latency_bucket_index(latency_micros: u64) -> usize {
220    for (i, upper) in SINK_LATENCY_BUCKET_UPPER_US.iter().enumerate() {
221        if latency_micros <= *upper {
222            return i;
223        }
224    }
225    SINK_LATENCY_BUCKET_UPPER_US.len().saturating_sub(1)
226}
227
/// Decrement an atomic counter, clamping at zero instead of wrapping.
/// Uses `AtomicU64::fetch_update`, the stdlib wrapper around the same
/// `compare_exchange_weak` retry loop the previous hand-rolled version had.
/// Allocation: none. Complexity: O(1) amortized.
fn saturating_dec(v: &AtomicU64) {
    // `checked_sub(1)` returns `None` at zero, making `fetch_update`
    // leave the value untouched and report `Err` — exactly the
    // saturating behavior we want, so the result is ignored.
    let _ = v.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        current.checked_sub(1)
    });
}
240
/// Raise `v` to `candidate` when the candidate is larger (atomic max).
/// `AtomicU64::fetch_max` is the purpose-built primitive for this and
/// replaces the hand-rolled `compare_exchange_weak` loop.
/// Allocation: none. Complexity: O(1).
fn max_update(v: &AtomicU64, candidate: u64) {
    v.fetch_max(candidate, Ordering::Relaxed);
}
250
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn snapshot_computes_p95_from_histogram() {
        let m = RuntimeMetrics::new(0);
        // 95 fast samples land in the first bucket; 5 slow outliers near the top.
        let samples = std::iter::repeat(80u64)
            .take(95)
            .chain(std::iter::repeat(8_000).take(5));
        for latency in samples {
            m.record_sink_write(latency, false);
        }

        let snap = m.snapshot(2_000);
        assert_eq!(snap.sink_write_count, 100);
        // p95 is reported as the matching bucket's upper bound, not the raw sample.
        assert_eq!(snap.sink_latency_p95_micros, 100);
        assert_eq!(snap.sink_latency_max_micros, 8_000);
    }

    #[test]
    fn pending_counters_do_not_underflow() {
        let m = RuntimeMetrics::new(0);
        // Decrementing below zero must clamp, not wrap to u64::MAX.
        m.dec_pending_rpc();
        m.dec_pending_server_request();

        let snap = m.snapshot(1_000);
        assert_eq!(snap.pending_rpc_count, 0);
        assert_eq!(snap.pending_server_request_count, 0);
    }

    #[test]
    fn snapshot_tracks_detached_task_init_failures() {
        let m = RuntimeMetrics::new(0);
        for _ in 0..2 {
            m.record_detached_task_init_failed();
        }

        assert_eq!(m.snapshot(1_000).detached_task_init_failed_count, 2);
    }
}