Skip to main content

autoagents_telemetry/
tracer.rs

1use autoagents_core::agent::{AgentDeriveT, AgentExecutor, AgentHooks, DirectAgentHandle};
2use autoagents_core::environment::Environment;
3use autoagents_core::utils::BoxEventStream;
4use autoagents_protocol::{Event, RuntimeID};
5use std::sync::Arc;
6
7use crate::runner::start_telemetry;
8use crate::{TelemetryConfig, TelemetryError, TelemetryHandle, TelemetryProvider};
9
10/// Owns the telemetry lifecycle for a specific event stream.
11pub struct Tracer {
12    provider: Arc<dyn TelemetryProvider>,
13    event_stream: Option<BoxEventStream<Event>>,
14    runtime_id: Option<RuntimeID>,
15    handle: Option<TelemetryHandle>,
16    shutdown_grace: std::time::Duration,
17}
18
19impl Tracer {
20    pub fn new(provider: Arc<dyn TelemetryProvider>, event_stream: BoxEventStream<Event>) -> Self {
21        Self {
22            provider,
23            event_stream: Some(event_stream),
24            runtime_id: None,
25            handle: None,
26            shutdown_grace: std::time::Duration::from_secs(10),
27        }
28    }
29
30    pub fn from_direct<T>(
31        provider: Arc<dyn TelemetryProvider>,
32        handle: &mut DirectAgentHandle<T>,
33    ) -> Self
34    where
35        T: AgentDeriveT + AgentExecutor + AgentHooks + Send + Sync + 'static,
36    {
37        let stream = handle.subscribe_events();
38        Self::new(provider, stream)
39    }
40
41    pub async fn from_environment(
42        provider: Arc<dyn TelemetryProvider>,
43        env: &mut Environment,
44        runtime_id: Option<RuntimeID>,
45    ) -> Result<Self, TelemetryError> {
46        let stream = env.subscribe_events(runtime_id).await?;
47        Ok(Self {
48            provider,
49            event_stream: Some(stream),
50            runtime_id,
51            handle: None,
52            shutdown_grace: std::time::Duration::from_secs(2),
53        })
54    }
55
56    pub fn with_shutdown_grace(mut self, duration: std::time::Duration) -> Self {
57        self.shutdown_grace = duration;
58        self
59    }
60
61    /// Start exporting spans and metrics from the configured event stream.
62    pub fn start(&mut self) -> Result<(), TelemetryError> {
63        if self.handle.is_some() {
64            return Err(TelemetryError::AlreadyStarted);
65        }
66
67        let event_stream = self
68            .event_stream
69            .take()
70            .ok_or(TelemetryError::MissingEventStream)?;
71        let config = self.provider_config();
72        let attributes = self.provider.attribute_provider();
73        let handle = start_telemetry(event_stream, config, attributes, self.shutdown_grace)?;
74        self.handle = Some(handle);
75        Ok(())
76    }
77
78    /// Flush and shut down exporters.
79    pub async fn shutdown(&mut self) -> Result<(), TelemetryError> {
80        if let Some(handle) = self.handle.take() {
81            handle.shutdown().await;
82        }
83        Ok(())
84    }
85
86    fn provider_config(&self) -> TelemetryConfig {
87        let mut config = self.provider.telemetry_config();
88        if let Some(runtime_id) = self.runtime_id {
89            config = config.with_runtime_id(runtime_id);
90        }
91        config
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use autoagents_core::utils::BoxEventStream;
99    use autoagents_protocol::Event;
100    use tokio::sync::mpsc;
101    use tokio::time::{Duration, timeout};
102    use tokio_stream::wrappers::ReceiverStream;
103
104    #[derive(Debug, Clone)]
105    struct TestProvider;
106
107    impl TelemetryProvider for TestProvider {
108        fn telemetry_config(&self) -> TelemetryConfig {
109            let mut config = TelemetryConfig::new("autoagents-test");
110            config.exporter.stdout = true;
111            config.metrics_enabled = false;
112            config.install_tracing_subscriber = false;
113            config
114        }
115    }
116
117    #[tokio::test]
118    async fn tracer_start_rejects_double_start() {
119        let (tx, rx) = mpsc::channel::<Event>(1);
120        let stream: BoxEventStream<Event> = Box::pin(ReceiverStream::new(rx));
121        let provider: Arc<dyn TelemetryProvider> = Arc::new(TestProvider);
122        let mut tracer = Tracer::new(provider, stream);
123
124        tracer.start().expect("start succeeds");
125        let err = tracer.start().expect_err("double start fails");
126        assert!(matches!(err, TelemetryError::AlreadyStarted));
127
128        drop(tx);
129        timeout(Duration::from_secs(2), tracer.shutdown())
130            .await
131            .expect("shutdown completes")
132            .expect("shutdown succeeds");
133    }
134
135    #[tokio::test]
136    async fn tracer_shutdown_is_idempotent() {
137        let (tx, rx) = mpsc::channel::<Event>(1);
138        let stream: BoxEventStream<Event> = Box::pin(ReceiverStream::new(rx));
139        let provider: Arc<dyn TelemetryProvider> = Arc::new(TestProvider);
140        let mut tracer = Tracer::new(provider, stream);
141
142        tracer.start().expect("start succeeds");
143        drop(tx);
144        timeout(Duration::from_secs(2), tracer.shutdown())
145            .await
146            .expect("shutdown completes")
147            .expect("shutdown succeeds");
148        timeout(Duration::from_secs(2), tracer.shutdown())
149            .await
150            .expect("shutdown completes")
151            .expect("shutdown is idempotent");
152    }
153}