Skip to main content

autoagents_telemetry/runner/
handle.rs

1use opentelemetry_sdk::metrics::SdkMeterProvider;
2use opentelemetry_sdk::trace::SdkTracerProvider;
3use std::time::Duration;
4use tokio::sync::watch;
5use tokio::task::JoinHandle;
6use tokio::time::timeout;
7
8/// Holds exporter state and the mapper task for graceful shutdown.
9pub struct TelemetryHandle {
10    pub(crate) task: Option<JoinHandle<()>>,
11    pub(crate) tracer_provider: SdkTracerProvider,
12    pub(crate) meter_provider: Option<SdkMeterProvider>,
13    pub(crate) shutdown_tx: Option<watch::Sender<bool>>,
14}
15
16impl TelemetryHandle {
17    pub async fn shutdown(mut self) {
18        const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
19        const OTEL_TIMEOUT: Duration = Duration::from_secs(2);
20
21        if let Some(shutdown_tx) = self.shutdown_tx.take() {
22            let _ = shutdown_tx.send(true);
23        }
24
25        if let Some(task) = self.task.take() {
26            let mut task = task;
27            if timeout(SHUTDOWN_TIMEOUT, &mut task).await.is_err() {
28                // If the mapper doesn't exit in time, abort to avoid hanging shutdown.
29                task.abort();
30                let _ = task.await;
31            }
32        }
33
34        let tracer_provider = self.tracer_provider;
35        let _ = timeout(
36            OTEL_TIMEOUT,
37            tokio::task::spawn_blocking(move || {
38                let _ = tracer_provider.force_flush();
39                let _ = tracer_provider.shutdown();
40            }),
41        )
42        .await;
43
44        if let Some(meter_provider) = self.meter_provider {
45            let _ = timeout(
46                OTEL_TIMEOUT,
47                tokio::task::spawn_blocking(move || {
48                    let _ = meter_provider.force_flush();
49                    let _ = meter_provider.shutdown();
50                }),
51            )
52            .await;
53        }
54    }
55}