Skip to main content

oxios_kernel/
metrics.rs

1//! Metrics — Prometheus-compatible counters, gauges, and histograms.
2//!
//! This module provides in-process metrics without pulling in a metrics crate;
//! its only third-party dependency is `parking_lot`, used for locking.
4//! Exposed via GET /api/metrics in Prometheus text format.
5
6#![allow(missing_docs)]
7
8use parking_lot::{Mutex, RwLock};
9use std::sync::atomic::{AtomicU64, Ordering};
10
11/// Thread-safe metrics registry.
12#[derive(Default)]
13pub struct MetricsRegistry {
14    counters: RwLock<Vec<Counter>>,
15    gauges: RwLock<Vec<Gauge>>,
16    histograms: RwLock<Vec<Histogram>>,
17}
18
19impl MetricsRegistry {
20    /// Create a new metrics registry.
21    pub fn new() -> Self {
22        Self::default()
23    }
24
25    /// Register a new counter and return a handle.
26    pub fn counter(
27        &self,
28        name: &'static str,
29        help: &'static str,
30        labels: &[(&'static str, &'static str)],
31    ) -> CounterHandle {
32        let mut counters = self.counters.write();
33        let id = counters.len();
34        counters.push(Counter {
35            name: name.into(),
36            help: help.into(),
37            labels: labels.into(),
38            value: AtomicU64::new(0),
39        });
40        CounterHandle { id }
41    }
42
43    /// Register a new gauge and return a handle.
44    pub fn gauge(&self, name: &'static str, help: &'static str, initial: f64) -> GaugeHandle {
45        let mut gauges = self.gauges.write();
46        let id = gauges.len();
47        gauges.push(Gauge {
48            name: name.into(),
49            help: help.into(),
50            value: Mutex::new(initial),
51        });
52        GaugeHandle { id }
53    }
54
55    /// Register a new histogram and return a handle.
56    pub fn histogram(
57        &self,
58        name: &'static str,
59        help: &'static str,
60        buckets: Vec<f64>,
61    ) -> HistogramHandle {
62        let mut histograms = self.histograms.write();
63        let id = histograms.len();
64        let counts: Vec<usize> = vec![0; buckets.len() + 1];
65        histograms.push(Histogram {
66            name: name.into(),
67            help: help.into(),
68            buckets: buckets.clone(),
69            counts: RwLock::new(counts),
70            sum: Mutex::new(0.0),
71            count: Mutex::new(0u64),
72        });
73        HistogramHandle { id, buckets }
74    }
75
76    /// Export all metrics in Prometheus text format.
77    pub fn export(&self) -> String {
78        let mut out = String::new();
79
80        // Counters
81        {
82            let counters = self.counters.read();
83            for c in counters.iter() {
84                out.push_str(&format!("# HELP {} {}\n", c.name, c.help));
85                out.push_str(&format!("# TYPE {} counter\n", c.name));
86                let value = c.value.load(Ordering::Relaxed);
87                let labels_str = if c.labels.is_empty() {
88                    String::new()
89                } else {
90                    format!(
91                        "{{{}}}",
92                        c.labels
93                            .iter()
94                            .map(|(k, v)| format!("{k}=\"{v}\""))
95                            .collect::<Vec<_>>()
96                            .join(",")
97                    )
98                };
99                out.push_str(&format!("{}{} {}\n", c.name, labels_str, value));
100            }
101        }
102
103        // Gauges
104        {
105            let gauges = self.gauges.read();
106            for g in gauges.iter() {
107                out.push_str(&format!("# HELP {} {}\n", g.name, g.help));
108                out.push_str(&format!("# TYPE {} gauge\n", g.name));
109                let value = *g.value.lock();
110                out.push_str(&format!("{} {}\n", g.name, value));
111            }
112        }
113
114        // Histograms
115        {
116            let histograms = self.histograms.read();
117            for h in histograms.iter() {
118                out.push_str(&format!("# HELP {} {}\n", h.name, h.help));
119                out.push_str(&format!("# TYPE {} histogram\n", h.name));
120                let sum = *h.sum.lock();
121                let count = *h.count.lock();
122                let bucket_values = h.buckets.clone();
123                let counts = h.counts.read();
124                let mut cumulative = 0usize;
125                for (i, _) in bucket_values.iter().enumerate() {
126                    cumulative += counts[i];
127                    let boundary = bucket_values[i];
128                    out.push_str(&format!(
129                        "{}{{le=\"{}\"}} {}\n",
130                        h.name, boundary, cumulative
131                    ));
132                }
133                // +Inf bucket
134                out.push_str(&format!("{}{{le=\"+Inf\"}} {}\n", h.name, cumulative));
135                out.push_str(&format!("{}_sum {} \n", h.name, sum));
136                out.push_str(&format!("{}_count {} \n", h.name, count));
137            }
138        }
139
140        out
141    }
142}
143
144/// Global metrics registry.
145static REGISTRY: std::sync::OnceLock<MetricsRegistry> = std::sync::OnceLock::new();
146
147/// Get the global metrics registry.
148pub fn registry() -> &'static MetricsRegistry {
149    REGISTRY.get_or_init(MetricsRegistry::new)
150}
151
152#[derive(Clone)]
153pub struct CounterHandle {
154    id: usize,
155}
156
157impl CounterHandle {
158    /// Increment the counter by 1.
159    pub fn inc(&self) {
160        let r = registry();
161        let counters = r.counters.read();
162        if let Some(c) = counters.get(self.id) {
163            c.value.fetch_add(1, Ordering::Relaxed);
164        }
165    }
166
167    /// Increment the counter by `n`.
168    pub fn inc_by(&self, n: u64) {
169        if n == 0 {
170            return;
171        }
172        let r = registry();
173        let counters = r.counters.read();
174        if let Some(c) = counters.get(self.id) {
175            c.value.fetch_add(n, Ordering::Relaxed);
176        }
177    }
178}
179
180#[derive(Clone)]
181pub struct GaugeHandle {
182    id: usize,
183}
184
185impl GaugeHandle {
186    /// Set the gauge to a specific value.
187    pub fn set(&self, v: f64) {
188        let r = registry();
189        let gauges = r.gauges.read();
190        if let Some(g) = gauges.get(self.id) {
191            *g.value.lock() = v;
192        }
193    }
194
195    /// Increment the gauge by 1.
196    pub fn inc(&self) {
197        let r = registry();
198        let gauges = r.gauges.read();
199        if let Some(g) = gauges.get(self.id) {
200            let mut val = g.value.lock();
201            *val += 1.0;
202        }
203    }
204
205    /// Decrement the gauge by 1.
206    pub fn dec(&self) {
207        let r = registry();
208        let gauges = r.gauges.read();
209        if let Some(g) = gauges.get(self.id) {
210            let mut val = g.value.lock();
211            *val -= 1.0;
212        }
213    }
214}
215
216#[derive(Clone)]
217pub struct HistogramHandle {
218    id: usize,
219    buckets: Vec<f64>,
220}
221
222impl HistogramHandle {
223    /// Observe a value, adding it to the histogram.
224    pub fn observe(&self, value: f64) {
225        let r = registry();
226        let histograms = r.histograms.read();
227        if let Some(h) = histograms.get(self.id) {
228            {
229                let mut sum = h.sum.lock();
230                *sum += value;
231            }
232            {
233                let mut count = h.count.lock();
234                *count += 1;
235            }
236            {
237                let mut counts = h.counts.write();
238                for (i, boundary) in self.buckets.iter().enumerate() {
239                    if value <= *boundary {
240                        counts[i] += 1;
241                    }
242                }
243                // +Inf bucket
244                counts[self.buckets.len()] += 1;
245            }
246        }
247    }
248}
249
/// Registry storage for one counter series.
struct Counter {
    /// Current count. `CounterHandle` only ever adds to it, atomically.
    value: AtomicU64,
    /// Metric name as it appears in the exposition output.
    name: String,
    /// Text emitted on the `# HELP` line.
    help: String,
    /// Constant label pairs attached to this series.
    labels: Vec<(&'static str, &'static str)>,
}
256
257struct Gauge {
258    name: String,
259    help: String,
260    value: Mutex<f64>,
261}
262
263struct Histogram {
264    name: String,
265    help: String,
266    buckets: Vec<f64>,
267    counts: RwLock<Vec<usize>>,
268    sum: Mutex<f64>,
269    count: Mutex<u64>,
270}
271
272/// Metrics handles initialized at startup.
273#[derive(Clone)]
274pub struct MetricsHandles {
275    pub agents_forked: CounterHandle,
276    pub agents_completed: CounterHandle,
277    pub agents_failed: CounterHandle,
278    /// RFC-027 retry metrics.
279    pub retry_attempted: CounterHandle,
280    pub retry_improved: CounterHandle,
281    pub retry_unchanged: CounterHandle,
282    pub retry_degraded: CounterHandle,
283    pub orch_duration: HistogramHandle,
284    pub messages: CounterHandle,
285    /// LLM circuit breaker state: 0=closed, 1=open, 2=half_open.
286    pub llm_circuit_breaker_state: GaugeHandle,
287    /// Tool execution metrics.
288    pub tool_calls: CounterHandle,
289    pub tool_errors: CounterHandle,
290    pub tool_duration: HistogramHandle,
291    /// LLM call metrics.
292    pub llm_calls: CounterHandle,
293    pub llm_errors: CounterHandle,
294    /// Audit trail events dropped because the bus subscriber lagged.
295    /// Non-zero indicates the audit pipeline can't keep up with the
296    /// event rate; investigate bus capacity or high-frequency publishers.
297    pub audit_lagged_events: CounterHandle,
298}
299
300impl MetricsHandles {
301    /// Increment agents_forked counter.
302    pub fn inc_agents_forked(&self) {
303        self.agents_forked.inc();
304    }
305
306    /// Increment agents_completed counter.
307    pub fn inc_agents_completed(&self) {
308        self.agents_completed.inc();
309    }
310
311    /// Increment agents_failed counter.
312    pub fn inc_agents_failed(&self) {
313        self.agents_failed.inc();
314    }
315
316    /// Increment messages counter.
317    pub fn inc_messages(&self) {
318        self.messages.inc();
319    }
320
321    /// Observe orchestration duration.
322    pub fn observe_orch_duration(&self, value: f64) {
323        self.orch_duration.observe(value);
324    }
325}
326
327/// Global lazy metric handles.
328static METRICS: std::sync::OnceLock<MetricsHandles> = std::sync::OnceLock::new();
329
330/// Get or create the metrics handles.
331pub fn get_metrics() -> &'static MetricsHandles {
332    METRICS.get_or_init(|| {
333        let r = registry();
334        MetricsHandles {
335            agents_forked: r.counter("oxios_agents_forked_total", "Total agents forked", &[]),
336            agents_completed: r.counter(
337                "oxios_agents_completed_total",
338                "Total agents completed",
339                &[],
340            ),
341            agents_failed: r.counter("oxios_agents_failed_total", "Total agents failed", &[]),
342            retry_attempted: r.counter(
343                "oxios_retry_attempted_total",
344                "Review retries attempted",
345                &[],
346            ),
347            retry_improved: r.counter(
348                "oxios_retry_improved_total",
349                "Retries that improved score",
350                &[],
351            ),
352            retry_unchanged: r.counter(
353                "oxios_retry_unchanged_total",
354                "Retries with same score",
355                &[],
356            ),
357            retry_degraded: r.counter(
358                "oxios_retry_degraded_total",
359                "Retries that degraded score",
360                &[],
361            ),
362            orch_duration: r.histogram(
363                "oxios_orchestration_duration_seconds",
364                "Orchestration duration",
365                vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
366            ),
367            messages: r.counter("oxios_messages_processed_total", "Messages processed", &[]),
368            llm_circuit_breaker_state: r.gauge(
369                "oxios_llm_circuit_breaker_state",
370                "LLM circuit breaker state: 0=closed, 1=open, 2=half_open",
371                0.0,
372            ),
373            tool_calls: r.counter("oxios_tool_calls_total", "Tool calls", &[]),
374            tool_errors: r.counter("oxios_tool_errors_total", "Tool errors", &[]),
375            tool_duration: r.histogram(
376                "oxios_tool_duration_seconds",
377                "Tool call duration",
378                vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
379            ),
380            llm_calls: r.counter("oxios_llm_calls_total", "LLM API calls", &[]),
381            llm_errors: r.counter("oxios_llm_errors_total", "LLM API errors", &[]),
382            audit_lagged_events: r.counter(
383                "oxios_audit_lagged_events_total",
384                "Audit events dropped due to broadcast subscriber lag",
385                &[],
386            ),
387        }
388    })
389}
390
391/// Register all built-in metrics. Call once at startup.
392pub fn register_builtin_metrics() {
393    let r = registry();
394
395    // Agent metrics
396    r.counter("oxios_agents_forked_total", "Total agents forked", &[]);
397    r.gauge("oxios_agents_running", "Currently running agents", 0.0);
398    r.counter(
399        "oxios_agents_completed_total",
400        "Total agents completed",
401        &[],
402    );
403    r.counter("oxios_agents_failed_total", "Total agents failed", &[]);
404
405    // Message metrics
406    r.counter(
407        "oxios_messages_processed_total",
408        "User messages processed",
409        &[],
410    );
411    r.histogram(
412        "oxios_orchestration_duration_seconds",
413        "Full orchestration duration",
414        vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
415    );
416
417    // RFC-024 §11: web↔daemon reliability metrics. Registered here so they
418    // appear on /metrics from startup; the corresponding increment sites
419    // live in oxios-gateway (deliver/timeout/replay) and the web routes
420    // (SSE/WS reconnect, asset swap, readiness state).
421    r.counter(
422        "oxios_gateway_messages_total",
423        "Outgoing messages (label: result=delivered|dropped|resynced|timed_out)",
424        &[],
425    );
426    r.counter(
427        "oxios_gateway_replay_requests_total",
428        "Replay requests (label: outcome=replay|resync)",
429        &[],
430    );
431    r.counter(
432        "oxios_sse_reconnects_total",
433        "SSE reconnects (label: reason=ok|lag|error|unauthorized)",
434        &[],
435    );
436    r.counter(
437        "oxios_ws_reconnects_total",
438        "WS reconnects (label: reason=ok|lag|error|unauthorized)",
439        &[],
440    );
441    r.counter(
442        "oxios_web_dist_swaps_total",
443        "Atomic web-dist swaps (RFC-024 SP3)",
444        &[],
445    );
446    r.gauge(
447        "oxios_readiness_state",
448        "0 = warming up, 1 = ready (RFC-024 SP4)",
449        0.0,
450    );
451    r.histogram(
452        "oxios_phase_duration_seconds",
453        "Phase duration",
454        vec![0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0],
455    );
456
457    // LLM metrics
458    r.counter("oxios_llm_calls_total", "LLM API calls", &[]);
459    r.histogram(
460        "oxios_llm_duration_seconds",
461        "LLM call duration",
462        vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
463    );
464    r.counter("oxios_llm_errors_total", "LLM API errors", &[]);
465    // Audit pipeline metric (state-area F4): events silently dropped when
466    // the audit trail subscriber falls behind the broadcast bus.
467    r.counter(
468        "oxios_audit_lagged_events_total",
469        "Audit events dropped due to broadcast subscriber lag",
470        &[],
471    );
472
473    // Tool metrics
474    r.counter("oxios_tool_calls_total", "Tool calls", &[]);
475    r.histogram(
476        "oxios_tool_duration_seconds",
477        "Tool call duration",
478        vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
479    );
480    r.counter("oxios_tool_errors_total", "Tool errors", &[]);
481
482    // Memory metrics
483    r.gauge("oxios_memory_entries_total", "Total memory entries", 0.0);
484    r.counter("oxios_memory_recall_total", "Memory recall operations", &[]);
485
486    // Container metrics
487    r.counter("oxios_exec_total", "Exec calls", &[]);
488    r.histogram(
489        "oxios_exec_duration_seconds",
490        "Exec duration",
491        vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0],
492    );
493
494    // Session metrics
495    r.gauge("oxios_active_sessions", "Active sessions", 0.0);
496
497    // Initialize get_metrics() handles
498    let _ = get_metrics();
499}