1use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use fast_telemetry::otlp::{build_resource, build_trace_export_request, pb};
12use fast_telemetry::span::SpanCollector;
13use prost::Message;
14use tokio::time::MissedTickBehavior;
15use tokio_util::sync::CancellationToken;
16
/// Settings for the OTLP/HTTP span exporter.
///
/// Start from [`SpanExportConfig::new`] (or `Default`) and refine it with the
/// `with_*` builder methods.
///
/// `Debug` is implemented by hand rather than derived so that header values,
/// which commonly carry credentials such as bearer tokens or API keys, are
/// never printed into logs.
#[derive(Clone)]
pub struct SpanExportConfig {
    /// Collector base URL; the exporter appends `/v1/traces` to it (a trailing
    /// `/` on the endpoint is tolerated).
    pub endpoint: String,
    /// Period between export attempts.
    pub interval: Duration,
    /// Service name placed on the exported resource.
    pub service_name: String,
    /// Instrumentation scope name used when building the export request.
    pub scope_name: String,
    /// Additional `(key, value)` resource attributes.
    pub resource_attributes: Vec<(String, String)>,
    /// Timeout applied to each HTTP export request.
    pub timeout: Duration,
    /// Extra `(name, value)` HTTP headers sent with every export request.
    pub headers: Vec<(String, String)>,
    /// Maximum number of spans sent per periodic export; spans drained beyond
    /// this limit in a single tick are dropped.
    pub max_batch_size: usize,
}

impl Default for SpanExportConfig {
    /// Local collector at `http://localhost:4318`, 10 s export interval,
    /// 10 s request timeout, batches of up to 512 spans, no extra headers
    /// or resource attributes.
    fn default() -> Self {
        Self {
            endpoint: "http://localhost:4318".to_string(),
            interval: Duration::from_secs(10),
            service_name: "unknown_service".to_string(),
            scope_name: "fast-telemetry".to_string(),
            resource_attributes: Vec::new(),
            timeout: Duration::from_secs(10),
            headers: Vec::new(),
            max_batch_size: 512,
        }
    }
}

impl std::fmt::Debug for SpanExportConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header values frequently hold secrets; show only the header names.
        let redacted_headers: Vec<(&str, &str)> = self
            .headers
            .iter()
            .map(|(name, _)| (name.as_str(), "<redacted>"))
            .collect();
        f.debug_struct("SpanExportConfig")
            .field("endpoint", &self.endpoint)
            .field("interval", &self.interval)
            .field("service_name", &self.service_name)
            .field("scope_name", &self.scope_name)
            .field("resource_attributes", &self.resource_attributes)
            .field("timeout", &self.timeout)
            .field("headers", &redacted_headers)
            .field("max_batch_size", &self.max_batch_size)
            .finish()
    }
}

impl SpanExportConfig {
    /// Creates a config targeting `endpoint`, with every other field defaulted.
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Sets the period between export attempts.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Sets the service name reported on the resource.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Sets the instrumentation scope name.
    pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
        self.scope_name = name.into();
        self
    }

    /// Appends one resource attribute (duplicates are kept as-is).
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.resource_attributes.push((key.into(), value.into()));
        self
    }

    /// Sets the per-request HTTP timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Appends one HTTP header sent with every export request.
    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((name.into(), value.into()));
        self
    }

    /// Sets the maximum number of spans per periodic export.
    pub fn with_max_batch_size(mut self, size: usize) -> Self {
        self.max_batch_size = size;
        self
    }
}
97
/// Starting delay for export retries; `backoff_with_jitter` doubles it per
/// consecutive failure (before jitter).
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Ceiling for the exponential retry delay (before jitter): five minutes.
const MAX_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Request bodies shorter than this many bytes (1 KiB) are sent uncompressed.
const GZIP_THRESHOLD: usize = 1 << 10;
106
107fn gzip_compress(data: &[u8], out: &mut Vec<u8>) -> bool {
108 if data.len() < GZIP_THRESHOLD {
109 return false;
110 }
111 use flate2::Compression;
112 use flate2::write::GzEncoder;
113 use std::io::Write;
114
115 out.clear();
116 let mut encoder = GzEncoder::new(out, Compression::fast());
117 let _ = encoder.write_all(data);
118 let _ = encoder.finish();
119 true
120}
121
122async fn send_otlp(
123 client: &reqwest::Client,
124 url: &str,
125 body: &[u8],
126 gzip_buf: &mut Vec<u8>,
127 extra_headers: &[(String, String)],
128) -> Result<reqwest::Response, reqwest::Error> {
129 let mut req = client
130 .post(url)
131 .header("Content-Type", "application/x-protobuf");
132
133 for (name, value) in extra_headers {
134 req = req.header(name, value);
135 }
136
137 if gzip_compress(body, gzip_buf) {
138 req.header("Content-Encoding", "gzip")
139 .body(gzip_buf.clone())
140 .send()
141 .await
142 } else {
143 req.body(body.to_vec()).send().await
144 }
145}
146
147pub fn spawn(
151 collector: Arc<SpanCollector>,
152 config: SpanExportConfig,
153 cancel: CancellationToken,
154) -> Option<std::thread::JoinHandle<()>> {
155 std::thread::Builder::new()
156 .name("span-exporter".to_string())
157 .spawn(move || {
158 let rt = tokio::runtime::Builder::new_current_thread()
159 .enable_all()
160 .build()
161 .expect("span exporter runtime");
162 rt.block_on(run(collector, config, cancel));
163 })
164 .ok()
165}
166
#[cfg(feature = "monoio")]
/// Periodically calls `collector.flush_local()` on the current monoio runtime.
///
/// It flushes once per `interval` (missed ticks are skipped, not bursted) until
/// `cancel` fires, then flushes one last time and returns. Presumably
/// `flush_local` moves spans buffered locally into the shared collector so the
/// exporter can drain them; confirm against `SpanCollector::flush_local`.
///
/// NOTE(review): tokio's `interval` panics on a zero period; confirm whether
/// monoio's port does the same and guard `interval` if so.
pub async fn run_local_flusher_monoio(
    collector: Arc<SpanCollector>,
    interval: Duration,
    cancel: CancellationToken,
) {
    use monoio::time::MissedTickBehavior;

    let mut ticker = monoio::time::interval(interval);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Consume the initial tick so the first flush waits a full period.
    ticker.tick().await;

    loop {
        monoio::select! {
            _ = ticker.tick() => {
                collector.flush_local();
            }
            _ = cancel.cancelled() => {
                // Final flush so nothing buffered locally is left behind.
                collector.flush_local();
                return;
            }
        }
    }
}
201
202pub async fn run(
229 collector: Arc<SpanCollector>,
230 config: SpanExportConfig,
231 cancel: CancellationToken,
232) {
233 let url = format!("{}/v1/traces", config.endpoint.trim_end_matches('/'));
234
235 log::info!(
236 "Starting OTLP span exporter, endpoint={url}, service={}",
237 config.service_name
238 );
239
240 let attr_refs: Vec<(&str, &str)> = config
241 .resource_attributes
242 .iter()
243 .map(|(k, v)| (k.as_str(), v.as_str()))
244 .collect();
245 let resource = build_resource(&config.service_name, &attr_refs);
246
247 let client = match reqwest::Client::builder().timeout(config.timeout).build() {
248 Ok(c) => c,
249 Err(e) => {
250 log::error!("Failed to build HTTP client for span exporter: {e}");
251 return;
252 }
253 };
254
255 let mut interval = tokio::time::interval(config.interval);
256 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
257 interval.tick().await;
258
259 let mut consecutive_failures: u32 = 0;
260 let mut bufs = SpanExportBufs {
261 spans: Vec::with_capacity(config.max_batch_size),
262 encode: Vec::new(),
263 gzip: Vec::new(),
264 };
265
266 let ctx = SpanExportContext {
267 client: &client,
268 url: &url,
269 collector: &collector,
270 resource: &resource,
271 config: &config,
272 };
273
274 loop {
275 tokio::select! {
276 _ = interval.tick() => {}
277 _ = cancel.cancelled() => {
278 log::info!("Span exporter shutting down, performing final export");
279 export_once(&ctx, &mut bufs).await;
280 return;
281 }
282 }
283
284 if consecutive_failures > 0 {
285 let backoff = backoff_with_jitter(consecutive_failures);
286 log::debug!(
287 "Span export backing off {}ms (failures={consecutive_failures})",
288 backoff.as_millis()
289 );
290 tokio::select! {
291 _ = tokio::time::sleep(backoff) => {}
292 _ = cancel.cancelled() => {
293 export_once(&ctx, &mut bufs).await;
294 return;
295 }
296 }
297 }
298
299 bufs.spans.clear();
300 collector.drain_into(&mut bufs.spans);
301
302 if bufs.spans.is_empty() {
303 continue;
304 }
305
306 let total_drained = bufs.spans.len();
307 let dropped = total_drained.saturating_sub(config.max_batch_size);
308 bufs.spans.truncate(config.max_batch_size);
309 let span_count = bufs.spans.len();
310
311 if dropped > 0 {
312 log::debug!("Span export dropped {dropped} excess spans (exported {span_count})");
313 }
314
315 let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
316 let request = build_trace_export_request(&resource, &config.scope_name, otlp_spans);
317
318 bufs.encode.clear();
319 if let Err(e) = request.encode(&mut bufs.encode) {
320 log::warn!("Span protobuf encode failed: {e}");
321 continue;
322 }
323
324 let body_len = bufs.encode.len();
325
326 match send_otlp(&client, &url, &bufs.encode, &mut bufs.gzip, &config.headers).await {
327 Ok(resp) if resp.status().is_success() => {
328 consecutive_failures = 0;
329 log::debug!("Exported {span_count} spans ({body_len} bytes)");
330 }
331 Ok(resp) => {
332 consecutive_failures = consecutive_failures.saturating_add(1);
333 let status = resp.status();
334 let body = resp.text().await.unwrap_or_default();
335 log::warn!("Span export failed: status={status}, body={body}");
336 }
337 Err(e) => {
338 consecutive_failures = consecutive_failures.saturating_add(1);
339 log::warn!("Span export request failed: {e}");
340 }
341 }
342 }
343}
344
345struct SpanExportContext<'a> {
346 client: &'a reqwest::Client,
347 url: &'a str,
348 collector: &'a SpanCollector,
349 resource: &'a pb::Resource,
350 config: &'a SpanExportConfig,
351}
352
353struct SpanExportBufs {
354 spans: Vec<fast_telemetry::span::CompletedSpan>,
355 encode: Vec<u8>,
356 gzip: Vec<u8>,
357}
358
359async fn export_once(ctx: &SpanExportContext<'_>, bufs: &mut SpanExportBufs) {
360 bufs.spans.clear();
361 ctx.collector.drain_into(&mut bufs.spans);
362
363 if bufs.spans.is_empty() {
364 return;
365 }
366
367 let otlp_spans: Vec<_> = bufs.spans.iter().map(|s| s.to_otlp()).collect();
368 let request = build_trace_export_request(ctx.resource, &ctx.config.scope_name, otlp_spans);
369
370 bufs.encode.clear();
371 if let Err(e) = request.encode(&mut bufs.encode) {
372 log::warn!("Final span protobuf encode failed: {e}");
373 return;
374 }
375
376 match send_otlp(
377 ctx.client,
378 ctx.url,
379 &bufs.encode,
380 &mut bufs.gzip,
381 &ctx.config.headers,
382 )
383 .await
384 {
385 Ok(resp) if !resp.status().is_success() => {
386 let status = resp.status();
387 let body = resp.text().await.unwrap_or_default();
388 log::warn!("Final span export returned {status}: {body}");
389 }
390 Err(e) => log::warn!("Final span export failed: {e}"),
391 _ => {}
392 }
393}
394
395fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
396 let exp = consecutive_failures.min(10);
397 let base_ms = BASE_BACKOFF.as_millis() as u64;
398 let backoff_ms = base_ms
399 .saturating_mul(1u64 << exp)
400 .min(MAX_BACKOFF.as_millis() as u64);
401
402 let nanos = SystemTime::now()
403 .duration_since(std::time::UNIX_EPOCH)
404 .unwrap_or_default()
405 .subsec_nanos();
406 let jitter_range = (backoff_ms / 4).max(1);
407 let jitter = (nanos as u64 % (jitter_range * 2 + 1)).saturating_sub(jitter_range);
408 let final_ms = backoff_ms.saturating_add(jitter);
409
410 Duration::from_millis(final_ms)
411}