Skip to main content

fast_telemetry_export/
spans.rs

//! OTLP HTTP/protobuf span exporter.
//!
//! Exports completed spans from a [`fast_telemetry::span::SpanCollector`] to an
//! OTLP-compatible collector via HTTP POST to `/v1/traces`.
//! Payloads of 1 KiB or more are gzip-compressed automatically. After a failed
//! export, later export attempts are delayed with exponential backoff; the
//! spans of the failed batch are not re-sent.
7
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use fast_telemetry::otlp::{build_resource, build_trace_export_request, pb};
12use fast_telemetry::span::SpanCollector;
13use prost::Message;
14use tokio::time::MissedTickBehavior;
15use tokio_util::sync::CancellationToken;
16
/// Configuration for the OTLP span exporter.
///
/// Start from [`SpanExportConfig::new`] or [`Default::default`] and adjust
/// individual settings with the `with_*` builder methods.
#[derive(Clone)]
pub struct SpanExportConfig {
    /// OTLP collector endpoint (scheme + host + port), e.g. `"http://localhost:4318"`.
    /// The path `/v1/traces` is appended automatically.
    pub endpoint: String,
    /// Export interval (default: 10s).
    pub interval: Duration,
    /// `service.name` resource attribute.
    pub service_name: String,
    /// Instrumentation scope name (default: "fast-telemetry").
    pub scope_name: String,
    /// Additional resource attributes.
    pub resource_attributes: Vec<(String, String)>,
    /// Request timeout (default: 10s).
    pub timeout: Duration,
    /// Extra HTTP headers sent with every export request.
    pub headers: Vec<(String, String)>,
    /// Maximum number of spans per export batch (default: 512).
    pub max_batch_size: usize,
}

/// Hand-written so that header *values* are never printed: headers commonly
/// carry credentials (e.g. `Authorization`), and configuration structs tend to
/// end up in logs. Header names and every other field are shown as-is.
impl std::fmt::Debug for SpanExportConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let redacted_headers: Vec<(&str, &str)> = self
            .headers
            .iter()
            .map(|(name, _)| (name.as_str(), "<redacted>"))
            .collect();

        f.debug_struct("SpanExportConfig")
            .field("endpoint", &self.endpoint)
            .field("interval", &self.interval)
            .field("service_name", &self.service_name)
            .field("scope_name", &self.scope_name)
            .field("resource_attributes", &self.resource_attributes)
            .field("timeout", &self.timeout)
            .field("headers", &redacted_headers)
            .field("max_batch_size", &self.max_batch_size)
            .finish()
    }
}
38
39impl Default for SpanExportConfig {
40    fn default() -> Self {
41        Self {
42            endpoint: "http://localhost:4318".to_string(),
43            interval: Duration::from_secs(10),
44            service_name: "unknown_service".to_string(),
45            scope_name: "fast-telemetry".to_string(),
46            resource_attributes: Vec::new(),
47            timeout: Duration::from_secs(10),
48            headers: Vec::new(),
49            max_batch_size: 512,
50        }
51    }
52}
53
54impl SpanExportConfig {
55    pub fn new(endpoint: impl Into<String>) -> Self {
56        Self {
57            endpoint: endpoint.into(),
58            ..Default::default()
59        }
60    }
61
62    pub fn with_interval(mut self, interval: Duration) -> Self {
63        self.interval = interval;
64        self
65    }
66
67    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
68        self.service_name = name.into();
69        self
70    }
71
72    pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
73        self.scope_name = name.into();
74        self
75    }
76
77    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
78        self.resource_attributes.push((key.into(), value.into()));
79        self
80    }
81
82    pub fn with_timeout(mut self, timeout: Duration) -> Self {
83        self.timeout = timeout;
84        self
85    }
86
87    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
88        self.headers.push((name.into(), value.into()));
89        self
90    }
91
92    pub fn with_max_batch_size(mut self, size: usize) -> Self {
93        self.max_batch_size = size;
94        self
95    }
96}
97
/// Upper bound on the backoff delay applied after repeated export failures.
const MAX_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Base delay that the exponential backoff scales up from.
const BASE_BACKOFF: Duration = Duration::from_millis(5_000);

/// Payloads shorter than this many bytes are sent without gzip compression.
const GZIP_THRESHOLD: usize = 1 << 10;
106
107fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
108    if data.len() < GZIP_THRESHOLD {
109        return false;
110    }
111    use flate2::Compression;
112    use flate2::write::GzEncoder;
113    use std::io::Write;
114
115    out.clear();
116    let mut encoder = GzEncoder::new(out, Compression::fast());
117    let _ = encoder.write_all(data);
118    let _ = encoder.finish();
119    true
120}
121
122async fn send_otlp(
123    client: &reqwest::Client,
124    url: &str,
125    body: &[u8],
126    gzip_buf: &mut Vec<u8>,
127    extra_headers: &[(String, String)],
128) -> Result<reqwest::Response, reqwest::Error> {
129    let mut req = client
130        .post(url)
131        .header("Content-Type", "application/x-protobuf");
132
133    for (name, value) in extra_headers {
134        req = req.header(name, value);
135    }
136
137    if gzip_compress(body, gzip_buf) {
138        req.header("Content-Encoding", "gzip")
139            .body(gzip_buf.clone())
140            .send()
141            .await
142    } else {
143        req.body(body.to_vec()).send().await
144    }
145}
146
147/// Spawn the span exporter on a dedicated thread with its own single-threaded
148/// tokio runtime, matching the original design that avoids contending with the
149/// application's async runtime.
150pub fn spawn(
151    collector: Arc<SpanCollector>,
152    config: SpanExportConfig,
153    cancel: CancellationToken,
154) -> Option<std::thread::JoinHandle<()>> {
155    std::thread::Builder::new()
156        .name("span-exporter".to_string())
157        .spawn(move || {
158            let rt = tokio::runtime::Builder::new_current_thread()
159                .enable_all()
160                .build()
161                .expect("span exporter runtime");
162            rt.block_on(run(collector, config, cancel));
163        })
164        .ok()
165}
166
167/// Run the OTLP span export loop.
168///
169/// Drains completed spans from the collector in batches and sends them to
170/// `/v1/traces`. On cancellation, a final drain+export is performed.
171///
172/// # Example
173///
174/// ```ignore
175/// use std::sync::Arc;
176/// use std::time::Duration;
177///
178/// use fast_telemetry::SpanCollector;
179/// use fast_telemetry_export::spans::{SpanExportConfig, spawn};
180/// use tokio_util::sync::CancellationToken;
181///
182/// let collector = Arc::new(SpanCollector::new(4, 4096));
183/// let cancel = CancellationToken::new();
184/// let config = SpanExportConfig::new("http://otel-collector:4318")
185///     .with_service_name("myapp")
186///     .with_scope_name("proxy")
187///     .with_header("Authorization", "Bearer <token>")
188///     .with_timeout(Duration::from_secs(5))
189///     .with_max_batch_size(1024);
190///
191/// spawn(collector, config, cancel);
192/// ```
193pub async fn run(
194    collector: Arc<SpanCollector>,
195    config: SpanExportConfig,
196    cancel: CancellationToken,
197) {
198    let url = format!("{}/v1/traces", config.endpoint.trim_end_matches('/'));
199
200    log::info!(
201        "Starting OTLP span exporter, endpoint={url}, service={}",
202        config.service_name
203    );
204
205    let attr_refs: Vec<(&str, &str)> = config
206        .resource_attributes
207        .iter()
208        .map(|(k, v)| (k.as_str(), v.as_str()))
209        .collect();
210    let resource = build_resource(&config.service_name, &attr_refs);
211
212    let client = match reqwest::Client::builder().timeout(config.timeout).build() {
213        Ok(c) => c,
214        Err(e) => {
215            log::error!("Failed to build HTTP client for span exporter: {e}");
216            return;
217        }
218    };
219
220    let mut interval = tokio::time::interval(config.interval);
221    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
222    interval.tick().await;
223
224    let mut consecutive_failures: u32 = 0;
225    let mut bufs = SpanExportBufs {
226        spans: Vec::with_capacity(config.max_batch_size),
227        encode: Vec::new(),
228        gzip: Vec::new(),
229    };
230
231    let ctx = SpanExportContext {
232        client: &client,
233        url: &url,
234        collector: &collector,
235        resource: &resource,
236        config: &config,
237    };
238
239    loop {
240        tokio::select! {
241            _ = interval.tick() => {}
242            _ = cancel.cancelled() => {
243                log::info!("Span exporter shutting down, performing final export");
244                export_once(&ctx, &mut bufs).await;
245                return;
246            }
247        }
248
249        if consecutive_failures > 0 {
250            let backoff = backoff_with_jitter(consecutive_failures);
251            log::debug!(
252                "Span export backing off {}ms (failures={consecutive_failures})",
253                backoff.as_millis()
254            );
255            tokio::select! {
256                _ = tokio::time::sleep(backoff) => {}
257                _ = cancel.cancelled() => {
258                    export_once(&ctx, &mut bufs).await;
259                    return;
260                }
261            }
262        }
263
264        bufs.spans.clear();
265        collector.drain_into(&mut bufs.spans);
266
267        if bufs.spans.is_empty() {
268            continue;
269        }
270
271        let total_drained = bufs.spans.len();
272        let dropped = total_drained.saturating_sub(config.max_batch_size);
273        bufs.spans.truncate(config.max_batch_size);
274        let span_count = bufs.spans.len();
275
276        if dropped > 0 {
277            log::debug!("Span export dropped {dropped} excess spans (exported {span_count})");
278        }
279
280        let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
281        let request = build_trace_export_request(&resource, &config.scope_name, otlp_spans);
282
283        bufs.encode.clear();
284        if let Err(e) = request.encode(&mut bufs.encode) {
285            log::warn!("Span protobuf encode failed: {e}");
286            continue;
287        }
288
289        let body_len = bufs.encode.len();
290
291        match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
292            Ok(resp) if resp.status().is_success() => {
293                consecutive_failures = 0;
294                log::debug!("Exported {span_count} spans ({body_len} bytes)");
295            }
296            Ok(resp) => {
297                consecutive_failures = consecutive_failures.saturating_add(1);
298                let status = resp.status();
299                let body = resp.text().await.unwrap_or_default();
300                log::warn!("Span export failed: status={status}, body={body}");
301            }
302            Err(e) => {
303                consecutive_failures = consecutive_failures.saturating_add(1);
304                log::warn!("Span export request failed: {e}");
305            }
306        }
307    }
308}
309
310struct SpanExportContext<'a> {
311    client: &'a reqwest::Client,
312    url: &'a str,
313    collector: &'a SpanCollector,
314    resource: &'a pb::Resource,
315    config: &'a SpanExportConfig,
316}
317
318struct SpanExportBufs {
319    spans: Vec<fast_telemetry::span::CompletedSpan>,
320    encode: Vec<u8>,
321    gzip: Vec<u8>,
322}
323
324async fn export_once(ctx: &SpanExportContext<'_>, bufs: &mut SpanExportBufs) {
325    bufs.spans.clear();
326    ctx.collector.drain_into(&mut bufs.spans);
327
328    if bufs.spans.is_empty() {
329        return;
330    }
331
332    let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
333    let request = build_trace_export_request(ctx.resource, &ctx.config.scope_name, otlp_spans);
334
335    bufs.encode.clear();
336    if let Err(e) = request.encode(&mut bufs.encode) {
337        log::warn!("Final span protobuf encode failed: {e}");
338        return;
339    }
340
341    match send_otlp(
342        ctx.client,
343        ctx.url,
344        &bufs.encode,
345        &mut bufs.gzip,
346        &ctx.config.headers,
347    )
348    .await
349    {
350        Ok(resp) if !resp.status().is_success() => {
351            let status = resp.status();
352            let body = resp.text().await.unwrap_or_default();
353            log::warn!("Final span export returned {status}: {body}");
354        }
355        Err(e) => log::warn!("Final span export failed: {e}"),
356        _ => {}
357    }
358}
359
360fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
361    let exp = consecutive_failures.min(10);
362    let base_ms = BASE_BACKOFF.as_millis() as u64;
363    let backoff_ms = base_ms
364        .saturating_mul(1u64 << exp)
365        .min(MAX_BACKOFF.as_millis() as u64);
366
367    let nanos = SystemTime::now()
368        .duration_since(std::time::UNIX_EPOCH)
369        .unwrap_or_default()
370        .subsec_nanos();
371    let jitter_range = (backoff_ms / 4).max(1);
372    let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
373    let final_ms = backoff_ms.saturating_add(jitter);
374
375    Duration::from_millis(final_ms)
376}