opentelemetry_spanprocessor_any/sdk/trace/
span_processor.rs

1//! # OpenTelemetry Span Processor Interface
2//!
3//! Span processor is an interface which allows hooks for span start and end method
4//! invocations. The span processors are invoked only when
5//! [`is_recording`] is true.
6//!
7//! Built-in span processors are responsible for batching and conversion of spans to
8//! exportable representation and passing batches to exporters.
9//!
10//! Span processors can be registered directly on SDK [`TracerProvider`] and they are
11//! invoked in the same order as they were registered.
12//!
13//! All `Tracer` instances created by a `TracerProvider` share the same span processors.
14//! Changes to this collection reflect in all `Tracer` instances.
15//!
16//! The following diagram shows `SpanProcessor`'s relationship to other components
17//! in the SDK:
18//!
19//! ```ascii
20//!   +-----+--------------+   +-----------------------+   +-------------------+
21//!   |     |              |   |                       |   |                   |
22//!   |     |              |   | (Batch)SpanProcessor  |   |    SpanExporter   |
23//!   |     |              +---> (Simple)SpanProcessor +--->  (JaegerExporter) |
24//!   |     |              |   |                       |   |                   |
25//!   | SDK | Tracer.span()|   +-----------------------+   +-------------------+
26//!   |     | Span.end()   |
27//!   |     |              |   +---------------------+
28//!   |     |              |   |                     |
29//!   |     |              +---> ZPagesProcessor     |
30//!   |     |              |   |                     |
31//!   +-----+--------------+   +---------------------+
32//! ```
33//!
34//! [`is_recording`]: crate::trace::Span::is_recording()
35//! [`TracerProvider`]: crate::trace::TracerProvider
36
37use crate::global;
38use crate::sdk::trace::runtime::{TraceRuntime, TrySend};
39use crate::sdk::trace::Span;
40use crate::{
41    sdk::export::trace::{ExportResult, SpanData, SpanExporter},
42    trace::{TraceError, TraceResult},
43    Context,
44};
45use futures_channel::oneshot;
46use futures_util::future::{self, Either};
47use futures_util::{pin_mut, stream, StreamExt as _};
48use std::{env, fmt, str::FromStr, thread, time::Duration};
49use std::any::Any;
50
/// Environment variable holding the delay between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Fallback delay between two consecutive exports, in milliseconds.
const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
/// Environment variable holding the maximum queue size.
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Fallback maximum queue size.
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable holding the maximum export batch size. The value must
/// not exceed `OTEL_BSP_MAX_QUEUE_SIZE`.
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Fallback maximum export batch size.
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Environment variable holding the maximum time allowed for one export.
const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Fallback maximum time allowed for one export, in milliseconds.
const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
67
68/// `SpanProcessor` is an interface which allows hooks for span start and end
69/// method invocations. The span processors are invoked only when is_recording
70/// is true.
71pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
72    /// `on_start` is called when a `Span` is started.  This method is called
73    /// synchronously on the thread that started the span, therefore it should
74    /// not block or throw exceptions.
75    fn on_start(&self, span: &mut Span, cx: &Context);
76    /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
77    /// already set). This method is called synchronously within the `Span::end`
78    /// API, therefore it should not block or throw an exception.
79    fn on_end(&self, span: SpanData);
80    /// Force the spans lying in the cache to be exported.
81    fn force_flush(&self) -> TraceResult<()>;
82    /// Shuts down the processor. Called when SDK is shut down. This is an
83    /// opportunity for processors to do any cleanup required.
84    fn shutdown(&mut self) -> TraceResult<()>;
85    /// For casting
86    fn as_any(&self) -> &dyn Any;
87}
88
89/// A [`SpanProcessor`] that exports synchronously when spans are finished.
90///
91/// # Examples
92///
93/// Note that the simple processor exports synchronously every time a span is
94/// ended. If you find this limiting, consider the batch processor instead.
95///
96/// ```
97/// use opentelemetry::{trace::noop::NoopSpanExporter, sdk, global};
98///
99/// // Configure your preferred exporter
100/// let exporter = NoopSpanExporter::new();
101///
102/// // Then use the `with_simple_exporter` method to have the provider export when spans finish.
103/// let provider = sdk::trace::TracerProvider::builder()
104///     .with_simple_exporter(exporter)
105///     .build();
106///
107/// let previous_provider = global::set_tracer_provider(provider);
108/// ```
109#[derive(Debug)]
110pub struct SimpleSpanProcessor {
111    sender: crossbeam_channel::Sender<Option<SpanData>>,
112    shutdown: crossbeam_channel::Receiver<()>,
113}
114
115impl SimpleSpanProcessor {
116    pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
117        let (span_tx, span_rx) = crossbeam_channel::unbounded();
118        let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
119
120        let _ = thread::Builder::new()
121            .name("opentelemetry-exporter".to_string())
122            .spawn(move || {
123                while let Ok(Some(span)) = span_rx.recv() {
124                    if let Err(err) = futures_executor::block_on(exporter.export(vec![span])) {
125                        global::handle_error(err);
126                    }
127                }
128
129                exporter.shutdown();
130
131                if let Err(err) = shutdown_tx.send(()) {
132                    global::handle_error(TraceError::from(format!(
133                        "could not send shutdown: {:?}",
134                        err
135                    )));
136                }
137            });
138
139        SimpleSpanProcessor {
140            sender: span_tx,
141            shutdown: shutdown_rx,
142        }
143    }
144}
145
146impl SpanProcessor for SimpleSpanProcessor {
147    fn on_start(&self, _span: &mut Span, _cx: &Context) {
148        // Ignored
149    }
150
151    fn on_end(&self, span: SpanData) {
152        if let Err(err) = self.sender.send(Some(span)) {
153            global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
154        }
155    }
156
157    fn force_flush(&self) -> TraceResult<()> {
158        // Ignored since all spans in Simple Processor will be exported as they ended.
159        Ok(())
160    }
161
162    fn shutdown(&mut self) -> TraceResult<()> {
163        if self.sender.send(None).is_ok() {
164            if let Err(err) = self.shutdown.recv() {
165                global::handle_error(TraceError::from(format!(
166                    "error shutting down span processor: {:?}",
167                    err
168                )))
169            }
170        }
171
172        Ok(())
173    }
174
175    fn as_any(&self) -> &dyn Any {
176        self
177    }
178}
179
180/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
181/// them at a preconfigured interval.
182///
183/// Batch span processors need to run a background task to collect and send
184/// spans. Different runtimes need different ways to handle the background task.
185///
186/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the
187/// underlying runtime can cause deadlocks (see tokio section).
188///
189/// ### Use with Tokio
190///
191/// Tokio currently offers two different schedulers. One is
192/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both
193/// of them default to use batch span processors to install span exporters.
194///
195/// Tokio's `current_thread_scheduler` can cause the program to hang forever if
196/// blocking work is scheduled with other tasks in the same runtime. To avoid
197/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate
198/// if you are using that runtime (e.g. users of actix-web), and blocking tasks
199/// will then be scheduled on a different thread.
200///
201/// # Examples
202///
203/// This processor can be configured with an [`executor`] of your choice to
204/// batch and upload spans asynchronously when they end. If you have added a
205/// library like [`tokio`] or [`async-std`], you can pass in their respective
206/// `spawn` and `interval` functions to have batching performed in those
207/// contexts.
208///
209/// ```
210/// # #[cfg(feature="tokio")]
211/// # {
212/// use opentelemetry::{global, runtime, sdk, trace::noop::NoopSpanExporter};
213/// use std::time::Duration;
214///
215/// #[tokio::main]
216/// async fn main() {
217///     // Configure your preferred exporter
218///     let exporter = NoopSpanExporter::new();
219///
220///     // Create a batch span processor using an exporter and a runtime
221///     let batch = sdk::trace::BatchSpanProcessor::builder(exporter, runtime::Tokio)
222///         .with_max_queue_size(4096)
223///         .build();
224///
225///     // Then use the `with_batch_exporter` method to have the provider export spans in batches.
226///     let provider = sdk::trace::TracerProvider::builder()
227///         .with_span_processor(batch)
228///         .build();
229///
230///     let _ = global::set_tracer_provider(provider);
231/// }
232/// # }
233/// ```
234///
235/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
236/// [`tokio`]: https://tokio.rs
237/// [`async-std`]: https://async.rs
238pub struct BatchSpanProcessor<R: TraceRuntime> {
239    message_sender: R::Sender,
240}
241
242impl<R: TraceRuntime> fmt::Debug for BatchSpanProcessor<R> {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        f.debug_struct("BatchSpanProcessor")
245            .field("message_sender", &self.message_sender)
246            .finish()
247    }
248}
249
250impl<R: TraceRuntime> SpanProcessor for BatchSpanProcessor<R> {
251    fn on_start(&self, _span: &mut Span, _cx: &Context) {
252        // Ignored
253    }
254
255    fn on_end(&self, span: SpanData) {
256        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
257
258        if let Err(err) = result {
259            global::handle_error(err);
260        }
261    }
262
263    fn force_flush(&self) -> TraceResult<()> {
264        let (res_sender, res_receiver) = oneshot::channel();
265        self.message_sender
266            .try_send(BatchMessage::Flush(Some(res_sender)))?;
267
268        futures_executor::block_on(res_receiver)
269            .map_err(|err| TraceError::Other(err.into()))
270            .and_then(|identity| identity)
271    }
272
273    fn shutdown(&mut self) -> TraceResult<()> {
274        let (res_sender, res_receiver) = oneshot::channel();
275        self.message_sender
276            .try_send(BatchMessage::Shutdown(res_sender))?;
277
278        futures_executor::block_on(res_receiver)
279            .map_err(|err| TraceError::Other(err.into()))
280            .and_then(|identity| identity)
281    }
282
283    fn as_any(&self) -> &dyn Any {
284        self
285    }
286}
287
288/// Messages sent between application thread and batch span processor's work thread.
289// In this enum the size difference is not a concern because:
290// 1. If we wrap SpanData into a pointer, it will add overhead when processing.
291// 2. Most of the messages will be ExportSpan.
292#[allow(clippy::large_enum_variant)]
293#[derive(Debug)]
294pub enum BatchMessage {
295    /// Export spans, usually called when span ends
296    ExportSpan(SpanData),
297    /// Flush the current buffer to the backend, it can be triggered by
298    /// pre configured interval or a call to `force_push` function.
299    Flush(Option<oneshot::Sender<ExportResult>>),
300    /// Shut down the worker thread, push all spans in buffer to the backend.
301    Shutdown(oneshot::Sender<ExportResult>),
302}
303
304impl<R: TraceRuntime> BatchSpanProcessor<R> {
305    pub(crate) fn new(
306        mut exporter: Box<dyn SpanExporter>,
307        config: BatchConfig,
308        runtime: R,
309    ) -> Self {
310        let (message_sender, message_receiver) =
311            runtime.batch_message_channel(config.max_queue_size);
312        let ticker = runtime
313            .interval(config.scheduled_delay)
314            .map(|_| BatchMessage::Flush(None));
315        let timeout_runtime = runtime.clone();
316
317        // Spawn worker process via user-defined spawn function.
318        runtime.spawn(Box::pin(async move {
319            let mut spans = Vec::new();
320            let mut messages = Box::pin(stream::select(message_receiver, ticker));
321
322            while let Some(message) = messages.next().await {
323                match message {
324                    // Span has finished, add to buffer of pending spans.
325                    BatchMessage::ExportSpan(span) => {
326                        spans.push(span);
327
328                        if spans.len() == config.max_export_batch_size {
329                            let result = export_with_timeout(
330                                config.max_export_timeout,
331                                exporter.as_mut(),
332                                &timeout_runtime,
333                                spans.split_off(0),
334                            )
335                            .await;
336
337                            if let Err(err) = result {
338                                global::handle_error(err);
339                            }
340                        }
341                    }
342                    // Span batch interval time reached or a force flush has been invoked, export current spans.
343                    BatchMessage::Flush(res_channel) => {
344                        let result = export_with_timeout(
345                            config.max_export_timeout,
346                            exporter.as_mut(),
347                            &timeout_runtime,
348                            spans.split_off(0),
349                        )
350                        .await;
351
352                        if let Some(channel) = res_channel {
353                            if let Err(result) = channel.send(result) {
354                                global::handle_error(TraceError::from(format!(
355                                    "failed to send flush result: {:?}",
356                                    result
357                                )));
358                            }
359                        } else if let Err(err) = result {
360                            global::handle_error(err);
361                        }
362                    }
363                    // Stream has terminated or processor is shutdown, return to finish execution.
364                    BatchMessage::Shutdown(ch) => {
365                        let result = export_with_timeout(
366                            config.max_export_timeout,
367                            exporter.as_mut(),
368                            &timeout_runtime,
369                            spans.split_off(0),
370                        )
371                        .await;
372
373                        exporter.shutdown();
374
375                        if let Err(result) = ch.send(result) {
376                            global::handle_error(TraceError::from(format!(
377                                "failed to send batch processor shutdown result: {:?}",
378                                result
379                            )));
380                        }
381
382                        break;
383                    }
384                }
385            }
386        }));
387
388        // Return batch processor with link to worker
389        BatchSpanProcessor { message_sender }
390    }
391
392    /// Create a new batch processor builder
393    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
394    where
395        E: SpanExporter,
396    {
397        BatchSpanProcessorBuilder {
398            exporter,
399            config: BatchConfig::default(),
400            runtime,
401        }
402    }
403}
404
405async fn export_with_timeout<R, E>(
406    time_out: Duration,
407    exporter: &mut E,
408    runtime: &R,
409    batch: Vec<SpanData>,
410) -> ExportResult
411where
412    R: TraceRuntime,
413    E: SpanExporter + ?Sized,
414{
415    if batch.is_empty() {
416        return Ok(());
417    }
418
419    let export = exporter.export(batch);
420    let timeout = runtime.delay(time_out);
421    pin_mut!(export);
422    pin_mut!(timeout);
423    match future::select(export, timeout).await {
424        Either::Left((export_res, _)) => export_res,
425        Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
426    }
427}
428
/// Configuration for the batch span processor.
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of spans buffered for delayed processing. Spans are
    /// dropped once the queue is full. The default value is 2048.
    max_queue_size: usize,

    /// Delay between two consecutive batch exports. The default value is
    /// 5 seconds.
    scheduled_delay: Duration,

    /// Maximum number of spans exported in a single batch. When more than one
    /// batch worth of spans is available, the batches are processed one after
    /// the other without any delay. The default value is 512.
    max_export_batch_size: usize,

    /// Maximum time a single batch export may take.
    max_export_timeout: Duration,
}
449
450impl Default for BatchConfig {
451    fn default() -> Self {
452        let mut config = BatchConfig {
453            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
454            scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
455            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
456            max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
457        };
458
459        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
460            .ok()
461            .and_then(|queue_size| usize::from_str(&queue_size).ok())
462        {
463            config.max_queue_size = max_queue_size;
464        }
465
466        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
467            .ok()
468            .or_else(|| env::var("OTEL_BSP_SCHEDULE_DELAY_MILLIS").ok())
469            .and_then(|delay| u64::from_str(&delay).ok())
470        {
471            config.scheduled_delay = Duration::from_millis(scheduled_delay);
472        }
473
474        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
475            .ok()
476            .and_then(|batch_size| usize::from_str(&batch_size).ok())
477        {
478            config.max_export_batch_size = max_export_batch_size;
479        }
480
481        // max export batch size must be less or equal to max queue size.
482        // we set max export batch size to max queue size if it's larger than max queue size.
483        if config.max_export_batch_size > config.max_queue_size {
484            config.max_export_batch_size = config.max_queue_size;
485        }
486
487        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
488            .ok()
489            .or_else(|| env::var("OTEL_BSP_EXPORT_TIMEOUT_MILLIS").ok())
490            .and_then(|timeout| u64::from_str(&timeout).ok())
491        {
492            config.max_export_timeout = Duration::from_millis(max_export_timeout);
493        }
494
495        config
496    }
497}
498
499/// A builder for creating [`BatchSpanProcessor`] instances.
500///
501#[derive(Debug)]
502pub struct BatchSpanProcessorBuilder<E, R> {
503    exporter: E,
504    config: BatchConfig,
505    runtime: R,
506}
507
508impl<E, R> BatchSpanProcessorBuilder<E, R>
509where
510    E: SpanExporter + 'static,
511    R: TraceRuntime,
512{
513    /// Set max queue size for batches
514    pub fn with_max_queue_size(self, size: usize) -> Self {
515        let mut config = self.config;
516        config.max_queue_size = size;
517
518        BatchSpanProcessorBuilder { config, ..self }
519    }
520
521    /// Set scheduled delay for batches
522    pub fn with_scheduled_delay(self, delay: Duration) -> Self {
523        let mut config = self.config;
524        config.scheduled_delay = delay;
525
526        BatchSpanProcessorBuilder { config, ..self }
527    }
528
529    /// Set max timeout for exporting.
530    pub fn with_max_timeout(self, timeout: Duration) -> Self {
531        let mut config = self.config;
532        config.max_export_timeout = timeout;
533
534        BatchSpanProcessorBuilder { config, ..self }
535    }
536
537    /// Set max export size for batches, should always less than or equals to max queue size.
538    ///
539    /// If input is larger than max queue size, will lower it to be equal to max queue size
540    pub fn with_max_export_batch_size(self, size: usize) -> Self {
541        let mut config = self.config;
542        if size > config.max_queue_size {
543            config.max_export_batch_size = config.max_queue_size;
544        } else {
545            config.max_export_batch_size = size;
546        }
547
548        BatchSpanProcessorBuilder { config, ..self }
549    }
550
551    /// Build a batch processor
552    pub fn build(self) -> BatchSpanProcessor<R> {
553        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
554    }
555}
556
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{
        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::runtime;
    use crate::sdk::export::trace::{stdout, ExportResult, SpanData, SpanExporter};
    use crate::sdk::trace::BatchConfig;
    use crate::testing::trace::{
        new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
    };
    use async_trait::async_trait;
    use std::fmt::Debug;
    use std::future::Future;
    use std::time::Duration;

    // Ending a span must result in a call to the exporter's `export`.
    #[test]
    fn simple_span_processor_on_end_calls_export() {
        let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
        let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
        processor.on_end(new_test_export_span_data());
        assert!(rx_export.recv().is_ok());
        let _result = processor.shutdown();
    }

    // Shutting the processor down must shut the exporter down as well.
    #[test]
    fn simple_span_processor_shutdown_calls_shutdown() {
        let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
        let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
        let _result = processor.shutdown();
        assert!(rx_shutdown.try_recv().is_ok());
    }

    // The builder's default config must honour the OTEL_BSP_* variables and
    // fall back to defaults for unset or unparsable values.
    #[test]
    fn test_build_batch_span_processor_builder() {
        // Environment variables are process-wide: start from a known state so
        // the default-queue-size assertion does not depend on the ambient
        // environment.
        std::env::remove_var(OTEL_BSP_MAX_QUEUE_SIZE);
        std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
        std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT, "2046");
        std::env::set_var(OTEL_BSP_SCHEDULE_DELAY, "I am not number");

        let mut builder = BatchSpanProcessor::builder(
            stdout::Exporter::new(std::io::stdout(), true),
            runtime::Tokio,
        );
        // export batch size cannot exceed max queue size
        assert_eq!(builder.config.max_export_batch_size, 500);
        assert_eq!(
            builder.config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            builder.config.max_queue_size,
            OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
        );
        assert_eq!(
            builder.config.max_export_timeout,
            Duration::from_millis(2046)
        );

        std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120");
        builder = BatchSpanProcessor::builder(
            stdout::Exporter::new(std::io::stdout(), true),
            runtime::Tokio,
        );

        assert_eq!(builder.config.max_export_batch_size, 120);
        assert_eq!(builder.config.max_queue_size, 120);

        // Clean up so the overrides do not leak into other tests that build a
        // `BatchConfig` from `Default::default()`.
        std::env::remove_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE);
        std::env::remove_var(OTEL_BSP_EXPORT_TIMEOUT);
        std::env::remove_var(OTEL_BSP_SCHEDULE_DELAY);
        std::env::remove_var(OTEL_BSP_MAX_QUEUE_SIZE);
    }

    // A span ended on the batch processor must reach the exporter after a
    // `force_flush`, without waiting for the (24 hour) scheduled tick.
    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        let config = BatchConfig {
            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
            ..Default::default()
        };
        let mut processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                if let Some(span) = export_receiver.recv().await {
                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
                    break;
                }
            }
        });
        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();

        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out in 5 seconds. force_flush may not export any data when called"
        );
    }

    // Exporter whose `export` only waits for `delay_for` (using `delay_fn`)
    // and then succeeds; used to exercise the export timeout.
    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    #[async_trait]
    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        async fn export(&mut self, _batch: Vec<SpanData>) -> ExportResult {
            (self.delay_fn)(self.delay_for).await;
            Ok(())
        }
    }

    #[test]
    fn test_timeout_tokio_timeout() {
        // If time_out is true, the exporter blocks for 60ms while the export timeout is 5ms.
        // If time_out is false, the exporter blocks for 5ms while the export timeout is 60ms.
        // Either way the export itself is bounded by a few dozen milliseconds.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(true));
    }

    #[test]
    fn test_timeout_tokio_not_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(false));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_timeout() {
        async_std::task::block_on(timeout_test_std_async(true));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_not_timeout() {
        async_std::task::block_on(timeout_test_std_async(false));
    }

    // If time_out is true, the flush is supposed to end with a timeout error;
    // otherwise the exporter should finish within the timeout duration.
    #[cfg(feature = "rt-async-std")]
    async fn timeout_test_std_async(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: async_std::task::sleep,
        };
        let mut processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    // If time_out is true, the flush is supposed to end with a timeout error;
    // otherwise the exporter should finish within the timeout duration.
    async fn timeout_test_tokio(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush,
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: tokio::time::sleep,
        };
        let mut processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }
}