use std::time::Duration;

use opentelemetry::{KeyValue, global};
use opentelemetry_otlp::{MetricExporter, Protocol, SpanExporter, WithExportConfig};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use rust_decimal::Decimal;
use rust_decimal::prelude::ToPrimitive;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

/// Fallback `service.name` for [`OtelConfig`], also used as the tracer
/// (instrumentation scope) name when wiring the `tracing` layer.
pub const SERVICE_NAME_DEFAULT: &str = "claude-agent";

/// Settings for the OTLP exporters and SDK providers created by this module.
///
/// Start from [`OtelConfig::default`], [`OtelConfig::new`] or
/// [`OtelConfig::from_env`] and refine it with the builder methods.
#[derive(Debug, Clone, PartialEq)]
pub struct OtelConfig {
    /// Value of the `service.name` resource attribute.
    pub service_name: String,
    /// Value of the `service.version` resource attribute; omitted when `None`.
    pub service_version: Option<String>,
    /// Base OTLP endpoint; per-signal paths (`/v1/traces`, `/v1/metrics`)
    /// are appended when the exporters are built.
    pub otlp_endpoint: String,
    /// Whether [`OtelRuntime::init`] creates a tracer provider.
    pub traces_enabled: bool,
    /// Whether [`OtelRuntime::init`] creates a meter provider.
    pub metrics_enabled: bool,
    /// Interval of the periodic metric reader.
    pub metrics_export_interval: Duration,
    /// Fraction of traces to sample; expected in `[0.0, 1.0]` (the builder
    /// and `from_env` clamp it, direct field writes are not checked).
    pub sample_ratio: f64,
}

37impl Default for OtelConfig {
38 fn default() -> Self {
39 Self {
40 service_name: SERVICE_NAME_DEFAULT.to_string(),
41 service_version: Some(env!("CARGO_PKG_VERSION").to_string()),
42 otlp_endpoint: "http://localhost:4317".to_string(),
43 traces_enabled: true,
44 metrics_enabled: true,
45 metrics_export_interval: Duration::from_secs(60),
46 sample_ratio: 1.0,
47 }
48 }
49}
50
51impl OtelConfig {
52 pub fn new(service_name: impl Into<String>) -> Self {
53 Self {
54 service_name: service_name.into(),
55 ..Default::default()
56 }
57 }
58
59 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
60 self.otlp_endpoint = endpoint.into();
61 self
62 }
63
64 pub fn service_version(mut self, version: impl Into<String>) -> Self {
65 self.service_version = Some(version.into());
66 self
67 }
68
69 pub fn traces(mut self, enabled: bool) -> Self {
70 self.traces_enabled = enabled;
71 self
72 }
73
74 pub fn metrics(mut self, enabled: bool) -> Self {
75 self.metrics_enabled = enabled;
76 self
77 }
78
79 pub fn metrics_interval(mut self, interval: Duration) -> Self {
80 self.metrics_export_interval = interval;
81 self
82 }
83
84 pub fn sample_ratio(mut self, ratio: f64) -> Self {
85 self.sample_ratio = ratio.clamp(0.0, 1.0);
86 self
87 }
88
89 pub fn from_env() -> Self {
90 let mut config = Self::default();
91
92 if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
93 config.otlp_endpoint = endpoint;
94 }
95
96 if let Ok(name) = std::env::var("OTEL_SERVICE_NAME") {
97 config.service_name = name;
98 }
99
100 if let Ok(ratio) = std::env::var("OTEL_TRACES_SAMPLER_ARG")
101 && let Ok(r) = ratio.parse::<f64>()
102 {
103 config.sample_ratio = r.clamp(0.0, 1.0);
104 }
105
106 config
107 }
108
109 fn build_resource(&self) -> Resource {
110 let mut attributes = vec![KeyValue::new(SERVICE_NAME, self.service_name.clone())];
111
112 if let Some(ref version) = self.service_version {
113 attributes.push(KeyValue::new(SERVICE_VERSION, version.clone()));
114 }
115
116 Resource::builder().with_attributes(attributes).build()
117 }
118}
119
120pub struct OtelRuntime {
125 tracer_provider: Option<SdkTracerProvider>,
126 meter_provider: Option<SdkMeterProvider>,
127}
128
129impl OtelRuntime {
130 pub fn init(config: &OtelConfig) -> Result<Self, OtelError> {
132 global::set_text_map_propagator(TraceContextPropagator::new());
133
134 let resource = config.build_resource();
135
136 let tracer_provider = if config.traces_enabled {
137 Some(Self::init_tracer(config, resource.clone())?)
138 } else {
139 None
140 };
141
142 let meter_provider = if config.metrics_enabled {
143 Some(Self::init_metrics(config, resource)?)
144 } else {
145 None
146 };
147
148 Ok(Self {
149 tracer_provider,
150 meter_provider,
151 })
152 }
153
154 fn init_tracer(
155 config: &OtelConfig,
156 resource: Resource,
157 ) -> Result<SdkTracerProvider, OtelError> {
158 let exporter = SpanExporter::builder()
159 .with_http()
160 .with_protocol(Protocol::HttpBinary)
161 .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
162 .build()
163 .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
164
165 let sampler = if config.sample_ratio >= 1.0 {
166 Sampler::AlwaysOn
167 } else if config.sample_ratio <= 0.0 {
168 Sampler::AlwaysOff
169 } else {
170 Sampler::TraceIdRatioBased(config.sample_ratio)
171 };
172
173 let provider = SdkTracerProvider::builder()
174 .with_batch_exporter(exporter)
175 .with_sampler(sampler)
176 .with_id_generator(RandomIdGenerator::default())
177 .with_resource(resource)
178 .build();
179
180 global::set_tracer_provider(provider.clone());
181
182 Ok(provider)
183 }
184
185 fn init_metrics(
186 config: &OtelConfig,
187 resource: Resource,
188 ) -> Result<SdkMeterProvider, OtelError> {
189 let exporter = MetricExporter::builder()
190 .with_http()
191 .with_protocol(Protocol::HttpBinary)
192 .with_endpoint(format!("{}/v1/metrics", config.otlp_endpoint))
193 .build()
194 .map_err(|e| OtelError::Init(format!("Failed to create metric exporter: {}", e)))?;
195
196 let reader = PeriodicReader::builder(exporter)
197 .with_interval(config.metrics_export_interval)
198 .build();
199
200 let provider = SdkMeterProvider::builder()
201 .with_reader(reader)
202 .with_resource(resource)
203 .build();
204
205 global::set_meter_provider(provider.clone());
206
207 Ok(provider)
208 }
209
210 pub fn meter(&self, name: &'static str) -> opentelemetry::metrics::Meter {
212 global::meter(name)
213 }
214
215 pub fn shutdown(self) {
217 if let Some(provider) = self.tracer_provider
218 && let Err(e) = provider.shutdown()
219 {
220 tracing::warn!("Failed to shutdown tracer provider: {:?}", e);
221 }
222
223 if let Some(provider) = self.meter_provider
224 && let Err(e) = provider.shutdown()
225 {
226 tracing::warn!("Failed to shutdown meter provider: {:?}", e);
227 }
228 }
229}
230
231pub fn init_tracing_subscriber(config: &OtelConfig, with_console: bool) -> Result<(), OtelError> {
237 let resource = config.build_resource();
238
239 let exporter = SpanExporter::builder()
240 .with_http()
241 .with_protocol(Protocol::HttpBinary)
242 .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
243 .build()
244 .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
245
246 let sampler = if config.sample_ratio >= 1.0 {
247 Sampler::AlwaysOn
248 } else if config.sample_ratio <= 0.0 {
249 Sampler::AlwaysOff
250 } else {
251 Sampler::TraceIdRatioBased(config.sample_ratio)
252 };
253
254 let provider = SdkTracerProvider::builder()
255 .with_batch_exporter(exporter)
256 .with_sampler(sampler)
257 .with_id_generator(RandomIdGenerator::default())
258 .with_resource(resource)
259 .build();
260
261 global::set_tracer_provider(provider);
262
263 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
264
265 if with_console {
266 let fmt_layer = tracing_subscriber::fmt::layer()
267 .with_target(true)
268 .with_thread_ids(false)
269 .with_file(false);
270
271 tracing_subscriber::registry()
272 .with(env_filter)
273 .with(fmt_layer)
274 .with(OpenTelemetryLayer::new(global::tracer(
275 SERVICE_NAME_DEFAULT,
276 )))
277 .try_init()
278 .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
279 } else {
280 tracing_subscriber::registry()
281 .with(env_filter)
282 .with(OpenTelemetryLayer::new(global::tracer(
283 SERVICE_NAME_DEFAULT,
284 )))
285 .try_init()
286 .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
287 }
288
289 Ok(())
290}
291
292#[derive(Debug, thiserror::Error)]
294pub enum OtelError {
295 #[error("OpenTelemetry initialization failed: {0}")]
296 Init(String),
297}
298
/// Attribute keys for agent telemetry, all namespaced under `agent.`.
pub mod semantic {
    // --- identifiers ---------------------------------------------------

    /// Key for the agent session identifier.
    pub const AGENT_SESSION_ID: &str = "agent.session.id";
    /// Key for the model name.
    pub const AGENT_MODEL: &str = "agent.model";
    /// Key for the request identifier.
    pub const AGENT_REQUEST_ID: &str = "agent.request.id";

    // --- tool use ------------------------------------------------------

    /// Key for the tool name.
    pub const AGENT_TOOL_NAME: &str = "agent.tool.name";
    /// Key for the tool-use identifier.
    pub const AGENT_TOOL_USE_ID: &str = "agent.tool.use_id";

    // --- token usage ---------------------------------------------------

    /// Key for the input token count.
    pub const AGENT_INPUT_TOKENS: &str = "agent.tokens.input";
    /// Key for the output token count.
    pub const AGENT_OUTPUT_TOKENS: &str = "agent.tokens.output";
    /// Key for the cache-read token count.
    pub const AGENT_CACHE_READ_TOKENS: &str = "agent.tokens.cache_read";
    /// Key for the cache-creation token count.
    pub const AGENT_CACHE_CREATION_TOKENS: &str = "agent.tokens.cache_creation";

    // --- cost ----------------------------------------------------------

    /// Key for the cost in US dollars.
    pub const AGENT_COST_USD: &str = "agent.cost.usd";
}

313pub struct OtelMetricsBridge {
315 requests_total: opentelemetry::metrics::Counter<u64>,
316 requests_success: opentelemetry::metrics::Counter<u64>,
317 requests_error: opentelemetry::metrics::Counter<u64>,
318 tokens_input: opentelemetry::metrics::Counter<u64>,
319 tokens_output: opentelemetry::metrics::Counter<u64>,
320 cache_read_tokens: opentelemetry::metrics::Counter<u64>,
321 cache_creation_tokens: opentelemetry::metrics::Counter<u64>,
322 tool_calls_total: opentelemetry::metrics::Counter<u64>,
323 tool_errors: opentelemetry::metrics::Counter<u64>,
324 active_sessions: opentelemetry::metrics::UpDownCounter<i64>,
325 request_latency: opentelemetry::metrics::Histogram<f64>,
326 cost_total: opentelemetry::metrics::Counter<f64>,
327}
328
329impl OtelMetricsBridge {
330 pub fn new(meter: &opentelemetry::metrics::Meter) -> Self {
331 Self {
332 requests_total: meter
333 .u64_counter("agent.requests.total")
334 .with_description("Total number of API requests")
335 .build(),
336 requests_success: meter
337 .u64_counter("agent.requests.success")
338 .with_description("Number of successful API requests")
339 .build(),
340 requests_error: meter
341 .u64_counter("agent.requests.error")
342 .with_description("Number of failed API requests")
343 .build(),
344 tokens_input: meter
345 .u64_counter("agent.tokens.input")
346 .with_description("Total input tokens consumed")
347 .build(),
348 tokens_output: meter
349 .u64_counter("agent.tokens.output")
350 .with_description("Total output tokens generated")
351 .build(),
352 cache_read_tokens: meter
353 .u64_counter("agent.tokens.cache_read")
354 .with_description("Total cache read tokens")
355 .build(),
356 cache_creation_tokens: meter
357 .u64_counter("agent.tokens.cache_creation")
358 .with_description("Total cache creation tokens")
359 .build(),
360 tool_calls_total: meter
361 .u64_counter("agent.tool_calls.total")
362 .with_description("Total number of tool calls")
363 .build(),
364 tool_errors: meter
365 .u64_counter("agent.tool_calls.error")
366 .with_description("Number of failed tool calls")
367 .build(),
368 active_sessions: meter
369 .i64_up_down_counter("agent.sessions.active")
370 .with_description("Number of active sessions")
371 .build(),
372 request_latency: meter
373 .f64_histogram("agent.request.latency")
374 .with_description("Request latency in milliseconds")
375 .with_unit("ms")
376 .build(),
377 cost_total: meter
378 .f64_counter("agent.cost.total")
379 .with_description("Total cost in USD")
380 .with_unit("USD")
381 .build(),
382 }
383 }
384
385 pub fn record_request_start(&self) {
386 self.requests_total.add(1, &[]);
387 self.active_sessions.add(1, &[]);
388 }
389
390 pub fn record_request_end(&self, success: bool, latency_ms: f64) {
391 self.active_sessions.add(-1, &[]);
392 self.request_latency.record(latency_ms, &[]);
393 if success {
394 self.requests_success.add(1, &[]);
395 } else {
396 self.requests_error.add(1, &[]);
397 }
398 }
399
400 pub fn record_tokens(&self, input: u64, output: u64) {
401 self.tokens_input.add(input, &[]);
402 self.tokens_output.add(output, &[]);
403 }
404
405 pub fn record_cache(&self, read: u64, creation: u64) {
406 self.cache_read_tokens.add(read, &[]);
407 self.cache_creation_tokens.add(creation, &[]);
408 }
409
410 pub fn record_tool_call(&self, success: bool) {
411 self.tool_calls_total.add(1, &[]);
412 if !success {
413 self.tool_errors.add(1, &[]);
414 }
415 }
416
417 pub fn record_cost(&self, cost_usd: Decimal) {
418 let cost_f64 = cost_usd.to_f64().unwrap_or(0.0);
420 self.cost_total.add(cost_f64, &[]);
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_otel_config_default() {
        let config = OtelConfig::default();
        assert_eq!(config.service_name, "claude-agent");
        assert!(config.traces_enabled);
        assert!(config.metrics_enabled);
        assert_eq!(config.sample_ratio, 1.0);
    }

    #[test]
    fn test_otel_config_builder() {
        let config = OtelConfig::new("my-agent")
            .endpoint("http://otel-collector:4317")
            .sample_ratio(0.5)
            .metrics_interval(Duration::from_secs(30));

        assert_eq!(config.service_name, "my-agent");
        assert_eq!(config.otlp_endpoint, "http://otel-collector:4317");
        assert_eq!(config.sample_ratio, 0.5);
        assert_eq!(config.metrics_export_interval, Duration::from_secs(30));
    }

    #[test]
    fn test_sample_ratio_clamping() {
        let config = OtelConfig::default().sample_ratio(1.5);
        assert_eq!(config.sample_ratio, 1.0);

        let config = OtelConfig::default().sample_ratio(-0.5);
        assert_eq!(config.sample_ratio, 0.0);
    }

    #[test]
    fn test_sample_ratio_boundaries_are_kept() {
        assert_eq!(OtelConfig::default().sample_ratio(0.0).sample_ratio, 0.0);
        assert_eq!(OtelConfig::default().sample_ratio(1.0).sample_ratio, 1.0);
    }

    #[test]
    fn test_new_keeps_other_defaults() {
        let config = OtelConfig::new("svc");
        let defaults = OtelConfig::default();

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.traces_enabled, defaults.traces_enabled);
        assert_eq!(config.metrics_enabled, defaults.metrics_enabled);
        assert_eq!(config.metrics_export_interval, defaults.metrics_export_interval);
        assert_eq!(config.sample_ratio, defaults.sample_ratio);
    }

    #[test]
    fn test_signal_toggles_and_version() {
        let config = OtelConfig::default()
            .traces(false)
            .metrics(false)
            .service_version("1.2.3");

        assert!(!config.traces_enabled);
        assert!(!config.metrics_enabled);
        assert_eq!(config.service_version.as_deref(), Some("1.2.3"));
    }
}