Skip to main content

rolly_tokio/
lib.rs

1//! Tokio-based transport for the rolly observability crate.
2//!
3//! Provides [`TokioExporter`] for batching and shipping OTLP telemetry
4//! over HTTP, plus convenience functions for one-liner telemetry setup.
5
6mod exporter;
7
8// Re-export the public API from the exporter module
9pub use exporter::ExportMessage;
10pub use exporter::Exporter as TokioExporter;
11pub use exporter::ExporterConfig;
12pub use exporter::StartError;
13
14/// Guard that flushes pending telemetry on drop.
15///
16/// Hold this in your main function to ensure all spans are exported
17/// before shutdown. On a `current_thread` tokio runtime, `Drop` can
18/// only trigger a best-effort async drain; call [`TelemetryGuard::shutdown`]
19/// when you need deterministic delivery. Created by [`init_global_once`]
20/// or manually.
21pub struct TelemetryGuard {
22    exporter: Option<exporter::Exporter>,
23    task_handles: Vec<tokio::task::JoinHandle<()>>,
24    /// Stored so we can do one final metrics collect on shutdown.
25    metrics_flush: Option<MetricsFlushState>,
26}
27
28struct MetricsFlushState {
29    config: MetricsExportConfig,
30    sink: Arc<dyn TelemetrySink>,
31}
32
33impl From<exporter::Exporter> for TelemetryGuard {
34    fn from(exporter: exporter::Exporter) -> Self {
35        Self {
36            exporter: Some(exporter),
37            task_handles: Vec::new(),
38            metrics_flush: None,
39        }
40    }
41}
42
43impl TelemetryGuard {
44    /// Flush pending telemetry and stop background tasks.
45    ///
46    /// Prefer this over relying on `Drop` when running on a
47    /// `current_thread` tokio runtime and you need a deterministic drain.
48    pub async fn shutdown(mut self) {
49        self.abort_tasks();
50        if let Some(mf) = self.metrics_flush.take() {
51            if let Some(ref exporter) = self.exporter {
52                // Drain channel first to make room for final metrics
53                exporter.flush().await;
54                if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
55                    mf.sink.send_metrics(data);
56                }
57            }
58        }
59        if let Some(exporter) = self.exporter.take() {
60            exporter.flush().await;
61            exporter.shutdown().await;
62        }
63    }
64
65    fn abort_tasks(&mut self) {
66        for handle in self.task_handles.drain(..) {
67            handle.abort();
68        }
69    }
70
71    /// Collect any metrics still in the registry and send them through
72    /// the sink, so the last interval is not lost on shutdown.
73    fn final_metrics_flush(&self) {
74        if let Some(ref mf) = self.metrics_flush {
75            if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
76                mf.sink.send_metrics(data);
77            }
78        }
79    }
80}
81
82impl Drop for TelemetryGuard {
83    fn drop(&mut self) {
84        self.abort_tasks();
85        self.final_metrics_flush();
86        if let Some(ref exporter) = self.exporter {
87            let flush_shutdown = async {
88                exporter.flush().await;
89                exporter.shutdown().await;
90            };
91            // If we're inside a tokio runtime, use block_in_place to avoid
92            // the "cannot start a runtime from within a runtime" panic.
93            if let Ok(handle) = tokio::runtime::Handle::try_current() {
94                // block_in_place requires a multi-thread runtime; on
95                // current_thread it would deadlock. In that case, spawn
96                // the work and hope the runtime lives long enough.
97                if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
98                    tokio::task::block_in_place(|| {
99                        handle.block_on(flush_shutdown);
100                    });
101                } else {
102                    // current_thread runtime: spawn the flush and let it
103                    // complete on the next poll. We cannot block here.
104                    let exporter = exporter.clone();
105                    handle.spawn(async move {
106                        exporter.flush().await;
107                        exporter.shutdown().await;
108                    });
109                }
110            } else {
111                // No runtime active — create a temporary one for shutdown.
112                let rt = tokio::runtime::Builder::new_current_thread()
113                    .enable_all()
114                    .build();
115                if let Ok(rt) = rt {
116                    rt.block_on(flush_shutdown);
117                }
118            }
119        }
120    }
121}
122
123// Re-export everything from rolly core
124pub use rolly::*;
125
// Benchmark-only surface: exposes the exporter internals alongside the
// core crate's bench helpers. Compiled only with the `_bench` feature.
#[cfg(feature = "_bench")]
#[doc(hidden)]
pub mod bench {
    pub use crate::exporter::ExportMessage;
    pub use crate::exporter::Exporter;
    pub use crate::exporter::ExporterConfig;
    pub use rolly::bench::*;
}
132
133use std::sync::Arc;
134use std::time::Duration;
135
136/// Errors returned by [`try_init_global`].
137#[derive(Debug)]
138#[non_exhaustive]
139pub enum InitError {
140    /// A global tracing subscriber is already set.
141    SubscriberAlreadySet(tracing_subscriber::util::TryInitError),
142    /// The exporter could not be started (e.g. no tokio runtime, TLS failure).
143    Exporter(StartError),
144}
145
146impl std::fmt::Display for InitError {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        match self {
149            Self::SubscriberAlreadySet(e) => write!(f, "global subscriber already set: {}", e),
150            Self::Exporter(e) => write!(f, "failed to start exporter: {}", e),
151        }
152    }
153}
154
155impl std::error::Error for InitError {
156    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
157        match self {
158            Self::SubscriberAlreadySet(e) => Some(e),
159            Self::Exporter(e) => Some(e),
160        }
161    }
162}
163
164impl From<tracing_subscriber::util::TryInitError> for InitError {
165    fn from(e: tracing_subscriber::util::TryInitError) -> Self {
166        Self::SubscriberAlreadySet(e)
167    }
168}
169
170impl From<StartError> for InitError {
171    fn from(e: StartError) -> Self {
172        Self::Exporter(e)
173    }
174}
175
176/// Initialize the full telemetry stack and set the global subscriber.
177///
178/// Creates a [`TokioExporter`], calls [`rolly::build_layer()`], installs the
179/// global subscriber, and spawns background tasks for metrics aggregation
180/// and USE polling.
181///
182/// # Panics
183///
184/// Panics if initialization fails for any reason. For fallible
185/// initialization, use [`try_init_global`].
186///
187/// # Requirements
188///
189/// Must be called from within a tokio runtime context.
190pub fn init_global_once(config: TelemetryConfig) -> TelemetryGuard {
191    match try_init_global(config) {
192        Ok(guard) => guard,
193        Err(InitError::SubscriberAlreadySet(_)) => {
194            tracing::warn!(
195                "rolly: global tracing subscriber already set, \
196                 skipping telemetry initialization"
197            );
198            TelemetryGuard {
199                exporter: None,
200                task_handles: Vec::new(),
201                metrics_flush: None,
202            }
203        }
204        Err(e) => panic!("failed to initialize telemetry: {}", e),
205    }
206}
207
208/// Same as [`init_global_once`], but returns an error instead of panicking.
209///
210/// # Errors
211///
212/// Returns [`InitError::Exporter`] if no tokio runtime is active or the
213/// HTTP client cannot be built. Returns [`InitError::SubscriberAlreadySet`]
214/// if a global tracing subscriber is already installed.
215pub fn try_init_global(config: TelemetryConfig) -> Result<TelemetryGuard, InitError> {
216    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
217
218    let export_traces = config.otlp_traces_endpoint.is_some();
219    let export_logs = config.otlp_logs_endpoint.is_some();
220    let export_metrics = config.otlp_metrics_endpoint.is_some();
221
222    // Reject zero-duration intervals that would panic in tokio::time::interval.
223    if let Some(interval) = config.use_metrics_interval {
224        if interval.is_zero() {
225            return Err(InitError::Exporter(StartError::InvalidConfig(
226                "use_metrics_interval must be > 0",
227            )));
228        }
229    }
230    if let Some(interval) = config.metrics_flush_interval {
231        if interval.is_zero() {
232            return Err(InitError::Exporter(StartError::InvalidConfig(
233                "metrics_flush_interval must be > 0",
234            )));
235        }
236    }
237
238    let metrics_url = config
239        .otlp_metrics_endpoint
240        .as_deref()
241        .map(|ep| format!("{}/v1/metrics", ep));
242
243    let exporter = if export_traces || export_logs || export_metrics {
244        let traces_url = config
245            .otlp_traces_endpoint
246            .as_deref()
247            .map(|ep| format!("{}/v1/traces", ep));
248        let logs_url = config
249            .otlp_logs_endpoint
250            .as_deref()
251            .map(|ep| format!("{}/v1/logs", ep));
252        Some(exporter::Exporter::start(ExporterConfig {
253            traces_url,
254            logs_url,
255            metrics_url,
256            backpressure_strategy: config.backpressure_strategy,
257            ..ExporterConfig::default()
258        })?)
259    } else {
260        None
261    };
262
263    let sink: Arc<dyn TelemetrySink> = match &exporter {
264        Some(exp) => Arc::new(exp.clone()),
265        None => Arc::new(NullSink),
266    };
267
268    let layer_config = LayerConfig {
269        log_to_stderr: config.log_to_stderr,
270        export_traces,
271        export_logs,
272        service_name: config.service_name.clone(),
273        service_version: config.service_version.clone(),
274        environment: config.environment.clone(),
275        resource_attributes: config.resource_attributes.clone(),
276        sampling_rate: config.sampling_rate.unwrap_or(1.0),
277        ..LayerConfig::default()
278    };
279
280    let layer = rolly::build_layer(&layer_config, sink.clone());
281
282    tracing_subscriber::registry().with(layer).try_init()?;
283
284    tracing::info!(
285        service.name = layer_config.service_name.as_str(),
286        service.version = layer_config.service_version.as_str(),
287        environment = layer_config.environment.as_str(),
288        "telemetry initialized"
289    );
290
291    let mut task_handles = Vec::new();
292
293    // Spawn USE metrics polling (Linux only)
294    #[cfg(target_os = "linux")]
295    if let Some(interval) = config.use_metrics_interval {
296        let handle = tokio::spawn(async move {
297            let mut state = UseMetricsState::default();
298            let mut ticker = tokio::time::interval(interval);
299            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
300            ticker.tick().await;
301            loop {
302                ticker.tick().await;
303                rolly::collect_use_metrics(&mut state);
304            }
305        });
306        task_handles.push(handle);
307    }
308
309    // Spawn metrics aggregation loop
310    let metrics_flush = if export_metrics {
311        let flush_interval = config
312            .metrics_flush_interval
313            .unwrap_or(Duration::from_secs(10));
314        let metrics_config = MetricsExportConfig {
315            service_name: config.service_name,
316            service_version: config.service_version,
317            environment: config.environment,
318            resource_attributes: config.resource_attributes,
319            scope_name: "rolly".to_string(),
320            scope_version: env!("CARGO_PKG_VERSION").to_string(),
321            start_time: std::time::SystemTime::now()
322                .duration_since(std::time::UNIX_EPOCH)
323                .unwrap_or_default()
324                .as_nanos() as u64,
325        };
326        let guard_state = MetricsFlushState {
327            config: metrics_config.clone(),
328            sink: sink.clone(),
329        };
330        let handle = spawn_metrics_loop(metrics_config, sink, flush_interval);
331        task_handles.push(handle);
332        Some(guard_state)
333    } else {
334        None
335    };
336
337    Ok(TelemetryGuard {
338        exporter,
339        task_handles,
340        metrics_flush,
341    })
342}
343
344/// Start the metrics aggregation loop as a background tokio task.
345///
346/// Calls [`rolly::collect_and_encode_metrics()`] on the configured
347/// interval and sends results through the provided sink.
348///
349/// Returns a `JoinHandle` the caller can use to abort the task.
350pub fn spawn_metrics_loop(
351    config: MetricsExportConfig,
352    sink: Arc<dyn TelemetrySink>,
353    interval: Duration,
354) -> tokio::task::JoinHandle<()> {
355    tokio::spawn(async move {
356        let mut ticker = tokio::time::interval(interval);
357        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
358        ticker.tick().await;
359        loop {
360            ticker.tick().await;
361            if let Some(data) = rolly::collect_and_encode_metrics(&config) {
362                sink.send_metrics(data);
363            }
364        }
365    })
366}