cron_when/cli/
telemetry.rs1use anyhow::{Result, anyhow};
52use base64::{Engine, engine::general_purpose};
53use once_cell::sync::OnceCell;
54use opentelemetry::propagation::TextMapCompositePropagator;
55use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
56use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
57use opentelemetry_sdk::{
58 Resource,
59 propagation::{BaggagePropagator, TraceContextPropagator},
60 trace::{SdkTracerProvider, Tracer},
61};
62use std::{collections::HashMap, env::var, time::Duration};
63use tonic::{
64 metadata::{Ascii, Binary, MetadataKey, MetadataMap, MetadataValue},
65 transport::ClientTlsConfig,
66};
67use tracing::{Level, debug};
68use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};
69use ulid::Ulid;
70
/// Process-wide handle to the tracer provider.
///
/// `init_tracer` stores a clone of the provider here so that `shutdown_tracer` can later
/// flush and shut it down. The cell stays empty when tracing export was never initialised.
static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();
73
/// Parses an `OTEL_EXPORTER_OTLP_HEADERS`-style string into a header map.
///
/// The input is a comma-separated list of `key=value` pairs. Each pair is split on its
/// first `=`, so a value may itself contain `=` (for example base64 padding). Whitespace
/// around keys and values is trimmed.
///
/// Entries that cannot name a header are skipped instead of being reported:
/// - pairs without any `=`;
/// - pairs whose key is empty after trimming (such as `=value`).
///
/// When the same key occurs more than once, the last occurrence wins.
fn parse_headers_env(headers_str: &str) -> HashMap<String, String> {
    headers_str
        .split(',')
        .filter_map(|pair| {
            let (key, value) = pair.split_once('=')?;
            let key = key.trim();

            // An empty key cannot name a header; drop it like any other malformed entry
            // so one stray `=value` does not poison the whole header set.
            if key.is_empty() {
                return None;
            }

            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}
85
86fn headers_to_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
90 let mut meta = MetadataMap::with_capacity(headers.len());
91
92 for (k, v) in headers {
93 let key_str = k.to_ascii_lowercase();
94
95 if key_str.ends_with("-bin") {
96 let bytes = general_purpose::STANDARD
97 .decode(v.as_bytes())
98 .map_err(|e| anyhow!("failed to base64-decode value for key {key_str}: {e}"))?;
99
100 let key = MetadataKey::<Binary>::from_bytes(key_str.as_bytes())
101 .map_err(|e| anyhow!("invalid binary metadata key {key_str}: {e}"))?;
102
103 let val = MetadataValue::from_bytes(&bytes);
104 meta.insert_bin(key, val);
105 } else {
106 let key = MetadataKey::<Ascii>::from_bytes(key_str.as_bytes())
107 .map_err(|e| anyhow!("invalid ASCII metadata key {key_str}: {e}"))?;
108
109 let val: MetadataValue<_> = v
110 .parse()
111 .map_err(|e| anyhow!("invalid ASCII metadata value for key {key_str}: {e}"))?;
112 meta.insert(key, val);
113 }
114 }
115
116 Ok(meta)
117}
118
/// Ensures an OTLP endpoint string carries a URL scheme.
///
/// An endpoint that already starts with `http://` or `https://` is returned unchanged.
/// Anything else has its trailing `/` characters removed and is prefixed with `https://`.
fn normalize_endpoint(ep: String) -> String {
    let has_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| ep.starts_with(scheme));

    if has_scheme {
        return ep;
    }

    let bare = ep.trim_end_matches('/');
    format!("https://{bare}")
}
127
128fn init_tracer() -> Result<Tracer> {
129 if let Ok(proto) = var("OTEL_EXPORTER_OTLP_PROTOCOL")
131 && proto != "grpc"
132 {
133 debug!(
134 "OTEL_EXPORTER_OTLP_PROTOCOL='{}' ignored: only 'grpc' is supported now",
135 proto
136 );
137 }
138
139 let default_ep = "http://localhost:4317";
141 let endpoint = var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_ep.to_string());
142 let endpoint = normalize_endpoint(endpoint);
143
144 let headers = var("OTEL_EXPORTER_OTLP_HEADERS")
145 .ok()
146 .map(|s| parse_headers_env(&s))
147 .unwrap_or_default();
148
149 let mut builder = opentelemetry_otlp::SpanExporter::builder()
151 .with_tonic()
152 .with_endpoint(&endpoint)
153 .with_compression(Compression::Gzip)
154 .with_timeout(Duration::from_secs(3));
155
156 if let Some(host) = endpoint
158 .strip_prefix("https://")
159 .and_then(|s| s.split('/').next())
160 .and_then(|h| h.split(':').next())
161 {
162 let tls = ClientTlsConfig::new()
163 .domain_name(host.to_string())
164 .with_webpki_roots();
165 builder = builder.with_tls_config(tls);
166 }
167
168 if !headers.is_empty() {
169 let metadata = headers_to_metadata(&headers)?;
170 builder = builder.with_metadata(metadata);
171 }
172
173 let exporter = builder.build()?;
174
175 let instance_id = var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_else(|_| Ulid::new().to_string());
177
178 let trace_provider = SdkTracerProvider::builder()
179 .with_batch_exporter(exporter)
180 .with_resource(
181 Resource::builder_empty()
182 .with_attributes(vec![
183 KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
184 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
185 KeyValue::new("service.instance.id", instance_id),
186 ])
187 .build(),
188 )
189 .build();
190
191 let stored = trace_provider.clone();
193 let _ = TRACER_PROVIDER.set(stored);
194
195 global::set_tracer_provider(trace_provider.clone());
197 global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
198 Box::new(TraceContextPropagator::new()),
199 Box::new(BaggagePropagator::new()),
200 ]));
201
202 Ok(trace_provider.tracer(env!("CARGO_PKG_NAME")))
203}
204
205pub fn init(verbosity_level: Option<Level>) -> Result<()> {
213 let verbosity_level = verbosity_level.unwrap_or(Level::ERROR);
214
215 let fmt_layer = fmt::layer()
216 .with_file(false)
217 .with_line_number(false)
218 .with_thread_ids(false)
219 .with_thread_names(false)
220 .with_target(false)
221 .pretty();
222
223 let filter = EnvFilter::builder()
224 .with_default_directive(verbosity_level.into())
225 .from_env_lossy()
226 .add_directive("hyper=error".parse()?)
227 .add_directive("tokio=error".parse()?)
228 .add_directive("opentelemetry_sdk=warn".parse()?);
229
230 if var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
231 let tracer = init_tracer()?;
232 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
233
234 let subscriber = Registry::default()
235 .with(fmt_layer)
236 .with(otel_layer)
237 .with(filter);
238 tracing::subscriber::set_global_default(subscriber)?;
239 } else {
240 let subscriber = Registry::default().with(fmt_layer).with(filter);
241 tracing::subscriber::set_global_default(subscriber)?;
242 }
243
244 Ok(())
245}
246
247pub fn shutdown_tracer() {
280 if let Some(tp) = TRACER_PROVIDER.get() {
281 debug!("flushing and shutting down tracer provider");
282
283 if let Err(e) = tp.force_flush() {
286 eprintln!("Failed to flush spans: {e}");
287 }
288
289 if let Err(e) = tp.shutdown() {
291 eprintln!("Failed to shutdown tracer provider: {e}");
292 }
293
294 debug!("tracer provider shutdown complete");
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned header map from borrowed `(key, value)` pairs.
    fn header_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Asserts that `map` holds `expected` under `key`.
    fn assert_header(map: &HashMap<String, String>, key: &str, expected: &str) {
        assert_eq!(map.get(key), Some(&expected.to_string()));
    }

    // --- parse_headers_env ---

    #[test]
    fn test_parse_headers_env_empty() {
        assert!(parse_headers_env("").is_empty());
    }

    #[test]
    fn test_parse_headers_env_single() {
        let parsed = parse_headers_env("key1=value1");
        assert_eq!(parsed.len(), 1);
        assert_header(&parsed, "key1", "value1");
    }

    #[test]
    fn test_parse_headers_env_multiple() {
        let parsed = parse_headers_env("key1=value1,key2=value2,key3=value3");
        assert_eq!(parsed.len(), 3);
        for (key, value) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            assert_header(&parsed, key, value);
        }
    }

    #[test]
    fn test_parse_headers_env_with_spaces() {
        let parsed = parse_headers_env("key1 = value1 , key2 = value2");
        assert_eq!(parsed.len(), 2);
        assert_header(&parsed, "key1", "value1");
        assert_header(&parsed, "key2", "value2");
    }

    #[test]
    fn test_parse_headers_env_malformed() {
        // The entry without `=` must be dropped while its neighbours survive.
        let parsed = parse_headers_env("key1=value1,malformed,key2=value2");
        assert_eq!(parsed.len(), 2);
        assert_header(&parsed, "key1", "value1");
        assert_header(&parsed, "key2", "value2");
        assert!(!parsed.contains_key("malformed"));
    }

    // --- headers_to_metadata ---

    #[test]
    fn test_headers_to_metadata_empty() {
        let metadata =
            headers_to_metadata(&HashMap::new()).expect("empty headers should convert");
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn test_headers_to_metadata_ascii() {
        let headers = header_map(&[
            ("authorization", "Bearer token123"),
            ("x-custom-header", "custom-value"),
        ]);

        let metadata = headers_to_metadata(&headers).expect("ASCII headers should convert");
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    fn test_headers_to_metadata_binary() {
        let headers = header_map(&[("custom-bin", "YmluYXJ5IGRhdGE=")]);

        let metadata = headers_to_metadata(&headers).expect("binary header should convert");
        assert_eq!(metadata.len(), 1);
    }

    #[test]
    fn test_headers_to_metadata_invalid_base64() {
        let headers = header_map(&[("custom-bin", "not-valid-base64!!!")]);

        let Err(err) = headers_to_metadata(&headers) else {
            panic!("invalid base64 should be rejected");
        };
        assert!(err.to_string().contains("failed to base64-decode"));
    }

    #[test]
    fn test_headers_to_metadata_mixed() {
        let headers = header_map(&[
            ("authorization", "Bearer token123"),
            ("custom-bin", "YmluYXJ5IGRhdGE="),
        ]);

        let metadata = headers_to_metadata(&headers).expect("mixed headers should convert");
        assert_eq!(metadata.len(), 2);
    }

    // --- normalize_endpoint ---

    #[test]
    fn test_normalize_endpoint_http() {
        assert_eq!(
            normalize_endpoint(String::from("http://localhost:4317")),
            "http://localhost:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_https() {
        assert_eq!(
            normalize_endpoint(String::from("https://api.example.com:4317")),
            "https://api.example.com:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_no_scheme() {
        assert_eq!(
            normalize_endpoint(String::from("localhost:4317")),
            "https://localhost:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_trailing_slash() {
        assert_eq!(
            normalize_endpoint(String::from("api.example.com:4317/")),
            "https://api.example.com:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_with_path() {
        assert_eq!(
            normalize_endpoint(String::from("https://api.example.com:4317/v1/traces")),
            "https://api.example.com:4317/v1/traces"
        );
    }

    // --- shutdown_tracer ---

    #[test]
    fn test_shutdown_tracer_no_provider() {
        // Only checks that the call returns without panicking.
        shutdown_tracer();
    }
}