opentelemetry_sdk/trace/
span_processor.rs

1//! # OpenTelemetry Span Processor Interface
2//!
3//! Span processor is an interface which allows hooks for span start and end method
4//! invocations. The span processors are invoked only when
5//! [`is_recording`] is true.
6//!
7//! Built-in span processors are responsible for batching and conversion of spans to
8//! exportable representation and passing batches to exporters.
9//!
10//! Span processors can be registered directly on SDK [`TracerProvider`] and they are
11//! invoked in the same order as they were registered.
12//!
13//! All `Tracer` instances created by a `TracerProvider` share the same span processors.
14//! Changes to this collection reflect in all `Tracer` instances.
15//!
16//! The following diagram shows `SpanProcessor`'s relationship to other components
17//! in the SDK:
18//!
19//! ```ascii
20//!   +-----+--------------+   +-----------------------+   +-------------------+
21//!   |     |              |   |                       |   |                   |
22//!   |     |              |   | (Batch)SpanProcessor  |   |    SpanExporter   |
23//!   |     |              +---> (Simple)SpanProcessor +--->  (OTLPExporter)   |
24//!   |     |              |   |                       |   |                   |
25//!   | SDK | Tracer.span()|   +-----------------------+   +-------------------+
26//!   |     | Span.end()   |
27//!   |     |              |
28//!   |     |              |
29//!   |     |              |
30//!   |     |              |
31//!   +-----+--------------+
32//! ```
33//!
34//! [`is_recording`]: opentelemetry::trace::Span::is_recording()
35//! [`TracerProvider`]: opentelemetry::trace::TracerProvider
36
37use crate::error::{OTelSdkError, OTelSdkResult};
38use crate::resource::Resource;
39use crate::trace::Span;
40use crate::trace::{SpanData, SpanExporter};
41use opentelemetry::Context;
42use opentelemetry::{otel_debug, otel_error, otel_warn};
43use std::cmp::min;
44use std::sync::atomic::{AtomicUsize, Ordering};
45use std::sync::{Arc, Mutex};
46use std::{env, str::FromStr, time::Duration};
47
48use std::sync::atomic::AtomicBool;
49use std::thread;
50use std::time::Instant;
51
52/// Delay interval between two consecutive exports.
53pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
54/// Default delay interval between two consecutive exports.
55pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(5_000);
56/// Maximum queue size
57pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
58/// Default maximum queue size
59pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
60/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE
61pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
62/// Default maximum batch size
63pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
64/// Maximum allowed time to export data.
65pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
66/// Default maximum allowed time to export data.
67pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
68/// Environment variable to configure max concurrent exports for batch span
69/// processor.
70pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
71/// Default max concurrent exports for BSP
72pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
73
74/// `SpanProcessor` is an interface which allows hooks for span start and end
75/// method invocations. The span processors are invoked only when is_recording
76/// is true.
77pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
78    /// `on_start` is called when a `Span` is started.  This method is called
79    /// synchronously on the thread that started the span, therefore it should
80    /// not block or throw exceptions.
81    fn on_start(&self, span: &mut Span, cx: &Context);
82    /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
83    /// already set). This method is called synchronously within the `Span::end`
84    /// API, therefore it should not block or throw an exception.
85    /// TODO - This method should take reference to `SpanData`
86    fn on_end(&self, span: SpanData);
87    /// Force the spans lying in the cache to be exported.
88    fn force_flush(&self) -> OTelSdkResult;
89    /// Shuts down the processor. Called when SDK is shut down. This is an
90    /// opportunity for processors to do any cleanup required.
91    ///
92    /// Implementation should make sure shutdown can be called multiple times.
93    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
94    /// shutdown the processor with a default timeout.
95    fn shutdown(&self) -> OTelSdkResult {
96        self.shutdown_with_timeout(Duration::from_secs(5))
97    }
98    /// Set the resource for the span processor.
99    fn set_resource(&mut self, _resource: &Resource) {}
100}
101
102/// A [SpanProcessor] that passes finished spans to the configured
103/// `SpanExporter`, as soon as they are finished, without any batching. This is
104/// typically useful for debugging and testing. For scenarios requiring higher
105/// performance/throughput, consider using [BatchSpanProcessor].
106/// Spans are exported synchronously
107/// in the same thread that emits the log record.
108/// When using this processor with the OTLP Exporter, the following exporter
109/// features are supported:
110/// - `grpc-tonic`: This requires TracerProvider to be created within a tokio
111///   runtime. Spans can be emitted from any thread, including tokio runtime
112///   threads.
113/// - `reqwest-blocking-client`: TracerProvider may be created anywhere, but
114///   spans must be emitted from a non-tokio runtime thread.
115/// - `reqwest-client`: TracerProvider may be created anywhere, but spans must be
116///   emitted from a tokio runtime thread.
117#[derive(Debug)]
118pub struct SimpleSpanProcessor<T: SpanExporter> {
119    exporter: Mutex<T>,
120}
121
122impl<T: SpanExporter> SimpleSpanProcessor<T> {
123    /// Create a new [SimpleSpanProcessor] using the provided exporter.
124    pub fn new(exporter: T) -> Self {
125        Self {
126            exporter: Mutex::new(exporter),
127        }
128    }
129}
130
131impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
132    fn on_start(&self, _span: &mut Span, _cx: &Context) {
133        // Ignored
134    }
135
136    fn on_end(&self, span: SpanData) {
137        if !span.span_context.is_sampled() {
138            return;
139        }
140
141        let result = self
142            .exporter
143            .lock()
144            .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
145            .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));
146
147        if let Err(err) = result {
148            // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug`
149            otel_debug!(
150                name: "SimpleProcessor.OnEnd.Error",
151                reason = format!("{:?}", err)
152            );
153        }
154    }
155
156    fn force_flush(&self) -> OTelSdkResult {
157        // Nothing to flush for simple span processor.
158        Ok(())
159    }
160
161    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
162        if let Ok(mut exporter) = self.exporter.lock() {
163            exporter.shutdown_with_timeout(timeout)
164        } else {
165            Err(OTelSdkError::InternalFailure(
166                "SimpleSpanProcessor mutex poison at shutdown".into(),
167            ))
168        }
169    }
170
171    fn set_resource(&mut self, resource: &Resource) {
172        if let Ok(mut exporter) = self.exporter.lock() {
173            exporter.set_resource(resource);
174        }
175    }
176}
177
178/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
179/// in batches to the configured `SpanExporter`. This processor is ideal for
180/// high-throughput environments, as it minimizes the overhead of exporting spans
181/// individually. It uses a **dedicated background thread** to manage and export spans
182/// asynchronously, ensuring that the application's main execution flow is not blocked.
183///
184/// When using this processor with the OTLP Exporter, the following exporter
185/// features are supported:
186/// - `grpc-tonic`: This requires `TracerProvider` to be created within a tokio
187///   runtime.
188/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
189///
190/// In other words, other clients like `reqwest` and `hyper` are not supported.
191/// /// # Example
192///
193/// This example demonstrates how to configure and use the `BatchSpanProcessor`
194/// with a custom configuration. Note that a dedicated thread is used internally
195/// to manage the export process.
196///
197/// ```rust
198/// use opentelemetry::global;
199/// use opentelemetry_sdk::{
200///     trace::{BatchSpanProcessor, BatchConfigBuilder, SdkTracerProvider},
201///     runtime,
202///     testing::trace::NoopSpanExporter,
203/// };
204/// use opentelemetry::trace::Tracer as _;
205/// use opentelemetry::trace::Span;
206/// use std::time::Duration;
207///
208/// fn main() {
209///     // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration).
210///     let exporter = NoopSpanExporter::new();
211///
212///     // Step 2: Configure the BatchSpanProcessor.
213///     let batch_processor = BatchSpanProcessor::builder(exporter)
214///         .with_batch_config(
215///             BatchConfigBuilder::default()
216///                 .with_max_queue_size(1024) // Buffer up to 1024 spans.
217///                 .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
218///                 .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
219///                 .build(),
220///         )
221///         .build();
222///
223///     // Step 3: Set up a TracerProvider with the configured processor.
224///     let provider = SdkTracerProvider::builder()
225///         .with_span_processor(batch_processor)
226///         .build();
227///     global::set_tracer_provider(provider.clone());
228///
229///     // Step 4: Create spans and record operations.
230///     let tracer = global::tracer("example-tracer");
231///     let mut span = tracer.start("example-span");
232///     span.end(); // Mark the span as completed.
233///
234///     // Step 5: Ensure all spans are flushed before exiting.
235///     provider.shutdown();
236/// }
237/// ```
238use std::sync::mpsc::sync_channel;
239use std::sync::mpsc::Receiver;
240use std::sync::mpsc::RecvTimeoutError;
241use std::sync::mpsc::SyncSender;
242
243/// Messages exchanged between the main thread and the background thread.
244#[allow(clippy::large_enum_variant)]
245#[derive(Debug)]
246enum BatchMessage {
247    //ExportSpan(SpanData),
248    ExportSpan(Arc<AtomicBool>),
249    ForceFlush(SyncSender<OTelSdkResult>),
250    Shutdown(SyncSender<OTelSdkResult>),
251    SetResource(Arc<Resource>),
252}
253
254/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
255/// in batches to the configured `SpanExporter`. This processor is ideal for
256/// high-throughput environments, as it minimizes the overhead of exporting spans
257/// individually. It uses a **dedicated background thread** to manage and export spans
258/// asynchronously, ensuring that the application's main execution flow is not blocked.
259///
260/// This processor supports the following configurations:
261/// - **Queue size**: Maximum number of spans that can be buffered.
262/// - **Batch size**: Maximum number of spans to include in a single export.
263/// - **Scheduled delay**: Frequency at which the batch is exported.
264///
265/// When using this processor with the OTLP Exporter, the following exporter
266/// features are supported:
267/// - `grpc-tonic`: Requires `TracerProvider` to be created within a tokio runtime.
268/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
269///
270/// In other words, other clients like `reqwest` and `hyper` are not supported.
271///
272/// `BatchSpanProcessor` buffers spans in memory and exports them in batches. An
273/// export is triggered when `max_export_batch_size` is reached or every
274/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
275/// the `force_flush` method. Shutdown also triggers an export of all buffered
276/// spans and is recommended to be called before the application exits to ensure
277/// all buffered spans are exported.
278///
279/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
280/// is a blocking call ,should not be called from your main thread. This can
281/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
282/// tokio's `spawn_blocking`.
283///
284#[derive(Debug)]
285pub struct BatchSpanProcessor {
286    span_sender: SyncSender<SpanData>, // Data channel to store spans
287    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
288    handle: Mutex<Option<thread::JoinHandle<()>>>,
289    forceflush_timeout: Duration,
290    export_span_message_sent: Arc<AtomicBool>,
291    current_batch_size: Arc<AtomicUsize>,
292    max_export_batch_size: usize,
293    dropped_spans_count: AtomicUsize,
294    max_queue_size: usize,
295}
296
297impl BatchSpanProcessor {
298    /// Creates a new instance of `BatchSpanProcessor`.
299    pub fn new<E>(
300        mut exporter: E,
301        config: BatchConfig,
302        //max_queue_size: usize,
303        //scheduled_delay: Duration,
304        //shutdown_timeout: Duration,
305    ) -> Self
306    where
307        E: SpanExporter + Send + 'static,
308    {
309        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
310        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
311        let max_queue_size = config.max_queue_size;
312        let max_export_batch_size = config.max_export_batch_size;
313        let current_batch_size = Arc::new(AtomicUsize::new(0));
314        let current_batch_size_for_thread = current_batch_size.clone();
315
316        let handle = thread::Builder::new()
317            .name("OpenTelemetry.Traces.BatchProcessor".to_string())
318            .spawn(move || {
319                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
320                otel_debug!(
321                    name: "BatchSpanProcessor.ThreadStarted",
322                    interval_in_millisecs = config.scheduled_delay.as_millis(),
323                    max_export_batch_size = config.max_export_batch_size,
324                    max_queue_size = config.max_queue_size,
325                );
326                let mut spans = Vec::with_capacity(config.max_export_batch_size);
327                let mut last_export_time = Instant::now();
328                let current_batch_size = current_batch_size_for_thread;
329                loop {
330                    let remaining_time_option = config
331                        .scheduled_delay
332                        .checked_sub(last_export_time.elapsed());
333                    let remaining_time = match remaining_time_option {
334                        Some(remaining_time) => remaining_time,
335                        None => config.scheduled_delay,
336                    };
337                    match message_receiver.recv_timeout(remaining_time) {
338                        Ok(message) => match message {
339                            BatchMessage::ExportSpan(export_span_message_sent) => {
340                                // Reset the export span message sent flag now it has has been processed.
341                                export_span_message_sent.store(false, Ordering::Relaxed);
342                                otel_debug!(
343                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
344                                );
345                                let _ = Self::get_spans_and_export(
346                                    &span_receiver,
347                                    &exporter,
348                                    &mut spans,
349                                    &mut last_export_time,
350                                    &current_batch_size,
351                                    &config,
352                                );
353                            }
354                            BatchMessage::ForceFlush(sender) => {
355                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
356                                let result = Self::get_spans_and_export(
357                                    &span_receiver,
358                                    &exporter,
359                                    &mut spans,
360                                    &mut last_export_time,
361                                    &current_batch_size,
362                                    &config,
363                                );
364                                let _ = sender.send(result);
365                            }
366                            BatchMessage::Shutdown(sender) => {
367                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
368                                let result = Self::get_spans_and_export(
369                                    &span_receiver,
370                                    &exporter,
371                                    &mut spans,
372                                    &mut last_export_time,
373                                    &current_batch_size,
374                                    &config,
375                                );
376                                let _ = exporter.shutdown();
377                                let _ = sender.send(result);
378
379                                otel_debug!(
380                                    name: "BatchSpanProcessor.ThreadExiting",
381                                    reason = "ShutdownRequested"
382                                );
383                                //
384                                // break out the loop and return from the current background thread.
385                                //
386                                break;
387                            }
388                            BatchMessage::SetResource(resource) => {
389                                exporter.set_resource(&resource);
390                            }
391                        },
392                        Err(RecvTimeoutError::Timeout) => {
393                            otel_debug!(
394                                name: "BatchSpanProcessor.ExportingDueToTimer",
395                            );
396
397                            let _ = Self::get_spans_and_export(
398                                &span_receiver,
399                                &exporter,
400                                &mut spans,
401                                &mut last_export_time,
402                                &current_batch_size,
403                                &config,
404                            );
405                        }
406                        Err(RecvTimeoutError::Disconnected) => {
407                            // Channel disconnected, only thing to do is break
408                            // out (i.e exit the thread)
409                            otel_debug!(
410                                name: "BatchSpanProcessor.ThreadExiting",
411                                reason = "MessageSenderDisconnected"
412                            );
413                            break;
414                        }
415                    }
416                }
417                otel_debug!(
418                    name: "BatchSpanProcessor.ThreadStopped"
419                );
420            })
421            .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
422
423        Self {
424            span_sender,
425            message_sender,
426            handle: Mutex::new(Some(handle)),
427            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
428            dropped_spans_count: AtomicUsize::new(0),
429            max_queue_size,
430            export_span_message_sent: Arc::new(AtomicBool::new(false)),
431            current_batch_size,
432            max_export_batch_size,
433        }
434    }
435
436    /// builder
437    pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
438    where
439        E: SpanExporter + Send + 'static,
440    {
441        BatchSpanProcessorBuilder {
442            exporter,
443            config: BatchConfig::default(),
444        }
445    }
446
447    // This method gets up to `max_export_batch_size` amount of spans from the channel and exports them.
448    // It returns the result of the export operation.
449    // It expects the spans vec to be empty when it's called.
450    #[inline]
451    fn get_spans_and_export<E>(
452        spans_receiver: &Receiver<SpanData>,
453        exporter: &E,
454        spans: &mut Vec<SpanData>,
455        last_export_time: &mut Instant,
456        current_batch_size: &AtomicUsize,
457        config: &BatchConfig,
458    ) -> OTelSdkResult
459    where
460        E: SpanExporter + Send + Sync + 'static,
461    {
462        let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting spans.
463        let mut result = OTelSdkResult::Ok(());
464        let mut total_exported_spans: usize = 0;
465
466        while target > 0 && total_exported_spans < target {
467            // Get up to `max_export_batch_size` amount of spans from the channel and push them to the spans vec
468            while let Ok(span) = spans_receiver.try_recv() {
469                spans.push(span);
470                if spans.len() == config.max_export_batch_size {
471                    break;
472                }
473            }
474
475            let count_of_spans = spans.len(); // Count of spans that will be exported
476            total_exported_spans += count_of_spans;
477
478            result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting
479
480            current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
481        }
482        result
483    }
484
485    #[allow(clippy::vec_box)]
486    fn export_batch_sync<E>(
487        exporter: &E,
488        batch: &mut Vec<SpanData>,
489        last_export_time: &mut Instant,
490    ) -> OTelSdkResult
491    where
492        E: SpanExporter + ?Sized,
493    {
494        *last_export_time = Instant::now();
495
496        if batch.is_empty() {
497            return OTelSdkResult::Ok(());
498        }
499
500        // Splitting off batch clears the existing batch capacity, and is ready
501        // for re-use in the next export. The newly returned vec! from split_off
502        // is passed to the exporter.
503        // TODO: Compared to Logs, this requires new allocation for vec for
504        // every export. See if this can be optimized by
505        // *not* requiring ownership in the exporter.
506        let export = exporter.export(batch.split_off(0));
507        let export_result = futures_executor::block_on(export);
508
509        match export_result {
510            Ok(_) => OTelSdkResult::Ok(()),
511            Err(err) => {
512                otel_error!(
513                    name: "BatchSpanProcessor.ExportError",
514                    error = format!("{}", err)
515                );
516                OTelSdkResult::Err(err)
517            }
518        }
519    }
520}
521
522impl SpanProcessor for BatchSpanProcessor {
523    /// Handles span start.
524    fn on_start(&self, _span: &mut Span, _cx: &Context) {
525        // Ignored
526    }
527
528    /// Handles span end.
529    fn on_end(&self, span: SpanData) {
530        let result = self.span_sender.try_send(span);
531
532        // match for result and handle each separately
533        match result {
534            Ok(_) => {
535                // Successfully sent the span to the data channel.
536                // Increment the current batch size and check if it has reached
537                // the max export batch size.
538                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
539                    >= self.max_export_batch_size
540                {
541                    // Check if the a control message for exporting spans is
542                    // already sent to the worker thread. If not, send a control
543                    // message to export spans. `export_span_message_sent` is set
544                    // to false ONLY when the worker thread has processed the
545                    // control message.
546
547                    if !self.export_span_message_sent.load(Ordering::Relaxed) {
548                        // This is a cost-efficient check as atomic load
549                        // operations do not require exclusive access to cache
550                        // line. Perform atomic swap to
551                        // `export_span_message_sent` ONLY when the atomic load
552                        // operation above returns false. Atomic
553                        // swap/compare_exchange operations require exclusive
554                        // access to cache line on most processor architectures.
555                        // We could have used compare_exchange as well here, but
556                        // it's more verbose than swap.
557                        if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
558                            match self.message_sender.try_send(BatchMessage::ExportSpan(
559                                self.export_span_message_sent.clone(),
560                            )) {
561                                Ok(_) => {
562                                    // Control message sent successfully.
563                                }
564                                Err(_err) => {
565                                    // TODO: Log error If the control message
566                                    // could not be sent, reset the
567                                    // `export_span_message_sent` flag.
568                                    self.export_span_message_sent
569                                        .store(false, Ordering::Relaxed);
570                                }
571                            }
572                        }
573                    }
574                }
575            }
576            Err(std::sync::mpsc::TrySendError::Full(_)) => {
577                // Increment dropped spans count. The first time we have to drop
578                // a span, emit a warning.
579                if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
580                    otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
581                        message = "BatchSpanProcessor dropped a Span due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
582                }
583            }
584            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
585                // Given background thread is the only receiver, and it's
586                // disconnected, it indicates the thread is shutdown
587                otel_warn!(
588                    name: "BatchSpanProcessor.OnEnd.AfterShutdown",
589                    message = "Spans are being emitted even after Shutdown. This indicates incorrect lifecycle management of TracerProvider in application. Spans will not be exported."
590                );
591            }
592        }
593    }
594
595    /// Flushes all pending spans.
596    fn force_flush(&self) -> OTelSdkResult {
597        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
598        match self
599            .message_sender
600            .try_send(BatchMessage::ForceFlush(sender))
601        {
602            Ok(_) => receiver
603                .recv_timeout(self.forceflush_timeout)
604                .map_err(|err| {
605                    if err == std::sync::mpsc::RecvTimeoutError::Timeout {
606                        OTelSdkError::Timeout(self.forceflush_timeout)
607                    } else {
608                        OTelSdkError::InternalFailure(format!("{err}"))
609                    }
610                })?,
611            Err(std::sync::mpsc::TrySendError::Full(_)) => {
612                // If the control message could not be sent, emit a warning.
613                otel_debug!(
614                    name: "BatchSpanProcessor.ForceFlush.ControlChannelFull",
615                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call."
616                );
617                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call.".into()))
618            }
619            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
620                // Given background thread is the only receiver, and it's
621                // disconnected, it indicates the thread is shutdown
622                otel_debug!(
623                    name: "BatchSpanProcessor.ForceFlush.AlreadyShutdown",
624                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
625                );
626
627                Err(OTelSdkError::AlreadyShutdown)
628            }
629        }
630    }
631
632    /// Shuts down the processor.
633    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
634        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
635        let max_queue_size = self.max_queue_size;
636        if dropped_spans > 0 {
637            otel_warn!(
638                name: "BatchSpanProcessor.SpansDropped",
639                dropped_span_count = dropped_spans,
640                max_queue_size = max_queue_size,
641                message = "Spans were dropped due to a queue being full. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
642            );
643        }
644
645        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
646        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
647            Ok(_) => {
648                receiver
649                    .recv_timeout(timeout)
650                    .map(|_| {
651                        // join the background thread after receiving back the
652                        // shutdown signal
653                        if let Some(handle) = self.handle.lock().unwrap().take() {
654                            handle.join().unwrap();
655                        }
656                        OTelSdkResult::Ok(())
657                    })
658                    .map_err(|err| match err {
659                        std::sync::mpsc::RecvTimeoutError::Timeout => {
660                            otel_error!(
661                                name: "BatchSpanProcessor.Shutdown.Timeout",
662                                message = "BatchSpanProcessor shutdown timing out."
663                            );
664                            OTelSdkError::Timeout(timeout)
665                        }
666                        _ => {
667                            otel_error!(
668                                name: "BatchSpanProcessor.Shutdown.Error",
669                                error = format!("{}", err)
670                            );
671                            OTelSdkError::InternalFailure(format!("{err}"))
672                        }
673                    })?
674            }
675            Err(std::sync::mpsc::TrySendError::Full(_)) => {
676                // If the control message could not be sent, emit a warning.
677                otel_debug!(
678                    name: "BatchSpanProcessor.Shutdown.ControlChannelFull",
679                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call."
680                );
681                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call.".into()))
682            }
683            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
684                // Given background thread is the only receiver, and it's
685                // disconnected, it indicates the thread is shutdown
686                otel_debug!(
687                    name: "BatchSpanProcessor.Shutdown.AlreadyShutdown",
688                    message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
689                );
690
691                Err(OTelSdkError::AlreadyShutdown)
692            }
693        }
694    }
695
696    /// Set the resource for the processor.
697    fn set_resource(&mut self, resource: &Resource) {
698        let resource = Arc::new(resource.clone());
699        let _ = self
700            .message_sender
701            .try_send(BatchMessage::SetResource(resource));
702    }
703}
704
705/// Builder for `BatchSpanProcessorDedicatedThread`.
706#[derive(Debug, Default)]
707pub struct BatchSpanProcessorBuilder<E>
708where
709    E: SpanExporter + Send + 'static,
710{
711    exporter: E,
712    config: BatchConfig,
713}
714
715impl<E> BatchSpanProcessorBuilder<E>
716where
717    E: SpanExporter + Send + 'static,
718{
719    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
720    pub fn with_batch_config(self, config: BatchConfig) -> Self {
721        BatchSpanProcessorBuilder { config, ..self }
722    }
723
724    /// Build a new instance of `BatchSpanProcessor`.
725    pub fn build(self) -> BatchSpanProcessor {
726        BatchSpanProcessor::new(self.exporter, self.config)
727    }
728}
729
730/// Batch span processor configuration.
731/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
732#[derive(Debug)]
733pub struct BatchConfig {
734    /// The maximum queue size to buffer spans for delayed processing. If the
735    /// queue gets full it drops the spans. The default value of is 2048.
736    pub(crate) max_queue_size: usize,
737
738    /// The delay interval in milliseconds between two consecutive processing
739    /// of batches. The default value is 5 seconds.
740    pub(crate) scheduled_delay: Duration,
741
742    #[allow(dead_code)]
743    /// The maximum number of spans to process in a single batch. If there are
744    /// more than one batch worth of spans then it processes multiple batches
745    /// of spans one batch after the other without any delay. The default value
746    /// is 512.
747    pub(crate) max_export_batch_size: usize,
748
749    #[allow(dead_code)]
750    /// The maximum duration to export a batch of data.
751    pub(crate) max_export_timeout: Duration,
752
753    #[allow(dead_code)]
754    /// Maximum number of concurrent exports
755    ///
756    /// Limits the number of spawned tasks for exports and thus memory consumed
757    /// by an exporter. A value of 1 will cause exports to be performed
758    /// synchronously on the BatchSpanProcessor task.
759    pub(crate) max_concurrent_exports: usize,
760}
761
762impl Default for BatchConfig {
763    fn default() -> Self {
764        BatchConfigBuilder::default().build()
765    }
766}
767
768/// A builder for creating [`BatchConfig`] instances.
769#[derive(Debug)]
770pub struct BatchConfigBuilder {
771    max_queue_size: usize,
772    scheduled_delay: Duration,
773    max_export_batch_size: usize,
774    max_export_timeout: Duration,
775    max_concurrent_exports: usize,
776}
777
778impl Default for BatchConfigBuilder {
779    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
780    /// The values are overriden by environment variables if set.
781    /// The supported environment variables are:
782    /// * `OTEL_BSP_MAX_QUEUE_SIZE`
783    /// * `OTEL_BSP_SCHEDULE_DELAY`
784    /// * `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
785    /// * `OTEL_BSP_EXPORT_TIMEOUT`
786    /// * `OTEL_BSP_MAX_CONCURRENT_EXPORTS`
787    ///
788    /// Note: Programmatic configuration overrides any value set via the environment variable.
789    fn default() -> Self {
790        BatchConfigBuilder {
791            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
792            scheduled_delay: OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
793            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
794            max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
795            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
796        }
797        .init_from_env_vars()
798    }
799}
800
801impl BatchConfigBuilder {
802    /// Set max_queue_size for [`BatchConfigBuilder`].
803    /// It's the maximum queue size to buffer spans for delayed processing.
804    /// If the queue gets full it will drops the spans.
805    /// The default value is 2048.
806    ///
807    /// Corresponding environment variable: `OTEL_BSP_MAX_QUEUE_SIZE`.
808    ///
809    /// Note: Programmatically setting this will override any value set via the environment variable.
810    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
811        self.max_queue_size = max_queue_size;
812        self
813    }
814
815    /// Set max_export_batch_size for [`BatchConfigBuilder`].
816    /// It's the maximum number of spans to process in a single batch. If there are
817    /// more than one batch worth of spans then it processes multiple batches
818    /// of spans one batch after the other without any delay. The default value
819    /// is 512.
820    ///
821    /// Corresponding environment variable: `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`.
822    ///
823    /// Note: Programmatically setting this will override any value set via the environment variable.
824    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
825        self.max_export_batch_size = max_export_batch_size;
826        self
827    }
828
829    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
830    /// Set max_concurrent_exports for [`BatchConfigBuilder`].
831    /// It's the maximum number of concurrent exports.
832    /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
833    /// The default value is 1.
834    /// If the max_concurrent_exports value is default value, it will cause exports to be performed
835    /// synchronously on the BatchSpanProcessor task.
836    /// The default value is 1.
837    ///
838    /// Corresponding environment variable: `OTEL_BSP_MAX_CONCURRENT_EXPORTS`.
839    ///
840    /// Note: Programmatically setting this will override any value set via the environment variable.
841    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
842        self.max_concurrent_exports = max_concurrent_exports;
843        self
844    }
845
846    /// Set scheduled_delay_duration for [`BatchConfigBuilder`].
847    /// It's the delay interval in milliseconds between two consecutive processing of batches.
848    /// The default value is 5000 milliseconds.
849    ///
850    /// Corresponding environment variable: `OTEL_BSP_SCHEDULE_DELAY`.
851    ///
852    /// Note: Programmatically setting this will override any value set via the environment variable.
853    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
854        self.scheduled_delay = scheduled_delay;
855        self
856    }
857
858    /// Set max_export_timeout for [`BatchConfigBuilder`].
859    /// It's the maximum duration to export a batch of data.
860    /// The The default value is 30000 milliseconds.
861    ///
862    /// Corresponding environment variable: `OTEL_BSP_EXPORT_TIMEOUT`.
863    ///
864    /// Note: Programmatically setting this will override any value set via the environment variable.
865    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
866    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
867        self.max_export_timeout = max_export_timeout;
868        self
869    }
870
871    /// Builds a `BatchConfig` enforcing the following invariants:
872    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
873    pub fn build(self) -> BatchConfig {
874        // max export batch size must be less or equal to max queue size.
875        // we set max export batch size to max queue size if it's larger than max queue size.
876        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
877
878        BatchConfig {
879            max_queue_size: self.max_queue_size,
880            scheduled_delay: self.scheduled_delay,
881            max_export_timeout: self.max_export_timeout,
882            max_concurrent_exports: self.max_concurrent_exports,
883            max_export_batch_size,
884        }
885    }
886
887    fn init_from_env_vars(mut self) -> Self {
888        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
889            .ok()
890            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
891        {
892            self.max_concurrent_exports = max_concurrent_exports;
893        }
894
895        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
896            .ok()
897            .and_then(|queue_size| usize::from_str(&queue_size).ok())
898        {
899            self.max_queue_size = max_queue_size;
900        }
901
902        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
903            .ok()
904            .and_then(|delay| u64::from_str(&delay).ok())
905        {
906            self.scheduled_delay = Duration::from_millis(scheduled_delay);
907        }
908
909        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
910            .ok()
911            .and_then(|batch_size| usize::from_str(&batch_size).ok())
912        {
913            self.max_export_batch_size = max_export_batch_size;
914        }
915
916        // max export batch size must be less or equal to max queue size.
917        // we set max export batch size to max queue size if it's larger than max queue size.
918        if self.max_export_batch_size > self.max_queue_size {
919            self.max_export_batch_size = self.max_queue_size;
920        }
921
922        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
923            .ok()
924            .and_then(|timeout| u64::from_str(&timeout).ok())
925        {
926            self.max_export_timeout = Duration::from_millis(max_export_timeout);
927        }
928
929        self
930    }
931}
932
933#[cfg(all(test, feature = "testing", feature = "trace"))]
934mod tests {
935    // cargo test trace::span_processor::tests:: --features=testing
936    use super::{
937        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
938        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
939        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
940    };
941    use crate::error::OTelSdkResult;
942    use crate::testing::trace::new_test_export_span_data;
943    use crate::trace::span_processor::{
944        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
945        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
946    };
947    use crate::trace::InMemorySpanExporterBuilder;
948    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
949    use crate::trace::{SpanData, SpanExporter};
950    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
951    use std::fmt::Debug;
952    use std::time::Duration;
953
954    #[test]
955    fn simple_span_processor_on_end_calls_export() {
956        let exporter = InMemorySpanExporterBuilder::new().build();
957        let processor = SimpleSpanProcessor::new(exporter.clone());
958        let span_data = new_test_export_span_data();
959        processor.on_end(span_data.clone());
960        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
961        let _result = processor.shutdown();
962    }
963
964    #[test]
965    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
966        let exporter = InMemorySpanExporterBuilder::new().build();
967        let processor = SimpleSpanProcessor::new(exporter.clone());
968        let unsampled = SpanData {
969            span_context: SpanContext::empty_context(),
970            parent_span_id: SpanId::INVALID,
971            parent_span_is_remote: false,
972            span_kind: SpanKind::Internal,
973            name: "opentelemetry".into(),
974            start_time: opentelemetry::time::now(),
975            end_time: opentelemetry::time::now(),
976            attributes: Vec::new(),
977            dropped_attributes_count: 0,
978            events: SpanEvents::default(),
979            links: SpanLinks::default(),
980            status: Status::Unset,
981            instrumentation_scope: Default::default(),
982        };
983        processor.on_end(unsampled);
984        assert!(exporter.get_finished_spans().unwrap().is_empty());
985    }
986
987    #[test]
988    fn simple_span_processor_shutdown_calls_shutdown() {
989        let exporter = InMemorySpanExporterBuilder::new().build();
990        let processor = SimpleSpanProcessor::new(exporter.clone());
991        let span_data = new_test_export_span_data();
992        processor.on_end(span_data.clone());
993        assert!(!exporter.get_finished_spans().unwrap().is_empty());
994        let _result = processor.shutdown();
995        // Assume shutdown is called by ensuring spans are empty in the exporter
996        assert!(exporter.get_finished_spans().unwrap().is_empty());
997    }
998
999    #[test]
1000    fn test_default_const_values() {
1001        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
1002        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
1003        assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
1004        assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT.as_millis(), 5000);
1005        assert_eq!(
1006            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
1007            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
1008        );
1009        assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
1010        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
1011        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30000);
1012    }
1013
1014    #[test]
1015    fn test_default_batch_config_adheres_to_specification() {
1016        let env_vars = vec![
1017            OTEL_BSP_SCHEDULE_DELAY,
1018            OTEL_BSP_EXPORT_TIMEOUT,
1019            OTEL_BSP_MAX_QUEUE_SIZE,
1020            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
1021            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
1022        ];
1023
1024        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
1025
1026        assert_eq!(
1027            config.max_concurrent_exports,
1028            OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
1029        );
1030        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
1031        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
1032        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
1033        assert_eq!(
1034            config.max_export_batch_size,
1035            OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
1036        );
1037    }
1038
1039    #[test]
1040    fn test_code_based_config_overrides_env_vars() {
1041        let env_vars = vec![
1042            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
1043            (OTEL_BSP_MAX_CONCURRENT_EXPORTS, Some("5")),
1044            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1045            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
1046            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
1047        ];
1048
1049        temp_env::with_vars(env_vars, || {
1050            let config = BatchConfigBuilder::default()
1051                .with_max_export_batch_size(512)
1052                .with_max_queue_size(2048)
1053                .with_scheduled_delay(Duration::from_millis(1000));
1054            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1055            let config = {
1056                config
1057                    .with_max_concurrent_exports(10)
1058                    .with_max_export_timeout(Duration::from_millis(2000))
1059            };
1060            let config = config.build();
1061
1062            assert_eq!(config.max_export_batch_size, 512);
1063            assert_eq!(config.max_queue_size, 2048);
1064            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
1065            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1066            {
1067                assert_eq!(config.max_concurrent_exports, 10);
1068                assert_eq!(config.max_export_timeout, Duration::from_millis(2000));
1069            }
1070        });
1071    }
1072
1073    #[test]
1074    fn test_batch_config_configurable_by_env_vars() {
1075        let env_vars = vec![
1076            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
1077            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
1078            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
1079            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1080        ];
1081
1082        let config = temp_env::with_vars(env_vars, BatchConfig::default);
1083
1084        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
1085        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
1086        assert_eq!(config.max_queue_size, 4096);
1087        assert_eq!(config.max_export_batch_size, 1024);
1088    }
1089
1090    #[test]
1091    fn test_batch_config_max_export_batch_size_validation() {
1092        let env_vars = vec![
1093            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
1094            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1095        ];
1096
1097        let config = temp_env::with_vars(env_vars, BatchConfig::default);
1098
1099        assert_eq!(config.max_queue_size, 256);
1100        assert_eq!(config.max_export_batch_size, 256);
1101        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
1102        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
1103    }
1104
1105    #[test]
1106    fn test_batch_config_with_fields() {
1107        let batch = BatchConfigBuilder::default()
1108            .with_max_export_batch_size(10)
1109            .with_scheduled_delay(Duration::from_millis(10))
1110            .with_max_queue_size(10);
1111        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1112        let batch = {
1113            batch
1114                .with_max_concurrent_exports(10)
1115                .with_max_export_timeout(Duration::from_millis(10))
1116        };
1117        let batch = batch.build();
1118        assert_eq!(batch.max_export_batch_size, 10);
1119        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
1120        assert_eq!(batch.max_queue_size, 10);
1121        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1122        {
1123            assert_eq!(batch.max_concurrent_exports, 10);
1124            assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
1125        }
1126    }
1127
1128    // Helper function to create a default test span
1129    fn create_test_span(name: &str) -> SpanData {
1130        SpanData {
1131            span_context: SpanContext::empty_context(),
1132            parent_span_id: SpanId::INVALID,
1133            parent_span_is_remote: false,
1134            span_kind: SpanKind::Internal,
1135            name: name.to_string().into(),
1136            start_time: opentelemetry::time::now(),
1137            end_time: opentelemetry::time::now(),
1138            attributes: Vec::new(),
1139            dropped_attributes_count: 0,
1140            events: SpanEvents::default(),
1141            links: SpanLinks::default(),
1142            status: Status::Unset,
1143            instrumentation_scope: Default::default(),
1144        }
1145    }
1146
1147    use crate::Resource;
1148    use opentelemetry::{Key, KeyValue, Value};
1149    use std::sync::{atomic::Ordering, Arc, Mutex};
1150
1151    // Mock exporter to test functionality
1152    #[derive(Debug)]
1153    struct MockSpanExporter {
1154        exported_spans: Arc<Mutex<Vec<SpanData>>>,
1155        exported_resource: Arc<Mutex<Option<Resource>>>,
1156    }
1157
1158    impl MockSpanExporter {
1159        fn new() -> Self {
1160            Self {
1161                exported_spans: Arc::new(Mutex::new(Vec::new())),
1162                exported_resource: Arc::new(Mutex::new(None)),
1163            }
1164        }
1165    }
1166
1167    impl SpanExporter for MockSpanExporter {
1168        async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1169            let exported_spans = self.exported_spans.clone();
1170            exported_spans.lock().unwrap().extend(batch);
1171            Ok(())
1172        }
1173
1174        fn shutdown(&mut self) -> OTelSdkResult {
1175            Ok(())
1176        }
1177        fn set_resource(&mut self, resource: &Resource) {
1178            let mut exported_resource = self.exported_resource.lock().unwrap();
1179            *exported_resource = Some(resource.clone());
1180        }
1181    }
1182
1183    #[test]
1184    fn batchspanprocessor_handles_on_end() {
1185        let exporter = MockSpanExporter::new();
1186        let exporter_shared = exporter.exported_spans.clone();
1187        let config = BatchConfigBuilder::default()
1188            .with_max_queue_size(10)
1189            .with_max_export_batch_size(10)
1190            .with_scheduled_delay(Duration::from_secs(5))
1191            .build();
1192        let processor = BatchSpanProcessor::new(exporter, config);
1193
1194        let test_span = create_test_span("test_span");
1195        processor.on_end(test_span.clone());
1196
1197        // Wait for flush interval to ensure the span is processed
1198        std::thread::sleep(Duration::from_secs(6));
1199
1200        let exported_spans = exporter_shared.lock().unwrap();
1201        assert_eq!(exported_spans.len(), 1);
1202        assert_eq!(exported_spans[0].name, "test_span");
1203    }
1204
1205    #[test]
1206    fn batchspanprocessor_force_flush() {
1207        let exporter = MockSpanExporter::new();
1208        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1209        let config = BatchConfigBuilder::default()
1210            .with_max_queue_size(10)
1211            .with_max_export_batch_size(10)
1212            .with_scheduled_delay(Duration::from_secs(5))
1213            .build();
1214        let processor = BatchSpanProcessor::new(exporter, config);
1215
1216        // Create a test span and send it to the processor
1217        let test_span = create_test_span("force_flush_span");
1218        processor.on_end(test_span.clone());
1219
1220        // Call force_flush to immediately export the spans
1221        let flush_result = processor.force_flush();
1222        assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
1223
1224        // Verify the exported spans in the mock exporter
1225        let exported_spans = exporter_shared.lock().unwrap();
1226        assert_eq!(
1227            exported_spans.len(),
1228            1,
1229            "Unexpected number of exported spans"
1230        );
1231        assert_eq!(exported_spans[0].name, "force_flush_span");
1232    }
1233
1234    #[test]
1235    fn batchspanprocessor_shutdown() {
1236        // Setup exporter and processor - following the same pattern as test_batch_shutdown from logs
1237        let exporter = InMemorySpanExporterBuilder::new()
1238            .keep_records_on_shutdown()
1239            .build();
1240        let processor = BatchSpanProcessor::new(exporter.clone(), BatchConfig::default());
1241
1242        let record = create_test_span("test_span");
1243
1244        processor.on_end(record);
1245        processor.force_flush().unwrap();
1246        processor.shutdown().unwrap();
1247
1248        // todo: expect to see errors here. How should we assert this?
1249        processor.on_end(create_test_span("after_shutdown_span"));
1250
1251        assert_eq!(1, exporter.get_finished_spans().unwrap().len());
1252        assert!(exporter.is_shutdown_called());
1253    }
1254
1255    #[test]
1256    fn batchspanprocessor_handles_dropped_spans() {
1257        let exporter = MockSpanExporter::new();
1258        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1259        let config = BatchConfigBuilder::default()
1260            .with_max_queue_size(2) // Small queue size to test span dropping
1261            .with_max_export_batch_size(512) // Explicitly set to avoid env var override
1262            .with_scheduled_delay(Duration::from_secs(5))
1263            .build();
1264        let processor = BatchSpanProcessor::new(exporter, config);
1265
1266        // Create test spans and send them to the processor
1267        let span1 = create_test_span("span1");
1268        let span2 = create_test_span("span2");
1269        let span3 = create_test_span("span3"); // This span should be dropped
1270
1271        processor.on_end(span1.clone());
1272        processor.on_end(span2.clone());
1273        processor.on_end(span3.clone()); // This span exceeds the queue size
1274
1275        // Wait for the scheduled delay to expire
1276        std::thread::sleep(Duration::from_secs(6));
1277
1278        let exported_spans = exporter_shared.lock().unwrap();
1279
1280        // Verify that only the first two spans are exported
1281        assert_eq!(
1282            exported_spans.len(),
1283            2,
1284            "Unexpected number of exported spans"
1285        );
1286        assert!(exported_spans.iter().any(|s| s.name == "span1"));
1287        assert!(exported_spans.iter().any(|s| s.name == "span2"));
1288
1289        // Ensure the third span is dropped
1290        assert!(
1291            !exported_spans.iter().any(|s| s.name == "span3"),
1292            "Span3 should have been dropped"
1293        );
1294
1295        // Verify dropped spans count (if accessible in your implementation)
1296        let dropped_count = processor.dropped_spans_count.load(Ordering::Relaxed);
1297        assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
1298
1299        // Verify current batch size
1300        let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed);
1301        assert_eq!(current_batch_size, 0, "Unexpected current batch size");
1302    }
1303
1304    #[test]
1305    fn validate_span_attributes_exported_correctly() {
1306        let exporter = MockSpanExporter::new();
1307        let exporter_shared = exporter.exported_spans.clone();
1308        let config = BatchConfigBuilder::default().build();
1309        let processor = BatchSpanProcessor::new(exporter, config);
1310
1311        // Create a span with attributes
1312        let mut span_data = create_test_span("attribute_validation");
1313        span_data.attributes = vec![
1314            KeyValue::new("key1", "value1"),
1315            KeyValue::new("key2", "value2"),
1316        ];
1317        processor.on_end(span_data.clone());
1318
1319        // Force flush to export the span
1320        let _ = processor.force_flush();
1321
1322        // Validate the exported attributes
1323        let exported_spans = exporter_shared.lock().unwrap();
1324        assert_eq!(exported_spans.len(), 1);
1325        let exported_span = &exported_spans[0];
1326        assert!(exported_span
1327            .attributes
1328            .contains(&KeyValue::new("key1", "value1")));
1329        assert!(exported_span
1330            .attributes
1331            .contains(&KeyValue::new("key2", "value2")));
1332    }
1333
1334    #[test]
1335    fn batchspanprocessor_sets_and_exports_with_resource() {
1336        let exporter = MockSpanExporter::new();
1337        let exporter_shared = exporter.exported_spans.clone();
1338        let resource_shared = exporter.exported_resource.clone();
1339        let config = BatchConfigBuilder::default().build();
1340        let mut processor = BatchSpanProcessor::new(exporter, config);
1341
1342        // Set a resource for the processor
1343        let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
1344        processor.set_resource(&resource);
1345
1346        // Create a span and send it to the processor
1347        let test_span = create_test_span("resource_test");
1348        processor.on_end(test_span.clone());
1349
1350        // Force flush to ensure the span is exported
1351        let _ = processor.force_flush();
1352
1353        // Validate spans are exported
1354        let exported_spans = exporter_shared.lock().unwrap();
1355        assert_eq!(exported_spans.len(), 1);
1356
1357        // Validate the resource is correctly set in the exporter
1358        let exported_resource = resource_shared.lock().unwrap();
1359        assert!(exported_resource.is_some());
1360        assert_eq!(
1361            exported_resource
1362                .as_ref()
1363                .unwrap()
1364                .get(&Key::new("service.name")),
1365            Some(Value::from("test_service"))
1366        );
1367    }
1368
1369    #[tokio::test(flavor = "current_thread")]
1370    async fn test_batch_processor_current_thread_runtime() {
1371        let exporter = MockSpanExporter::new();
1372        let exporter_shared = exporter.exported_spans.clone();
1373
1374        let config = BatchConfigBuilder::default()
1375            .with_max_queue_size(5)
1376            .with_max_export_batch_size(3)
1377            .build();
1378
1379        let processor = BatchSpanProcessor::new(exporter, config);
1380
1381        for _ in 0..4 {
1382            let span = new_test_export_span_data();
1383            processor.on_end(span);
1384        }
1385
1386        processor.force_flush().unwrap();
1387
1388        let exported_spans = exporter_shared.lock().unwrap();
1389        assert_eq!(exported_spans.len(), 4);
1390    }
1391
1392    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1393    async fn test_batch_processor_multi_thread_count_1_runtime() {
1394        let exporter = MockSpanExporter::new();
1395        let exporter_shared = exporter.exported_spans.clone();
1396
1397        let config = BatchConfigBuilder::default()
1398            .with_max_queue_size(5)
1399            .with_max_export_batch_size(3)
1400            .build();
1401
1402        let processor = BatchSpanProcessor::new(exporter, config);
1403
1404        for _ in 0..4 {
1405            let span = new_test_export_span_data();
1406            processor.on_end(span);
1407        }
1408
1409        processor.force_flush().unwrap();
1410
1411        let exported_spans = exporter_shared.lock().unwrap();
1412        assert_eq!(exported_spans.len(), 4);
1413    }
1414
1415    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1416    async fn test_batch_processor_multi_thread() {
1417        let exporter = MockSpanExporter::new();
1418        let exporter_shared = exporter.exported_spans.clone();
1419
1420        let config = BatchConfigBuilder::default()
1421            .with_max_queue_size(20)
1422            .with_max_export_batch_size(5)
1423            .build();
1424
1425        // Create the processor with the thread-safe exporter
1426        let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1427
1428        let mut handles = vec![];
1429        for _ in 0..10 {
1430            let processor_clone = Arc::clone(&processor);
1431            let handle = tokio::spawn(async move {
1432                let span = new_test_export_span_data();
1433                processor_clone.on_end(span);
1434            });
1435            handles.push(handle);
1436        }
1437
1438        for handle in handles {
1439            handle.await.unwrap();
1440        }
1441
1442        processor.force_flush().unwrap();
1443
1444        // Verify exported spans
1445        let exported_spans = exporter_shared.lock().unwrap();
1446        assert_eq!(exported_spans.len(), 10);
1447    }
1448}