autoagents_telemetry/
tracer.rs1use autoagents_core::agent::{AgentDeriveT, AgentExecutor, AgentHooks, DirectAgentHandle};
2use autoagents_core::environment::Environment;
3use autoagents_core::utils::BoxEventStream;
4use autoagents_protocol::{Event, RuntimeID};
5use std::sync::Arc;
6
7use crate::runner::start_telemetry;
8use crate::{TelemetryConfig, TelemetryError, TelemetryHandle, TelemetryProvider};
9
10pub struct Tracer {
12 provider: Arc<dyn TelemetryProvider>,
13 event_stream: Option<BoxEventStream<Event>>,
14 runtime_id: Option<RuntimeID>,
15 handle: Option<TelemetryHandle>,
16 shutdown_grace: std::time::Duration,
17}
18
19impl Tracer {
20 pub fn new(provider: Arc<dyn TelemetryProvider>, event_stream: BoxEventStream<Event>) -> Self {
21 Self {
22 provider,
23 event_stream: Some(event_stream),
24 runtime_id: None,
25 handle: None,
26 shutdown_grace: std::time::Duration::from_secs(10),
27 }
28 }
29
30 pub fn from_direct<T>(
31 provider: Arc<dyn TelemetryProvider>,
32 handle: &mut DirectAgentHandle<T>,
33 ) -> Self
34 where
35 T: AgentDeriveT + AgentExecutor + AgentHooks + Send + Sync + 'static,
36 {
37 let stream = handle.subscribe_events();
38 Self::new(provider, stream)
39 }
40
41 pub async fn from_environment(
42 provider: Arc<dyn TelemetryProvider>,
43 env: &mut Environment,
44 runtime_id: Option<RuntimeID>,
45 ) -> Result<Self, TelemetryError> {
46 let stream = env.subscribe_events(runtime_id).await?;
47 Ok(Self {
48 provider,
49 event_stream: Some(stream),
50 runtime_id,
51 handle: None,
52 shutdown_grace: std::time::Duration::from_secs(2),
53 })
54 }
55
56 pub fn with_shutdown_grace(mut self, duration: std::time::Duration) -> Self {
57 self.shutdown_grace = duration;
58 self
59 }
60
61 pub fn start(&mut self) -> Result<(), TelemetryError> {
63 if self.handle.is_some() {
64 return Err(TelemetryError::AlreadyStarted);
65 }
66
67 let event_stream = self
68 .event_stream
69 .take()
70 .ok_or(TelemetryError::MissingEventStream)?;
71 let config = self.provider_config();
72 let attributes = self.provider.attribute_provider();
73 let handle = start_telemetry(event_stream, config, attributes, self.shutdown_grace)?;
74 self.handle = Some(handle);
75 Ok(())
76 }
77
78 pub async fn shutdown(&mut self) -> Result<(), TelemetryError> {
80 if let Some(handle) = self.handle.take() {
81 handle.shutdown().await;
82 }
83 Ok(())
84 }
85
86 fn provider_config(&self) -> TelemetryConfig {
87 let mut config = self.provider.telemetry_config();
88 if let Some(runtime_id) = self.runtime_id {
89 config = config.with_runtime_id(runtime_id);
90 }
91 config
92 }
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98 use autoagents_core::utils::BoxEventStream;
99 use autoagents_protocol::Event;
100 use tokio::sync::mpsc;
101 use tokio::time::{Duration, timeout};
102 use tokio_stream::wrappers::ReceiverStream;
103
104 #[derive(Debug, Clone)]
105 struct TestProvider;
106
107 impl TelemetryProvider for TestProvider {
108 fn telemetry_config(&self) -> TelemetryConfig {
109 let mut config = TelemetryConfig::new("autoagents-test");
110 config.exporter.stdout = true;
111 config.metrics_enabled = false;
112 config.install_tracing_subscriber = false;
113 config
114 }
115 }
116
117 #[tokio::test]
118 async fn tracer_start_rejects_double_start() {
119 let (tx, rx) = mpsc::channel::<Event>(1);
120 let stream: BoxEventStream<Event> = Box::pin(ReceiverStream::new(rx));
121 let provider: Arc<dyn TelemetryProvider> = Arc::new(TestProvider);
122 let mut tracer = Tracer::new(provider, stream);
123
124 tracer.start().expect("start succeeds");
125 let err = tracer.start().expect_err("double start fails");
126 assert!(matches!(err, TelemetryError::AlreadyStarted));
127
128 drop(tx);
129 timeout(Duration::from_secs(2), tracer.shutdown())
130 .await
131 .expect("shutdown completes")
132 .expect("shutdown succeeds");
133 }
134
135 #[tokio::test]
136 async fn tracer_shutdown_is_idempotent() {
137 let (tx, rx) = mpsc::channel::<Event>(1);
138 let stream: BoxEventStream<Event> = Box::pin(ReceiverStream::new(rx));
139 let provider: Arc<dyn TelemetryProvider> = Arc::new(TestProvider);
140 let mut tracer = Tracer::new(provider, stream);
141
142 tracer.start().expect("start succeeds");
143 drop(tx);
144 timeout(Duration::from_secs(2), tracer.shutdown())
145 .await
146 .expect("shutdown completes")
147 .expect("shutdown succeeds");
148 timeout(Duration::from_secs(2), tracer.shutdown())
149 .await
150 .expect("shutdown completes")
151 .expect("shutdown is idempotent");
152 }
153}