1use std::time::Duration;
2
3use bytes::Bytes;
4use tokio::sync::mpsc;
5
6pub use rolly::BackpressureStrategy;
7
8#[derive(Debug)]
10pub enum ExportMessage {
11 Traces(Bytes),
12 Logs(Bytes),
13 Metrics(Bytes),
14 Flush(tokio::sync::oneshot::Sender<()>),
15 Shutdown(tokio::sync::oneshot::Sender<()>),
16}
17
18#[derive(Debug, Clone)]
22pub struct ExporterConfig {
23 pub traces_url: Option<String>,
24 pub logs_url: Option<String>,
25 pub metrics_url: Option<String>,
26 pub channel_capacity: usize,
27 pub batch_size: usize,
28 pub flush_interval: Duration,
29 pub max_concurrent_exports: usize,
30 pub backpressure_strategy: BackpressureStrategy,
31}
32
33impl Default for ExporterConfig {
34 fn default() -> Self {
35 Self {
36 traces_url: None,
37 logs_url: None,
38 metrics_url: None,
39 channel_capacity: 1024,
40 batch_size: 512,
41 flush_interval: Duration::from_secs(1),
42 max_concurrent_exports: 4,
43 backpressure_strategy: BackpressureStrategy::Drop,
44 }
45 }
46}
47
48#[derive(Clone)]
50pub struct Exporter {
51 tx: mpsc::Sender<ExportMessage>,
52 #[allow(dead_code)] backpressure_strategy: BackpressureStrategy,
54}
55
56impl Exporter {
57 pub fn start(config: ExporterConfig) -> Result<Self, StartError> {
64 let _handle = tokio::runtime::Handle::try_current().map_err(|_| StartError::NoRuntime)?;
66
67 if config.channel_capacity == 0 {
68 return Err(StartError::InvalidConfig("channel_capacity must be > 0"));
69 }
70 if config.flush_interval.is_zero() {
71 return Err(StartError::InvalidConfig("flush_interval must be > 0"));
72 }
73
74 let (tx, rx) = mpsc::channel(config.channel_capacity);
75 let client = reqwest::Client::builder()
76 .timeout(Duration::from_secs(10))
77 .build()
78 .map_err(StartError::HttpClient)?;
79 let batch_config = BatchConfig {
80 traces_url: config.traces_url,
81 logs_url: config.logs_url,
82 metrics_url: config.metrics_url,
83 batch_size: config.batch_size,
84 };
85 tokio::spawn(exporter_loop(
86 rx,
87 client,
88 batch_config,
89 config.flush_interval,
90 config.max_concurrent_exports.max(1),
91 ));
92 Ok(Self {
93 tx,
94 backpressure_strategy: config.backpressure_strategy,
95 })
96 }
97
98 #[cfg(any(test, feature = "_bench"))]
101 pub fn start_test() -> (Self, mpsc::Receiver<ExportMessage>) {
102 Self::start_test_with_capacity(64, BackpressureStrategy::Drop)
103 }
104
105 #[cfg(any(test, feature = "_bench"))]
107 pub fn start_test_with_capacity(
108 capacity: usize,
109 strategy: BackpressureStrategy,
110 ) -> (Self, mpsc::Receiver<ExportMessage>) {
111 let (tx, rx) = mpsc::channel(capacity);
112 (
113 Self {
114 tx,
115 backpressure_strategy: strategy,
116 },
117 rx,
118 )
119 }
120
121 pub fn send_traces(&self, data: Vec<u8>) {
123 self.try_send(ExportMessage::Traces(Bytes::from(data)));
124 }
125
126 pub fn send_logs(&self, data: Vec<u8>) {
128 self.try_send(ExportMessage::Logs(Bytes::from(data)));
129 }
130
131 pub fn send_metrics(&self, data: Vec<u8>) {
133 self.try_send(ExportMessage::Metrics(Bytes::from(data)));
134 }
135
136 fn try_send(&self, msg: ExportMessage) {
137 match self.tx.try_send(msg) {
138 Ok(()) => {}
139 Err(mpsc::error::TrySendError::Full(_)) => {
140 rolly::increment_dropped_total();
141 }
142 Err(mpsc::error::TrySendError::Closed(_)) => {
143 rolly::increment_dropped_total();
144 }
145 }
146 }
147
148 pub async fn flush(&self) {
150 let (tx, rx) = tokio::sync::oneshot::channel();
151 if self.tx.send(ExportMessage::Flush(tx)).await.is_ok() {
152 let _ = rx.await;
153 }
154 }
155
156 pub async fn shutdown(&self) {
159 let (tx, rx) = tokio::sync::oneshot::channel();
160 if self.tx.send(ExportMessage::Shutdown(tx)).await.is_ok() {
161 let _ = rx.await;
162 }
163 }
164}
165
166impl rolly::TelemetrySink for Exporter {
167 fn send_traces(&self, data: Vec<u8>) {
168 self.send_traces(data);
169 }
170 fn send_logs(&self, data: Vec<u8>) {
171 self.send_logs(data);
172 }
173 fn send_metrics(&self, data: Vec<u8>) {
174 self.send_metrics(data);
175 }
176}
177
178#[derive(Debug)]
182#[non_exhaustive]
183pub enum StartError {
184 HttpClient(reqwest::Error),
186 NoRuntime,
189 InvalidConfig(&'static str),
191}
192
193impl std::fmt::Display for StartError {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 match self {
196 Self::HttpClient(e) => write!(f, "failed to build HTTP client: {}", e),
197 Self::NoRuntime => write!(f, "no tokio runtime active"),
198 Self::InvalidConfig(msg) => write!(f, "invalid exporter config: {}", msg),
199 }
200 }
201}
202
203impl std::error::Error for StartError {
204 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
205 match self {
206 Self::HttpClient(e) => Some(e),
207 Self::NoRuntime | Self::InvalidConfig(_) => None,
208 }
209 }
210}
211
212const RETRY_DELAYS: [Duration; 3] = [
215 Duration::from_millis(100),
216 Duration::from_millis(400),
217 Duration::from_millis(1600),
218];
219
220struct BatchConfig {
222 traces_url: Option<String>,
223 logs_url: Option<String>,
224 metrics_url: Option<String>,
225 batch_size: usize,
226}
227
228struct BatchState {
230 traces: Vec<Bytes>,
231 logs: Vec<Bytes>,
232 metrics: Vec<Bytes>,
233 join_set: tokio::task::JoinSet<()>,
234}
235
236impl BatchState {
237 fn new() -> Self {
238 Self {
239 traces: Vec::new(),
240 logs: Vec::new(),
241 metrics: Vec::new(),
242 join_set: tokio::task::JoinSet::new(),
243 }
244 }
245
246 fn batches_empty(&self) -> bool {
247 self.traces.is_empty() && self.logs.is_empty() && self.metrics.is_empty()
248 }
249
250 async fn flush_and_drain(
253 &mut self,
254 config: &BatchConfig,
255 client: &reqwest::Client,
256 semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
257 ) {
258 for _ in 0..64 {
259 self.flush_all(config, client, semaphore);
260 self.drain().await;
261 if self.batches_empty() {
262 return;
263 }
264 tokio::task::yield_now().await;
266 }
267 }
268
269 fn collect(
272 &mut self,
273 msg: ExportMessage,
274 config: &BatchConfig,
275 client: &reqwest::Client,
276 semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
277 ) {
278 match msg {
279 ExportMessage::Traces(data) => {
280 self.traces.push(data);
281 if self.traces.len() >= config.batch_size {
282 flush_batch(
283 &mut self.traces,
284 config.traces_url.as_deref(),
285 client,
286 semaphore,
287 &mut self.join_set,
288 );
289 }
290 }
291 ExportMessage::Logs(data) => {
292 self.logs.push(data);
293 if self.logs.len() >= config.batch_size {
294 flush_batch(
295 &mut self.logs,
296 config.logs_url.as_deref(),
297 client,
298 semaphore,
299 &mut self.join_set,
300 );
301 }
302 }
303 ExportMessage::Metrics(data) => {
304 self.metrics.push(data);
305 if self.metrics.len() >= config.batch_size {
306 flush_batch(
307 &mut self.metrics,
308 config.metrics_url.as_deref(),
309 client,
310 semaphore,
311 &mut self.join_set,
312 );
313 }
314 }
315 ExportMessage::Flush(_) | ExportMessage::Shutdown(_) => {}
317 }
318 }
319
320 fn flush_all(
322 &mut self,
323 config: &BatchConfig,
324 client: &reqwest::Client,
325 semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
326 ) {
327 flush_batch(
328 &mut self.traces,
329 config.traces_url.as_deref(),
330 client,
331 semaphore,
332 &mut self.join_set,
333 );
334 flush_batch(
335 &mut self.logs,
336 config.logs_url.as_deref(),
337 client,
338 semaphore,
339 &mut self.join_set,
340 );
341 flush_batch(
342 &mut self.metrics,
343 config.metrics_url.as_deref(),
344 client,
345 semaphore,
346 &mut self.join_set,
347 );
348 }
349
350 async fn drain(&mut self) {
352 while self.join_set.join_next().await.is_some() {}
353 }
354
355 fn reap(&mut self) {
357 while self.join_set.try_join_next().is_some() {}
358 }
359}
360
361async fn exporter_loop(
362 mut rx: mpsc::Receiver<ExportMessage>,
363 client: reqwest::Client,
364 config: BatchConfig,
365 flush_interval: Duration,
366 max_concurrent_exports: usize,
367) {
368 use std::sync::Arc;
369 use tokio::sync::Semaphore;
370
371 let semaphore = Arc::new(Semaphore::new(max_concurrent_exports));
372 let mut state = BatchState::new();
373 let mut interval = tokio::time::interval(flush_interval);
374 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
375 interval.tick().await;
376
377 loop {
378 tokio::select! {
379 biased;
380 msg = rx.recv() => {
381 match msg {
382 Some(ExportMessage::Flush(done)) => {
383 state.flush_and_drain(&config, &client, &semaphore).await;
384 let _ = done.send(());
385 }
386 Some(ExportMessage::Shutdown(done)) => {
387 state.flush_and_drain(&config, &client, &semaphore).await;
388 let _ = done.send(());
389 break;
390 }
391 Some(msg) => {
392 state.collect(msg, &config, &client, &semaphore);
393 }
394 None => {
395 state.flush_and_drain(&config, &client, &semaphore).await;
396 break;
397 }
398 }
399 }
400 _ = interval.tick() => {
401 state.flush_all(&config, &client, &semaphore);
402 }
403 }
404
405 state.reap();
406 }
407}
408
409fn flush_batch(
416 batch: &mut Vec<Bytes>,
417 url: Option<&str>,
418 client: &reqwest::Client,
419 semaphore: &std::sync::Arc<tokio::sync::Semaphore>,
420 join_set: &mut tokio::task::JoinSet<()>,
421) {
422 if batch.is_empty() {
423 return;
424 }
425 let url = match url {
426 Some(u) => u,
427 None => {
428 batch.clear();
429 return;
430 }
431 };
432
433 let permit = match semaphore.clone().try_acquire_owned() {
434 Ok(permit) => permit,
435 Err(_) => return,
436 };
437
438 let total_len: usize = batch.iter().map(|b| b.len()).sum();
439 let mut payload = Vec::with_capacity(total_len);
440 for item in batch.drain(..) {
441 payload.extend_from_slice(&item);
442 }
443 let data = Bytes::from(payload);
444 let client = client.clone();
445 let url = url.to_string();
446
447 join_set.spawn(async move {
448 let _permit = permit;
449 post_with_retry(&client, &url, data).await;
450 });
451}
452
453async fn post_with_retry(client: &reqwest::Client, url: &str, data: Bytes) {
459 for (attempt, delay) in RETRY_DELAYS.iter().enumerate() {
460 match client
461 .post(url)
462 .header("Content-Type", "application/x-protobuf")
463 .body(data.clone()) .send()
465 .await
466 {
467 Ok(resp) if resp.status().is_success() => return,
468 Ok(resp) => {
469 eprintln!(
470 "rolly-tokio: export attempt {}/{} to {} failed: HTTP {}",
471 attempt + 1,
472 RETRY_DELAYS.len(),
473 url,
474 resp.status()
475 );
476 }
477 Err(e) => {
478 eprintln!(
479 "rolly-tokio: export attempt {}/{} to {} failed: {}",
480 attempt + 1,
481 RETRY_DELAYS.len(),
482 url,
483 e
484 );
485 }
486 }
487 tokio::time::sleep(*delay).await;
488 }
489 eprintln!(
490 "rolly-tokio: dropping batch after {} retries",
491 RETRY_DELAYS.len()
492 );
493}
494
495#[cfg(test)]
496mod tests {
497 use super::*;
498
499 #[test]
500 fn drop_counter_is_callable() {
501 let _count = rolly::telemetry_dropped_total();
502 }
503
504 #[tokio::test]
505 async fn drop_counter_increments_on_channel_full() {
506 let before = rolly::telemetry_dropped_total();
507 let (exporter, _rx) = Exporter::start_test_with_capacity(2, BackpressureStrategy::Drop);
509 exporter.send_traces(vec![0x0A]);
510 exporter.send_traces(vec![0x0A]);
511 exporter.send_traces(vec![0x0A]);
513 let delta = rolly::telemetry_dropped_total() - before;
514 assert!(delta >= 1, "expected at least 1 drop, got {}", delta);
515 }
516
517 #[tokio::test]
518 async fn drop_counter_increments_for_logs_and_traces() {
519 let before = rolly::telemetry_dropped_total();
520 let (exporter, _rx) = Exporter::start_test_with_capacity(1, BackpressureStrategy::Drop);
521 exporter.send_traces(vec![0x0A]); exporter.send_traces(vec![0x0A]); exporter.send_logs(vec![0x0A]); let delta = rolly::telemetry_dropped_total() - before;
525 assert!(delta >= 2, "expected at least 2 drops, got {}", delta);
526 }
527
528 fn test_config(traces_url: Option<String>, logs_url: Option<String>) -> ExporterConfig {
529 ExporterConfig {
530 traces_url,
531 logs_url,
532 metrics_url: None,
533 channel_capacity: 16,
534 batch_size: 512,
535 flush_interval: Duration::from_secs(60),
536 max_concurrent_exports: 4,
537 backpressure_strategy: BackpressureStrategy::Drop,
538 }
539 }
540
541 #[tokio::test]
542 async fn exporter_queues_and_flushes_without_panic() {
543 let config = test_config(
544 Some("http://127.0.0.1:1/v1/traces".to_string()),
545 Some("http://127.0.0.1:1/v1/logs".to_string()),
546 );
547 let exporter = Exporter::start(config).unwrap();
548
549 exporter.send_traces(vec![0x0A, 0x00]);
550 exporter.send_logs(vec![0x0A, 0x00]);
551
552 exporter.shutdown().await;
553 }
554
555 #[tokio::test]
556 async fn exporter_flush_completes() {
557 let config = test_config(
558 Some("http://127.0.0.1:1/v1/traces".to_string()),
559 Some("http://127.0.0.1:1/v1/logs".to_string()),
560 );
561 let exporter = Exporter::start(config).unwrap();
562
563 tokio::time::timeout(Duration::from_secs(5), exporter.flush())
564 .await
565 .expect("flush should complete within timeout");
566
567 exporter.shutdown().await;
568 }
569
570 async fn respond_with_status(listener: &tokio::net::TcpListener, status: &str) {
571 use tokio::io::{AsyncReadExt, AsyncWriteExt};
572 let (mut stream, _) = listener.accept().await.unwrap();
573 let mut buf = [0u8; 65536];
574 let _ = stream.read(&mut buf).await;
575 let resp = format!(
576 "HTTP/1.1 {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
577 status
578 );
579 stream.write_all(resp.as_bytes()).await.unwrap();
580 }
581
582 #[tokio::test]
583 async fn post_with_retry_succeeds_on_first_attempt() {
584 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
585 let addr = listener.local_addr().unwrap();
586
587 tokio::spawn(async move {
588 respond_with_status(&listener, "200 OK").await;
589 });
590
591 let client = reqwest::Client::new();
592 let url = format!("http://{}/v1/traces", addr);
593 post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
594 }
595
596 #[tokio::test]
597 async fn post_with_retry_retries_on_500_then_succeeds() {
598 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
599 let addr = listener.local_addr().unwrap();
600
601 tokio::spawn(async move {
602 respond_with_status(&listener, "500 Internal Server Error").await;
603 respond_with_status(&listener, "200 OK").await;
604 });
605
606 let client = reqwest::Client::new();
607 let url = format!("http://{}/v1/traces", addr);
608 post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
609 }
610
611 #[tokio::test]
612 async fn post_with_retry_gives_up_after_all_retries() {
613 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
615 let addr = listener.local_addr().unwrap();
616
617 tokio::spawn(async move {
618 for _ in 0..3 {
619 respond_with_status(&listener, "500 Internal Server Error").await;
620 }
621 });
622
623 let client = reqwest::Client::new();
624 let url = format!("http://{}/v1/traces", addr);
625 post_with_retry(&client, &url, Bytes::from_static(b"test")).await;
626 }
628
629 #[tokio::test]
630 async fn exporter_sends_to_correct_url_paths() {
631 use tokio::io::{AsyncReadExt, AsyncWriteExt};
632
633 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
634 let addr = listener.local_addr().unwrap();
635
636 let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
637
638 tokio::spawn(async move {
639 loop {
640 let Ok((mut stream, _)) = listener.accept().await else {
641 break;
642 };
643 let path_tx = path_tx.clone();
644 tokio::spawn(async move {
645 let mut buf = [0u8; 4096];
646 let n = stream.read(&mut buf).await.unwrap_or(0);
647 let request = String::from_utf8_lossy(&buf[..n]);
648 let path = request
649 .lines()
650 .next()
651 .unwrap_or("")
652 .split_whitespace()
653 .nth(1)
654 .unwrap_or("")
655 .to_string();
656 let _ = path_tx.send(path).await;
657 let _ = stream
658 .write_all(
659 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
660 )
661 .await;
662 });
663 }
664 });
665
666 let config = test_config(
667 Some(format!("http://{}/v1/traces", addr)),
668 Some(format!("http://{}/v1/logs", addr)),
669 );
670 let exporter = Exporter::start(config).unwrap();
671
672 exporter.send_traces(vec![0x0A, 0x00]);
673 exporter.send_logs(vec![0x0A, 0x00]);
674 exporter.flush().await;
675
676 let mut paths = Vec::new();
677 while let Ok(Some(path)) =
678 tokio::time::timeout(Duration::from_secs(5), path_rx.recv()).await
679 {
680 paths.push(path);
681 if paths.len() >= 2 {
682 break;
683 }
684 }
685
686 assert!(
687 paths.contains(&"/v1/traces".to_string()),
688 "missing /v1/traces, got {:?}",
689 paths
690 );
691 assert!(
692 paths.contains(&"/v1/logs".to_string()),
693 "missing /v1/logs, got {:?}",
694 paths
695 );
696
697 exporter.shutdown().await;
698 }
699
700 #[tokio::test]
701 async fn exporter_skips_logs_when_no_logs_url() {
702 use tokio::io::{AsyncReadExt, AsyncWriteExt};
703
704 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
705 let addr = listener.local_addr().unwrap();
706
707 let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
708
709 tokio::spawn(async move {
710 loop {
711 let Ok((mut stream, _)) = listener.accept().await else {
712 break;
713 };
714 let path_tx = path_tx.clone();
715 tokio::spawn(async move {
716 let mut buf = [0u8; 4096];
717 let n = stream.read(&mut buf).await.unwrap_or(0);
718 let request = String::from_utf8_lossy(&buf[..n]);
719 let path = request
720 .lines()
721 .next()
722 .unwrap_or("")
723 .split_whitespace()
724 .nth(1)
725 .unwrap_or("")
726 .to_string();
727 let _ = path_tx.send(path).await;
728 let _ = stream
729 .write_all(
730 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
731 )
732 .await;
733 });
734 }
735 });
736
737 let config = test_config(Some(format!("http://{}/v1/traces", addr)), None);
739 let exporter = Exporter::start(config).unwrap();
740
741 exporter.send_traces(vec![0x0A, 0x00]);
742 exporter.send_logs(vec![0x0A, 0x00]); exporter.flush().await;
744
745 let mut paths = Vec::new();
746 while let Ok(Some(path)) =
747 tokio::time::timeout(Duration::from_millis(500), path_rx.recv()).await
748 {
749 paths.push(path);
750 }
751
752 assert!(
753 paths.contains(&"/v1/traces".to_string()),
754 "expected /v1/traces, got {:?}",
755 paths
756 );
757 assert!(
758 !paths.contains(&"/v1/logs".to_string()),
759 "should NOT have received /v1/logs, got {:?}",
760 paths
761 );
762
763 exporter.shutdown().await;
764 }
765
766 #[test]
767 fn exporter_config_with_batch_settings() {
768 let config = ExporterConfig {
769 traces_url: None,
770 logs_url: None,
771 channel_capacity: 16,
772 metrics_url: None,
773 batch_size: 100,
774 flush_interval: Duration::from_millis(500),
775 max_concurrent_exports: 2,
776 backpressure_strategy: BackpressureStrategy::Drop,
777 };
778 assert_eq!(config.batch_size, 100);
779 assert_eq!(config.flush_interval, Duration::from_millis(500));
780 assert_eq!(config.max_concurrent_exports, 2);
781 }
782
783 #[tokio::test]
784 async fn exporter_batches_traces_up_to_batch_size() {
785 use tokio::io::{AsyncReadExt, AsyncWriteExt};
786
787 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
788 let addr = listener.local_addr().unwrap();
789
790 let (body_tx, mut body_rx) = mpsc::channel::<Vec<u8>>(16);
791
792 tokio::spawn(async move {
793 loop {
794 let Ok((mut stream, _)) = listener.accept().await else {
795 break;
796 };
797 let body_tx = body_tx.clone();
798 tokio::spawn(async move {
799 let mut buf = vec![0u8; 65536];
800 let n = stream.read(&mut buf).await.unwrap_or(0);
801 buf.truncate(n);
802 let request = &buf[..n];
804 if let Some(pos) = request.windows(4).position(|w| w == b"\r\n\r\n") {
805 let body = request[pos + 4..].to_vec();
806 let _ = body_tx.send(body).await;
807 }
808 let _ = stream
809 .write_all(
810 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
811 )
812 .await;
813 });
814 }
815 });
816
817 let config = ExporterConfig {
818 traces_url: Some(format!("http://{}/v1/traces", addr)),
819 logs_url: None,
820 metrics_url: None,
821 channel_capacity: 16,
822 batch_size: 3,
823 flush_interval: Duration::from_secs(60),
824 max_concurrent_exports: 4,
825 backpressure_strategy: BackpressureStrategy::Drop,
826 };
827 let exporter = Exporter::start(config).unwrap();
828
829 let payload = vec![0x0A, 0x00]; exporter.send_traces(payload.clone());
832 exporter.send_traces(payload.clone());
833 exporter.send_traces(payload.clone());
834 exporter.flush().await;
835
836 let body = tokio::time::timeout(Duration::from_secs(5), body_rx.recv())
838 .await
839 .expect("timeout waiting for POST")
840 .expect("channel closed");
841
842 assert_eq!(body.len(), 6);
844
845 exporter.shutdown().await;
846 }
847
848 #[tokio::test]
849 async fn exporter_flushes_on_interval() {
850 use tokio::io::{AsyncReadExt, AsyncWriteExt};
851
852 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
853 let addr = listener.local_addr().unwrap();
854
855 let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
856
857 tokio::spawn(async move {
858 loop {
859 let Ok((mut stream, _)) = listener.accept().await else {
860 break;
861 };
862 let done_tx = done_tx.clone();
863 tokio::spawn(async move {
864 let mut buf = [0u8; 65536];
865 let _ = stream.read(&mut buf).await;
866 let _ = stream
867 .write_all(
868 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
869 )
870 .await;
871 let _ = done_tx.send(()).await;
872 });
873 }
874 });
875
876 let config = ExporterConfig {
877 traces_url: Some(format!("http://{}/v1/traces", addr)),
878 logs_url: None,
879 metrics_url: None,
880 channel_capacity: 16,
881 batch_size: 100, flush_interval: Duration::from_millis(200), max_concurrent_exports: 4,
884 backpressure_strategy: BackpressureStrategy::Drop,
885 };
886 let exporter = Exporter::start(config).unwrap();
887
888 exporter.send_traces(vec![0x0A, 0x00]);
890
891 let result = tokio::time::timeout(Duration::from_millis(1000), done_rx.recv()).await;
893 assert!(result.is_ok(), "data should arrive via interval flush");
894
895 exporter.shutdown().await;
896 }
897
898 #[tokio::test]
899 async fn exporter_explicit_flush_drains_batch() {
900 use tokio::io::{AsyncReadExt, AsyncWriteExt};
901
902 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
903 let addr = listener.local_addr().unwrap();
904
905 let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
906
907 tokio::spawn(async move {
908 loop {
909 let Ok((mut stream, _)) = listener.accept().await else {
910 break;
911 };
912 let done_tx = done_tx.clone();
913 tokio::spawn(async move {
914 let mut buf = [0u8; 65536];
915 let _ = stream.read(&mut buf).await;
916 let _ = stream
917 .write_all(
918 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
919 )
920 .await;
921 let _ = done_tx.send(()).await;
922 });
923 }
924 });
925
926 let config = ExporterConfig {
927 traces_url: Some(format!("http://{}/v1/traces", addr)),
928 logs_url: None,
929 metrics_url: None,
930 channel_capacity: 16,
931 batch_size: 100, flush_interval: Duration::from_secs(60), max_concurrent_exports: 4,
934 backpressure_strategy: BackpressureStrategy::Drop,
935 };
936 let exporter = Exporter::start(config).unwrap();
937
938 exporter.send_traces(vec![0x0A, 0x00]);
939 exporter.flush().await;
940
941 let result = tokio::time::timeout(Duration::from_millis(500), done_rx.recv()).await;
943 assert!(result.is_ok(), "flush should drain pending batch");
944
945 exporter.shutdown().await;
946 }
947
948 #[tokio::test]
949 async fn exporter_shutdown_drains_remaining_batch() {
950 use tokio::io::{AsyncReadExt, AsyncWriteExt};
951
952 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
953 let addr = listener.local_addr().unwrap();
954
955 let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
956
957 tokio::spawn(async move {
958 loop {
959 let Ok((mut stream, _)) = listener.accept().await else {
960 break;
961 };
962 let done_tx = done_tx.clone();
963 tokio::spawn(async move {
964 let mut buf = [0u8; 65536];
965 let _ = stream.read(&mut buf).await;
966 let _ = stream
967 .write_all(
968 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
969 )
970 .await;
971 let _ = done_tx.send(()).await;
972 });
973 }
974 });
975
976 let config = ExporterConfig {
977 traces_url: Some(format!("http://{}/v1/traces", addr)),
978 logs_url: None,
979 metrics_url: None,
980 channel_capacity: 16,
981 batch_size: 100,
982 flush_interval: Duration::from_secs(60),
983 max_concurrent_exports: 4,
984 backpressure_strategy: BackpressureStrategy::Drop,
985 };
986 let exporter = Exporter::start(config).unwrap();
987
988 exporter.send_traces(vec![0x0A, 0x00]);
989 exporter.shutdown().await;
990
991 let result = tokio::time::timeout(Duration::from_millis(500), done_rx.recv()).await;
993 assert!(result.is_ok(), "shutdown should drain remaining batch");
994 }
995
996 #[tokio::test]
997 async fn exporter_batches_traces_and_logs_independently() {
998 use tokio::io::{AsyncReadExt, AsyncWriteExt};
999
1000 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1001 let addr = listener.local_addr().unwrap();
1002
1003 let (path_tx, mut path_rx) = mpsc::channel::<String>(16);
1004
1005 tokio::spawn(async move {
1006 loop {
1007 let Ok((mut stream, _)) = listener.accept().await else {
1008 break;
1009 };
1010 let path_tx = path_tx.clone();
1011 tokio::spawn(async move {
1012 let mut buf = [0u8; 4096];
1013 let n = stream.read(&mut buf).await.unwrap_or(0);
1014 let request = String::from_utf8_lossy(&buf[..n]);
1015 let path = request
1016 .lines()
1017 .next()
1018 .unwrap_or("")
1019 .split_whitespace()
1020 .nth(1)
1021 .unwrap_or("")
1022 .to_string();
1023 let _ = path_tx.send(path).await;
1024 let _ = stream
1025 .write_all(
1026 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1027 )
1028 .await;
1029 });
1030 }
1031 });
1032
1033 let config = ExporterConfig {
1034 traces_url: Some(format!("http://{}/v1/traces", addr)),
1035 logs_url: Some(format!("http://{}/v1/logs", addr)),
1036 metrics_url: None,
1037 channel_capacity: 16,
1038 batch_size: 2,
1039 flush_interval: Duration::from_secs(60),
1040 max_concurrent_exports: 4,
1041 backpressure_strategy: BackpressureStrategy::Drop,
1042 };
1043 let exporter = Exporter::start(config).unwrap();
1044
1045 exporter.send_traces(vec![0x0A, 0x00]);
1047 exporter.send_traces(vec![0x0A, 0x00]);
1048 exporter.send_logs(vec![0x0A, 0x00]);
1050 exporter.send_logs(vec![0x0A, 0x00]);
1051 exporter.flush().await;
1052
1053 let mut paths = Vec::new();
1054 while let Ok(Some(path)) =
1055 tokio::time::timeout(Duration::from_secs(5), path_rx.recv()).await
1056 {
1057 paths.push(path);
1058 if paths.len() >= 2 {
1059 break;
1060 }
1061 }
1062
1063 assert!(
1064 paths.contains(&"/v1/traces".to_string()),
1065 "missing /v1/traces, got {:?}",
1066 paths
1067 );
1068 assert!(
1069 paths.contains(&"/v1/logs".to_string()),
1070 "missing /v1/logs, got {:?}",
1071 paths
1072 );
1073
1074 exporter.shutdown().await;
1075 }
1076
1077 #[tokio::test]
1078 async fn exporter_sends_concurrently_not_sequentially() {
1079 use std::sync::atomic::{AtomicUsize, Ordering as AtomOrd};
1080 use std::sync::Arc;
1081 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1082
1083 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1084 let addr = listener.local_addr().unwrap();
1085
1086 let concurrent = Arc::new(AtomicUsize::new(0));
1087 let max_concurrent = Arc::new(AtomicUsize::new(0));
1088
1089 let concurrent_c = concurrent.clone();
1090 let max_concurrent_c = max_concurrent.clone();
1091
1092 tokio::spawn(async move {
1093 loop {
1094 let Ok((mut stream, _)) = listener.accept().await else {
1095 break;
1096 };
1097 let conc = concurrent_c.clone();
1098 let max_conc = max_concurrent_c.clone();
1099 tokio::spawn(async move {
1100 let mut buf = [0u8; 65536];
1101 let _ = stream.read(&mut buf).await;
1102 let current = conc.fetch_add(1, AtomOrd::SeqCst) + 1;
1103 max_conc.fetch_max(current, AtomOrd::SeqCst);
1104 tokio::time::sleep(Duration::from_millis(100)).await;
1106 conc.fetch_sub(1, AtomOrd::SeqCst);
1107 let _ = stream
1108 .write_all(
1109 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1110 )
1111 .await;
1112 });
1113 }
1114 });
1115
1116 let config = ExporterConfig {
1117 traces_url: Some(format!("http://{}/v1/traces", addr)),
1118 logs_url: None,
1119 metrics_url: None,
1120 channel_capacity: 64,
1121 batch_size: 1, flush_interval: Duration::from_secs(60),
1123 max_concurrent_exports: 8,
1124 backpressure_strategy: BackpressureStrategy::Drop,
1125 };
1126 let exporter = Exporter::start(config).unwrap();
1127
1128 for _ in 0..8 {
1130 exporter.send_traces(vec![0x0A, 0x00]);
1131 }
1132 exporter.flush().await;
1133
1134 assert!(
1135 max_concurrent.load(AtomOrd::SeqCst) > 1,
1136 "expected concurrent exports > 1, got {}",
1137 max_concurrent.load(AtomOrd::SeqCst)
1138 );
1139
1140 exporter.shutdown().await;
1141 }
1142
1143 #[tokio::test]
1144 async fn exporter_limits_concurrent_exports() {
1145 use std::sync::atomic::{AtomicUsize, Ordering as AtomOrd};
1146 use std::sync::Arc;
1147 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1148
1149 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1150 let addr = listener.local_addr().unwrap();
1151
1152 let max_concurrent = Arc::new(AtomicUsize::new(0));
1153 let concurrent = Arc::new(AtomicUsize::new(0));
1154
1155 let concurrent_c = concurrent.clone();
1156 let max_concurrent_c = max_concurrent.clone();
1157
1158 tokio::spawn(async move {
1159 loop {
1160 let Ok((mut stream, _)) = listener.accept().await else {
1161 break;
1162 };
1163 let conc = concurrent_c.clone();
1164 let max_conc = max_concurrent_c.clone();
1165 tokio::spawn(async move {
1166 let mut buf = [0u8; 65536];
1167 let _ = stream.read(&mut buf).await;
1168 let current = conc.fetch_add(1, AtomOrd::SeqCst) + 1;
1169 max_conc.fetch_max(current, AtomOrd::SeqCst);
1170 tokio::time::sleep(Duration::from_millis(100)).await;
1171 conc.fetch_sub(1, AtomOrd::SeqCst);
1172 let _ = stream
1173 .write_all(
1174 b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
1175 )
1176 .await;
1177 });
1178 }
1179 });
1180
1181 let max_exports = 2;
1182 let config = ExporterConfig {
1183 traces_url: Some(format!("http://{}/v1/traces", addr)),
1184 logs_url: None,
1185 metrics_url: None,
1186 channel_capacity: 64,
1187 batch_size: 1,
1188 flush_interval: Duration::from_secs(60),
1189 max_concurrent_exports: max_exports,
1190 backpressure_strategy: BackpressureStrategy::Drop,
1191 };
1192 let exporter = Exporter::start(config).unwrap();
1193
1194 for _ in 0..8 {
1195 exporter.send_traces(vec![0x0A, 0x00]);
1196 }
1197 exporter.flush().await;
1198
1199 assert!(
1200 max_concurrent.load(AtomOrd::SeqCst) <= max_exports,
1201 "expected max concurrent <= {}, got {}",
1202 max_exports,
1203 max_concurrent.load(AtomOrd::SeqCst)
1204 );
1205
1206 exporter.shutdown().await;
1207 }
1208}