opentelemetry_testing/
lib.rs1#![doc = include_str!("../README.md")]
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::time::Duration;
6
7use opentelemetry::trace::TracerProvider;
8use opentelemetry::{InstrumentationScope, KeyValue};
9use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
10use opentelemetry_otlp::WithExportConfig;
11use opentelemetry_sdk::Resource;
12use opentelemetry_sdk::logs::SdkLoggerProvider;
13use opentelemetry_sdk::metrics::SdkMeterProvider;
14use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider};
15use opentelemetry_semantic_conventions::attribute as semver;
16use testcontainers::core::{AccessMode, ContainerPort, Mount, WaitFor};
17use testcontainers::runners::AsyncRunner;
18use testcontainers::{GenericImage, ImageExt};
19use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
20use tracing_subscriber::EnvFilter;
21use tracing_subscriber::layer::SubscriberExt;
22use tracing_subscriber::util::SubscriberInitExt;
23
24const SERVICE_NAME: &str = env!("CARGO_PKG_NAME");
25const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
26
/// Settings used to construct the OTLP exporters and the SDK providers.
struct OpenTelemetryBuilder {
    /// Endpoint handed to every OTLP exporter through `with_endpoint`.
    otel_collector_endpoint: Cow<'static, str>,
    /// Level carried over to the built provider as its `internal_level`.
    otel_internal_level: Cow<'static, str>,
}
31
32impl OpenTelemetryBuilder {
33 fn attributes(&self) -> impl IntoIterator<Item = KeyValue> {
34 [
35 KeyValue::new(semver::SERVICE_NAME, SERVICE_NAME),
36 KeyValue::new(semver::SERVICE_VERSION, SERVICE_VERSION),
37 KeyValue::new("deployment.environment", "test"),
38 ]
39 }
40
41 fn resources(&self) -> Resource {
42 self.attributes()
43 .into_iter()
44 .fold(Resource::builder(), |res, attr| res.with_attribute(attr))
45 .build()
46 }
47
48 fn metric_provider(&self) -> anyhow::Result<SdkMeterProvider> {
49 let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
50 .with_tonic()
51 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
52 .with_endpoint(self.otel_collector_endpoint.as_ref())
53 .build()?;
54
55 Ok(opentelemetry_sdk::metrics::MeterProviderBuilder::default()
56 .with_periodic_exporter(metric_exporter)
57 .with_resource(self.resources())
58 .build())
59 }
60
61 fn trace_provider(&self) -> anyhow::Result<SdkTracerProvider> {
62 let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
63 .with_tonic()
64 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
65 .with_endpoint(self.otel_collector_endpoint.as_ref())
66 .build()?;
67
68 let trace_processor = BatchSpanProcessor::builder(trace_exporter).build();
69
70 Ok(opentelemetry_sdk::trace::TracerProviderBuilder::default()
71 .with_span_processor(trace_processor)
72 .with_resource(self.resources())
73 .build())
74 }
75
76 fn logger_provider(&self) -> anyhow::Result<SdkLoggerProvider> {
77 let log_exporter = opentelemetry_otlp::LogExporter::builder()
78 .with_tonic()
79 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
80 .with_endpoint(self.otel_collector_endpoint.as_ref())
81 .build()?;
82
83 Ok(opentelemetry_sdk::logs::SdkLoggerProvider::builder()
84 .with_resource(self.resources())
85 .with_batch_exporter(log_exporter)
86 .build())
87 }
88
89 pub fn build(self) -> anyhow::Result<OpenTelemetryProvider> {
90 let scope = InstrumentationScope::builder(SERVICE_NAME)
91 .with_version(SERVICE_VERSION)
92 .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
93 .with_attributes(self.attributes())
94 .build();
95
96 Ok(OpenTelemetryProvider {
97 metric: self.metric_provider()?,
98 trace: self.trace_provider()?,
99 logger: self.logger_provider()?,
100 scope,
101 internal_level: self.otel_internal_level,
102 })
103 }
104}
105
106pub struct OpenTelemetryProvider {
107 internal_level: Cow<'static, str>,
108 metric: SdkMeterProvider,
109 trace: SdkTracerProvider,
110 logger: SdkLoggerProvider,
111 scope: InstrumentationScope,
112}
113
114impl std::fmt::Debug for OpenTelemetryProvider {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct(stringify!(OpenTelemetryProvider))
117 .finish_non_exhaustive()
118 }
119}
120
121impl OpenTelemetryProvider {
122 pub fn install(&self) -> anyhow::Result<()> {
123 opentelemetry::global::set_meter_provider(self.metric.clone());
124 opentelemetry::global::set_tracer_provider(self.trace.clone());
125 opentelemetry::global::set_text_map_propagator(
126 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
127 );
128
129 let tracer = self.trace.tracer_with_scope(self.scope.clone());
130
131 let registry = tracing_subscriber::registry()
132 .with(otel_filter(&self.internal_level))
133 .with(OpenTelemetryLayer::new(tracer))
134 .with(MetricsLayer::new(self.metric.clone()))
135 .with(OpenTelemetryTracingBridge::new(&self.logger));
136 registry.try_init()?;
137
138 Ok(())
139 }
140
141 pub fn flush(&self) {
142 if let Err(err) = self.metric.force_flush() {
143 eprintln!("failed flushing metrics provider: {err:?}");
144 }
145 if let Err(err) = self.trace.force_flush() {
146 eprintln!("failed flushing traces provider: {err:?}");
147 }
148 }
149
150 pub fn shutdown(&self) {
151 if let Err(err) = self.metric.shutdown() {
152 eprintln!("failed shutting down metrics provider: {err:?}");
153 }
154 if let Err(err) = self.trace.shutdown() {
155 eprintln!("failed shutting down traces provider: {err:?}");
156 }
157 }
158}
159
160impl Drop for OpenTelemetryProvider {
161 fn drop(&mut self) {
162 self.shutdown();
163 }
164}
165
166fn otel_filter(level: &str) -> EnvFilter {
167 EnvFilter::from_default_env()
168 .add_directive("debug".parse().unwrap())
169 .add_directive(format!("h2={level}").parse().unwrap())
170 .add_directive(format!("hyper_util={level}").parse().unwrap())
171 .add_directive(format!("opentelemetry={level}").parse().unwrap())
172 .add_directive(format!("reqwest={level}").parse().unwrap())
173 .add_directive(format!("tonic={level}").parse().unwrap())
174 .add_directive(format!("tower={level}").parse().unwrap())
175}
176
177#[derive(Debug)]
178pub struct ObservabilityContainer {
179 container: testcontainers::ContainerAsync<testcontainers::GenericImage>,
180 tmp_dir: tempfile::TempDir,
181}
182
183impl ObservabilityContainer {
184 pub async fn create() -> Self {
185 let tmp_dir = tempfile::TempDir::new().unwrap();
186
187 let container = GenericImage::new("otel/opentelemetry-collector-contrib", "latest")
188 .with_wait_for(WaitFor::message_on_stderr(
189 "Everything is ready. Begin running and processing data.",
190 ))
191 .with_exposed_port(ContainerPort::Tcp(4317))
192 .with_copy_to(
193 "/etc/otelcol-contrib/config.yaml",
194 include_bytes!("../asset/otelcol-config.yml").to_vec(),
195 )
196 .with_mount(
197 Mount::bind_mount(tmp_dir.path().to_string_lossy(), "/tmp/output")
198 .with_access_mode(AccessMode::ReadWrite),
199 )
200 .with_user("1001:1001")
201 .with_startup_timeout(Duration::from_secs(30))
202 .start()
203 .await
204 .unwrap();
205
206 Self { container, tmp_dir }
207 }
208
209 pub fn traces(&self) -> String {
210 let path = self.tmp_dir.path().join("traces.json");
211 std::fs::read_to_string(path).unwrap()
212 }
213
214 pub fn json_traces(&self) -> RootTrace {
215 let content = self.traces();
218 let last = content
219 .split("\n")
220 .filter(|item| !item.is_empty())
221 .last()
222 .unwrap();
223 serde_json::from_str(last).unwrap()
224 }
225
226 pub async fn address(&self) -> String {
227 let port = self.container.get_host_port_ipv4(4317).await.unwrap();
228
229 format!("http://127.0.0.1:{port}")
230 }
231
232 pub async fn install(&self) -> OpenTelemetryProvider {
233 let builder = OpenTelemetryBuilder {
234 otel_collector_endpoint: self.address().await.into(),
235 otel_internal_level: "off".into(),
236 };
237 let provider = builder.build().unwrap();
238 provider.install().unwrap();
239
240 provider
241 }
242}
243
244#[derive(Debug, serde::Deserialize)]
245#[serde(rename_all = "camelCase")]
246pub struct RootTrace {
247 pub resource_spans: Vec<TraceResourceSpan>,
248}
249
250impl RootTrace {
251 pub fn find_scope_span(&self, name: &str) -> Option<&ScopeSpan> {
252 self.resource_spans
253 .iter()
254 .flat_map(|span| span.scope_spans.iter())
255 .find(|scope| scope.scope.name == name)
256 }
257
258 pub fn find_child(&self, parent_id: &str, name: &str) -> Option<&Span> {
259 self.resource_spans
260 .iter()
261 .flat_map(|span| span.scope_spans.iter())
262 .flat_map(|span| span.spans.iter())
263 .find(|span| {
264 span.parent_span_id
265 .as_ref()
266 .is_some_and(|id| id == parent_id)
267 && span.name == name
268 })
269 }
270}
271
272#[derive(Debug, serde::Deserialize)]
273#[serde(rename_all = "camelCase")]
274pub struct TraceResourceSpan {
275 pub resource: TraceResource,
276 pub scope_spans: Vec<ScopeSpan>,
277}
278
279#[derive(Debug, serde::Deserialize)]
280#[serde(rename_all = "camelCase")]
281pub struct TraceResource {
282 pub attributes: Vec<Attribute>,
283}
284
285#[derive(Debug, serde::Deserialize)]
286#[serde(rename_all = "camelCase")]
287pub struct Attribute {
288 pub key: String,
289 pub value: serde_json::Value,
290}
291
292#[derive(Debug, serde::Deserialize)]
293#[serde(rename_all = "camelCase")]
294pub struct ScopeSpan {
295 pub scope: Scope,
296 #[serde(default)]
297 pub spans: Vec<Span>,
298}
299
300impl ScopeSpan {
301 pub fn first_span(&self) -> Option<&Span> {
302 self.spans.iter().find(|span| span.parent_span_id.is_none())
303 }
304}
305
306#[derive(Debug, serde::Deserialize)]
307#[serde(rename_all = "camelCase")]
308pub struct Scope {
309 pub name: String,
310 #[serde(default)]
311 pub version: Option<String>,
312 #[serde(default)]
313 pub attributes: Vec<Attribute>,
314}
315
316#[derive(Debug, serde::Deserialize)]
317#[serde(rename_all = "camelCase")]
318pub struct Span {
319 pub trace_id: String,
320 pub span_id: String,
321 #[serde(default)]
322 pub parent_span_id: Option<String>,
323 pub flags: u32,
324 pub name: String,
325 #[serde(default)]
326 pub attributes: Vec<Attribute>,
327 #[serde(default)]
328 pub events: Vec<Event>,
329 #[serde(default)]
330 pub status: HashMap<String, serde_json::Value>,
331}
332
333impl Span {
334 pub fn int_attribute(&self, name: &str) -> Option<&str> {
335 self.attributes
336 .iter()
337 .find(|attr| attr.key == name)
338 .and_then(|attr| attr.value.as_object())
339 .and_then(|obj| obj.get("intValue"))
340 .and_then(|value| value.as_str())
341 }
342
343 pub fn string_attribute(&self, name: &str) -> Option<&str> {
344 self.attributes
345 .iter()
346 .find(|attr| attr.key == name)
347 .and_then(|attr| attr.value.as_object())
348 .and_then(|obj| obj.get("stringValue"))
349 .and_then(|value| value.as_str())
350 }
351}
352
353#[derive(Debug, serde::Deserialize)]
354#[serde(rename_all = "camelCase")]
355pub struct Event {
356 pub name: String,
357 #[serde(default)]
358 pub attributes: Vec<Attribute>,
359}