cron_when/cli/
telemetry.rs

1//! # Telemetry Module - Educational Implementation
2//!
3//! This module provides production-grade OpenTelemetry integration for a tiny CLI tool.
4//! **This is intentionally over-engineered for educational purposes!**
5//!
6//! ## Why This Exists
7//!
8//! A simple cron parser doesn't "need" distributed tracing. However, this demonstrates:
9//! 1. How to add observability to any Rust CLI
10//! 2. OpenTelemetry patterns that scale from tiny tools to large systems
11//! 3. Handling async/gRPC in short-lived processes
12//!
13//! ## Key Features
14//!
15//! - **Optional at runtime**: Zero overhead if `OTEL_EXPORTER_OTLP_ENDPOINT` not set
16//! - **Multi-backend support**: Jaeger, Honeycomb, Grafana Tempo, AWS X-Ray, etc.
17//! - **Secure**: TLS, header authentication, compression
18//! - **Production-ready**: Proper resource attributes, propagation, graceful shutdown
19//!
20//! ## Known Limitation: Short-lived Process Challenge
21//!
22//! Short-lived CLIs exit faster than spans can flush:
23//! - CLI execution: ~10ms
24//! - Span flush timeout: 5000ms
25//! - Result: You may see "BatchSpanProcessor.Shutdown.Timeout"
26//!
27//! This is **expected and cosmetic** - spans are still sent asynchronously!
28//!
29//! To suppress: `export RUST_LOG="warn,opentelemetry_sdk=error"`
30//!
31//! ## Usage in Your Own Projects
32//!
33//! 1. Copy this module
34//! 2. Add `#[instrument]` to functions you want to trace
35//! 3. Call `telemetry::init()` at startup
36//! 4. Call `telemetry::shutdown_tracer()` before exit
37//! 5. Set `OTEL_EXPORTER_OTLP_ENDPOINT` when you want tracing
38//!
39//! ## Dependencies Required
40//!
41//! ```toml
42//! opentelemetry = "0.31.0"
43//! opentelemetry-otlp = { version = "0.31.0", features = ["grpc-tonic", "tls"] }
44//! opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] }
45//! tokio = { version = "1", features = ["rt", "macros"] }
46//! tracing = "0.1"
47//! tracing-opentelemetry = "0.32.0"
48//! tracing-subscriber = "0.3"
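//! ```
//!
//! This module additionally imports `anyhow`, `base64`, `once_cell`, `tonic` and `ulid`,
//! and `EnvFilter` needs the `env-filter` feature of `tracing-subscriber`; add those to
//! `Cargo.toml` as well, in versions compatible with the crates listed above.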
50
51use anyhow::{Result, anyhow};
52use base64::{Engine, engine::general_purpose};
53use once_cell::sync::OnceCell;
54use opentelemetry::propagation::TextMapCompositePropagator;
55use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
56use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
57use opentelemetry_sdk::{
58    Resource,
59    propagation::{BaggagePropagator, TraceContextPropagator},
60    trace::{SdkTracerProvider, Tracer},
61};
62use std::{collections::HashMap, env::var, time::Duration};
63use tonic::{metadata::*, transport::ClientTlsConfig};
64use tracing::{Level, debug};
65use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};
66use ulid::Ulid;
67
/// Global tracer provider (initialized once).
///
/// `init_tracer` stores a clone of the provider here and `shutdown_tracer` reads it back
/// to flush and shut it down. The cell stays empty when tracing was never initialized.
static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();
70
/// Parses an `OTEL_EXPORTER_OTLP_HEADERS`-style string into a header map.
///
/// The input is a comma-separated list of `key=value` pairs. Each pair is split at its
/// first `=`, so values may themselves contain `=`. Keys and values are trimmed of
/// surrounding whitespace.
///
/// Entries are silently skipped when they are malformed:
/// - no `=` at all (e.g. `malformed`, or the empty piece after a trailing comma)
/// - an empty key (e.g. `=value`), which could never form a valid header name
///
/// When a key occurs more than once, the last occurrence wins.
fn parse_headers_env(headers_str: &str) -> HashMap<String, String> {
    headers_str
        .split(',')
        .filter_map(|pair| {
            let (key, value) = pair.split_once('=')?;
            let key = key.trim();
            // A nameless header is as unusable as a pair without `=`; drop it here
            // instead of letting it surface later as an invalid-key error.
            if key.is_empty() {
                return None;
            }
            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}
82
83// Convert HashMap<String, String> into tonic::MetadataMap
84// - Supports ASCII metadata (normal keys)
85// - Supports binary metadata keys (ending with "-bin"), values must be base64-encoded
86fn headers_to_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
87    let mut meta = MetadataMap::with_capacity(headers.len());
88
89    for (k, v) in headers {
90        let key_str = k.to_ascii_lowercase();
91
92        if key_str.ends_with("-bin") {
93            let bytes = general_purpose::STANDARD
94                .decode(v.as_bytes())
95                .map_err(|e| anyhow!("failed to base64-decode value for key {}: {}", key_str, e))?;
96
97            let key = MetadataKey::<Binary>::from_bytes(key_str.as_bytes())
98                .map_err(|e| anyhow!("invalid binary metadata key {}: {}", key_str, e))?;
99
100            let val = MetadataValue::from_bytes(&bytes);
101            meta.insert_bin(key, val);
102        } else {
103            let key = MetadataKey::<Ascii>::from_bytes(key_str.as_bytes())
104                .map_err(|e| anyhow!("invalid ASCII metadata key {}: {}", key_str, e))?;
105
106            let val: MetadataValue<_> = v
107                .parse()
108                .map_err(|e| anyhow!("invalid ASCII metadata value for key {}: {}", key_str, e))?;
109            meta.insert(key, val);
110        }
111    }
112
113    Ok(meta)
114}
115
/// Makes sure the endpoint carries a URL scheme.
///
/// An endpoint that already starts with `http://` or `https://` is returned unchanged
/// (including any path or trailing slash). Anything else has its trailing slashes
/// removed and is prefixed with `https://`, the default chosen for gRPC here.
fn normalize_endpoint(ep: String) -> String {
    let has_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| ep.starts_with(*scheme));
    if has_scheme {
        return ep;
    }

    let mut normalized = String::from("https://");
    normalized.push_str(ep.trim_end_matches('/'));
    normalized
}
124
125fn init_tracer() -> Result<Tracer> {
126    // We only support gRPC now. If the user set a different protocol, log and ignore.
127    if let Ok(proto) = var("OTEL_EXPORTER_OTLP_PROTOCOL")
128        && proto != "grpc"
129    {
130        debug!(
131            "OTEL_EXPORTER_OTLP_PROTOCOL='{}' ignored: only 'grpc' is supported now",
132            proto
133        );
134    }
135
136    // gRPC sensible default
137    let default_ep = "http://localhost:4317";
138    let endpoint = var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_ep.to_string());
139    let endpoint = normalize_endpoint(endpoint);
140
141    let headers = var("OTEL_EXPORTER_OTLP_HEADERS")
142        .ok()
143        .map(|s| parse_headers_env(&s))
144        .unwrap_or_default();
145
146    // Build gRPC exporter
147    let mut builder = opentelemetry_otlp::SpanExporter::builder()
148        .with_tonic()
149        .with_endpoint(&endpoint)
150        .with_compression(Compression::Gzip)
151        .with_timeout(Duration::from_secs(3));
152
153    // TLS (https) support
154    if let Some(host) = endpoint
155        .strip_prefix("https://")
156        .and_then(|s| s.split('/').next())
157        .and_then(|h| h.split(':').next())
158    {
159        let tls = ClientTlsConfig::new()
160            .domain_name(host.to_string())
161            .with_native_roots();
162        builder = builder.with_tls_config(tls);
163    }
164
165    if !headers.is_empty() {
166        let metadata = headers_to_metadata(&headers)?;
167        builder = builder.with_metadata(metadata);
168    }
169
170    let exporter = builder.build()?;
171
172    // Generate or take service.instance.id
173    let instance_id = var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_else(|_| Ulid::new().to_string());
174
175    let trace_provider = SdkTracerProvider::builder()
176        .with_batch_exporter(exporter)
177        .with_resource(
178            Resource::builder_empty()
179                .with_attributes(vec![
180                    KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
181                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
182                    KeyValue::new("service.instance.id", instance_id),
183                ])
184                .build(),
185        )
186        .build();
187
188    // Store provider for later shutdown
189    let stored = trace_provider.clone();
190    let _ = TRACER_PROVIDER.set(stored);
191
192    // Register globally
193    global::set_tracer_provider(trace_provider.clone());
194    global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
195        Box::new(TraceContextPropagator::new()),
196        Box::new(BaggagePropagator::new()),
197    ]));
198
199    Ok(trace_provider.tracer(env!("CARGO_PKG_NAME")))
200}
201
202/// Initialize logging + (optional) tracing exporter
203/// Tracing is enabled if OTEL_EXPORTER_OTLP_ENDPOINT is set (gRPC only).
204pub fn init(verbosity_level: Option<Level>) -> Result<()> {
205    let verbosity_level = verbosity_level.unwrap_or(Level::ERROR);
206
207    let fmt_layer = fmt::layer()
208        .with_file(false)
209        .with_line_number(false)
210        .with_thread_ids(false)
211        .with_thread_names(false)
212        .with_target(false)
213        .pretty();
214
215    let filter = EnvFilter::builder()
216        .with_default_directive(verbosity_level.into())
217        .from_env_lossy()
218        .add_directive("hyper=error".parse()?)
219        .add_directive("tokio=error".parse()?)
220        .add_directive("opentelemetry_sdk=warn".parse()?);
221
222    if var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
223        let tracer = init_tracer()?;
224        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
225
226        let subscriber = Registry::default()
227            .with(fmt_layer)
228            .with(otel_layer)
229            .with(filter);
230        tracing::subscriber::set_global_default(subscriber)?;
231    } else {
232        let subscriber = Registry::default().with(fmt_layer).with(filter);
233        tracing::subscriber::set_global_default(subscriber)?;
234    }
235
236    Ok(())
237}
238
239/// Gracefully shut down tracer provider (noop if not initialized)
240///
241/// ## Short-lived Process Challenge
242///
243/// This function attempts to flush spans before exit, but for short-lived CLIs
244/// (execution time ~10ms), the flush operation may timeout (needs ~5000ms).
245///
246/// **Expected behavior:**
247/// - `force_flush()` sends pending spans (may timeout)
248/// - `shutdown()` cleans up resources (may timeout)  
249/// - Timeout errors are **cosmetic** - spans are sent asynchronously
250/// - Traces still appear in your observability backend!
251///
252/// **To suppress timeout errors:**
253/// ```bash
254/// export RUST_LOG="warn,opentelemetry_sdk=error"
255/// ```
256///
257/// ## Why This Happens
258///
259/// OpenTelemetry uses a BatchSpanProcessor which batches spans for efficiency.
260/// For long-running services this is perfect. For CLIs that exit in milliseconds,
261/// we can't wait for the full batch timeout.
262///
263/// ## Alternative Solutions Not Used Here
264///
265/// 1. **SimpleSpanProcessor**: Sends each span immediately (slower, no batching)
266/// 2. **Longer sleep**: Wait 5+ seconds before exit (defeats CLI speed)
267/// 3. **Fire and forget**: Don't call shutdown (proper cleanup is better)
268///
269/// We accept the cosmetic timeout for educational purposes to show the "proper"
270/// way to shutdown, even though it's imperfect for short-lived processes.
271pub fn shutdown_tracer() {
272    if let Some(tp) = TRACER_PROVIDER.get() {
273        debug!("flushing and shutting down tracer provider");
274
275        // Force flush all pending spans
276        // Note: May timeout for short-lived CLIs, but spans are sent anyway
277        if let Err(e) = tp.force_flush() {
278            eprintln!("Failed to flush spans: {}", e);
279        }
280
281        // Shutdown the provider
282        if let Err(e) = tp.shutdown() {
283            eprintln!("Failed to shutdown tracer provider: {}", e);
284        }
285
286        debug!("tracer provider shutdown complete");
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned header map from borrowed `(key, value)` pairs.
    fn header_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Looks up `key` and exposes the value as a plain `&str` for terse assertions.
    fn value_of<'a>(map: &'a HashMap<String, String>, key: &str) -> Option<&'a str> {
        map.get(key).map(String::as_str)
    }

    // --- parse_headers_env -------------------------------------------------

    #[test]
    fn test_parse_headers_env_empty() {
        assert!(parse_headers_env("").is_empty());
    }

    #[test]
    fn test_parse_headers_env_single() {
        let parsed = parse_headers_env("key1=value1");
        assert_eq!(parsed.len(), 1);
        assert_eq!(value_of(&parsed, "key1"), Some("value1"));
    }

    #[test]
    fn test_parse_headers_env_multiple() {
        let parsed = parse_headers_env("key1=value1,key2=value2,key3=value3");
        assert_eq!(parsed.len(), 3);
        for (key, expected) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            assert_eq!(value_of(&parsed, key), Some(expected));
        }
    }

    #[test]
    fn test_parse_headers_env_with_spaces() {
        let parsed = parse_headers_env("key1 = value1 , key2 = value2");
        assert_eq!(parsed.len(), 2);
        for (key, expected) in [("key1", "value1"), ("key2", "value2")] {
            assert_eq!(value_of(&parsed, key), Some(expected));
        }
    }

    #[test]
    fn test_parse_headers_env_malformed() {
        // An entry without '=' is dropped; its neighbours are unaffected.
        let parsed = parse_headers_env("key1=value1,malformed,key2=value2");
        assert_eq!(parsed.len(), 2);
        assert_eq!(value_of(&parsed, "key1"), Some("value1"));
        assert_eq!(value_of(&parsed, "key2"), Some("value2"));
        assert!(!parsed.contains_key("malformed"));
    }

    // --- headers_to_metadata -----------------------------------------------

    #[test]
    fn test_headers_to_metadata_empty() {
        let metadata = headers_to_metadata(&header_map(&[])).expect("empty map converts");
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn test_headers_to_metadata_ascii() {
        let headers = header_map(&[
            ("authorization", "Bearer token123"),
            ("x-custom-header", "custom-value"),
        ]);

        let metadata = headers_to_metadata(&headers).expect("ASCII headers convert");
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    fn test_headers_to_metadata_binary() {
        // The value is base64 for "binary data".
        let headers = header_map(&[("custom-bin", "YmluYXJ5IGRhdGE=")]);

        let metadata = headers_to_metadata(&headers).expect("binary header converts");
        assert_eq!(metadata.len(), 1);
    }

    #[test]
    fn test_headers_to_metadata_invalid_base64() {
        let headers = header_map(&[("custom-bin", "not-valid-base64!!!")]);

        let error = headers_to_metadata(&headers).expect_err("invalid base64 must be rejected");
        assert!(error.to_string().contains("failed to base64-decode"));
    }

    #[test]
    fn test_headers_to_metadata_mixed() {
        let headers = header_map(&[
            ("authorization", "Bearer token123"),
            ("custom-bin", "YmluYXJ5IGRhdGE="),
        ]);

        let metadata = headers_to_metadata(&headers).expect("mixed headers convert");
        assert_eq!(metadata.len(), 2);
    }

    // --- normalize_endpoint ------------------------------------------------

    #[test]
    fn test_normalize_endpoint_http() {
        let input = String::from("http://localhost:4317");
        assert_eq!(normalize_endpoint(input), "http://localhost:4317");
    }

    #[test]
    fn test_normalize_endpoint_https() {
        let input = String::from("https://api.example.com:4317");
        assert_eq!(normalize_endpoint(input), "https://api.example.com:4317");
    }

    #[test]
    fn test_normalize_endpoint_no_scheme() {
        let input = String::from("localhost:4317");
        assert_eq!(normalize_endpoint(input), "https://localhost:4317");
    }

    #[test]
    fn test_normalize_endpoint_trailing_slash() {
        let input = String::from("api.example.com:4317/");
        assert_eq!(normalize_endpoint(input), "https://api.example.com:4317");
    }

    #[test]
    fn test_normalize_endpoint_with_path() {
        let input = String::from("https://api.example.com:4317/v1/traces");
        assert_eq!(normalize_endpoint(input), "https://api.example.com:4317/v1/traces");
    }

    // --- shutdown_tracer ---------------------------------------------------

    #[test]
    fn test_shutdown_tracer_no_provider() {
        // Must be a harmless no-op when no provider was ever stored.
        shutdown_tracer();
    }
}