Skip to main content

fast_telemetry_export/
otlp.rs

//! OTLP HTTP/protobuf metrics exporter.
//!
//! Exports metrics to an OTLP-compatible collector (OpenTelemetry Collector,
//! Grafana Alloy, Datadog OTLP intake, etc.) via HTTP POST of a protobuf-encoded
//! `ExportMetricsServiceRequest` to `/v1/metrics`.
//!
//! Uses cumulative temporality — no state tracking is needed between export cycles.
//! Larger payloads are gzip-compressed automatically. A failed export is not
//! re-sent; instead, subsequent export cycles are delayed with exponential backoff
//! until one succeeds.
//!
//! The actual metric collection is provided by the caller via a closure,
//! making this exporter work with any metrics struct.
13
14use std::time::Duration;
15
16use fast_telemetry::otlp::{build_export_request, build_resource, pb};
17use prost::Message;
18use tokio::time::{MissedTickBehavior, interval};
19use tokio_util::sync::CancellationToken;
20
/// Configuration for the OTLP HTTP metrics exporter.
///
/// Start from [`OtlpConfig::new`] or [`OtlpConfig::default`] and refine with the
/// `with_*` builder methods.
///
/// `Debug` is implemented by hand so that header *values* (which commonly carry
/// credentials) are never printed; only header names appear in the output.
#[derive(Clone)]
pub struct OtlpConfig {
    /// OTLP collector endpoint (scheme + host + port), e.g. `"http://localhost:4318"`.
    /// The path `/v1/metrics` is appended automatically.
    pub endpoint: String,
    /// Export interval (default: 60s).
    pub interval: Duration,
    /// `service.name` resource attribute.
    pub service_name: String,
    /// Instrumentation scope name (default: "fast-telemetry").
    pub scope_name: String,
    /// Additional resource attributes (e.g. `("service.version", "1.0")`).
    pub resource_attributes: Vec<(String, String)>,
    /// Request timeout (default: 10s).
    pub timeout: Duration,
    /// Extra HTTP headers sent with every export request.
    ///
    /// Use this for collector authentication, e.g.:
    /// - `("Authorization", "Bearer <token>")`
    /// - `("x-api-key", "<key>")`
    pub headers: Vec<(String, String)>,
}

impl std::fmt::Debug for OtlpConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header values may be secrets (bearer tokens, API keys): expose names only.
        let header_names: Vec<&str> = self
            .headers
            .iter()
            .map(|(name, _)| name.as_str())
            .collect();

        f.debug_struct("OtlpConfig")
            .field("endpoint", &self.endpoint)
            .field("interval", &self.interval)
            .field("service_name", &self.service_name)
            .field("scope_name", &self.scope_name)
            .field("resource_attributes", &self.resource_attributes)
            .field("timeout", &self.timeout)
            .field("header_names", &header_names)
            .finish()
    }
}
44
45impl Default for OtlpConfig {
46    fn default() -> Self {
47        Self {
48            endpoint: "http://localhost:4318".to_string(),
49            interval: Duration::from_secs(60),
50            service_name: "unknown_service".to_string(),
51            scope_name: "fast-telemetry".to_string(),
52            resource_attributes: Vec::new(),
53            timeout: Duration::from_secs(10),
54            headers: Vec::new(),
55        }
56    }
57}
58
59impl OtlpConfig {
60    pub fn new(endpoint: impl Into<String>) -> Self {
61        Self {
62            endpoint: endpoint.into(),
63            ..Default::default()
64        }
65    }
66
67    pub fn with_interval(mut self, interval: Duration) -> Self {
68        self.interval = interval;
69        self
70    }
71
72    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
73        self.service_name = name.into();
74        self
75    }
76
77    pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
78        self.scope_name = name.into();
79        self
80    }
81
82    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83        self.resource_attributes.push((key.into(), value.into()));
84        self
85    }
86
87    pub fn with_timeout(mut self, timeout: Duration) -> Self {
88        self.timeout = timeout;
89        self
90    }
91
92    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
93        self.headers.push((name.into(), value.into()));
94        self
95    }
96}
97
/// Upper bound (five minutes) on the exponential backoff delay; the cap is
/// applied before jitter is added.
const MAX_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Starting point of the exponential backoff series; doubled once per
/// consecutive export failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Payloads shorter than this many bytes are sent uncompressed, because the
/// gzip overhead outweighs the savings on small bodies.
const GZIP_THRESHOLD: usize = 1024;
107
108/// Gzip-compress `data` into `out` using fast compression (level 1).
109///
110/// Returns `true` if compression was applied, `false` if the payload was below
111/// the threshold (in which case `out` is untouched).
112fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
113    if data.len() < GZIP_THRESHOLD {
114        return false;
115    }
116    use flate2::Compression;
117    use flate2::write::GzEncoder;
118    use std::io::Write;
119
120    out.clear();
121    let mut encoder = GzEncoder::new(out, Compression::fast());
122    let _ = encoder.write_all(data);
123    let _ = encoder.finish();
124    true
125}
126
127/// Send a protobuf-encoded body, applying gzip when beneficial.
128async fn send_otlp(
129    client: &reqwest::Client,
130    url: &str,
131    body: &[u8],
132    gzip_buf: &mut Vec<u8>,
133    extra_headers: &[(String, String)],
134) -> Result<reqwest::Response, reqwest::Error> {
135    let mut req = client
136        .post(url)
137        .header("Content-Type", "application/x-protobuf");
138
139    for (name, value) in extra_headers {
140        req = req.header(name, value);
141    }
142
143    if gzip_compress(body, gzip_buf) {
144        req.header("Content-Encoding", "gzip")
145            .body(gzip_buf.clone())
146            .send()
147            .await
148    } else {
149        req.body(body.to_vec()).send().await
150    }
151}
152
153/// Run the OTLP metrics export loop.
154///
155/// `collect_fn` is called each cycle with a `&mut Vec<pb::Metric>`. The closure
156/// should append OTLP metric messages (typically via `ExportMetrics::export_otlp`).
157/// The exporter handles protobuf encoding, gzip compression, HTTP transport, and
158/// exponential backoff on failures.
159///
160/// On cancellation, a final export is performed to flush pending metrics.
161///
162/// # Example
163///
164/// ```ignore
165/// use std::sync::Arc;
166/// use std::time::Duration;
167///
168/// use fast_telemetry_export::otlp::{OtlpConfig, run};
169/// use tokio_util::sync::CancellationToken;
170///
171/// let metrics = Arc::new(MyMetrics::new());
172/// let cancel = CancellationToken::new();
173/// let config = OtlpConfig::new("http://otel-collector:4318")
174///     .with_service_name("myapp")
175///     .with_scope_name("proxy")
176///     .with_attribute("service.version", "1.0")
177///     .with_header("Authorization", "Bearer <token>")
178///     .with_timeout(Duration::from_secs(5));
179///
180/// let m = metrics.clone();
181/// tokio::spawn(run(config, cancel, move |out| {
182///     m.export_otlp(out);
183/// }));
184/// ```
185pub async fn run<F>(config: OtlpConfig, cancel: CancellationToken, mut collect_fn: F)
186where
187    F: FnMut(&mut Vec<pb::Metric>),
188{
189    let url = format!("{}/v1/metrics", config.endpoint.trim_end_matches('/'));
190
191    log::info!(
192        "Starting OTLP metrics exporter, endpoint={url}, interval={}s",
193        config.interval.as_secs()
194    );
195
196    let attr_refs: Vec<(&str, &str)> = config
197        .resource_attributes
198        .iter()
199        .map(|(k, v)| (k.as_str(), v.as_str()))
200        .collect();
201    let resource = build_resource(&config.service_name, &attr_refs);
202
203    let client = match reqwest::Client::builder().timeout(config.timeout).build() {
204        Ok(c) => c,
205        Err(e) => {
206            log::error!("Failed to build HTTP client for OTLP exporter: {e}");
207            return;
208        }
209    };
210
211    let mut interval = interval(config.interval);
212    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
213    interval.tick().await;
214
215    let mut consecutive_failures: u32 = 0;
216    let mut bufs = ExportBufs::default();
217
218    let ctx = ExportContext {
219        client: &client,
220        url: &url,
221        resource: &resource,
222        scope_name: &config.scope_name,
223        extra_headers: &config.headers,
224    };
225
226    loop {
227        tokio::select! {
228            _ = interval.tick() => {}
229            _ = cancel.cancelled() => {
230                log::info!("OTLP metrics exporter shutting down, performing final export");
231                export_once(&ctx, &mut collect_fn, &mut bufs).await;
232                return;
233            }
234        }
235
236        if consecutive_failures > 0 {
237            let backoff = backoff_with_jitter(consecutive_failures);
238            log::debug!(
239                "OTLP export backing off {}ms (failures={consecutive_failures})",
240                backoff.as_millis()
241            );
242            tokio::select! {
243                _ = tokio::time::sleep(backoff) => {}
244                _ = cancel.cancelled() => {
245                    export_once(&ctx, &mut collect_fn, &mut bufs).await;
246                    return;
247                }
248            }
249        }
250
251        let mut metric_messages = Vec::new();
252        collect_fn(&mut metric_messages);
253
254        if metric_messages.is_empty() {
255            continue;
256        }
257
258        let metric_count = metric_messages.len();
259        let request = build_export_request(&resource, &config.scope_name, metric_messages);
260
261        bufs.encode.clear();
262        if let Err(e) = request.encode(&mut bufs.encode) {
263            log::warn!("OTLP protobuf encode failed: {e}");
264            continue;
265        }
266        let body_len = bufs.encode.len();
267
268        match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
269            Ok(resp) if resp.status().is_success() => {
270                consecutive_failures = 0;
271                log::debug!("Exported {metric_count} OTLP metrics ({body_len} bytes)");
272            }
273            Ok(resp) => {
274                consecutive_failures = consecutive_failures.saturating_add(1);
275                let status = resp.status();
276                let body = resp.text().await.unwrap_or_default();
277                log::warn!("OTLP export failed: status={status}, body={body}");
278            }
279            Err(e) => {
280                consecutive_failures = consecutive_failures.saturating_add(1);
281                log::warn!("OTLP export request failed: {e}");
282            }
283        }
284    }
285}
286
287struct ExportContext<'a> {
288    client: &'a reqwest::Client,
289    url: &'a str,
290    resource: &'a pb::Resource,
291    scope_name: &'a str,
292    extra_headers: &'a [(String, String)],
293}
294
/// Scratch buffers kept across export cycles so their allocations can be reused.
#[derive(Default)]
struct ExportBufs {
    /// Receives the protobuf-encoded request; cleared before each encode.
    encode: Vec<u8>,
    /// Receives the gzip-compressed payload when compression is applied.
    gzip: Vec<u8>,
}
300
301async fn export_once<F>(ctx: &ExportContext<'_>, collect_fn: &mut F, bufs: &mut ExportBufs)
302where
303    F: FnMut(&mut Vec<pb::Metric>),
304{
305    let mut metric_messages = Vec::new();
306    collect_fn(&mut metric_messages);
307
308    if metric_messages.is_empty() {
309        return;
310    }
311
312    let request = build_export_request(ctx.resource, ctx.scope_name, metric_messages);
313
314    bufs.encode.clear();
315    if let Err(e) = request.encode(&mut bufs.encode) {
316        log::warn!("Final OTLP protobuf encode failed: {e}");
317        return;
318    }
319
320    match send_otlp(
321        ctx.client,
322        ctx.url,
323        &bufs.encode,
324        &mut bufs.gzip,
325        ctx.extra_headers,
326    )
327    .await
328    {
329        Ok(resp) if !resp.status().is_success() => {
330            let status = resp.status();
331            let body = resp.text().await.unwrap_or_default();
332            log::warn!("Final OTLP export returned {status}: {body}");
333        }
334        Err(e) => log::warn!("Final OTLP export failed: {e}"),
335        _ => {}
336    }
337}
338
339/// Compute backoff with jitter: min(MAX_BACKOFF, BASE_BACKOFF * 2^failures) +/- 25% jitter.
340fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
341    let exp = consecutive_failures.min(10);
342    let base_ms = BASE_BACKOFF.as_millis() as u64;
343    let backoff_ms = base_ms
344        .saturating_mul(1u64 << exp)
345        .min(MAX_BACKOFF.as_millis() as u64);
346
347    let nanos = std::time::SystemTime::now()
348        .duration_since(std::time::UNIX_EPOCH)
349        .unwrap_or_default()
350        .subsec_nanos();
351    let jitter_range = (backoff_ms / 4).max(1);
352    let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
353    let final_ms = backoff_ms.saturating_add(jitter);
354
355    Duration::from_millis(final_ms)
356}