cron_when/cli/
telemetry.rs

1//! # Telemetry Module - Educational Implementation
2//!
3//! This module provides production-grade OpenTelemetry integration for a tiny CLI tool.
4//! **This is intentionally over-engineered for educational purposes!**
5//!
6//! ## Why This Exists
7//!
8//! A simple cron parser doesn't "need" distributed tracing. However, this demonstrates:
9//! 1. How to add observability to any Rust CLI
10//! 2. OpenTelemetry patterns that scale from tiny tools to large systems
11//! 3. Handling async/gRPC in short-lived processes
12//!
13//! ## Key Features
14//!
15//! - **Optional at runtime**: Zero overhead if `OTEL_EXPORTER_OTLP_ENDPOINT` not set
16//! - **Multi-backend support**: Jaeger, Honeycomb, Grafana Tempo, AWS X-Ray, etc.
17//! - **Secure**: TLS, header authentication, compression
18//! - **Production-ready**: Proper resource attributes, propagation, graceful shutdown
19//!
20//! ## Known Limitation: Short-lived Process Challenge
21//!
22//! Short-lived CLIs exit faster than spans can flush:
23//! - CLI execution: ~10ms
24//! - Span flush timeout: 5000ms
25//! - Result: You may see "BatchSpanProcessor.Shutdown.Timeout"
26//!
//! This warning is expected, but it is not purely cosmetic: any span whose export has
//! not completed when the process exits is dropped, so some traces may be incomplete.
28//!
29//! To suppress: `export RUST_LOG="warn,opentelemetry_sdk=error"`
30//!
31//! ## Usage in Your Own Projects
32//!
33//! 1. Copy this module
34//! 2. Add `#[instrument]` to functions you want to trace
35//! 3. Call `telemetry::init()` at startup
36//! 4. Call `telemetry::shutdown_tracer()` before exit
37//! 5. Set `OTEL_EXPORTER_OTLP_ENDPOINT` when you want tracing
38//!
39//! ## Dependencies Required
40//!
41//! ```toml
42//! opentelemetry = "0.31.0"
43//! opentelemetry-otlp = { version = "0.31.0", features = ["grpc-tonic", "tls"] }
44//! opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] }
45//! tokio = { version = "1", features = ["rt", "macros"] }
46//! tracing = "0.1"
47//! tracing-opentelemetry = "0.32.0"
48//! tracing-subscriber = "0.3"
49//! ```
50
51use anyhow::{Result, anyhow};
52use base64::{Engine, engine::general_purpose};
53use once_cell::sync::OnceCell;
54use opentelemetry::propagation::TextMapCompositePropagator;
55use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
56use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
57use opentelemetry_sdk::{
58    Resource,
59    propagation::{BaggagePropagator, TraceContextPropagator},
60    trace::{SdkTracerProvider, Tracer},
61};
62use std::{collections::HashMap, env::var, time::Duration};
63use tonic::{
64    metadata::{Ascii, Binary, MetadataKey, MetadataMap, MetadataValue},
65    transport::ClientTlsConfig,
66};
67use tracing::{Level, debug};
68use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};
69use ulid::Ulid;
70
71/// Global tracer provider (initialized once)
72static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();
73
/// Parse an `OTEL_EXPORTER_OTLP_HEADERS`-style string into a header map.
///
/// The input is a comma-separated list of `key=value` pairs, e.g.
/// `api-key=secret,authorization=Bearer%20abc`. The OpenTelemetry spec defines this
/// format after W3C Baggage, so values are percent-decoded (`%20` -> space). A value
/// that is not valid percent-encoding (e.g. a lone `%`) is kept verbatim.
///
/// Whitespace around keys and values is trimmed. Entries without an `=` or with an
/// empty key are skipped; an empty key could never become a valid gRPC metadata key.
/// Only the first `=` splits, so `k=a=b` yields `a=b`. If a key repeats, the last
/// occurrence wins.
fn parse_headers_env(headers_str: &str) -> HashMap<String, String> {
    headers_str
        .split(',')
        .filter_map(|pair| {
            let (key, value) = pair.split_once('=')?;
            let key = key.trim();
            if key.is_empty() {
                return None;
            }
            let value = value.trim();
            let value = percent_decode(value).unwrap_or_else(|| value.to_string());
            Some((key.to_string(), value))
        })
        .collect()
}

/// Decode `%XX` escapes in `value`.
///
/// Returns `None` when an escape is truncated or not two hex digits, or when the
/// decoded bytes are not valid UTF-8, so the caller can fall back to the raw text.
fn percent_decode(value: &str) -> Option<String> {
    let bytes = value.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hi = char::from(*bytes.get(i + 1)?).to_digit(16)?;
            let lo = char::from(*bytes.get(i + 2)?).to_digit(16)?;
            // Two hex digits are at most 0xFF, so this conversion cannot fail.
            decoded.push(u8::try_from(hi * 16 + lo).ok()?);
            i += 3;
        } else {
            decoded.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(decoded).ok()
}
85
86// Convert HashMap<String, String> into tonic::MetadataMap
87// - Supports ASCII metadata (normal keys)
88// - Supports binary metadata keys (ending with "-bin"), values must be base64-encoded
89fn headers_to_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
90    let mut meta = MetadataMap::with_capacity(headers.len());
91
92    for (k, v) in headers {
93        let key_str = k.to_ascii_lowercase();
94
95        if key_str.ends_with("-bin") {
96            let bytes = general_purpose::STANDARD
97                .decode(v.as_bytes())
98                .map_err(|e| anyhow!("failed to base64-decode value for key {key_str}: {e}"))?;
99
100            let key = MetadataKey::<Binary>::from_bytes(key_str.as_bytes())
101                .map_err(|e| anyhow!("invalid binary metadata key {key_str}: {e}"))?;
102
103            let val = MetadataValue::from_bytes(&bytes);
104            meta.insert_bin(key, val);
105        } else {
106            let key = MetadataKey::<Ascii>::from_bytes(key_str.as_bytes())
107                .map_err(|e| anyhow!("invalid ASCII metadata key {key_str}: {e}"))?;
108
109            let val: MetadataValue<_> = v
110                .parse()
111                .map_err(|e| anyhow!("invalid ASCII metadata value for key {key_str}: {e}"))?;
112            meta.insert(key, val);
113        }
114    }
115
116    Ok(meta)
117}
118
/// Normalize a user-supplied OTLP endpoint into a URL with an explicit, lowercase scheme.
///
/// - Surrounding whitespace (common in hand-edited env files) is removed.
/// - An `http://` or `https://` prefix is recognised case-insensitively and rewritten
///   in lowercase. This lets exact-prefix checks such as `starts_with("https://")`
///   (used elsewhere to decide whether to configure TLS) see it.
/// - An endpoint without a scheme defaults to `https://`.
/// - Trailing slashes after the scheme are removed, whether or not a scheme was given.
fn normalize_endpoint(ep: String) -> String {
    let ep = ep.trim();

    let scheme = ["http://", "https://"]
        .iter()
        .copied()
        .find(|s| ep.get(..s.len()).is_some_and(|head| head.eq_ignore_ascii_case(s)));

    match scheme {
        // `get` succeeded above, so `s.len()` is a valid char boundary.
        Some(s) => format!("{s}{}", ep[s.len()..].trim_end_matches('/')),
        None => format!("https://{}", ep.trim_end_matches('/')),
    }
}
127
128fn init_tracer() -> Result<Tracer> {
129    // We only support gRPC now. If the user set a different protocol, log and ignore.
130    if let Ok(proto) = var("OTEL_EXPORTER_OTLP_PROTOCOL")
131        && proto != "grpc"
132    {
133        debug!(
134            "OTEL_EXPORTER_OTLP_PROTOCOL='{}' ignored: only 'grpc' is supported now",
135            proto
136        );
137    }
138
139    // gRPC sensible default
140    let default_ep = "http://localhost:4317";
141    let endpoint = var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_ep.to_string());
142    let endpoint = normalize_endpoint(endpoint);
143
144    let headers = var("OTEL_EXPORTER_OTLP_HEADERS")
145        .ok()
146        .map(|s| parse_headers_env(&s))
147        .unwrap_or_default();
148
149    // Build gRPC exporter
150    let mut builder = opentelemetry_otlp::SpanExporter::builder()
151        .with_tonic()
152        .with_endpoint(&endpoint)
153        .with_compression(Compression::Gzip)
154        .with_timeout(Duration::from_secs(3));
155
156    // TLS (https) support
157    if let Some(host) = endpoint
158        .strip_prefix("https://")
159        .and_then(|s| s.split('/').next())
160        .and_then(|h| h.split(':').next())
161    {
162        let tls = ClientTlsConfig::new()
163            .domain_name(host.to_string())
164            .with_webpki_roots();
165        builder = builder.with_tls_config(tls);
166    }
167
168    if !headers.is_empty() {
169        let metadata = headers_to_metadata(&headers)?;
170        builder = builder.with_metadata(metadata);
171    }
172
173    let exporter = builder.build()?;
174
175    // Generate or take service.instance.id
176    let instance_id = var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_else(|_| Ulid::new().to_string());
177
178    let trace_provider = SdkTracerProvider::builder()
179        .with_batch_exporter(exporter)
180        .with_resource(
181            Resource::builder_empty()
182                .with_attributes(vec![
183                    KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
184                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
185                    KeyValue::new("service.instance.id", instance_id),
186                ])
187                .build(),
188        )
189        .build();
190
191    // Store provider for later shutdown
192    let stored = trace_provider.clone();
193    let _ = TRACER_PROVIDER.set(stored);
194
195    // Register globally
196    global::set_tracer_provider(trace_provider.clone());
197    global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
198        Box::new(TraceContextPropagator::new()),
199        Box::new(BaggagePropagator::new()),
200    ]));
201
202    Ok(trace_provider.tracer(env!("CARGO_PKG_NAME")))
203}
204
205/// Initialize logging + (optional) tracing exporter
206///
207/// Tracing is enabled if `OTEL_EXPORTER_OTLP_ENDPOINT` is set (gRPC only).
208///
209/// # Errors
210///
211/// Returns an error if tracer initialization or subscriber setup fails
212pub fn init(verbosity_level: Option<Level>) -> Result<()> {
213    let verbosity_level = verbosity_level.unwrap_or(Level::ERROR);
214
215    let fmt_layer = fmt::layer()
216        .with_file(false)
217        .with_line_number(false)
218        .with_thread_ids(false)
219        .with_thread_names(false)
220        .with_target(false)
221        .pretty();
222
223    let filter = EnvFilter::builder()
224        .with_default_directive(verbosity_level.into())
225        .from_env_lossy()
226        .add_directive("hyper=error".parse()?)
227        .add_directive("tokio=error".parse()?)
228        .add_directive("opentelemetry_sdk=warn".parse()?);
229
230    if var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
231        let tracer = init_tracer()?;
232        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
233
234        let subscriber = Registry::default()
235            .with(fmt_layer)
236            .with(otel_layer)
237            .with(filter);
238        tracing::subscriber::set_global_default(subscriber)?;
239    } else {
240        let subscriber = Registry::default().with(fmt_layer).with(filter);
241        tracing::subscriber::set_global_default(subscriber)?;
242    }
243
244    Ok(())
245}
246
247/// Gracefully shut down tracer provider (noop if not initialized)
248///
249/// ## Short-lived Process Challenge
250///
251/// This function attempts to flush spans before exit, but for short-lived CLIs
252/// (execution time ~10ms), the flush operation may timeout (needs ~5000ms).
253///
254/// **Expected behavior:**
255/// - `force_flush()` sends pending spans (may timeout)
256/// - `shutdown()` cleans up resources (may timeout)  
257/// - Timeout errors are **cosmetic** - spans are sent asynchronously
258/// - Traces still appear in your observability backend!
259///
260/// **To suppress timeout errors:**
261/// ```bash
262/// export RUST_LOG="warn,opentelemetry_sdk=error"
263/// ```
264///
265/// ## Why This Happens
266///
267/// `OpenTelemetry` uses a `BatchSpanProcessor` which batches spans for efficiency.
268/// For long-running services this is perfect. For CLIs that exit in milliseconds,
269/// we can't wait for the full batch timeout.
270///
271/// ## Alternative Solutions Not Used Here
272///
273/// 1. **`SimpleSpanProcessor`**: Sends each span immediately (slower, no batching)
274/// 2. **Longer sleep**: Wait 5+ seconds before exit (defeats CLI speed)
275/// 3. **Fire and forget**: Don't call shutdown (proper cleanup is better)
276///
277/// We accept the cosmetic timeout for educational purposes to show the "proper"
278/// way to shutdown, even though it's imperfect for short-lived processes.
279pub fn shutdown_tracer() {
280    if let Some(tp) = TRACER_PROVIDER.get() {
281        debug!("flushing and shutting down tracer provider");
282
283        // Force flush all pending spans
284        // Note: May timeout for short-lived CLIs, but spans are sent anyway
285        if let Err(e) = tp.force_flush() {
286            eprintln!("Failed to flush spans: {e}");
287        }
288
289        // Shutdown the provider
290        if let Err(e) = tp.shutdown() {
291            eprintln!("Failed to shutdown tracer provider: {e}");
292        }
293
294        debug!("tracer provider shutdown complete");
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned header map from borrowed string pairs.
    fn headers_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Borrow a parsed header value as `&str` for terser assertions.
    fn value<'a>(map: &'a HashMap<String, String>, key: &str) -> Option<&'a str> {
        map.get(key).map(String::as_str)
    }

    #[test]
    fn test_parse_headers_env_empty() {
        assert!(parse_headers_env("").is_empty());
    }

    #[test]
    fn test_parse_headers_env_single() {
        let result = parse_headers_env("key1=value1");
        assert_eq!(result.len(), 1);
        assert_eq!(value(&result, "key1"), Some("value1"));
    }

    #[test]
    fn test_parse_headers_env_multiple() {
        let result = parse_headers_env("key1=value1,key2=value2,key3=value3");
        assert_eq!(result.len(), 3);
        for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            assert_eq!(value(&result, k), Some(v), "header {k}");
        }
    }

    #[test]
    fn test_parse_headers_env_with_spaces() {
        let result = parse_headers_env("key1 = value1 , key2 = value2");
        assert_eq!(result.len(), 2);
        assert_eq!(value(&result, "key1"), Some("value1"));
        assert_eq!(value(&result, "key2"), Some("value2"));
    }

    #[test]
    fn test_parse_headers_env_malformed() {
        // Entries without `=` are skipped.
        let result = parse_headers_env("key1=value1,malformed,key2=value2");
        assert_eq!(result.len(), 2);
        assert_eq!(value(&result, "key1"), Some("value1"));
        assert_eq!(value(&result, "key2"), Some("value2"));
        assert!(!result.contains_key("malformed"));
    }

    #[test]
    fn test_headers_to_metadata_empty() {
        let metadata = headers_to_metadata(&HashMap::new()).expect("empty headers must convert");
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn test_headers_to_metadata_ascii() {
        let headers = headers_of(&[
            ("authorization", "Bearer token123"),
            ("x-custom-header", "custom-value"),
        ]);
        let metadata = headers_to_metadata(&headers).expect("ASCII headers must convert");
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    fn test_headers_to_metadata_binary() {
        // Base64 encoding of "binary data".
        let headers = headers_of(&[("custom-bin", "YmluYXJ5IGRhdGE=")]);
        let metadata = headers_to_metadata(&headers).expect("binary header must convert");
        assert_eq!(metadata.len(), 1);
    }

    #[test]
    fn test_headers_to_metadata_invalid_base64() {
        let headers = headers_of(&[("custom-bin", "not-valid-base64!!!")]);
        let err = headers_to_metadata(&headers).expect_err("invalid base64 must be rejected");
        assert!(
            err.to_string().contains("failed to base64-decode"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_headers_to_metadata_mixed() {
        let headers = headers_of(&[
            ("authorization", "Bearer token123"),
            ("custom-bin", "YmluYXJ5IGRhdGE="),
        ]);
        let metadata = headers_to_metadata(&headers).expect("mixed headers must convert");
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    fn test_normalize_endpoint_http() {
        assert_eq!(
            normalize_endpoint("http://localhost:4317".to_string()),
            "http://localhost:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_https() {
        assert_eq!(
            normalize_endpoint("https://api.example.com:4317".to_string()),
            "https://api.example.com:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_no_scheme() {
        assert_eq!(
            normalize_endpoint("localhost:4317".to_string()),
            "https://localhost:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_trailing_slash() {
        assert_eq!(
            normalize_endpoint("api.example.com:4317/".to_string()),
            "https://api.example.com:4317"
        );
    }

    #[test]
    fn test_normalize_endpoint_with_path() {
        assert_eq!(
            normalize_endpoint("https://api.example.com:4317/v1/traces".to_string()),
            "https://api.example.com:4317/v1/traces"
        );
    }

    #[test]
    fn test_shutdown_tracer_no_provider() {
        // Must be a harmless no-op when no provider was ever initialized.
        shutdown_tracer();
    }
}
438}