Skip to main content

rust_web_server/otel/
mod.rs

1//! OpenTelemetry-compatible distributed tracing.
2//!
3//! [`OtelLayer`] is a [`Middleware`] that:
4//! 1. Reads the W3C `traceparent` header from incoming requests and continues
5//!    an existing trace, or starts a fresh one.
6//! 2. Creates an HTTP server span with standard semantic attributes.
7//! 3. Stores the active span context in thread-local storage so downstream
8//!    middleware (e.g. [`crate::proxy::ReverseProxy`]) can propagate it.
9//! 4. Records the completed span to a configurable exporter.
10//!
11//! # Quick start
12//!
13//! ```rust,no_run
14//! use rust_web_server::app::App;
15//! use rust_web_server::core::New;
16//! use rust_web_server::otel::{OtelLayer, TracingConfig, ExporterConfig};
17//!
18//! // Dev: print spans to stdout.
19//! rust_web_server::otel::setup(TracingConfig {
20//!     service_name: "my-service".to_string(),
21//!     service_version: env!("CARGO_PKG_VERSION").to_string(),
22//!     exporter: ExporterConfig::Stdout,
23//!     sample_rate: 1.0,
24//!     batch_size: 128,
25//! });
26//!
27//! let app = App::new().wrap(OtelLayer);
28//! ```
29//!
30//! # Production: OTLP HTTP export
31//!
32//! ```rust,no_run
33//! use rust_web_server::otel::{ExporterConfig, TracingConfig};
34//!
35//! rust_web_server::otel::setup(TracingConfig {
36//!     service_name: "my-service".to_string(),
37//!     service_version: "1.0.0".to_string(),
38//!     exporter: ExporterConfig::Otlp {
39//!         endpoint: "http://localhost:4318".to_string(),
40//!     },
41//!     sample_rate: 0.1,
42//!     batch_size: 512,
43//! });
44//! ```
45//!
46//! Alternatively, set environment variables before calling [`setup_from_env`]:
47//!
48//! ```text
49//! OTEL_SERVICE_NAME=my-service
50//! OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
51//! OTEL_TRACES_SAMPLER_ARG=0.1    # sample rate 0.0–1.0 (default 1.0)
52//! ```
53
54#[cfg(test)]
55mod tests;
56
57use std::cell::{Cell, RefCell};
58use std::io::{Read, Write};
59use std::net::TcpStream;
60use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
61use std::sync::{Mutex, OnceLock};
62use std::time::{Duration, SystemTime, UNIX_EPOCH};
63
64use crate::application::Application;
65use crate::middleware::Middleware;
66use crate::request::Request;
67use crate::response::Response;
68use crate::server::ConnectionInfo;
69
70// ── ID generation ─────────────────────────────────────────────────────────────
71
/// Monotonic sequence shared by trace and span IDs. It starts at 1 and is only
/// ever advanced in steps of 2, so the stored value is always odd.
static COUNTER: AtomicU64 = AtomicU64::new(1);
/// Unix time, in whole seconds, captured the first time [`start_secs`] runs.
static START_SECS: OnceLock<u64> = OnceLock::new();
/// Per-process value used as the high 64 bits of every trace ID.
static PROCESS_SEED: OnceLock<u64> = OnceLock::new();

/// Returns the Unix time (seconds) recorded on the first call in this
/// process; every later call returns that same cached value. A system clock
/// set before the Unix epoch yields `0`.
fn start_secs() -> u64 {
    *START_SECS.get_or_init(|| {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    })
}

/// Returns a 64-bit value that is fixed for the lifetime of this process but
/// differs between processes.
///
/// The start second alone is not enough to tell processes apart: several
/// replicas launched by an orchestrator (or a fast restart) routinely share
/// the same start second and would then emit identical trace IDs. The start
/// second and the process ID are therefore hashed with a randomly keyed
/// `RandomState` hasher.
fn process_seed() -> u64 {
    use std::hash::{BuildHasher, Hasher};

    *PROCESS_SEED.get_or_init(|| {
        let mut hasher = std::collections::hash_map::RandomState::new().build_hasher();
        hasher.write_u64(start_secs());
        hasher.write_u32(std::process::id());
        hasher.finish()
    })
}

/// Generate a new 128-bit trace ID. Not cryptographically random but unique
/// enough for tracing purposes across service restarts and across replicas
/// started at the same moment.
///
/// Layout (big-endian): bytes `0..8` hold the per-process seed, bytes `8..16`
/// hold the next odd value of the shared sequence counter. Because the low
/// half is odd, the ID is never all zeros.
pub fn new_trace_id() -> [u8; 16] {
    let seed = process_seed();
    let seq = COUNTER.fetch_add(2, Ordering::Relaxed);
    let mut id = [0u8; 16];
    id[..8].copy_from_slice(&seed.to_be_bytes());
    id[8..].copy_from_slice(&seq.to_be_bytes());
    id
}

/// Generate a new 64-bit span ID.
///
/// Uses the even value directly after the odd counter value it reserves, so
/// span IDs never share a number with the sequence part of a trace ID.
pub fn new_span_id() -> [u8; 8] {
    let seq = COUNTER.fetch_add(2, Ordering::Relaxed) + 1;
    seq.to_be_bytes()
}
100
/// Lowercase-hex encodes `bytes` into a single pre-sized `String`.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(DIGITS[usize::from(byte >> 4)] as char);
        out.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    out
}

/// Renders a 16-byte trace ID as 32 lowercase hex characters.
fn hex16(b: &[u8; 16]) -> String {
    hex_encode(b)
}

/// Renders an 8-byte span ID as 16 lowercase hex characters.
fn hex8(b: &[u8; 8]) -> String {
    hex_encode(b)
}

// ── W3C Trace Context ─────────────────────────────────────────────────────────

/// Parsed W3C `traceparent` header value.
///
/// Format: `00-{trace-id}-{parent-id}-{flags}`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct TraceContext {
    pub trace_id: [u8; 16],
    pub parent_span_id: [u8; 8],
    pub sampled: bool,
}

impl TraceContext {
    /// Parse a `traceparent` header value.
    ///
    /// Returns `None` unless the value is version `00` with exactly four
    /// `-`-separated fields: 32 hex digits of trace ID, 16 hex digits of
    /// parent ID and 2 hex digits of flags. An all-zero trace ID or parent ID
    /// is rejected, as the W3C specification declares both invalid. Upper-case
    /// hex digits are tolerated. Surrounding whitespace is ignored.
    pub fn parse(header: &str) -> Option<Self> {
        let mut fields = header.trim().splitn(4, '-');
        let version = fields.next()?;
        let trace = fields.next()?;
        let parent = fields.next()?;
        let flags = fields.next()?;
        if version != "00" {
            return None;
        }

        let trace_id = parse_hex16(trace)?;
        let parent_span_id = parse_hex8(parent)?;
        if trace_id == [0u8; 16] || parent_span_id == [0u8; 8] {
            return None;
        }
        // Exactly two hex digits; anything trailing (extra `-` fields
        // included, since `splitn` leaves them in this last piece) fails.
        let flags = decode_hex::<1>(flags)?[0];

        Some(TraceContext { trace_id, parent_span_id, sampled: flags & 0x01 != 0 })
    }

    /// Render as a `traceparent` header value for this context acting as parent.
    ///
    /// `span_id` is written into the parent-id field; the flags byte is `01`
    /// when `sampled` is set and `00` otherwise.
    pub fn as_header(&self, span_id: &[u8; 8]) -> String {
        format!("00-{}-{}-{:02x}", hex16(&self.trace_id), hex8(span_id), u8::from(self.sampled))
    }
}

/// Value of one ASCII hex digit, or `None` for any other byte. Unlike
/// `u8::from_str_radix`, this does not accept a leading `+` sign.
fn hex_nibble(c: u8) -> Option<u8> {
    match c {
        b'0'..=b'9' => Some(c - b'0'),
        b'a'..=b'f' => Some(c - b'a' + 10),
        b'A'..=b'F' => Some(c - b'A' + 10),
        _ => None,
    }
}

/// Decodes exactly `2 * N` hex digits into `N` bytes; `None` on a wrong
/// length or on any non-hex byte.
fn decode_hex<const N: usize>(s: &str) -> Option<[u8; N]> {
    let raw = s.as_bytes();
    if raw.len() != N * 2 {
        return None;
    }
    let mut out = [0u8; N];
    for (slot, pair) in out.iter_mut().zip(raw.chunks_exact(2)) {
        *slot = (hex_nibble(pair[0])? << 4) | hex_nibble(pair[1])?;
    }
    Some(out)
}

/// Decodes a 32-digit hex string into a 16-byte ID.
fn parse_hex16(s: &str) -> Option<[u8; 16]> {
    decode_hex::<16>(s)
}

/// Decodes a 16-digit hex string into an 8-byte ID.
fn parse_hex8(s: &str) -> Option<[u8; 8]> {
    decode_hex::<8>(s)
}
157
158// ── span kind / attributes ───────────────────────────────────────────────────
159
/// The role a span plays, using the OTLP `SpanKind` numbering
/// (`INTERNAL`=1, `SERVER`=2, `CLIENT`=3).
///
/// Because the discriminants equal the OTLP numbers, `kind as i32` is the
/// value to put into OTLP JSON. `PRODUCER` and `CONSUMER` are deliberately
/// absent: this crate has no message-queue instrumentation yet.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SpanKind {
    /// Work that has no remote counterpart (a DB query, a cache lookup,
    /// business logic). This is the default, and what [`span`] creates.
    #[default]
    Internal = 1,
    /// An incoming HTTP request, as opened by [`OtelLayer`].
    Server = 2,
    /// An outbound call to another service, as opened by [`client_span`].
    Client = 3,
}
175
/// The value half of a span attribute, covering the basic variants of OTLP
/// `AnyValue`: string, 64-bit integer, double and boolean.
#[derive(Clone, Debug, PartialEq)]
pub enum AttributeValue {
    String(String),
    Int(i64),
    Float(f64),
    Bool(bool),
}

/// Borrowed text is copied into an owned `String` variant.
impl From<&str> for AttributeValue {
    fn from(text: &str) -> Self {
        Self::String(String::from(text))
    }
}

/// Owned text is moved into the `String` variant without copying.
impl From<String> for AttributeValue {
    fn from(text: String) -> Self {
        Self::String(text)
    }
}

impl From<i64> for AttributeValue {
    fn from(number: i64) -> Self {
        Self::Int(number)
    }
}

/// Widened losslessly to `i64`.
impl From<i32> for AttributeValue {
    fn from(number: i32) -> Self {
        Self::Int(i64::from(number))
    }
}

/// Widened losslessly to `i64`.
impl From<u32> for AttributeValue {
    fn from(number: u32) -> Self {
        Self::Int(i64::from(number))
    }
}

impl From<f64> for AttributeValue {
    fn from(number: f64) -> Self {
        Self::Float(number)
    }
}

impl From<bool> for AttributeValue {
    fn from(flag: bool) -> Self {
        Self::Bool(flag)
    }
}
206
207// ── span data ─────────────────────────────────────────────────────────────────
208
209/// A completed span ready for export.
210#[derive(Debug, Clone, Default)]
211pub struct SpanData {
212    pub trace_id: [u8; 16],
213    pub span_id: [u8; 8],
214    pub parent_span_id: Option<[u8; 8]>,
215    /// `"GET /api/users"` — method + path, query stripped.
216    pub name: String,
217    pub start_ns: u64,
218    pub end_ns: u64,
219    /// Empty for a non-HTTP span (e.g. a `db.query` child span) — exporters
220    /// omit the `http.*` attributes entirely in that case.
221    pub http_method: String,
222    pub http_target: String,
223    pub http_status: i16,
224    /// 0=Unset, 1=Ok, 2=Error
225    pub status_code: u8,
226    pub kind: SpanKind,
227    /// Extra key/value attributes beyond the first-class `http.*` fields —
228    /// e.g. `db.statement`, `cache.key`, or any custom attribute set via
229    /// [`Span::set_attribute`].
230    pub attributes: Vec<(String, AttributeValue)>,
231}
232
233impl SpanData {
234    fn duration_ms(&self) -> f64 {
235        (self.end_ns.saturating_sub(self.start_ns)) as f64 / 1_000_000.0
236    }
237}
238
/// Current wall-clock time as nanoseconds since the Unix epoch, truncated to
/// 64 bits. A clock set before the epoch reads as `0`.
fn now_ns() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map(|elapsed| elapsed.as_nanos() as u64).unwrap_or(0)
}
245
/// Returns the part of `uri` before its first `?`, or all of `uri` when it
/// contains no `?`.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(path, _query)| path)
}
252
253// ── thread-local span stack ─────────────────────────────────────────────────
254
/// The minimal identity of an open span — just what is needed to propagate
/// context downstream and to link a child to its parent. One of these sits on
/// the thread-local stack for every open span.
#[derive(Clone, Copy)]
struct ActiveSpanCtx {
    trace_id: [u8; 16],
    span_id: [u8; 8],
    sampled: bool,
}

thread_local! {
    /// Open [`Span`]s on this thread, outermost first and innermost last.
    ///
    /// This has to be a stack rather than a single slot: when span A is
    /// active and span B starts, B becomes "current"; once B ends, A must be
    /// "current" again. Only a stack can express that nesting, and it is what
    /// allows several nested spans within one request.
    static ACTIVE_STACK: RefCell<Vec<ActiveSpanCtx>> = const { RefCell::new(Vec::new()) };
}
271
272/// Return the W3C `traceparent` value for the innermost span currently being
273/// processed on this thread (the deepest active [`Span`], not necessarily the
274/// request root). Returns `None` when no span is active.
275///
276/// Use this to propagate trace context into outbound calls made from within
277/// a handler or a child span.
278pub fn current_traceparent() -> Option<String> {
279    ACTIVE_STACK.with(|stack| {
280        stack.borrow().last().map(|s| {
281            format!(
282                "00-{}-{}-{:02x}",
283                hex16(&s.trace_id),
284                hex8(&s.span_id),
285                s.sampled as u8,
286            )
287        })
288    })
289}
290
291// ── Span ─────────────────────────────────────────────────────────────────────
292
293/// A single open span. Create one with [`span`] or [`client_span`] (or
294/// [`Span::new`] for full control over [`SpanKind`]); it becomes the
295/// "current" span on this thread until it's dropped (or [`Span::end`] is
296/// called explicitly, which is equivalent).
297///
298/// Nesting: creating a `Span` while another is already active makes the new
299/// one its child — the child inherits the parent's `trace_id` and sampling
300/// decision, and `parent_span_id()` is the parent's `span_id()`. Dropping the
301/// child makes the parent "current" again.
302///
303/// Not `Send`: a `Span` must be dropped on the thread that created it, since
304/// dropping pops a thread-local stack — moving one to another thread and
305/// dropping it there would corrupt that thread's stack.
306///
307/// # Example
308///
309/// ```rust
310/// use rust_web_server::otel;
311///
312/// let span = otel::span("db.query");
313/// span.set_attribute("db.statement", "SELECT 1");
314/// // ... do the work ...
315/// span.end(); // or just let it drop at the end of scope
316/// ```
317pub struct Span {
318    trace_id: [u8; 16],
319    span_id: [u8; 8],
320    parent_span_id: Option<[u8; 8]>,
321    sampled: bool,
322    kind: SpanKind,
323    name: String,
324    start_ns: u64,
325    attributes: RefCell<Vec<(String, AttributeValue)>>,
326    status_code: Cell<u8>,
327    http_method: RefCell<Option<String>>,
328    http_target: RefCell<Option<String>>,
329    http_status: Cell<Option<i16>>,
330    // Makes `Span` `!Send` (a `Rc` is never `Send`) without requiring
331    // unstable negative impls. See the "Not `Send`" note above.
332    _not_send: std::marker::PhantomData<std::rc::Rc<()>>,
333}
334
335fn new_span(name: &str, kind: SpanKind, incoming: Option<TraceContext>) -> Span {
336    let (trace_id, parent_span_id, sampled) = ACTIVE_STACK.with(|stack| {
337        let stack = stack.borrow();
338        if let Some(top) = stack.last() {
339            // Nested child: inherit the trace and the parent's sampling
340            // decision — resampling independently per child would produce
341            // broken/partial traces whenever a child's own coin-flip
342            // disagreed with its parent's.
343            (top.trace_id, Some(top.span_id), top.sampled)
344        } else {
345            // Root-ish: no active parent on this thread, so make a fresh
346            // sampling decision, exactly like `OtelLayer` always has.
347            let sampled = tracer().map(|t| t.should_sample()).unwrap_or(false);
348            match incoming {
349                Some(ctx) => (ctx.trace_id, Some(ctx.parent_span_id), sampled),
350                None => (new_trace_id(), None, sampled),
351            }
352        }
353    });
354    let span_id = new_span_id();
355    ACTIVE_STACK.with(|stack| stack.borrow_mut().push(ActiveSpanCtx { trace_id, span_id, sampled }));
356
357    Span {
358        trace_id,
359        span_id,
360        parent_span_id,
361        sampled,
362        kind,
363        name: name.to_string(),
364        start_ns: now_ns(),
365        attributes: RefCell::new(Vec::new()),
366        status_code: Cell::new(0),
367        http_method: RefCell::new(None),
368        http_target: RefCell::new(None),
369        http_status: Cell::new(None),
370        _not_send: std::marker::PhantomData,
371    }
372}
373
374impl Span {
375    /// Start a new span with an explicit [`SpanKind`]. Prefer [`span`] or
376    /// [`client_span`] for the common cases.
377    pub fn new(name: &str, kind: SpanKind) -> Span {
378        new_span(name, kind, None)
379    }
380
381    /// Used only by [`OtelLayer`] to start the request's root span, honoring
382    /// an incoming W3C `traceparent` header if present.
383    pub(crate) fn start_root(name: &str, kind: SpanKind, incoming: Option<TraceContext>) -> Span {
384        new_span(name, kind, incoming)
385    }
386
387    /// Attach a key/value attribute. Repeated keys are appended, not
388    /// deduplicated — matching this module's existing "loosely OTLP" style.
389    pub fn set_attribute(&self, key: &str, value: impl Into<AttributeValue>) {
390        self.attributes.borrow_mut().push((key.to_string(), value.into()));
391    }
392
393    /// Mark this span as failed (OTLP `Status.code = 2`, Error).
394    pub fn set_error(&self) {
395        self.status_code.set(2);
396    }
397
398    /// Mark this span as failed and attach an `error.message` attribute.
399    pub fn record_error(&self, message: &str) {
400        self.set_error();
401        self.set_attribute("error.message", message);
402    }
403
404    pub fn trace_id(&self) -> [u8; 16] {
405        self.trace_id
406    }
407
408    pub fn span_id(&self) -> [u8; 8] {
409        self.span_id
410    }
411
412    pub fn parent_span_id(&self) -> Option<[u8; 8]> {
413        self.parent_span_id
414    }
415
416    /// End the span now. Equivalent to letting it drop at the end of scope —
417    /// both run the exact same recording logic — but useful when you want
418    /// the span's duration to stop before other work continues in the same
419    /// scope.
420    pub fn end(self) {}
421
422    pub(crate) fn set_http(&self, method: &str, target: &str) {
423        *self.http_method.borrow_mut() = Some(method.to_string());
424        *self.http_target.borrow_mut() = Some(target.to_string());
425    }
426
427    /// Also marks the span as an error when `status >= 500`.
428    pub(crate) fn set_http_status(&self, status: i16) {
429        self.http_status.set(Some(status));
430        if status >= 500 {
431            self.set_error();
432        }
433    }
434
435    fn finish(&mut self, end_ns: u64) -> SpanData {
436        SpanData {
437            trace_id: self.trace_id,
438            span_id: self.span_id,
439            parent_span_id: self.parent_span_id,
440            name: std::mem::take(&mut self.name),
441            start_ns: self.start_ns,
442            end_ns,
443            http_method: self.http_method.get_mut().take().unwrap_or_default(),
444            http_target: self.http_target.get_mut().take().unwrap_or_default(),
445            http_status: self.http_status.get().unwrap_or(0),
446            status_code: self.status_code.get(),
447            kind: self.kind,
448            attributes: std::mem::take(self.attributes.get_mut()),
449        }
450    }
451}
452
453impl Drop for Span {
454    fn drop(&mut self) {
455        ACTIVE_STACK.with(|stack| {
456            let mut stack = stack.borrow_mut();
457            // Pop this span and (defensively) anything still above it, in
458            // case a descendant was somehow leaked without being dropped.
459            if let Some(pos) = stack.iter().rposition(|c| c.span_id == self.span_id) {
460                stack.truncate(pos);
461            }
462        });
463
464        if !self.sampled {
465            return;
466        }
467        let Some(t) = tracer() else { return };
468        let end_ns = now_ns();
469        t.record(self.finish(end_ns));
470    }
471}
472
473/// Start a new [`SpanKind::Internal`] child span nested under the currently
474/// active span (or a fresh trace if none is active). Use for internal work
475/// like a database query or cache lookup.
476///
477/// ```rust
478/// use rust_web_server::otel;
479///
480/// let span = otel::span("db.query");
481/// span.set_attribute("db.statement", "SELECT 1");
482/// ```
483pub fn span(name: &str) -> Span {
484    Span::new(name, SpanKind::Internal)
485}
486
487/// Start a new [`SpanKind::Client`] child span for an outbound call to
488/// another service (an HTTP request, a gRPC call, ...).
489pub fn client_span(name: &str) -> Span {
490    Span::new(name, SpanKind::Client)
491}
492
493// ── exporter ──────────────────────────────────────────────────────────────────
494
495/// Destination for completed spans.
496pub trait Exporter: Send + Sync {
497    fn export(&self, spans: &[SpanData]);
498    fn shutdown(&self) {}
499}
500
501/// Renders one [`AttributeValue`] as an OTLP `AnyValue` JSON object.
502/// `Int` is string-encoded per the OTLP JSON mapping for `int64`.
503fn attr_value_json(v: &AttributeValue) -> String {
504    match v {
505        AttributeValue::String(s) => format!("{{\"stringValue\":\"{s}\"}}"),
506        AttributeValue::Int(i) => format!("{{\"intValue\":\"{i}\"}}"),
507        AttributeValue::Float(f) => format!("{{\"doubleValue\":{f}}}"),
508        AttributeValue::Bool(b) => format!("{{\"boolValue\":{b}}}"),
509    }
510}
511
512/// Renders one `(key, value)` pair as an OTLP `KeyValue` JSON object.
513fn attr_json(key: &str, value: &AttributeValue) -> String {
514    format!("{{\"key\":\"{key}\",\"value\":{}}}", attr_value_json(value))
515}
516
517/// Print one JSON line per span to stdout. Useful for development and for
518/// piping into `jq` or a log aggregator.
519pub struct StdoutExporter;
520
521impl StdoutExporter {
522    fn format_span(span: &SpanData) -> String {
523        let http_attrs = if span.http_method.is_empty() {
524            String::new()
525        } else {
526            format!(
527                ",\"httpMethod\":\"{}\",\"httpTarget\":\"{}\",\"httpStatus\":{}",
528                span.http_method, span.http_target, span.http_status,
529            )
530        };
531        let extra_attrs: String = span.attributes.iter()
532            .map(|(k, v)| format!(",\"{k}\":{}", attr_value_json(v)))
533            .collect();
534        format!(
535            "{{\"traceId\":\"{}\",\"spanId\":\"{}\",\"parentSpanId\":{},\
536             \"name\":\"{}\",\"kind\":{},\"startNs\":{},\"durationMs\":{:.3}{http_attrs}{extra_attrs}}}",
537            hex16(&span.trace_id),
538            hex8(&span.span_id),
539            span.parent_span_id
540                .as_ref()
541                .map(|p| format!("\"{}\"", hex8(p)))
542                .unwrap_or_else(|| "null".to_string()),
543            span.name,
544            span.kind as i32,
545            span.start_ns,
546            span.duration_ms(),
547        )
548    }
549}
550
551impl Exporter for StdoutExporter {
552    fn export(&self, spans: &[SpanData]) {
553        for span in spans {
554            println!("{}", Self::format_span(span));
555        }
556    }
557}
558
559/// Send spans to an OTLP-compatible collector over HTTP (JSON encoding).
560///
561/// Compatible with Jaeger ≥ 1.35, Grafana Tempo, and OpenTelemetry Collector.
562/// Point at `http://localhost:4318` (the default OTLP HTTP port).
563pub struct OtlpHttpExporter {
564    host: String,
565    port: u16,
566    timeout: Duration,
567    service_name: String,
568    service_version: String,
569}
570
571impl OtlpHttpExporter {
572    pub fn new(endpoint: &str, service_name: &str, service_version: &str) -> Self {
573        // Parse "http://host:port" or "host:port"
574        let stripped = endpoint
575            .trim_start_matches("http://")
576            .trim_start_matches("https://");
577        let (host, port) = if let Some(i) = stripped.rfind(':') {
578            let p = stripped[i + 1..].parse().unwrap_or(4318);
579            (stripped[..i].to_string(), p)
580        } else {
581            (stripped.to_string(), 4318)
582        };
583        OtlpHttpExporter {
584            host,
585            port,
586            timeout: Duration::from_secs(5),
587            service_name: service_name.to_string(),
588            service_version: service_version.to_string(),
589        }
590    }
591
592    pub fn build_body(&self, spans: &[SpanData]) -> String {
593        let span_jsons: Vec<String> = spans.iter().map(|s| {
594            let parent = s.parent_span_id
595                .as_ref()
596                .map(|p| format!(",\"parentSpanId\":\"{}\"", hex8(p)))
597                .unwrap_or_default();
598            let status_msg = if s.status_code == 2 { "Error" } else { "Unset" };
599
600            let mut attrs: Vec<String> = Vec::new();
601            if !s.http_method.is_empty() {
602                attrs.push(format!("{{\"key\":\"http.method\",\"value\":{{\"stringValue\":\"{}\"}} }}", s.http_method));
603                attrs.push(format!("{{\"key\":\"http.target\",\"value\":{{\"stringValue\":\"{}\"}} }}", s.http_target));
604                attrs.push(format!("{{\"key\":\"http.status_code\",\"value\":{{\"intValue\":\"{}\"}} }}", s.http_status));
605            }
606            attrs.extend(s.attributes.iter().map(|(k, v)| attr_json(k, v)));
607
608            format!(
609                "{{\"traceId\":\"{trace}\",\"spanId\":\"{span}\"{parent},\
610                 \"name\":\"{name}\",\"kind\":{kind},\
611                 \"startTimeUnixNano\":\"{start}\",\"endTimeUnixNano\":\"{end}\",\
612                 \"attributes\":[{attrs}],\
613                 \"status\":{{\"code\":{scode},\"message\":\"{smsg}\"}} }}",
614                trace = hex16(&s.trace_id),
615                span  = hex8(&s.span_id),
616                name  = s.name,
617                kind  = s.kind as i32,
618                start = s.start_ns,
619                end   = s.end_ns,
620                attrs = attrs.join(","),
621                scode = s.status_code,
622                smsg  = status_msg,
623            )
624        }).collect();
625
626        format!(
627            "{{\"resourceSpans\":[{{\"resource\":{{\"attributes\":[\
628               {{\"key\":\"service.name\",\"value\":{{\"stringValue\":\"{svc}\"}} }},\
629               {{\"key\":\"service.version\",\"value\":{{\"stringValue\":\"{ver}\"}} }}\
630             ]}},\"scopeSpans\":[{{\"scope\":{{\"name\":\"rws\"}},\"spans\":[{spans}]}}]}}]}}",
631            svc   = self.service_name,
632            ver   = self.service_version,
633            spans = span_jsons.join(","),
634        )
635    }
636
637    fn post(&self, body: &str) {
638        use std::net::ToSocketAddrs;
639        let addr = format!("{}:{}", self.host, self.port);
640        let Some(socket_addr) = addr.to_socket_addrs().ok().and_then(|mut i| i.next()) else {
641            return;
642        };
643        let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, self.timeout) else {
644            return;
645        };
646        let _ = stream.set_write_timeout(Some(self.timeout));
647        let _ = stream.set_read_timeout(Some(self.timeout));
648        let request = format!(
649            "POST /v1/traces HTTP/1.1\r\n\
650             Host: {host}:{port}\r\n\
651             Content-Type: application/json\r\n\
652             Content-Length: {len}\r\n\
653             Connection: close\r\n\r\n\
654             {body}",
655            host = self.host,
656            port = self.port,
657            len  = body.len(),
658            body = body,
659        );
660        if stream.write_all(request.as_bytes()).is_ok() {
661            let mut _buf = [0u8; 256];
662            let _ = stream.read(&mut _buf); // drain response
663        }
664    }
665}
666
667impl Exporter for OtlpHttpExporter {
668    fn export(&self, spans: &[SpanData]) {
669        if spans.is_empty() { return; }
670        let body = self.build_body(spans);
671        self.post(&body);
672    }
673}
674
675// ── global tracer ─────────────────────────────────────────────────────────────
676
677struct GlobalTracer {
678    exporter: Box<dyn Exporter>,
679    batch: Mutex<Vec<SpanData>>,
680    batch_size: usize,
681    sample_rate: f64,
682    shutdown_flag: AtomicBool,
683}
684
685impl GlobalTracer {
686    fn should_sample(&self) -> bool {
687        if self.sample_rate >= 1.0 { return true; }
688        if self.sample_rate <= 0.0 { return false; }
689        // Use the counter as a pseudo-random source — cheap and uniform enough.
690        let n = COUNTER.load(Ordering::Relaxed);
691        (n % 10000) < (self.sample_rate * 10000.0) as u64
692    }
693
694    fn record(&self, span: SpanData) {
695        let mut guard = self.batch.lock().unwrap();
696        guard.push(span);
697        if guard.len() >= self.batch_size {
698            let spans = std::mem::take(&mut *guard);
699            drop(guard);
700            self.exporter.export(&spans);
701        }
702    }
703
704    fn flush(&self) {
705        let spans = std::mem::take(&mut *self.batch.lock().unwrap());
706        if !spans.is_empty() {
707            self.exporter.export(&spans);
708        }
709    }
710}
711
712static TRACER: OnceLock<GlobalTracer> = OnceLock::new();
713
714fn tracer() -> Option<&'static GlobalTracer> {
715    TRACER.get()
716}
717
718// ── public API ────────────────────────────────────────────────────────────────
719
/// Selects the backend that finished spans are sent to.
#[derive(Debug, Clone)]
pub enum ExporterConfig {
    /// Write JSON-encoded spans to stdout. Meant for development.
    Stdout,
    /// POST OTLP JSON to `{endpoint}/v1/traces`.
    ///
    /// Works with Jaeger ≥ 1.35, Grafana Tempo and the OpenTelemetry
    /// Collector. A typical endpoint is `"http://localhost:4318"`.
    Otlp { endpoint: String },
    /// Throw every span away. Handy in tests to keep output quiet.
    Discard,
}
733
734/// Configuration for the tracing subsystem.
735#[derive(Clone, Debug)]
736pub struct TracingConfig {
737    /// Value of the `service.name` resource attribute (e.g. `"checkout-service"`).
738    pub service_name: String,
739    /// Value of the `service.version` resource attribute.
740    pub service_version: String,
741    /// Where to send completed spans.
742    pub exporter: ExporterConfig,
743    /// Fraction of requests to sample. `1.0` = 100%, `0.1` = 10%.
744    pub sample_rate: f64,
745    /// Maximum number of spans to accumulate before flushing to the exporter.
746    pub batch_size: usize,
747}
748
749impl Default for TracingConfig {
750    fn default() -> Self {
751        TracingConfig {
752            service_name: "rws".to_string(),
753            service_version: env!("CARGO_PKG_VERSION").to_string(),
754            exporter: ExporterConfig::Stdout,
755            sample_rate: 1.0,
756            batch_size: 128,
757        }
758    }
759}
760
761/// Initialize tracing with an explicit config. Call once at startup before
762/// the server starts accepting requests.
763///
764/// Calling this more than once is a no-op (the first call wins).
765pub fn setup(config: TracingConfig) {
766    let exporter: Box<dyn Exporter> = match &config.exporter {
767        ExporterConfig::Stdout => Box::new(StdoutExporter),
768        ExporterConfig::Otlp { endpoint } => Box::new(OtlpHttpExporter::new(
769            endpoint,
770            &config.service_name,
771            &config.service_version,
772        )),
773        ExporterConfig::Discard => Box::new(DiscardExporter),
774    };
775    setup_with_exporter(config, exporter);
776}
777
778/// Like [`setup`], but takes the exporter directly instead of building one
779/// from [`TracingConfig::exporter`]. Lets you wire in any [`Exporter`] —
780/// including [`CapturingExporter`] — through the same code path production
781/// code uses, rather than only [`ExporterConfig`]'s three built-in choices.
782///
783/// Calling this (or [`setup`]) more than once is a no-op (the first call wins).
784pub fn setup_with_exporter(config: TracingConfig, exporter: Box<dyn Exporter>) {
785    let _ = TRACER.set(GlobalTracer {
786        exporter,
787        batch: Mutex::new(Vec::new()),
788        batch_size: config.batch_size.max(1),
789        sample_rate: config.sample_rate.clamp(0.0, 1.0),
790        shutdown_flag: AtomicBool::new(false),
791    });
792}
793
794/// Initialize tracing from standard OpenTelemetry environment variables:
795///
796/// - `OTEL_SERVICE_NAME` — service name (default `"rws"`)
797/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — OTLP endpoint URL (default: stdout)
798/// - `OTEL_TRACES_SAMPLER_ARG` — sample rate `0.0`–`1.0` (default `1.0`)
799pub fn setup_from_env() {
800    let service_name = std::env::var("OTEL_SERVICE_NAME")
801        .unwrap_or_else(|_| "rws".to_string());
802    let sample_rate: f64 = std::env::var("OTEL_TRACES_SAMPLER_ARG")
803        .ok()
804        .and_then(|v| v.parse().ok())
805        .unwrap_or(1.0);
806    let exporter = match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok() {
807        Some(ep) if !ep.is_empty() => ExporterConfig::Otlp { endpoint: ep },
808        _ => ExporterConfig::Stdout,
809    };
810    setup(TracingConfig {
811        service_name,
812        service_version: env!("CARGO_PKG_VERSION").to_string(),
813        exporter,
814        sample_rate,
815        batch_size: 128,
816    });
817}
818
819/// Flush all buffered spans to the exporter. Call before the process exits
820/// to ensure no spans are lost.
821pub fn shutdown() {
822    if let Some(t) = tracer() {
823        t.shutdown_flag.store(true, Ordering::Relaxed);
824        t.flush();
825        t.exporter.shutdown();
826    }
827}
828
829/// Flush buffered spans without shutting down. Useful in tests.
830pub fn flush() {
831    if let Some(t) = tracer() {
832        t.flush();
833    }
834}
835
836// ── OtelLayer middleware ──────────────────────────────────────────────────────
837
838/// Middleware that creates an HTTP server span for each request.
839///
840/// - Reads W3C `traceparent` from the request to continue an existing trace.
841/// - Creates a new trace when no `traceparent` is present.
842/// - Stores the active span in thread-local storage so downstream middleware
843///   (e.g. [`crate::proxy::ReverseProxy`]) can propagate it.
844/// - Records the span on the way out with `http.method`, `http.target`, and
845///   `http.status_code` attributes.
846///
847/// Requires [`setup`] or [`setup_from_env`] to be called at startup. Without
848/// initialisation the layer is a no-op passthrough.
849pub struct OtelLayer;
850
851impl Middleware for OtelLayer {
852    fn handle(
853        &self,
854        request: &Request,
855        connection: &ConnectionInfo,
856        next: &dyn Application,
857    ) -> Result<Response, String> {
858        if tracer().is_none() {
859            return next.execute(request, connection);
860        }
861
862        // Extract an existing trace context from the request, if present.
863        let incoming = request.headers.iter()
864            .find(|h| h.name.eq_ignore_ascii_case("traceparent"))
865            .and_then(|h| TraceContext::parse(&h.value));
866
867        let path = strip_query(&request.request_uri).to_string();
868        let root = Span::start_root(&format!("{} {}", request.method, path), SpanKind::Server, incoming);
869        root.set_http(&request.method, &request.request_uri);
870
871        let result = next.execute(request, connection);
872
873        let status = match &result {
874            Ok(r) => r.status_code,
875            Err(_) => 500,
876        };
877        root.set_http_status(status);
878
879        result
880        // `root` drops here: pops the thread-local stack and records the
881        // span (if sampled) — the exact same path a child `Span` follows.
882    }
883}
884
885// ── internal no-op exporter (used for tests / Discard config) ─────────────────
886
887struct DiscardExporter;
888impl Exporter for DiscardExporter {
889    fn export(&self, _: &[SpanData]) {}
890}
891
892// ── collect spans in tests ────────────────────────────────────────────────────
893
894/// Captures spans in memory instead of exporting them. Use in unit tests via
895/// [`setup_with_exporter`] to prove that a span was actually recorded, not
896/// just that its getters return the right values.
897///
898/// ```rust,no_run
899/// use rust_web_server::otel::{self, CapturingExporter, TracingConfig, ExporterConfig};
900/// use std::sync::Arc;
901///
902/// let captured = Arc::new(CapturingExporter::new());
903/// otel::setup_with_exporter(
904///     TracingConfig { exporter: ExporterConfig::Discard, ..Default::default() },
905///     Box::new(captured.clone()),
906/// );
907///
908/// otel::span("db.query").end();
909/// otel::flush();
910/// assert_eq!(1, captured.take().len());
911/// ```
912pub struct CapturingExporter {
913    pub spans: Mutex<Vec<SpanData>>,
914}
915
916impl CapturingExporter {
917    pub fn new() -> Self {
918        CapturingExporter { spans: Mutex::new(Vec::new()) }
919    }
920
921    pub fn take(&self) -> Vec<SpanData> {
922        std::mem::take(&mut *self.spans.lock().unwrap())
923    }
924}
925
926impl Default for CapturingExporter {
927    fn default() -> Self { Self::new() }
928}
929
930impl Exporter for CapturingExporter {
931    fn export(&self, spans: &[SpanData]) {
932        self.spans.lock().unwrap().extend_from_slice(spans);
933    }
934}
935
936/// Lets a shared, externally-held `Arc<CapturingExporter>` be handed to
937/// [`setup_with_exporter`] (which takes ownership of a `Box<dyn Exporter>`)
938/// while the caller keeps its own handle to call [`CapturingExporter::take`]
939/// afterward.
940impl Exporter for std::sync::Arc<CapturingExporter> {
941    fn export(&self, spans: &[SpanData]) {
942        self.spans.lock().unwrap().extend_from_slice(spans);
943    }
944}