use std::time::Duration;
use opentelemetry::Context;
use opentelemetry::trace::SpanContext;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::Span;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::SpanProcessor;
#[derive(Debug)]
pub(crate) struct DatadogSpanProcessor<T: SpanProcessor> {
delegate: T,
}
impl<T: SpanProcessor> DatadogSpanProcessor<T> {
pub(crate) fn new(delegate: T) -> Self {
Self { delegate }
}
}
impl<T: SpanProcessor> SpanProcessor for DatadogSpanProcessor<T> {
fn on_start(&self, span: &mut Span, cx: &Context) {
self.delegate.on_start(span, cx)
}
fn on_end(&self, mut span: SpanData) {
let new_trace_flags = span.span_context.trace_flags().with_sampled(true);
span.span_context = SpanContext::new(
span.span_context.trace_id(),
span.span_context.span_id(),
new_trace_flags,
span.span_context.is_remote(),
span.span_context.trace_state().clone(),
);
self.delegate.on_end(span)
}
fn force_flush(&self) -> OTelSdkResult {
self.delegate.force_flush()
}
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.delegate.shutdown_with_timeout(timeout)
}
fn set_resource(&mut self, resource: &Resource) {
self.delegate.set_resource(resource)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use opentelemetry::Context;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::SpanKind;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::SpanEvents;
use opentelemetry_sdk::trace::SpanLinks;
use opentelemetry_sdk::trace::SpanProcessor;
use parking_lot::Mutex;
use super::*;
#[derive(Debug, Clone)]
struct MockSpanProcessor {
spans: Arc<Mutex<Vec<SpanData>>>,
}
impl MockSpanProcessor {
fn new() -> Self {
Self {
spans: Default::default(),
}
}
}
impl SpanProcessor for MockSpanProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {}
fn on_end(&self, span: SpanData) {
self.spans.lock().push(span);
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
#[test]
fn test_on_end_updates_trace_flags() {
let mock_processor = MockSpanProcessor::new();
let processor = DatadogSpanProcessor::new(mock_processor.clone());
let span_context = SpanContext::new(
TraceId::from(1),
SpanId::from(1),
TraceFlags::default(),
false,
Default::default(),
);
let span_data = SpanData {
span_context,
parent_span_id: SpanId::from(1),
parent_span_is_remote: false,
span_kind: SpanKind::Client,
name: Default::default(),
start_time: SystemTime::now(),
end_time: SystemTime::now(),
attributes: Vec::with_capacity(32),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Default::default(),
instrumentation_scope: Default::default(),
dropped_attributes_count: 0,
};
processor.on_end(span_data.clone());
let updated_trace_flags = span_data.span_context.trace_flags().with_sampled(true);
let stored_spans = mock_processor.spans.lock();
assert_eq!(stored_spans.len(), 1);
assert_eq!(
stored_spans[0].span_context.trace_flags(),
updated_trace_flags
);
}
}