Skip to main content

rolly_tokio/
exporter.rs

1use std::time::Duration;
2
3use bytes::Bytes;
4use tokio::sync::mpsc;
5
6pub use rolly::BackpressureStrategy;
7
8/// Message types sent to the exporter background task.
9#[derive(Debug)]
10pub enum ExportMessage {
11    Traces(Bytes),
12    Logs(Bytes),
13    Metrics(Bytes),
14    Flush(tokio::sync::oneshot::Sender<()>),
15    Shutdown(tokio::sync::oneshot::Sender<()>),
16}
17
18/// Configuration for the exporter.
19///
20/// Use `Default` for standard values (1024 channel, 512 batch, 1s flush, 4 concurrent).
21#[derive(Debug, Clone)]
22pub struct ExporterConfig {
23    pub traces_url: Option<String>,
24    pub logs_url: Option<String>,
25    pub metrics_url: Option<String>,
26    pub channel_capacity: usize,
27    pub batch_size: usize,
28    pub flush_interval: Duration,
29    pub max_concurrent_exports: usize,
30    pub backpressure_strategy: BackpressureStrategy,
31}
32
33impl Default for ExporterConfig {
34    fn default() -> Self {
35        Self {
36            traces_url: None,
37            logs_url: None,
38            metrics_url: None,
39            channel_capacity: 1024,
40            batch_size: 512,
41            flush_interval: Duration::from_secs(1),
42            max_concurrent_exports: 4,
43            backpressure_strategy: BackpressureStrategy::Drop,
44        }
45    }
46}
47
48/// Handle to the exporter background task.
49#[derive(Clone)]
50pub struct Exporter {
51    tx: mpsc::Sender<ExportMessage>,
52    #[allow(dead_code)] // Only one strategy currently; field reserved for future variants
53    backpressure_strategy: BackpressureStrategy,
54}
55
56impl Exporter {
57    /// Start the exporter background task. Returns a handle for sending data.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the HTTP client cannot be built (e.g. TLS
62    /// backend misconfiguration) or if no tokio runtime is active.
63    pub fn start(config: ExporterConfig) -> Result<Self, StartError> {
64        // Verify a tokio runtime is available before doing anything.
65        let _handle = tokio::runtime::Handle::try_current().map_err(|_| StartError::NoRuntime)?;
66
67        if config.channel_capacity == 0 {
68            return Err(StartError::InvalidConfig("channel_capacity must be > 0"));
69        }
70        if config.flush_interval.is_zero() {
71            return Err(StartError::InvalidConfig("flush_interval must be > 0"));
72        }
73
74        let (tx, rx) = mpsc::channel(config.channel_capacity);
75        let client = reqwest::Client::builder()
76            .timeout(Duration::from_secs(10))
77            .build()
78            .map_err(StartError::HttpClient)?;
79        let batch_config = BatchConfig {
80            traces_url: config.traces_url,
81            logs_url: config.logs_url,
82            metrics_url: config.metrics_url,
83            batch_size: config.batch_size,
84        };
85        tokio::spawn(exporter_loop(
86            rx,
87            client,
88            batch_config,
89            config.flush_interval,
90            config.max_concurrent_exports.max(1),
91        ));
92        Ok(Self {
93            tx,
94            backpressure_strategy: config.backpressure_strategy,
95        })
96    }
97
98    /// Create an exporter for testing that doesn't spawn the HTTP loop.
99    /// Returns the exporter and receiver so tests can read messages directly.
100    #[cfg(any(test, feature = "_bench"))]
101    pub fn start_test() -> (Self, mpsc::Receiver<ExportMessage>) {
102        Self::start_test_with_capacity(64, BackpressureStrategy::Drop)
103    }
104
105    /// Create a test exporter with a specific channel capacity and backpressure strategy.
106    #[cfg(any(test, feature = "_bench"))]
107    pub fn start_test_with_capacity(
108        capacity: usize,
109        strategy: BackpressureStrategy,
110    ) -> (Self, mpsc::Receiver<ExportMessage>) {
111        let (tx, rx) = mpsc::channel(capacity);
112        (
113            Self {
114                tx,
115                backpressure_strategy: strategy,
116            },
117            rx,
118        )
119    }
120
121    /// Send encoded trace data to the exporter (non-blocking).
122    pub fn send_traces(&self, data: Vec<u8>) {
123        self.try_send(ExportMessage::Traces(Bytes::from(data)));
124    }
125
126    /// Send encoded log data to the exporter.
127    pub fn send_logs(&self, data: Vec<u8>) {
128        self.try_send(ExportMessage::Logs(Bytes::from(data)));
129    }
130
131    /// Send encoded metrics data to the exporter.
132    pub fn send_metrics(&self, data: Vec<u8>) {
133        self.try_send(ExportMessage::Metrics(Bytes::from(data)));
134    }
135
136    fn try_send(&self, msg: ExportMessage) {
137        match self.tx.try_send(msg) {
138            Ok(()) => {}
139            Err(mpsc::error::TrySendError::Full(_)) => {
140                rolly::increment_dropped_total();
141            }
142            Err(mpsc::error::TrySendError::Closed(_)) => {
143                rolly::increment_dropped_total();
144            }
145        }
146    }
147
148    /// Flush all pending data. Blocks until the exporter has processed everything.
149    pub async fn flush(&self) {
150        let (tx, rx) = tokio::sync::oneshot::channel();
151        if self.tx.send(ExportMessage::Flush(tx)).await.is_ok() {
152            let _ = rx.await;
153        }
154    }
155
156    /// Signal the exporter to stop after draining remaining messages.
157    /// Waits for the exporter loop to finish processing before returning.
158    pub async fn shutdown(&self) {
159        let (tx, rx) = tokio::sync::oneshot::channel();
160        if self.tx.send(ExportMessage::Shutdown(tx)).await.is_ok() {
161            let _ = rx.await;
162        }
163    }
164}
165
166impl rolly::TelemetrySink for Exporter {
167    fn send_traces(&self, data: Vec<u8>) {
168        self.send_traces(data);
169    }
170    fn send_logs(&self, data: Vec<u8>) {
171        self.send_logs(data);
172    }
173    fn send_metrics(&self, data: Vec<u8>) {
174        self.send_metrics(data);
175    }
176}
177
178// ── Error types ─────────────────────────────────────────────────────────
179
180/// Errors that can occur when starting the exporter.
181#[derive(Debug)]
182#[non_exhaustive]
183pub enum StartError {
184    /// The HTTP client could not be built (e.g. TLS misconfiguration).
185    HttpClient(reqwest::Error),
186    /// No tokio runtime is active. Call from within a `#[tokio::main]`
187    /// or `tokio::runtime::Runtime` context.
188    NoRuntime,
189    /// A configuration value is invalid (e.g. zero capacity or zero interval).
190    InvalidConfig(&'static str),
191}
192
193impl std::fmt::Display for StartError {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        match self {
196            Self::HttpClient(e) => write!(f, "failed to build HTTP client: {}", e),
197            Self::NoRuntime => write!(f, "no tokio runtime active"),
198            Self::InvalidConfig(msg) => write!(f, "invalid exporter config: {}", msg),
199        }
200    }
201}
202
203impl std::error::Error for StartError {
204    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
205        match self {
206            Self::HttpClient(e) => Some(e),
207            Self::NoRuntime | Self::InvalidConfig(_) => None,
208        }
209    }
210}
211
212// ── Exporter background loop ────────────────────────────────────────────
213
214const RETRY_DELAYS: [Duration; 3] = [
215    Duration::from_millis(100),
216    Duration::from_millis(400),
217    Duration::from_millis(1600),
218];
219
220/// Immutable config for batch accumulation.
221struct BatchConfig {
222    traces_url: Option<String>,
223    logs_url: Option<String>,
224    metrics_url: Option<String>,
225    batch_size: usize,
226}
227
228/// Mutable state for batch accumulation and in-flight export tasks.
229struct BatchState {
230    traces: Vec<Bytes>,
231    logs: Vec<Bytes>,
232    metrics: Vec<Bytes>,
233    join_set: tokio::task::JoinSet<()>,
234}
235
236impl BatchState {
237    fn new() -> Self {
238        Self {
239            traces: Vec::new(),
240            logs: Vec::new(),
241            metrics: Vec::new(),
242            join_set: tokio::task::JoinSet::new(),
243        }
244    }
245
246    fn batches_empty(&self) -> bool {
247        self.traces.is_empty() && self.logs.is_empty() && self.metrics.is_empty()
248    }
249
250    /// Flush all batches and drain in-flight tasks, retrying until
251    /// all local buffers are empty (handles semaphore contention).
252    async fn flush_and_drain(
253        &mut self,
254        config: &BatchConfig,
255        client: &reqwest::Client,
256        semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
257    ) {
258        for _ in 0..64 {
259            self.flush_all(config, client, semaphore);
260            self.drain().await;
261            if self.batches_empty() {
262                return;
263            }
264            // Yield to let in-flight tasks complete and release permits.
265            tokio::task::yield_now().await;
266        }
267    }
268
269    /// Route a single message into the appropriate batch, flushing when
270    /// the batch reaches `config.batch_size`.
271    fn collect(
272        &mut self,
273        msg: ExportMessage,
274        config: &BatchConfig,
275        client: &reqwest::Client,
276        semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
277    ) {
278        match msg {
279            ExportMessage::Traces(data) => {
280                self.traces.push(data);
281                if self.traces.len() >= config.batch_size {
282                    flush_batch(
283                        &mut self.traces,
284                        config.traces_url.as_deref(),
285                        client,
286                        semaphore,
287                        &mut self.join_set,
288                    );
289                }
290            }
291            ExportMessage::Logs(data) => {
292                self.logs.push(data);
293                if self.logs.len() >= config.batch_size {
294                    flush_batch(
295                        &mut self.logs,
296                        config.logs_url.as_deref(),
297                        client,
298                        semaphore,
299                        &mut self.join_set,
300                    );
301                }
302            }
303            ExportMessage::Metrics(data) => {
304                self.metrics.push(data);
305                if self.metrics.len() >= config.batch_size {
306                    flush_batch(
307                        &mut self.metrics,
308                        config.metrics_url.as_deref(),
309                        client,
310                        semaphore,
311                        &mut self.join_set,
312                    );
313                }
314            }
315            // Control messages handled by the loop, not here
316            ExportMessage::Flush(_) | ExportMessage::Shutdown(_) => {}
317        }
318    }
319
320    /// Flush all partial batches into export tasks.
321    fn flush_all(
322        &mut self,
323        config: &BatchConfig,
324        client: &reqwest::Client,
325        semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
326    ) {
327        flush_batch(
328            &mut self.traces,
329            config.traces_url.as_deref(),
330            client,
331            semaphore,
332            &mut self.join_set,
333        );
334        flush_batch(
335            &mut self.logs,
336            config.logs_url.as_deref(),
337            client,
338            semaphore,
339            &mut self.join_set,
340        );
341        flush_batch(
342            &mut self.metrics,
343            config.metrics_url.as_deref(),
344            client,
345            semaphore,
346            &mut self.join_set,
347        );
348    }
349
350    /// Wait for all in-flight export tasks to complete.
351    async fn drain(&mut self) {
352        while self.join_set.join_next().await.is_some() {}
353    }
354
355    /// Reap completed tasks without blocking.
356    fn reap(&mut self) {
357        while self.join_set.try_join_next().is_some() {}
358    }
359}
360
361async fn exporter_loop(
362    mut rx: mpsc::Receiver<ExportMessage>,
363    client: reqwest::Client,
364    config: BatchConfig,
365    flush_interval: Duration,
366    max_concurrent_exports: usize,
367) {
368    use std::sync::Arc;
369    use tokio::sync::Semaphore;
370
371    let semaphore = Arc::new(Semaphore::new(max_concurrent_exports));
372    let mut state = BatchState::new();
373    let mut interval = tokio::time::interval(flush_interval);
374    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
375    interval.tick().await;
376
377    loop {
378        tokio::select! {
379            biased;
380            msg = rx.recv() => {
381                match msg {
382                    Some(ExportMessage::Flush(done)) => {
383                        state.flush_and_drain(&config, &client, &semaphore).await;
384                        let _ = done.send(());
385                    }
386                    Some(ExportMessage::Shutdown(done)) => {
387                        state.flush_and_drain(&config, &client, &semaphore).await;
388                        let _ = done.send(());
389                        break;
390                    }
391                    Some(msg) => {
392                        state.collect(msg, &config, &client, &semaphore);
393                    }
394                    None => {
395                        state.flush_and_drain(&config, &client, &semaphore).await;
396                        break;
397                    }
398                }
399            }
400            _ = interval.tick() => {
401                state.flush_all(&config, &client, &semaphore);
402            }
403        }
404
405        state.reap();
406    }
407}
408
409/// Concatenate batch payloads and spawn a concurrent POST task.
410///
411/// The semaphore permit is acquired synchronously (try_acquire) before
412/// spawning so that payloads cannot accumulate in an unbounded JoinSet
413/// when the collector is slow. If no permit is available, the batch is
414/// left for the next flush cycle.
415fn flush_batch(
416    batch: &mut Vec<Bytes>,
417    url: Option<&str>,
418    client: &reqwest::Client,
419    semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
420    join_set: &mut tokio::task::JoinSet<()>,
421) {
422    if batch.is_empty() {
423        return;
424    }
425    let url = match url {
426        Some(u) => u,
427        None => {
428            batch.clear();
429            return;
430        }
431    };
432
433    let permit = match semaphore.clone().try_acquire_owned() {
434        Ok(permit) => permit,
435        Err(_) => return,
436    };
437
438    let total_len: usize = batch.iter().map(|b| b.len()).sum();
439    let mut payload = Vec::with_capacity(total_len);
440    for item in batch.drain(..) {
441        payload.extend_from_slice(&item);
442    }
443    let data = Bytes::from(payload);
444    let client = client.clone();
445    let url = url.to_string();
446
447    join_set.spawn(async move {
448        let _permit = permit;
449        post_with_retry(&client, &url, data).await;
450    });
451}
452
453/// POST with exponential backoff. On total failure, drop the batch.
454///
455/// Uses `eprintln!` intentionally — not `tracing::warn!` — because this runs
456/// inside the telemetry pipeline. Using tracing here would re-enter the OtlpLayer
457/// and cause infinite recursion.
458async fn post_with_retry(client: &reqwest::Client, url: &str, data: Bytes) {
459    for (attempt, delay) in RETRY_DELAYS.iter().enumerate() {
460        match client
461            .post(url)
462            .header("Content-Type", "application/x-protobuf")
463            .body(data.clone()) // Bytes::clone is O(1) — just an Arc bump
464            .send()
465            .await
466        {
467            Ok(resp) if resp.status().is_success() => return,
468            Ok(resp) => {
469                eprintln!(
470                    "rolly-tokio: export attempt {}/{} to {} failed: HTTP {}",
471                    attempt + 1,
472                    RETRY_DELAYS.len(),
473                    url,
474                    resp.status()
475                );
476            }
477            Err(e) => {
478                eprintln!(
479                    "rolly-tokio: export attempt {}/{} to {} failed: {}",
480                    attempt + 1,
481                    RETRY_DELAYS.len(),
482                    url,
483                    e
484                );
485            }
486        }
487        tokio::time::sleep(*delay).await;
488    }
489    eprintln!(
490        "rolly-tokio: dropping batch after {} retries",
491        RETRY_DELAYS.len()
492    );
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498
499    #[test]
500    fn drop_counter_is_callable() {
501        let _count = rolly::telemetry_dropped_total();
502    }
503
504    #[tokio::test]
505    async fn drop_counter_increments_on_channel_full() {
506        let before = rolly::telemetry_dropped_total();
507        // Capacity 2: fill channel, then the 3rd send should drop
508        let (exporter, _rx) = Exporter::start_test_with_capacity(2, BackpressureStrategy::Drop);
509        exporter.send_traces(vec![0x0A]);
510        exporter.send_traces(vec![0x0A]);
511        // Channel is full now
512        exporter.send_traces(vec![0x0A]);
513        let delta = rolly::telemetry_dropped_total() - before;
514        assert!(delta >= 1, "expected at least 1 drop, got {}", delta);
515    }
516
517    #[tokio::test]
518    async fn drop_counter_increments_for_logs_and_traces() {
519        let before = rolly::telemetry_dropped_total();
520        let (exporter, _rx) = Exporter::start_test_with_capacity(1, BackpressureStrategy::Drop);
521        exporter.send_traces(vec![0x0A]); // fills the channel
522        exporter.send_traces(vec![0x0A]); // dropped
523        exporter.send_logs(vec![0x0A]); // dropped
524        let delta = rolly::telemetry_dropped_total() - before;
525        assert!(delta >= 2, "expected at least 2 drops, got {}", delta);
526    }
527
528    fn test_config(traces_url: Option<String>, logs_url: Option<String>) -> ExporterConfig {
529        ExporterConfig {
530            traces_url,
531            logs_url,
532            metrics_url: None,
533            channel_capacity: 16,
534            batch_size: 512,
535            flush_interval: Duration::from_secs(60),
536            max_concurrent_exports: 4,
537            backpressure_strategy: BackpressureStrategy::Drop,
538        }
539    }
540
541    #[tokio::test]
542    async fn exporter_queues_and_flushes_without_panic() {
543        let config = test_config(
544            Some("http://127.0.0.1:1/v1/traces".to_string()),
545            Some("http://127.0.0.1:1/v1/logs".to_string()),
546        );
547        let exporter = Exporter::start(config).unwrap();
548
549        exporter.send_traces(vec![0x0A, 0x00]);
550        exporter.send_logs(vec![0x0A, 0x00]);
551
552        exporter.shutdown().await;
553    }
554
555    #[tokio::test]
556    async fn exporter_flush_completes() {
557        let config = test_config(
558            Some("http://127.0.0.1:1/v1/traces".to_string()),
559            Some("http://127.0.0.1:1/v1/logs".to_string()),
560        );
561        let exporter = Exporter::start(config).unwrap();
562
563        tokio::time::timeout(Duration::from_secs(5), exporter.flush())
564            .await
565            .expect("flush should complete within timeout");
566
567        exporter.shutdown().await;
568    }
569
570    async fn respond_with_status(listener: &tokio::net::TcpListener, status: &str) {
571        use tokio::io::{AsyncReadExt, AsyncWriteExt};
572        let (mut stream, _) = listener.accept().await.unwrap();
573        let mut buf = [0u8; 65536];
574        let _ = stream.read(&mut buf).await;
575        let resp = format!(
576            "HTTP/1.1 {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
577            status
578        );
579        stream.write_all(resp.as_bytes()).await.unwrap();
580    }
581
582    #[tokio::test]
583    async fn post_with_retry_succeeds_on_first_attempt() {
584        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
585        let addr = listener.local_addr().unwrap();
586
587        tokio::spawn(async move {
588            respond_with_status(&listener, "200 OK").await;
589        });
590
591        let client = reqwest::Client::new();
592        let url = format!("http://{}/v1/traces", addr);
593        post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
594    }
595
596    #[tokio::test]
597    async fn post_with_retry_retries_on_500_then_succeeds() {
598        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
599        let addr = listener.local_addr().unwrap();
600
601        tokio::spawn(async move {
602            respond_with_status(&listener, "500 Internal Server Error").await;
603            respond_with_status(&listener, "200 OK").await;
604        });
605
606        let client = reqwest::Client::new();
607        let url = format!("http://{}/v1/traces", addr);
608        post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
609    }
610
611    #[tokio::test]
612    async fn post_with_retry_gives_up_after_all_retries() {
613        // Takes ~2.1s due to real retry delays (100ms + 400ms + 1600ms)
614        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
615        let addr = listener.local_addr().unwrap();
616
617        tokio::spawn(async move {
618            for _ in 0..3 {
619                respond_with_status(&listener, "500 Internal Server Error").await;
620            }
621        });
622
623        let client = reqwest::Client::new();
624        let url = format!("http://{}/v1/traces", addr);
625        post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
626        // Returns without panic after exhausting retries
627    }
628
629    #[tokio::test]
630    async fn exporter_sends_to_correct_url_paths() {
631        use tokio::io::{AsyncReadExt, AsyncWriteExt};
632
633        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
634        let addr = listener.local_addr().unwrap();
635
636        let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
637
638        tokio::spawn(async move {
639            loop {
640                let Ok((mut stream, _)) = listener.accept().await else {
641                    break;
642                };
643                let path_tx = path_tx.clone();
644                tokio::spawn(async move {
645                    let mut buf = [0u8; 4096];
646                    let n = stream.read(&mut buf).await.unwrap_or(0);
647                    let request = String::from_utf8_lossy(&buf[..n]);
648                    let path = request
649                        .lines()
650                        .next()
651                        .unwrap_or("")
652                        .split_whitespace()
653                        .nth(1)
654                        .unwrap_or("")
655                        .to_string();
656                    let _ = path_tx.send(path).await;
657                    let _ = stream
658                        .write_all(
659                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
660                        )
661                        .await;
662                });
663            }
664        });
665
666        let config = test_config(
667            Some(format!("http://{}/v1/traces", addr)),
668            Some(format!("http://{}/v1/logs", addr)),
669        );
670        let exporter = Exporter::start(config).unwrap();
671
672        exporter.send_traces(vec![0x0A, 0x00]);
673        exporter.send_logs(vec![0x0A, 0x00]);
674        exporter.flush().await;
675
676        let mut paths = Vec::new();
677        while let Ok(Some(path)) =
678            tokio::time::timeout(Duration::from_secs(5), path_rx.recv()).await
679        {
680            paths.push(path);
681            if paths.len() >= 2 {
682                break;
683            }
684        }
685
686        assert!(
687            paths.contains(&"/v1/traces".to_string()),
688            "missing /v1/traces, got {:?}",
689            paths
690        );
691        assert!(
692            paths.contains(&"/v1/logs".to_string()),
693            "missing /v1/logs, got {:?}",
694            paths
695        );
696
697        exporter.shutdown().await;
698    }
699
700    #[tokio::test]
701    async fn exporter_skips_logs_when_no_logs_url() {
702        use tokio::io::{AsyncReadExt, AsyncWriteExt};
703
704        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
705        let addr = listener.local_addr().unwrap();
706
707        let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
708
709        tokio::spawn(async move {
710            loop {
711                let Ok((mut stream, _)) = listener.accept().await else {
712                    break;
713                };
714                let path_tx = path_tx.clone();
715                tokio::spawn(async move {
716                    let mut buf = [0u8; 4096];
717                    let n = stream.read(&mut buf).await.unwrap_or(0);
718                    let request = String::from_utf8_lossy(&buf[..n]);
719                    let path = request
720                        .lines()
721                        .next()
722                        .unwrap_or("")
723                        .split_whitespace()
724                        .nth(1)
725                        .unwrap_or("")
726                        .to_string();
727                    let _ = path_tx.send(path).await;
728                    let _ = stream
729                        .write_all(
730                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
731                        )
732                        .await;
733                });
734            }
735        });
736
737        // Only traces_url set, logs_url is None
738        let config = test_config(Some(format!("http://{}/v1/traces", addr)), None);
739        let exporter = Exporter::start(config).unwrap();
740
741        exporter.send_traces(vec![0x0A, 0x00]);
742        exporter.send_logs(vec![0x0A, 0x00]); // should be silently dropped
743        exporter.flush().await;
744
745        let mut paths = Vec::new();
746        while let Ok(Some(path)) =
747            tokio::time::timeout(Duration::from_millis(500), path_rx.recv()).await
748        {
749            paths.push(path);
750        }
751
752        assert!(
753            paths.contains(&"/v1/traces".to_string()),
754            "expected /v1/traces, got {:?}",
755            paths
756        );
757        assert!(
758            !paths.contains(&"/v1/logs".to_string()),
759            "should NOT have received /v1/logs, got {:?}",
760            paths
761        );
762
763        exporter.shutdown().await;
764    }
765
766    #[test]
767    fn exporter_config_with_batch_settings() {
768        let config = ExporterConfig {
769            traces_url: None,
770            logs_url: None,
771            channel_capacity: 16,
772            metrics_url: None,
773            batch_size: 100,
774            flush_interval: Duration::from_millis(500),
775            max_concurrent_exports: 2,
776            backpressure_strategy: BackpressureStrategy::Drop,
777        };
778        assert_eq!(config.batch_size, 100);
779        assert_eq!(config.flush_interval, Duration::from_millis(500));
780        assert_eq!(config.max_concurrent_exports, 2);
781    }
782
783    #[tokio::test]
784    async fn exporter_batches_traces_up_to_batch_size() {
785        use tokio::io::{AsyncReadExt, AsyncWriteExt};
786
787        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
788        let addr = listener.local_addr().unwrap();
789
790        let (body_tx, mut body_rx) = mpsc::channel::<Vec<u8>>(16);
791
792        tokio::spawn(async move {
793            loop {
794                let Ok((mut stream, _)) = listener.accept().await else {
795                    break;
796                };
797                let body_tx = body_tx.clone();
798                tokio::spawn(async move {
799                    let mut buf = vec![0u8; 65536];
800                    let n = stream.read(&mut buf).await.unwrap_or(0);
801                    buf.truncate(n);
802                    // Extract body after \r\n\r\n
803                    let request = &buf[..n];
804                    if let Some(pos) = request.windows(4).position(|w| w == b"\r\n\r\n") {
805                        let body = request[pos + 4..].to_vec();
806                        let _ = body_tx.send(body).await;
807                    }
808                    let _ = stream
809                        .write_all(
810                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
811                        )
812                        .await;
813                });
814            }
815        });
816
817        let config = ExporterConfig {
818            traces_url: Some(format!("http://{}/v1/traces", addr)),
819            logs_url: None,
820            metrics_url: None,
821            channel_capacity: 16,
822            batch_size: 3,
823            flush_interval: Duration::from_secs(60),
824            max_concurrent_exports: 4,
825            backpressure_strategy: BackpressureStrategy::Drop,
826        };
827        let exporter = Exporter::start(config).unwrap();
828
829        // Send exactly batch_size items
830        let payload = vec![0x0A, 0x00]; // minimal protobuf
831        exporter.send_traces(payload.clone());
832        exporter.send_traces(payload.clone());
833        exporter.send_traces(payload.clone());
834        exporter.flush().await;
835
836        // Should receive exactly 1 HTTP POST containing all 3 concatenated
837        let body = tokio::time::timeout(Duration::from_secs(5), body_rx.recv())
838            .await
839            .expect("timeout waiting for POST")
840            .expect("channel closed");
841
842        // 3 * 2 bytes = 6 bytes total (concatenated payloads)
843        assert_eq!(body.len(), 6);
844
845        exporter.shutdown().await;
846    }
847
848    #[tokio::test]
849    async fn exporter_flushes_on_interval() {
850        use tokio::io::{AsyncReadExt, AsyncWriteExt};
851
852        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
853        let addr = listener.local_addr().unwrap();
854
855        let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
856
857        tokio::spawn(async move {
858            loop {
859                let Ok((mut stream, _)) = listener.accept().await else {
860                    break;
861                };
862                let done_tx = done_tx.clone();
863                tokio::spawn(async move {
864                    let mut buf = [0u8; 65536];
865                    let _ = stream.read(&mut buf).await;
866                    let _ = stream
867                        .write_all(
868                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
869                        )
870                        .await;
871                    let _ = done_tx.send(()).await;
872                });
873            }
874        });
875
876        let config = ExporterConfig {
877            traces_url: Some(format!("http://{}/v1/traces", addr)),
878            logs_url: None,
879            metrics_url: None,
880            channel_capacity: 16,
881            batch_size: 100, // large batch size — won't trigger batch flush
882            flush_interval: Duration::from_millis(200), // short interval
883            max_concurrent_exports: 4,
884            backpressure_strategy: BackpressureStrategy::Drop,
885        };
886        let exporter = Exporter::start(config).unwrap();
887
888        // Send 1 item — won't reach batch_size, must flush on interval
889        exporter.send_traces(vec![0x0A, 0x00]);
890
891        // Should arrive within ~500ms (interval + processing)
892        let result = tokio::time::timeout(Duration::from_millis(1000), done_rx.recv()).await;
893        assert!(result.is_ok(), "data should arrive via interval flush");
894
895        exporter.shutdown().await;
896    }
897
898    #[tokio::test]
899    async fn exporter_explicit_flush_drains_batch() {
900        use tokio::io::{AsyncReadExt, AsyncWriteExt};
901
902        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
903        let addr = listener.local_addr().unwrap();
904
905        let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
906
907        tokio::spawn(async move {
908            loop {
909                let Ok((mut stream, _)) = listener.accept().await else {
910                    break;
911                };
912                let done_tx = done_tx.clone();
913                tokio::spawn(async move {
914                    let mut buf = [0u8; 65536];
915                    let _ = stream.read(&mut buf).await;
916                    let _ = stream
917                        .write_all(
918                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
919                        )
920                        .await;
921                    let _ = done_tx.send(()).await;
922                });
923            }
924        });
925
926        let config = ExporterConfig {
927            traces_url: Some(format!("http://{}/v1/traces", addr)),
928            logs_url: None,
929            metrics_url: None,
930            channel_capacity: 16,
931            batch_size: 100, // large — won't trigger automatically
932            flush_interval: Duration::from_secs(60), // long — won't trigger on time
933            max_concurrent_exports: 4,
934            backpressure_strategy: BackpressureStrategy::Drop,
935        };
936        let exporter = Exporter::start(config).unwrap();
937
938        exporter.send_traces(vec![0x0A, 0x00]);
939        exporter.flush().await;
940
941        // Should have been sent by now
942        let result = tokio::time::timeout(Duration::from_millis(500), done_rx.recv()).await;
943        assert!(result.is_ok(), "flush should drain pending batch");
944
945        exporter.shutdown().await;
946    }
947
948    #[tokio::test]
949    async fn exporter_shutdown_drains_remaining_batch() {
950        use tokio::io::{AsyncReadExt, AsyncWriteExt};
951
952        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
953        let addr = listener.local_addr().unwrap();
954
955        let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
956
957        tokio::spawn(async move {
958            loop {
959                let Ok((mut stream, _)) = listener.accept().await else {
960                    break;
961                };
962                let done_tx = done_tx.clone();
963                tokio::spawn(async move {
964                    let mut buf = [0u8; 65536];
965                    let _ = stream.read(&mut buf).await;
966                    let _ = stream
967                        .write_all(
968                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
969                        )
970                        .await;
971                    let _ = done_tx.send(()).await;
972                });
973            }
974        });
975
976        let config = ExporterConfig {
977            traces_url: Some(format!("http://{}/v1/traces", addr)),
978            logs_url: None,
979            metrics_url: None,
980            channel_capacity: 16,
981            batch_size: 100,
982            flush_interval: Duration::from_secs(60),
983            max_concurrent_exports: 4,
984            backpressure_strategy: BackpressureStrategy::Drop,
985        };
986        let exporter = Exporter::start(config).unwrap();
987
988        exporter.send_traces(vec![0x0A, 0x00]);
989        exporter.shutdown().await;
990
991        // Shutdown should have drained the batch
992        let result = tokio::time::timeout(Duration::from_millis(500), done_rx.recv()).await;
993        assert!(result.is_ok(), "shutdown should drain remaining batch");
994    }
995
996    #[tokio::test]
997    async fn exporter_batches_traces_and_logs_independently() {
998        use tokio::io::{AsyncReadExt, AsyncWriteExt};
999
1000        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1001        let addr = listener.local_addr().unwrap();
1002
1003        let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
1004
1005        tokio::spawn(async move {
1006            loop {
1007                let Ok((mut stream, _)) = listener.accept().await else {
1008                    break;
1009                };
1010                let path_tx = path_tx.clone();
1011                tokio::spawn(async move {
1012                    let mut buf = [0u8; 4096];
1013                    let n = stream.read(&mut buf).await.unwrap_or(0);
1014                    let request = String::from_utf8_lossy(&buf[..n]);
1015                    let path = request
1016                        .lines()
1017                        .next()
1018                        .unwrap_or("")
1019                        .split_whitespace()
1020                        .nth(1)
1021                        .unwrap_or("")
1022                        .to_string();
1023                    let _ = path_tx.send(path).await;
1024                    let _ = stream
1025                        .write_all(
1026                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1027                        )
1028                        .await;
1029                });
1030            }
1031        });
1032
1033        let config = ExporterConfig {
1034            traces_url: Some(format!("http://{}/v1/traces", addr)),
1035            logs_url: Some(format!("http://{}/v1/logs", addr)),
1036            metrics_url: None,
1037            channel_capacity: 16,
1038            batch_size: 2,
1039            flush_interval: Duration::from_secs(60),
1040            max_concurrent_exports: 4,
1041            backpressure_strategy: BackpressureStrategy::Drop,
1042        };
1043        let exporter = Exporter::start(config).unwrap();
1044
1045        // Fill trace batch (batch_size = 2)
1046        exporter.send_traces(vec![0x0A, 0x00]);
1047        exporter.send_traces(vec![0x0A, 0x00]);
1048        // Fill log batch (batch_size = 2)
1049        exporter.send_logs(vec![0x0A, 0x00]);
1050        exporter.send_logs(vec![0x0A, 0x00]);
1051        exporter.flush().await;
1052
1053        let mut paths = Vec::new();
1054        while let Ok(Some(path)) =
1055            tokio::time::timeout(Duration::from_secs(5), path_rx.recv()).await
1056        {
1057            paths.push(path);
1058            if paths.len() >= 2 {
1059                break;
1060            }
1061        }
1062
1063        assert!(
1064            paths.contains(&"/v1/traces".to_string()),
1065            "missing /v1/traces, got {:?}",
1066            paths
1067        );
1068        assert!(
1069            paths.contains(&"/v1/logs".to_string()),
1070            "missing /v1/logs, got {:?}",
1071            paths
1072        );
1073
1074        exporter.shutdown().await;
1075    }
1076
1077    #[tokio::test]
1078    async fn exporter_sends_concurrently_not_sequentially() {
1079        use std::sync::atomic::{AtomicUsize, Ordering as AtomOrd};
1080        use std::sync::Arc;
1081        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1082
1083        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1084        let addr = listener.local_addr().unwrap();
1085
1086        let concurrent = Arc::new(AtomicUsize::new(0));
1087        let max_concurrent = Arc::new(AtomicUsize::new(0));
1088
1089        let concurrent_c = concurrent.clone();
1090        let max_concurrent_c = max_concurrent.clone();
1091
1092        tokio::spawn(async move {
1093            loop {
1094                let Ok((mut stream, _)) = listener.accept().await else {
1095                    break;
1096                };
1097                let conc = concurrent_c.clone();
1098                let max_conc = max_concurrent_c.clone();
1099                tokio::spawn(async move {
1100                    let mut buf = [0u8; 65536];
1101                    let _ = stream.read(&mut buf).await;
1102                    let current = conc.fetch_add(1, AtomOrd::SeqCst) + 1;
1103                    max_conc.fetch_max(current, AtomOrd::SeqCst);
1104                    // Hold the connection open for a bit to allow overlap
1105                    tokio::time::sleep(Duration::from_millis(100)).await;
1106                    conc.fetch_sub(1, AtomOrd::SeqCst);
1107                    let _ = stream
1108                        .write_all(
1109                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1110                        )
1111                        .await;
1112                });
1113            }
1114        });
1115
1116        let config = ExporterConfig {
1117            traces_url: Some(format!("http://{}/v1/traces", addr)),
1118            logs_url: None,
1119            metrics_url: None,
1120            channel_capacity: 64,
1121            batch_size: 1, // flush on every message — creates many concurrent posts
1122            flush_interval: Duration::from_secs(60),
1123            max_concurrent_exports: 8,
1124            backpressure_strategy: BackpressureStrategy::Drop,
1125        };
1126        let exporter = Exporter::start(config).unwrap();
1127
1128        // Send many items rapidly to trigger concurrent exports
1129        for _ in 0..8 {
1130            exporter.send_traces(vec![0x0A, 0x00]);
1131        }
1132        exporter.flush().await;
1133
1134        assert!(
1135            max_concurrent.load(AtomOrd::SeqCst) > 1,
1136            "expected concurrent exports > 1, got {}",
1137            max_concurrent.load(AtomOrd::SeqCst)
1138        );
1139
1140        exporter.shutdown().await;
1141    }
1142
1143    #[tokio::test]
1144    async fn exporter_limits_concurrent_exports() {
1145        use std::sync::atomic::{AtomicUsize, Ordering as AtomOrd};
1146        use std::sync::Arc;
1147        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1148
1149        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1150        let addr = listener.local_addr().unwrap();
1151
1152        let max_concurrent = Arc::new(AtomicUsize::new(0));
1153        let concurrent = Arc::new(AtomicUsize::new(0));
1154
1155        let concurrent_c = concurrent.clone();
1156        let max_concurrent_c = max_concurrent.clone();
1157
1158        tokio::spawn(async move {
1159            loop {
1160                let Ok((mut stream, _)) = listener.accept().await else {
1161                    break;
1162                };
1163                let conc = concurrent_c.clone();
1164                let max_conc = max_concurrent_c.clone();
1165                tokio::spawn(async move {
1166                    let mut buf = [0u8; 65536];
1167                    let _ = stream.read(&mut buf).await;
1168                    let current = conc.fetch_add(1, AtomOrd::SeqCst) + 1;
1169                    max_conc.fetch_max(current, AtomOrd::SeqCst);
1170                    tokio::time::sleep(Duration::from_millis(100)).await;
1171                    conc.fetch_sub(1, AtomOrd::SeqCst);
1172                    let _ = stream
1173                        .write_all(
1174                            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1175                        )
1176                        .await;
1177                });
1178            }
1179        });
1180
1181        let max_exports = 2;
1182        let config = ExporterConfig {
1183            traces_url: Some(format!("http://{}/v1/traces", addr)),
1184            logs_url: None,
1185            metrics_url: None,
1186            channel_capacity: 64,
1187            batch_size: 1,
1188            flush_interval: Duration::from_secs(60),
1189            max_concurrent_exports: max_exports,
1190            backpressure_strategy: BackpressureStrategy::Drop,
1191        };
1192        let exporter = Exporter::start(config).unwrap();
1193
1194        for _ in 0..8 {
1195            exporter.send_traces(vec![0x0A, 0x00]);
1196        }
1197        exporter.flush().await;
1198
1199        assert!(
1200            max_concurrent.load(AtomOrd::SeqCst) <= max_exports,
1201            "expected max concurrent <= {}, got {}",
1202            max_exports,
1203            max_concurrent.load(AtomOrd::SeqCst)
1204        );
1205
1206        exporter.shutdown().await;
1207    }
1208}