use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::trace::Span;
use crate::trace::{SpanData, SpanExporter};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::cmp::min;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, str::FromStr, time::Duration};

use std::sync::atomic::AtomicBool;
use std::thread;
use std::time::Instant;

/// Environment variable for the delay interval (in milliseconds) between two consecutive exports.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(5_000);
/// Environment variable for the maximum queue size.
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable for the maximum batch size; must not exceed the maximum queue size.
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Environment variable for the maximum allowed time (in milliseconds) for a single export call.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Default maximum allowed time for a single export call.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
/// Environment variable for the maximum number of concurrent exports.
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Default maximum number of concurrent exports.
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;

/// `SpanProcessor` is an interface that provides hooks for span start and end
/// method invocations. Span processors are typically responsible for batching
/// finished spans and forwarding them to a [`SpanExporter`].
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
    /// Called when a span is started. This method is invoked synchronously on
    /// the thread that started the span.
    fn on_start(&self, span: &mut Span, cx: &Context);
    /// Called when a span is ended (i.e., its end timestamp has been set).
    fn on_end(&self, span: SpanData);
    /// Forces any buffered spans to be exported.
    fn force_flush(&self) -> OTelSdkResult;
    /// Shuts down the processor, flushing remaining spans, within the given timeout.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
    /// Shuts down the processor with a default timeout of 5 seconds.
    fn shutdown(&self) -> OTelSdkResult {
        self.shutdown_with_timeout(Duration::from_secs(5))
    }
    /// Sets the resource for the span processor.
    fn set_resource(&mut self, _resource: &Resource) {}
}
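
// Illustrative sketch only (not part of this module): a minimal custom
// `SpanProcessor` that does nothing could look roughly like the following.
// The name `NoopSpanProcessor` is hypothetical.
//
// #[derive(Debug)]
// struct NoopSpanProcessor;
//
// impl SpanProcessor for NoopSpanProcessor {
//     fn on_start(&self, _span: &mut Span, _cx: &Context) {}
//     fn on_end(&self, _span: SpanData) {}
//     fn force_flush(&self) -> OTelSdkResult { Ok(()) }
//     fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) }
// }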

/// A [SpanProcessor] that synchronously exports each finished span as soon as
/// it ends, blocking the calling thread until the export completes. Intended
/// primarily for testing and debugging rather than production use.
#[derive(Debug)]
pub struct SimpleSpanProcessor<T: SpanExporter> {
    exporter: Mutex<T>,
}

impl<T: SpanExporter> SimpleSpanProcessor<T> {
    /// Creates a new [SimpleSpanProcessor] using the provided exporter.
    pub fn new(exporter: T) -> Self {
        Self {
            exporter: Mutex::new(exporter),
        }
    }
}
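
// Illustrative usage sketch, assuming the tracer-provider builder API exposed
// elsewhere in this crate (`SdkTracerProvider` and the stdout exporter are
// placeholders; adapt the names to the versions actually in use):
//
// let exporter = opentelemetry_stdout::SpanExporter::default();
// let provider = SdkTracerProvider::builder()
//     .with_span_processor(SimpleSpanProcessor::new(exporter))
//     .build();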

impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored: the simple processor only acts on ended spans.
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self
            .exporter
            .lock()
            .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
            .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));

        if let Err(err) = result {
            otel_debug!(
                name: "SimpleProcessor.OnEnd.Error",
                reason = format!("{:?}", err)
            );
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        // Nothing to flush: spans are exported synchronously in `on_end`.
        Ok(())
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown_with_timeout(timeout)
        } else {
            Err(OTelSdkError::InternalFailure(
                "SimpleSpanProcessor mutex poison at shutdown".into(),
            ))
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }
}

use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;

/// Messages exchanged between the public `BatchSpanProcessor` API and its
/// dedicated background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// The queue reached the batch size; export the current batch.
    ExportSpan(Arc<AtomicBool>),
    /// Flush the current buffer and acknowledge the result via the channel.
    ForceFlush(SyncSender<OTelSdkResult>),
    /// Flush the current buffer, shut down the exporter, and acknowledge.
    Shutdown(SyncSender<OTelSdkResult>),
    /// Set the resource on the exporter.
    SetResource(Arc<Resource>),
}

/// A [SpanProcessor] that buffers finished spans and exports them in batches
/// from a dedicated background thread. Spans are handed to the background
/// thread over a bounded data channel, while control messages (flush,
/// shutdown, resource updates) travel over a separate control channel.
#[derive(Debug)]
pub struct BatchSpanProcessor {
    span_sender: SyncSender<SpanData>, // Data channel to the background thread
    message_sender: SyncSender<BatchMessage>, // Control channel to the background thread
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    forceflush_timeout: Duration,
    export_span_message_sent: Arc<AtomicBool>,
    current_batch_size: Arc<AtomicUsize>,
    max_export_batch_size: usize,
    dropped_spans_count: AtomicUsize,
    max_queue_size: usize,
}

impl BatchSpanProcessor {
    /// Creates a new `BatchSpanProcessor` and spawns the dedicated background
    /// thread that owns the exporter and performs all exports.
    pub fn new<E>(mut exporter: E, config: BatchConfig) -> Self
    where
        E: SpanExporter + Send + 'static,
    {
        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Bounded control channel
        let max_queue_size = config.max_queue_size;
        let max_export_batch_size = config.max_export_batch_size;
        let current_batch_size = Arc::new(AtomicUsize::new(0));
        let current_batch_size_for_thread = current_batch_size.clone();

        let handle = thread::Builder::new()
            .name("OpenTelemetry.Traces.BatchProcessor".to_string())
            .spawn(move || {
                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
                otel_debug!(
                    name: "BatchSpanProcessor.ThreadStarted",
                    interval_in_millisecs = config.scheduled_delay.as_millis(),
                    max_export_batch_size = config.max_export_batch_size,
                    max_queue_size = config.max_queue_size,
                );
                let mut spans = Vec::with_capacity(config.max_export_batch_size);
                let mut last_export_time = Instant::now();
                let current_batch_size = current_batch_size_for_thread;
                loop {
                    let remaining_time_option = config
                        .scheduled_delay
                        .checked_sub(last_export_time.elapsed());
                    let remaining_time = match remaining_time_option {
                        Some(remaining_time) => remaining_time,
                        None => config.scheduled_delay,
                    };
                    match message_receiver.recv_timeout(remaining_time) {
                        Ok(message) => match message {
                            BatchMessage::ExportSpan(export_span_message_sent) => {
                                // Reset the flag so the main thread may send another
                                // export notification.
                                export_span_message_sent.store(false, Ordering::Relaxed);
                                otel_debug!(
                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
                                );
                                let _ = Self::get_spans_and_export(
                                    &span_receiver,
                                    &exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                            }
                            BatchMessage::ForceFlush(sender) => {
                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
                                let result = Self::get_spans_and_export(
                                    &span_receiver,
                                    &exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                                let _ = sender.send(result);
                            }
                            BatchMessage::Shutdown(sender) => {
                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
                                let result = Self::get_spans_and_export(
                                    &span_receiver,
                                    &exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                                let _ = exporter.shutdown();
                                let _ = sender.send(result);

                                otel_debug!(
                                    name: "BatchSpanProcessor.ThreadExiting",
                                    reason = "ShutdownRequested"
                                );
                                break;
                            }
                            BatchMessage::SetResource(resource) => {
                                exporter.set_resource(&resource);
                            }
                        },
                        Err(RecvTimeoutError::Timeout) => {
                            otel_debug!(
                                name: "BatchSpanProcessor.ExportingDueToTimer",
                            );

                            let _ = Self::get_spans_and_export(
                                &span_receiver,
                                &exporter,
                                &mut spans,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // All senders were dropped: the processor itself is gone.
                            otel_debug!(
                                name: "BatchSpanProcessor.ThreadExiting",
                                reason = "MessageSenderDisconnected"
                            );
                            break;
                        }
                    }
                }
                otel_debug!(
                    name: "BatchSpanProcessor.ThreadStopped"
                );
            })
            .expect("Failed to spawn thread");

        Self {
            span_sender,
            message_sender,
            handle: Mutex::new(Some(handle)),
            forceflush_timeout: Duration::from_secs(5),
            dropped_spans_count: AtomicUsize::new(0),
            max_queue_size,
            export_span_message_sent: Arc::new(AtomicBool::new(false)),
            current_batch_size,
            max_export_batch_size,
        }
    }

    /// Creates a builder for configuring a [`BatchSpanProcessor`].
    pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
    where
        E: SpanExporter + Send + 'static,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: BatchConfig::default(),
        }
    }
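
    // Illustrative usage sketch (the exporter name is a placeholder; adapt to
    // the exporter actually in use):
    //
    // let exporter = opentelemetry_stdout::SpanExporter::default();
    // let processor = BatchSpanProcessor::builder(exporter)
    //     .with_batch_config(
    //         BatchConfigBuilder::default()
    //             .with_max_queue_size(4096)
    //             .with_scheduled_delay(Duration::from_secs(1))
    //             .build(),
    //     )
    //     .build();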

    // Drains spans from the data channel and exports them in batches of at most
    // `max_export_batch_size`, until the number of spans queued at entry has
    // been exported.
    #[inline]
    fn get_spans_and_export<E>(
        spans_receiver: &Receiver<SpanData>,
        exporter: &E,
        spans: &mut Vec<SpanData>,
        last_export_time: &mut Instant,
        current_batch_size: &AtomicUsize,
        config: &BatchConfig,
    ) -> OTelSdkResult
    where
        E: SpanExporter + Send + Sync + 'static,
    {
        // Snapshot the queued-span count; only that many spans are exported here,
        // so a steady stream of new spans cannot keep this loop running forever.
        let target = current_batch_size.load(Ordering::Relaxed);
        let mut result = OTelSdkResult::Ok(());
        let mut total_exported_spans: usize = 0;

        while target > 0 && total_exported_spans < target {
            while let Ok(span) = spans_receiver.try_recv() {
                spans.push(span);
                if spans.len() == config.max_export_batch_size {
                    break;
                }
            }

            let count_of_spans = spans.len();
            total_exported_spans += count_of_spans;

            result = Self::export_batch_sync(exporter, spans, last_export_time);
            current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
        }
        result
    }

    #[allow(clippy::vec_box)]
    fn export_batch_sync<E>(
        exporter: &E,
        batch: &mut Vec<SpanData>,
        last_export_time: &mut Instant,
    ) -> OTelSdkResult
    where
        E: SpanExporter + ?Sized,
    {
        *last_export_time = Instant::now();

        if batch.is_empty() {
            return OTelSdkResult::Ok(());
        }

        // Hand the accumulated batch to the exporter and block until it completes.
        let export = exporter.export(batch.split_off(0));
        let export_result = futures_executor::block_on(export);

        match export_result {
            Ok(_) => OTelSdkResult::Ok(()),
            Err(err) => {
                otel_error!(
                    name: "BatchSpanProcessor.ExportError",
                    error = format!("{}", err)
                );
                OTelSdkResult::Err(err)
            }
        }
    }
}

impl SpanProcessor for BatchSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored: the batch processor only acts on ended spans.
    }

    fn on_end(&self, span: SpanData) {
        let result = self.span_sender.try_send(span);

        match result {
            Ok(_) => {
                // Successfully queued. If the queue has reached the batch size,
                // notify the background thread to export, sending at most one
                // `ExportSpan` message at a time.
                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
                    >= self.max_export_batch_size
                {
                    // Cheap load first to avoid the more expensive swap when a
                    // notification is already in flight.
                    if !self.export_span_message_sent.load(Ordering::Relaxed) {
                        // Only the thread that flips the flag from false to true
                        // sends the message.
                        if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
                            match self.message_sender.try_send(BatchMessage::ExportSpan(
                                self.export_span_message_sent.clone(),
                            )) {
                                Ok(_) => {
                                    // Message sent; the background thread resets the flag.
                                }
                                Err(_err) => {
                                    // The control channel is full or closed; reset the
                                    // flag so a later span can retry the notification.
                                    self.export_span_message_sent
                                        .store(false, Ordering::Relaxed);
                                }
                            }
                        }
                    }
                }
            }
            Err(std::sync::mpsc::TrySendError::Full(_)) => {
                // Queue is full; drop the span and log only on the first drop.
                if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
                    otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
                        message = "BatchSpanProcessor dropped a Span because the queue is full. No further logs will be emitted for subsequent drops until Shutdown, at which point a log with the total count of dropped spans will be emitted.");
                }
            }
            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
                otel_warn!(
                    name: "BatchSpanProcessor.OnEnd.AfterShutdown",
                    message = "Spans are being emitted even after Shutdown. This indicates incorrect lifecycle management of the TracerProvider in the application. These spans will not be exported."
                );
            }
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        match self
            .message_sender
            .try_send(BatchMessage::ForceFlush(sender))
        {
            Ok(_) => receiver
                .recv_timeout(self.forceflush_timeout)
                .map_err(|err| {
                    if err == std::sync::mpsc::RecvTimeoutError::Timeout {
                        OTelSdkError::Timeout(self.forceflush_timeout)
                    } else {
                        OTelSdkError::InternalFailure(format!("{err}"))
                    }
                })?,
            Err(std::sync::mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchSpanProcessor.ForceFlush.ControlChannelFull",
                    message = "Control message to flush the worker thread could not be sent because the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed because the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchSpanProcessor.ForceFlush.AlreadyShutdown",
                    message = "ForceFlush invoked after Shutdown. The flush will not be performed; this indicates incorrect lifecycle management in the application."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_spans > 0 {
            otel_warn!(
                name: "BatchSpanProcessor.SpansDropped",
                dropped_span_count = dropped_spans,
                max_queue_size = max_queue_size,
                message = "Spans were dropped because the queue was full. The count is the total number of spans dropped over the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the export interval."
            );
        }

        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
            Ok(_) => {
                receiver
                    .recv_timeout(timeout)
                    .map(|_| {
                        // Wait for the background thread to exit before reporting success.
                        if let Some(handle) = self.handle.lock().unwrap().take() {
                            handle.join().unwrap();
                        }
                        OTelSdkResult::Ok(())
                    })
                    .map_err(|err| match err {
                        std::sync::mpsc::RecvTimeoutError::Timeout => {
                            otel_error!(
                                name: "BatchSpanProcessor.Shutdown.Timeout",
                                message = "BatchSpanProcessor shutdown timed out."
                            );
                            OTelSdkError::Timeout(timeout)
                        }
                        _ => {
                            otel_error!(
                                name: "BatchSpanProcessor.Shutdown.Error",
                                error = format!("{}", err)
                            );
                            OTelSdkError::InternalFailure(format!("{err}"))
                        }
                    })?
            }
            Err(std::sync::mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchSpanProcessor.Shutdown.ControlChannelFull",
                    message = "Control message to shut down the worker thread could not be sent because the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed because the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchSpanProcessor.Shutdown.AlreadyShutdown",
                    message = "Shutdown is being invoked more than once. This is a no-op, but indicates a potential issue in the application's lifecycle management."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}
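
// Illustrative lifecycle sketch (provider and exporter names are placeholders):
// spans are buffered while the application runs, so the provider should be shut
// down before exit to flush anything still queued.
//
// let processor = BatchSpanProcessor::builder(exporter).build();
// let provider = SdkTracerProvider::builder()
//     .with_span_processor(processor)
//     .build();
// // ... application code creating spans ...
// provider.shutdown().expect("failed to flush remaining spans");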

/// Builder for [`BatchSpanProcessor`].
#[derive(Debug, Default)]
pub struct BatchSpanProcessorBuilder<E>
where
    E: SpanExporter + Send + 'static,
{
    exporter: E,
    config: BatchConfig,
}

impl<E> BatchSpanProcessorBuilder<E>
where
    E: SpanExporter + Send + 'static,
{
    /// Use the supplied batch configuration instead of the default.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchSpanProcessorBuilder { config, ..self }
    }

    /// Consumes the builder and produces the configured [`BatchSpanProcessor`].
    pub fn build(self) -> BatchSpanProcessor {
        BatchSpanProcessor::new(self.exporter, self.config)
    }
}

/// Batching configuration used by the batch span processor. Use
/// [`BatchConfigBuilder`] to construct it.
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of spans that can be queued before spans are dropped.
    pub(crate) max_queue_size: usize,

    /// Delay interval between two consecutive batch exports.
    pub(crate) scheduled_delay: Duration,

    /// Maximum number of spans exported in a single batch; never exceeds `max_queue_size`.
    #[allow(dead_code)]
    pub(crate) max_export_batch_size: usize,

    /// Maximum duration allowed for a single export call.
    #[allow(dead_code)]
    pub(crate) max_export_timeout: Duration,

    /// Maximum number of concurrent export operations.
    #[allow(dead_code)]
    pub(crate) max_concurrent_exports: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

/// A builder for creating [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    max_export_timeout: Duration,
    max_concurrent_exports: usize,
}

impl Default for BatchConfigBuilder {
    /// Creates a builder initialized from the `OTEL_BSP_*` defaults, then
    /// overridden by the corresponding environment variables where they are
    /// set and parse successfully.
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
        }
        .init_from_env_vars()
    }
}
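
// Illustrative example only: with the environment variables below set (and
// parseable), `BatchConfigBuilder::default()` would pick up:
//
//   OTEL_BSP_MAX_QUEUE_SIZE=4096       -> max_queue_size: 4096
//   OTEL_BSP_SCHEDULE_DELAY=1000       -> scheduled_delay: 1 s
//   OTEL_BSP_MAX_EXPORT_BATCH_SIZE=256 -> max_export_batch_size: 256
//   OTEL_BSP_EXPORT_TIMEOUT=10000      -> max_export_timeout: 10 s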

impl BatchConfigBuilder {
    /// Sets the maximum number of spans that can be queued before they are dropped.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Sets the maximum number of spans to export in a single batch.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    /// Sets the maximum number of concurrent export operations.
    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
        self.max_concurrent_exports = max_concurrent_exports;
        self
    }

    /// Sets the delay interval between two consecutive batch exports.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Sets the maximum duration allowed for a single export call.
    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Builds a [`BatchConfig`], clamping `max_export_batch_size` so it never
    /// exceeds `max_queue_size`.
    pub fn build(self) -> BatchConfig {
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            max_export_timeout: self.max_export_timeout,
            max_concurrent_exports: self.max_concurrent_exports,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
            .ok()
            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
        {
            self.max_concurrent_exports = max_concurrent_exports;
        }

        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        // The batch size must not exceed the queue size.
        if self.max_export_batch_size > self.max_queue_size {
            self.max_export_batch_size = self.max_queue_size;
        }

        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|timeout| u64::from_str(&timeout).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}
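
// Illustrative usage sketch: programmatic settings take precedence over the
// environment-derived defaults, and the batch size is clamped to the queue size.
//
// let config = BatchConfigBuilder::default()
//     .with_max_queue_size(1024)
//     .with_max_export_batch_size(4096) // clamped down to 1024 by `build()`
//     .with_scheduled_delay(Duration::from_millis(500))
//     .build();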

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{
        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::error::OTelSdkResult;
    use crate::testing::trace::new_test_export_span_data;
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
    };
    use crate::trace::InMemorySpanExporterBuilder;
    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
    use crate::trace::{SpanData, SpanExporter};
    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
    use std::fmt::Debug;
    use std::time::Duration;

    #[test]
    fn simple_span_processor_on_end_calls_export() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(exporter.clone());
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
        let _result = processor.shutdown();
    }

    #[test]
    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(exporter.clone());
        let unsampled = SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            span_kind: SpanKind::Internal,
            name: "opentelemetry".into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: Default::default(),
        };
        processor.on_end(unsampled);
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn simple_span_processor_shutdown_calls_shutdown() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(exporter.clone());
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert!(!exporter.get_finished_spans().unwrap().is_empty());
        let _result = processor.shutdown();
        // The in-memory exporter clears its buffer when it is shut down.
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT.as_millis(), 5000);
        assert_eq!(
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30000);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BSP_SCHEDULE_DELAY,
            OTEL_BSP_EXPORT_TIMEOUT,
            OTEL_BSP_MAX_QUEUE_SIZE,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.max_concurrent_exports,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
        );
        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_code_based_config_overrides_env_vars() {
        let env_vars = vec![
            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BSP_MAX_CONCURRENT_EXPORTS, Some("5")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
        ];

        temp_env::with_vars(env_vars, || {
            let config = BatchConfigBuilder::default()
                .with_max_export_batch_size(512)
                .with_max_queue_size(2048)
                .with_scheduled_delay(Duration::from_millis(1000));
            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
            let config = {
                config
                    .with_max_concurrent_exports(10)
                    .with_max_export_timeout(Duration::from_millis(2000))
            };
            let config = config.build();

            assert_eq!(config.max_export_batch_size, 512);
            assert_eq!(config.max_queue_size, 2048);
            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
            {
                assert_eq!(config.max_concurrent_exports, 10);
                assert_eq!(config.max_export_timeout, Duration::from_millis(2000));
            }
        });
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfigBuilder::default()
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_millis(10))
            .with_max_queue_size(10);
        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
        let batch = {
            batch
                .with_max_concurrent_exports(10)
                .with_max_export_timeout(Duration::from_millis(10))
        };
        let batch = batch.build();
        assert_eq!(batch.max_export_batch_size, 10);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
        assert_eq!(batch.max_queue_size, 10);
        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
        {
            assert_eq!(batch.max_concurrent_exports, 10);
            assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
        }
    }

    fn create_test_span(name: &str) -> SpanData {
        SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            span_kind: SpanKind::Internal,
            name: name.to_string().into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: Default::default(),
        }
    }

    use crate::Resource;
    use opentelemetry::{Key, KeyValue, Value};
    use std::sync::{atomic::Ordering, Arc, Mutex};

    // Mock exporter that records exported spans and the most recently set
    // resource so tests can assert on them.
    #[derive(Debug)]
    struct MockSpanExporter {
        exported_spans: Arc<Mutex<Vec<SpanData>>>,
        exported_resource: Arc<Mutex<Option<Resource>>>,
    }

    impl MockSpanExporter {
        fn new() -> Self {
            Self {
                exported_spans: Arc::new(Mutex::new(Vec::new())),
                exported_resource: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl SpanExporter for MockSpanExporter {
        async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
            let exported_spans = self.exported_spans.clone();
            exported_spans.lock().unwrap().extend(batch);
            Ok(())
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
        fn set_resource(&mut self, resource: &Resource) {
            let mut exported_resource = self.exported_resource.lock().unwrap();
            *exported_resource = Some(resource.clone());
        }
    }

    #[test]
    fn batchspanprocessor_handles_on_end() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(10)
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        let test_span = create_test_span("test_span");
        processor.on_end(test_span.clone());

        // Wait long enough for the scheduled delay to elapse and trigger an export.
        std::thread::sleep(Duration::from_secs(6));

        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 1);
        assert_eq!(exported_spans[0].name, "test_span");
    }

    #[test]
    fn batchspanprocessor_force_flush() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(10)
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        let test_span = create_test_span("force_flush_span");
        processor.on_end(test_span.clone());

        let flush_result = processor.force_flush();
        assert!(flush_result.is_ok(), "Force flush failed unexpectedly");

        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(
            exported_spans.len(),
            1,
            "Unexpected number of exported spans"
        );
        assert_eq!(exported_spans[0].name, "force_flush_span");
    }

    #[test]
    fn batchspanprocessor_shutdown() {
        let exporter = InMemorySpanExporterBuilder::new()
            .keep_records_on_shutdown()
            .build();
        let processor = BatchSpanProcessor::new(exporter.clone(), BatchConfig::default());

        let record = create_test_span("test_span");

        processor.on_end(record);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();

        // Spans emitted after shutdown are dropped rather than exported.
        processor.on_end(create_test_span("after_shutdown_span"));

        assert_eq!(1, exporter.get_finished_spans().unwrap().len());
        assert!(exporter.is_shutdown_called());
    }

    #[test]
    fn batchspanprocessor_handles_dropped_spans() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        // Queue of size 2 so the third span is dropped.
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(2)
            .with_max_export_batch_size(512)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        let span1 = create_test_span("span1");
        let span2 = create_test_span("span2");
        let span3 = create_test_span("span3");

        processor.on_end(span1.clone());
        processor.on_end(span2.clone());
        processor.on_end(span3.clone());

        // Wait long enough for the scheduled delay to elapse and trigger an export.
        std::thread::sleep(Duration::from_secs(6));

        let exported_spans = exporter_shared.lock().unwrap();

        assert_eq!(
            exported_spans.len(),
            2,
            "Unexpected number of exported spans"
        );
        assert!(exported_spans.iter().any(|s| s.name == "span1"));
        assert!(exported_spans.iter().any(|s| s.name == "span2"));

        assert!(
            !exported_spans.iter().any(|s| s.name == "span3"),
            "Span3 should have been dropped"
        );

        let dropped_count = processor.dropped_spans_count.load(Ordering::Relaxed);
        assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");

        let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed);
        assert_eq!(current_batch_size, 0, "Unexpected current batch size");
    }

1303
1304 #[test]
1305 fn validate_span_attributes_exported_correctly() {
1306 let exporter = MockSpanExporter::new();
1307 let exporter_shared = exporter.exported_spans.clone();
1308 let config = BatchConfigBuilder::default().build();
1309 let processor = BatchSpanProcessor::new(exporter, config);
1310
1311 let mut span_data = create_test_span("attribute_validation");
1313 span_data.attributes = vec![
1314 KeyValue::new("key1", "value1"),
1315 KeyValue::new("key2", "value2"),
1316 ];
1317 processor.on_end(span_data.clone());
1318
1319 let _ = processor.force_flush();
1321
1322 let exported_spans = exporter_shared.lock().unwrap();
1324 assert_eq!(exported_spans.len(), 1);
1325 let exported_span = &exported_spans[0];
1326 assert!(exported_span
1327 .attributes
1328 .contains(&KeyValue::new("key1", "value1")));
1329 assert!(exported_span
1330 .attributes
1331 .contains(&KeyValue::new("key2", "value2")));
1332 }
1333
1334 #[test]
1335 fn batchspanprocessor_sets_and_exports_with_resource() {
1336 let exporter = MockSpanExporter::new();
1337 let exporter_shared = exporter.exported_spans.clone();
1338 let resource_shared = exporter.exported_resource.clone();
1339 let config = BatchConfigBuilder::default().build();
1340 let mut processor = BatchSpanProcessor::new(exporter, config);
1341
1342 let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
1344 processor.set_resource(&resource);
1345
1346 let test_span = create_test_span("resource_test");
1348 processor.on_end(test_span.clone());
1349
1350 let _ = processor.force_flush();
1352
1353 let exported_spans = exporter_shared.lock().unwrap();
1355 assert_eq!(exported_spans.len(), 1);
1356
1357 let exported_resource = resource_shared.lock().unwrap();
1359 assert!(exported_resource.is_some());
1360 assert_eq!(
1361 exported_resource
1362 .as_ref()
1363 .unwrap()
1364 .get(&Key::new("service.name")),
1365 Some(Value::from("test_service"))
1366 );
1367 }
1368
1369 #[tokio::test(flavor = "current_thread")]
1370 async fn test_batch_processor_current_thread_runtime() {
1371 let exporter = MockSpanExporter::new();
1372 let exporter_shared = exporter.exported_spans.clone();
1373
1374 let config = BatchConfigBuilder::default()
1375 .with_max_queue_size(5)
1376 .with_max_export_batch_size(3)
1377 .build();
1378
1379 let processor = BatchSpanProcessor::new(exporter, config);
1380
1381 for _ in 0..4 {
1382 let span = new_test_export_span_data();
1383 processor.on_end(span);
1384 }
1385
1386 processor.force_flush().unwrap();
1387
1388 let exported_spans = exporter_shared.lock().unwrap();
1389 assert_eq!(exported_spans.len(), 4);
1390 }
1391
1392 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1393 async fn test_batch_processor_multi_thread_count_1_runtime() {
1394 let exporter = MockSpanExporter::new();
1395 let exporter_shared = exporter.exported_spans.clone();
1396
1397 let config = BatchConfigBuilder::default()
1398 .with_max_queue_size(5)
1399 .with_max_export_batch_size(3)
1400 .build();
1401
1402 let processor = BatchSpanProcessor::new(exporter, config);
1403
1404 for _ in 0..4 {
1405 let span = new_test_export_span_data();
1406 processor.on_end(span);
1407 }
1408
1409 processor.force_flush().unwrap();
1410
1411 let exported_spans = exporter_shared.lock().unwrap();
1412 assert_eq!(exported_spans.len(), 4);
1413 }
1414
1415 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1416 async fn test_batch_processor_multi_thread() {
1417 let exporter = MockSpanExporter::new();
1418 let exporter_shared = exporter.exported_spans.clone();
1419
1420 let config = BatchConfigBuilder::default()
1421 .with_max_queue_size(20)
1422 .with_max_export_batch_size(5)
1423 .build();
1424
1425 let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1427
1428 let mut handles = vec![];
1429 for _ in 0..10 {
1430 let processor_clone = Arc::clone(&processor);
1431 let handle = tokio::spawn(async move {
1432 let span = new_test_export_span_data();
1433 processor_clone.on_end(span);
1434 });
1435 handles.push(handle);
1436 }
1437
1438 for handle in handles {
1439 handle.await.unwrap();
1440 }
1441
1442 processor.force_flush().unwrap();
1443
1444 let exported_spans = exporter_shared.lock().unwrap();
1446 assert_eq!(exported_spans.len(), 10);
1447 }
1448}