use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::trace::runtime::{TraceRuntime, TrySend};
use crate::trace::Span;
use futures_channel::oneshot;
use futures_util::{
future::{self, BoxFuture, Either},
select,
stream::{self, FusedStream, FuturesUnordered},
Stream, StreamExt as _,
};
use opentelemetry_api::global;
use opentelemetry_api::{
trace::{TraceError, TraceResult},
Context,
};
use std::{env, fmt, str::FromStr, thread, time::Duration};
/// Env var: delay interval between two consecutive scheduled exports, in milliseconds.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Default schedule delay: 5 seconds.
const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
/// Env var: maximum number of messages queued for the batch worker.
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Env var: maximum number of spans exported in a single batch.
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum export batch size (always clamped to the queue size).
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Env var: maximum time one export call may take, in milliseconds.
const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Default export timeout: 30 seconds.
const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Env var: maximum number of export calls that may run concurrently.
const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Default export concurrency: 1 (exports run one at a time, inline).
const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
/// Hooks invoked over a span's lifetime: when it starts, when it ends, and
/// when the owner wants buffered spans flushed or the processor shut down.
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
    /// Called when a span starts; the span is mutable so implementations may
    /// enrich it before it is used.
    fn on_start(&self, span: &mut Span, cx: &Context);
    /// Called with the finished data when a span ends.
    fn on_end(&self, span: SpanData);
    /// Export any spans buffered so far, returning the outcome.
    fn force_flush(&self) -> TraceResult<()>;
    /// Flush remaining spans and release resources held by the processor.
    fn shutdown(&mut self) -> TraceResult<()>;
}
/// A span processor that forwards each finished, sampled span to a dedicated
/// background thread, which exports spans one at a time as they arrive.
#[derive(Debug)]
pub struct SimpleSpanProcessor {
    // `Some(span)` asks the worker thread to export; `None` asks it to stop.
    sender: crossbeam_channel::Sender<Option<SpanData>>,
    // Signalled once by the worker thread after the exporter has shut down.
    shutdown: crossbeam_channel::Receiver<()>,
}
impl SimpleSpanProcessor {
    /// Spawn the dedicated exporter thread and return a processor wired to it.
    ///
    /// The thread exports spans one by one until it receives `None` (or the
    /// channel closes), then shuts the exporter down and signals completion.
    pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
        let (span_sender, span_receiver) = crossbeam_channel::unbounded();
        let (done_sender, done_receiver) = crossbeam_channel::bounded(0);

        let worker = move || {
            // `Some(span)` exports one span; `None` or a closed channel ends the loop.
            while let Ok(Some(span)) = span_receiver.recv() {
                let export_result = futures_executor::block_on(exporter.export(vec![span]));
                if let Err(err) = export_result {
                    global::handle_error(err);
                }
            }
            exporter.shutdown();
            // Tell `SimpleSpanProcessor::shutdown` that teardown finished.
            if let Err(err) = done_sender.send(()) {
                global::handle_error(TraceError::from(format!(
                    "could not send shutdown: {:?}",
                    err
                )));
            }
        };

        // Spawn failure is deliberately ignored: the thread is best-effort.
        let _ = thread::Builder::new()
            .name("opentelemetry-exporter".to_string())
            .spawn(worker);

        SimpleSpanProcessor {
            sender: span_sender,
            shutdown: done_receiver,
        }
    }
}
impl SpanProcessor for SimpleSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored: this processor only acts when spans end.
    }

    fn on_end(&self, span: SpanData) {
        // Only sampled spans are forwarded to the exporter thread.
        if !span.span_context.is_sampled() {
            return;
        }
        // Send failures (worker thread gone) are reported, not returned.
        if let Err(err) = self.sender.send(Some(span)) {
            global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
        }
    }

    fn force_flush(&self) -> TraceResult<()> {
        // Spans are handed off as they end; there is no local buffer to flush.
        // NOTE(review): this does not wait for an in-flight export on the
        // worker thread to finish.
        Ok(())
    }

    fn shutdown(&mut self) -> TraceResult<()> {
        // `None` asks the worker to exit; then wait for its completion signal.
        // If the send fails the worker is already gone, which we treat as done.
        if self.sender.send(None).is_ok() {
            if let Err(err) = self.shutdown.recv() {
                global::handle_error(TraceError::from(format!(
                    "error shutting down span processor: {:?}",
                    err
                )))
            }
        }
        // Errors are routed to the global handler; callers always get Ok.
        Ok(())
    }
}
/// A span processor that buffers finished spans and exports them in batches
/// from a background task driven by the provided [`TraceRuntime`].
pub struct BatchSpanProcessor<R: TraceRuntime> {
    // Channel to the background worker (span, flush and shutdown messages).
    message_sender: R::Sender,
}
impl<R: TraceRuntime> fmt::Debug for BatchSpanProcessor<R> {
    /// Debug output shows only the worker channel; the worker itself lives
    /// on the runtime and is not reachable from here.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("BatchSpanProcessor");
        builder.field("message_sender", &self.message_sender);
        builder.finish()
    }
}
impl<R: TraceRuntime> SpanProcessor for BatchSpanProcessor<R> {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Nothing to do at span start for a batching processor.
    }

    fn on_end(&self, span: SpanData) {
        // Unsampled spans never reach the worker.
        if !span.span_context.is_sampled() {
            return;
        }
        // Failure to enqueue (e.g. the channel is full or closed) is reported
        // via the global error handler rather than blocking the caller.
        if let Err(err) = self.message_sender.try_send(BatchMessage::ExportSpan(span)) {
            global::handle_error(err);
        }
    }

    fn force_flush(&self) -> TraceResult<()> {
        // Ask the worker to flush now and block on its reply; the reply is a
        // nested `TraceResult`, flattened with `identity`.
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))?;
        futures_executor::block_on(res_receiver)
            .map_err(|err| TraceError::Other(err.into()))
            .and_then(std::convert::identity)
    }

    fn shutdown(&mut self) -> TraceResult<()> {
        // Ask the worker to flush, shut the exporter down and stop, then
        // block on the final result.
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))?;
        futures_executor::block_on(res_receiver)
            .map_err(|err| TraceError::Other(err.into()))
            .and_then(std::convert::identity)
    }
}
/// Control messages exchanged between the public [`BatchSpanProcessor`]
/// handle and its background worker task.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BatchMessage {
    /// Buffer this finished span for the next batch export.
    ExportSpan(SpanData),
    /// Export the current buffer now; reply on the channel when present.
    Flush(Option<oneshot::Sender<ExportResult>>),
    /// Flush, shut the exporter down and stop the worker; reply with the result.
    Shutdown(oneshot::Sender<ExportResult>),
}
/// State owned by the background worker task of a [`BatchSpanProcessor`].
struct BatchSpanProcessorInternal<R> {
    // Spans buffered since the last export.
    spans: Vec<SpanData>,
    // Exports currently in flight (populated when `max_concurrent_exports > 1`).
    export_tasks: FuturesUnordered<BoxFuture<'static, ExportResult>>,
    // Runtime used to create export-timeout delays.
    runtime: R,
    exporter: Box<dyn SpanExporter>,
    config: BatchConfig,
}
impl<R: TraceRuntime> BatchSpanProcessorInternal<R> {
    /// Export the currently buffered spans, optionally reporting the result
    /// over `res_channel` (used by `Flush` and `Shutdown` messages).
    async fn flush(&mut self, res_channel: Option<oneshot::Sender<ExportResult>>) {
        let export_task = self.export();
        let task = Box::pin(async move {
            let result = export_task.await;
            if let Some(channel) = res_channel {
                // A failed send means the requester dropped its receiver;
                // surface that through the global error handler.
                if let Err(result) = channel.send(result) {
                    global::handle_error(TraceError::from(format!(
                        "failed to send flush result: {:?}",
                        result
                    )));
                }
            } else if let Err(err) = result {
                // Nobody to notify; report the export error directly.
                global::handle_error(err);
            }
            Ok(())
        });
        if self.config.max_concurrent_exports == 1 {
            // Serial mode: run the export inline on the worker task.
            let _ = task.await;
        } else {
            // Concurrent mode: enqueue this export, then drain every
            // in-flight task so the flush also observes prior exports.
            self.export_tasks.push(task);
            while self.export_tasks.next().await.is_some() {}
        }
    }

    /// Handle one control message. Returns `false` when the worker loop
    /// should terminate (i.e. after `Shutdown`).
    async fn process_message(&mut self, message: BatchMessage) -> bool {
        match message {
            BatchMessage::ExportSpan(span) => {
                self.spans.push(span);
                if self.spans.len() == self.config.max_export_batch_size {
                    // At the concurrency cap: wait for one in-flight export
                    // to finish before starting another.
                    if !self.export_tasks.is_empty()
                        && self.export_tasks.len() == self.config.max_concurrent_exports
                    {
                        self.export_tasks.next().await;
                    }
                    let export_task = self.export();
                    let task = async move {
                        if let Err(err) = export_task.await {
                            global::handle_error(err);
                        }
                        Ok(())
                    };
                    // Concurrency of 1 exports inline; otherwise the export
                    // runs alongside the other in-flight tasks.
                    if self.config.max_concurrent_exports == 1 {
                        let _ = task.await;
                    } else {
                        self.export_tasks.push(Box::pin(task));
                    }
                }
            }
            BatchMessage::Flush(res_channel) => {
                self.flush(res_channel).await;
            }
            BatchMessage::Shutdown(ch) => {
                // Final flush, then shut the exporter down and stop the loop.
                self.flush(Some(ch)).await;
                self.exporter.shutdown();
                return false;
            }
        }
        true
    }

    /// Drain the span buffer into a single export future, bounded by
    /// `max_export_timeout`. An empty buffer exports as an immediate `Ok`.
    fn export(&mut self) -> BoxFuture<'static, ExportResult> {
        if self.spans.is_empty() {
            return Box::pin(future::ready(Ok(())));
        }
        // `split_off(0)` moves all buffered spans out, leaving the buffer empty.
        let export = self.exporter.export(self.spans.split_off(0));
        let timeout = self.runtime.delay(self.config.max_export_timeout);
        let time_out = self.config.max_export_timeout;
        Box::pin(async move {
            // Whichever completes first wins; a timeout drops the export
            // future and reports `ExportTimedOut`.
            match future::select(export, timeout).await {
                Either::Left((export_res, _)) => export_res,
                Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
            }
        })
    }

    /// Worker loop: concurrently drives in-flight export tasks and incoming
    /// messages until the stream ends or `Shutdown` is processed.
    async fn run(mut self, mut messages: impl Stream<Item = BatchMessage> + Unpin + FusedStream) {
        loop {
            select! {
                // Make progress on background exports.
                _ = self.export_tasks.next() => {
                },
                message = messages.next() => {
                    match message {
                        Some(message) => {
                            if !self.process_message(message).await {
                                break;
                            }
                        },
                        None => break,
                    }
                },
            }
        }
    }
}
impl<R: TraceRuntime> BatchSpanProcessor<R> {
    /// Create a processor and spawn its batching worker on `runtime`.
    pub(crate) fn new(exporter: Box<dyn SpanExporter>, config: BatchConfig, runtime: R) -> Self {
        // Channel over which `on_end`/`force_flush`/`shutdown` reach the worker.
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);

        // Periodic ticker that requests a flush every `scheduled_delay`.
        let ticker = runtime
            .interval(config.scheduled_delay)
            .map(|_| BatchMessage::Flush(None));

        let worker = BatchSpanProcessorInternal {
            spans: Vec::new(),
            export_tasks: FuturesUnordered::new(),
            runtime: runtime.clone(),
            config,
            exporter,
        };

        // Drive the worker on the runtime, fed by both user messages and ticks.
        let messages = Box::pin(stream::select(message_receiver, ticker));
        runtime.spawn(Box::pin(worker.run(messages)));

        BatchSpanProcessor { message_sender }
    }

    /// Start building a batch processor with default [`BatchConfig`].
    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
    where
        E: SpanExporter,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}
/// Batching configuration for a [`BatchSpanProcessor`].
#[derive(Debug)]
pub struct BatchConfig {
    // Maximum number of messages the worker channel buffers before
    // `on_end` enqueues start failing.
    max_queue_size: usize,
    // Delay between two consecutive scheduled exports.
    scheduled_delay: Duration,
    // Maximum number of spans exported in one batch; kept <= max_queue_size
    // by `Default::default` and the builder.
    max_export_batch_size: usize,
    // Maximum time a single export call may take before it is abandoned.
    max_export_timeout: Duration,
    // Maximum number of export calls allowed to run concurrently.
    max_concurrent_exports: usize,
}
impl Default for BatchConfig {
    /// Start from the compiled-in defaults, then apply any `OTEL_BSP_*`
    /// environment variables that are set and parse successfully; unparsable
    /// values are ignored. The export batch size is clamped so it never
    /// exceeds the queue size.
    fn default() -> Self {
        // A usize-valued knob: the env override wins only when it parses.
        let usize_from_env = |key: &str| {
            env::var(key)
                .ok()
                .and_then(|raw| usize::from_str(&raw).ok())
        };
        // A millisecond-valued knob with a deprecated `_MILLIS` fallback name.
        let millis_from_env = |key: &str, deprecated_key: &str| {
            env::var(key)
                .ok()
                .or_else(|| env::var(deprecated_key).ok())
                .and_then(|raw| u64::from_str(&raw).ok())
                .map(Duration::from_millis)
        };

        let max_queue_size =
            usize_from_env(OTEL_BSP_MAX_QUEUE_SIZE).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);

        BatchConfig {
            max_concurrent_exports: usize_from_env(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
                .unwrap_or(OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT),
            scheduled_delay: millis_from_env(
                OTEL_BSP_SCHEDULE_DELAY,
                "OTEL_BSP_SCHEDULE_DELAY_MILLIS",
            )
            .unwrap_or(Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)),
            // A batch can never be larger than the queue feeding it.
            max_export_batch_size: usize_from_env(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
                .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT)
                .min(max_queue_size),
            max_export_timeout: millis_from_env(
                OTEL_BSP_EXPORT_TIMEOUT,
                "OTEL_BSP_EXPORT_TIMEOUT_MILLIS",
            )
            .unwrap_or(Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)),
            max_queue_size,
        }
    }
}
impl BatchConfig {
    /// Set the maximum number of spans that may be buffered before new spans
    /// are dropped. Also re-clamps the export batch size so the invariant
    /// `max_export_batch_size <= max_queue_size` — already enforced by
    /// `Default::default` and `BatchSpanProcessorBuilder` — holds regardless
    /// of the order in which the setters are called.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        if self.max_export_batch_size > self.max_queue_size {
            self.max_export_batch_size = self.max_queue_size;
        }
        self
    }

    /// Set the maximum number of spans exported in a single batch. Values
    /// larger than the current `max_queue_size` are clamped down, matching
    /// the clamping performed by `Default::default` and the builder.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size.min(self.max_queue_size);
        self
    }

    /// Set the maximum number of export calls allowed to run concurrently.
    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
        self.max_concurrent_exports = max_concurrent_exports;
        self
    }

    /// Set the delay between two consecutive scheduled exports.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set the timeout applied to a single export call.
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }
}
/// Builder for [`BatchSpanProcessor`] instances.
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}
impl<E, R> BatchSpanProcessorBuilder<E, R>
where
    E: SpanExporter + 'static,
    R: TraceRuntime,
{
    /// Set the maximum queue size for buffered spans.
    pub fn with_max_queue_size(mut self, size: usize) -> Self {
        self.config.max_queue_size = size;
        self
    }

    /// Set the delay between two consecutive scheduled exports.
    pub fn with_scheduled_delay(mut self, delay: Duration) -> Self {
        self.config.scheduled_delay = delay;
        self
    }

    /// Set the timeout applied to a single export call.
    pub fn with_max_timeout(mut self, timeout: Duration) -> Self {
        self.config.max_export_timeout = timeout;
        self
    }

    /// Set the maximum batch size; values above the queue size are clamped.
    pub fn with_max_export_batch_size(mut self, size: usize) -> Self {
        self.config.max_export_batch_size = size.min(self.config.max_queue_size);
        self
    }

    /// Set how many exports may run concurrently.
    pub fn with_max_concurrent_exports(mut self, max: usize) -> Self {
        self.config.max_concurrent_exports = max;
        self
    }

    /// Replace the whole batching configuration.
    pub fn with_batch_config(mut self, config: BatchConfig) -> Self {
        self.config = config;
        self
    }

    /// Build the processor, spawning its worker on the configured runtime.
    pub fn build(self) -> BatchSpanProcessor<R> {
        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{
        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::export::trace::{stdout, ExportResult, SpanData, SpanExporter};
    use crate::runtime;
    use crate::testing::trace::{
        new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
    };
    use crate::trace::{BatchConfig, EvictedHashMap, EvictedQueue};
    use async_trait::async_trait;
    use opentelemetry_api::trace::{SpanContext, SpanId, SpanKind, Status};
    use std::fmt::Debug;
    use std::future::Future;
    use std::time::Duration;

    // A sampled span handed to `on_end` must reach the exporter.
    #[test]
    fn simple_span_processor_on_end_calls_export() {
        let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
        let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
        processor.on_end(new_test_export_span_data());
        assert!(rx_export.recv().is_ok());
        let _result = processor.shutdown();
    }

    // Unsampled spans must be dropped before ever reaching the exporter.
    #[test]
    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
        let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
        let processor = SimpleSpanProcessor::new(Box::new(exporter));
        // `empty_context` yields a span context that is not sampled.
        let unsampled = SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            span_kind: SpanKind::Internal,
            name: "opentelemetry".into(),
            start_time: opentelemetry_api::time::now(),
            end_time: opentelemetry_api::time::now(),
            attributes: EvictedHashMap::new(0, 0),
            events: EvictedQueue::new(0),
            links: EvictedQueue::new(0),
            status: Status::Unset,
            resource: Default::default(),
            instrumentation_lib: Default::default(),
        };
        processor.on_end(unsampled);
        assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
    }

    // Shutting the processor down must also shut the exporter down.
    #[test]
    fn simple_span_processor_shutdown_calls_shutdown() {
        let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
        let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
        let _result = processor.shutdown();
        assert!(rx_shutdown.try_recv().is_ok());
    }

    // The `with_*` setters on `BatchConfig` must store the given values.
    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfig::default()
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_millis(10))
            .with_max_export_timeout(Duration::from_millis(10))
            .with_max_concurrent_exports(10)
            .with_max_queue_size(10);
        assert_eq!(batch.max_export_batch_size, 10);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
        assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
        assert_eq!(batch.max_concurrent_exports, 10);
        assert_eq!(batch.max_queue_size, 10);
    }

    // The builder must honour `OTEL_BSP_*` env vars, fall back to defaults
    // for unparsable values, and clamp batch size to queue size.
    // NOTE(review): mutates process-wide env vars, so this can race with
    // other tests reading the same variables.
    #[test]
    fn test_build_batch_span_processor_builder() {
        std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
        std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT, "2046");
        // Not a number: the default schedule delay must be used instead.
        std::env::set_var(OTEL_BSP_SCHEDULE_DELAY, "I am not number");
        let mut builder = BatchSpanProcessor::builder(
            stdout::Exporter::new(std::io::stdout(), true),
            runtime::Tokio,
        );
        assert_eq!(builder.config.max_export_batch_size, 500);
        assert_eq!(
            builder.config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            builder.config.max_queue_size,
            OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
        );
        assert_eq!(
            builder.config.max_export_timeout,
            Duration::from_millis(2046)
        );
        // Batch size (500) larger than the queue size (120) must be clamped.
        std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120");
        builder = BatchSpanProcessor::builder(
            stdout::Exporter::new(std::io::stdout(), true),
            runtime::Tokio,
        );
        assert_eq!(builder.config.max_export_batch_size, 120);
        assert_eq!(builder.config.max_queue_size, 120);
    }

    // `force_flush` must export buffered spans without waiting for the
    // (deliberately huge) scheduled delay.
    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        let config = BatchConfig {
            // Effectively disable the ticker so only explicit flushes export.
            scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
        };
        let mut processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                if let Some(span) = export_receiver.recv().await {
                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
                    break;
                }
            }
        });
        // NOTE(review): the sleep presumably lets the worker task start
        // before the span ends — confirm whether it is still needed.
        tokio::time::sleep(Duration::from_secs(1)).await; processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();
        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out in 5 seconds. force_flush may not export any data when called"
        );
    }

    // An exporter whose `export` just sleeps for `delay_for`, used to
    // exercise the export-timeout path of the batch processor.
    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    #[async_trait]
    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn export(
            &mut self,
            _batch: Vec<SpanData>,
        ) -> futures_util::future::BoxFuture<'static, ExportResult> {
            use futures_util::FutureExt;
            // Sleep for `delay_for`, then report success.
            Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
        }
    }

    #[test]
    fn test_timeout_tokio_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(true));
    }

    #[test]
    fn test_timeout_tokio_not_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(false));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_timeout() {
        async_std::task::block_on(timeout_test_std_async(true));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_not_timeout() {
        async_std::task::block_on(timeout_test_std_async(false));
    }

    // Shared body for the async-std timeout tests: when `time_out` is true
    // the exporter sleeps longer than the configured timeout, so the flush
    // must fail; otherwise it must succeed.
    #[cfg(feature = "rt-async-std")]
    async fn timeout_test_std_async(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Disable the ticker so only the explicit flush exports.
            scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: async_std::task::sleep,
        };
        let mut processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    // Tokio twin of `timeout_test_std_async`.
    async fn timeout_test_tokio(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Disable the ticker so only the explicit flush exports.
            scheduled_delay: Duration::from_secs(60 * 60 * 24), ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: tokio::time::sleep,
        };
        let mut processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        // NOTE(review): sleep presumably lets the worker task start first.
        tokio::time::sleep(Duration::from_secs(1)).await; processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }
}