use crate::span_record::SpanRecord;
use crossbeam::queue::ArrayQueue;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
/// Bounded queue of [`SpanRecord`]s that is drained by a background
/// "sidecar" thread spawned in [`SpanRingBuffer::new`].
///
/// Producers call `push`; when the queue is full the span is dropped and
/// counted rather than blocking the caller.
// NOTE(review): `total_pushed` / `total_dropped` are never cloned in the code
// visible in this file, so their `Arc` wrappers look unnecessary — confirm
// before simplifying.
pub struct SpanRingBuffer {
/// Fixed-capacity queue shared with the sidecar thread.
queue: Arc<ArrayQueue<SpanRecord>>,
/// Handle of the sidecar thread; taken (left as `None`) once it has been joined.
sidecar_handle: Option<JoinHandle<()>>,
/// Set to `true` to ask the sidecar thread to drain the queue and exit.
shutdown: Arc<std::sync::atomic::AtomicBool>,
/// Number of `push` calls, including those whose span was dropped.
total_pushed: Arc<std::sync::atomic::AtomicU64>,
/// Number of `push` calls rejected because the queue was full.
total_dropped: Arc<std::sync::atomic::AtomicU64>,
}
impl SpanRingBuffer {
    /// Creates a buffer that holds at most `capacity` spans and starts the
    /// sidecar thread that drains it.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, or if the operating system fails to
    /// spawn the sidecar thread (see [`std::thread::spawn`]).
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "Ring buffer capacity must be > 0");

        let queue = Arc::new(ArrayQueue::new(capacity));
        let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));

        // The worker owns its own handles to the queue and the stop flag.
        let worker_queue = Arc::clone(&queue);
        let worker_shutdown = Arc::clone(&shutdown);
        let sidecar_handle =
            thread::spawn(move || Self::sidecar_worker(worker_queue, worker_shutdown));

        Self {
            queue,
            sidecar_handle: Some(sidecar_handle),
            shutdown,
            total_pushed: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            total_dropped: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }

    /// Enqueues `span` without blocking.
    ///
    /// Every call increments the pushed counter. If the queue is full the span
    /// is discarded and the dropped counter is incremented instead.
    pub fn push(&self, span: SpanRecord) {
        self.total_pushed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        if self.queue.push(span).is_err() {
            // `fetch_add` returns the previous value, so every dropping caller
            // observes a distinct running total.
            let dropped = self
                .total_dropped
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                .wrapping_add(1);

            // Writing to stderr on every drop would serialize producers on the
            // stderr lock precisely when the buffer is overloaded. Warn on the
            // 1st, 2nd, 4th, 8th, ... drop instead; `stats()` has exact totals.
            if dropped.is_power_of_two() {
                eprintln!(
                    "WARNING: Ring buffer full - span dropped (backpressure). \
                     Consider increasing capacity or reducing trace volume. \
                     ({} spans dropped so far)",
                    dropped
                );
            }
        }
    }

    /// Stops the sidecar thread and waits until it has drained the queue.
    ///
    /// Consumes the buffer. A panic inside the sidecar thread is reported on
    /// stderr instead of being silently discarded.
    pub fn shutdown(mut self) {
        self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
        if let Some(handle) = self.sidecar_handle.take() {
            // `join` returns `Err` only when the joined thread panicked.
            if handle.join().is_err() {
                eprintln!(
                    "ERROR: Span sidecar worker panicked; \
                     buffered spans may not have been exported."
                );
            }
        }
    }

    /// Returns the current counters, queue length and capacity.
    ///
    /// Each value is read independently, so under concurrent pushes the result
    /// is not a single consistent snapshot.
    pub fn stats(&self) -> BufferStats {
        BufferStats {
            total_pushed: self.total_pushed.load(std::sync::atomic::Ordering::Relaxed),
            total_dropped: self.total_dropped.load(std::sync::atomic::Ordering::Relaxed),
            current_size: self.queue.len(),
            capacity: self.queue.capacity(),
        }
    }

    /// Body of the sidecar thread: repeatedly moves up to `BATCH_SIZE` spans
    /// out of `queue` and exports them, sleeping `SLEEP_MS` milliseconds when
    /// the queue is empty. Once `shutdown` is set it keeps exporting until the
    /// queue is empty and then returns.
    fn sidecar_worker(
        queue: Arc<ArrayQueue<SpanRecord>>,
        shutdown: Arc<std::sync::atomic::AtomicBool>,
    ) {
        const BATCH_SIZE: usize = 100;
        const SLEEP_MS: u64 = 10;

        let mut batch = Vec::with_capacity(BATCH_SIZE);
        loop {
            // Read the flag *before* draining: if it was already set, an empty
            // drain afterwards means everything pushed before the shutdown
            // request has been exported and it is safe to exit.
            let stopping = shutdown.load(std::sync::atomic::Ordering::SeqCst);

            Self::fill_batch(&queue, &mut batch, BATCH_SIZE);

            if !batch.is_empty() {
                Self::export_batch(&batch);
                batch.clear();
            } else if stopping {
                break;
            } else {
                thread::sleep(Duration::from_millis(SLEEP_MS));
            }
        }
    }

    /// Moves spans from `queue` into `batch` until the queue is empty or
    /// `batch` holds `limit` spans.
    fn fill_batch(queue: &ArrayQueue<SpanRecord>, batch: &mut Vec<SpanRecord>, limit: usize) {
        while batch.len() < limit {
            match queue.pop() {
                Some(span) => batch.push(span),
                None => break,
            }
        }
    }

    /// Exports one batch of spans. Currently this only logs the batch size to
    /// stderr.
    fn export_batch(batch: &[SpanRecord]) {
        eprintln!("DEBUG: Exporting batch of {} spans", batch.len());
    }
}
impl Drop for SpanRingBuffer {
    /// Requests the sidecar thread to stop and blocks until it has exited.
    ///
    /// If the thread was already joined (the handle is `None`) this only
    /// re-sets the stop flag. A panic inside the sidecar thread is reported on
    /// stderr rather than silently discarded.
    fn drop(&mut self) {
        self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
        if let Some(handle) = self.sidecar_handle.take() {
            // `join` returns `Err` only when the joined thread panicked.
            if handle.join().is_err() {
                eprintln!(
                    "ERROR: Span sidecar worker panicked; \
                     buffered spans may not have been exported."
                );
            }
        }
    }
}
/// Counters and occupancy figures describing a span buffer at one moment.
#[derive(Debug, Clone, Copy)]
pub struct BufferStats {
    /// Number of push attempts, including those that were dropped.
    pub total_pushed: u64,
    /// Number of push attempts that were dropped.
    pub total_dropped: u64,
    /// Number of spans in the buffer when the stats were taken.
    pub current_size: usize,
    /// Maximum number of spans the buffer can hold.
    pub capacity: usize,
}

impl BufferStats {
    /// Fraction of push attempts that were dropped, in `0.0..=1.0` when
    /// `total_dropped <= total_pushed`.
    ///
    /// Returns `0.0` when nothing has been pushed yet.
    pub fn drop_rate(&self) -> f64 {
        if self.total_pushed == 0 {
            return 0.0;
        }
        self.total_dropped as f64 / self.total_pushed as f64
    }

    /// Fraction of the capacity currently in use (`current_size / capacity`).
    ///
    /// Returns `0.0` when `capacity` is zero. The fields are public, so such a
    /// value can be constructed by hand, and dividing would yield NaN or
    /// infinity.
    pub fn utilization(&self) -> f64 {
        if self.capacity == 0 {
            return 0.0;
        }
        self.current_size as f64 / self.capacity as f64
    }
}
// Compile-time check that `BufferStats` implements both `Send` and `Sync`,
// i.e. that it can be moved to and shared between threads.
static_assertions::assert_impl_all!(BufferStats: Send, Sync);
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created buffer reports its capacity and zeroed counters.
    #[test]
    fn test_ring_buffer_creation() {
        let buffer = SpanRingBuffer::new(1024);
        let stats = buffer.stats();
        assert_eq!(stats.capacity, 1024);
        assert_eq!(stats.current_size, 0);
        assert_eq!(stats.total_pushed, 0);
        assert_eq!(stats.total_dropped, 0);
    }

    /// Pushing one span into a roomy buffer is counted and never dropped.
    #[test]
    fn test_push_single_span() {
        use crate::span_record::{SpanKind, StatusCode};
        use std::collections::HashMap;

        let buffer = SpanRingBuffer::new(1024);
        let span = SpanRecord::new(
            [1; 16],
            [2; 8],
            None,
            "test_span".to_string(),
            SpanKind::Internal,
            1000,
            2000,
            1,
            StatusCode::Ok,
            String::new(),
            HashMap::new(),
            HashMap::new(),
            0,
            0,
        );
        buffer.push(span);

        let stats = buffer.stats();
        assert_eq!(stats.total_pushed, 1);
        assert_eq!(stats.total_dropped, 0);
    }

    /// Constructing a buffer with zero capacity must panic.
    #[test]
    #[should_panic(expected = "Ring buffer capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _ = SpanRingBuffer::new(0);
    }

    /// Pushing more spans than the buffer can hold never blocks and keeps the
    /// counters consistent.
    ///
    /// The sidecar thread drains concurrently, so the exact number of drops is
    /// not deterministic. What is guaranteed:
    /// * every push attempt is counted;
    /// * the first two pushes find room in the capacity-2 queue, so at most
    ///   eight of the ten can be dropped;
    /// * the queue never holds more than its capacity, nor more than the
    ///   number of accepted spans.
    #[test]
    fn test_backpressure_drops_spans() {
        use crate::span_record::{SpanKind, StatusCode};
        use std::collections::HashMap;

        let buffer = SpanRingBuffer::new(2);
        for i in 0..10 {
            let span = SpanRecord::new(
                [i as u8; 16],
                [i as u8; 8],
                None,
                format!("span_{}", i),
                SpanKind::Internal,
                1000 * i as u64,
                2000 * i as u64,
                i as u64,
                StatusCode::Ok,
                String::new(),
                HashMap::new(),
                HashMap::new(),
                0,
                0,
            );
            buffer.push(span);
        }

        let stats = buffer.stats();
        assert_eq!(stats.total_pushed, 10);
        assert!(stats.total_dropped <= 8);
        assert!(stats.current_size <= stats.capacity);
        assert!(stats.current_size as u64 <= stats.total_pushed - stats.total_dropped);
    }

    /// `drop_rate` and `utilization` are plain ratios of the stored fields.
    #[test]
    fn test_drop_rate_calculation() {
        let stats =
            BufferStats { total_pushed: 100, total_dropped: 5, current_size: 50, capacity: 1024 };
        assert_eq!(stats.drop_rate(), 0.05);
        assert_eq!(stats.utilization(), 50.0 / 1024.0);
    }
}