opentelemetry_testing/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::time::Duration;
6
7use opentelemetry::trace::TracerProvider;
8use opentelemetry::{InstrumentationScope, KeyValue};
9use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
10use opentelemetry_otlp::WithExportConfig;
11use opentelemetry_sdk::Resource;
12use opentelemetry_sdk::logs::SdkLoggerProvider;
13use opentelemetry_sdk::metrics::SdkMeterProvider;
14use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider};
15use opentelemetry_semantic_conventions::attribute as semver;
16use testcontainers::core::{AccessMode, ContainerPort, Mount, WaitFor};
17use testcontainers::runners::AsyncRunner;
18use testcontainers::{GenericImage, ImageExt};
19use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
20use tracing_subscriber::EnvFilter;
21use tracing_subscriber::layer::SubscriberExt;
22use tracing_subscriber::util::SubscriberInitExt;
23
24const SERVICE_NAME: &str = env!("CARGO_PKG_NAME");
25const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
26
27struct OpenTelemetryBuilder {
28    otel_collector_endpoint: Cow<'static, str>,
29    otel_internal_level: Cow<'static, str>,
30}
31
32impl OpenTelemetryBuilder {
33    fn attributes(&self) -> impl IntoIterator<Item = KeyValue> {
34        [
35            KeyValue::new(semver::SERVICE_NAME, SERVICE_NAME),
36            KeyValue::new(semver::SERVICE_VERSION, SERVICE_VERSION),
37            KeyValue::new("deployment.environment", "test"),
38        ]
39    }
40
41    fn resources(&self) -> Resource {
42        self.attributes()
43            .into_iter()
44            .fold(Resource::builder(), |res, attr| res.with_attribute(attr))
45            .build()
46    }
47
48    fn metric_provider(&self) -> anyhow::Result<SdkMeterProvider> {
49        let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
50            .with_tonic()
51            .with_protocol(opentelemetry_otlp::Protocol::Grpc)
52            .with_endpoint(self.otel_collector_endpoint.as_ref())
53            .build()?;
54
55        Ok(opentelemetry_sdk::metrics::MeterProviderBuilder::default()
56            .with_periodic_exporter(metric_exporter)
57            .with_resource(self.resources())
58            .build())
59    }
60
61    fn trace_provider(&self) -> anyhow::Result<SdkTracerProvider> {
62        let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
63            .with_tonic()
64            .with_protocol(opentelemetry_otlp::Protocol::Grpc)
65            .with_endpoint(self.otel_collector_endpoint.as_ref())
66            .build()?;
67
68        let trace_processor = BatchSpanProcessor::builder(trace_exporter).build();
69
70        Ok(opentelemetry_sdk::trace::TracerProviderBuilder::default()
71            .with_span_processor(trace_processor)
72            .with_resource(self.resources())
73            .build())
74    }
75
76    fn logger_provider(&self) -> anyhow::Result<SdkLoggerProvider> {
77        let log_exporter = opentelemetry_otlp::LogExporter::builder()
78            .with_tonic()
79            .with_protocol(opentelemetry_otlp::Protocol::Grpc)
80            .with_endpoint(self.otel_collector_endpoint.as_ref())
81            .build()?;
82
83        Ok(opentelemetry_sdk::logs::SdkLoggerProvider::builder()
84            .with_resource(self.resources())
85            .with_batch_exporter(log_exporter)
86            .build())
87    }
88
89    pub fn build(self) -> anyhow::Result<OpenTelemetryProvider> {
90        let scope = InstrumentationScope::builder(SERVICE_NAME)
91            .with_version(SERVICE_VERSION)
92            .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
93            .with_attributes(self.attributes())
94            .build();
95
96        Ok(OpenTelemetryProvider {
97            metric: self.metric_provider()?,
98            trace: self.trace_provider()?,
99            logger: self.logger_provider()?,
100            scope,
101            internal_level: self.otel_internal_level,
102        })
103    }
104}
105
106pub struct OpenTelemetryProvider {
107    internal_level: Cow<'static, str>,
108    metric: SdkMeterProvider,
109    trace: SdkTracerProvider,
110    logger: SdkLoggerProvider,
111    scope: InstrumentationScope,
112}
113
114impl std::fmt::Debug for OpenTelemetryProvider {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct(stringify!(OpenTelemetryProvider))
117            .finish_non_exhaustive()
118    }
119}
120
121impl OpenTelemetryProvider {
122    pub fn install(&self) -> anyhow::Result<()> {
123        opentelemetry::global::set_meter_provider(self.metric.clone());
124        opentelemetry::global::set_tracer_provider(self.trace.clone());
125        opentelemetry::global::set_text_map_propagator(
126            opentelemetry_sdk::propagation::TraceContextPropagator::new(),
127        );
128
129        let tracer = self.trace.tracer_with_scope(self.scope.clone());
130
131        let registry = tracing_subscriber::registry()
132            .with(otel_filter(&self.internal_level))
133            .with(OpenTelemetryLayer::new(tracer))
134            .with(MetricsLayer::new(self.metric.clone()))
135            .with(OpenTelemetryTracingBridge::new(&self.logger));
136        registry.try_init()?;
137
138        Ok(())
139    }
140
141    pub fn flush(&self) {
142        if let Err(err) = self.metric.force_flush() {
143            eprintln!("failed flushing metrics provider: {err:?}");
144        }
145        if let Err(err) = self.trace.force_flush() {
146            eprintln!("failed flushing traces provider: {err:?}");
147        }
148    }
149
150    pub fn shutdown(&self) {
151        if let Err(err) = self.metric.shutdown() {
152            eprintln!("failed shutting down metrics provider: {err:?}");
153        }
154        if let Err(err) = self.trace.shutdown() {
155            eprintln!("failed shutting down traces provider: {err:?}");
156        }
157    }
158}
159
160impl Drop for OpenTelemetryProvider {
161    fn drop(&mut self) {
162        self.shutdown();
163    }
164}
165
166fn otel_filter(level: &str) -> EnvFilter {
167    EnvFilter::from_default_env()
168        .add_directive("debug".parse().unwrap())
169        .add_directive(format!("h2={level}").parse().unwrap())
170        .add_directive(format!("hyper_util={level}").parse().unwrap())
171        .add_directive(format!("opentelemetry={level}").parse().unwrap())
172        .add_directive(format!("reqwest={level}").parse().unwrap())
173        .add_directive(format!("tonic={level}").parse().unwrap())
174        .add_directive(format!("tower={level}").parse().unwrap())
175}
176
177#[derive(Debug)]
178pub struct ObservabilityContainer {
179    container: testcontainers::ContainerAsync<testcontainers::GenericImage>,
180    tmp_dir: tempfile::TempDir,
181}
182
183impl ObservabilityContainer {
184    pub async fn create() -> Self {
185        let tmp_dir = tempfile::TempDir::new().unwrap();
186
187        let container = GenericImage::new("otel/opentelemetry-collector-contrib", "latest")
188            .with_wait_for(WaitFor::message_on_stderr(
189                "Everything is ready. Begin running and processing data.",
190            ))
191            .with_exposed_port(ContainerPort::Tcp(4317))
192            .with_copy_to(
193                "/etc/otelcol-contrib/config.yaml",
194                include_bytes!("../asset/otelcol-config.yml").to_vec(),
195            )
196            .with_mount(
197                Mount::bind_mount(tmp_dir.path().to_string_lossy(), "/tmp/output")
198                    .with_access_mode(AccessMode::ReadWrite),
199            )
200            .with_user("1001:1001")
201            .with_startup_timeout(Duration::from_secs(30))
202            .start()
203            .await
204            .unwrap();
205
206        Self { container, tmp_dir }
207    }
208
209    pub fn traces(&self) -> String {
210        let path = self.tmp_dir.path().join("traces.json");
211        std::fs::read_to_string(path).unwrap()
212    }
213
214    pub fn json_traces(&self) -> RootTrace {
215        // traces are appended to the same file, one line per flush,
216        // so we need to take the last one only.
217        let content = self.traces();
218        let last = content
219            .split("\n")
220            .filter(|item| !item.is_empty())
221            .last()
222            .unwrap();
223        serde_json::from_str(last).unwrap()
224    }
225
226    pub async fn address(&self) -> String {
227        let port = self.container.get_host_port_ipv4(4317).await.unwrap();
228
229        format!("http://127.0.0.1:{port}")
230    }
231
232    pub async fn install(&self) -> OpenTelemetryProvider {
233        let builder = OpenTelemetryBuilder {
234            otel_collector_endpoint: self.address().await.into(),
235            otel_internal_level: "off".into(),
236        };
237        let provider = builder.build().unwrap();
238        provider.install().unwrap();
239
240        provider
241    }
242}
243
244#[derive(Debug, serde::Deserialize)]
245#[serde(rename_all = "camelCase")]
246pub struct RootTrace {
247    pub resource_spans: Vec<TraceResourceSpan>,
248}
249
250impl RootTrace {
251    pub fn find_scope_span(&self, name: &str) -> Option<&ScopeSpan> {
252        self.resource_spans
253            .iter()
254            .flat_map(|span| span.scope_spans.iter())
255            .find(|scope| scope.scope.name == name)
256    }
257
258    pub fn find_child(&self, parent_id: &str, name: &str) -> Option<&Span> {
259        self.resource_spans
260            .iter()
261            .flat_map(|span| span.scope_spans.iter())
262            .flat_map(|span| span.spans.iter())
263            .find(|span| {
264                span.parent_span_id
265                    .as_ref()
266                    .is_some_and(|id| id == parent_id)
267                    && span.name == name
268            })
269    }
270}
271
272#[derive(Debug, serde::Deserialize)]
273#[serde(rename_all = "camelCase")]
274pub struct TraceResourceSpan {
275    pub resource: TraceResource,
276    pub scope_spans: Vec<ScopeSpan>,
277}
278
279#[derive(Debug, serde::Deserialize)]
280#[serde(rename_all = "camelCase")]
281pub struct TraceResource {
282    pub attributes: Vec<Attribute>,
283}
284
285#[derive(Debug, serde::Deserialize)]
286#[serde(rename_all = "camelCase")]
287pub struct Attribute {
288    pub key: String,
289    pub value: serde_json::Value,
290}
291
292#[derive(Debug, serde::Deserialize)]
293#[serde(rename_all = "camelCase")]
294pub struct ScopeSpan {
295    pub scope: Scope,
296    #[serde(default)]
297    pub spans: Vec<Span>,
298}
299
300impl ScopeSpan {
301    pub fn first_span(&self) -> Option<&Span> {
302        self.spans.iter().find(|span| span.parent_span_id.is_none())
303    }
304}
305
306#[derive(Debug, serde::Deserialize)]
307#[serde(rename_all = "camelCase")]
308pub struct Scope {
309    pub name: String,
310    #[serde(default)]
311    pub version: Option<String>,
312    #[serde(default)]
313    pub attributes: Vec<Attribute>,
314}
315
316#[derive(Debug, serde::Deserialize)]
317#[serde(rename_all = "camelCase")]
318pub struct Span {
319    pub trace_id: String,
320    pub span_id: String,
321    #[serde(default)]
322    pub parent_span_id: Option<String>,
323    pub flags: u32,
324    pub name: String,
325    #[serde(default)]
326    pub attributes: Vec<Attribute>,
327    #[serde(default)]
328    pub events: Vec<Event>,
329    #[serde(default)]
330    pub status: HashMap<String, serde_json::Value>,
331}
332
333impl Span {
334    pub fn int_attribute(&self, name: &str) -> Option<&str> {
335        self.attributes
336            .iter()
337            .find(|attr| attr.key == name)
338            .and_then(|attr| attr.value.as_object())
339            .and_then(|obj| obj.get("intValue"))
340            .and_then(|value| value.as_str())
341    }
342
343    pub fn string_attribute(&self, name: &str) -> Option<&str> {
344        self.attributes
345            .iter()
346            .find(|attr| attr.key == name)
347            .and_then(|attr| attr.value.as_object())
348            .and_then(|obj| obj.get("stringValue"))
349            .and_then(|value| value.as_str())
350    }
351}
352
353#[derive(Debug, serde::Deserialize)]
354#[serde(rename_all = "camelCase")]
355pub struct Event {
356    pub name: String,
357    #[serde(default)]
358    pub attributes: Vec<Attribute>,
359}