Skip to main content

fast_telemetry_export/
spans.rs

//! OTLP HTTP/protobuf span exporter.
//!
//! Exports completed spans from a [`fast_telemetry::span::SpanCollector`] to an
//! OTLP-compatible collector via HTTP POST to `/v1/traces`.
//! Payloads of at least 1024 bytes are gzip-compressed automatically, and after
//! a failed export the following export attempts are delayed with exponential
//! backoff (the failed batch itself is not re-sent).
7
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use fast_telemetry::otlp::{build_resource, build_trace_export_request, pb};
12use fast_telemetry::span::SpanCollector;
13use prost::Message;
14use tokio::time::MissedTickBehavior;
15use tokio_util::sync::CancellationToken;
16
/// Configuration for the OTLP span exporter.
///
/// Start from [`SpanExportConfig::new`] or [`Default::default`] and adjust
/// individual settings with the chained `with_*` methods.
#[derive(Clone)]
pub struct SpanExportConfig {
    /// OTLP collector endpoint (scheme + host + port), e.g. `"http://localhost:4318"`.
    /// Trailing slashes are trimmed and the path `/v1/traces` is appended automatically.
    pub endpoint: String,
    /// Export interval (default: 10s).
    pub interval: Duration,
    /// `service.name` resource attribute (default: `"unknown_service"`).
    pub service_name: String,
    /// Instrumentation scope name (default: "fast-telemetry").
    pub scope_name: String,
    /// Additional resource attributes as `(key, value)` pairs.
    pub resource_attributes: Vec<(String, String)>,
    /// Request timeout applied to the HTTP client (default: 10s).
    pub timeout: Duration,
    /// Extra HTTP headers, as `(name, value)` pairs, sent with every export request.
    pub headers: Vec<(String, String)>,
    /// Maximum number of spans per export batch (default: 512).
    /// Spans drained beyond this limit in one periodic export cycle are dropped.
    pub max_batch_size: usize,
}
38
39impl Default for SpanExportConfig {
40    fn default() -> Self {
41        Self {
42            endpoint: "http://localhost:4318".to_string(),
43            interval: Duration::from_secs(10),
44            service_name: "unknown_service".to_string(),
45            scope_name: "fast-telemetry".to_string(),
46            resource_attributes: Vec::new(),
47            timeout: Duration::from_secs(10),
48            headers: Vec::new(),
49            max_batch_size: 512,
50        }
51    }
52}
53
54impl SpanExportConfig {
55    pub fn new(endpoint: impl Into<String>) -> Self {
56        Self {
57            endpoint: endpoint.into(),
58            ..Default::default()
59        }
60    }
61
62    pub fn with_interval(mut self, interval: Duration) -> Self {
63        self.interval = interval;
64        self
65    }
66
67    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
68        self.service_name = name.into();
69        self
70    }
71
72    pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
73        self.scope_name = name.into();
74        self
75    }
76
77    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
78        self.resource_attributes.push((key.into(), value.into()));
79        self
80    }
81
82    pub fn with_timeout(mut self, timeout: Duration) -> Self {
83        self.timeout = timeout;
84        self
85    }
86
87    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
88        self.headers.push((name.into(), value.into()));
89        self
90    }
91
92    pub fn with_max_batch_size(mut self, size: usize) -> Self {
93        self.max_batch_size = size;
94        self
95    }
96}
97
/// Maximum backoff delay between retries after export failures.
///
/// This caps the exponential delay before jitter is applied.
const MAX_BACKOFF: Duration = Duration::from_secs(300);

/// Base backoff delay after the first failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Minimum payload size (bytes) before gzip compression is applied.
///
/// Payloads shorter than this are sent uncompressed.
const GZIP_THRESHOLD: usize = 1024;
106
107fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
108    if data.len() < GZIP_THRESHOLD {
109        return false;
110    }
111    use flate2::Compression;
112    use flate2::write::GzEncoder;
113    use std::io::Write;
114
115    out.clear();
116    let mut encoder = GzEncoder::new(out, Compression::fast());
117    let _ = encoder.write_all(data);
118    let _ = encoder.finish();
119    true
120}
121
122async fn send_otlp(
123    client: &reqwest::Client,
124    url: &str,
125    body: &[u8],
126    gzip_buf: &mut Vec<u8>,
127    extra_headers: &[(String, String)],
128) -> Result<reqwest::Response, reqwest::Error> {
129    let mut req = client
130        .post(url)
131        .header("Content-Type", "application/x-protobuf");
132
133    for (name, value) in extra_headers {
134        req = req.header(name, value);
135    }
136
137    if gzip_compress(body, gzip_buf) {
138        req.header("Content-Encoding", "gzip")
139            .body(gzip_buf.clone())
140            .send()
141            .await
142    } else {
143        req.body(body.to_vec()).send().await
144    }
145}
146
147/// Spawn the span exporter on a dedicated thread with its own single-threaded
148/// tokio runtime, matching the original design that avoids contending with the
149/// application's async runtime.
150pub fn spawn(
151    collector: Arc<SpanCollector>,
152    config: SpanExportConfig,
153    cancel: CancellationToken,
154) -> Option<std::thread::JoinHandle<()>> {
155    std::thread::Builder::new()
156        .name("span-exporter".to_string())
157        .spawn(move || {
158            let rt = tokio::runtime::Builder::new_current_thread()
159                .enable_all()
160                .build()
161                .expect("span exporter runtime");
162            rt.block_on(run(collector, config, cancel));
163        })
164        .ok()
165}
166
/// Flush the calling monoio worker's thread-local span buffer on a timer.
///
/// [`SpanCollector::drain_into`] only sees spans that have already been moved
/// from a worker's thread-local buffer into the shared outbox. A long-lived
/// monoio worker that records few spans may stay below the collector's
/// automatic flush threshold for a long time, so start one of these tasks on
/// every monoio worker that records spans.
///
/// The OTLP export itself can still be driven by [`spawn`], which runs on a
/// private Tokio runtime and drains the shared outboxes.
///
/// The task calls `flush_local` on every tick of `interval` and once more when
/// `cancel` fires, after which it returns.
#[cfg(feature = "monoio")]
pub async fn run_local_flusher_monoio(
    collector: Arc<SpanCollector>,
    interval: Duration,
    cancel: CancellationToken,
) {
    let mut ticker = monoio::time::interval(interval);
    ticker.set_missed_tick_behavior(monoio::time::MissedTickBehavior::Skip);
    // Consume one tick before entering the loop (presumably the immediate
    // first tick, as with tokio-style intervals).
    ticker.tick().await;

    loop {
        let shutting_down = monoio::select! {
            _ = ticker.tick() => false,
            _ = cancel.cancelled() => true,
        };

        // Both the timer and the cancellation path flush; only the latter exits.
        collector.flush_local();
        if shutting_down {
            return;
        }
    }
}
201
202/// Run the OTLP span export loop.
203///
204/// Drains completed spans from the collector in batches and sends them to
205/// `/v1/traces`. On cancellation, a final drain+export is performed.
206///
207/// # Example
208///
209/// ```ignore
210/// use std::sync::Arc;
211/// use std::time::Duration;
212///
213/// use fast_telemetry::SpanCollector;
214/// use fast_telemetry_export::spans::{SpanExportConfig, spawn};
215/// use tokio_util::sync::CancellationToken;
216///
217/// let collector = Arc::new(SpanCollector::new(4, 4096));
218/// let cancel = CancellationToken::new();
219/// let config = SpanExportConfig::new("http://otel-collector:4318")
220///     .with_service_name("myapp")
221///     .with_scope_name("proxy")
222///     .with_header("Authorization", "Bearer <token>")
223///     .with_timeout(Duration::from_secs(5))
224///     .with_max_batch_size(1024);
225///
226/// spawn(collector, config, cancel);
227/// ```
228pub async fn run(
229    collector: Arc<SpanCollector>,
230    config: SpanExportConfig,
231    cancel: CancellationToken,
232) {
233    let url = format!("{}/v1/traces", config.endpoint.trim_end_matches('/'));
234
235    log::info!(
236        "Starting OTLP span exporter, endpoint={url}, service={}",
237        config.service_name
238    );
239
240    let attr_refs: Vec<(&str, &str)> = config
241        .resource_attributes
242        .iter()
243        .map(|(k, v)| (k.as_str(), v.as_str()))
244        .collect();
245    let resource = build_resource(&config.service_name, &attr_refs);
246
247    let client = match reqwest::Client::builder().timeout(config.timeout).build() {
248        Ok(c) => c,
249        Err(e) => {
250            log::error!("Failed to build HTTP client for span exporter: {e}");
251            return;
252        }
253    };
254
255    let mut interval = tokio::time::interval(config.interval);
256    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
257    interval.tick().await;
258
259    let mut consecutive_failures: u32 = 0;
260    let mut bufs = SpanExportBufs {
261        spans: Vec::with_capacity(config.max_batch_size),
262        encode: Vec::new(),
263        gzip: Vec::new(),
264    };
265
266    let ctx = SpanExportContext {
267        client: &client,
268        url: &url,
269        collector: &collector,
270        resource: &resource,
271        config: &config,
272    };
273
274    loop {
275        tokio::select! {
276            _ = interval.tick() => {}
277            _ = cancel.cancelled() => {
278                log::info!("Span exporter shutting down, performing final export");
279                export_once(&ctx, &mut bufs).await;
280                return;
281            }
282        }
283
284        if consecutive_failures > 0 {
285            let backoff = backoff_with_jitter(consecutive_failures);
286            log::debug!(
287                "Span export backing off {}ms (failures={consecutive_failures})",
288                backoff.as_millis()
289            );
290            tokio::select! {
291                _ = tokio::time::sleep(backoff) => {}
292                _ = cancel.cancelled() => {
293                    export_once(&ctx, &mut bufs).await;
294                    return;
295                }
296            }
297        }
298
299        bufs.spans.clear();
300        collector.drain_into(&mut bufs.spans);
301
302        if bufs.spans.is_empty() {
303            continue;
304        }
305
306        let total_drained = bufs.spans.len();
307        let dropped = total_drained.saturating_sub(config.max_batch_size);
308        bufs.spans.truncate(config.max_batch_size);
309        let span_count = bufs.spans.len();
310
311        if dropped > 0 {
312            log::debug!("Span export dropped {dropped} excess spans (exported {span_count})");
313        }
314
315        let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
316        let request = build_trace_export_request(&resource, &config.scope_name, otlp_spans);
317
318        bufs.encode.clear();
319        if let Err(e) = request.encode(&mut bufs.encode) {
320            log::warn!("Span protobuf encode failed: {e}");
321            continue;
322        }
323
324        let body_len = bufs.encode.len();
325
326        match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
327            Ok(resp) if resp.status().is_success() => {
328                consecutive_failures = 0;
329                log::debug!("Exported {span_count} spans ({body_len} bytes)");
330            }
331            Ok(resp) => {
332                consecutive_failures = consecutive_failures.saturating_add(1);
333                let status = resp.status();
334                let body = resp.text().await.unwrap_or_default();
335                log::warn!("Span export failed: status={status}, body={body}");
336            }
337            Err(e) => {
338                consecutive_failures = consecutive_failures.saturating_add(1);
339                log::warn!("Span export request failed: {e}");
340            }
341        }
342    }
343}
344
/// Borrowed, read-only state needed to perform one export via `export_once`.
struct SpanExportContext<'a> {
    /// HTTP client used to send the export request.
    client: &'a reqwest::Client,
    /// Full export URL (configured endpoint plus `/v1/traces`).
    url: &'a str,
    /// Collector that completed spans are drained from.
    collector: &'a SpanCollector,
    /// OTLP resource attached to every export request.
    resource: &'a pb::Resource,
    /// Exporter configuration; the scope name and extra headers are read from it.
    config: &'a SpanExportConfig,
}
352
/// Scratch buffers that are cleared and reused on every export cycle.
struct SpanExportBufs {
    /// Spans drained from the collector for the current cycle.
    spans: Vec<fast_telemetry::span::CompletedSpan>,
    /// Protobuf-encoded export request.
    encode: Vec<u8>,
    /// Gzip-compressed form of `encode`, filled only when compression is applied.
    gzip: Vec<u8>,
}
358
359async fn export_once(ctx: &SpanExportContext<'_>, bufs: &mut SpanExportBufs) {
360    bufs.spans.clear();
361    ctx.collector.drain_into(&mut bufs.spans);
362
363    if bufs.spans.is_empty() {
364        return;
365    }
366
367    let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
368    let request = build_trace_export_request(ctx.resource, &ctx.config.scope_name, otlp_spans);
369
370    bufs.encode.clear();
371    if let Err(e) = request.encode(&mut bufs.encode) {
372        log::warn!("Final span protobuf encode failed: {e}");
373        return;
374    }
375
376    match send_otlp(
377        ctx.client,
378        ctx.url,
379        &bufs.encode,
380        &mut bufs.gzip,
381        &ctx.config.headers,
382    )
383    .await
384    {
385        Ok(resp) if !resp.status().is_success() => {
386            let status = resp.status();
387            let body = resp.text().await.unwrap_or_default();
388            log::warn!("Final span export returned {status}: {body}");
389        }
390        Err(e) => log::warn!("Final span export failed: {e}"),
391        _ => {}
392    }
393}
394
395fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
396    let exp = consecutive_failures.min(10);
397    let base_ms = BASE_BACKOFF.as_millis() as u64;
398    let backoff_ms = base_ms
399        .saturating_mul(1u64 << exp)
400        .min(MAX_BACKOFF.as_millis() as u64);
401
402    let nanos = SystemTime::now()
403        .duration_since(std::time::UNIX_EPOCH)
404        .unwrap_or_default()
405        .subsec_nanos();
406    let jitter_range = (backoff_ms / 4).max(1);
407    let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
408    let final_ms = backoff_ms.saturating_add(jitter);
409
410    Duration::from_millis(final_ms)
411}