Skip to main content

rust_web_server/otel/
mod.rs

1//! OpenTelemetry-compatible distributed tracing.
2//!
3//! [`OtelLayer`] is a [`Middleware`] that:
4//! 1. Reads the W3C `traceparent` header from incoming requests and continues
5//!    an existing trace, or starts a fresh one.
6//! 2. Creates an HTTP server span with standard semantic attributes.
7//! 3. Stores the active span context in thread-local storage so downstream
8//!    middleware (e.g. [`crate::proxy::ReverseProxy`]) can propagate it.
9//! 4. Records the completed span to a configurable exporter.
10//!
11//! # Quick start
12//!
13//! ```rust,no_run
14//! use rust_web_server::app::App;
15//! use rust_web_server::core::New;
16//! use rust_web_server::otel::{OtelLayer, TracingConfig, ExporterConfig};
17//!
18//! // Dev: print spans to stdout.
19//! rust_web_server::otel::setup(TracingConfig {
20//!     service_name: "my-service".to_string(),
21//!     service_version: env!("CARGO_PKG_VERSION").to_string(),
22//!     exporter: ExporterConfig::Stdout,
23//!     sample_rate: 1.0,
24//!     batch_size: 128,
25//! });
26//!
27//! let app = App::new().wrap(OtelLayer);
28//! ```
29//!
30//! # Production: OTLP HTTP export
31//!
32//! ```rust,no_run
33//! use rust_web_server::otel::{ExporterConfig, TracingConfig};
34//!
35//! rust_web_server::otel::setup(TracingConfig {
36//!     service_name: "my-service".to_string(),
37//!     service_version: "1.0.0".to_string(),
38//!     exporter: ExporterConfig::Otlp {
39//!         endpoint: "http://localhost:4318".to_string(),
40//!     },
41//!     sample_rate: 0.1,
42//!     batch_size: 512,
43//! });
44//! ```
45//!
46//! Alternatively, set environment variables before calling [`setup_from_env`]:
47//!
48//! ```text
49//! OTEL_SERVICE_NAME=my-service
50//! OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
51//! OTEL_TRACES_SAMPLER_ARG=0.1    # sample rate 0.0–1.0 (default 1.0)
52//! ```
53
54#[cfg(test)]
55mod tests;
56
57use std::cell::Cell;
58use std::io::{Read, Write};
59use std::net::TcpStream;
60use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
61use std::sync::{Mutex, OnceLock};
62use std::time::{Duration, SystemTime, UNIX_EPOCH};
63
64use crate::application::Application;
65use crate::middleware::Middleware;
66use crate::request::Request;
67use crate::response::Response;
68use crate::server::ConnectionInfo;
69
70// ── ID generation ─────────────────────────────────────────────────────────────
71
static COUNTER: AtomicU64 = AtomicU64::new(1);
static START_SECS: OnceLock<u64> = OnceLock::new();
static ID_SEED: OnceLock<u64> = OnceLock::new();

/// Wall-clock seconds since the Unix epoch, captured on first use and fixed
/// for the rest of the process (0 if the clock reads before the epoch).
fn start_secs() -> u64 {
    *START_SECS.get_or_init(|| {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    })
}

/// One SplitMix64 step for state `x`: add the golden-ratio increment, then
/// apply the output finalizer. Every stage is a bijection on `u64`, so
/// distinct inputs always give distinct outputs, while consecutive inputs
/// give unrelated-looking outputs.
fn splitmix64(mut x: u64) -> u64 {
    x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    x ^ (x >> 31)
}

/// Per-process seed derived from the nanosecond start time and the process id.
///
/// Without it, every process walks the same counter sequence. Two services
/// taking part in one trace (or two replicas started in the same second)
/// would then hand out identical IDs, and a child span could carry its
/// parent's span ID.
fn id_seed() -> u64 {
    *ID_SEED.get_or_init(|| {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;
        splitmix64(nanos ^ (u64::from(std::process::id()) << 32))
    })
}

/// Generate a new 128-bit trace ID. Not cryptographically random.
///
/// Layout:
/// - The first 8 bytes are the process start time in seconds (big-endian).
/// - The last 8 bytes are a seeded, scrambled sequence number. They are
///   unique within this process and unlikely to repeat in another process.
///
/// Never returns the all-zero ID, which W3C Trace Context treats as invalid.
pub fn new_trace_id() -> [u8; 16] {
    let start = start_secs();
    let seed = id_seed();
    loop {
        // COUNTER starts at 1 and steps by 2: trace IDs consume the odd
        // sequence numbers, span IDs (below) the even ones.
        let seq = COUNTER.fetch_add(2, Ordering::Relaxed);
        let mut id = [0u8; 16];
        id[..8].copy_from_slice(&start.to_be_bytes());
        id[8..].copy_from_slice(&splitmix64(seq ^ seed).to_be_bytes());
        if id != [0u8; 16] {
            return id;
        }
    }
}

/// Generate a new 64-bit span ID.
///
/// The ID is a seeded, scrambled sequence number: unique within this process
/// and unlikely to repeat in another process. Never returns the all-zero ID,
/// which W3C Trace Context treats as invalid.
pub fn new_span_id() -> [u8; 8] {
    let seed = id_seed();
    loop {
        let seq = COUNTER.fetch_add(2, Ordering::Relaxed).wrapping_add(1);
        let id = splitmix64(seq ^ seed);
        if id != 0 {
            return id.to_be_bytes();
        }
    }
}
100
/// Lowercase hexadecimal alphabet, as used by W3C Trace Context.
const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

/// Encode `bytes` as lowercase hex, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}

/// Lowercase hex encoding of a 16-byte trace ID (32 characters).
fn hex16(b: &[u8; 16]) -> String {
    hex_encode(b)
}

/// Lowercase hex encoding of an 8-byte span ID (16 characters).
fn hex8(b: &[u8; 8]) -> String {
    hex_encode(b)
}

// ── W3C Trace Context ─────────────────────────────────────────────────────────

/// Parsed W3C `traceparent` header value.
///
/// Format: `00-{trace-id}-{parent-id}-{flags}`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct TraceContext {
    pub trace_id: [u8; 16],
    pub parent_span_id: [u8; 8],
    /// Bit 0 (`sampled`) of the trace-flags field.
    pub sampled: bool,
}

impl TraceContext {
    /// Parse a `traceparent` header value (surrounding whitespace is ignored).
    ///
    /// Only version `00` is accepted. Returns `None` when:
    /// - any field has the wrong length (32 / 16 / 2 hex digits), or contains
    ///   anything other than lowercase hex digits;
    /// - the trace ID or parent ID is all zeros.
    ///
    /// The spec requires such headers to be ignored, so the caller starts a
    /// fresh trace instead.
    pub fn parse(header: &str) -> Option<Self> {
        let mut fields = header.trim().splitn(4, '-');
        let version = fields.next()?;
        let trace_field = fields.next()?;
        let parent_field = fields.next()?;
        let flags_field = fields.next()?;
        if version != "00" {
            return None;
        }
        let trace_id = parse_hex16(trace_field)?;
        let parent_span_id = parse_hex8(parent_field)?;
        let [flags] = parse_hex::<1>(flags_field)?;
        if trace_id == [0u8; 16] || parent_span_id == [0u8; 8] {
            return None;
        }
        Some(TraceContext { trace_id, parent_span_id, sampled: flags & 0x01 != 0 })
    }

    /// Render as a `traceparent` header value for this context acting as parent.
    pub fn as_header(&self, span_id: &[u8; 8]) -> String {
        format!("00-{}-{}-{:02x}", hex16(&self.trace_id), hex8(span_id), self.sampled as u8)
    }
}

/// Value of one lowercase hex digit, or `None` for any other byte.
fn hex_digit(c: u8) -> Option<u8> {
    match c {
        b'0'..=b'9' => Some(c - b'0'),
        b'a'..=b'f' => Some(c - b'a' + 10),
        _ => None,
    }
}

/// Decode exactly `2 * N` lowercase hex digits into `N` bytes.
///
/// Deliberately stricter than `u8::from_str_radix`, which also accepts a
/// leading `+` (so `"+f"` would decode as `0x0f`).
fn parse_hex<const N: usize>(s: &str) -> Option<[u8; N]> {
    let digits = s.as_bytes();
    if digits.len() != 2 * N {
        return None;
    }
    let mut out = [0u8; N];
    for (slot, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        *slot = (hex_digit(pair[0])? << 4) | hex_digit(pair[1])?;
    }
    Some(out)
}

/// Decode a 32-digit lowercase hex trace ID.
fn parse_hex16(s: &str) -> Option<[u8; 16]> {
    parse_hex(s)
}

/// Decode a 16-digit lowercase hex span ID.
fn parse_hex8(s: &str) -> Option<[u8; 8]> {
    parse_hex(s)
}
157
158// ── thread-local active span ──────────────────────────────────────────────────
159
/// Identity of the span a request is currently being handled under, kept per
/// thread so code further down the middleware chain can read it.
#[derive(Clone, Copy)]
struct ActiveSpan {
    trace_id: [u8; 16],
    span_id: [u8; 8],
    sampled: bool,
}

thread_local! {
    /// Active span for this thread; `None` when no `OtelLayer` span is set.
    static ACTIVE: Cell<Option<ActiveSpan>> = const { Cell::new(None) };
}
171
172/// Return the W3C `traceparent` value for the span currently being processed
173/// on this thread. Returns `None` when no `OtelLayer` span is active.
174///
175/// Used by [`crate::proxy::ReverseProxy`] to propagate trace context to
176/// upstream services.
177pub fn current_traceparent() -> Option<String> {
178    ACTIVE.with(|cell| {
179        cell.get().map(|s| {
180            format!(
181                "00-{}-{}-{:02x}",
182                hex16(&s.trace_id),
183                hex8(&s.span_id),
184                s.sampled as u8,
185            )
186        })
187    })
188}
189
190// ── span data ─────────────────────────────────────────────────────────────────
191
/// One finished HTTP server span, ready to be handed to an exporter.
#[derive(Debug, Clone)]
pub struct SpanData {
    /// Trace this span belongs to.
    pub trace_id: [u8; 16],
    /// This span's own ID.
    pub span_id: [u8; 8],
    /// ID of the parent span, when one is known.
    pub parent_span_id: Option<[u8; 8]>,
    /// `"GET /api/users"` — method + path, query stripped.
    pub name: String,
    /// Start timestamp, nanoseconds since the Unix epoch.
    pub start_ns: u64,
    /// End timestamp, nanoseconds since the Unix epoch.
    pub end_ns: u64,
    /// Request method, e.g. `"GET"`.
    pub http_method: String,
    /// Request target as received (query string included).
    pub http_target: String,
    /// HTTP response status code.
    pub http_status: i16,
    /// Span status: 0 = Unset, 1 = Ok, 2 = Error.
    pub status_code: u8,
}

impl SpanData {
    /// Time covered by the span in milliseconds; 0 if `end_ns` precedes
    /// `start_ns`.
    fn duration_ms(&self) -> f64 {
        let elapsed_ns = self.end_ns.saturating_sub(self.start_ns);
        elapsed_ns as f64 / 1e6
    }
}
214
/// Current wall-clock time in nanoseconds since the Unix epoch, or 0 if the
/// system clock reads earlier than the epoch.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
221
/// Path portion of a request URI: everything before the first `?`, or the
/// whole URI when it has no query string.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(path, _query)| path)
}
228
229// ── exporter ──────────────────────────────────────────────────────────────────
230
231/// Destination for completed spans.
232pub trait Exporter: Send + Sync {
233    fn export(&self, spans: &[SpanData]);
234    fn shutdown(&self) {}
235}
236
237/// Print one JSON line per span to stdout. Useful for development and for
238/// piping into `jq` or a log aggregator.
239pub struct StdoutExporter;
240
241impl Exporter for StdoutExporter {
242    fn export(&self, spans: &[SpanData]) {
243        for span in spans {
244            println!(
245                "{{\"traceId\":\"{}\",\"spanId\":\"{}\",\"parentSpanId\":{},\
246                 \"name\":\"{}\",\"startNs\":{},\"durationMs\":{:.3},\
247                 \"httpMethod\":\"{}\",\"httpTarget\":\"{}\",\"httpStatus\":{}}}",
248                hex16(&span.trace_id),
249                hex8(&span.span_id),
250                span.parent_span_id
251                    .as_ref()
252                    .map(|p| format!("\"{}\"", hex8(p)))
253                    .unwrap_or_else(|| "null".to_string()),
254                span.name,
255                span.start_ns,
256                span.duration_ms(),
257                span.http_method,
258                span.http_target,
259                span.http_status,
260            );
261        }
262    }
263}
264
265/// Send spans to an OTLP-compatible collector over HTTP (JSON encoding).
266///
267/// Compatible with Jaeger ≥ 1.35, Grafana Tempo, and OpenTelemetry Collector.
268/// Point at `http://localhost:4318` (the default OTLP HTTP port).
269pub struct OtlpHttpExporter {
270    host: String,
271    port: u16,
272    timeout: Duration,
273    service_name: String,
274    service_version: String,
275}
276
277impl OtlpHttpExporter {
278    pub fn new(endpoint: &str, service_name: &str, service_version: &str) -> Self {
279        // Parse "http://host:port" or "host:port"
280        let stripped = endpoint
281            .trim_start_matches("http://")
282            .trim_start_matches("https://");
283        let (host, port) = if let Some(i) = stripped.rfind(':') {
284            let p = stripped[i + 1..].parse().unwrap_or(4318);
285            (stripped[..i].to_string(), p)
286        } else {
287            (stripped.to_string(), 4318)
288        };
289        OtlpHttpExporter {
290            host,
291            port,
292            timeout: Duration::from_secs(5),
293            service_name: service_name.to_string(),
294            service_version: service_version.to_string(),
295        }
296    }
297
298    pub fn build_body(&self, spans: &[SpanData]) -> String {
299        let span_jsons: Vec<String> = spans.iter().map(|s| {
300            let parent = s.parent_span_id
301                .as_ref()
302                .map(|p| format!(",\"parentSpanId\":\"{}\"", hex8(p)))
303                .unwrap_or_default();
304            let status_msg = if s.status_code == 2 { "Error" } else { "Unset" };
305            format!(
306                "{{\"traceId\":\"{trace}\",\"spanId\":\"{span}\"{parent},\
307                 \"name\":\"{name}\",\"kind\":2,\
308                 \"startTimeUnixNano\":\"{start}\",\"endTimeUnixNano\":\"{end}\",\
309                 \"attributes\":[\
310                   {{\"key\":\"http.method\",\"value\":{{\"stringValue\":\"{method}\"}} }},\
311                   {{\"key\":\"http.target\",\"value\":{{\"stringValue\":\"{target}\"}} }},\
312                   {{\"key\":\"http.status_code\",\"value\":{{\"intValue\":{status}}} }}\
313                 ],\
314                 \"status\":{{\"code\":{scode},\"message\":\"{smsg}\"}} }}",
315                trace  = hex16(&s.trace_id),
316                span   = hex8(&s.span_id),
317                name   = s.name,
318                start  = s.start_ns,
319                end    = s.end_ns,
320                method = s.http_method,
321                target = s.http_target,
322                status = s.http_status,
323                scode  = s.status_code,
324                smsg   = status_msg,
325            )
326        }).collect();
327
328        format!(
329            "{{\"resourceSpans\":[{{\"resource\":{{\"attributes\":[\
330               {{\"key\":\"service.name\",\"value\":{{\"stringValue\":\"{svc}\"}} }},\
331               {{\"key\":\"service.version\",\"value\":{{\"stringValue\":\"{ver}\"}} }}\
332             ]}},\"scopeSpans\":[{{\"scope\":{{\"name\":\"rws\"}},\"spans\":[{spans}]}}]}}]}}",
333            svc   = self.service_name,
334            ver   = self.service_version,
335            spans = span_jsons.join(","),
336        )
337    }
338
339    fn post(&self, body: &str) {
340        use std::net::ToSocketAddrs;
341        let addr = format!("{}:{}", self.host, self.port);
342        let Some(socket_addr) = addr.to_socket_addrs().ok().and_then(|mut i| i.next()) else {
343            return;
344        };
345        let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, self.timeout) else {
346            return;
347        };
348        let _ = stream.set_write_timeout(Some(self.timeout));
349        let _ = stream.set_read_timeout(Some(self.timeout));
350        let request = format!(
351            "POST /v1/traces HTTP/1.1\r\n\
352             Host: {host}:{port}\r\n\
353             Content-Type: application/json\r\n\
354             Content-Length: {len}\r\n\
355             Connection: close\r\n\r\n\
356             {body}",
357            host = self.host,
358            port = self.port,
359            len  = body.len(),
360            body = body,
361        );
362        if stream.write_all(request.as_bytes()).is_ok() {
363            let mut _buf = [0u8; 256];
364            let _ = stream.read(&mut _buf); // drain response
365        }
366    }
367}
368
369impl Exporter for OtlpHttpExporter {
370    fn export(&self, spans: &[SpanData]) {
371        if spans.is_empty() { return; }
372        let body = self.build_body(spans);
373        self.post(&body);
374    }
375}
376
377// ── global tracer ─────────────────────────────────────────────────────────────
378
379struct GlobalTracer {
380    exporter: Box<dyn Exporter>,
381    batch: Mutex<Vec<SpanData>>,
382    batch_size: usize,
383    sample_rate: f64,
384    shutdown_flag: AtomicBool,
385}
386
387impl GlobalTracer {
388    fn should_sample(&self) -> bool {
389        if self.sample_rate >= 1.0 { return true; }
390        if self.sample_rate <= 0.0 { return false; }
391        // Use the counter as a pseudo-random source — cheap and uniform enough.
392        let n = COUNTER.load(Ordering::Relaxed);
393        (n % 10000) < (self.sample_rate * 10000.0) as u64
394    }
395
396    fn record(&self, span: SpanData) {
397        let mut guard = self.batch.lock().unwrap();
398        guard.push(span);
399        if guard.len() >= self.batch_size {
400            let spans = std::mem::take(&mut *guard);
401            drop(guard);
402            self.exporter.export(&spans);
403        }
404    }
405
406    fn flush(&self) {
407        let spans = std::mem::take(&mut *self.batch.lock().unwrap());
408        if !spans.is_empty() {
409            self.exporter.export(&spans);
410        }
411    }
412}
413
414static TRACER: OnceLock<GlobalTracer> = OnceLock::new();
415
416fn tracer() -> Option<&'static GlobalTracer> {
417    TRACER.get()
418}
419
420// ── public API ────────────────────────────────────────────────────────────────
421
/// Selects the backend that finished spans are sent to.
#[derive(Debug, Clone)]
pub enum ExporterConfig {
    /// Write each span as one JSON line on stdout — handy during development.
    Stdout,
    /// POST OTLP/JSON batches to `{endpoint}/v1/traces`.
    ///
    /// Works with Jaeger ≥ 1.35, Grafana Tempo and the OpenTelemetry
    /// Collector. A typical endpoint is `"http://localhost:4318"`.
    Otlp { endpoint: String },
    /// Drop every span; useful for keeping test output quiet.
    Discard,
}
435
436/// Configuration for the tracing subsystem.
437#[derive(Clone, Debug)]
438pub struct TracingConfig {
439    /// Value of the `service.name` resource attribute (e.g. `"checkout-service"`).
440    pub service_name: String,
441    /// Value of the `service.version` resource attribute.
442    pub service_version: String,
443    /// Where to send completed spans.
444    pub exporter: ExporterConfig,
445    /// Fraction of requests to sample. `1.0` = 100%, `0.1` = 10%.
446    pub sample_rate: f64,
447    /// Maximum number of spans to accumulate before flushing to the exporter.
448    pub batch_size: usize,
449}
450
451impl Default for TracingConfig {
452    fn default() -> Self {
453        TracingConfig {
454            service_name: "rws".to_string(),
455            service_version: env!("CARGO_PKG_VERSION").to_string(),
456            exporter: ExporterConfig::Stdout,
457            sample_rate: 1.0,
458            batch_size: 128,
459        }
460    }
461}
462
463/// Initialize tracing with an explicit config. Call once at startup before
464/// the server starts accepting requests.
465///
466/// Calling this more than once is a no-op (the first call wins).
467pub fn setup(config: TracingConfig) {
468    let exporter: Box<dyn Exporter> = match &config.exporter {
469        ExporterConfig::Stdout => Box::new(StdoutExporter),
470        ExporterConfig::Otlp { endpoint } => Box::new(OtlpHttpExporter::new(
471            endpoint,
472            &config.service_name,
473            &config.service_version,
474        )),
475        ExporterConfig::Discard => Box::new(DiscardExporter),
476    };
477    let _ = TRACER.set(GlobalTracer {
478        exporter,
479        batch: Mutex::new(Vec::new()),
480        batch_size: config.batch_size.max(1),
481        sample_rate: config.sample_rate.clamp(0.0, 1.0),
482        shutdown_flag: AtomicBool::new(false),
483    });
484}
485
486/// Initialize tracing from standard OpenTelemetry environment variables:
487///
488/// - `OTEL_SERVICE_NAME` — service name (default `"rws"`)
489/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — OTLP endpoint URL (default: stdout)
490/// - `OTEL_TRACES_SAMPLER_ARG` — sample rate `0.0`–`1.0` (default `1.0`)
491pub fn setup_from_env() {
492    let service_name = std::env::var("OTEL_SERVICE_NAME")
493        .unwrap_or_else(|_| "rws".to_string());
494    let sample_rate: f64 = std::env::var("OTEL_TRACES_SAMPLER_ARG")
495        .ok()
496        .and_then(|v| v.parse().ok())
497        .unwrap_or(1.0);
498    let exporter = match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok() {
499        Some(ep) if !ep.is_empty() => ExporterConfig::Otlp { endpoint: ep },
500        _ => ExporterConfig::Stdout,
501    };
502    setup(TracingConfig {
503        service_name,
504        service_version: env!("CARGO_PKG_VERSION").to_string(),
505        exporter,
506        sample_rate,
507        batch_size: 128,
508    });
509}
510
511/// Flush all buffered spans to the exporter. Call before the process exits
512/// to ensure no spans are lost.
513pub fn shutdown() {
514    if let Some(t) = tracer() {
515        t.shutdown_flag.store(true, Ordering::Relaxed);
516        t.flush();
517        t.exporter.shutdown();
518    }
519}
520
521/// Flush buffered spans without shutting down. Useful in tests.
522pub fn flush() {
523    if let Some(t) = tracer() {
524        t.flush();
525    }
526}
527
528// ── OtelLayer middleware ──────────────────────────────────────────────────────
529
530/// Middleware that creates an HTTP server span for each request.
531///
532/// - Reads W3C `traceparent` from the request to continue an existing trace.
533/// - Creates a new trace when no `traceparent` is present.
534/// - Stores the active span in thread-local storage so downstream middleware
535///   (e.g. [`crate::proxy::ReverseProxy`]) can propagate it.
536/// - Records the span on the way out with `http.method`, `http.target`, and
537///   `http.status_code` attributes.
538///
539/// Requires [`setup`] or [`setup_from_env`] to be called at startup. Without
540/// initialisation the layer is a no-op passthrough.
541pub struct OtelLayer;
542
543impl Middleware for OtelLayer {
544    fn handle(
545        &self,
546        request: &Request,
547        connection: &ConnectionInfo,
548        next: &dyn Application,
549    ) -> Result<Response, String> {
550        let Some(t) = tracer() else {
551            return next.execute(request, connection);
552        };
553
554        let sampled = t.should_sample();
555
556        // Extract or create trace context.
557        let incoming = request.headers.iter()
558            .find(|h| h.name.eq_ignore_ascii_case("traceparent"))
559            .and_then(|h| TraceContext::parse(&h.value));
560
561        let trace_id = incoming.map(|c| c.trace_id).unwrap_or_else(new_trace_id);
562        let parent_span_id = incoming.map(|c| c.parent_span_id);
563        let span_id = new_span_id();
564
565        // Publish active span for downstream propagation.
566        ACTIVE.with(|cell| {
567            cell.set(Some(ActiveSpan { trace_id, span_id, sampled }));
568        });
569
570        let start_ns = now_ns();
571        let result = next.execute(request, connection);
572        let end_ns = now_ns();
573
574        // Clear active span.
575        ACTIVE.with(|cell| cell.set(None));
576
577        if sampled {
578            let status = match &result {
579                Ok(r) => r.status_code,
580                Err(_) => 500,
581            };
582            let path = strip_query(&request.request_uri).to_string();
583            t.record(SpanData {
584                trace_id,
585                span_id,
586                parent_span_id,
587                name: format!("{} {}", request.method, path),
588                start_ns,
589                end_ns,
590                http_method: request.method.clone(),
591                http_target: request.request_uri.clone(),
592                http_status: status,
593                status_code: if status >= 500 { 2 } else { 0 },
594            });
595        }
596
597        result
598    }
599}
600
601// ── internal no-op exporter (used for tests / Discard config) ─────────────────
602
603struct DiscardExporter;
604impl Exporter for DiscardExporter {
605    fn export(&self, _: &[SpanData]) {}
606}
607
608// ── collect spans in tests ────────────────────────────────────────────────────
609
610/// Captures spans in memory instead of exporting them. Use in unit tests.
611///
612/// ```rust
613/// use rust_web_server::otel::{CapturingExporter, SpanData};
614/// use std::sync::{Arc, Mutex};
615///
616/// let captured: Arc<Mutex<Vec<SpanData>>> = Arc::new(Mutex::new(Vec::new()));
617/// // (CapturingExporter is constructed internally by the test helpers)
618/// ```
619pub struct CapturingExporter {
620    pub spans: Mutex<Vec<SpanData>>,
621}
622
623impl CapturingExporter {
624    pub fn new() -> Self {
625        CapturingExporter { spans: Mutex::new(Vec::new()) }
626    }
627
628    pub fn take(&self) -> Vec<SpanData> {
629        std::mem::take(&mut *self.spans.lock().unwrap())
630    }
631}
632
633impl Default for CapturingExporter {
634    fn default() -> Self { Self::new() }
635}
636
637impl Exporter for CapturingExporter {
638    fn export(&self, spans: &[SpanData]) {
639        self.spans.lock().unwrap().extend_from_slice(spans);
640    }
641}