Skip to main content

osproxy_otlp/
lib.rs

1//! OTLP/HTTP span exporter.
2//!
3//! Ships the shape-only OTLP `ResourceSpans` payloads built by
4//! [`osproxy_observe::resource_spans`] to a collector's `/v1/traces` endpoint
5//! over HTTP with `Content-Type: application/json` (the OTLP/HTTP JSON binding).
6//!
//! Export is **read-only and never on the request's critical path** (`docs/05`,
//! ADR-005): [`SpanExporter::export`] returns immediately and the actual POST
//! runs in a spawned task whose result is ignored, so a slow or down collector
//! can never add latency to, or fail, a client request. Telemetry is best-effort
//! by construction.
12#![deny(missing_docs)]
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use bytes::Bytes;
18use http_body_util::Full;
19use hyper::{Method, Request};
20use hyper_util::client::legacy::connect::HttpConnector;
21use hyper_util::client::legacy::Client;
22use hyper_util::rt::TokioExecutor;
23use osproxy_observe::SpanExporter;
24use serde_json::Value;
25use tokio::sync::Semaphore;
26
/// Upper bound on how long a single export may wait on the collector. Once it
/// elapses the pending request is abandoned, so a hung connection cannot keep a
/// background export task alive forever.
const EXPORT_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum number of exports allowed in flight at the same time. Anything past
/// this limit is **dropped** (telemetry is best-effort) instead of being queued,
/// so a slow or unreachable collector cannot make memory or file-descriptor
/// usage grow without bound under load.
const MAX_INFLIGHT: usize = 256;
35
36/// An [`SpanExporter`] that POSTs OTLP/HTTP JSON spans to a collector.
37///
38/// Construct it with the collector's base URL (e.g.
39/// `http://otel-collector:4318`); the exporter appends the standard
40/// `/v1/traces` path. A pooled HTTP client is reused across exports.
41///
42/// **Must be constructed within a Tokio runtime** (it captures the runtime
43/// handle to spawn background sends). Outside a runtime, export is a no-op rather
44/// than a panic, telemetry is best-effort and never affects the caller.
45#[derive(Clone, Debug)]
46pub struct OtlpHttpExporter {
47    endpoint: String,
48    client: Client<HttpConnector, Full<Bytes>>,
49    handle: Option<tokio::runtime::Handle>,
50    inflight: Arc<Semaphore>,
51}
52
53impl OtlpHttpExporter {
54    /// Builds an exporter targeting `endpoint_base` (the collector base URL).
55    /// The OTLP `/v1/traces` path is appended automatically.
56    #[must_use]
57    pub fn new(endpoint_base: &str) -> Self {
58        let endpoint = format!("{}/v1/traces", endpoint_base.trim_end_matches('/'));
59        let client = Client::builder(TokioExecutor::new()).build_http();
60        Self {
61            endpoint,
62            client,
63            handle: tokio::runtime::Handle::try_current().ok(),
64            inflight: Arc::new(Semaphore::new(MAX_INFLIGHT)),
65        }
66    }
67
68    /// The full `/v1/traces` URL this exporter posts to.
69    #[must_use]
70    pub fn endpoint(&self) -> &str {
71        &self.endpoint
72    }
73}
74
75impl SpanExporter for OtlpHttpExporter {
76    fn export(&self, payload: Value) {
77        // No runtime to spawn on (constructed outside Tokio): drop, never panic.
78        let Some(handle) = self.handle.clone() else {
79            return;
80        };
81        // Bound concurrency: if too many exports are already in flight (a slow or
82        // down collector), drop this one rather than queueing unboundedly.
83        let Ok(permit) = Arc::clone(&self.inflight).try_acquire_owned() else {
84            return;
85        };
86        let client = self.client.clone();
87        let endpoint = self.endpoint.clone();
88        // Fire-and-forget: the POST runs in the background under a deadline and its
89        // result is discarded, so collector latency/failure never reaches the
90        // request path.
91        handle.spawn(async move {
92            let _permit = permit; // released when the export finishes or times out
93            let Ok(body) = serde_json::to_vec(&payload) else {
94                return;
95            };
96            let Ok(req) = Request::builder()
97                .method(Method::POST)
98                .uri(&endpoint)
99                .header("content-type", "application/json")
100                .body(Full::new(Bytes::from(body)))
101            else {
102                return;
103            };
104            let _ = tokio::time::timeout(EXPORT_TIMEOUT, client.request(req)).await;
105        });
106    }
107}