claude_agent/observability/
otel.rs1use std::time::Duration;
7
8use opentelemetry::{KeyValue, global};
9use opentelemetry_otlp::{MetricExporter, Protocol, SpanExporter, WithExportConfig};
10use opentelemetry_sdk::Resource;
11use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
12use opentelemetry_sdk::propagation::TraceContextPropagator;
13use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
14use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
15use tracing_opentelemetry::OpenTelemetryLayer;
16use tracing_subscriber::EnvFilter;
17use tracing_subscriber::layer::SubscriberExt;
18use tracing_subscriber::util::SubscriberInitExt;
19
/// Service name used by [`OtelConfig::default`] and as the tracer name passed to the
/// `OpenTelemetryLayer` in [`init_tracing_subscriber`].
pub const SERVICE_NAME_DEFAULT: &str = "claude-agent";
22
/// Settings consumed by [`OtelRuntime::init`] and [`init_tracing_subscriber`].
///
/// Build one with [`OtelConfig::new`], [`OtelConfig::from_env`] or `Default`, then refine it
/// with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct OtelConfig {
    /// Value written to the `service.name` resource attribute.
    pub service_name: String,
    /// Value written to the `service.version` resource attribute; omitted when `None`.
    pub service_version: Option<String>,
    /// Base OTLP endpoint. `/v1/traces` and `/v1/metrics` are appended to it when the
    /// exporters are built.
    pub otlp_endpoint: String,
    /// Whether [`OtelRuntime::init`] creates and installs a tracer provider.
    pub traces_enabled: bool,
    /// Whether [`OtelRuntime::init`] creates and installs a meter provider.
    pub metrics_enabled: bool,
    /// Interval handed to the periodic metric reader.
    pub metrics_export_interval: Duration,
    /// Trace sampling ratio: `>= 1.0` selects always-on, `<= 0.0` always-off, anything in
    /// between a trace-id-ratio sampler. The builder and `from_env` clamp it to `[0.0, 1.0]`,
    /// but the field is public and can be set directly.
    pub sample_ratio: f64,
}
34
35impl Default for OtelConfig {
36 fn default() -> Self {
37 Self {
38 service_name: SERVICE_NAME_DEFAULT.to_string(),
39 service_version: Some(env!("CARGO_PKG_VERSION").to_string()),
40 otlp_endpoint: "http://localhost:4317".to_string(),
41 traces_enabled: true,
42 metrics_enabled: true,
43 metrics_export_interval: Duration::from_secs(60),
44 sample_ratio: 1.0,
45 }
46 }
47}
48
49impl OtelConfig {
50 pub fn new(service_name: impl Into<String>) -> Self {
51 Self {
52 service_name: service_name.into(),
53 ..Default::default()
54 }
55 }
56
57 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
58 self.otlp_endpoint = endpoint.into();
59 self
60 }
61
62 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
63 self.service_version = Some(version.into());
64 self
65 }
66
67 pub fn with_traces(mut self, enabled: bool) -> Self {
68 self.traces_enabled = enabled;
69 self
70 }
71
72 pub fn with_metrics(mut self, enabled: bool) -> Self {
73 self.metrics_enabled = enabled;
74 self
75 }
76
77 pub fn with_metrics_interval(mut self, interval: Duration) -> Self {
78 self.metrics_export_interval = interval;
79 self
80 }
81
82 pub fn with_sample_ratio(mut self, ratio: f64) -> Self {
83 self.sample_ratio = ratio.clamp(0.0, 1.0);
84 self
85 }
86
87 pub fn from_env() -> Self {
88 let mut config = Self::default();
89
90 if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
91 config.otlp_endpoint = endpoint;
92 }
93
94 if let Ok(name) = std::env::var("OTEL_SERVICE_NAME") {
95 config.service_name = name;
96 }
97
98 if let Ok(ratio) = std::env::var("OTEL_TRACES_SAMPLER_ARG")
99 && let Ok(r) = ratio.parse::<f64>()
100 {
101 config.sample_ratio = r.clamp(0.0, 1.0);
102 }
103
104 config
105 }
106
107 fn build_resource(&self) -> Resource {
108 let mut attributes = vec![KeyValue::new(SERVICE_NAME, self.service_name.clone())];
109
110 if let Some(ref version) = self.service_version {
111 attributes.push(KeyValue::new(SERVICE_VERSION, version.clone()));
112 }
113
114 Resource::builder().with_attributes(attributes).build()
115 }
116}
117
/// Handle to the providers created by [`OtelRuntime::init`].
///
/// Each field is `None` when the corresponding signal was disabled in the configuration.
/// Call [`OtelRuntime::shutdown`] to shut the held providers down.
pub struct OtelRuntime {
    /// Tracer provider, also installed as the global tracer provider when present.
    tracer_provider: Option<SdkTracerProvider>,
    /// Meter provider, also installed as the global meter provider when present.
    meter_provider: Option<SdkMeterProvider>,
}
126
127impl OtelRuntime {
128 pub fn init(config: &OtelConfig) -> Result<Self, OtelError> {
130 global::set_text_map_propagator(TraceContextPropagator::new());
131
132 let resource = config.build_resource();
133
134 let tracer_provider = if config.traces_enabled {
135 Some(Self::init_tracer(config, resource.clone())?)
136 } else {
137 None
138 };
139
140 let meter_provider = if config.metrics_enabled {
141 Some(Self::init_metrics(config, resource)?)
142 } else {
143 None
144 };
145
146 Ok(Self {
147 tracer_provider,
148 meter_provider,
149 })
150 }
151
152 fn init_tracer(
153 config: &OtelConfig,
154 resource: Resource,
155 ) -> Result<SdkTracerProvider, OtelError> {
156 let exporter = SpanExporter::builder()
157 .with_http()
158 .with_protocol(Protocol::HttpBinary)
159 .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
160 .build()
161 .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
162
163 let sampler = if config.sample_ratio >= 1.0 {
164 Sampler::AlwaysOn
165 } else if config.sample_ratio <= 0.0 {
166 Sampler::AlwaysOff
167 } else {
168 Sampler::TraceIdRatioBased(config.sample_ratio)
169 };
170
171 let provider = SdkTracerProvider::builder()
172 .with_batch_exporter(exporter)
173 .with_sampler(sampler)
174 .with_id_generator(RandomIdGenerator::default())
175 .with_resource(resource)
176 .build();
177
178 global::set_tracer_provider(provider.clone());
179
180 Ok(provider)
181 }
182
183 fn init_metrics(
184 config: &OtelConfig,
185 resource: Resource,
186 ) -> Result<SdkMeterProvider, OtelError> {
187 let exporter = MetricExporter::builder()
188 .with_http()
189 .with_protocol(Protocol::HttpBinary)
190 .with_endpoint(format!("{}/v1/metrics", config.otlp_endpoint))
191 .build()
192 .map_err(|e| OtelError::Init(format!("Failed to create metric exporter: {}", e)))?;
193
194 let reader = PeriodicReader::builder(exporter)
195 .with_interval(config.metrics_export_interval)
196 .build();
197
198 let provider = SdkMeterProvider::builder()
199 .with_reader(reader)
200 .with_resource(resource)
201 .build();
202
203 global::set_meter_provider(provider.clone());
204
205 Ok(provider)
206 }
207
208 pub fn meter(&self, name: &'static str) -> opentelemetry::metrics::Meter {
210 global::meter(name)
211 }
212
213 pub fn shutdown(self) {
215 if let Some(provider) = self.tracer_provider
216 && let Err(e) = provider.shutdown()
217 {
218 tracing::warn!("Failed to shutdown tracer provider: {:?}", e);
219 }
220
221 if let Some(provider) = self.meter_provider
222 && let Err(e) = provider.shutdown()
223 {
224 tracing::warn!("Failed to shutdown meter provider: {:?}", e);
225 }
226 }
227}
228
229pub fn init_tracing_subscriber(config: &OtelConfig, with_console: bool) -> Result<(), OtelError> {
235 let resource = config.build_resource();
236
237 let exporter = SpanExporter::builder()
238 .with_http()
239 .with_protocol(Protocol::HttpBinary)
240 .with_endpoint(format!("{}/v1/traces", config.otlp_endpoint))
241 .build()
242 .map_err(|e| OtelError::Init(format!("Failed to create span exporter: {}", e)))?;
243
244 let sampler = if config.sample_ratio >= 1.0 {
245 Sampler::AlwaysOn
246 } else if config.sample_ratio <= 0.0 {
247 Sampler::AlwaysOff
248 } else {
249 Sampler::TraceIdRatioBased(config.sample_ratio)
250 };
251
252 let provider = SdkTracerProvider::builder()
253 .with_batch_exporter(exporter)
254 .with_sampler(sampler)
255 .with_id_generator(RandomIdGenerator::default())
256 .with_resource(resource)
257 .build();
258
259 global::set_tracer_provider(provider);
260
261 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
262
263 if with_console {
264 let fmt_layer = tracing_subscriber::fmt::layer()
265 .with_target(true)
266 .with_thread_ids(false)
267 .with_file(false);
268
269 tracing_subscriber::registry()
270 .with(env_filter)
271 .with(fmt_layer)
272 .with(OpenTelemetryLayer::new(global::tracer(
273 SERVICE_NAME_DEFAULT,
274 )))
275 .try_init()
276 .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
277 } else {
278 tracing_subscriber::registry()
279 .with(env_filter)
280 .with(OpenTelemetryLayer::new(global::tracer(
281 SERVICE_NAME_DEFAULT,
282 )))
283 .try_init()
284 .map_err(|e| OtelError::Init(format!("Failed to init subscriber: {}", e)))?;
285 }
286
287 Ok(())
288}
289
/// Errors reported by the OpenTelemetry integration in this module.
#[derive(Debug, thiserror::Error)]
pub enum OtelError {
    /// Setup failed: an exporter could not be built or the tracing subscriber could not be
    /// installed. The payload is a human-readable description.
    #[error("OpenTelemetry initialization failed: {0}")]
    Init(String),

    /// Exporting telemetry failed. Not constructed anywhere in the visible part of this
    /// file; presumably produced by callers elsewhere — TODO confirm.
    #[error("OpenTelemetry export failed: {0}")]
    Export(String),
}
299
/// Attribute key names for agent telemetry.
///
/// These constants are not referenced in the visible part of this file; presumably they
/// are used as span or metric attribute keys by callers — TODO confirm.
pub mod semantic {
    /// Key for the agent session identifier.
    pub const AGENT_SESSION_ID: &str = "agent.session.id";
    /// Key for the model name.
    pub const AGENT_MODEL: &str = "agent.model";
    /// Key for the request identifier.
    pub const AGENT_REQUEST_ID: &str = "agent.request.id";
    /// Key for the name of an invoked tool.
    pub const AGENT_TOOL_NAME: &str = "agent.tool.name";
    /// Key for the identifier of a single tool use.
    pub const AGENT_TOOL_USE_ID: &str = "agent.tool.use_id";
    /// Key for the input token count.
    pub const AGENT_INPUT_TOKENS: &str = "agent.tokens.input";
    /// Key for the output token count.
    pub const AGENT_OUTPUT_TOKENS: &str = "agent.tokens.output";
    /// Key for the cache-read token count.
    pub const AGENT_CACHE_READ_TOKENS: &str = "agent.tokens.cache_read";
    /// Key for the cache-creation token count.
    pub const AGENT_CACHE_CREATION_TOKENS: &str = "agent.tokens.cache_creation";
    /// Key for a cost expressed in US dollars.
    pub const AGENT_COST_USD: &str = "agent.cost.usd";
}
313
/// Bundle of OpenTelemetry instruments for agent metrics.
///
/// All instruments are created once in [`OtelMetricsBridge::new`] from a caller-supplied
/// meter; the `record_*` methods update them with an empty attribute set.
pub struct OtelMetricsBridge {
    /// `agent.requests.total`: incremented when a request starts.
    requests_total: opentelemetry::metrics::Counter<u64>,
    /// `agent.requests.success`: incremented when a request ends successfully.
    requests_success: opentelemetry::metrics::Counter<u64>,
    /// `agent.requests.error`: incremented when a request ends unsuccessfully.
    requests_error: opentelemetry::metrics::Counter<u64>,
    /// `agent.tokens.input`: input tokens.
    tokens_input: opentelemetry::metrics::Counter<u64>,
    /// `agent.tokens.output`: output tokens.
    tokens_output: opentelemetry::metrics::Counter<u64>,
    /// `agent.tokens.cache_read`: cache read tokens.
    cache_read_tokens: opentelemetry::metrics::Counter<u64>,
    /// `agent.tokens.cache_creation`: cache creation tokens.
    cache_creation_tokens: opentelemetry::metrics::Counter<u64>,
    /// `agent.tool_calls.total`: incremented for every tool call.
    tool_calls_total: opentelemetry::metrics::Counter<u64>,
    /// `agent.tool_calls.error`: incremented for failed tool calls.
    tool_errors: opentelemetry::metrics::Counter<u64>,
    /// `agent.sessions.active`: raised on request start and lowered on request end.
    active_sessions: opentelemetry::metrics::UpDownCounter<i64>,
    /// `agent.request.latency`: request latency, unit `ms`.
    request_latency: opentelemetry::metrics::Histogram<f64>,
    /// `agent.cost.total`: accumulated cost, unit `USD`.
    cost_total: opentelemetry::metrics::Counter<f64>,
}
329
330impl OtelMetricsBridge {
331 pub fn new(meter: &opentelemetry::metrics::Meter) -> Self {
332 Self {
333 requests_total: meter
334 .u64_counter("agent.requests.total")
335 .with_description("Total number of API requests")
336 .build(),
337 requests_success: meter
338 .u64_counter("agent.requests.success")
339 .with_description("Number of successful API requests")
340 .build(),
341 requests_error: meter
342 .u64_counter("agent.requests.error")
343 .with_description("Number of failed API requests")
344 .build(),
345 tokens_input: meter
346 .u64_counter("agent.tokens.input")
347 .with_description("Total input tokens consumed")
348 .build(),
349 tokens_output: meter
350 .u64_counter("agent.tokens.output")
351 .with_description("Total output tokens generated")
352 .build(),
353 cache_read_tokens: meter
354 .u64_counter("agent.tokens.cache_read")
355 .with_description("Total cache read tokens")
356 .build(),
357 cache_creation_tokens: meter
358 .u64_counter("agent.tokens.cache_creation")
359 .with_description("Total cache creation tokens")
360 .build(),
361 tool_calls_total: meter
362 .u64_counter("agent.tool_calls.total")
363 .with_description("Total number of tool calls")
364 .build(),
365 tool_errors: meter
366 .u64_counter("agent.tool_calls.error")
367 .with_description("Number of failed tool calls")
368 .build(),
369 active_sessions: meter
370 .i64_up_down_counter("agent.sessions.active")
371 .with_description("Number of active sessions")
372 .build(),
373 request_latency: meter
374 .f64_histogram("agent.request.latency")
375 .with_description("Request latency in milliseconds")
376 .with_unit("ms")
377 .build(),
378 cost_total: meter
379 .f64_counter("agent.cost.total")
380 .with_description("Total cost in USD")
381 .with_unit("USD")
382 .build(),
383 }
384 }
385
386 pub fn record_request_start(&self) {
387 self.requests_total.add(1, &[]);
388 self.active_sessions.add(1, &[]);
389 }
390
391 pub fn record_request_end(&self, success: bool, latency_ms: f64) {
392 self.active_sessions.add(-1, &[]);
393 self.request_latency.record(latency_ms, &[]);
394 if success {
395 self.requests_success.add(1, &[]);
396 } else {
397 self.requests_error.add(1, &[]);
398 }
399 }
400
401 pub fn record_tokens(&self, input: u64, output: u64) {
402 self.tokens_input.add(input, &[]);
403 self.tokens_output.add(output, &[]);
404 }
405
406 pub fn record_cache(&self, read: u64, creation: u64) {
407 self.cache_read_tokens.add(read, &[]);
408 self.cache_creation_tokens.add(creation, &[]);
409 }
410
411 pub fn record_tool_call(&self, success: bool) {
412 self.tool_calls_total.add(1, &[]);
413 if !success {
414 self.tool_errors.add(1, &[]);
415 }
416 }
417
418 pub fn record_cost(&self, cost_usd: f64) {
419 self.cost_total.add(cost_usd, &[]);
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses the stock service name, enables both signals and
    /// samples every trace.
    #[test]
    fn test_otel_config_default() {
        let OtelConfig {
            service_name,
            traces_enabled,
            metrics_enabled,
            sample_ratio,
            ..
        } = OtelConfig::default();

        assert_eq!(service_name, "claude-agent");
        assert!(traces_enabled);
        assert!(metrics_enabled);
        assert_eq!(sample_ratio, 1.0);
    }

    /// Builder methods store exactly the values they are given.
    #[test]
    fn test_otel_config_builder() {
        let endpoint = "http://otel-collector:4317";
        let interval = Duration::from_secs(30);

        let built = OtelConfig::new("my-agent")
            .with_endpoint(endpoint)
            .with_sample_ratio(0.5)
            .with_metrics_interval(interval);

        assert_eq!(built.service_name, "my-agent");
        assert_eq!(built.otlp_endpoint, endpoint);
        assert_eq!(built.sample_ratio, 0.5);
        assert_eq!(built.metrics_export_interval, interval);
    }

    /// Out-of-range sample ratios are clamped to the nearest bound.
    #[test]
    fn test_sample_ratio_clamping() {
        for (requested, expected) in [(1.5, 1.0), (-0.5, 0.0)] {
            let config = OtelConfig::default().with_sample_ratio(requested);
            assert_eq!(config.sample_ratio, expected);
        }
    }
}