wire_framework/vlog/opentelemetry/
mod.rs1use std::str::FromStr;
2
3use opentelemetry::{KeyValue, trace::TracerProvider};
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::{
6 Resource,
7 propagation::TraceContextPropagator,
8 trace::{RandomIdGenerator, Sampler},
9};
10use opentelemetry_semantic_conventions::resource::{
11 DEPLOYMENT_ENVIRONMENT_NAME, K8S_CLUSTER_NAME, K8S_NAMESPACE_NAME, K8S_POD_NAME, SERVICE_NAME,
12};
13use tracing_subscriber::{EnvFilter, Layer, registry::LookupSpan};
14use url::Url;
15
16#[derive(Debug, Clone)]
24#[non_exhaustive]
25pub struct ServiceDescriptor {
26 pub k8s_pod_name: String,
28 pub k8s_namespace_name: String,
30 pub k8s_cluster_name: String,
32 pub deployment_environment: String,
35 pub service_name: String,
37}
38
39impl Default for ServiceDescriptor {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl ServiceDescriptor {
46 pub const K8S_POD_NAME_ENV_VAR: &'static str = "POD_NAME";
48 pub const K8S_NAMESPACE_NAME_ENV_VAR: &'static str = "POD_NAMESPACE";
50 pub const K8S_CLUSTER_NAME_ENV_VAR: &'static str = "CLUSTER_NAME";
52 pub const DEPLOYMENT_ENVIRONMENT_ENV_VAR: &'static str = "DEPLOYMENT_ENVIRONMENT";
54 pub const SERVICE_NAME_ENV_VAR: &'static str = "SERVICE_NAME";
56 pub const DEFAULT_K8S_POD_NAME: &'static str = "wire_framework-0";
58 pub const DEFAULT_K8S_NAMESPACE_NAME: &'static str = "local";
60 pub const DEFAULT_K8S_CLUSTER_NAME: &'static str = "local";
62 pub const DEFAULT_DEPLOYMENT_ENVIRONMENT: &'static str = "local";
64 pub const DEFAULT_SERVICE_NAME: &'static str = "wire_framework";
66
67 pub fn new() -> Self {
70 fn env_or(env_var: &str, default: &str) -> String {
72 std::env::var(env_var).unwrap_or_else(|_| default.to_string())
73 }
74 Self {
75 k8s_pod_name: env_or(Self::K8S_POD_NAME_ENV_VAR, Self::DEFAULT_K8S_POD_NAME),
76 k8s_namespace_name: env_or(
77 Self::K8S_NAMESPACE_NAME_ENV_VAR,
78 Self::DEFAULT_K8S_NAMESPACE_NAME,
79 ),
80 k8s_cluster_name: env_or(
81 Self::K8S_CLUSTER_NAME_ENV_VAR,
82 Self::DEFAULT_K8S_CLUSTER_NAME,
83 ),
84 deployment_environment: env_or(
85 Self::DEPLOYMENT_ENVIRONMENT_ENV_VAR,
86 Self::DEFAULT_DEPLOYMENT_ENVIRONMENT,
87 ),
88 service_name: env_or(Self::SERVICE_NAME_ENV_VAR, Self::DEFAULT_SERVICE_NAME),
89 }
90 }
91
92 pub fn with_k8s_pod_name(mut self, k8s_pod_name: Option<String>) -> Self {
93 if let Some(k8s_pod_name) = k8s_pod_name {
94 self.k8s_pod_name = k8s_pod_name;
95 }
96 self
97 }
98
99 pub fn with_k8s_namespace_name(mut self, k8s_namespace_name: Option<String>) -> Self {
100 if let Some(k8s_namespace_name) = k8s_namespace_name {
101 self.k8s_namespace_name = k8s_namespace_name;
102 }
103 self
104 }
105
106 pub fn with_service_name(mut self, service_name: Option<String>) -> Self {
107 if let Some(service_name) = service_name {
108 self.service_name = service_name;
109 }
110 self
111 }
112
113 fn into_otlp_resource(self) -> Resource {
114 Resource::builder()
115 .with_attribute(KeyValue::new(K8S_POD_NAME, self.k8s_pod_name))
116 .with_attribute(KeyValue::new(K8S_NAMESPACE_NAME, self.k8s_namespace_name))
117 .with_attribute(KeyValue::new(K8S_CLUSTER_NAME, self.k8s_cluster_name))
118 .with_attribute(KeyValue::new(
119 DEPLOYMENT_ENVIRONMENT_NAME,
120 self.deployment_environment,
121 ))
122 .with_attribute(KeyValue::new(SERVICE_NAME, self.service_name))
123 .build()
124 }
125}
126
127#[derive(Debug)]
128pub struct OpenTelemetry {
129 pub opentelemetry_level: OpenTelemetryLevel,
131 pub tracing_endpoint: Option<Url>,
133 pub logging_endpoint: Option<Url>,
135 pub service: ServiceDescriptor,
137}
138
139impl OpenTelemetry {
140 pub fn new(
141 opentelemetry_level: &str,
142 tracing_endpoint: Option<String>,
143 logging_endpoint: Option<String>,
144 ) -> Result<Self, OpenTelemetryLayerError> {
145 fn parse_url(url: Option<String>) -> Result<Option<Url>, OpenTelemetryLayerError> {
146 url.map(|v| {
147 v.parse()
148 .map_err(|e| OpenTelemetryLayerError::InvalidUrl(v, e))
149 })
150 .transpose()
151 }
152
153 Ok(Self {
154 opentelemetry_level: opentelemetry_level.parse()?,
155 tracing_endpoint: parse_url(tracing_endpoint)?,
156 logging_endpoint: parse_url(logging_endpoint)?,
157 service: ServiceDescriptor::new(),
158 })
159 }
160
161 pub fn with_service_descriptor(mut self, service: ServiceDescriptor) -> Self {
163 self.service = service;
164 self
165 }
166
167 pub(super) fn logs_layer<S>(
187 self,
188 ) -> Option<(opentelemetry_sdk::logs::SdkLoggerProvider, impl Layer<S>)>
189 where
190 S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
191 {
192 let logging_endpoint = self.logging_endpoint.clone()?;
193 let resource = self.service.clone().into_otlp_resource();
194
195 let exporter = opentelemetry_otlp::LogExporter::builder()
196 .with_http()
197 .with_endpoint(logging_endpoint)
198 .build()
199 .expect("Failed to create OTLP exporter"); let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
202 .with_batch_exporter(exporter)
203 .with_resource(resource)
204 .build();
205
206 let layer =
207 opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&provider);
208
209 Some((provider, layer))
210 }
211
212 pub(super) fn tracing_layer<S>(
215 &self,
216 ) -> Option<(opentelemetry_sdk::trace::SdkTracerProvider, impl Layer<S>)>
217 where
218 S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
219 {
220 let tracing_endpoint = self.tracing_endpoint.clone()?;
221 let filter = self
224 .filter()
225 .add_directive("otel::tracing=trace".parse().unwrap())
226 .add_directive("otel=debug".parse().unwrap());
227
228 let service_name = self.service.service_name.clone();
229 let resource = self.service.clone().into_otlp_resource();
230
231 let exporter = opentelemetry_otlp::SpanExporter::builder()
232 .with_http()
233 .with_endpoint(tracing_endpoint)
234 .build()
235 .expect("Failed to create OTLP exporter"); let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
238 .with_batch_exporter(exporter)
239 .with_id_generator(RandomIdGenerator::default())
240 .with_sampler(Sampler::AlwaysOn)
241 .with_resource(resource)
242 .build();
243
244 let tracer = provider.tracer(service_name);
246
247 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
248 let layer = tracing_opentelemetry::layer()
249 .with_tracer(tracer)
250 .with_filter(filter);
251
252 Some((provider, layer))
253 }
254
255 fn filter(&self) -> EnvFilter {
259 match self.opentelemetry_level {
260 OpenTelemetryLevel::OFF => EnvFilter::new("off"),
261 OpenTelemetryLevel::INFO => EnvFilter::new("info"),
262 OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
263 OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
264 }
265 }
266}
267
268#[derive(Copy, Clone, Debug, Default)]
270pub enum OpenTelemetryLevel {
271 #[default]
272 OFF,
273 INFO,
274 DEBUG,
275 TRACE,
276}
277
278#[derive(Debug, thiserror::Error)]
279#[non_exhaustive]
280pub enum OpenTelemetryLayerError {
281 #[error("Invalid OpenTelemetry level format")]
282 InvalidFormat,
283 #[error("Invalid URL: \"{0}\" - {1}")]
284 InvalidUrl(String, url::ParseError),
285}
286
287impl FromStr for OpenTelemetryLevel {
288 type Err = OpenTelemetryLayerError;
289
290 fn from_str(s: &str) -> Result<Self, Self::Err> {
291 match s {
292 "off" => Ok(OpenTelemetryLevel::OFF),
293 "info" => Ok(OpenTelemetryLevel::INFO),
294 "debug" => Ok(OpenTelemetryLevel::DEBUG),
295 "trace" => Ok(OpenTelemetryLevel::TRACE),
296 _ => Err(OpenTelemetryLayerError::InvalidFormat),
297 }
298 }
299}