1use std::time::Duration;
15
16use fast_telemetry::otlp::{build_export_request, build_resource, pb};
17use prost::Message;
18use tokio::time::{MissedTickBehavior, interval};
19use tokio_util::sync::CancellationToken;
20
/// Settings for the OTLP/HTTP metrics exporter driven by [`run`].
#[derive(Clone)]
pub struct OtlpConfig {
    /// Base URL of the collector. `run` strips any trailing `/` and appends
    /// `/v1/metrics` to form the export URL.
    pub endpoint: String,
    /// Period of the export ticker.
    pub interval: Duration,
    /// Service name handed to `build_resource` when the resource is built.
    pub service_name: String,
    /// Scope name handed to `build_export_request` for every export.
    pub scope_name: String,
    /// Extra `(key, value)` resource attributes handed to `build_resource`.
    pub resource_attributes: Vec<(String, String)>,
    /// Timeout configured on the HTTP client used for exports.
    pub timeout: Duration,
    /// Extra `(name, value)` HTTP headers added to every export request.
    pub headers: Vec<(String, String)>,
}
44
45impl Default for OtlpConfig {
46 fn default() -> Self {
47 Self {
48 endpoint: "http://localhost:4318".to_string(),
49 interval: Duration::from_secs(60),
50 service_name: "unknown_service".to_string(),
51 scope_name: "fast-telemetry".to_string(),
52 resource_attributes: Vec::new(),
53 timeout: Duration::from_secs(10),
54 headers: Vec::new(),
55 }
56 }
57}
58
59impl OtlpConfig {
60 pub fn new(endpoint: impl Into<String>) -> Self {
61 Self {
62 endpoint: endpoint.into(),
63 ..Default::default()
64 }
65 }
66
67 pub fn with_interval(mut self, interval: Duration) -> Self {
68 self.interval = interval;
69 self
70 }
71
72 pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
73 self.service_name = name.into();
74 self
75 }
76
77 pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
78 self.scope_name = name.into();
79 self
80 }
81
82 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83 self.resource_attributes.push((key.into(), value.into()));
84 self
85 }
86
87 pub fn with_timeout(mut self, timeout: Duration) -> Self {
88 self.timeout = timeout;
89 self
90 }
91
92 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
93 self.headers.push((name.into(), value.into()));
94 self
95 }
96}
97
/// Upper bound applied to the exponential backoff before jitter is added.
const MAX_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Backoff unit that is doubled per consecutive export failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Payloads shorter than this many bytes are sent uncompressed.
const GZIP_THRESHOLD: usize = 1024;
107
108fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
113 if data.len() < GZIP_THRESHOLD {
114 return false;
115 }
116 use flate2::Compression;
117 use flate2::write::GzEncoder;
118 use std::io::Write;
119
120 out.clear();
121 let mut encoder = GzEncoder::new(out, Compression::fast());
122 let _ = encoder.write_all(data);
123 let _ = encoder.finish();
124 true
125}
126
127async fn send_otlp(
129 client: &reqwest::Client,
130 url: &str,
131 body: &[u8],
132 gzip_buf: &mut Vec<u8>,
133 extra_headers: &[(String, String)],
134) -> Result<reqwest::Response, reqwest::Error> {
135 let mut req = client
136 .post(url)
137 .header("Content-Type", "application/x-protobuf");
138
139 for (name, value) in extra_headers {
140 req = req.header(name, value);
141 }
142
143 if gzip_compress(body, gzip_buf) {
144 req.header("Content-Encoding", "gzip")
145 .body(gzip_buf.clone())
146 .send()
147 .await
148 } else {
149 req.body(body.to_vec()).send().await
150 }
151}
152
153pub async fn run<F>(config: OtlpConfig, cancel: CancellationToken, mut collect_fn: F)
186where
187 F: FnMut(&mut Vec<pb::Metric>),
188{
189 let url = format!("{}/v1/metrics", config.endpoint.trim_end_matches('/'));
190
191 log::info!(
192 "Starting OTLP metrics exporter, endpoint={url}, interval={}s",
193 config.interval.as_secs()
194 );
195
196 let attr_refs: Vec<(&str, &str)> = config
197 .resource_attributes
198 .iter()
199 .map(|(k, v)| (k.as_str(), v.as_str()))
200 .collect();
201 let resource = build_resource(&config.service_name, &attr_refs);
202
203 let client = match reqwest::Client::builder().timeout(config.timeout).build() {
204 Ok(c) => c,
205 Err(e) => {
206 log::error!("Failed to build HTTP client for OTLP exporter: {e}");
207 return;
208 }
209 };
210
211 let mut interval = interval(config.interval);
212 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
213 interval.tick().await;
214
215 let mut consecutive_failures: u32 = 0;
216 let mut bufs = ExportBufs::default();
217
218 let ctx = ExportContext {
219 client: &client,
220 url: &url,
221 resource: &resource,
222 scope_name: &config.scope_name,
223 extra_headers: &config.headers,
224 };
225
226 loop {
227 tokio::select! {
228 _ = interval.tick() => {}
229 _ = cancel.cancelled() => {
230 log::info!("OTLP metrics exporter shutting down, performing final export");
231 export_once(&ctx, &mut collect_fn, &mut bufs).await;
232 return;
233 }
234 }
235
236 if consecutive_failures > 0 {
237 let backoff = backoff_with_jitter(consecutive_failures);
238 log::debug!(
239 "OTLP export backing off {}ms (failures={consecutive_failures})",
240 backoff.as_millis()
241 );
242 tokio::select! {
243 _ = tokio::time::sleep(backoff) => {}
244 _ = cancel.cancelled() => {
245 export_once(&ctx, &mut collect_fn, &mut bufs).await;
246 return;
247 }
248 }
249 }
250
251 let mut metric_messages = Vec::new();
252 collect_fn(&mut metric_messages);
253
254 if metric_messages.is_empty() {
255 continue;
256 }
257
258 let metric_count = metric_messages.len();
259 let request = build_export_request(&resource, &config.scope_name, metric_messages);
260
261 bufs.encode.clear();
262 if let Err(e) = request.encode(&mut bufs.encode) {
263 log::warn!("OTLP protobuf encode failed: {e}");
264 continue;
265 }
266 let body_len = bufs.encode.len();
267
268 match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
269 Ok(resp) if resp.status().is_success() => {
270 consecutive_failures = 0;
271 log::debug!("Exported {metric_count} OTLP metrics ({body_len} bytes)");
272 }
273 Ok(resp) => {
274 consecutive_failures = consecutive_failures.saturating_add(1);
275 let status = resp.status();
276 let body = resp.text().await.unwrap_or_default();
277 log::warn!("OTLP export failed: status={status}, body={body}");
278 }
279 Err(e) => {
280 consecutive_failures = consecutive_failures.saturating_add(1);
281 log::warn!("OTLP export request failed: {e}");
282 }
283 }
284 }
285}
286
287struct ExportContext<'a> {
288 client: &'a reqwest::Client,
289 url: &'a str,
290 resource: &'a pb::Resource,
291 scope_name: &'a str,
292 extra_headers: &'a [(String, String)],
293}
294
/// Scratch buffers kept alive between exports so their allocations can be
/// reused.
#[derive(Default)]
struct ExportBufs {
    /// Receives the protobuf-encoded export request.
    encode: Vec<u8>,
    /// Receives the gzip-compressed form of `encode` when compression is used.
    gzip: Vec<u8>,
}
300
301async fn export_once<F>(ctx: &ExportContext<'_>, collect_fn: &mut F, bufs: &mut ExportBufs)
302where
303 F: FnMut(&mut Vec<pb::Metric>),
304{
305 let mut metric_messages = Vec::new();
306 collect_fn(&mut metric_messages);
307
308 if metric_messages.is_empty() {
309 return;
310 }
311
312 let request = build_export_request(ctx.resource, ctx.scope_name, metric_messages);
313
314 bufs.encode.clear();
315 if let Err(e) = request.encode(&mut bufs.encode) {
316 log::warn!("Final OTLP protobuf encode failed: {e}");
317 return;
318 }
319
320 match send_otlp(
321 ctx.client,
322 ctx.url,
323 &bufs.encode,
324 &mut bufs.gzip,
325 ctx.extra_headers,
326 )
327 .await
328 {
329 Ok(resp) if !resp.status().is_success() => {
330 let status = resp.status();
331 let body = resp.text().await.unwrap_or_default();
332 log::warn!("Final OTLP export returned {status}: {body}");
333 }
334 Err(e) => log::warn!("Final OTLP export failed: {e}"),
335 _ => {}
336 }
337}
338
339fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
341 let exp = consecutive_failures.min(10);
342 let base_ms = BASE_BACKOFF.as_millis() as u64;
343 let backoff_ms = base_ms
344 .saturating_mul(1u64 << exp)
345 .min(MAX_BACKOFF.as_millis() as u64);
346
347 let nanos = std::time::SystemTime::now()
348 .duration_since(std::time::UNIX_EPOCH)
349 .unwrap_or_default()
350 .subsec_nanos();
351 let jitter_range = (backoff_ms / 4).max(1);
352 let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
353 let final_ms = backoff_ms.saturating_add(jitter);
354
355 Duration::from_millis(final_ms)
356}