use crate::constants::{defaults, env_vars};
use crate::logger::Logger;
use bon::bon;
static LOGGER: Logger = Logger::const_new("processor");
use opentelemetry::Context;
use opentelemetry_sdk::{
error::{OTelSdkError, OTelSdkResult},
trace::{Span, SpanProcessor},
trace::{SpanData, SpanExporter},
Resource,
};
use std::env;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
};
#[derive(Debug)]
struct SpanRingBuffer {
buffer: Vec<Option<SpanData>>,
head: usize, tail: usize, size: usize, capacity: usize,
}
impl Default for SpanRingBuffer {
fn default() -> Self {
Self::new(2048) }
}
impl SpanRingBuffer {
fn new(capacity: usize) -> Self {
let mut buffer = Vec::with_capacity(capacity);
buffer.extend((0..capacity).map(|_| None));
Self {
buffer,
head: 0,
tail: 0,
size: 0,
capacity,
}
}
fn push(&mut self, span: SpanData) -> bool {
if self.size == self.capacity {
return false;
}
self.buffer[self.head] = Some(span);
self.head = (self.head + 1) % self.capacity;
self.size += 1;
true
}
fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
let batch_size = self.size.min(max_batch_size);
let mut result = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
if let Some(span) = self.buffer[self.tail].take() {
result.push(span);
}
self.tail = (self.tail + 1) % self.capacity;
self.size -= 1;
}
if self.size == 0 {
self.head = 0;
self.tail = 0;
}
result
}
}
#[derive(Debug)]
pub struct LambdaSpanProcessor<E>
where
E: SpanExporter + std::fmt::Debug,
{
exporter: Mutex<E>,
spans: Mutex<SpanRingBuffer>,
is_shutdown: Arc<AtomicBool>,
dropped_count: AtomicUsize,
}
#[bon]
impl<E> LambdaSpanProcessor<E>
where
E: SpanExporter + std::fmt::Debug,
{
#[builder]
pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
Ok(value) => match value.parse::<usize>() {
Ok(size) => size,
Err(_) => {
LOGGER.warn(format!(
"Failed to parse {}: {}, using fallback",
env_vars::QUEUE_SIZE,
value
));
max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
}
},
Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
};
Self {
exporter: Mutex::new(exporter),
spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
is_shutdown: Arc::new(AtomicBool::new(false)),
dropped_count: AtomicUsize::new(0),
}
}
}
impl<E> SpanProcessor for LambdaSpanProcessor<E>
where
E: SpanExporter + std::fmt::Debug,
{
fn on_start(&self, _span: &mut Span, _cx: &Context) {
}
fn on_end(&self, span: SpanData) {
if self.is_shutdown.load(Ordering::Relaxed) {
LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
self.dropped_count.fetch_add(1, Ordering::Relaxed);
return;
}
if !span.span_context.is_sampled() {
return;
}
if let Ok(mut spans) = self.spans.lock() {
if !spans.push(span) {
let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
if prev == 0 || prev % 100 == 0 {
LOGGER.warn(format!(
"LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
prev + 1
));
}
}
} else {
LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
}
}
fn force_flush(&self) -> OTelSdkResult {
LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
let spans_result = self.spans.lock();
let all_spans = match spans_result {
Ok(mut spans) => {
let current_size = spans.size;
spans.take_batch(current_size)
}
Err(_) => {
return Err(OTelSdkError::InternalFailure(
"Failed to acquire spans lock in force_flush".to_string(),
));
}
};
let exporter_result = self.exporter.lock();
match exporter_result {
Ok(exporter) => {
let result = futures_executor::block_on(exporter.export(all_spans));
if let Err(ref err) = result {
LOGGER.debug(format!(
"LambdaSpanProcessor.force_flush export error: {err:?}"
));
}
result
}
Err(_) => {
Err(OTelSdkError::InternalFailure(
"Failed to acquire exporter lock in force_flush".to_string(),
))
}
}
}
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown.store(true, Ordering::Relaxed);
self.force_flush()?;
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
"Failed to acquire exporter lock in shutdown".to_string(),
))
}
}
fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
self.is_shutdown.store(true, Ordering::Relaxed);
self.force_flush()?;
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
"Failed to acquire exporter lock in shutdown_with_timeout".to_string(),
))
}
}
fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logger::Logger;
use opentelemetry::{
trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
InstrumentationScope,
};
use opentelemetry_sdk::{
trace::SpanExporter,
trace::{SpanEvents, SpanLinks},
};
use serial_test::serial;
use std::{borrow::Cow, sync::Arc};
use tokio::sync::Mutex;
fn setup_test_logger() -> Logger {
Logger::new("test")
}
#[derive(Debug)]
struct MockExporter {
spans: Arc<Mutex<Vec<SpanData>>>,
}
impl MockExporter {
fn new() -> Self {
Self {
spans: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl SpanExporter for MockExporter {
fn export(
&self,
batch: Vec<SpanData>,
) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
{
let spans = self.spans.clone();
Box::pin(async move {
let mut spans = spans.lock().await;
spans.extend(batch);
Ok(())
})
}
fn shutdown(&mut self) -> OTelSdkResult {
Ok(())
}
}
fn create_test_span(name: &str) -> SpanData {
let flags = TraceFlags::default().with_sampled(true);
SpanData {
span_context: SpanContext::new(
TraceId::from_hex("01000000000000000000000000000000").unwrap(),
SpanId::from_hex("0100000000000001").unwrap(),
flags,
false,
TraceState::default(),
),
parent_span_id: SpanId::INVALID,
parent_span_is_remote: false,
span_kind: opentelemetry::trace::SpanKind::Internal,
name: Cow::Owned(name.to_string()),
start_time: std::time::SystemTime::now(),
end_time: std::time::SystemTime::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: SpanEvents::default(),
links: SpanLinks::default(),
status: opentelemetry::trace::Status::default(),
instrumentation_scope: InstrumentationScope::builder("test").build(),
}
}
fn cleanup_env() {
env::remove_var(env_vars::QUEUE_SIZE);
env::remove_var(env_vars::PROCESSOR_MODE);
env::remove_var(env_vars::COMPRESSION_LEVEL);
env::remove_var(env_vars::SERVICE_NAME);
}
#[test]
#[serial]
fn test_ring_buffer_basic_operations() {
let mut buffer = SpanRingBuffer::new(2);
assert!(buffer.size == 0);
assert_eq!(buffer.take_batch(2), vec![]);
buffer.push(create_test_span("span1"));
buffer.push(create_test_span("span2"));
assert!(buffer.size != 0);
let spans = buffer.take_batch(2);
assert_eq!(spans.len(), 2);
assert!(buffer.size == 0);
}
#[test]
#[serial]
fn test_ring_buffer_overflow() {
let mut buffer = SpanRingBuffer::new(2);
buffer.push(create_test_span("span1"));
buffer.push(create_test_span("span2"));
let success = buffer.push(create_test_span("span3"));
assert!(!success);
let spans = buffer.take_batch(2);
assert_eq!(spans.len(), 2);
assert!(spans.iter().any(|s| s.name == "span1"));
assert!(spans.iter().any(|s| s.name == "span2"));
}
#[test]
#[serial]
fn test_ring_buffer_batch_operations() {
let mut buffer = SpanRingBuffer::new(5);
for i in 0..5 {
buffer.push(create_test_span(&format!("span{i}")));
}
assert_eq!(buffer.take_batch(2).len(), 2);
assert_eq!(buffer.take_batch(2).len(), 2);
assert_eq!(buffer.take_batch(2).len(), 1);
assert!(buffer.size == 0);
}
#[tokio::test]
#[serial]
async fn test_processor_sync_mode() {
let _logger = setup_test_logger();
let mock_exporter = MockExporter::new();
let spans_exported = mock_exporter.spans.clone();
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.max_queue_size(10)
.build();
processor.on_end(create_test_span("test_span"));
processor.force_flush().unwrap();
let exported = spans_exported.lock().await;
assert_eq!(exported.len(), 1);
assert_eq!(exported[0].name, "test_span");
}
#[tokio::test]
#[serial]
async fn test_shutdown_exports_remaining_spans() {
let _logger = setup_test_logger();
let mock_exporter = MockExporter::new();
let spans_exported = mock_exporter.spans.clone();
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.max_queue_size(10)
.build();
processor.on_end(create_test_span("span1"));
processor.on_end(create_test_span("span2"));
processor.shutdown().unwrap();
let exported = spans_exported.lock().await;
assert_eq!(exported.len(), 2);
processor.on_end(create_test_span("span3"));
assert_eq!(exported.len(), 2); }
#[tokio::test]
#[serial]
async fn test_concurrent_span_processing() {
let _logger = setup_test_logger();
let mock_exporter = MockExporter::new();
let spans_exported = mock_exporter.spans.clone();
let processor = Arc::new(
LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.max_queue_size(100)
.build(),
);
let mut handles = Vec::new();
for i in 0..10 {
let processor = processor.clone();
handles.push(tokio::spawn(async move {
for j in 0..10 {
processor.on_end(create_test_span(&format!("span_{i}_{j}")));
}
}));
}
for handle in handles {
handle.await.unwrap();
}
processor.force_flush().unwrap();
let exported = spans_exported.lock().await;
assert_eq!(exported.len(), 100);
assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
}
#[test]
#[serial]
fn test_builder_default_values() {
cleanup_env();
let mock_exporter = MockExporter::new();
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.build();
assert_eq!(processor.spans.lock().unwrap().capacity, 2048); }
#[test]
#[serial]
fn test_builder_env_var_values() {
cleanup_env();
let mock_exporter = MockExporter::new();
env::set_var(env_vars::QUEUE_SIZE, "1000");
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.build();
assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
cleanup_env();
}
#[test]
#[serial]
fn test_builder_env_var_precedence() {
cleanup_env();
let mock_exporter = MockExporter::new();
env::set_var(env_vars::QUEUE_SIZE, "1000");
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.max_queue_size(500)
.build();
assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
cleanup_env();
}
#[test]
#[serial]
fn test_invalid_env_vars() {
cleanup_env();
let mock_exporter = MockExporter::new();
env::set_var(env_vars::QUEUE_SIZE, "invalid");
let processor = LambdaSpanProcessor::builder()
.exporter(mock_exporter)
.max_queue_size(500)
.build();
assert_eq!(processor.spans.lock().unwrap().capacity, 500);
cleanup_env();
}
}