rust_telemetry/
lib.rs

1//! OpenTelemetry initialization
2//!
3//! Lib containing the definitions and initializations of the OpenTelemetry
4//! tools
5#![cfg_attr(all(doc, not(doctest)), feature(doc_auto_cfg))]
6use std::{collections::BTreeMap as Map, str::FromStr as _};
7
8use config::{OtelConfig, OtelUrl, StdoutLogsConfig};
9use opentelemetry::{
10	trace::{TraceError, TracerProvider as _},
11	KeyValue,
12};
13use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
14use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _};
15use opentelemetry_sdk::{
16	logs::{LogError, LoggerProvider},
17	metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider},
18	propagation::TraceContextPropagator,
19	runtime,
20	trace::{RandomIdGenerator, TracerProvider},
21	Resource,
22};
23use opentelemetry_semantic_conventions::{
24	resource::{SERVICE_NAME, SERVICE_VERSION},
25	SCHEMA_URL,
26};
27use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
28use tracing_subscriber::{
29	layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer,
30};
31
32#[cfg(feature = "axum")]
33pub mod axum;
34pub mod config;
35#[cfg(feature = "reqwest-middleware")]
36pub mod reqwest_middleware;
37
38/// Crates a resource for the Otel providers
39fn mk_resource(
40	service_name: &'static str,
41	version: &'static str,
42	resource_metadata: Map<String, String>,
43) -> Resource {
44	Resource::from_schema_url(
45		[KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, version)]
46			.into_iter()
47			.chain(resource_metadata.into_iter().map(|(k, v)| KeyValue::new(k, v))),
48		SCHEMA_URL,
49	)
50}
51
52/// Setup a Otel exporter and a provider for traces
53fn init_traces(endpoint: OtelUrl, resource: Resource) -> Result<TracerProvider, TraceError> {
54	let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint.url).build()?;
55	let tracer_provider = TracerProvider::builder()
56		.with_id_generator(RandomIdGenerator::default())
57		.with_resource(resource)
58		.with_batch_exporter(exporter, runtime::Tokio)
59		.build();
60
61	opentelemetry::global::set_tracer_provider(tracer_provider.clone());
62	Ok(tracer_provider)
63}
64
65/// Setup a Otel exporter and a provider for metrics
66fn init_metrics(endpoint: OtelUrl, resource: Resource) -> Result<SdkMeterProvider, MetricError> {
67	let exporter = opentelemetry_otlp::MetricExporter::builder()
68		.with_tonic()
69		.with_endpoint(endpoint.url)
70		.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
71		.build()?;
72
73	let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
74
75	let meter_provider =
76		MeterProviderBuilder::default().with_resource(resource).with_reader(reader).build();
77
78	opentelemetry::global::set_meter_provider(meter_provider.clone());
79	Ok(meter_provider)
80}
81
82/// Setup a Otel exporter and a provider for logs
83fn init_logs(endpoint: OtelUrl, resource: Resource) -> Result<LoggerProvider, LogError> {
84	let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint.url).build()?;
85
86	Ok(LoggerProvider::builder()
87		.with_resource(resource)
88		.with_batch_exporter(exporter, runtime::Tokio)
89		.build())
90}
91
/// Initializes the OpenTelemetry
///
/// Convenience wrapper around the [`init_otel`](fn@init_otel) function: it
/// forwards the given configuration together with `CARGO_CRATE_NAME`,
/// `CARGO_PKG_NAME` and `CARGO_PKG_VERSION`. Those `env!` lookups are
/// evaluated where the macro is expanded, so they describe the calling crate.
///
/// The macro evaluates to the `Result` returned by the function; the guard
/// inside it has to be kept alive for the providers to be shut down properly.
/// A trailing comma after the configuration expression is accepted.
///
/// # Examples
///
/// ```rust
/// use rust_telemetry::{config::OtelConfig, init_otel};
///
/// #[tokio::main]
/// async fn main() {
/// 	let _guard = init_otel!(&OtelConfig::default());
///
/// 	// ...
/// }
/// ```
#[macro_export]
macro_rules! init_otel {
	($config:expr $(,)?) => {
		$crate::init_otel(
			$config,
			env!("CARGO_CRATE_NAME"),
			env!("CARGO_PKG_NAME"),
			env!("CARGO_PKG_VERSION"),
		)
	};
}
116
117/// Initializes the OpenTelemetry
118///
119/// example
120/// ```rust
121/// use rust_telemetry::config;
122///
123/// #[tokio::main]
124/// async fn main() {
125/// 	let _guard = rust_telemetry::init_otel(
126/// 		&config::OtelConfig::default(),
127/// 		env!("CARGO_CRATE_NAME"),
128/// 		env!("CARGO_PKG_NAME"),
129/// 		env!("CARGO_PKG_VERSION"),
130/// 	);
131///
132/// 	// ...
133/// }
134/// ```
135#[must_use = "The return is a guard for the providers and it need to be kept to properly shutdown them"]
136pub fn init_otel(
137	config: &OtelConfig,
138	main_crate: &'static str,
139	service_name: &'static str,
140	pkg_version: &'static str,
141) -> Result<ProvidersGuard, OtelInitError> {
142	opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default());
143
144	let stdout_layer = config
145		.stdout
146		.as_ref()
147		.or(Some(&StdoutLogsConfig::default()))
148		.and_then(|stdout| stdout.enabled.then_some(stdout))
149		.map(|logger_config| {
150			let filter_fmt = EnvFilter::from_str(&logger_config.get_filter(main_crate))?;
151			let stdout_layer = tracing_subscriber::fmt::layer().with_thread_names(true);
152			Ok::<_, OtelInitError>(
153				if logger_config.json_output {
154					Box::new(stdout_layer.json()) as Box<dyn Layer<_> + Send + Sync>
155				} else {
156					Box::new(stdout_layer)
157				}
158				.with_filter(filter_fmt),
159			)
160		})
161		.transpose()?;
162
163	let exporter_with_resource = config.exporter.as_ref().map(|exporter| {
164		(exporter, mk_resource(service_name, pkg_version, exporter.resource_metadata.clone()))
165	});
166
167	let (logger_provider, logs_layer) = exporter_with_resource
168		.as_ref()
169		.and_then(|(exporter, resource)| {
170			exporter.logs.as_ref().and_then(|c| c.enabled.then_some(c)).map(|logger_config| {
171				let filter_otel = EnvFilter::from_str(&logger_config.get_filter(main_crate))?;
172				let logger_provider = init_logs(exporter.endpoint.clone(), resource.clone())?;
173
174				// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
175				let logs_layer =
176					OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel);
177
178				Ok::<_, OtelInitError>((Some(logger_provider), Some(logs_layer)))
179			})
180		})
181		.transpose()?
182		.unwrap_or((None, None));
183
184	let (tracer_provider, tracer_layer) = exporter_with_resource
185		.as_ref()
186		.and_then(|(exporter, resource)| {
187			exporter.traces.as_ref().and_then(|c| c.enabled.then_some(c)).map(|tracer_config| {
188				let trace_filter = EnvFilter::from_str(&tracer_config.get_filter(main_crate))?;
189				let tracer_provider = init_traces(exporter.endpoint.clone(), resource.clone())?;
190				let tracer = tracer_provider.tracer(service_name);
191				let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter);
192				Ok::<_, OtelInitError>((Some(tracer_provider), Some(tracer_layer)))
193			})
194		})
195		.transpose()?
196		.unwrap_or((None, None));
197
198	let (meter_provider, meter_layer) = exporter_with_resource
199		.as_ref()
200		.and_then(|(exporter, resource)| {
201			exporter.metrics.as_ref().and_then(|c| c.enabled.then_some(c)).map(|meter_config| {
202				let metrics_filter = EnvFilter::from_str(&meter_config.get_filter(main_crate))?;
203				let meter_provider = init_metrics(exporter.endpoint.clone(), resource.clone())?;
204				let meter_layer =
205					MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter);
206
207				Ok::<_, OtelInitError>((Some(meter_provider), Some(meter_layer)))
208			})
209		})
210		.transpose()?
211		.unwrap_or((None, None));
212
213	// Initialize the tracing subscriber with the stdout layer and
214	// layers for exporting over OpenTelemetry the logs, traces and metrics.
215	tracing_subscriber::registry()
216		.with(logs_layer)
217		.with(stdout_layer)
218		.with(meter_layer)
219		.with(tracer_layer)
220		.init();
221
222	Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider })
223}
224
/// Guarding object to make sure the providers are properly shutdown
///
/// Returned by [`init_otel`](fn@init_otel). It owns the providers that were
/// created during initialization; its `Drop` implementation calls `shutdown`
/// on each of them (the shutdown calls are compiled out of test builds).
#[derive(Debug)]
pub struct ProvidersGuard {
	/// Logger provider, `None` when no logger provider was created
	logger_provider: Option<LoggerProvider>,
	/// Tracer provider, `None` when no tracer provider was created
	tracer_provider: Option<TracerProvider>,
	/// Meter provider, `None` when no meter provider was created
	meter_provider: Option<SdkMeterProvider>,
}
235
236// Necessary to call TracerProvider::shutdown() on exit
237// due to a bug with flushing on global shutdown:
238// https://github.com/open-telemetry/opentelemetry-rust/issues/1961
239impl Drop for ProvidersGuard {
240	fn drop(&mut self) {
241		// This causes a hang in testing.
242		// Some relevant information:
243		// https://github.com/open-telemetry/opentelemetry-rust/issues/536
244		#[cfg(not(test))]
245		{
246			if let Some(logger_provider) = self.logger_provider.as_ref() {
247				let _ = logger_provider.shutdown().inspect_err(|err| {
248					tracing::error!("Could not shutdown LoggerProvider: {err}");
249				});
250			}
251			if let Some(tracer_provider) = self.tracer_provider.as_ref() {
252				let _ = tracer_provider.shutdown().inspect_err(|err| {
253					tracing::error!("Could not shutdown TracerProvider: {err}");
254				});
255			}
256			if let Some(meter_provider) = self.meter_provider.as_ref() {
257				let _ = meter_provider.shutdown().inspect_err(|err| {
258					tracing::error!("Could not shutdown MeterProvider: {err}");
259				});
260			}
261		}
262	}
263}
264
/// OpenTelemetry setup errors
///
/// Every variant wraps the underlying error through `#[from]`, so the `?`
/// operator converts those errors into an `OtelInitError` automatically.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum OtelInitError {
	/// Wraps a [`LogError`] raised while setting up the logger provider
	#[error("Logger initialization error: {0}")]
	LoggerInitError(#[from] LogError),
	/// Wraps a [`TraceError`] raised while setting up the tracer provider
	#[error("Tracer initialization error: {0}")]
	TracerInitError(#[from] TraceError),
	/// Wraps a [`MetricError`] raised while setting up the meter provider
	#[error("Meter initialization error: {0}")]
	MeterInitError(#[from] MetricError),
	/// Wraps the parse error of an invalid `EnvFilter` directive string
	#[error("Parsing EnvFilter directives error: {0}")]
	EnvFilterError(#[from] tracing_subscriber::filter::ParseError),
}
278
#[cfg(test)]
mod tests {
	#![allow(clippy::expect_used)]
	use super::{
		config::{ExporterConfig, OtelConfig, ProviderConfig, StdoutLogsConfig},
		init_otel,
	};

	/// Builds a provider section whose only non-default field is `enabled`.
	fn provider(enabled: bool) -> Option<ProviderConfig> {
		Some(ProviderConfig { enabled, ..Default::default() })
	}

	#[tokio::test]
	async fn test_tracer_provider_enabled() {
		let exporter = ExporterConfig { traces: provider(true), ..Default::default() };
		let otel_config = OtelConfig { stdout: None, exporter: Some(exporter) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.tracer_provider.is_some());
	}

	#[tokio::test]
	async fn test_tracer_provider_disabled() {
		let exporter = ExporterConfig { traces: provider(false), ..Default::default() };
		let otel_config = OtelConfig { stdout: None, exporter: Some(exporter) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.tracer_provider.is_none());
	}

	// The scenario with the meter provider enabled is not tested for now:
	// the test hangs when the PeriodicReader is shut down.
	// https://github.com/open-telemetry/opentelemetry-rust/issues/2056

	#[tokio::test]
	async fn test_meter_provider_disabled() {
		let exporter = ExporterConfig { metrics: provider(false), ..Default::default() };
		let otel_config = OtelConfig { stdout: None, exporter: Some(exporter) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.meter_provider.is_none());
	}

	#[tokio::test]
	async fn test_logger_provider_enabled() {
		let exporter = ExporterConfig { logs: provider(true), ..Default::default() };
		let otel_config = OtelConfig { stdout: None, exporter: Some(exporter) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.logger_provider.is_some());
	}

	#[tokio::test]
	async fn test_logger_provider_disabled() {
		let exporter = ExporterConfig { logs: provider(false), ..Default::default() };
		let otel_config = OtelConfig { stdout: None, exporter: Some(exporter) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.logger_provider.is_none());
	}

	/// A default exporter section must not create any provider.
	#[tokio::test]
	async fn test_exporter_config_none() {
		let stdout = StdoutLogsConfig { enabled: true, ..Default::default() };
		let otel_config =
			OtelConfig { stdout: Some(stdout), exporter: Some(ExporterConfig::default()) };

		let guard = init_otel!(&otel_config).expect("Error initializing Otel");
		assert!(guard.meter_provider.is_none());
		assert!(guard.tracer_provider.is_none());
		assert!(guard.logger_provider.is_none());
	}
}