apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for Apollo platform
Documentation
//! Rate limiter for span/log/metric export.
//!
//! Provides cost protection by limiting export rate and emitting metrics
//! for monitoring dropped items.
//!
//! **UNSTABLE:** Metrics follow the [OTel SDK semantic conventions] which are currently
//! in development status. Metric names and attributes may change in future releases.
//!
//! [OTel SDK semantic conventions]: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/otel/sdk-metrics.md

use std::num::NonZeroU32;
use std::sync::atomic::{AtomicBool, Ordering};

use governor::clock::{Clock, DefaultClock};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter as GovernorRateLimiter};
use log::warn;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Counter;

use crate::error::ExporterKind;

/// Rate limiter that emits metrics for processed items.
///
/// Wraps `governor` and increments a counter following the OTel SDK spec:
/// - Successful processing: no `error.type` attribute
/// - Failed processing: `error.type=rate_limited`
///
/// The clock type `C` defaults to governor's `DefaultClock`; a different clock
/// can be supplied through `with_clock` (the tests in this file use
/// `FakeRelativeClock`).
pub(crate) struct RateLimiter<C: Clock = DefaultClock> {
    /// Underlying un-keyed, in-memory `governor` limiter that makes the
    /// accept/drop decision.
    limiter: GovernorRateLimiter<NotKeyed, InMemoryState, C, NoOpMiddleware<C::Instant>>,
    /// Rate passed to the constructor, stored as given; returned by
    /// `max_rate()` and included in the rate-limit warning.
    max_rate: u32,
    /// Counter incremented by one on every `check()` call, accepted or dropped.
    counter: Counter<u64>,
    /// Attributes for successful processing (no error.type)
    success_attributes: Vec<KeyValue>,
    /// Attributes for dropped items (error.type=rate_limited)
    dropped_attributes: Vec<KeyValue>,
    /// Whether we've logged the first rate limit warning; set on the first
    /// drop so the warning is emitted only once per limiter.
    logged_warning: AtomicBool,
    /// Exporter kind for logging
    exporter: ExporterKind,
}

impl RateLimiter<DefaultClock> {
    /// Builds a rate limiter that reads time from governor's `DefaultClock`.
    ///
    /// This is a convenience wrapper around `with_clock`. `attributes` are the
    /// base metric attributes; `error.type` is appended for drops per OTel spec.
    pub(crate) fn new(
        max_rate: u32,
        counter: Counter<u64>,
        attributes: Vec<KeyValue>,
        exporter: ExporterKind,
    ) -> Self {
        let clock = DefaultClock::default();
        Self::with_clock(max_rate, counter, attributes, exporter, clock)
    }
}

impl<C: Clock> RateLimiter<C> {
    /// Builds a rate limiter that reads time from the supplied `clock`.
    ///
    /// The quota is `max_rate` per second. A `max_rate` of zero cannot be turned
    /// into a `NonZeroU32`, so the quota falls back to `NonZeroU32::MIN` in that
    /// case; the `max_rate` field still stores the value exactly as passed in.
    ///
    /// `attributes` are the base metric attributes. Accepted items are counted
    /// with those attributes only; dropped items additionally carry
    /// `error.type=rate_limited`.
    pub(crate) fn with_clock(
        max_rate: u32,
        counter: Counter<u64>,
        attributes: Vec<KeyValue>,
        exporter: ExporterKind,
        clock: C,
    ) -> Self {
        // Dropped: base attributes plus error.type=rate_limited per spec.
        let mut dropped_attributes = Vec::with_capacity(attributes.len() + 1);
        dropped_attributes.extend_from_slice(&attributes);
        dropped_attributes.push(KeyValue::new("error.type", "rate_limited"));

        let per_second = NonZeroU32::new(max_rate).unwrap_or(NonZeroU32::MIN);
        let quota = Quota::per_second(per_second);
        let limiter = GovernorRateLimiter::direct_with_clock(quota, clock);

        Self {
            limiter,
            max_rate,
            counter,
            // Success: base attributes only (no error.type per spec).
            success_attributes: attributes,
            dropped_attributes,
            logged_warning: AtomicBool::new(false),
            exporter,
        }
    }

    /// Returns the `max_rate` value this limiter was constructed with.
    pub fn max_rate(&self) -> u32 {
        self.max_rate
    }

    /// Asks the underlying limiter whether one more item may pass.
    ///
    /// Returns `true` if accepted, `false` if dropped. The counter is
    /// incremented by one either way, using the success or dropped attribute
    /// set. The first drop also logs a warning; later drops stay silent.
    pub fn check(&self) -> bool {
        match self.limiter.check() {
            Ok(_) => {
                self.counter.add(1, &self.success_attributes);
                true
            }
            Err(_) => {
                self.counter.add(1, &self.dropped_attributes);
                // `swap` returns the previous value, so only the caller that
                // flips the flag from false to true emits the warning.
                let already_logged = self.logged_warning.swap(true, Ordering::Relaxed);
                if !already_logged {
                    warn!(
                        "Rate limit exceeded for {} exporter (max_rate={}), dropping telemetry. \
                         Check otel.sdk.processor.*.processed metrics for drop counts.",
                        self.exporter.as_str(),
                        self.max_rate
                    );
                }
                false
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use apollo_opentelemetry_test::{TelemetryContext, assert_metric};
    use governor::clock::FakeRelativeClock;
    use opentelemetry::global;
    use std::time::Duration;

    /// Builds the counter and base attributes shared by every test limiter.
    fn test_instruments() -> (Counter<u64>, Vec<KeyValue>) {
        let meter = global::meter_provider().meter("test");
        let counter = meter
            .u64_counter("otel.sdk.processor.test.processed")
            .build();
        let attributes = vec![
            KeyValue::new("otel.component.type", "rate_limited_span_processor"),
            KeyValue::new("otel.component.name", "rate_limited_span_processor/test"),
        ];
        (counter, attributes)
    }

    /// Limiter on the default clock.
    fn create_test_limiter(max_rate: u32) -> RateLimiter {
        let (counter, attributes) = test_instruments();
        RateLimiter::new(max_rate, counter, attributes, ExporterKind::OtlpHttp)
    }

    /// Limiter on a fake clock the test can advance manually.
    fn create_test_limiter_with_clock(
        max_rate: u32,
        clock: FakeRelativeClock,
    ) -> RateLimiter<FakeRelativeClock> {
        let (counter, attributes) = test_instruments();
        RateLimiter::with_clock(max_rate, counter, attributes, ExporterKind::OtlpHttp, clock)
    }

    #[test]
    fn test_rate_limiter_accepts_within_limit() {
        let ctx = TelemetryContext::new();
        let limiter = create_test_limiter(100);

        // 50 checks stay well inside a limit of 100, so every one must pass.
        for _ in 0..50 {
            assert!(limiter.check());
        }

        // Accepted items are recorded without error.type per spec.
        assert_metric!(
            ctx,
            "otel.sdk.processor.test.processed",
            "otel.component.type" = "rate_limited_span_processor",
            "otel.component.name" = "rate_limited_span_processor/test"
        );
    }

    #[test]
    fn test_rate_limiter_drops_over_limit() {
        let ctx = TelemetryContext::new();
        let limiter = create_test_limiter(1);

        // The single available token is consumed here.
        assert!(limiter.check());

        // Ten immediate follow-ups leave no time to replenish; count the rejections.
        let drops = (0..10).filter(|_| !limiter.check()).count();

        assert!(drops > 0, "Should have dropped some items");

        // Dropped items are recorded with error.type=rate_limited per spec.
        assert_metric!(
            ctx,
            "otel.sdk.processor.test.processed",
            "otel.component.type" = "rate_limited_span_processor",
            "otel.component.name" = "rate_limited_span_processor/test",
            "error.type" = "rate_limited"
        );
    }

    #[test]
    fn test_rate_limiter_recovers_over_time() {
        let _ctx = TelemetryContext::new();
        let clock = FakeRelativeClock::default();
        let limiter = create_test_limiter_with_clock(5, clock.clone());

        // Drain the initial burst of 5 tokens.
        for attempt in 0..5 {
            assert!(limiter.check(), "request {} should succeed", attempt);
        }

        // A sixth request in the same instant must be rejected.
        assert!(!limiter.check(), "should be rate limited after burst");

        // One full second later all 5 tokens should be available again.
        clock.advance(Duration::from_secs(1));

        for attempt in 0..5 {
            assert!(
                limiter.check(),
                "request {} should succeed after recovery",
                attempt
            );
        }

        // The refilled burst is exhausted as well.
        assert!(
            !limiter.check(),
            "should be rate limited after second burst"
        );

        // 200ms at 5 per second replenishes exactly one token.
        clock.advance(Duration::from_millis(200));
        assert!(
            limiter.check(),
            "should accept 1 request after partial recovery"
        );
        assert!(
            !limiter.check(),
            "should be rate limited after using recovered token"
        );
    }
}