Skip to main content

agent_sdk/observability/
metrics.rs

//! `GenAI` client + SDK-specific metric instruments.
//!
//! Compiled only with `feature = "otel"`. The module owns a small,
//! lazily-initialised [`Metrics`] singleton that the rest of the SDK
//! reaches via [`Metrics::global`]. Every histogram declared here is given
//! explicit bucket boundaries; we never accept the SDK default boundaries
//! because they are tuned for HTTP timings, not `GenAI` ranges. The
//! token-usage and operation-duration boundaries follow the `OpenTelemetry`
//! `GenAI` metrics semantic conventions and are reused by the other
//! histograms with comparable ranges; `agent_sdk.tools.execution.duration`
//! records milliseconds and uses its own SDK-defined boundaries.
//!
//! ## Lifecycle
//!
//! Instruments are bound to whichever meter provider is current at the
//! first call to [`Metrics::global`] / [`Metrics::init`].
//! `agent_sdk_otel::install_global_provider` calls [`Metrics::rebind`]
//! right after installing the global meter provider, so the next lookup
//! rebuilds the singleton against the real provider even if an earlier code
//! path lazily built it against the no-op meter (or a previous provider was
//! replaced). Tests that swap in a fresh
//! `opentelemetry_sdk::metrics::SdkMeterProvider` between cases must call
//! [`Metrics::reset_for_testing`] so the next lookup rebuilds the cache
//! against the new provider.
//!
//! ## Naming
//!
//! * `gen_ai.client.*` — `OTel` `GenAI` client metrics ([spec][spec]).
//! * `agent_sdk.*` — SDK-specific instruments. The namespace is shared
//!   with the span attributes in [`super::attrs`] so dashboards and
//!   alerts can correlate a metric and a span without translation.
//!
//! [spec]: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/
30
31use std::sync::{Arc, RwLock};
32
33use opentelemetry::global;
34use opentelemetry::metrics::{Counter, Histogram};
35
/// Histogram boundaries (unit `{token}`) for `gen_ai.client.token.usage`.
///
/// These are the `ExplicitBucketBoundaries` recommended by the `GenAI`
/// metrics semantic conventions: successive powers of four, from 4^0 up to
/// 4^13. `agent_sdk.context.compaction.tokens_saved` reuses them because it
/// counts tokens too.
const TOKEN_USAGE_BUCKETS: &[f64] = &[
    1.0,
    4.0,
    16.0,
    64.0,
    256.0,
    1_024.0,
    4_096.0,
    16_384.0,
    65_536.0,
    262_144.0,
    1_048_576.0,
    4_194_304.0,
    16_777_216.0,
    67_108_864.0,
];
55
/// Histogram boundaries, in seconds (`s`), shared by
/// `gen_ai.client.operation.duration` and the streaming
/// time-to-first-chunk / time-per-output-chunk histograms.
///
/// Each boundary doubles the previous one, from 10 ms up to 81.92 s.
/// `agent_sdk.turns.duration` and `agent_sdk.mcp.requests.duration` reuse
/// the same boundaries because their typical latencies sit in the same
/// range as LLM calls.
const SHORT_DURATION_BUCKETS_S: &[f64] = &[
    0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, 5.12, 10.24, 20.48, 40.96, 81.92,
];
65
/// Histogram boundaries, in milliseconds (`ms`), for
/// `agent_sdk.tools.execution.duration`.
///
/// Tool calls range from near-instant `Observe` reads to async work that
/// runs for minutes. The boundaries therefore cover 1 ms through 300 000 ms
/// (5 minutes), packed more densely at the fast end where most invocations
/// finish.
const TOOL_DURATION_BUCKETS_MS: &[f64] = &[
    1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1_000.0, 5_000.0, 10_000.0, 60_000.0, 300_000.0,
];
74
75/// Container for every metric instrument the SDK records.
76///
77/// The struct is held behind an `Arc` so call sites that need
78/// frequent recording (per LLM call, per tool call) can clone the
79/// handle into an async future without re-walking a `Mutex`.
80///
81/// The streaming TTFC / TPOC histograms live alongside the
82/// non-streaming `operation_duration` because the recorder
83/// (`agent_loop::llm::process_stream`) treats them as paired
84/// instruments — one fires once per stream, the other once per
85/// post-first chunk.
86#[derive(Debug)]
87pub struct Metrics {
88    pub(crate) token_usage: Histogram<u64>,
89    pub(crate) operation_duration: Histogram<f64>,
90    pub(crate) time_to_first_chunk: Histogram<f64>,
91    pub(crate) time_per_output_chunk: Histogram<f64>,
92
93    pub(crate) turns_duration: Histogram<f64>,
94    pub(crate) runs_outcome: Counter<u64>,
95    pub(crate) tools_execution_duration: Histogram<f64>,
96    pub(crate) tools_execution_count: Counter<u64>,
97    pub(crate) context_compaction: Counter<u64>,
98    pub(crate) context_compaction_tokens_saved: Histogram<u64>,
99    pub(crate) subagent_invocations: Counter<u64>,
100    // Only recorded by the MCP client, which is compiled behind the `mcp`
101    // feature; gated here so the field is not dead code without it.
102    #[cfg(feature = "mcp")]
103    pub(crate) mcp_requests_duration: Histogram<f64>,
104    pub(crate) llm_retries: Counter<u64>,
105}
106
107static METRICS: RwLock<Option<Arc<Metrics>>> = RwLock::new(None);
108
109impl Metrics {
110    /// Build instruments under the supplied meter scope and cache the
111    /// resulting [`Arc<Metrics>`] for subsequent [`Metrics::global`]
112    /// callers.
113    ///
114    /// If the cache is already populated the supplied `name` is
115    /// ignored and the cached handle is returned — the first caller
116    /// in the process wins. Tests that rotate the global meter
117    /// provider between cases must call
118    /// [`Metrics::reset_for_testing`] beforehand so the next `init`
119    /// rebuilds against the fresh provider.
120    #[must_use]
121    pub fn init(name: &'static str) -> Arc<Self> {
122        if let Some(existing) = read_cached() {
123            return existing;
124        }
125
126        let built = Arc::new(Self::build(name));
127        write_cached(Arc::clone(&built));
128        built
129    }
130
131    /// Convenience wrapper that initialises the singleton under the
132    /// `agent-sdk` meter scope.
133    #[must_use]
134    pub fn global() -> Arc<Self> {
135        Self::init(env!("CARGO_PKG_NAME"))
136    }
137
138    /// Drop the cached instrument singleton so the next
139    /// [`Metrics::global`] call rebuilds against the **currently
140    /// installed** global meter provider.
141    ///
142    /// `agent_sdk_otel::install_global_provider` calls this immediately
143    /// after `global::set_meter_provider`. Without it, any telemetry path
144    /// that lazily built the singleton before the provider was installed
145    /// (or before a re-install) would stay bound to the no-op meter — or a
146    /// now-shut-down provider — for the rest of the process, silently
147    /// dropping every counter / histogram. Calling it at install time is
148    /// safe because no real data points exist yet; it must NOT be used
149    /// mid-run, where rebuilding would lose in-flight aggregation.
150    pub fn rebind() {
151        clear_cache();
152    }
153
154    /// Drop the cached singleton.
155    ///
156    /// Test-only escape hatch so tests that rotate the global meter
157    /// provider between cases force a rebuild against the fresh provider.
158    pub fn reset_for_testing() {
159        clear_cache();
160    }
161
162    /// Record the `gen_ai.client.token.usage` histogram for a chat
163    /// response, splitting one data point per non-zero token type
164    /// (`input` / `output` / `cache_read` / `cache_creation`).
165    ///
166    /// This is the single source of truth for the token-usage label
167    /// set so the in-process `agent_loop` and the
168    /// daemon-hosted `agent-server` worker emit byte-identical labels.
169    /// Splitting by type keeps the histogram aggregatable in
170    /// Prometheus / Grafana — collapsing the four types into one
171    /// record would erase the cache-hit-ratio dimension dashboards
172    /// care about most.
173    pub fn record_chat_token_usage(
174        &self,
175        usage: &crate::llm::Usage,
176        provider_name: &'static str,
177        request_model: &str,
178        response_model: &str,
179    ) {
180        use opentelemetry::KeyValue;
181
182        let entries: [(u32, &'static str); 4] = [
183            (usage.input_tokens, "input"),
184            (usage.output_tokens, "output"),
185            (usage.cached_input_tokens, "cache_read"),
186            (usage.cache_creation_input_tokens, "cache_creation"),
187        ];
188
189        for (count, token_type) in entries {
190            if count == 0 {
191                continue;
192            }
193            self.token_usage.record(
194                u64::from(count),
195                &[
196                    KeyValue::new(super::attrs::GEN_AI_OPERATION_NAME, "chat"),
197                    KeyValue::new(super::attrs::GEN_AI_PROVIDER_NAME, provider_name),
198                    KeyValue::new("gen_ai.token.type", token_type),
199                    KeyValue::new(
200                        super::attrs::GEN_AI_REQUEST_MODEL,
201                        request_model.to_string(),
202                    ),
203                    KeyValue::new(
204                        super::attrs::GEN_AI_RESPONSE_MODEL,
205                        response_model.to_string(),
206                    ),
207                ],
208            );
209        }
210    }
211
212    /// Record a `gen_ai.client.operation.duration` sample for a
213    /// successful chat call. The label set mirrors the success arm of
214    /// the in-process loop so both code paths land in the same series.
215    pub fn record_chat_operation_duration_success(
216        &self,
217        elapsed_secs: f64,
218        provider_name: &'static str,
219        request_model: &str,
220        response_model: &str,
221    ) {
222        use opentelemetry::KeyValue;
223
224        self.operation_duration.record(
225            elapsed_secs,
226            &[
227                KeyValue::new(super::attrs::GEN_AI_OPERATION_NAME, "chat"),
228                KeyValue::new(super::attrs::GEN_AI_PROVIDER_NAME, provider_name),
229                KeyValue::new(
230                    super::attrs::GEN_AI_REQUEST_MODEL,
231                    request_model.to_string(),
232                ),
233                KeyValue::new(
234                    super::attrs::GEN_AI_RESPONSE_MODEL,
235                    response_model.to_string(),
236                ),
237            ],
238        );
239    }
240
241    /// Record a `gen_ai.client.operation.duration` sample for a failed
242    /// chat call, carrying the stable `error.type` label in place of
243    /// the response model. Mirrors the error arm of the in-process
244    /// loop.
245    pub fn record_chat_operation_duration_error(
246        &self,
247        elapsed_secs: f64,
248        provider_name: &'static str,
249        request_model: &str,
250        error_type: &'static str,
251    ) {
252        use opentelemetry::KeyValue;
253
254        self.operation_duration.record(
255            elapsed_secs,
256            &[
257                KeyValue::new(super::attrs::GEN_AI_OPERATION_NAME, "chat"),
258                KeyValue::new(super::attrs::GEN_AI_PROVIDER_NAME, provider_name),
259                KeyValue::new(
260                    super::attrs::GEN_AI_REQUEST_MODEL,
261                    request_model.to_string(),
262                ),
263                KeyValue::new(super::attrs::ERROR_TYPE, error_type),
264            ],
265        );
266    }
267
268    /// Record the `agent_sdk.tools.execution.count` counter and, when a
269    /// duration is known, the `agent_sdk.tools.execution.duration`
270    /// histogram for a single tool invocation.
271    ///
272    /// `outcome` is one of the stable strings emitted by the loop
273    /// (`success` / `error` / `blocked` / `rejected` /
274    /// `awaiting_confirmation`). Both instruments share the same three
275    /// labels (`gen_ai.tool.name`, `agent_sdk.tool.kind`,
276    /// `agent_sdk.tool.outcome`) so a dashboard can join the count and
277    /// the duration without translation.
278    pub fn record_tool_execution(
279        &self,
280        tool_name: &str,
281        tool_kind: &'static str,
282        outcome: &'static str,
283        duration_ms: Option<u64>,
284    ) {
285        use opentelemetry::KeyValue;
286
287        let metric_attrs = [
288            KeyValue::new(super::attrs::GEN_AI_TOOL_NAME, tool_name.to_string()),
289            KeyValue::new(super::attrs::SDK_TOOL_KIND, tool_kind),
290            KeyValue::new(super::attrs::SDK_TOOL_OUTCOME, outcome),
291        ];
292        self.tools_execution_count.add(1, &metric_attrs);
293        if let Some(ms) = duration_ms {
294            self.tools_execution_duration
295                .record(tool_duration_ms_to_f64(ms), &metric_attrs);
296        }
297    }
298
299    fn build(scope: &'static str) -> Self {
300        let meter = global::meter(scope);
301
302        let token_usage = meter
303            .u64_histogram("gen_ai.client.token.usage")
304            .with_unit("{token}")
305            .with_description("Number of input and output tokens used.")
306            .with_boundaries(TOKEN_USAGE_BUCKETS.to_vec())
307            .build();
308        let operation_duration = meter
309            .f64_histogram("gen_ai.client.operation.duration")
310            .with_unit("s")
311            .with_description("GenAI operation duration.")
312            .with_boundaries(SHORT_DURATION_BUCKETS_S.to_vec())
313            .build();
314        let time_to_first_chunk = meter
315            .f64_histogram("gen_ai.client.operation.time_to_first_chunk")
316            .with_unit("s")
317            .with_description("Time to first response chunk for streaming GenAI operations.")
318            .with_boundaries(SHORT_DURATION_BUCKETS_S.to_vec())
319            .build();
320        let time_per_output_chunk = meter
321            .f64_histogram("gen_ai.client.operation.time_per_output_chunk")
322            .with_unit("s")
323            .with_description(
324                "Time per output chunk after the first chunk for streaming GenAI operations.",
325            )
326            .with_boundaries(SHORT_DURATION_BUCKETS_S.to_vec())
327            .build();
328
329        let turns_duration = meter
330            .f64_histogram("agent_sdk.turns.duration")
331            .with_unit("s")
332            .with_description("Wall-clock duration of a single agent turn.")
333            .with_boundaries(SHORT_DURATION_BUCKETS_S.to_vec())
334            .build();
335        let runs_outcome = meter
336            .u64_counter("agent_sdk.runs.outcome")
337            .with_description("Count of completed agent runs by outcome.")
338            .build();
339        let tools_execution_duration = meter
340            .f64_histogram("agent_sdk.tools.execution.duration")
341            .with_unit("ms")
342            .with_description("Duration of a single tool invocation.")
343            .with_boundaries(TOOL_DURATION_BUCKETS_MS.to_vec())
344            .build();
345        let tools_execution_count = meter
346            .u64_counter("agent_sdk.tools.execution.count")
347            .with_description("Count of tool invocations by name, kind, and outcome.")
348            .build();
349        let context_compaction = meter
350            .u64_counter("agent_sdk.context.compaction")
351            .with_description("Count of context-compaction operations by trigger.")
352            .build();
353        let context_compaction_tokens_saved = meter
354            .u64_histogram("agent_sdk.context.compaction.tokens_saved")
355            .with_unit("{token}")
356            .with_description("Tokens saved by a single compaction operation.")
357            .with_boundaries(TOKEN_USAGE_BUCKETS.to_vec())
358            .build();
359        let subagent_invocations = meter
360            .u64_counter("agent_sdk.subagent.invocations")
361            .with_description("Count of subagent invocations by agent name and outcome.")
362            .build();
363        #[cfg(feature = "mcp")]
364        let mcp_requests_duration = meter
365            .f64_histogram("agent_sdk.mcp.requests.duration")
366            .with_unit("s")
367            .with_description("Duration of an MCP JSON-RPC client request.")
368            .with_boundaries(SHORT_DURATION_BUCKETS_S.to_vec())
369            .build();
370        let llm_retries = meter
371            .u64_counter("agent_sdk.llm.retries")
372            .with_description("Count of LLM call retries by provider and error type.")
373            .build();
374
375        Self {
376            token_usage,
377            operation_duration,
378            time_to_first_chunk,
379            time_per_output_chunk,
380            turns_duration,
381            runs_outcome,
382            tools_execution_duration,
383            tools_execution_count,
384            context_compaction,
385            context_compaction_tokens_saved,
386            subagent_invocations,
387            #[cfg(feature = "mcp")]
388            mcp_requests_duration,
389            llm_retries,
390        }
391    }
392}
393
394/// Convert a `ToolResult::duration_ms` value (`u64` milliseconds)
395/// into a histogram-friendly `f64`.
396///
397/// Bounding through `u32` keeps the conversion lossless because
398/// `u32::MAX` ≈ 49.7 days — far above the histogram's 5-minute top
399/// bucket, so any clamped value still falls in the overflow bucket
400/// dashboards expect. The clamp path also emits a `warn!` so a real
401/// runaway duration is investigable rather than silently swallowed.
402#[must_use]
403pub fn tool_duration_ms_to_f64(ms: u64) -> f64 {
404    if let Ok(v) = u32::try_from(ms) {
405        return f64::from(v);
406    }
407    log::warn!("tool duration {ms}ms exceeds u32::MAX; clamping for histogram");
408    f64::from(u32::MAX)
409}
410
411fn clear_cache() {
412    let mut guard = match METRICS.write() {
413        Ok(g) => g,
414        Err(poisoned) => poisoned.into_inner(),
415    };
416    *guard = None;
417}
418
419fn read_cached() -> Option<Arc<Metrics>> {
420    let guard = match METRICS.read() {
421        Ok(g) => g,
422        Err(poisoned) => poisoned.into_inner(),
423    };
424    guard.as_ref().map(Arc::clone)
425}
426
427fn write_cached(value: Arc<Metrics>) {
428    let mut guard = match METRICS.write() {
429        Ok(g) => g,
430        Err(poisoned) => poisoned.into_inner(),
431    };
432    if guard.is_none() {
433        *guard = Some(value);
434    }
435    // If a concurrent caller raced us in, drop the freshly-built
436    // handle on the floor. Both copies are functionally equivalent
437    // (same scope, same global meter), so the duplicate is harmless.
438}
439
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::Metrics;

    /// `rebind` must empty the singleton cache. The lookup that follows has
    /// to construct a brand-new handle (bound to whichever provider is now
    /// installed) instead of handing back the stale, possibly no-op-bound one.
    #[test]
    fn rebind_forces_fresh_instruments_against_current_provider() {
        let before_rebind = Metrics::global();
        Metrics::rebind();
        let after_rebind = Metrics::global();

        let same_handle = Arc::ptr_eq(&before_rebind, &after_rebind);
        assert!(
            !same_handle,
            "rebind must clear the cache so the next global() rebuilds"
        );
    }
}