1mod exporter;
7
8pub use exporter::ExportMessage;
10pub use exporter::Exporter as TokioExporter;
11pub use exporter::ExporterConfig;
12pub use exporter::StartError;
13
14pub struct TelemetryGuard {
22 exporter: Option<exporter::Exporter>,
23 task_handles: Vec<tokio::task::JoinHandle<()>>,
24 metrics_flush: Option<MetricsFlushState>,
26}
27
28struct MetricsFlushState {
29 config: MetricsExportConfig,
30 sink: Arc<dyn TelemetrySink>,
31}
32
33impl From<exporter::Exporter> for TelemetryGuard {
34 fn from(exporter: exporter::Exporter) -> Self {
35 Self {
36 exporter: Some(exporter),
37 task_handles: Vec::new(),
38 metrics_flush: None,
39 }
40 }
41}
42
43impl TelemetryGuard {
44 pub async fn shutdown(mut self) {
49 self.abort_tasks();
50 if let Some(mf) = self.metrics_flush.take() {
51 if let Some(ref exporter) = self.exporter {
52 exporter.flush().await;
54 if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
55 mf.sink.send_metrics(data);
56 }
57 }
58 }
59 if let Some(exporter) = self.exporter.take() {
60 exporter.flush().await;
61 exporter.shutdown().await;
62 }
63 }
64
65 fn abort_tasks(&mut self) {
66 for handle in self.task_handles.drain(..) {
67 handle.abort();
68 }
69 }
70
71 fn final_metrics_flush(&self) {
74 if let Some(ref mf) = self.metrics_flush {
75 if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
76 mf.sink.send_metrics(data);
77 }
78 }
79 }
80}
81
82impl Drop for TelemetryGuard {
83 fn drop(&mut self) {
84 self.abort_tasks();
85 self.final_metrics_flush();
86 if let Some(ref exporter) = self.exporter {
87 let flush_shutdown = async {
88 exporter.flush().await;
89 exporter.shutdown().await;
90 };
91 if let Ok(handle) = tokio::runtime::Handle::try_current() {
94 if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
98 tokio::task::block_in_place(|| {
99 handle.block_on(flush_shutdown);
100 });
101 } else {
102 let exporter = exporter.clone();
105 handle.spawn(async move {
106 exporter.flush().await;
107 exporter.shutdown().await;
108 });
109 }
110 } else {
111 let rt = tokio::runtime::Builder::new_current_thread()
113 .enable_all()
114 .build();
115 if let Ok(rt) = rt {
116 rt.block_on(flush_shutdown);
117 }
118 }
119 }
120 }
121}
122
123pub use rolly::*;
125
// Hidden re-exports compiled only with the `_bench` feature: the exporter
// types from this crate plus everything in `rolly::bench`.
#[cfg(feature = "_bench")]
#[doc(hidden)]
pub mod bench {
    pub use crate::exporter::ExportMessage;
    pub use crate::exporter::Exporter;
    pub use crate::exporter::ExporterConfig;
    pub use rolly::bench::*;
}
132
133use std::sync::Arc;
134use std::time::Duration;
135
136#[derive(Debug)]
138#[non_exhaustive]
139pub enum InitError {
140 SubscriberAlreadySet(tracing_subscriber::util::TryInitError),
142 Exporter(StartError),
144}
145
146impl std::fmt::Display for InitError {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 Self::SubscriberAlreadySet(e) => write!(f, "global subscriber already set: {}", e),
150 Self::Exporter(e) => write!(f, "failed to start exporter: {}", e),
151 }
152 }
153}
154
155impl std::error::Error for InitError {
156 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
157 match self {
158 Self::SubscriberAlreadySet(e) => Some(e),
159 Self::Exporter(e) => Some(e),
160 }
161 }
162}
163
164impl From<tracing_subscriber::util::TryInitError> for InitError {
165 fn from(e: tracing_subscriber::util::TryInitError) -> Self {
166 Self::SubscriberAlreadySet(e)
167 }
168}
169
170impl From<StartError> for InitError {
171 fn from(e: StartError) -> Self {
172 Self::Exporter(e)
173 }
174}
175
176pub fn init_global_once(config: TelemetryConfig) -> TelemetryGuard {
191 match try_init_global(config) {
192 Ok(guard) => guard,
193 Err(InitError::SubscriberAlreadySet(_)) => {
194 tracing::warn!(
195 "rolly: global tracing subscriber already set, \
196 skipping telemetry initialization"
197 );
198 TelemetryGuard {
199 exporter: None,
200 task_handles: Vec::new(),
201 metrics_flush: None,
202 }
203 }
204 Err(e) => panic!("failed to initialize telemetry: {}", e),
205 }
206}
207
208pub fn try_init_global(config: TelemetryConfig) -> Result<TelemetryGuard, InitError> {
216 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
217
218 let export_traces = config.otlp_traces_endpoint.is_some();
219 let export_logs = config.otlp_logs_endpoint.is_some();
220 let export_metrics = config.otlp_metrics_endpoint.is_some();
221
222 if let Some(interval) = config.use_metrics_interval {
224 if interval.is_zero() {
225 return Err(InitError::Exporter(StartError::InvalidConfig(
226 "use_metrics_interval must be > 0",
227 )));
228 }
229 }
230 if let Some(interval) = config.metrics_flush_interval {
231 if interval.is_zero() {
232 return Err(InitError::Exporter(StartError::InvalidConfig(
233 "metrics_flush_interval must be > 0",
234 )));
235 }
236 }
237
238 let metrics_url = config
239 .otlp_metrics_endpoint
240 .as_deref()
241 .map(|ep| format!("{}/v1/metrics", ep));
242
243 let exporter = if export_traces || export_logs || export_metrics {
244 let traces_url = config
245 .otlp_traces_endpoint
246 .as_deref()
247 .map(|ep| format!("{}/v1/traces", ep));
248 let logs_url = config
249 .otlp_logs_endpoint
250 .as_deref()
251 .map(|ep| format!("{}/v1/logs", ep));
252 Some(exporter::Exporter::start(ExporterConfig {
253 traces_url,
254 logs_url,
255 metrics_url,
256 backpressure_strategy: config.backpressure_strategy,
257 ..ExporterConfig::default()
258 })?)
259 } else {
260 None
261 };
262
263 let sink: Arc<dyn TelemetrySink> = match &exporter {
264 Some(exp) => Arc::new(exp.clone()),
265 None => Arc::new(NullSink),
266 };
267
268 let layer_config = LayerConfig {
269 log_to_stderr: config.log_to_stderr,
270 export_traces,
271 export_logs,
272 service_name: config.service_name.clone(),
273 service_version: config.service_version.clone(),
274 environment: config.environment.clone(),
275 resource_attributes: config.resource_attributes.clone(),
276 sampling_rate: config.sampling_rate.unwrap_or(1.0),
277 ..LayerConfig::default()
278 };
279
280 let layer = rolly::build_layer(&layer_config, sink.clone());
281
282 tracing_subscriber::registry().with(layer).try_init()?;
283
284 tracing::info!(
285 service.name = layer_config.service_name.as_str(),
286 service.version = layer_config.service_version.as_str(),
287 environment = layer_config.environment.as_str(),
288 "telemetry initialized"
289 );
290
291 let mut task_handles = Vec::new();
292
293 #[cfg(target_os = "linux")]
295 if let Some(interval) = config.use_metrics_interval {
296 let handle = tokio::spawn(async move {
297 let mut state = UseMetricsState::default();
298 let mut ticker = tokio::time::interval(interval);
299 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
300 ticker.tick().await;
301 loop {
302 ticker.tick().await;
303 rolly::collect_use_metrics(&mut state);
304 }
305 });
306 task_handles.push(handle);
307 }
308
309 let metrics_flush = if export_metrics {
311 let flush_interval = config
312 .metrics_flush_interval
313 .unwrap_or(Duration::from_secs(10));
314 let metrics_config = MetricsExportConfig {
315 service_name: config.service_name,
316 service_version: config.service_version,
317 environment: config.environment,
318 resource_attributes: config.resource_attributes,
319 scope_name: "rolly".to_string(),
320 scope_version: env!("CARGO_PKG_VERSION").to_string(),
321 start_time: std::time::SystemTime::now()
322 .duration_since(std::time::UNIX_EPOCH)
323 .unwrap_or_default()
324 .as_nanos() as u64,
325 };
326 let guard_state = MetricsFlushState {
327 config: metrics_config.clone(),
328 sink: sink.clone(),
329 };
330 let handle = spawn_metrics_loop(metrics_config, sink, flush_interval);
331 task_handles.push(handle);
332 Some(guard_state)
333 } else {
334 None
335 };
336
337 Ok(TelemetryGuard {
338 exporter,
339 task_handles,
340 metrics_flush,
341 })
342}
343
344pub fn spawn_metrics_loop(
351 config: MetricsExportConfig,
352 sink: Arc<dyn TelemetrySink>,
353 interval: Duration,
354) -> tokio::task::JoinHandle<()> {
355 tokio::spawn(async move {
356 let mut ticker = tokio::time::interval(interval);
357 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
358 ticker.tick().await;
359 loop {
360 ticker.tick().await;
361 if let Some(data) = rolly::collect_and_encode_metrics(&config) {
362 sink.send_metrics(data);
363 }
364 }
365 })
366}