use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::trace::Span;
use crate::trace::{SpanData, SpanExporter};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::cmp::min;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, str::FromStr, time::Duration};
use std::sync::atomic::AtomicBool;
use std::thread;
use std::time::Instant;
pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(5_000);
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn on_start(&self, span: &mut Span, cx: &Context);
fn on_end(&self, span: SpanData);
fn force_flush(&self) -> OTelSdkResult;
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
fn set_resource(&mut self, _resource: &Resource) {}
}
#[derive(Debug)]
pub struct SimpleSpanProcessor<T: SpanExporter> {
exporter: Mutex<T>,
}
impl<T: SpanExporter> SimpleSpanProcessor<T> {
pub fn new(exporter: T) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
}
fn on_end(&self, span: SpanData) {
if !span.span_context.is_sampled() {
return;
}
let result = self
.exporter
.lock()
.map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
.and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));
if let Err(err) = result {
otel_debug!(
name: "SimpleProcessor.OnEnd.Error",
reason = format!("{:?}", err)
);
}
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))
}
}
fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
}
}
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
ExportSpan(Arc<AtomicBool>),
ForceFlush(SyncSender<OTelSdkResult>),
Shutdown(SyncSender<OTelSdkResult>),
SetResource(Arc<Resource>),
}
#[derive(Debug)]
pub struct BatchSpanProcessor {
span_sender: SyncSender<SpanData>, message_sender: SyncSender<BatchMessage>, handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
export_span_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
dropped_spans_count: AtomicUsize,
max_queue_size: usize,
}
impl BatchSpanProcessor {
pub fn new<E>(
mut exporter: E,
config: BatchConfig,
) -> Self
where
E: SpanExporter + Send + 'static,
{
let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();
let handle = thread::Builder::new()
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
.spawn(move || {
let _suppress_guard = Context::enter_telemetry_suppressed_scope();
otel_debug!(
name: "BatchSpanProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),
max_export_batch_size = config.max_export_batch_size,
max_queue_size = config.max_queue_size,
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();
let current_batch_size = current_batch_size_for_thread;
loop {
let remaining_time_option = config
.scheduled_delay
.checked_sub(last_export_time.elapsed());
let remaining_time = match remaining_time_option {
Some(remaining_time) => remaining_time,
None => config.scheduled_delay,
};
match message_receiver.recv_timeout(remaining_time) {
Ok(message) => match message {
BatchMessage::ExportSpan(export_span_message_sent) => {
export_span_message_sent.store(false, Ordering::Relaxed);
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToBatchSize",
);
let _ = Self::get_spans_and_export(
&span_receiver,
&exporter,
&mut spans,
&mut last_export_time,
¤t_batch_size,
&config,
);
}
BatchMessage::ForceFlush(sender) => {
otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
let result = Self::get_spans_and_export(
&span_receiver,
&exporter,
&mut spans,
&mut last_export_time,
¤t_batch_size,
&config,
);
let _ = sender.send(result);
}
BatchMessage::Shutdown(sender) => {
otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
let result = Self::get_spans_and_export(
&span_receiver,
&exporter,
&mut spans,
&mut last_export_time,
¤t_batch_size,
&config,
);
let _ = exporter.shutdown();
let _ = sender.send(result);
otel_debug!(
name: "BatchSpanProcessor.ThreadExiting",
reason = "ShutdownRequested"
);
break;
}
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
},
Err(RecvTimeoutError::Timeout) => {
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToTimer",
);
let _ = Self::get_spans_and_export(
&span_receiver,
&exporter,
&mut spans,
&mut last_export_time,
¤t_batch_size,
&config,
);
}
Err(RecvTimeoutError::Disconnected) => {
otel_debug!(
name: "BatchSpanProcessor.ThreadExiting",
reason = "MessageSenderDisconnected"
);
break;
}
}
}
otel_debug!(
name: "BatchSpanProcessor.ThreadStopped"
);
})
.expect("Failed to spawn thread");
Self {
span_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), dropped_spans_count: AtomicUsize::new(0),
max_queue_size,
export_span_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
}
}
pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
where
E: SpanExporter + Send + 'static,
{
BatchSpanProcessorBuilder {
exporter,
config: BatchConfig::default(),
}
}
#[inline]
fn get_spans_and_export<E>(
spans_receiver: &Receiver<SpanData>,
exporter: &E,
spans: &mut Vec<SpanData>,
last_export_time: &mut Instant,
current_batch_size: &AtomicUsize,
config: &BatchConfig,
) -> OTelSdkResult
where
E: SpanExporter + Send + Sync + 'static,
{
let target = current_batch_size.load(Ordering::Relaxed); let mut result = OTelSdkResult::Ok(());
let mut total_exported_spans: usize = 0;
while target > 0 && total_exported_spans < target {
while let Ok(span) = spans_receiver.try_recv() {
spans.push(span);
if spans.len() == config.max_export_batch_size {
break;
}
}
let count_of_spans = spans.len(); total_exported_spans += count_of_spans;
result = Self::export_batch_sync(exporter, spans, last_export_time);
current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
}
result
}
#[allow(clippy::vec_box)]
fn export_batch_sync<E>(
exporter: &E,
batch: &mut Vec<SpanData>,
last_export_time: &mut Instant,
) -> OTelSdkResult
where
E: SpanExporter + ?Sized,
{
*last_export_time = Instant::now();
if batch.is_empty() {
return OTelSdkResult::Ok(());
}
let export = exporter.export(batch.split_off(0));
let export_result = futures_executor::block_on(export);
match export_result {
Ok(_) => OTelSdkResult::Ok(()),
Err(err) => {
otel_error!(
name: "BatchSpanProcessor.ExportError",
error = format!("{}", err)
);
OTelSdkResult::Err(err)
}
}
}
}
impl SpanProcessor for BatchSpanProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
}
fn on_end(&self, span: SpanData) {
let result = self.span_sender.try_send(span);
match result {
Ok(_) => {
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
>= self.max_export_batch_size
{
if !self.export_span_message_sent.load(Ordering::Relaxed) {
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportSpan(
self.export_span_message_sent.clone(),
)) {
Ok(_) => {
}
Err(_err) => {
self.export_span_message_sent
.store(false, Ordering::Relaxed);
}
}
}
}
}
}
Err(std::sync::mpsc::TrySendError::Full(_)) => {
if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
}
}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
otel_warn!(
name: "BatchSpanProcessor.OnEnd.AfterShutdown",
message = "Spans are being emitted even after Shutdown. This indicates incorrect lifecycle management of TracerProvider in application. Spans will not be exported."
);
}
}
}
fn force_flush(&self) -> OTelSdkResult {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
match self
.message_sender
.try_send(BatchMessage::ForceFlush(sender))
{
Ok(_) => receiver
.recv_timeout(self.forceflush_timeout)
.map_err(|err| {
if err == std::sync::mpsc::RecvTimeoutError::Timeout {
OTelSdkError::Timeout(self.forceflush_timeout)
} else {
OTelSdkError::InternalFailure(format!("{err}"))
}
})?,
Err(std::sync::mpsc::TrySendError::Full(_)) => {
otel_debug!(
name: "BatchSpanProcessor.ForceFlush.ControlChannelFull",
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call."
);
Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call.".into()))
}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
otel_debug!(
name: "BatchSpanProcessor.ForceFlush.AlreadyShutdown",
message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
);
Err(OTelSdkError::AlreadyShutdown)
}
}
}
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
otel_warn!(
name: "BatchSpanProcessor.SpansDropped",
dropped_span_count = dropped_spans,
max_queue_size = max_queue_size,
message = "Spans were dropped due to a queue being full. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
Ok(_) => {
receiver
.recv_timeout(timeout)
.map(|_| {
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.join().unwrap();
}
OTelSdkResult::Ok(())
})
.map_err(|err| match err {
std::sync::mpsc::RecvTimeoutError::Timeout => {
otel_error!(
name: "BatchSpanProcessor.Shutdown.Timeout",
message = "BatchSpanProcessor shutdown timing out."
);
OTelSdkError::Timeout(timeout)
}
_ => {
otel_error!(
name: "BatchSpanProcessor.Shutdown.Error",
error = format!("{}", err)
);
OTelSdkError::InternalFailure(format!("{err}"))
}
})?
}
Err(std::sync::mpsc::TrySendError::Full(_)) => {
otel_debug!(
name: "BatchSpanProcessor.Shutdown.ControlChannelFull",
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call."
);
Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call.".into()))
}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
otel_debug!(
name: "BatchSpanProcessor.Shutdown.AlreadyShutdown",
message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
);
Err(OTelSdkError::AlreadyShutdown)
}
}
}
fn set_resource(&mut self, resource: &Resource) {
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
}
}
#[derive(Debug, Default)]
pub struct BatchSpanProcessorBuilder<E>
where
E: SpanExporter + Send + 'static,
{
exporter: E,
config: BatchConfig,
}
impl<E> BatchSpanProcessorBuilder<E>
where
E: SpanExporter + Send + 'static,
{
pub fn with_batch_config(self, config: BatchConfig) -> Self {
BatchSpanProcessorBuilder { config, ..self }
}
pub fn build(self) -> BatchSpanProcessor {
BatchSpanProcessor::new(self.exporter, self.config)
}
}
#[derive(Debug)]
pub struct BatchConfig {
pub(crate) max_queue_size: usize,
pub(crate) scheduled_delay: Duration,
#[allow(dead_code)]
pub(crate) max_export_batch_size: usize,
#[allow(dead_code)]
pub(crate) max_export_timeout: Duration,
#[allow(dead_code)]
pub(crate) max_concurrent_exports: usize,
}
impl Default for BatchConfig {
fn default() -> Self {
BatchConfigBuilder::default().build()
}
}
#[derive(Debug)]
pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
max_export_timeout: Duration,
max_concurrent_exports: usize,
}
impl Default for BatchConfigBuilder {
fn default() -> Self {
BatchConfigBuilder {
max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
}
.init_from_env_vars()
}
}
impl BatchConfigBuilder {
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
self.max_queue_size = max_queue_size;
self
}
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
self.max_export_batch_size = max_export_batch_size;
self
}
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
self.max_concurrent_exports = max_concurrent_exports;
self
}
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
self.scheduled_delay = scheduled_delay;
self
}
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}
pub fn build(self) -> BatchConfig {
let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
BatchConfig {
max_queue_size: self.max_queue_size,
scheduled_delay: self.scheduled_delay,
max_export_timeout: self.max_export_timeout,
max_concurrent_exports: self.max_concurrent_exports,
max_export_batch_size,
}
}
fn init_from_env_vars(mut self) -> Self {
if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
.ok()
.and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
{
self.max_concurrent_exports = max_concurrent_exports;
}
if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
.ok()
.and_then(|queue_size| usize::from_str(&queue_size).ok())
{
self.max_queue_size = max_queue_size;
}
if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
.ok()
.and_then(|delay| u64::from_str(&delay).ok())
{
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}
if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
.ok()
.and_then(|batch_size| usize::from_str(&batch_size).ok())
{
self.max_export_batch_size = max_export_batch_size;
}
if self.max_export_batch_size > self.max_queue_size {
self.max_export_batch_size = self.max_queue_size;
}
if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
.ok()
.and_then(|timeout| u64::from_str(&timeout).ok())
{
self.max_export_timeout = Duration::from_millis(max_export_timeout);
}
self
}
}
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
};
use crate::error::OTelSdkResult;
use crate::testing::trace::new_test_export_span_data;
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
};
use crate::trace::InMemorySpanExporterBuilder;
use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
use crate::trace::{SpanData, SpanExporter};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
use std::fmt::Debug;
use std::time::Duration;
#[test]
fn simple_span_processor_on_end_calls_export() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
let _result = processor.shutdown();
}
#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
parent_span_is_remote: false,
span_kind: SpanKind::Internal,
name: "opentelemetry".into(),
start_time: opentelemetry::time::now(),
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
instrumentation_scope: Default::default(),
};
processor.on_end(unsampled);
assert!(exporter.get_finished_spans().unwrap().is_empty());
}
#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
assert!(!exporter.get_finished_spans().unwrap().is_empty());
let _result = processor.shutdown();
assert!(exporter.get_finished_spans().unwrap().is_empty());
}
#[test]
fn test_default_const_values() {
assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT.as_millis(), 5000);
assert_eq!(
OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
"OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
);
assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30000);
}
#[test]
fn test_default_batch_config_adheres_to_specification() {
let env_vars = vec![
OTEL_BSP_SCHEDULE_DELAY,
OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_QUEUE_SIZE,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_CONCURRENT_EXPORTS,
];
let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
assert_eq!(
config.max_concurrent_exports,
OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
);
assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}
#[test]
fn test_code_based_config_overrides_env_vars() {
let env_vars = vec![
(OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BSP_MAX_CONCURRENT_EXPORTS, Some("5")),
(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
(OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
];
temp_env::with_vars(env_vars, || {
let config = BatchConfigBuilder::default()
.with_max_export_batch_size(512)
.with_max_queue_size(2048)
.with_scheduled_delay(Duration::from_millis(1000));
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
let config = {
config
.with_max_concurrent_exports(10)
.with_max_export_timeout(Duration::from_millis(2000))
};
let config = config.build();
assert_eq!(config.max_export_batch_size, 512);
assert_eq!(config.max_queue_size, 2048);
assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
{
assert_eq!(config.max_concurrent_exports, 10);
assert_eq!(config.max_export_timeout, Duration::from_millis(2000));
}
});
}
#[test]
fn test_batch_config_configurable_by_env_vars() {
let env_vars = vec![
(OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
(OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
}
#[test]
fn test_batch_config_max_export_batch_size_validation() {
let env_vars = vec![
(OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.max_queue_size, 256);
assert_eq!(config.max_export_batch_size, 256);
assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
}
#[test]
fn test_batch_config_with_fields() {
let batch = BatchConfigBuilder::default()
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_millis(10))
.with_max_queue_size(10);
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
let batch = {
batch
.with_max_concurrent_exports(10)
.with_max_export_timeout(Duration::from_millis(10))
};
let batch = batch.build();
assert_eq!(batch.max_export_batch_size, 10);
assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
assert_eq!(batch.max_queue_size, 10);
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
{
assert_eq!(batch.max_concurrent_exports, 10);
assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
}
}
fn create_test_span(name: &str) -> SpanData {
SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
parent_span_is_remote: false,
span_kind: SpanKind::Internal,
name: name.to_string().into(),
start_time: opentelemetry::time::now(),
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
instrumentation_scope: Default::default(),
}
}
use crate::Resource;
use opentelemetry::{Key, KeyValue, Value};
use std::sync::{atomic::Ordering, Arc, Mutex};
#[derive(Debug)]
struct MockSpanExporter {
exported_spans: Arc<Mutex<Vec<SpanData>>>,
exported_resource: Arc<Mutex<Option<Resource>>>,
}
impl MockSpanExporter {
fn new() -> Self {
Self {
exported_spans: Arc::new(Mutex::new(Vec::new())),
exported_resource: Arc::new(Mutex::new(None)),
}
}
}
impl SpanExporter for MockSpanExporter {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
let exported_spans = self.exported_spans.clone();
exported_spans.lock().unwrap().extend(batch);
Ok(())
}
fn shutdown(&mut self) -> OTelSdkResult {
Ok(())
}
fn set_resource(&mut self, resource: &Resource) {
let mut exported_resource = self.exported_resource.lock().unwrap();
*exported_resource = Some(resource.clone());
}
}
#[test]
fn batchspanprocessor_handles_on_end() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default()
.with_max_queue_size(10)
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_secs(5))
.build();
let processor = BatchSpanProcessor::new(exporter, config);
let test_span = create_test_span("test_span");
processor.on_end(test_span.clone());
std::thread::sleep(Duration::from_secs(6));
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 1);
assert_eq!(exported_spans[0].name, "test_span");
}
#[test]
fn batchspanprocessor_force_flush() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
.with_max_queue_size(10)
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_secs(5))
.build();
let processor = BatchSpanProcessor::new(exporter, config);
let test_span = create_test_span("force_flush_span");
processor.on_end(test_span.clone());
let flush_result = processor.force_flush();
assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(
exported_spans.len(),
1,
"Unexpected number of exported spans"
);
assert_eq!(exported_spans[0].name, "force_flush_span");
}
#[test]
fn batchspanprocessor_shutdown() {
let exporter = InMemorySpanExporterBuilder::new()
.keep_records_on_shutdown()
.build();
let processor = BatchSpanProcessor::new(exporter.clone(), BatchConfig::default());
let record = create_test_span("test_span");
processor.on_end(record);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
processor.on_end(create_test_span("after_shutdown_span"));
assert_eq!(1, exporter.get_finished_spans().unwrap().len());
assert!(exporter.is_shutdown_called());
}
#[test]
fn batchspanprocessor_handles_dropped_spans() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
.with_max_queue_size(2) .with_max_export_batch_size(512) .with_scheduled_delay(Duration::from_secs(5))
.build();
let processor = BatchSpanProcessor::new(exporter, config);
let span1 = create_test_span("span1");
let span2 = create_test_span("span2");
let span3 = create_test_span("span3");
processor.on_end(span1.clone());
processor.on_end(span2.clone());
processor.on_end(span3.clone());
std::thread::sleep(Duration::from_secs(6));
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(
exported_spans.len(),
2,
"Unexpected number of exported spans"
);
assert!(exported_spans.iter().any(|s| s.name == "span1"));
assert!(exported_spans.iter().any(|s| s.name == "span2"));
assert!(
!exported_spans.iter().any(|s| s.name == "span3"),
"Span3 should have been dropped"
);
let dropped_count = processor.dropped_spans_count.load(Ordering::Relaxed);
assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed);
assert_eq!(current_batch_size, 0, "Unexpected current batch size");
}
#[test]
fn validate_span_attributes_exported_correctly() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default().build();
let processor = BatchSpanProcessor::new(exporter, config);
let mut span_data = create_test_span("attribute_validation");
span_data.attributes = vec![
KeyValue::new("key1", "value1"),
KeyValue::new("key2", "value2"),
];
processor.on_end(span_data.clone());
let _ = processor.force_flush();
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 1);
let exported_span = &exported_spans[0];
assert!(exported_span
.attributes
.contains(&KeyValue::new("key1", "value1")));
assert!(exported_span
.attributes
.contains(&KeyValue::new("key2", "value2")));
}
#[test]
fn batchspanprocessor_sets_and_exports_with_resource() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let resource_shared = exporter.exported_resource.clone();
let config = BatchConfigBuilder::default().build();
let mut processor = BatchSpanProcessor::new(exporter, config);
let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
processor.set_resource(&resource);
let test_span = create_test_span("resource_test");
processor.on_end(test_span.clone());
let _ = processor.force_flush();
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 1);
let exported_resource = resource_shared.lock().unwrap();
assert!(exported_resource.is_some());
assert_eq!(
exported_resource
.as_ref()
.unwrap()
.get(&Key::new("service.name")),
Some(Value::from("test_service"))
);
}
#[tokio::test(flavor = "current_thread")]
async fn test_batch_processor_current_thread_runtime() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default()
.with_max_queue_size(5)
.with_max_export_batch_size(3)
.build();
let processor = BatchSpanProcessor::new(exporter, config);
for _ in 0..4 {
let span = new_test_export_span_data();
processor.on_end(span);
}
processor.force_flush().unwrap();
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 4);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_batch_processor_multi_thread_count_1_runtime() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default()
.with_max_queue_size(5)
.with_max_export_batch_size(3)
.build();
let processor = BatchSpanProcessor::new(exporter, config);
for _ in 0..4 {
let span = new_test_export_span_data();
processor.on_end(span);
}
processor.force_flush().unwrap();
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 4);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_batch_processor_multi_thread() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default()
.with_max_queue_size(20)
.with_max_export_batch_size(5)
.build();
let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
let mut handles = vec![];
for _ in 0..10 {
let processor_clone = Arc::clone(&processor);
let handle = tokio::spawn(async move {
let span = new_test_export_span_data();
processor_clone.on_end(span);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
processor.force_flush().unwrap();
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 10);
}
}