Skip to main content

wire_framework/vlog/opentelemetry/
mod.rs

1use std::str::FromStr;
2
3use opentelemetry::{KeyValue, trace::TracerProvider};
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::{
6    Resource,
7    propagation::TraceContextPropagator,
8    trace::{RandomIdGenerator, Sampler},
9};
10use opentelemetry_semantic_conventions::resource::{
11    DEPLOYMENT_ENVIRONMENT_NAME, K8S_CLUSTER_NAME, K8S_NAMESPACE_NAME, K8S_POD_NAME, SERVICE_NAME,
12};
13use tracing_subscriber::{EnvFilter, Layer, registry::LookupSpan};
14use url::Url;
15
/// Information about the service.
///
/// This information is initially filled as follows:
/// - Each field is first looked up in its environment variable.
/// - If the variable is not found, a default value is used instead.
///
/// For environment variable names and default values, see the associated constants
/// of this type.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct ServiceDescriptor {
    /// Name of the k8s pod.
    pub k8s_pod_name: String,
    /// Name of the k8s namespace.
    pub k8s_namespace_name: String,
    /// Name of the k8s cluster.
    pub k8s_cluster_name: String,
    /// Name of the deployment environment.
    /// Note that a single deployment environment can be spread among multiple clusters.
    pub deployment_environment: String,
    /// Name of the service.
    pub service_name: String,
}
38
39impl Default for ServiceDescriptor {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl ServiceDescriptor {
46    /// Environment variable to fetch the k8s pod name.
47    pub const K8S_POD_NAME_ENV_VAR: &'static str = "POD_NAME";
48    /// Environment variable to fetch the k8s namespace name.
49    pub const K8S_NAMESPACE_NAME_ENV_VAR: &'static str = "POD_NAMESPACE";
50    /// Environment variable to fetch the k8s cluster name.
51    pub const K8S_CLUSTER_NAME_ENV_VAR: &'static str = "CLUSTER_NAME";
52    /// Environment variable to fetch the deployment environment.
53    pub const DEPLOYMENT_ENVIRONMENT_ENV_VAR: &'static str = "DEPLOYMENT_ENVIRONMENT";
54    /// Environment variable to fetch the service name.
55    pub const SERVICE_NAME_ENV_VAR: &'static str = "SERVICE_NAME";
56    /// Default value for the k8s pod name.
57    pub const DEFAULT_K8S_POD_NAME: &'static str = "wire_framework-0";
58    /// Default value for the k8s namespace name.
59    pub const DEFAULT_K8S_NAMESPACE_NAME: &'static str = "local";
60    /// Default value for the k8s cluster name.
61    pub const DEFAULT_K8S_CLUSTER_NAME: &'static str = "local";
62    /// Default value for the deployment environment.
63    pub const DEFAULT_DEPLOYMENT_ENVIRONMENT: &'static str = "local";
64    /// Default value for the service name.
65    pub const DEFAULT_SERVICE_NAME: &'static str = "wire_framework";
66
67    /// Creates a filled `ServiceDescriptor` object.
68    /// Fetched fields can be overridden.
69    pub fn new() -> Self {
70        // Attempt fetching data from environment variables, and use defaults if not provided.
71        fn env_or(env_var: &str, default: &str) -> String {
72            std::env::var(env_var).unwrap_or_else(|_| default.to_string())
73        }
74        Self {
75            k8s_pod_name: env_or(Self::K8S_POD_NAME_ENV_VAR, Self::DEFAULT_K8S_POD_NAME),
76            k8s_namespace_name: env_or(
77                Self::K8S_NAMESPACE_NAME_ENV_VAR,
78                Self::DEFAULT_K8S_NAMESPACE_NAME,
79            ),
80            k8s_cluster_name: env_or(
81                Self::K8S_CLUSTER_NAME_ENV_VAR,
82                Self::DEFAULT_K8S_CLUSTER_NAME,
83            ),
84            deployment_environment: env_or(
85                Self::DEPLOYMENT_ENVIRONMENT_ENV_VAR,
86                Self::DEFAULT_DEPLOYMENT_ENVIRONMENT,
87            ),
88            service_name: env_or(Self::SERVICE_NAME_ENV_VAR, Self::DEFAULT_SERVICE_NAME),
89        }
90    }
91
92    pub fn with_k8s_pod_name(mut self, k8s_pod_name: Option<String>) -> Self {
93        if let Some(k8s_pod_name) = k8s_pod_name {
94            self.k8s_pod_name = k8s_pod_name;
95        }
96        self
97    }
98
99    pub fn with_k8s_namespace_name(mut self, k8s_namespace_name: Option<String>) -> Self {
100        if let Some(k8s_namespace_name) = k8s_namespace_name {
101            self.k8s_namespace_name = k8s_namespace_name;
102        }
103        self
104    }
105
106    pub fn with_service_name(mut self, service_name: Option<String>) -> Self {
107        if let Some(service_name) = service_name {
108            self.service_name = service_name;
109        }
110        self
111    }
112
113    fn into_otlp_resource(self) -> Resource {
114        Resource::builder()
115            .with_attribute(KeyValue::new(K8S_POD_NAME, self.k8s_pod_name))
116            .with_attribute(KeyValue::new(K8S_NAMESPACE_NAME, self.k8s_namespace_name))
117            .with_attribute(KeyValue::new(K8S_CLUSTER_NAME, self.k8s_cluster_name))
118            .with_attribute(KeyValue::new(
119                DEPLOYMENT_ENVIRONMENT_NAME,
120                self.deployment_environment,
121            ))
122            .with_attribute(KeyValue::new(SERVICE_NAME, self.service_name))
123            .build()
124    }
125}
126
127#[derive(Debug)]
128pub struct OpenTelemetry {
129    /// Enables export of span data of specified level (and above) using opentelemetry exporters.
130    pub opentelemetry_level: OpenTelemetryLevel,
131    /// Opentelemetry HTTP collector endpoint for traces.
132    pub tracing_endpoint: Option<Url>,
133    /// Opentelemetry HTTP collector endpoint for logs.
134    pub logging_endpoint: Option<Url>,
135    /// Information about service
136    pub service: ServiceDescriptor,
137}
138
139impl OpenTelemetry {
140    pub fn new(
141        opentelemetry_level: &str,
142        tracing_endpoint: Option<String>,
143        logging_endpoint: Option<String>,
144    ) -> Result<Self, OpenTelemetryLayerError> {
145        fn parse_url(url: Option<String>) -> Result<Option<Url>, OpenTelemetryLayerError> {
146            url.map(|v| {
147                v.parse()
148                    .map_err(|e| OpenTelemetryLayerError::InvalidUrl(v, e))
149            })
150            .transpose()
151        }
152
153        Ok(Self {
154            opentelemetry_level: opentelemetry_level.parse()?,
155            tracing_endpoint: parse_url(tracing_endpoint)?,
156            logging_endpoint: parse_url(logging_endpoint)?,
157            service: ServiceDescriptor::new(),
158        })
159    }
160
161    /// Can be used to override the service descriptor used by the layer.
162    pub fn with_service_descriptor(mut self, service: ServiceDescriptor) -> Self {
163        self.service = service;
164        self
165    }
166
167    /// Prepares an exporter for OTLP logs and layer for the `tracing` library.
168    /// Will return `None` if no logging URL was provided.
169    ///
170    /// *Important*: we use `tracing` library to generate logs, and convert the logs
171    /// to OTLP format when exporting. However, `tracing` doesn't provide information
172    /// about timestamp of the log. While this value is optional in OTLP, some
173    /// collectors/processors may ignore logs without timestamp. Thus, you may need to
174    /// have a proxy collector, like `opentelemetry-collector-contrib` or `vector`, and
175    /// use the functionality there to set the timestamp. Here's example configuration
176    /// for `opentelemetry-collector-contrib`:
177    ///
178    /// ```text
179    /// processors:
180    ///  transform/set_time_unix_nano:
181    ///  log_statements:
182    ///    - context: log
183    ///      statements:
184    ///        - set(time_unix_nano, observed_time_unix_nano)
185    /// ```
186    pub(super) fn logs_layer<S>(
187        self,
188    ) -> Option<(opentelemetry_sdk::logs::SdkLoggerProvider, impl Layer<S>)>
189    where
190        S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
191    {
192        let logging_endpoint = self.logging_endpoint.clone()?;
193        let resource = self.service.clone().into_otlp_resource();
194
195        let exporter = opentelemetry_otlp::LogExporter::builder()
196            .with_http()
197            .with_endpoint(logging_endpoint)
198            .build()
199            .expect("Failed to create OTLP exporter"); // URL is validated.
200
201        let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
202            .with_batch_exporter(exporter)
203            .with_resource(resource)
204            .build();
205
206        let layer =
207            opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&provider);
208
209        Some((provider, layer))
210    }
211
212    /// Prepares an exporter for OTLP traces and layer for `tracing` library.
213    /// Will return `None` if no tracing URL was provided.
214    pub(super) fn tracing_layer<S>(
215        &self,
216    ) -> Option<(opentelemetry_sdk::trace::SdkTracerProvider, impl Layer<S>)>
217    where
218        S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
219    {
220        let tracing_endpoint = self.tracing_endpoint.clone()?;
221        // `otel::tracing` should be a level info to emit opentelemetry trace & span
222        // `otel` set to debug to log detected resources, configuration read and inferred
223        let filter = self
224            .filter()
225            .add_directive("otel::tracing=trace".parse().unwrap())
226            .add_directive("otel=debug".parse().unwrap());
227
228        let service_name = self.service.service_name.clone();
229        let resource = self.service.clone().into_otlp_resource();
230
231        let exporter = opentelemetry_otlp::SpanExporter::builder()
232            .with_http()
233            .with_endpoint(tracing_endpoint)
234            .build()
235            .expect("Failed to create OTLP exporter"); // URL is validated.
236
237        let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
238            .with_batch_exporter(exporter)
239            .with_id_generator(RandomIdGenerator::default())
240            .with_sampler(Sampler::AlwaysOn)
241            .with_resource(resource)
242            .build();
243
244        // TODO: Version and other metadata
245        let tracer = provider.tracer(service_name);
246
247        opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
248        let layer = tracing_opentelemetry::layer()
249            .with_tracer(tracer)
250            .with_filter(filter);
251
252        Some((provider, layer))
253    }
254
255    /// Returns a filter for opentelemetry layer.
256    /// It's applied to the layer only, but note that there might be a global filter applied to the
257    /// whole subscriber.
258    fn filter(&self) -> EnvFilter {
259        match self.opentelemetry_level {
260            OpenTelemetryLevel::OFF => EnvFilter::new("off"),
261            OpenTelemetryLevel::INFO => EnvFilter::new("info"),
262            OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
263            OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
264        }
265    }
266}
267
/// Level of span data exported through the OpenTelemetry layer.
///
/// `WARN` and `ERROR` are deliberately not defined: per the original author's note,
/// `INFO` is the highest level spans are emitted at.
///
/// The default is [`OpenTelemetryLevel::OFF`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum OpenTelemetryLevel {
    /// Maps to the `off` filter directive.
    #[default]
    OFF,
    /// Maps to the `info` filter directive.
    INFO,
    /// Maps to the `debug` filter directive.
    DEBUG,
    /// Maps to the `trace` filter directive.
    TRACE,
}
277
278#[derive(Debug, thiserror::Error)]
279#[non_exhaustive]
280pub enum OpenTelemetryLayerError {
281    #[error("Invalid OpenTelemetry level format")]
282    InvalidFormat,
283    #[error("Invalid URL: \"{0}\" - {1}")]
284    InvalidUrl(String, url::ParseError),
285}
286
287impl FromStr for OpenTelemetryLevel {
288    type Err = OpenTelemetryLayerError;
289
290    fn from_str(s: &str) -> Result<Self, Self::Err> {
291        match s {
292            "off" => Ok(OpenTelemetryLevel::OFF),
293            "info" => Ok(OpenTelemetryLevel::INFO),
294            "debug" => Ok(OpenTelemetryLevel::DEBUG),
295            "trace" => Ok(OpenTelemetryLevel::TRACE),
296            _ => Err(OpenTelemetryLayerError::InvalidFormat),
297        }
298    }
299}