1use crate::aggregator::{
2 AggregatedMetrics, MetricsAggregator, aggregate_events, enrich_dimensions,
3};
4use crate::config::{AggregationDimension, ExportFormat, ObservabilityConfig, UnknownPricePolicy};
5use crate::context::{SpanContext, current_observation_context};
6use crate::cost::CostEstimator;
7use crate::event::{
8 CostEstimate, EventStatus, EventType, ObservationEvent, ObservationPurpose,
9 ObservationTokenUsage,
10};
11use crate::export::{ExportResult, export_observability};
12use crate::redaction::Redactor;
13use crate::report::{ObservabilityReport, generate_report};
14use crate::span::SpanGuard;
15use crate::{ObservabilityError, Result};
16use chrono::Utc;
17use parking_lot::{Mutex, RwLock};
18use serde_json::Value;
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::time::Duration;
23use tokio::sync::mpsc;
24use uuid::Uuid;
25
26pub struct ObservabilityManager {
28 config: ObservabilityConfig,
29 sender: mpsc::Sender<ObservationEvent>,
30 receiver: Mutex<mpsc::Receiver<ObservationEvent>>,
31 raw_events: RwLock<VecDeque<ObservationEvent>>,
32 aggregator: MetricsAggregator,
33 cost_estimator: CostEstimator,
34 redactor: Redactor,
35 dropped_events: AtomicU64,
36}
37
38impl ObservabilityManager {
39 pub fn new(config: ObservabilityConfig) -> Arc<Self> {
41 let _ = config.validate();
42 let (sender, receiver) = mpsc::channel(config.buffer.event_buffer.max(1));
43 Arc::new(Self {
44 cost_estimator: CostEstimator::new(config.cost.clone()),
45 redactor: Redactor::new(config.privacy.clone()),
46 aggregator: MetricsAggregator::new(config.aggregation.clone()),
47 sender,
48 receiver: Mutex::new(receiver),
49 raw_events: RwLock::new(VecDeque::new()),
50 dropped_events: AtomicU64::new(0),
51 config,
52 })
53 }
54
55 pub fn config(&self) -> &ObservabilityConfig {
57 &self.config
58 }
59
60 pub fn start_span(
62 self: &Arc<Self>,
63 event_type: EventType,
64 purpose: ObservationPurpose,
65 ) -> SpanGuard {
66 let mut context = current_observation_context()
67 .map(|ctx| ctx.child())
68 .unwrap_or_else(|| SpanContext::new_root("unknown"));
69 context.purpose = purpose;
70 SpanGuard::new(Arc::clone(self), context, event_type)
71 }
72
73 pub fn record_lifecycle_event(
75 &self,
76 event_type: EventType,
77 purpose: ObservationPurpose,
78 status: EventStatus,
79 duration_ms: u64,
80 tags: HashMap<String, String>,
81 payload: Option<Value>,
82 ) {
83 let context = current_observation_context()
84 .map(|ctx| ctx.child())
85 .unwrap_or_else(|| SpanContext::new_root("unknown"));
86 let dimensions = context_dimension_map(&context);
87 let event = ObservationEvent {
88 trace_id: context.trace_id,
89 span_id: context.span_id,
90 parent_span_id: context.parent_span_id,
91 turn_id: context.turn_id,
92 agent_id: context.agent_id,
93 actor_id: context.actor_id,
94 session_id: context.session_id,
95 event_type,
96 purpose,
97 status,
98 timestamp: Utc::now(),
99 duration_ms,
100 tokens: None,
101 cost: None,
102 error: None,
103 dimensions,
104 tags,
105 payload,
106 };
107 self.record_event(event);
108 }
109
110 pub fn record_event(&self, event: ObservationEvent) {
112 if !self.config.enabled {
113 return;
114 }
115 match self.sender.try_send(event) {
116 Ok(()) => {}
117 Err(mpsc::error::TrySendError::Full(event)) => {
118 if self.config.buffer.drop_on_full {
119 self.dropped_events.fetch_add(1, Ordering::Relaxed);
120 } else {
121 self.ingest_event(event);
122 }
123 }
124 Err(mpsc::error::TrySendError::Closed(event)) => {
125 self.ingest_event(event);
126 }
127 }
128 }
129
130 pub async fn flush(&self) -> Result<()> {
132 self.drain_pending();
133 Ok(())
134 }
135
136 pub fn get_metrics(&self) -> Vec<AggregatedMetrics> {
138 self.drain_pending();
139 self.aggregator.aggregate_configured()
140 }
141
142 pub fn raw_events(&self) -> Vec<ObservationEvent> {
144 self.drain_pending();
145 self.raw_events.read().iter().cloned().collect()
146 }
147
148 pub fn generate_report(&self) -> ObservabilityReport {
150 self.drain_pending();
151 let events = self.aggregator.events();
152 generate_report(
153 &events,
154 self.aggregator.aggregate_configured(),
155 self.dropped_events(),
156 )
157 }
158
159 pub async fn export(&self) -> Result<ExportResult> {
161 export_observability(self).map_err(ObservabilityError::Io)
162 }
163
164 pub fn dropped_events(&self) -> u64 {
166 self.dropped_events.load(Ordering::Relaxed)
167 }
168
169 pub fn redactor(&self) -> &Redactor {
171 &self.redactor
172 }
173
174 pub fn build_event_from_span(
176 &self,
177 context: SpanContext,
178 event_type: EventType,
179 duration: Duration,
180 status: EventStatus,
181 tokens: Option<crate::event::ObservationTokenUsage>,
182 error: Option<crate::event::ObservationError>,
183 tags: HashMap<String, String>,
184 payload: Option<Value>,
185 ) -> ObservationEvent {
186 let dimensions = context_dimension_map(&context);
187 ObservationEvent {
188 trace_id: context.trace_id,
189 span_id: context.span_id,
190 parent_span_id: context.parent_span_id,
191 turn_id: context.turn_id,
192 agent_id: context.agent_id,
193 actor_id: context.actor_id,
194 session_id: context.session_id,
195 event_type,
196 purpose: context.purpose,
197 status,
198 timestamp: Utc::now(),
199 duration_ms: duration.as_millis() as u64,
200 tokens,
201 cost: None::<CostEstimate>,
202 error,
203 dimensions,
204 tags,
205 payload,
206 }
207 }
208
209 fn drain_pending(&self) {
211 let mut receiver = self.receiver.lock();
212 loop {
213 match receiver.try_recv() {
214 Ok(event) => self.ingest_event(event),
215 Err(mpsc::error::TryRecvError::Empty)
216 | Err(mpsc::error::TryRecvError::Disconnected) => break,
217 }
218 }
219 }
220
221 fn ingest_event(&self, mut event: ObservationEvent) {
223 enrich_dimensions(&mut event);
224 event.tokens = event
225 .tokens
226 .take()
227 .map(|tokens| self.apply_token_config(tokens));
228 if event.cost.is_none() {
229 let (provider, model) = match &event.event_type {
230 EventType::LlmCall {
231 provider, model, ..
232 } => (Some(provider.as_str()), Some(model.as_str())),
233 _ => (None, None),
234 };
235 event.cost = self
236 .cost_estimator
237 .estimate(provider, model, event.tokens.as_ref());
238 if matches!(
239 self.config.cost.unknown_price_policy,
240 UnknownPricePolicy::Error
241 ) && event.tokens.is_some()
242 && event.cost.is_none()
243 && matches!(&event.event_type, EventType::LlmCall { .. })
244 {
245 event
246 .tags
247 .insert("cost_error".to_string(), "unknown_price".to_string());
248 }
249 }
250 let event = self.redactor.redact_event(event);
251 self.aggregator.record(event.clone());
252 self.store_raw_event(event);
253 }
254
255 fn apply_token_config(&self, mut tokens: ObservationTokenUsage) -> ObservationTokenUsage {
257 if !self.config.tokens.count_input {
258 tokens.input_tokens = 0;
259 }
260 if !self.config.tokens.count_output {
261 tokens.output_tokens = 0;
262 }
263 tokens.total_tokens = tokens.input_tokens + tokens.output_tokens;
264 tokens
265 }
266
267 fn store_raw_event(&self, event: ObservationEvent) {
269 if !self.config.export.write_raw_events {
270 return;
271 }
272 if self.config.buffer.raw_event_limit == 0 {
273 self.dropped_events.fetch_add(1, Ordering::Relaxed);
274 return;
275 }
276 let mut raw_events = self.raw_events.write();
277 if raw_events.len() >= self.config.buffer.raw_event_limit {
278 if self.config.buffer.drop_on_full {
279 self.dropped_events.fetch_add(1, Ordering::Relaxed);
280 return;
281 }
282 raw_events.pop_front();
283 }
284 raw_events.push_back(event);
285 }
286
287 pub fn render_prometheus(&self) -> String {
289 let report = self.generate_report();
290 let events = self.aggregator.events();
291 let llm_events: Vec<_> = events
292 .iter()
293 .filter(|event| matches!(&event.event_type, EventType::LlmCall { .. }))
294 .cloned()
295 .collect();
296 let tool_events: Vec<_> = events
297 .iter()
298 .filter(|event| matches!(&event.event_type, EventType::ToolCall { .. }))
299 .cloned()
300 .collect();
301 let by_model_purpose = aggregate_events(
302 &llm_events,
303 &[AggregationDimension::Model, AggregationDimension::Purpose],
304 );
305 let by_tool = aggregate_events(&tool_events, &[AggregationDimension::Tool]);
306 let mut output = String::new();
307 output.push_str(
308 "# HELP ai_agents_observation_events_total Total recorded observation events\n",
309 );
310 output.push_str("# TYPE ai_agents_observation_events_total counter\n");
311 output.push_str(&format!(
312 "ai_agents_observation_events_total {}\n",
313 report.summary.total_events
314 ));
315 output.push_str("# HELP ai_agents_observation_errors_total Total observation events with error status\n");
316 output.push_str("# TYPE ai_agents_observation_errors_total counter\n");
317 output.push_str(&format!(
318 "ai_agents_observation_errors_total {}\n",
319 report.summary.total_errors
320 ));
321 output.push_str(
322 "# HELP ai_agents_observation_cost_usd_total Estimated total LLM cost in USD\n",
323 );
324 output.push_str("# TYPE ai_agents_observation_cost_usd_total counter\n");
325 output.push_str(&format!(
326 "ai_agents_observation_cost_usd_total {:.8}\n",
327 report.summary.total_cost_usd
328 ));
329 output.push_str("# HELP ai_agents_observation_tokens_total Total observed LLM tokens\n");
330 output.push_str("# TYPE ai_agents_observation_tokens_total counter\n");
331 output.push_str(&format!(
332 "ai_agents_observation_tokens_total {}\n",
333 report.summary.total_tokens
334 ));
335 output.push_str("# HELP ai_agents_llm_calls_total LLM calls grouped by safe labels\n");
336 output.push_str("# TYPE ai_agents_llm_calls_total counter\n");
337 for metric in by_model_purpose {
338 let model = metric
339 .dimensions
340 .get("model")
341 .map(String::as_str)
342 .unwrap_or("unknown");
343 let purpose = metric
344 .dimensions
345 .get("purpose")
346 .map(String::as_str)
347 .unwrap_or("unknown");
348 output.push_str(&format!(
349 "ai_agents_llm_calls_total{{model=\"{}\",purpose=\"{}\"}} {}\n",
350 prometheus_label(model),
351 prometheus_label(purpose),
352 metric.count
353 ));
354 }
355 output.push_str("# HELP ai_agents_tool_calls_total Tool calls grouped by tool ID\n");
356 output.push_str("# TYPE ai_agents_tool_calls_total counter\n");
357 for metric in by_tool {
358 let tool = metric
359 .dimensions
360 .get("tool")
361 .map(String::as_str)
362 .unwrap_or("unknown");
363 if tool != "unknown" {
364 output.push_str(&format!(
365 "ai_agents_tool_calls_total{{tool=\"{}\"}} {}\n",
366 prometheus_label(tool),
367 metric.count
368 ));
369 }
370 }
371 output
372 }
373
374 pub fn wants_format(&self, format: ExportFormat) -> bool {
376 self.config.export.formats.contains(&format)
377 }
378}
379
/// Makes `value` safe to embed inside a double-quoted Prometheus label.
///
/// Backslashes and double quotes are backslash-escaped; newline, carriage
/// return and tab are each replaced by `_`. Every other character is copied
/// unchanged. The result is built in a single pre-sized `String` rather than
/// allocating a temporary vector per character.
fn prometheus_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' | '\r' | '\t' => escaped.push('_'),
            other => escaped.push(other),
        }
    }
    escaped
}
392
393fn context_dimension_map(context: &SpanContext) -> HashMap<String, String> {
395 let mut dimensions = HashMap::new();
396 dimensions.insert("agent".to_string(), context.agent_id.clone());
397 dimensions.insert("purpose".to_string(), context.purpose.as_label());
398 if let Some(actor) = &context.actor_id {
399 dimensions.insert("actor".to_string(), actor.clone());
400 }
401 if let Some(state) = &context.state {
402 dimensions.insert("state".to_string(), state.clone());
403 }
404 if let Some(language) = &context.language {
405 dimensions.insert("language".to_string(), language.clone());
406 }
407 dimensions.extend(context.tags.clone());
408 dimensions
409}
410
411pub fn resolve_language_from_context(
413 config: &ObservabilityConfig,
414 context: &HashMap<String, Value>,
415) -> String {
416 for path in &config.language.paths {
417 if let Some(value) = get_dotted(context, path) {
418 if let Some(language) = value.as_str() {
419 if !language.trim().is_empty() {
420 return language.to_string();
421 }
422 }
423 }
424 }
425 config.language.fallback.clone()
426}
427
428fn get_dotted<'a>(context: &'a HashMap<String, Value>, path: &str) -> Option<&'a Value> {
430 if let Some(value) = context.get(path) {
431 return Some(value);
432 }
433 let mut parts = path.split('.');
434 let first = parts.next()?;
435 let mut current = context.get(first)?;
436 for part in parts {
437 current = current.get(part)?;
438 }
439 Some(current)
440}
441
442pub fn new_session_id() -> String {
444 Uuid::new_v4().to_string()
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{ObservationTokenUsage, TokenUsageSource};

    /// Builds a successful, non-streaming LLM-call event carrying `tokens`,
    /// with no cost, error, dimensions, tags or payload.
    fn llm_call_event(tokens: ObservationTokenUsage) -> ObservationEvent {
        let event_type = EventType::LlmCall {
            provider: "openai".to_string(),
            model: "test".to_string(),
            alias: Some("default".to_string()),
            streaming: false,
        };
        ObservationEvent {
            trace_id: "trace".to_string(),
            span_id: "span".to_string(),
            parent_span_id: None,
            turn_id: "turn".to_string(),
            agent_id: "agent".to_string(),
            actor_id: None,
            session_id: None,
            event_type,
            purpose: ObservationPurpose::MainResponse,
            status: EventStatus::Success,
            timestamp: Utc::now(),
            duration_ms: 10,
            tokens: Some(tokens),
            cost: None,
            error: None,
            dimensions: HashMap::new(),
            tags: HashMap::new(),
            payload: None,
        }
    }

    /// With input counting disabled, only output tokens may reach the report.
    #[test]
    fn token_count_flags_are_applied_before_report() {
        let mut config = ObservabilityConfig::default();
        config.enabled = true;
        config.tokens.count_input = false;
        config.tokens.count_output = true;
        config.cost.enabled = false;
        let manager = ObservabilityManager::new(config);

        let usage = ObservationTokenUsage::new(100, 25, TokenUsageSource::Provider);
        manager.record_event(llm_call_event(usage));

        let report = manager.generate_report();
        let breakdown = &report.token_breakdown;
        assert_eq!(breakdown.total_input, 0);
        assert_eq!(breakdown.total_output, 25);
        assert_eq!(breakdown.total_tokens, 25);
    }
}