1use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use fast_telemetry::otlp::{build_resource, build_trace_export_request, pb};
12use fast_telemetry::span::SpanCollector;
13use prost::Message;
14use tokio::time::MissedTickBehavior;
15use tokio_util::sync::CancellationToken;
16
/// Settings for the background OTLP span exporter.
///
/// Build one with [`SpanExportConfig::new`] (or `Default`) and refine it with
/// the `with_*` builder methods.
#[derive(Clone)]
pub struct SpanExportConfig {
    /// Base URL of the collector; the exporter appends `/v1/traces` to it
    /// after trimming any trailing `/`.
    pub endpoint: String,
    /// Period of the export timer.
    pub interval: Duration,
    /// Service name passed to the OTLP resource builder.
    pub service_name: String,
    /// Instrumentation scope name attached to every export request.
    pub scope_name: String,
    /// Extra `(key, value)` attributes passed to the OTLP resource builder.
    pub resource_attributes: Vec<(String, String)>,
    /// Timeout configured on the HTTP client used for exports.
    pub timeout: Duration,
    /// Extra `(name, value)` HTTP headers added to every export request.
    pub headers: Vec<(String, String)>,
    /// Upper bound on spans sent per periodic export; spans drained beyond
    /// this count in one cycle are dropped.
    pub max_batch_size: usize,
}
38
39impl Default for SpanExportConfig {
40 fn default() -> Self {
41 Self {
42 endpoint: "http://localhost:4318".to_string(),
43 interval: Duration::from_secs(10),
44 service_name: "unknown_service".to_string(),
45 scope_name: "fast-telemetry".to_string(),
46 resource_attributes: Vec::new(),
47 timeout: Duration::from_secs(10),
48 headers: Vec::new(),
49 max_batch_size: 512,
50 }
51 }
52}
53
54impl SpanExportConfig {
55 pub fn new(endpoint: impl Into<String>) -> Self {
56 Self {
57 endpoint: endpoint.into(),
58 ..Default::default()
59 }
60 }
61
62 pub fn with_interval(mut self, interval: Duration) -> Self {
63 self.interval = interval;
64 self
65 }
66
67 pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
68 self.service_name = name.into();
69 self
70 }
71
72 pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
73 self.scope_name = name.into();
74 self
75 }
76
77 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
78 self.resource_attributes.push((key.into(), value.into()));
79 self
80 }
81
82 pub fn with_timeout(mut self, timeout: Duration) -> Self {
83 self.timeout = timeout;
84 self
85 }
86
87 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
88 self.headers.push((name.into(), value.into()));
89 self
90 }
91
92 pub fn with_max_batch_size(mut self, size: usize) -> Self {
93 self.max_batch_size = size;
94 self
95 }
96}
97
/// Ceiling applied to the exponential retry delay before jitter is added.
const MAX_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Retry delay that is doubled once per consecutive export failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Request bodies shorter than this many bytes are sent uncompressed.
const GZIP_THRESHOLD: usize = 1 << 10;
106
107fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
108 if data.len() < GZIP_THRESHOLD {
109 return false;
110 }
111 use flate2::Compression;
112 use flate2::write::GzEncoder;
113 use std::io::Write;
114
115 out.clear();
116 let mut encoder = GzEncoder::new(out, Compression::fast());
117 let _ = encoder.write_all(data);
118 let _ = encoder.finish();
119 true
120}
121
122async fn send_otlp(
123 client: &reqwest::Client,
124 url: &str,
125 body: &[u8],
126 gzip_buf: &mut Vec<u8>,
127 extra_headers: &[(String, String)],
128) -> Result<reqwest::Response, reqwest::Error> {
129 let mut req = client
130 .post(url)
131 .header("Content-Type", "application/x-protobuf");
132
133 for (name, value) in extra_headers {
134 req = req.header(name, value);
135 }
136
137 if gzip_compress(body, gzip_buf) {
138 req.header("Content-Encoding", "gzip")
139 .body(gzip_buf.clone())
140 .send()
141 .await
142 } else {
143 req.body(body.to_vec()).send().await
144 }
145}
146
147pub fn spawn(
151 collector: Arc<SpanCollector>,
152 config: SpanExportConfig,
153 cancel: CancellationToken,
154) -> Option<std::thread::JoinHandle<()>> {
155 std::thread::Builder::new()
156 .name("span-exporter".to_string())
157 .spawn(move || {
158 let rt = tokio::runtime::Builder::new_current_thread()
159 .enable_all()
160 .build()
161 .expect("span exporter runtime");
162 rt.block_on(run(collector, config, cancel));
163 })
164 .ok()
165}
166
167pub async fn run(
194 collector: Arc<SpanCollector>,
195 config: SpanExportConfig,
196 cancel: CancellationToken,
197) {
198 let url = format!("{}/v1/traces", config.endpoint.trim_end_matches('/'));
199
200 log::info!(
201 "Starting OTLP span exporter, endpoint={url}, service={}",
202 config.service_name
203 );
204
205 let attr_refs: Vec<(&str, &str)> = config
206 .resource_attributes
207 .iter()
208 .map(|(k, v)| (k.as_str(), v.as_str()))
209 .collect();
210 let resource = build_resource(&config.service_name, &attr_refs);
211
212 let client = match reqwest::Client::builder().timeout(config.timeout).build() {
213 Ok(c) => c,
214 Err(e) => {
215 log::error!("Failed to build HTTP client for span exporter: {e}");
216 return;
217 }
218 };
219
220 let mut interval = tokio::time::interval(config.interval);
221 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
222 interval.tick().await;
223
224 let mut consecutive_failures: u32 = 0;
225 let mut bufs = SpanExportBufs {
226 spans: Vec::with_capacity(config.max_batch_size),
227 encode: Vec::new(),
228 gzip: Vec::new(),
229 };
230
231 let ctx = SpanExportContext {
232 client: &client,
233 url: &url,
234 collector: &collector,
235 resource: &resource,
236 config: &config,
237 };
238
239 loop {
240 tokio::select! {
241 _ = interval.tick() => {}
242 _ = cancel.cancelled() => {
243 log::info!("Span exporter shutting down, performing final export");
244 export_once(&ctx, &mut bufs).await;
245 return;
246 }
247 }
248
249 if consecutive_failures > 0 {
250 let backoff = backoff_with_jitter(consecutive_failures);
251 log::debug!(
252 "Span export backing off {}ms (failures={consecutive_failures})",
253 backoff.as_millis()
254 );
255 tokio::select! {
256 _ = tokio::time::sleep(backoff) => {}
257 _ = cancel.cancelled() => {
258 export_once(&ctx, &mut bufs).await;
259 return;
260 }
261 }
262 }
263
264 bufs.spans.clear();
265 collector.drain_into(&mut bufs.spans);
266
267 if bufs.spans.is_empty() {
268 continue;
269 }
270
271 let total_drained = bufs.spans.len();
272 let dropped = total_drained.saturating_sub(config.max_batch_size);
273 bufs.spans.truncate(config.max_batch_size);
274 let span_count = bufs.spans.len();
275
276 if dropped > 0 {
277 log::debug!("Span export dropped {dropped} excess spans (exported {span_count})");
278 }
279
280 let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
281 let request = build_trace_export_request(&resource, &config.scope_name, otlp_spans);
282
283 bufs.encode.clear();
284 if let Err(e) = request.encode(&mut bufs.encode) {
285 log::warn!("Span protobuf encode failed: {e}");
286 continue;
287 }
288
289 let body_len = bufs.encode.len();
290
291 match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
292 Ok(resp) if resp.status().is_success() => {
293 consecutive_failures = 0;
294 log::debug!("Exported {span_count} spans ({body_len} bytes)");
295 }
296 Ok(resp) => {
297 consecutive_failures = consecutive_failures.saturating_add(1);
298 let status = resp.status();
299 let body = resp.text().await.unwrap_or_default();
300 log::warn!("Span export failed: status={status}, body={body}");
301 }
302 Err(e) => {
303 consecutive_failures = consecutive_failures.saturating_add(1);
304 log::warn!("Span export request failed: {e}");
305 }
306 }
307 }
308}
309
310struct SpanExportContext<'a> {
311 client: &'a reqwest::Client,
312 url: &'a str,
313 collector: &'a SpanCollector,
314 resource: &'a pb::Resource,
315 config: &'a SpanExportConfig,
316}
317
318struct SpanExportBufs {
319 spans: Vec<fast_telemetry::span::CompletedSpan>,
320 encode: Vec<u8>,
321 gzip: Vec<u8>,
322}
323
324async fn export_once(ctx: &SpanExportContext<'_>, bufs: &mut SpanExportBufs) {
325 bufs.spans.clear();
326 ctx.collector.drain_into(&mut bufs.spans);
327
328 if bufs.spans.is_empty() {
329 return;
330 }
331
332 let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
333 let request = build_trace_export_request(ctx.resource, &ctx.config.scope_name, otlp_spans);
334
335 bufs.encode.clear();
336 if let Err(e) = request.encode(&mut bufs.encode) {
337 log::warn!("Final span protobuf encode failed: {e}");
338 return;
339 }
340
341 match send_otlp(
342 ctx.client,
343 ctx.url,
344 &bufs.encode,
345 &mut bufs.gzip,
346 &ctx.config.headers,
347 )
348 .await
349 {
350 Ok(resp) if !resp.status().is_success() => {
351 let status = resp.status();
352 let body = resp.text().await.unwrap_or_default();
353 log::warn!("Final span export returned {status}: {body}");
354 }
355 Err(e) => log::warn!("Final span export failed: {e}"),
356 _ => {}
357 }
358}
359
360fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
361 let exp = consecutive_failures.min(10);
362 let base_ms = BASE_BACKOFF.as_millis() as u64;
363 let backoff_ms = base_ms
364 .saturating_mul(1u64 << exp)
365 .min(MAX_BACKOFF.as_millis() as u64);
366
367 let nanos = SystemTime::now()
368 .duration_since(std::time::UNIX_EPOCH)
369 .unwrap_or_default()
370 .subsec_nanos();
371 let jitter_range = (backoff_ms / 4).max(1);
372 let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
373 let final_ms = backoff_ms.saturating_add(jitter);
374
375 Duration::from_millis(final_ms)
376}