apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for Apollo platform
Documentation
//! Rate-limited span processor.
//!
//! Provides cost protection by limiting the number of spans processed per second.
//! Spans exceeding the rate limit are dropped and metrics are emitted.

use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use opentelemetry::{Context, KeyValue, global};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::{Span, SpanData, SpanProcessor};

use crate::error::ExporterKind;
use crate::processors::RateLimiter;

/// Span processor that enforces a span budget before handing spans to an inner processor.
///
/// Every ended span is checked against a [`RateLimiter`]. Spans the limiter rejects are
/// discarded instead of forwarded, and the limiter's metrics account for them. This bounds
/// telemetry cost when something goes wrong: a misconfiguration (for example an infinite
/// loop) or an unexpected spike in load.
///
/// The intended placement is in front of the batch processor, so that rejected spans never
/// occupy space in its queue.
pub(crate) struct RateLimitedSpanProcessor<P> {
    /// Processor that receives every span the limiter admits.
    inner: P,
    /// Limiter consulted once per ended span.
    limiter: Arc<RateLimiter>,
}

impl<P: Debug> Debug for RateLimitedSpanProcessor<P> {
    /// Shows the wrapped processor and the limiter's configured maximum rate.
    /// Of the limiter, only `max_rate` is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RateLimitedSpanProcessor");
        out.field("inner", &self.inner);
        out.field("max_rate", &self.limiter.max_rate());
        out.finish()
    }
}

impl<P> RateLimitedSpanProcessor<P> {
    /// Wraps `inner` so that it receives at most `max_rate` spans per second.
    ///
    /// The limiter reports through a `otel.sdk.processor.span.processed` counter obtained
    /// from the global meter. The counter is tagged with:
    /// - `otel.component.type = "rate_limited_span_processor"`
    /// - `otel.component.name = "rate_limited_span_processor/<instance_id>"`
    ///
    /// `exporter` is passed through to the limiter unchanged.
    pub(crate) fn new(inner: P, max_rate: u32, exporter: ExporterKind, instance_id: u64) -> Self {
        let meter = global::meter("apollo-opentelemetry");
        let processed_spans = meter
            .u64_counter("otel.sdk.processor.span.processed")
            .with_description("The number of spans for which the processing has finished")
            .with_unit("{span}")
            .build();

        // Embedding the instance id keeps this component name aligned with the
        // associated exporter's component name.
        let component_attributes = vec![
            KeyValue::new("otel.component.type", "rate_limited_span_processor"),
            KeyValue::new(
                "otel.component.name",
                format!("rate_limited_span_processor/{instance_id}"),
            ),
        ];

        let limiter = RateLimiter::new(max_rate, processed_spans, component_attributes, exporter);
        Self {
            inner,
            limiter: Arc::new(limiter),
        }
    }
}

impl<P: SpanProcessor> SpanProcessor for RateLimitedSpanProcessor<P> {
    /// Span starts are not rate limited; they are always forwarded.
    fn on_start(&self, span: &mut Span, cx: &Context) {
        P::on_start(&self.inner, span, cx);
    }

    /// Forwards `span` only when the limiter admits it.
    /// Rejected spans stop here; the limiter holds the counter used for drop metrics.
    fn on_end(&self, span: SpanData) {
        if !self.limiter.check() {
            return;
        }
        P::on_end(&self.inner, span);
    }

    /// Flushing is handled entirely by the inner processor.
    fn force_flush(&self) -> OTelSdkResult {
        P::force_flush(&self.inner)
    }

    /// Shutdown is handled entirely by the inner processor, using the caller's timeout.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        P::shutdown_with_timeout(&self.inner, timeout)
    }

    /// Passes the resource through to the inner processor.
    fn set_resource(&mut self, resource: &Resource) {
        P::set_resource(&mut self.inner, resource);
    }
}

#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::SystemTime;

    use opentelemetry::trace::{
        SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
    };

    use super::*;

    /// Reads a call counter from the mock.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Mock processor that counts how many times each `SpanProcessor` method is called.
    ///
    /// The counters let the delegation tests assert that the wrapper really forwarded a
    /// call, rather than relying on "no panic".
    #[derive(Debug, Default)]
    struct MockSpanProcessor {
        on_start_count: AtomicUsize,
        on_end_count: AtomicUsize,
        force_flush_count: AtomicUsize,
        shutdown_count: AtomicUsize,
        set_resource_count: AtomicUsize,
    }

    impl SpanProcessor for MockSpanProcessor {
        fn on_start(&self, _span: &mut Span, _cx: &Context) {
            self.on_start_count.fetch_add(1, Ordering::SeqCst);
        }

        fn on_end(&self, _span: SpanData) {
            self.on_end_count.fetch_add(1, Ordering::SeqCst);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.force_flush_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            self.shutdown_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {
            self.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Wrapper to allow sharing the mock between test and processor.
    #[derive(Debug, Clone, Default)]
    struct SharedMockSpanProcessor {
        inner: Arc<MockSpanProcessor>,
    }

    impl SpanProcessor for SharedMockSpanProcessor {
        fn on_start(&self, span: &mut Span, cx: &Context) {
            self.inner.on_start(span, cx);
        }

        fn on_end(&self, span: SpanData) {
            self.inner.on_end(span);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.inner.force_flush()
        }

        fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
            self.inner.shutdown_with_timeout(timeout)
        }

        fn set_resource(&mut self, _resource: &Resource) {
            // The mock sits behind an `Arc`, which gives no `&mut` access.
            // Record the call on the shared counter directly instead.
            self.inner.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn create_test_span_data() -> SpanData {
        let span_context = SpanContext::new(
            TraceId::from_hex("0102030405060708090a0b0c0d0e0f10").unwrap(),
            SpanId::from_hex("0102030405060708").unwrap(),
            TraceFlags::SAMPLED,
            false,
            TraceState::default(),
        );

        SpanData {
            span_context,
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            name: Cow::Borrowed("test-span"),
            span_kind: SpanKind::Internal,
            start_time: SystemTime::now(),
            end_time: SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: opentelemetry_sdk::trace::SpanEvents::default(),
            links: opentelemetry_sdk::trace::SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: opentelemetry::InstrumentationScope::builder("test").build(),
        }
    }

    #[test]
    fn spans_within_limit_are_forwarded() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        // End 50 spans - all should be forwarded
        for _ in 0..50 {
            processor.on_end(create_test_span_data());
        }

        assert_eq!(count(&mock.inner.on_end_count), 50);
    }

    #[test]
    fn spans_over_limit_are_dropped() {
        let mock = SharedMockSpanProcessor::default();
        // Very low limit to trigger drops
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 1, ExporterKind::OtlpHttp, 0);

        // End 10 spans rapidly - most should be dropped
        for _ in 0..10 {
            processor.on_end(create_test_span_data());
        }

        // Only 1 should have been forwarded (the rate limit)
        assert_eq!(count(&mock.inner.on_end_count), 1);
    }

    #[test]
    fn on_start_delegates_to_inner() {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        // Create a real span so the SDK drives on_start through the wrapper
        let provider = SdkTracerProvider::builder()
            .with_span_processor(processor)
            .build();
        let tracer = provider.tracer("test");

        let span = tracer.start("test-span");
        assert_eq!(count(&mock.inner.on_start_count), 1);

        // Dropping the span ends it; with a limit of 100 it must reach the inner processor
        drop(span);
        assert_eq!(count(&mock.inner.on_end_count), 1);
    }

    #[test]
    fn force_flush_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        let result = processor.force_flush();
        assert!(result.is_ok());
        assert_eq!(count(&mock.inner.force_flush_count), 1);
    }

    #[test]
    fn shutdown_with_timeout_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        let result = processor.shutdown_with_timeout(Duration::from_secs(5));
        assert!(result.is_ok());
        assert_eq!(count(&mock.inner.shutdown_count), 1);
    }

    #[test]
    fn set_resource_delegates_to_inner() {
        let mock = SharedMockSpanProcessor::default();
        let mut processor =
            RateLimitedSpanProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        let resource = Resource::builder().build();
        processor.set_resource(&resource);
        assert_eq!(count(&mock.inner.set_resource_count), 1);
    }

    #[test]
    fn debug_format_shows_inner_and_max_rate() {
        let mock = SharedMockSpanProcessor::default();
        let processor = RateLimitedSpanProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let debug_str = format!("{:?}", processor);
        assert!(debug_str.contains("RateLimitedSpanProcessor"));
        assert!(debug_str.contains("max_rate"));
        assert!(debug_str.contains("100"));
    }
}