use crate::error::OtlpError;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_sdk::trace::SpanData;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct BatchBuffer {
traces: Arc<Mutex<Vec<SpanData>>>,
metrics: Arc<Mutex<Vec<ExportMetricsServiceRequest>>>,
write_interval: Duration,
last_write: Arc<Mutex<std::time::SystemTime>>,
max_trace_size: usize,
max_metric_size: usize,
}
impl BatchBuffer {
pub fn new(write_interval_secs: u64, max_trace_size: usize, max_metric_size: usize) -> Self {
Self {
traces: Arc::new(Mutex::new(Vec::new())),
metrics: Arc::new(Mutex::new(Vec::new())),
write_interval: Duration::from_secs(write_interval_secs),
last_write: Arc::new(Mutex::new(std::time::SystemTime::now())),
max_trace_size,
max_metric_size,
}
}
pub async fn add_trace(&self, span: SpanData) -> Result<(), OtlpError> {
let mut traces = self.traces.lock().await;
if traces.len() >= self.max_trace_size {
return Err(OtlpError::Export(crate::error::OtlpExportError::BufferFull));
}
traces.push(span);
Ok(())
}
pub async fn add_traces(&self, spans: Vec<SpanData>) -> Result<(), OtlpError> {
let mut traces = self.traces.lock().await;
if traces.len() + spans.len() > self.max_trace_size {
return Err(OtlpError::Export(crate::error::OtlpExportError::BufferFull));
}
traces.extend(spans);
Ok(())
}
pub async fn add_metrics_protobuf(
&self,
metrics: ExportMetricsServiceRequest,
) -> Result<(), OtlpError> {
let mut buffered_metrics = self.metrics.lock().await;
if buffered_metrics.len() >= self.max_metric_size {
return Err(OtlpError::Export(crate::error::OtlpExportError::BufferFull));
}
buffered_metrics.push(metrics);
Ok(())
}
pub async fn take_traces(&self) -> Vec<SpanData> {
let mut traces = self.traces.lock().await;
std::mem::take(&mut *traces)
}
pub async fn take_metrics(&self) -> Vec<ExportMetricsServiceRequest> {
let mut metrics = self.metrics.lock().await;
std::mem::take(&mut *metrics)
}
pub async fn should_write(&self) -> bool {
let last_write = self.last_write.lock().await;
if let Ok(elapsed) = last_write.elapsed() {
elapsed >= self.write_interval
} else {
true
}
}
pub async fn update_last_write(&self) {
let mut last_write = self.last_write.lock().await;
*last_write = std::time::SystemTime::now();
}
pub async fn trace_count(&self) -> usize {
let traces = self.traces.lock().await;
traces.len()
}
pub async fn metric_count(&self) -> usize {
let metrics = self.metrics.lock().await;
metrics.len()
}
}