Skip to main content

claude_agent/observability/
otel.rs

1//! OpenTelemetry integration for tracing and metrics export.
2//!
3//! This module provides OpenTelemetry SDK initialization and configuration
4//! for exporting traces and metrics to OTLP-compatible backends.
5
6use std::time::Duration;
7
8use opentelemetry::{KeyValue, global};
9use opentelemetry_otlp::{MetricExporter, Protocol, SpanExporter, WithExportConfig};
10use opentelemetry_sdk::Resource;
11use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
12use opentelemetry_sdk::propagation::TraceContextPropagator;
13use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
14use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
15use rust_decimal::Decimal;
16use rust_decimal::prelude::ToPrimitive;
17use tracing_opentelemetry::OpenTelemetryLayer;
18use tracing_subscriber::EnvFilter;
19use tracing_subscriber::layer::SubscriberExt;
20use tracing_subscriber::util::SubscriberInitExt;
21
/// Default service name for OpenTelemetry instrumentation.
///
/// Used as the `service.name` resource attribute by [`OtelConfig::default`] and as the
/// tracer name requested in [`init_tracing_subscriber`].
pub const SERVICE_NAME_DEFAULT: &str = "claude-agent";

/// OpenTelemetry configuration for the agent.
///
/// Build one with [`OtelConfig::new`] and the chained builder methods, or from the
/// standard `OTEL_*` environment variables with [`OtelConfig::from_env`].
#[derive(Debug, Clone, PartialEq)]
pub struct OtelConfig {
    /// Reported as the `service.name` resource attribute.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute when `Some`.
    pub service_version: Option<String>,
    /// Base URL of the OTLP/HTTP collector. The per-signal paths `/v1/traces` and
    /// `/v1/metrics` are appended to it when the exporters are built.
    pub otlp_endpoint: String,
    /// Whether a tracer provider / span exporter is created.
    pub traces_enabled: bool,
    /// Whether a meter provider / metric exporter is created.
    pub metrics_enabled: bool,
    /// Interval at which the periodic metric reader exports.
    pub metrics_export_interval: Duration,
    /// Fraction of traces to sample. The builder and `from_env` clamp it into
    /// `[0.0, 1.0]`; values `>= 1.0` sample every trace and values `<= 0.0` none.
    pub sample_ratio: f64,
}
36
37impl Default for OtelConfig {
38    fn default() -> Self {
39        Self {
40            service_name: SERVICE_NAME_DEFAULT.to_string(),
41            service_version: Some(env!("CARGO_PKG_VERSION").to_string()),
42            otlp_endpoint: "http://localhost:4317".to_string(),
43            traces_enabled: true,
44            metrics_enabled: true,
45            metrics_export_interval: Duration::from_secs(60),
46            sample_ratio: 1.0,
47        }
48    }
49}
50
51impl OtelConfig {
52    pub fn new(service_name: impl Into<String>) -> Self {
53        Self {
54            service_name: service_name.into(),
55            ..Default::default()
56        }
57    }
58
59    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
60        self.otlp_endpoint = endpoint.into();
61        self
62    }
63
64    pub fn service_version(mut self, version: impl Into<String>) -> Self {
65        self.service_version = Some(version.into());
66        self
67    }
68
69    pub fn traces(mut self, enabled: bool) -> Self {
70        self.traces_enabled = enabled;
71        self
72    }
73
74    pub fn metrics(mut self, enabled: bool) -> Self {
75        self.metrics_enabled = enabled;
76        self
77    }
78
79    pub fn metrics_interval(mut self, interval: Duration) -> Self {
80        self.metrics_export_interval = interval;
81        self
82    }
83
84    pub fn sample_ratio(mut self, ratio: f64) -> Self {
85        self.sample_ratio = ratio.clamp(0.0, 1.0);
86        self
87    }
88
89    pub fn from_env() -> Self {
90        let mut config = Self::default();
91
92        if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
93            config.otlp_endpoint = endpoint;
94        }
95
96        if let Ok(name) = std::env::var("OTEL_SERVICE_NAME") {
97            config.service_name = name;
98        }
99
100        if let Ok(ratio) = std::env::var("OTEL_TRACES_SAMPLER_ARG")
101            && let Ok(r) = ratio.parse::<f64>()
102        {
103            config.sample_ratio = r.clamp(0.0, 1.0);
104        }
105
106        config
107    }
108
109    fn build_resource(&self) -> Resource {
110        let mut attributes = vec![KeyValue::new(SERVICE_NAME, self.service_name.clone())];
111
112        if let Some(ref version) = self.service_version {
113            attributes.push(KeyValue::new(SERVICE_VERSION, version.clone()));
114        }
115
116        Resource::builder().with_attributes(attributes).build()
117    }
118}
119
120/// OpenTelemetry runtime handle.
121///
122/// Holds references to the tracer and meter providers.
123/// Call `shutdown()` before application exit to flush pending data.
124pub struct OtelRuntime {
125    tracer_provider: Option<SdkTracerProvider>,
126    meter_provider: Option<SdkMeterProvider>,
127}
128
129impl OtelRuntime {
130    /// Initialize OpenTelemetry with the given configuration.
131    pub fn init(config: &OtelConfig) -> Result<Self, OtelError> {
132        global::set_text_map_propagator(TraceContextPropagator::new());
133
134        let resource = config.build_resource();
135
136        let tracer_provider = if config.traces_enabled {
137            Some(Self::init_tracer(config, resource.clone())?)
138        } else {
139            None
140        };
141
142        let meter_provider = if config.metrics_enabled {
143            Some(Self::init_metrics(config, resource)?)
144        } else {
145            None
146        };
147
148        Ok(Self {
149            tracer_provider,
150            meter_provider,
151        })
152    }
153
154    fn init_tracer(
155        config: &OtelConfig,
156        resource: Resource,
157    ) -> Result<SdkTracerProvider, OtelError> {
158        let exporter = SpanExporter::builder()
159            .with_http()
160            .with_protocol(Protocol::HttpBinary)
161            .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
162            .build()
163            .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
164
165        let sampler = if config.sample_ratio >= 1.0 {
166            Sampler::AlwaysOn
167        } else if config.sample_ratio <= 0.0 {
168            Sampler::AlwaysOff
169        } else {
170            Sampler::TraceIdRatioBased(config.sample_ratio)
171        };
172
173        let provider = SdkTracerProvider::builder()
174            .with_batch_exporter(exporter)
175            .with_sampler(sampler)
176            .with_id_generator(RandomIdGenerator::default())
177            .with_resource(resource)
178            .build();
179
180        global::set_tracer_provider(provider.clone());
181
182        Ok(provider)
183    }
184
185    fn init_metrics(
186        config: &OtelConfig,
187        resource: Resource,
188    ) -> Result<SdkMeterProvider, OtelError> {
189        let exporter = MetricExporter::builder()
190            .with_http()
191            .with_protocol(Protocol::HttpBinary)
192            .with_endpoint(format!("{}/v1/metrics", config.otlp_endpoint))
193            .build()
194            .map_err(|e| OtelError::Init(format!("Failed to create metric exporter: {}", e)))?;
195
196        let reader = PeriodicReader::builder(exporter)
197            .with_interval(config.metrics_export_interval)
198            .build();
199
200        let provider = SdkMeterProvider::builder()
201            .with_reader(reader)
202            .with_resource(resource)
203            .build();
204
205        global::set_meter_provider(provider.clone());
206
207        Ok(provider)
208    }
209
210    /// Get the global meter for recording metrics.
211    pub fn meter(&self, name: &'static str) -> opentelemetry::metrics::Meter {
212        global::meter(name)
213    }
214
215    /// Shutdown the OpenTelemetry runtime, flushing any pending data.
216    pub fn shutdown(self) {
217        if let Some(provider) = self.tracer_provider
218            && let Err(e) = provider.shutdown()
219        {
220            tracing::warn!("Failed to shutdown tracer provider: {:?}", e);
221        }
222
223        if let Some(provider) = self.meter_provider
224            && let Err(e) = provider.shutdown()
225        {
226            tracing::warn!("Failed to shutdown meter provider: {:?}", e);
227        }
228    }
229}
230
231/// Initialize tracing subscriber with OpenTelemetry layer.
232///
233/// This sets up the global tracing subscriber with:
234/// - Console output (if enabled)
235/// - OpenTelemetry trace export
236pub fn init_tracing_subscriber(config: &OtelConfig, with_console: bool) -> Result<(), OtelError> {
237    let resource = config.build_resource();
238
239    let exporter = SpanExporter::builder()
240        .with_http()
241        .with_protocol(Protocol::HttpBinary)
242        .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
243        .build()
244        .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
245
246    let sampler = if config.sample_ratio >= 1.0 {
247        Sampler::AlwaysOn
248    } else if config.sample_ratio <= 0.0 {
249        Sampler::AlwaysOff
250    } else {
251        Sampler::TraceIdRatioBased(config.sample_ratio)
252    };
253
254    let provider = SdkTracerProvider::builder()
255        .with_batch_exporter(exporter)
256        .with_sampler(sampler)
257        .with_id_generator(RandomIdGenerator::default())
258        .with_resource(resource)
259        .build();
260
261    global::set_tracer_provider(provider);
262
263    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
264
265    if with_console {
266        let fmt_layer = tracing_subscriber::fmt::layer()
267            .with_target(true)
268            .with_thread_ids(false)
269            .with_file(false);
270
271        tracing_subscriber::registry()
272            .with(env_filter)
273            .with(fmt_layer)
274            .with(OpenTelemetryLayer::new(global::tracer(
275                SERVICE_NAME_DEFAULT,
276            )))
277            .try_init()
278            .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
279    } else {
280        tracing_subscriber::registry()
281            .with(env_filter)
282            .with(OpenTelemetryLayer::new(global::tracer(
283                SERVICE_NAME_DEFAULT,
284            )))
285            .try_init()
286            .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
287    }
288
289    Ok(())
290}
291
292/// Errors that can occur during OpenTelemetry initialization.
293#[derive(Debug, thiserror::Error)]
294pub enum OtelError {
295    #[error("OpenTelemetry initialization failed: {0}")]
296    Init(String),
297}
298
299/// Semantic conventions for agent-specific attributes.
300pub mod semantic {
301    pub const AGENT_SESSION_ID: &str = "agent.session.id";
302    pub const AGENT_MODEL: &str = "agent.model";
303    pub const AGENT_REQUEST_ID: &str = "agent.request.id";
304    pub const AGENT_TOOL_NAME: &str = "agent.tool.name";
305    pub const AGENT_TOOL_USE_ID: &str = "agent.tool.use_id";
306    pub const AGENT_INPUT_TOKENS: &str = "agent.tokens.input";
307    pub const AGENT_OUTPUT_TOKENS: &str = "agent.tokens.output";
308    pub const AGENT_CACHE_READ_TOKENS: &str = "agent.tokens.cache_read";
309    pub const AGENT_CACHE_CREATION_TOKENS: &str = "agent.tokens.cache_creation";
310    pub const AGENT_COST_USD: &str = "agent.cost.usd";
311}
312
313/// OpenTelemetry metrics bridge for the built-in MetricsRegistry.
314pub struct OtelMetricsBridge {
315    requests_total: opentelemetry::metrics::Counter<u64>,
316    requests_success: opentelemetry::metrics::Counter<u64>,
317    requests_error: opentelemetry::metrics::Counter<u64>,
318    tokens_input: opentelemetry::metrics::Counter<u64>,
319    tokens_output: opentelemetry::metrics::Counter<u64>,
320    cache_read_tokens: opentelemetry::metrics::Counter<u64>,
321    cache_creation_tokens: opentelemetry::metrics::Counter<u64>,
322    tool_calls_total: opentelemetry::metrics::Counter<u64>,
323    tool_errors: opentelemetry::metrics::Counter<u64>,
324    active_sessions: opentelemetry::metrics::UpDownCounter<i64>,
325    request_latency: opentelemetry::metrics::Histogram<f64>,
326    cost_total: opentelemetry::metrics::Counter<f64>,
327}
328
329impl OtelMetricsBridge {
330    pub fn new(meter: &opentelemetry::metrics::Meter) -> Self {
331        Self {
332            requests_total: meter
333                .u64_counter("agent.requests.total")
334                .with_description("Total number of API requests")
335                .build(),
336            requests_success: meter
337                .u64_counter("agent.requests.success")
338                .with_description("Number of successful API requests")
339                .build(),
340            requests_error: meter
341                .u64_counter("agent.requests.error")
342                .with_description("Number of failed API requests")
343                .build(),
344            tokens_input: meter
345                .u64_counter("agent.tokens.input")
346                .with_description("Total input tokens consumed")
347                .build(),
348            tokens_output: meter
349                .u64_counter("agent.tokens.output")
350                .with_description("Total output tokens generated")
351                .build(),
352            cache_read_tokens: meter
353                .u64_counter("agent.tokens.cache_read")
354                .with_description("Total cache read tokens")
355                .build(),
356            cache_creation_tokens: meter
357                .u64_counter("agent.tokens.cache_creation")
358                .with_description("Total cache creation tokens")
359                .build(),
360            tool_calls_total: meter
361                .u64_counter("agent.tool_calls.total")
362                .with_description("Total number of tool calls")
363                .build(),
364            tool_errors: meter
365                .u64_counter("agent.tool_calls.error")
366                .with_description("Number of failed tool calls")
367                .build(),
368            active_sessions: meter
369                .i64_up_down_counter("agent.sessions.active")
370                .with_description("Number of active sessions")
371                .build(),
372            request_latency: meter
373                .f64_histogram("agent.request.latency")
374                .with_description("Request latency in milliseconds")
375                .with_unit("ms")
376                .build(),
377            cost_total: meter
378                .f64_counter("agent.cost.total")
379                .with_description("Total cost in USD")
380                .with_unit("USD")
381                .build(),
382        }
383    }
384
385    pub fn record_request_start(&self) {
386        self.requests_total.add(1, &[]);
387        self.active_sessions.add(1, &[]);
388    }
389
390    pub fn record_request_end(&self, success: bool, latency_ms: f64) {
391        self.active_sessions.add(-1, &[]);
392        self.request_latency.record(latency_ms, &[]);
393        if success {
394            self.requests_success.add(1, &[]);
395        } else {
396            self.requests_error.add(1, &[]);
397        }
398    }
399
400    pub fn record_tokens(&self, input: u64, output: u64) {
401        self.tokens_input.add(input, &[]);
402        self.tokens_output.add(output, &[]);
403    }
404
405    pub fn record_cache(&self, read: u64, creation: u64) {
406        self.cache_read_tokens.add(read, &[]);
407        self.cache_creation_tokens.add(creation, &[]);
408    }
409
410    pub fn record_tool_call(&self, success: bool) {
411        self.tool_calls_total.add(1, &[]);
412        if !success {
413            self.tool_errors.add(1, &[]);
414        }
415    }
416
417    pub fn record_cost(&self, cost_usd: Decimal) {
418        // Convert Decimal to f64 at OpenTelemetry boundary
419        let cost_f64 = cost_usd.to_f64().unwrap_or(0.0);
420        self.cost_total.add(cost_f64, &[]);
421    }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_otel_config_default() {
        let config = OtelConfig::default();
        assert_eq!(config.service_name, "claude-agent");
        assert_eq!(config.service_name, SERVICE_NAME_DEFAULT);
        assert_eq!(
            config.service_version.as_deref(),
            Some(env!("CARGO_PKG_VERSION"))
        );
        assert!(config.traces_enabled);
        assert!(config.metrics_enabled);
        assert_eq!(config.metrics_export_interval, Duration::from_secs(60));
        assert_eq!(config.sample_ratio, 1.0);
    }

    #[test]
    fn test_otel_config_builder() {
        let config = OtelConfig::new("my-agent")
            .endpoint("http://otel-collector:4317")
            .sample_ratio(0.5)
            .metrics_interval(Duration::from_secs(30));

        assert_eq!(config.service_name, "my-agent");
        assert_eq!(config.otlp_endpoint, "http://otel-collector:4317");
        assert_eq!(config.sample_ratio, 0.5);
        assert_eq!(config.metrics_export_interval, Duration::from_secs(30));
    }

    #[test]
    fn test_new_keeps_other_defaults() {
        // `new` only overrides the service name; compare against `default()` instead of
        // hard-coding values so this test tracks changes to the defaults.
        let config = OtelConfig::new("my-agent");
        let defaults = OtelConfig::default();

        assert_eq!(config.service_name, "my-agent");
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.traces_enabled, defaults.traces_enabled);
        assert_eq!(config.metrics_enabled, defaults.metrics_enabled);
        assert_eq!(
            config.metrics_export_interval,
            defaults.metrics_export_interval
        );
        assert_eq!(config.sample_ratio, defaults.sample_ratio);
    }

    #[test]
    fn test_signal_toggles_and_version() {
        let config = OtelConfig::default()
            .traces(false)
            .metrics(false)
            .service_version("9.9.9");

        assert!(!config.traces_enabled);
        assert!(!config.metrics_enabled);
        assert_eq!(config.service_version.as_deref(), Some("9.9.9"));
    }

    #[test]
    fn test_sample_ratio_clamping() {
        let config = OtelConfig::default().sample_ratio(1.5);
        assert_eq!(config.sample_ratio, 1.0);

        let config = OtelConfig::default().sample_ratio(-0.5);
        assert_eq!(config.sample_ratio, 0.0);

        // In-range values, including both bounds, pass through unchanged.
        for ratio in [0.0, 0.25, 1.0] {
            assert_eq!(OtelConfig::default().sample_ratio(ratio).sample_ratio, ratio);
        }
    }
}