cron_when/cli/
telemetry.rs1use anyhow::{Result, anyhow};
52use base64::{Engine, engine::general_purpose};
53use once_cell::sync::OnceCell;
54use opentelemetry::propagation::TextMapCompositePropagator;
55use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
56use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
57use opentelemetry_sdk::{
58 Resource,
59 propagation::{BaggagePropagator, TraceContextPropagator},
60 trace::{SdkTracerProvider, Tracer},
61};
62use std::{collections::HashMap, env::var, time::Duration};
63use tonic::{metadata::*, transport::ClientTlsConfig};
64use tracing::{Level, debug};
65use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};
66use ulid::Ulid;
67
68static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();
70
71fn parse_headers_env(headers_str: &str) -> HashMap<String, String> {
72 headers_str
73 .split(',')
74 .filter_map(|pair| {
75 let mut parts = pair.splitn(2, '=');
76 let key = parts.next()?.trim().to_string();
77 let value = parts.next()?.trim().to_string();
78 Some((key, value))
79 })
80 .collect()
81}
82
83fn headers_to_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
87 let mut meta = MetadataMap::with_capacity(headers.len());
88
89 for (k, v) in headers {
90 let key_str = k.to_ascii_lowercase();
91
92 if key_str.ends_with("-bin") {
93 let bytes = general_purpose::STANDARD
94 .decode(v.as_bytes())
95 .map_err(|e| anyhow!("failed to base64-decode value for key {}: {}", key_str, e))?;
96
97 let key = MetadataKey::<Binary>::from_bytes(key_str.as_bytes())
98 .map_err(|e| anyhow!("invalid binary metadata key {}: {}", key_str, e))?;
99
100 let val = MetadataValue::from_bytes(&bytes);
101 meta.insert_bin(key, val);
102 } else {
103 let key = MetadataKey::<Ascii>::from_bytes(key_str.as_bytes())
104 .map_err(|e| anyhow!("invalid ASCII metadata key {}: {}", key_str, e))?;
105
106 let val: MetadataValue<_> = v
107 .parse()
108 .map_err(|e| anyhow!("invalid ASCII metadata value for key {}: {}", key_str, e))?;
109 meta.insert(key, val);
110 }
111 }
112
113 Ok(meta)
114}
115
116fn normalize_endpoint(ep: String) -> String {
117 if ep.starts_with("http://") || ep.starts_with("https://") {
118 ep
119 } else {
120 format!("https://{}", ep.trim_end_matches('/'))
122 }
123}
124
125fn init_tracer() -> Result<Tracer> {
126 if let Ok(proto) = var("OTEL_EXPORTER_OTLP_PROTOCOL")
128 && proto != "grpc"
129 {
130 debug!(
131 "OTEL_EXPORTER_OTLP_PROTOCOL='{}' ignored: only 'grpc' is supported now",
132 proto
133 );
134 }
135
136 let default_ep = "http://localhost:4317";
138 let endpoint = var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_ep.to_string());
139 let endpoint = normalize_endpoint(endpoint);
140
141 let headers = var("OTEL_EXPORTER_OTLP_HEADERS")
142 .ok()
143 .map(|s| parse_headers_env(&s))
144 .unwrap_or_default();
145
146 let mut builder = opentelemetry_otlp::SpanExporter::builder()
148 .with_tonic()
149 .with_endpoint(&endpoint)
150 .with_compression(Compression::Gzip)
151 .with_timeout(Duration::from_secs(3));
152
153 if let Some(host) = endpoint
155 .strip_prefix("https://")
156 .and_then(|s| s.split('/').next())
157 .and_then(|h| h.split(':').next())
158 {
159 let tls = ClientTlsConfig::new()
160 .domain_name(host.to_string())
161 .with_native_roots();
162 builder = builder.with_tls_config(tls);
163 }
164
165 if !headers.is_empty() {
166 let metadata = headers_to_metadata(&headers)?;
167 builder = builder.with_metadata(metadata);
168 }
169
170 let exporter = builder.build()?;
171
172 let instance_id = var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_else(|_| Ulid::new().to_string());
174
175 let trace_provider = SdkTracerProvider::builder()
176 .with_batch_exporter(exporter)
177 .with_resource(
178 Resource::builder_empty()
179 .with_attributes(vec![
180 KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
181 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
182 KeyValue::new("service.instance.id", instance_id),
183 ])
184 .build(),
185 )
186 .build();
187
188 let stored = trace_provider.clone();
190 let _ = TRACER_PROVIDER.set(stored);
191
192 global::set_tracer_provider(trace_provider.clone());
194 global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
195 Box::new(TraceContextPropagator::new()),
196 Box::new(BaggagePropagator::new()),
197 ]));
198
199 Ok(trace_provider.tracer(env!("CARGO_PKG_NAME")))
200}
201
202pub fn init(verbosity_level: Option<Level>) -> Result<()> {
205 let verbosity_level = verbosity_level.unwrap_or(Level::ERROR);
206
207 let fmt_layer = fmt::layer()
208 .with_file(false)
209 .with_line_number(false)
210 .with_thread_ids(false)
211 .with_thread_names(false)
212 .with_target(false)
213 .pretty();
214
215 let filter = EnvFilter::builder()
216 .with_default_directive(verbosity_level.into())
217 .from_env_lossy()
218 .add_directive("hyper=error".parse()?)
219 .add_directive("tokio=error".parse()?)
220 .add_directive("opentelemetry_sdk=warn".parse()?);
221
222 if var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
223 let tracer = init_tracer()?;
224 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
225
226 let subscriber = Registry::default()
227 .with(fmt_layer)
228 .with(otel_layer)
229 .with(filter);
230 tracing::subscriber::set_global_default(subscriber)?;
231 } else {
232 let subscriber = Registry::default().with(fmt_layer).with(filter);
233 tracing::subscriber::set_global_default(subscriber)?;
234 }
235
236 Ok(())
237}
238
239pub fn shutdown_tracer() {
272 if let Some(tp) = TRACER_PROVIDER.get() {
273 debug!("flushing and shutting down tracer provider");
274
275 if let Err(e) = tp.force_flush() {
278 eprintln!("Failed to flush spans: {}", e);
279 }
280
281 if let Err(e) = tp.shutdown() {
283 eprintln!("Failed to shutdown tracer provider: {}", e);
284 }
285
286 debug!("tracer provider shutdown complete");
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293
294 #[test]
295 fn test_parse_headers_env_empty() {
296 let result = parse_headers_env("");
297 assert!(result.is_empty());
298 }
299
300 #[test]
301 fn test_parse_headers_env_single() {
302 let result = parse_headers_env("key1=value1");
303 assert_eq!(result.len(), 1);
304 assert_eq!(result.get("key1"), Some(&"value1".to_string()));
305 }
306
307 #[test]
308 fn test_parse_headers_env_multiple() {
309 let result = parse_headers_env("key1=value1,key2=value2,key3=value3");
310 assert_eq!(result.len(), 3);
311 assert_eq!(result.get("key1"), Some(&"value1".to_string()));
312 assert_eq!(result.get("key2"), Some(&"value2".to_string()));
313 assert_eq!(result.get("key3"), Some(&"value3".to_string()));
314 }
315
316 #[test]
317 fn test_parse_headers_env_with_spaces() {
318 let result = parse_headers_env("key1 = value1 , key2 = value2");
319 assert_eq!(result.len(), 2);
320 assert_eq!(result.get("key1"), Some(&"value1".to_string()));
321 assert_eq!(result.get("key2"), Some(&"value2".to_string()));
322 }
323
324 #[test]
325 fn test_parse_headers_env_malformed() {
326 let result = parse_headers_env("key1=value1,malformed,key2=value2");
328 assert_eq!(result.len(), 2);
329 assert_eq!(result.get("key1"), Some(&"value1".to_string()));
330 assert_eq!(result.get("key2"), Some(&"value2".to_string()));
331 assert!(!result.contains_key("malformed"));
332 }
333
334 #[test]
335 fn test_headers_to_metadata_empty() {
336 let headers = HashMap::new();
337 let result = headers_to_metadata(&headers);
338 assert!(result.is_ok());
339 let metadata = result.unwrap();
340 assert_eq!(metadata.len(), 0);
341 }
342
343 #[test]
344 fn test_headers_to_metadata_ascii() {
345 let mut headers = HashMap::new();
346 headers.insert("authorization".to_string(), "Bearer token123".to_string());
347 headers.insert("x-custom-header".to_string(), "custom-value".to_string());
348
349 let result = headers_to_metadata(&headers);
350 assert!(result.is_ok());
351 let metadata = result.unwrap();
352 assert_eq!(metadata.len(), 2);
353 }
354
355 #[test]
356 fn test_headers_to_metadata_binary() {
357 let mut headers = HashMap::new();
358 headers.insert("custom-bin".to_string(), "YmluYXJ5IGRhdGE=".to_string());
360
361 let result = headers_to_metadata(&headers);
362 assert!(result.is_ok());
363 let metadata = result.unwrap();
364 assert_eq!(metadata.len(), 1);
365 }
366
367 #[test]
368 fn test_headers_to_metadata_invalid_base64() {
369 let mut headers = HashMap::new();
370 headers.insert("custom-bin".to_string(), "not-valid-base64!!!".to_string());
371
372 let result = headers_to_metadata(&headers);
373 assert!(result.is_err());
374 assert!(
375 result
376 .unwrap_err()
377 .to_string()
378 .contains("failed to base64-decode")
379 );
380 }
381
382 #[test]
383 fn test_headers_to_metadata_mixed() {
384 let mut headers = HashMap::new();
385 headers.insert("authorization".to_string(), "Bearer token123".to_string());
386 headers.insert("custom-bin".to_string(), "YmluYXJ5IGRhdGE=".to_string());
387
388 let result = headers_to_metadata(&headers);
389 assert!(result.is_ok());
390 let metadata = result.unwrap();
391 assert_eq!(metadata.len(), 2);
392 }
393
394 #[test]
395 fn test_normalize_endpoint_http() {
396 let result = normalize_endpoint("http://localhost:4317".to_string());
397 assert_eq!(result, "http://localhost:4317");
398 }
399
400 #[test]
401 fn test_normalize_endpoint_https() {
402 let result = normalize_endpoint("https://api.example.com:4317".to_string());
403 assert_eq!(result, "https://api.example.com:4317");
404 }
405
406 #[test]
407 fn test_normalize_endpoint_no_scheme() {
408 let result = normalize_endpoint("localhost:4317".to_string());
409 assert_eq!(result, "https://localhost:4317");
410 }
411
412 #[test]
413 fn test_normalize_endpoint_trailing_slash() {
414 let result = normalize_endpoint("api.example.com:4317/".to_string());
415 assert_eq!(result, "https://api.example.com:4317");
416 }
417
418 #[test]
419 fn test_normalize_endpoint_with_path() {
420 let result = normalize_endpoint("https://api.example.com:4317/v1/traces".to_string());
421 assert_eq!(result, "https://api.example.com:4317/v1/traces");
422 }
423
424 #[test]
425 fn test_shutdown_tracer_no_provider() {
426 shutdown_tracer();
428 }
429}