1use crate::global;
38use crate::sdk::trace::runtime::{TraceRuntime, TrySend};
39use crate::sdk::trace::Span;
40use crate::{
41 sdk::export::trace::{ExportResult, SpanData, SpanExporter},
42 trace::{TraceError, TraceResult},
43 Context,
44};
45use futures_channel::oneshot;
46use futures_util::future::{self, Either};
47use futures_util::{pin_mut, stream, StreamExt as _};
48use std::{env, fmt, str::FromStr, thread, time::Duration};
49use std::any::Any;
50
51const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
53const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
55const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
57const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
59const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
61const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
63const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
65const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
67
68pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
72 fn on_start(&self, span: &mut Span, cx: &Context);
76 fn on_end(&self, span: SpanData);
80 fn force_flush(&self) -> TraceResult<()>;
82 fn shutdown(&mut self) -> TraceResult<()>;
85 fn as_any(&self) -> &dyn Any;
87}
88
89#[derive(Debug)]
110pub struct SimpleSpanProcessor {
111 sender: crossbeam_channel::Sender<Option<SpanData>>,
112 shutdown: crossbeam_channel::Receiver<()>,
113}
114
115impl SimpleSpanProcessor {
116 pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
117 let (span_tx, span_rx) = crossbeam_channel::unbounded();
118 let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
119
120 let _ = thread::Builder::new()
121 .name("opentelemetry-exporter".to_string())
122 .spawn(move || {
123 while let Ok(Some(span)) = span_rx.recv() {
124 if let Err(err) = futures_executor::block_on(exporter.export(vec![span])) {
125 global::handle_error(err);
126 }
127 }
128
129 exporter.shutdown();
130
131 if let Err(err) = shutdown_tx.send(()) {
132 global::handle_error(TraceError::from(format!(
133 "could not send shutdown: {:?}",
134 err
135 )));
136 }
137 });
138
139 SimpleSpanProcessor {
140 sender: span_tx,
141 shutdown: shutdown_rx,
142 }
143 }
144}
145
146impl SpanProcessor for SimpleSpanProcessor {
147 fn on_start(&self, _span: &mut Span, _cx: &Context) {
148 }
150
151 fn on_end(&self, span: SpanData) {
152 if let Err(err) = self.sender.send(Some(span)) {
153 global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
154 }
155 }
156
157 fn force_flush(&self) -> TraceResult<()> {
158 Ok(())
160 }
161
162 fn shutdown(&mut self) -> TraceResult<()> {
163 if self.sender.send(None).is_ok() {
164 if let Err(err) = self.shutdown.recv() {
165 global::handle_error(TraceError::from(format!(
166 "error shutting down span processor: {:?}",
167 err
168 )))
169 }
170 }
171
172 Ok(())
173 }
174
175 fn as_any(&self) -> &dyn Any {
176 self
177 }
178}
179
180pub struct BatchSpanProcessor<R: TraceRuntime> {
239 message_sender: R::Sender,
240}
241
242impl<R: TraceRuntime> fmt::Debug for BatchSpanProcessor<R> {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 f.debug_struct("BatchSpanProcessor")
245 .field("message_sender", &self.message_sender)
246 .finish()
247 }
248}
249
250impl<R: TraceRuntime> SpanProcessor for BatchSpanProcessor<R> {
251 fn on_start(&self, _span: &mut Span, _cx: &Context) {
252 }
254
255 fn on_end(&self, span: SpanData) {
256 let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
257
258 if let Err(err) = result {
259 global::handle_error(err);
260 }
261 }
262
263 fn force_flush(&self) -> TraceResult<()> {
264 let (res_sender, res_receiver) = oneshot::channel();
265 self.message_sender
266 .try_send(BatchMessage::Flush(Some(res_sender)))?;
267
268 futures_executor::block_on(res_receiver)
269 .map_err(|err| TraceError::Other(err.into()))
270 .and_then(|identity| identity)
271 }
272
273 fn shutdown(&mut self) -> TraceResult<()> {
274 let (res_sender, res_receiver) = oneshot::channel();
275 self.message_sender
276 .try_send(BatchMessage::Shutdown(res_sender))?;
277
278 futures_executor::block_on(res_receiver)
279 .map_err(|err| TraceError::Other(err.into()))
280 .and_then(|identity| identity)
281 }
282
283 fn as_any(&self) -> &dyn Any {
284 self
285 }
286}
287
288#[allow(clippy::large_enum_variant)]
293#[derive(Debug)]
294pub enum BatchMessage {
295 ExportSpan(SpanData),
297 Flush(Option<oneshot::Sender<ExportResult>>),
300 Shutdown(oneshot::Sender<ExportResult>),
302}
303
304impl<R: TraceRuntime> BatchSpanProcessor<R> {
305 pub(crate) fn new(
306 mut exporter: Box<dyn SpanExporter>,
307 config: BatchConfig,
308 runtime: R,
309 ) -> Self {
310 let (message_sender, message_receiver) =
311 runtime.batch_message_channel(config.max_queue_size);
312 let ticker = runtime
313 .interval(config.scheduled_delay)
314 .map(|_| BatchMessage::Flush(None));
315 let timeout_runtime = runtime.clone();
316
317 runtime.spawn(Box::pin(async move {
319 let mut spans = Vec::new();
320 let mut messages = Box::pin(stream::select(message_receiver, ticker));
321
322 while let Some(message) = messages.next().await {
323 match message {
324 BatchMessage::ExportSpan(span) => {
326 spans.push(span);
327
328 if spans.len() == config.max_export_batch_size {
329 let result = export_with_timeout(
330 config.max_export_timeout,
331 exporter.as_mut(),
332 &timeout_runtime,
333 spans.split_off(0),
334 )
335 .await;
336
337 if let Err(err) = result {
338 global::handle_error(err);
339 }
340 }
341 }
342 BatchMessage::Flush(res_channel) => {
344 let result = export_with_timeout(
345 config.max_export_timeout,
346 exporter.as_mut(),
347 &timeout_runtime,
348 spans.split_off(0),
349 )
350 .await;
351
352 if let Some(channel) = res_channel {
353 if let Err(result) = channel.send(result) {
354 global::handle_error(TraceError::from(format!(
355 "failed to send flush result: {:?}",
356 result
357 )));
358 }
359 } else if let Err(err) = result {
360 global::handle_error(err);
361 }
362 }
363 BatchMessage::Shutdown(ch) => {
365 let result = export_with_timeout(
366 config.max_export_timeout,
367 exporter.as_mut(),
368 &timeout_runtime,
369 spans.split_off(0),
370 )
371 .await;
372
373 exporter.shutdown();
374
375 if let Err(result) = ch.send(result) {
376 global::handle_error(TraceError::from(format!(
377 "failed to send batch processor shutdown result: {:?}",
378 result
379 )));
380 }
381
382 break;
383 }
384 }
385 }
386 }));
387
388 BatchSpanProcessor { message_sender }
390 }
391
392 pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
394 where
395 E: SpanExporter,
396 {
397 BatchSpanProcessorBuilder {
398 exporter,
399 config: BatchConfig::default(),
400 runtime,
401 }
402 }
403}
404
405async fn export_with_timeout<R, E>(
406 time_out: Duration,
407 exporter: &mut E,
408 runtime: &R,
409 batch: Vec<SpanData>,
410) -> ExportResult
411where
412 R: TraceRuntime,
413 E: SpanExporter + ?Sized,
414{
415 if batch.is_empty() {
416 return Ok(());
417 }
418
419 let export = exporter.export(batch);
420 let timeout = runtime.delay(time_out);
421 pin_mut!(export);
422 pin_mut!(timeout);
423 match future::select(export, timeout).await {
424 Either::Left((export_res, _)) => export_res,
425 Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
426 }
427}
428
429#[derive(Debug)]
431pub struct BatchConfig {
432 max_queue_size: usize,
435
436 scheduled_delay: Duration,
439
440 max_export_batch_size: usize,
445
446 max_export_timeout: Duration,
448}
449
450impl Default for BatchConfig {
451 fn default() -> Self {
452 let mut config = BatchConfig {
453 max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
454 scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
455 max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
456 max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
457 };
458
459 if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
460 .ok()
461 .and_then(|queue_size| usize::from_str(&queue_size).ok())
462 {
463 config.max_queue_size = max_queue_size;
464 }
465
466 if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
467 .ok()
468 .or_else(|| env::var("OTEL_BSP_SCHEDULE_DELAY_MILLIS").ok())
469 .and_then(|delay| u64::from_str(&delay).ok())
470 {
471 config.scheduled_delay = Duration::from_millis(scheduled_delay);
472 }
473
474 if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
475 .ok()
476 .and_then(|batch_size| usize::from_str(&batch_size).ok())
477 {
478 config.max_export_batch_size = max_export_batch_size;
479 }
480
481 if config.max_export_batch_size > config.max_queue_size {
484 config.max_export_batch_size = config.max_queue_size;
485 }
486
487 if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
488 .ok()
489 .or_else(|| env::var("OTEL_BSP_EXPORT_TIMEOUT_MILLIS").ok())
490 .and_then(|timeout| u64::from_str(&timeout).ok())
491 {
492 config.max_export_timeout = Duration::from_millis(max_export_timeout);
493 }
494
495 config
496 }
497}
498
499#[derive(Debug)]
502pub struct BatchSpanProcessorBuilder<E, R> {
503 exporter: E,
504 config: BatchConfig,
505 runtime: R,
506}
507
508impl<E, R> BatchSpanProcessorBuilder<E, R>
509where
510 E: SpanExporter + 'static,
511 R: TraceRuntime,
512{
513 pub fn with_max_queue_size(self, size: usize) -> Self {
515 let mut config = self.config;
516 config.max_queue_size = size;
517
518 BatchSpanProcessorBuilder { config, ..self }
519 }
520
521 pub fn with_scheduled_delay(self, delay: Duration) -> Self {
523 let mut config = self.config;
524 config.scheduled_delay = delay;
525
526 BatchSpanProcessorBuilder { config, ..self }
527 }
528
529 pub fn with_max_timeout(self, timeout: Duration) -> Self {
531 let mut config = self.config;
532 config.max_export_timeout = timeout;
533
534 BatchSpanProcessorBuilder { config, ..self }
535 }
536
537 pub fn with_max_export_batch_size(self, size: usize) -> Self {
541 let mut config = self.config;
542 if size > config.max_queue_size {
543 config.max_export_batch_size = config.max_queue_size;
544 } else {
545 config.max_export_batch_size = size;
546 }
547
548 BatchSpanProcessorBuilder { config, ..self }
549 }
550
551 pub fn build(self) -> BatchSpanProcessor<R> {
553 BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
554 }
555}
556
557#[cfg(all(test, feature = "testing", feature = "trace"))]
558mod tests {
559 use super::{
560 BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
561 OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
562 OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
563 };
564 use crate::runtime;
565 use crate::sdk::export::trace::{stdout, ExportResult, SpanData, SpanExporter};
566 use crate::sdk::trace::BatchConfig;
567 use crate::testing::trace::{
568 new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
569 };
570 use async_trait::async_trait;
571 use std::fmt::Debug;
572 use std::future::Future;
573 use std::time::Duration;
574
575 #[test]
576 fn simple_span_processor_on_end_calls_export() {
577 let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
578 let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
579 processor.on_end(new_test_export_span_data());
580 assert!(rx_export.recv().is_ok());
581 let _result = processor.shutdown();
582 }
583
584 #[test]
585 fn simple_span_processor_shutdown_calls_shutdown() {
586 let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
587 let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
588 let _result = processor.shutdown();
589 assert!(rx_shutdown.try_recv().is_ok());
590 }
591
592 #[test]
593 fn test_build_batch_span_processor_builder() {
594 std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
595 std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT, "2046");
596 std::env::set_var(OTEL_BSP_SCHEDULE_DELAY, "I am not number");
597
598 let mut builder = BatchSpanProcessor::builder(
599 stdout::Exporter::new(std::io::stdout(), true),
600 runtime::Tokio,
601 );
602 assert_eq!(builder.config.max_export_batch_size, 500);
604 assert_eq!(
605 builder.config.scheduled_delay,
606 Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
607 );
608 assert_eq!(
609 builder.config.max_queue_size,
610 OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
611 );
612 assert_eq!(
613 builder.config.max_export_timeout,
614 Duration::from_millis(2046)
615 );
616
617 std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120");
618 builder = BatchSpanProcessor::builder(
619 stdout::Exporter::new(std::io::stdout(), true),
620 runtime::Tokio,
621 );
622
623 assert_eq!(builder.config.max_export_batch_size, 120);
624 assert_eq!(builder.config.max_queue_size, 120);
625 }
626
627 #[tokio::test]
628 async fn test_batch_span_processor() {
629 let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
630 let config = BatchConfig {
631 scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
633 };
634 let mut processor =
635 BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
636 let handle = tokio::spawn(async move {
637 loop {
638 if let Some(span) = export_receiver.recv().await {
639 assert_eq!(span.span_context, new_test_export_span_data().span_context);
640 break;
641 }
642 }
643 });
644 tokio::time::sleep(Duration::from_secs(1)).await; processor.on_end(new_test_export_span_data());
646 let flush_res = processor.force_flush();
647 assert!(flush_res.is_ok());
648 let _shutdown_result = processor.shutdown();
649
650 assert!(
651 tokio::time::timeout(Duration::from_secs(5), handle)
652 .await
653 .is_ok(),
654 "timed out in 5 seconds. force_flush may not export any data when called"
655 );
656 }
657
658 struct BlockingExporter<D> {
659 delay_for: Duration,
660 delay_fn: D,
661 }
662
663 impl<D, DS> Debug for BlockingExporter<D>
664 where
665 D: Fn(Duration) -> DS + 'static + Send + Sync,
666 DS: Future<Output = ()> + Send + Sync + 'static,
667 {
668 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669 f.write_str("blocking exporter for testing")
670 }
671 }
672
673 #[async_trait]
674 impl<D, DS> SpanExporter for BlockingExporter<D>
675 where
676 D: Fn(Duration) -> DS + 'static + Send + Sync,
677 DS: Future<Output = ()> + Send + Sync + 'static,
678 {
679 async fn export(&mut self, _batch: Vec<SpanData>) -> ExportResult {
680 (self.delay_fn)(self.delay_for).await;
681 Ok(())
682 }
683 }
684
685 #[test]
686 fn test_timeout_tokio_timeout() {
687 let runtime = tokio::runtime::Builder::new_multi_thread()
691 .enable_all()
692 .build()
693 .unwrap();
694 runtime.block_on(timeout_test_tokio(true));
695 }
696
697 #[test]
698 fn test_timeout_tokio_not_timeout() {
699 let runtime = tokio::runtime::Builder::new_multi_thread()
700 .enable_all()
701 .build()
702 .unwrap();
703 runtime.block_on(timeout_test_tokio(false));
704 }
705
706 #[test]
707 #[cfg(feature = "rt-async-std")]
708 fn test_timeout_async_std_timeout() {
709 async_std::task::block_on(timeout_test_std_async(true));
710 }
711
712 #[test]
713 #[cfg(feature = "rt-async-std")]
714 fn test_timeout_async_std_not_timeout() {
715 async_std::task::block_on(timeout_test_std_async(false));
716 }
717
718 #[cfg(feature = "rt-async-std")]
721 async fn timeout_test_std_async(time_out: bool) {
722 let config = BatchConfig {
723 max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
724 scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
726 };
727 let exporter = BlockingExporter {
728 delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
729 delay_fn: async_std::task::sleep,
730 };
731 let mut processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
732 processor.on_end(new_test_export_span_data());
733 let flush_res = processor.force_flush();
734 if time_out {
735 assert!(flush_res.is_err());
736 } else {
737 assert!(flush_res.is_ok());
738 }
739 let shutdown_res = processor.shutdown();
740 assert!(shutdown_res.is_ok());
741 }
742
743 async fn timeout_test_tokio(time_out: bool) {
746 let config = BatchConfig {
747 max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
748 scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
750 };
751 let exporter = BlockingExporter {
752 delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
753 delay_fn: tokio::time::sleep,
754 };
755 let mut processor =
756 BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
757 tokio::time::sleep(Duration::from_secs(1)).await; processor.on_end(new_test_export_span_data());
759 let flush_res = processor.force_flush();
760 if time_out {
761 assert!(flush_res.is_err());
762 } else {
763 assert!(flush_res.is_ok());
764 }
765 let shutdown_res = processor.shutdown();
766 assert!(shutdown_res.is_ok());
767 }
768}