Skip to main content

inference_runtime/
metrics.rs

//! `MetricsActor` — in-process aggregation of per-deployment counters.
//! Doc §7.7, §12.4.
//!
//! Emits observability data via `tracing` events; a future
//! `inference-telemetry` integration can plug Prometheus / OTel exporters
//! on top of the same actor.
7
8use std::collections::HashMap;
9
10use async_trait::async_trait;
11use rakka_core::actor::{Actor, Context};
12use tokio::sync::oneshot;
13
14use inference_core::tokens::TokenUsage;
15
16#[derive(Debug, Clone, Default)]
17pub struct DeploymentMetrics {
18    pub requests_succeeded: u64,
19    pub requests_failed: u64,
20    pub rate_limited: u64,
21    pub circuit_open: u64,
22    pub timed_out: u64,
23    pub content_filtered: u64,
24    pub usage: TokenUsage,
25    pub cost_usd: f64,
26}
27
28#[derive(Debug, Clone, Default)]
29pub struct MetricsSnapshot {
30    pub per_deployment: HashMap<String, DeploymentMetrics>,
31}
32
33pub enum MetricsMsg {
34    RecordSuccess {
35        deployment: String,
36        usage: TokenUsage,
37        cost_usd: f64,
38    },
39    RecordFailure {
40        deployment: String,
41        kind: FailureKind,
42    },
43    Snapshot {
44        reply: oneshot::Sender<MetricsSnapshot>,
45    },
46}
47
/// Category of a failed request, as reported in `MetricsMsg::RecordFailure`.
///
/// The type is a plain fieldless enum, so besides `Copy` it also derives
/// `PartialEq`, `Eq` and `Hash`: callers can compare kinds directly and use
/// them as keys in hash maps or sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureKind {
    /// The request was rejected by rate limiting.
    RateLimited,
    /// The request was rejected because a circuit breaker was open.
    CircuitOpen,
    /// The request timed out.
    Timeout,
    /// The request or its response was blocked by content filtering.
    ContentFiltered,
    /// Any failure that has no dedicated counter.
    Other,
}
56
57#[derive(Default)]
58pub struct MetricsActor {
59    state: MetricsSnapshot,
60}
61
62impl MetricsActor {
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    fn entry(&mut self, name: &str) -> &mut DeploymentMetrics {
68        self.state.per_deployment.entry(name.to_string()).or_default()
69    }
70}
71
72#[async_trait]
73impl Actor for MetricsActor {
74    type Msg = MetricsMsg;
75
76    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
77        match msg {
78            MetricsMsg::RecordSuccess {
79                deployment,
80                usage,
81                cost_usd,
82            } => {
83                let e = self.entry(&deployment);
84                e.requests_succeeded += 1;
85                e.usage.add(usage);
86                e.cost_usd += cost_usd;
87                tracing::trace!(deployment, ?usage, cost_usd, "metrics: success");
88            }
89            MetricsMsg::RecordFailure { deployment, kind } => {
90                let e = self.entry(&deployment);
91                e.requests_failed += 1;
92                match kind {
93                    FailureKind::RateLimited => e.rate_limited += 1,
94                    FailureKind::CircuitOpen => e.circuit_open += 1,
95                    FailureKind::Timeout => e.timed_out += 1,
96                    FailureKind::ContentFiltered => e.content_filtered += 1,
97                    FailureKind::Other => {}
98                }
99                tracing::debug!(deployment, ?kind, "metrics: failure");
100            }
101            MetricsMsg::Snapshot { reply } => {
102                let _ = reply.send(self.state.clone());
103            }
104        }
105    }
106}