use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use opentelemetry::{Context, KeyValue, global};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::{Span, SpanData, SpanProcessor};
use crate::error::ExporterKind;
use crate::processors::RateLimiter;
/// Span processor wrapper that rate-limits the finished spans handed to `inner`.
///
/// `on_start`, `force_flush`, `shutdown_with_timeout` and `set_resource` are
/// passed straight through to `inner`; `on_end` forwards a span only when
/// `limiter.check()` returns `true`, otherwise the span is dropped.
pub(crate) struct RateLimitedSpanProcessor<P> {
/// The wrapped processor that receives every span admitted by the limiter.
inner: P,
/// Limiter consulted once per finished span. It is built in `new` from the
/// maximum rate, a span counter, component attributes and the exporter kind.
limiter: Arc<RateLimiter>,
}
/// Debug output lists the wrapped processor and the limiter's configured
/// maximum rate; the limiter itself is not printed.
impl<P> Debug for RateLimitedSpanProcessor<P>
where
    P: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RateLimitedSpanProcessor");
        builder.field("inner", &self.inner);
        // Only the rate is surfaced, fetched after `inner` has been formatted.
        let max_rate = self.limiter.max_rate();
        builder.field("max_rate", &max_rate);
        builder.finish()
    }
}
impl<P> RateLimitedSpanProcessor<P> {
    /// Wraps `inner` in a rate limiter.
    ///
    /// * `inner` - processor that receives the spans admitted by the limiter.
    /// * `max_rate` - limit handed to `RateLimiter::new` (presumably spans per
    ///   interval; confirm the unit against `RateLimiter`).
    /// * `exporter` - exporter kind, passed through to the limiter unchanged.
    /// * `instance_id` - suffix of the `otel.component.name` attribute, so that
    ///   several processors report under distinct component names.
    ///
    /// A `otel.sdk.processor.span.processed` counter is created on the global
    /// `apollo-opentelemetry` meter and given to the limiter together with the
    /// component attributes.
    pub(crate) fn new(inner: P, max_rate: u32, exporter: ExporterKind, instance_id: u64) -> Self {
        let processed_spans = global::meter("apollo-opentelemetry")
            .u64_counter("otel.sdk.processor.span.processed")
            .with_description("The number of spans for which the processing has finished")
            .with_unit("{span}")
            .build();

        let attributes = vec![
            KeyValue::new("otel.component.type", "rate_limited_span_processor"),
            KeyValue::new(
                "otel.component.name",
                format!("rate_limited_span_processor/{instance_id}"),
            ),
        ];

        let limiter = RateLimiter::new(max_rate, processed_spans, attributes, exporter);
        Self {
            inner,
            limiter: Arc::new(limiter),
        }
    }
}
/// Every hook delegates to the wrapped processor; only `on_end` is gated by
/// the rate limiter.
impl<P> SpanProcessor for RateLimitedSpanProcessor<P>
where
    P: SpanProcessor,
{
    /// Span start is never rate limited; it is always passed to `inner`.
    fn on_start(&self, span: &mut Span, cx: &Context) {
        let Self { inner, .. } = self;
        inner.on_start(span, cx);
    }

    /// Forwards the finished span only when the limiter admits it; otherwise
    /// the span is dropped here and never reaches `inner`.
    fn on_end(&self, span: SpanData) {
        if !self.limiter.check() {
            return;
        }
        self.inner.on_end(span);
    }

    /// Returns the result of flushing the wrapped processor.
    fn force_flush(&self) -> OTelSdkResult {
        let Self { inner, .. } = self;
        inner.force_flush()
    }

    /// Returns the result of shutting down the wrapped processor with `timeout`.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        let Self { inner, .. } = self;
        inner.shutdown_with_timeout(timeout)
    }

    /// Hands the resource to the wrapped processor.
    fn set_resource(&mut self, resource: &Resource) {
        let Self { inner, .. } = self;
        inner.set_resource(resource);
    }
}
#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::SystemTime;

    use opentelemetry::trace::{
        SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
    };

    use super::*;

    /// Inner-processor test double that counts how often each hook is invoked,
    /// so the delegation tests can assert that the wrapper really called it.
    #[derive(Debug, Default)]
    struct MockSpanProcessor {
        on_start_count: AtomicUsize,
        on_end_count: AtomicUsize,
        force_flush_count: AtomicUsize,
        shutdown_count: AtomicUsize,
        set_resource_count: AtomicUsize,
    }

    impl SpanProcessor for MockSpanProcessor {
        fn on_start(&self, _span: &mut Span, _cx: &Context) {
            self.on_start_count.fetch_add(1, Ordering::SeqCst);
        }

        fn on_end(&self, _span: SpanData) {
            self.on_end_count.fetch_add(1, Ordering::SeqCst);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.force_flush_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            self.shutdown_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {
            self.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Cloneable handle around [`MockSpanProcessor`]: one clone is moved into
    /// the processor under test while the test keeps another to read counters.
    #[derive(Debug, Clone, Default)]
    struct SharedMockSpanProcessor {
        inner: Arc<MockSpanProcessor>,
    }

    impl SpanProcessor for SharedMockSpanProcessor {
        fn on_start(&self, span: &mut Span, cx: &Context) {
            self.inner.on_start(span, cx);
        }

        fn on_end(&self, span: SpanData) {
            self.inner.on_end(span);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.inner.force_flush()
        }

        fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
            self.inner.shutdown_with_timeout(timeout)
        }

        fn set_resource(&mut self, _resource: &Resource) {
            // The mock sits behind an `Arc`, so its `&mut self` hook cannot be
            // called from here; record the call on the shared counter instead.
            self.inner.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Reads a call counter.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Builds a minimal sampled, parentless span to feed into `on_end`.
    fn create_test_span_data() -> SpanData {
        let span_context = SpanContext::new(
            TraceId::from_hex("0102030405060708090a0b0c0d0e0f10").unwrap(),
            SpanId::from_hex("0102030405060708").unwrap(),
            TraceFlags::SAMPLED,
            false,
            TraceState::default(),
        );
        SpanData {
            span_context,
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            name: Cow::Borrowed("test-span"),
            span_kind: SpanKind::Internal,
            start_time: SystemTime::now(),
            end_time: SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: opentelemetry_sdk::trace::SpanEvents::default(),
            links: opentelemetry_sdk::trace::SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: opentelemetry::InstrumentationScope::builder("test").build(),
        }
    }

    #[test]
    fn spans_within_limit_are_forwarded() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        for _ in 0..50 {
            processor.on_end(create_test_span_data());
        }
        assert_eq!(calls(&mock.inner.on_end_count), 50);
    }

    #[test]
    fn spans_over_limit_are_dropped() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 1, ExporterKind::OtlpHttp, 0);
        for _ in 0..10 {
            processor.on_end(create_test_span_data());
        }
        assert_eq!(calls(&mock.inner.on_end_count), 1);
    }

    #[test]
    fn on_start_delegates_to_inner() {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        // A `Span` cannot be constructed by hand, so drive `on_start` through a
        // real tracer provider.
        let provider = SdkTracerProvider::builder()
            .with_span_processor(processor)
            .build();
        let tracer = provider.tracer("test");
        let _span = tracer.start("test-span");
        // Checked while the span is still alive, i.e. before `on_end` runs.
        assert_eq!(calls(&mock.inner.on_start_count), 1);
    }

    #[test]
    fn force_flush_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        let result = processor.force_flush();
        assert!(result.is_ok());
        assert_eq!(calls(&mock.inner.force_flush_count), 1);
    }

    #[test]
    fn shutdown_with_timeout_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        let result = processor.shutdown_with_timeout(Duration::from_secs(5));
        assert!(result.is_ok());
        assert_eq!(calls(&mock.inner.shutdown_count), 1);
    }

    #[test]
    fn set_resource_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let mut processor =
            RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        let resource = Resource::builder().build();
        processor.set_resource(&resource);
        assert_eq!(calls(&mock.inner.set_resource_count), 1);
    }

    #[test]
    fn debug_format_shows_inner_and_max_rate() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);
        let debug_str = format!("{:?}", processor);
        assert!(debug_str.contains("RateLimitedSpanProcessor"));
        assert!(debug_str.contains("max_rate"));
        assert!(debug_str.contains("100"));
    }
}