use async_trait::async_trait;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex as AsyncMutex};
use tokio::time::{interval, timeout, Duration};
use super::span::{Span, SpanExporter};
use crate::errors::ModuleError;
#[async_trait]
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
async fn on_span_end(&self, span: Span);
async fn shutdown(&self) -> Result<(), ModuleError>;
}
#[derive(Debug)]
pub struct SimpleSpanProcessor {
exporter: Arc<dyn SpanExporter>,
}
impl SimpleSpanProcessor {
#[must_use]
pub fn new(exporter: Arc<dyn SpanExporter>) -> Self {
Self { exporter }
}
}
#[async_trait]
impl SpanProcessor for SimpleSpanProcessor {
async fn on_span_end(&self, span: Span) {
if let Err(e) = self.exporter.export(&span).await {
tracing::warn!(error = %e.message, "SimpleSpanProcessor: export failed");
}
}
async fn shutdown(&self) -> Result<(), ModuleError> {
self.exporter.shutdown().await
}
}
#[derive(Debug, Clone, Copy)]
pub struct BatchSpanProcessorConfig {
pub max_queue_size: usize,
pub schedule_delay_ms: u64,
pub max_export_batch_size: usize,
pub export_timeout_ms: u64,
}
impl Default for BatchSpanProcessorConfig {
fn default() -> Self {
Self {
max_queue_size: 2048,
schedule_delay_ms: 5000,
max_export_batch_size: 512,
export_timeout_ms: 30000,
}
}
}
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder {
exporter: Arc<dyn SpanExporter>,
config: BatchSpanProcessorConfig,
}
impl BatchSpanProcessorBuilder {
#[must_use]
pub fn new(exporter: Arc<dyn SpanExporter>) -> Self {
Self {
exporter,
config: BatchSpanProcessorConfig::default(),
}
}
#[must_use]
pub fn max_queue_size(mut self, n: usize) -> Self {
self.config.max_queue_size = n;
self
}
#[must_use]
pub fn schedule_delay_ms(mut self, ms: u64) -> Self {
self.config.schedule_delay_ms = ms;
self
}
#[must_use]
pub fn max_export_batch_size(mut self, n: usize) -> Self {
self.config.max_export_batch_size = n;
self
}
#[must_use]
pub fn export_timeout_ms(mut self, ms: u64) -> Self {
self.config.export_timeout_ms = ms;
self
}
#[must_use]
pub fn build(self) -> BatchSpanProcessor {
BatchSpanProcessor::new(self.exporter, self.config)
}
}
#[derive(Debug)]
struct BatchInner {
rx: AsyncMutex<mpsc::Receiver<Span>>,
queue_size: AtomicU64,
spans_dropped: AtomicU64,
config: BatchSpanProcessorConfig,
exporter: Arc<dyn SpanExporter>,
shutdown_tx: AsyncMutex<Option<mpsc::Sender<()>>>,
shutdown_signalled: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct BatchSpanProcessor {
tx: mpsc::Sender<Span>,
inner: Arc<BatchInner>,
}
impl BatchSpanProcessor {
#[must_use]
pub fn builder(exporter: Arc<dyn SpanExporter>) -> BatchSpanProcessorBuilder {
BatchSpanProcessorBuilder::new(exporter)
}
fn new(exporter: Arc<dyn SpanExporter>, config: BatchSpanProcessorConfig) -> Self {
let (tx, rx) = mpsc::channel::<Span>(config.max_queue_size);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let inner = Arc::new(BatchInner {
rx: AsyncMutex::new(rx),
queue_size: AtomicU64::new(0),
spans_dropped: AtomicU64::new(0),
config,
exporter,
shutdown_tx: AsyncMutex::new(Some(shutdown_tx)),
shutdown_signalled: AtomicBool::new(false),
});
let bg_inner = inner.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let mut tick = interval(Duration::from_millis(bg_inner.config.schedule_delay_ms));
tick.tick().await;
loop {
tokio::select! {
_ = tick.tick() => {
let closed = flush_batch(&bg_inner).await;
if closed && bg_inner.queue_size.load(Ordering::Relaxed) == 0 {
break;
}
}
_ = shutdown_rx.recv() => {
flush_batch(&bg_inner).await;
break;
}
}
}
});
} else {
tracing::warn!(
"BatchSpanProcessor constructed outside a tokio runtime; \
spans will not be flushed until shutdown() is awaited inside one"
);
}
Self { tx, inner }
}
#[must_use]
pub fn queue_size(&self) -> usize {
usize::try_from(self.inner.queue_size.load(Ordering::Relaxed)).unwrap_or(usize::MAX)
}
#[must_use]
pub fn spans_dropped(&self) -> u64 {
self.inner.spans_dropped.load(Ordering::Relaxed)
}
#[must_use]
pub fn max_queue_size(&self) -> usize {
self.inner.config.max_queue_size
}
}
async fn flush_batch(inner: &BatchInner) -> bool {
let mut rx = inner.rx.lock().await;
let mut batch: Vec<Span> = Vec::with_capacity(inner.config.max_export_batch_size);
let mut closed = false;
while batch.len() < inner.config.max_export_batch_size {
match rx.try_recv() {
Ok(span) => {
inner.queue_size.fetch_sub(1, Ordering::Relaxed);
batch.push(span);
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => {
closed = true;
break;
}
}
}
drop(rx);
for span in batch {
if let Err(e) = inner.exporter.export(&span).await {
tracing::warn!(error = %e.message, "BatchSpanProcessor: export failed");
}
}
closed
}
#[async_trait]
impl SpanProcessor for BatchSpanProcessor {
async fn on_span_end(&self, span: Span) {
match self.tx.try_send(span) {
Ok(()) => {
self.inner.queue_size.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
self.inner.spans_dropped.fetch_add(1, Ordering::Relaxed);
}
}
}
async fn shutdown(&self) -> Result<(), ModuleError> {
self.inner.shutdown_signalled.store(true, Ordering::SeqCst);
let mut guard = self.inner.shutdown_tx.lock().await;
if let Some(tx) = guard.take() {
let _ = tx.send(()).await;
}
drop(guard);
let timeout_dur = Duration::from_millis(self.inner.config.export_timeout_ms);
let inner = self.inner.clone();
let _ = timeout(timeout_dur, async move {
flush_batch(&inner).await;
})
.await;
self.inner.exporter.shutdown().await
}
}
#[async_trait]
impl SpanExporter for SimpleSpanProcessor {
async fn export(&self, span: &Span) -> Result<(), ModuleError> {
self.exporter.export(span).await
}
async fn shutdown(&self) -> Result<(), ModuleError> {
<Self as SpanProcessor>::shutdown(self).await
}
}
#[async_trait]
impl SpanExporter for BatchSpanProcessor {
async fn export(&self, span: &Span) -> Result<(), ModuleError> {
<Self as SpanProcessor>::on_span_end(self, span.clone()).await;
Ok(())
}
async fn shutdown(&self) -> Result<(), ModuleError> {
<Self as SpanProcessor>::shutdown(self).await
}
}
impl Drop for BatchSpanProcessor {
fn drop(&mut self) {
if self.inner.shutdown_signalled.swap(true, Ordering::SeqCst) {
return;
}
if let Ok(mut guard) = self.inner.shutdown_tx.try_lock() {
if let Some(tx) = guard.take() {
let _ = tx.try_send(());
}
}
}
}