use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde_json::Value;
use tracing::warn;
use super::events::TraceEvent;
/// A synchronous transformation stage applied to a batch of trace events.
///
/// `TracePipeline::flush` runs every registered processor in registration
/// order, feeding each one the output of the previous one.
pub trait Processor: Send + Sync {
/// Returns a static label for this processor; the default is `"unnamed"`.
fn name(&self) -> &'static str {
"unnamed"
}
/// Consumes `events` and returns the batch to hand to the next stage.
///
/// Returning an empty vector makes `TracePipeline::flush` stop before any
/// exporter is invoked.
fn process(&self, events: Vec<TraceEvent>) -> Vec<TraceEvent>;
}
/// An asynchronous sink that receives the processed trace events.
///
/// `TracePipeline::flush` awaits each registered exporter in turn.
#[async_trait]
pub trait Exporter: Send + Sync {
/// Returns a static label for this exporter; the default is `"unnamed"`.
fn name(&self) -> &'static str {
"unnamed"
}
/// Delivers one batch of events.
///
/// * `events` - the batch after all processors have run.
/// * `request_id` - `TracePipeline::flush` passes the `request_id` of the
///   first event in the batch.
/// * `metadata` - a copy of the pipeline metadata with `partial` set from
///   the flush call.
///
/// The method returns `()`, so an implementation has no way to report a
/// failure to the pipeline through the return value.
async fn export(&self, events: Vec<TraceEvent>, request_id: String, metadata: ExportMetadata);
}
/// Policy consulted by `TracePipeline::push` after each event is accepted.
pub trait FlushStrategy: Send + Sync {
/// Returns `true` when the strategy wants the buffer flushed.
///
/// * `event` - the event that was just pushed.
/// * `buffered` - the buffer length once the event has been appended and
///   any overflow has been trimmed.
///
/// NOTE(review): in the code visible in this file, `push` does not start a
/// flush when this returns `true`; confirm how callers are expected to react.
fn should_flush(&self, event: &TraceEvent, buffered: usize) -> bool;
}
/// Flush strategy that never asks for an early flush.
///
/// With this strategy events simply accumulate in the pipeline buffer until
/// something calls `TracePipeline::flush` explicitly. It is the strategy used
/// when none is configured.
#[derive(Debug, Clone, Copy, Default)]
pub struct AtScheduledExit;

impl FlushStrategy for AtScheduledExit {
    /// Always returns `false`, regardless of the event or the buffer size.
    fn should_flush(&self, _event: &TraceEvent, _buffered: usize) -> bool {
        false
    }
}
/// Flush strategy that asks for a flush once the buffer holds enough events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlushOnSize {
    /// Buffer length at which a flush is requested. A value of `0` requests
    /// a flush on every push.
    pub max_events: usize,
}

impl FlushStrategy for FlushOnSize {
    /// Returns `true` when `buffered` has reached or passed `max_events`.
    /// The event itself is not inspected.
    fn should_flush(&self, _event: &TraceEvent, buffered: usize) -> bool {
        buffered >= self.max_events
    }
}
/// Contextual information handed to every `Exporter::export` call alongside
/// the events. `TracePipeline` stores one value and clones it per export.
#[derive(Debug, Clone, Default)]
pub struct ExportMetadata {
/// Optional workflow name; passed through to exporters unchanged.
pub workflow_name: Option<String>,
/// Optional user identifier; passed through to exporters unchanged.
pub user_id: Option<String>,
/// Optional session identifier; passed through to exporters unchanged.
pub session_id: Option<String>,
/// Free-form tags; passed through to exporters unchanged.
pub tags: Vec<String>,
/// Overwritten by `TracePipeline::flush` with its `partial` argument just
/// before the metadata is given to the exporters.
pub partial: bool,
}
/// Buffers trace events in memory and, when flushed, runs them through the
/// configured processors and hands the result to the configured exporters.
pub struct TracePipeline {
    /// Transformation stages, applied in order during a flush.
    processors: Vec<Arc<dyn Processor>>,
    /// Sinks that receive the processed batch during a flush.
    exporters: Vec<Arc<dyn Exporter>>,
    /// Policy consulted on every push.
    flush_strategy: Arc<dyn FlushStrategy>,
    /// Upper bound on the buffer length; the oldest events are dropped beyond it.
    max_buffered_events: usize,
    /// Events waiting for the next flush.
    buffer: Mutex<Vec<TraceEvent>>,
    /// Set once the overflow warning has been logged so it is emitted only once.
    overflow_warned: Mutex<bool>,
    /// Metadata snapshot attached to each export.
    metadata: Mutex<ExportMetadata>,
}

impl Default for TracePipeline {
    /// Equivalent to `TracePipeline::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl TracePipeline {
pub fn new() -> Self {
Self {
processors: Vec::new(),
exporters: Vec::new(),
flush_strategy: Arc::new(AtScheduledExit),
max_buffered_events: 100_000,
buffer: Mutex::new(Vec::new()),
overflow_warned: Mutex::new(false),
metadata: Mutex::new(ExportMetadata::default()),
}
}
/// Returns a `TracePipelineBuilder` with nothing configured yet (its
/// `Default` value); finish with `TracePipelineBuilder::build`.
pub fn builder() -> TracePipelineBuilder {
TracePipelineBuilder::default()
}
/// Replaces the stored metadata; later flushes attach a copy of `meta` to
/// every export.
pub fn set_metadata(&self, meta: ExportMetadata) {
    let mut current = self.metadata.lock();
    *current = meta;
}
pub fn push(&self, event: TraceEvent) {
let mut buf = self.buffer.lock();
buf.push(event.clone());
if buf.len() > self.max_buffered_events {
let drop_count = buf.len() - self.max_buffered_events;
buf.drain(..drop_count);
let mut warned = self.overflow_warned.lock();
if !*warned {
warn!(
"trace buffer overflow at request_id={} โ dropped {} oldest events \
(max_buffered_events={}). Consider FlushOnSize for long-running calls.",
event.request_id, drop_count, self.max_buffered_events
);
*warned = true;
}
}
let buffered_len = buf.len();
let should_flush = self.flush_strategy.should_flush(&event, buffered_len);
drop(buf);
if should_flush {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let pipeline = self as *const Self;
let _ = pipeline;
let _ = handle;
}
}
}
/// Removes and returns every buffered event, leaving the buffer empty.
#[doc(hidden)]
pub fn drain(&self) -> Vec<TraceEvent> {
    let mut buf = self.buffer.lock();
    std::mem::take(&mut *buf)
}
/// Returns how many events are currently waiting in the buffer.
pub fn buffered_count(&self) -> usize {
    let buf = self.buffer.lock();
    buf.len()
}
/// Drains the buffer, runs the events through every processor in order, and
/// hands the result to every exporter in order.
///
/// Nothing is exported when the buffer is empty or when a processor returns
/// an empty batch; the drained events are not put back in either case.
///
/// Each exporter receives the `request_id` of the first surviving event and a
/// copy of the stored metadata whose `partial` field is set to `partial`.
/// Exporters are awaited one after another, not concurrently.
pub async fn flush(self: &Arc<Self>, partial: bool) {
    let mut stream = self.drain();
    if stream.is_empty() {
        return;
    }
    for stage in &self.processors {
        stream = stage.process(stream);
        if stream.is_empty() {
            return;
        }
    }

    // With no exporters there is nobody to deliver to, so skip building the
    // request id and metadata copies.
    let (last, rest) = match self.exporters.split_last() {
        Some(parts) => parts,
        None => return,
    };

    let request_id = stream
        .first()
        .map(|e| e.request_id.clone())
        .unwrap_or_default();
    // The lock guard is a temporary of this statement, so it is released
    // before any `.await` below.
    let mut meta = self.metadata.lock().clone();
    meta.partial = partial;

    for exporter in rest {
        exporter
            .export(stream.clone(), request_id.clone(), meta.clone())
            .await;
    }
    // The final exporter takes ownership, saving one full copy of the batch.
    last.export(stream, request_id, meta).await;
}
/// Spawns `flush(partial)` as a detached Tokio task and returns immediately.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside the context of a Tokio runtime.
pub fn flush_in_background(self: Arc<Self>, partial: bool) {
    let pipeline = self;
    let task = async move { pipeline.flush(partial).await };
    // Dropping the join handle detaches the task; its completion is never
    // observed here.
    drop(tokio::spawn(task));
}
}
/// Step-by-step configuration for a `TracePipeline`.
///
/// Every setting is optional; `build` substitutes a default for anything
/// left unset.
#[derive(Default)]
pub struct TracePipelineBuilder {
/// Processors in the order they were added.
processors: Vec<Arc<dyn Processor>>,
/// Exporters in the order they were added.
exporters: Vec<Arc<dyn Exporter>>,
/// Chosen flush strategy, or `None` to let `build` pick `AtScheduledExit`.
flush_strategy: Option<Arc<dyn FlushStrategy>>,
/// Chosen buffer cap, or `None` to let `build` pick its default.
max_buffered_events: Option<usize>,
}
impl TracePipelineBuilder {
    /// Buffer cap used when `max_buffered_events` is never called.
    const DEFAULT_MAX_BUFFERED_EVENTS: usize = 100_000;

    /// Appends a processor; processors run in the order they were added.
    #[must_use]
    pub fn processor(mut self, p: Arc<dyn Processor>) -> Self {
        self.processors.push(p);
        self
    }

    /// Appends an exporter; exporters are invoked in the order they were added.
    #[must_use]
    pub fn exporter(mut self, e: Arc<dyn Exporter>) -> Self {
        self.exporters.push(e);
        self
    }

    /// Sets the flush strategy, replacing any previously chosen one.
    #[must_use]
    pub fn flush_strategy(mut self, s: Arc<dyn FlushStrategy>) -> Self {
        self.flush_strategy = Some(s);
        self
    }

    /// Sets the maximum number of events kept in the buffer, replacing any
    /// previously chosen value.
    #[must_use]
    pub fn max_buffered_events(mut self, n: usize) -> Self {
        self.max_buffered_events = Some(n);
        self
    }

    /// Finishes configuration and returns the pipeline.
    ///
    /// Unset options fall back to `AtScheduledExit` and a cap of 100 000
    /// buffered events. The pipeline starts with an empty buffer and default
    /// metadata.
    #[must_use]
    pub fn build(self) -> TracePipeline {
        let Self {
            processors,
            exporters,
            flush_strategy,
            max_buffered_events,
        } = self;
        TracePipeline {
            processors,
            exporters,
            flush_strategy: flush_strategy.unwrap_or_else(|| Arc::new(AtScheduledExit)),
            max_buffered_events: max_buffered_events
                .unwrap_or(Self::DEFAULT_MAX_BUFFERED_EVENTS),
            buffer: Mutex::new(Vec::new()),
            overflow_warned: Mutex::new(false),
            metadata: Mutex::new(ExportMetadata::default()),
        }
    }
}
// Placeholder that returns `Value::Null` and is not called anywhere in the
// visible code, hence the `dead_code` allowance.
// NOTE(review): it appears to exist only so the `serde_json::Value` import
// stays referenced until an "extras" extension needs it; confirm that intent
// before removing either the function or the import.
#[allow(dead_code)]
fn _values_kept_for_extras_extension() -> Value {
Value::Null
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::tracing::events::{EventKind, TraceEvent};
    use chrono::Utc;
    use std::collections::BTreeMap;

    /// Builds a minimal `OpStart` event for request `r1` with the given
    /// sequence number.
    fn ev(seq: u64) -> TraceEvent {
        let event_id = format!("e{seq}");
        let timestamp = Utc::now();
        TraceEvent {
            event_id,
            request_id: "r1".into(),
            kind: EventKind::OpStart,
            op_name: Some("op".into()),
            ctx: vec!["main".into()],
            timestamp,
            seq,
            payload: BTreeMap::new(),
        }
    }

    /// Processor that hands its input back untouched.
    struct PassThroughProc;

    impl Processor for PassThroughProc {
        fn process(&self, events: Vec<TraceEvent>) -> Vec<TraceEvent> {
            events
        }
    }

    /// Exporter that adds the size of every batch it receives to a shared
    /// counter.
    struct CountingExporter {
        count: Arc<Mutex<u32>>,
    }

    #[async_trait]
    impl Exporter for CountingExporter {
        async fn export(
            &self,
            events: Vec<TraceEvent>,
            _request_id: String,
            _metadata: ExportMetadata,
        ) {
            *self.count.lock() += events.len() as u32;
        }
    }

    #[test]
    fn push_appends_to_buffer() {
        let pipeline = TracePipeline::new();
        for seq in 0..2 {
            pipeline.push(ev(seq));
        }
        assert_eq!(pipeline.buffered_count(), 2);
    }

    #[test]
    fn drain_resets_buffer() {
        let pipeline = TracePipeline::new();
        pipeline.push(ev(0));

        let taken = pipeline.drain();

        assert_eq!(taken.len(), 1);
        assert_eq!(pipeline.buffered_count(), 0);
    }

    #[tokio::test]
    async fn flush_runs_processors_then_exporters() {
        let exported = Arc::new(Mutex::new(0u32));
        let exporter = CountingExporter {
            count: Arc::clone(&exported),
        };
        let pipeline = Arc::new(
            TracePipeline::builder()
                .processor(Arc::new(PassThroughProc))
                .exporter(Arc::new(exporter))
                .build(),
        );

        for seq in 0..2 {
            pipeline.push(ev(seq));
        }
        pipeline.flush(false).await;

        assert_eq!(*exported.lock(), 2);
        assert_eq!(pipeline.buffered_count(), 0);
    }
}