Skip to main content

ai_agents_observability/
manager.rs

1use crate::aggregator::{
2    AggregatedMetrics, MetricsAggregator, aggregate_events, enrich_dimensions,
3};
4use crate::config::{AggregationDimension, ExportFormat, ObservabilityConfig, UnknownPricePolicy};
5use crate::context::{SpanContext, current_observation_context};
6use crate::cost::CostEstimator;
7use crate::event::{
8    CostEstimate, EventStatus, EventType, ObservationEvent, ObservationPurpose,
9    ObservationTokenUsage,
10};
11use crate::export::{ExportResult, export_observability};
12use crate::redaction::Redactor;
13use crate::report::{ObservabilityReport, generate_report};
14use crate::span::SpanGuard;
15use crate::{ObservabilityError, Result};
16use chrono::Utc;
17use parking_lot::{Mutex, RwLock};
18use serde_json::Value;
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::time::Duration;
23use tokio::sync::mpsc;
24use uuid::Uuid;
25
26/// Central collector that receives events, applies privacy rules, aggregates metrics, and exports reports.
27pub struct ObservabilityManager {
28    config: ObservabilityConfig,
29    sender: mpsc::Sender<ObservationEvent>,
30    receiver: Mutex<mpsc::Receiver<ObservationEvent>>,
31    raw_events: RwLock<VecDeque<ObservationEvent>>,
32    aggregator: MetricsAggregator,
33    cost_estimator: CostEstimator,
34    redactor: Redactor,
35    dropped_events: AtomicU64,
36}
37
38impl ObservabilityManager {
39    /// Creates a shared manager with bounded event buffering.
40    pub fn new(config: ObservabilityConfig) -> Arc<Self> {
41        let _ = config.validate();
42        let (sender, receiver) = mpsc::channel(config.buffer.event_buffer.max(1));
43        Arc::new(Self {
44            cost_estimator: CostEstimator::new(config.cost.clone()),
45            redactor: Redactor::new(config.privacy.clone()),
46            aggregator: MetricsAggregator::new(config.aggregation.clone()),
47            sender,
48            receiver: Mutex::new(receiver),
49            raw_events: RwLock::new(VecDeque::new()),
50            dropped_events: AtomicU64::new(0),
51            config,
52        })
53    }
54
55    /// Returns the immutable configuration used by this manager.
56    pub fn config(&self) -> &ObservabilityConfig {
57        &self.config
58    }
59
60    /// Starts a measured span for an LLM or tool wrapper.
61    pub fn start_span(
62        self: &Arc<Self>,
63        event_type: EventType,
64        purpose: ObservationPurpose,
65    ) -> SpanGuard {
66        let mut context = current_observation_context()
67            .map(|ctx| ctx.child())
68            .unwrap_or_else(|| SpanContext::new_root("unknown"));
69        context.purpose = purpose;
70        SpanGuard::new(Arc::clone(self), context, event_type)
71    }
72
73    /// Records hook-style lifecycle events that are not LLM or tool wrapper calls.
74    pub fn record_lifecycle_event(
75        &self,
76        event_type: EventType,
77        purpose: ObservationPurpose,
78        status: EventStatus,
79        duration_ms: u64,
80        tags: HashMap<String, String>,
81        payload: Option<Value>,
82    ) {
83        let context = current_observation_context()
84            .map(|ctx| ctx.child())
85            .unwrap_or_else(|| SpanContext::new_root("unknown"));
86        let dimensions = context_dimension_map(&context);
87        let event = ObservationEvent {
88            trace_id: context.trace_id,
89            span_id: context.span_id,
90            parent_span_id: context.parent_span_id,
91            turn_id: context.turn_id,
92            agent_id: context.agent_id,
93            actor_id: context.actor_id,
94            session_id: context.session_id,
95            event_type,
96            purpose,
97            status,
98            timestamp: Utc::now(),
99            duration_ms,
100            tokens: None,
101            cost: None,
102            error: None,
103            dimensions,
104            tags,
105            payload,
106        };
107        self.record_event(event);
108    }
109
110    /// Queues a completed event without blocking the observed call path.
111    pub fn record_event(&self, event: ObservationEvent) {
112        if !self.config.enabled {
113            return;
114        }
115        match self.sender.try_send(event) {
116            Ok(()) => {}
117            Err(mpsc::error::TrySendError::Full(event)) => {
118                if self.config.buffer.drop_on_full {
119                    self.dropped_events.fetch_add(1, Ordering::Relaxed);
120                } else {
121                    self.ingest_event(event);
122                }
123            }
124            Err(mpsc::error::TrySendError::Closed(event)) => {
125                self.ingest_event(event);
126            }
127        }
128    }
129
130    /// Drains pending queued events into aggregation and raw buffers.
131    pub async fn flush(&self) -> Result<()> {
132        self.drain_pending();
133        Ok(())
134    }
135
136    /// Returns configured aggregate metrics after draining pending events.
137    pub fn get_metrics(&self) -> Vec<AggregatedMetrics> {
138        self.drain_pending();
139        self.aggregator.aggregate_configured()
140    }
141
142    /// Returns retained raw events after redaction and queue draining.
143    pub fn raw_events(&self) -> Vec<ObservationEvent> {
144        self.drain_pending();
145        self.raw_events.read().iter().cloned().collect()
146    }
147
148    /// Builds the user-facing report from the current rolling event window.
149    pub fn generate_report(&self) -> ObservabilityReport {
150        self.drain_pending();
151        let events = self.aggregator.events();
152        generate_report(
153            &events,
154            self.aggregator.aggregate_configured(),
155            self.dropped_events(),
156        )
157    }
158
159    /// Writes configured report, aggregate, raw event, and Prometheus files.
160    pub async fn export(&self) -> Result<ExportResult> {
161        export_observability(self).map_err(ObservabilityError::Io)
162    }
163
164    /// Returns the total number of events dropped by bounded buffers.
165    pub fn dropped_events(&self) -> u64 {
166        self.dropped_events.load(Ordering::Relaxed)
167    }
168
169    /// Returns the redactor used by wrappers for safe payload summaries.
170    pub fn redactor(&self) -> &Redactor {
171        &self.redactor
172    }
173
174    /// Converts a completed SpanGuard into an ObservationEvent.
175    pub fn build_event_from_span(
176        &self,
177        context: SpanContext,
178        event_type: EventType,
179        duration: Duration,
180        status: EventStatus,
181        tokens: Option<crate::event::ObservationTokenUsage>,
182        error: Option<crate::event::ObservationError>,
183        tags: HashMap<String, String>,
184        payload: Option<Value>,
185    ) -> ObservationEvent {
186        let dimensions = context_dimension_map(&context);
187        ObservationEvent {
188            trace_id: context.trace_id,
189            span_id: context.span_id,
190            parent_span_id: context.parent_span_id,
191            turn_id: context.turn_id,
192            agent_id: context.agent_id,
193            actor_id: context.actor_id,
194            session_id: context.session_id,
195            event_type,
196            purpose: context.purpose,
197            status,
198            timestamp: Utc::now(),
199            duration_ms: duration.as_millis() as u64,
200            tokens,
201            cost: None::<CostEstimate>,
202            error,
203            dimensions,
204            tags,
205            payload,
206        }
207    }
208
209    /// Drains queued events into the synchronous aggregation path.
210    fn drain_pending(&self) {
211        let mut receiver = self.receiver.lock();
212        loop {
213            match receiver.try_recv() {
214                Ok(event) => self.ingest_event(event),
215                Err(mpsc::error::TryRecvError::Empty)
216                | Err(mpsc::error::TryRecvError::Disconnected) => break,
217            }
218        }
219    }
220
221    /// Enriches, costs, redacts, aggregates, and optionally stores one event.
222    fn ingest_event(&self, mut event: ObservationEvent) {
223        enrich_dimensions(&mut event);
224        event.tokens = event
225            .tokens
226            .take()
227            .map(|tokens| self.apply_token_config(tokens));
228        if event.cost.is_none() {
229            let (provider, model) = match &event.event_type {
230                EventType::LlmCall {
231                    provider, model, ..
232                } => (Some(provider.as_str()), Some(model.as_str())),
233                _ => (None, None),
234            };
235            event.cost = self
236                .cost_estimator
237                .estimate(provider, model, event.tokens.as_ref());
238            if matches!(
239                self.config.cost.unknown_price_policy,
240                UnknownPricePolicy::Error
241            ) && event.tokens.is_some()
242                && event.cost.is_none()
243                && matches!(&event.event_type, EventType::LlmCall { .. })
244            {
245                event
246                    .tags
247                    .insert("cost_error".to_string(), "unknown_price".to_string());
248            }
249        }
250        let event = self.redactor.redact_event(event);
251        self.aggregator.record(event.clone());
252        self.store_raw_event(event);
253    }
254
255    /// Applies token count switches before reports and cost estimates read usage.
256    fn apply_token_config(&self, mut tokens: ObservationTokenUsage) -> ObservationTokenUsage {
257        if !self.config.tokens.count_input {
258            tokens.input_tokens = 0;
259        }
260        if !self.config.tokens.count_output {
261            tokens.output_tokens = 0;
262        }
263        tokens.total_tokens = tokens.input_tokens + tokens.output_tokens;
264        tokens
265    }
266
267    /// Retains a redacted raw event when raw event export is enabled.
268    fn store_raw_event(&self, event: ObservationEvent) {
269        if !self.config.export.write_raw_events {
270            return;
271        }
272        if self.config.buffer.raw_event_limit == 0 {
273            self.dropped_events.fetch_add(1, Ordering::Relaxed);
274            return;
275        }
276        let mut raw_events = self.raw_events.write();
277        if raw_events.len() >= self.config.buffer.raw_event_limit {
278            if self.config.buffer.drop_on_full {
279                self.dropped_events.fetch_add(1, Ordering::Relaxed);
280                return;
281            }
282            raw_events.pop_front();
283        }
284        raw_events.push_back(event);
285    }
286
287    /// Renders current aggregate metrics in Prometheus text exposition format.
288    pub fn render_prometheus(&self) -> String {
289        let report = self.generate_report();
290        let events = self.aggregator.events();
291        let llm_events: Vec<_> = events
292            .iter()
293            .filter(|event| matches!(&event.event_type, EventType::LlmCall { .. }))
294            .cloned()
295            .collect();
296        let tool_events: Vec<_> = events
297            .iter()
298            .filter(|event| matches!(&event.event_type, EventType::ToolCall { .. }))
299            .cloned()
300            .collect();
301        let by_model_purpose = aggregate_events(
302            &llm_events,
303            &[AggregationDimension::Model, AggregationDimension::Purpose],
304        );
305        let by_tool = aggregate_events(&tool_events, &[AggregationDimension::Tool]);
306        let mut output = String::new();
307        output.push_str(
308            "# HELP ai_agents_observation_events_total Total recorded observation events\n",
309        );
310        output.push_str("# TYPE ai_agents_observation_events_total counter\n");
311        output.push_str(&format!(
312            "ai_agents_observation_events_total {}\n",
313            report.summary.total_events
314        ));
315        output.push_str("# HELP ai_agents_observation_errors_total Total observation events with error status\n");
316        output.push_str("# TYPE ai_agents_observation_errors_total counter\n");
317        output.push_str(&format!(
318            "ai_agents_observation_errors_total {}\n",
319            report.summary.total_errors
320        ));
321        output.push_str(
322            "# HELP ai_agents_observation_cost_usd_total Estimated total LLM cost in USD\n",
323        );
324        output.push_str("# TYPE ai_agents_observation_cost_usd_total counter\n");
325        output.push_str(&format!(
326            "ai_agents_observation_cost_usd_total {:.8}\n",
327            report.summary.total_cost_usd
328        ));
329        output.push_str("# HELP ai_agents_observation_tokens_total Total observed LLM tokens\n");
330        output.push_str("# TYPE ai_agents_observation_tokens_total counter\n");
331        output.push_str(&format!(
332            "ai_agents_observation_tokens_total {}\n",
333            report.summary.total_tokens
334        ));
335        output.push_str("# HELP ai_agents_llm_calls_total LLM calls grouped by safe labels\n");
336        output.push_str("# TYPE ai_agents_llm_calls_total counter\n");
337        for metric in by_model_purpose {
338            let model = metric
339                .dimensions
340                .get("model")
341                .map(String::as_str)
342                .unwrap_or("unknown");
343            let purpose = metric
344                .dimensions
345                .get("purpose")
346                .map(String::as_str)
347                .unwrap_or("unknown");
348            output.push_str(&format!(
349                "ai_agents_llm_calls_total{{model=\"{}\",purpose=\"{}\"}} {}\n",
350                prometheus_label(model),
351                prometheus_label(purpose),
352                metric.count
353            ));
354        }
355        output.push_str("# HELP ai_agents_tool_calls_total Tool calls grouped by tool ID\n");
356        output.push_str("# TYPE ai_agents_tool_calls_total counter\n");
357        for metric in by_tool {
358            let tool = metric
359                .dimensions
360                .get("tool")
361                .map(String::as_str)
362                .unwrap_or("unknown");
363            if tool != "unknown" {
364                output.push_str(&format!(
365                    "ai_agents_tool_calls_total{{tool=\"{}\"}} {}\n",
366                    prometheus_label(tool),
367                    metric.count
368                ));
369            }
370        }
371        output
372    }
373
374    /// Returns true when a format is enabled in export.formats.
375    pub fn wants_format(&self, format: ExportFormat) -> bool {
376        self.config.export.formats.contains(&format)
377    }
378}
379
/// Escapes label values for Prometheus text output.
///
/// Backslashes and double quotes are backslash-escaped. Line feeds, carriage
/// returns, and tabs are each replaced by a single `_`, so the result always
/// stays on one line. Every other character is copied through unchanged.
fn prometheus_label(value: &str) -> String {
    // Most labels need no escaping, so the input length is a good initial capacity.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' | '\r' | '\t' => escaped.push('_'),
            other => escaped.push(other),
        }
    }
    escaped
}
392
393/// Builds the base event dimensions from the current span context.
394fn context_dimension_map(context: &SpanContext) -> HashMap<String, String> {
395    let mut dimensions = HashMap::new();
396    dimensions.insert("agent".to_string(), context.agent_id.clone());
397    dimensions.insert("purpose".to_string(), context.purpose.as_label());
398    if let Some(actor) = &context.actor_id {
399        dimensions.insert("actor".to_string(), actor.clone());
400    }
401    if let Some(state) = &context.state {
402        dimensions.insert("state".to_string(), state.clone());
403    }
404    if let Some(language) = &context.language {
405        dimensions.insert("language".to_string(), language.clone());
406    }
407    dimensions.extend(context.tags.clone());
408    dimensions
409}
410
411/// Resolves the language dimension by checking configured context paths in order.
412pub fn resolve_language_from_context(
413    config: &ObservabilityConfig,
414    context: &HashMap<String, Value>,
415) -> String {
416    for path in &config.language.paths {
417        if let Some(value) = get_dotted(context, path) {
418            if let Some(language) = value.as_str() {
419                if !language.trim().is_empty() {
420                    return language.to_string();
421                }
422            }
423        }
424    }
425    config.language.fallback.clone()
426}
427
428/// Looks up a top-level or dotted path in a JSON context map.
429fn get_dotted<'a>(context: &'a HashMap<String, Value>, path: &str) -> Option<&'a Value> {
430    if let Some(value) = context.get(path) {
431        return Some(value);
432    }
433    let mut parts = path.split('.');
434    let first = parts.next()?;
435    let mut current = context.get(first)?;
436    for part in parts {
437        current = current.get(part)?;
438    }
439    Some(current)
440}
441
442/// Generates a session ID for observed runtime sessions that do not have one yet.
443pub fn new_session_id() -> String {
444    Uuid::new_v4().to_string()
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{ObservationTokenUsage, TokenUsageSource};

    /// Builds a successful, non-streaming LLM call event that carries `usage`.
    fn llm_event_with_usage(usage: ObservationTokenUsage) -> ObservationEvent {
        ObservationEvent {
            trace_id: "trace".to_string(),
            span_id: "span".to_string(),
            parent_span_id: None,
            turn_id: "turn".to_string(),
            agent_id: "agent".to_string(),
            actor_id: None,
            session_id: None,
            event_type: EventType::LlmCall {
                provider: "openai".to_string(),
                model: "test".to_string(),
                alias: Some("default".to_string()),
                streaming: false,
            },
            purpose: ObservationPurpose::MainResponse,
            status: EventStatus::Success,
            timestamp: Utc::now(),
            duration_ms: 10,
            tokens: Some(usage),
            cost: None,
            error: None,
            dimensions: HashMap::new(),
            tags: HashMap::new(),
            payload: None,
        }
    }

    /// With input counting disabled, the report must show only output tokens.
    #[test]
    fn token_count_flags_are_applied_before_report() {
        let mut config = ObservabilityConfig::default();
        config.enabled = true;
        config.cost.enabled = false;
        config.tokens.count_input = false;
        config.tokens.count_output = true;
        let manager = ObservabilityManager::new(config);

        let usage = ObservationTokenUsage::new(100, 25, TokenUsageSource::Provider);
        manager.record_event(llm_event_with_usage(usage));

        let report = manager.generate_report();
        let breakdown = &report.token_breakdown;
        assert_eq!(breakdown.total_input, 0);
        assert_eq!(breakdown.total_output, 25);
        assert_eq!(breakdown.total_tokens, 25);
    }
}
496}