apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for Apollo platform
Documentation
//! Rate-limited log processor.
//!
//! Provides cost protection by limiting the number of logs processed per second.
//! Logs exceeding the rate limit are dropped and metrics are emitted.

use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use opentelemetry::InstrumentationScope;
use opentelemetry::{KeyValue, global};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord};

use crate::error::ExporterKind;
use crate::processors::RateLimiter;

/// A log processor that applies rate limiting before delegating to the inner processor.
///
/// Logs exceeding the configured rate limit are dropped and counted via metrics.
/// This provides cost protection against runaway telemetry from misconfiguration
/// (e.g., infinite loops) or unexpected load.
///
/// Wraps the batch processor, dropping logs before they consume queue space.
pub(crate) struct RateLimitedLogProcessor<P> {
    inner: P,
    limiter: Arc<RateLimiter>,
}

impl<P: Debug> Debug for RateLimitedLogProcessor<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RateLimitedLogProcessor")
            .field("inner", &self.inner)
            .field("max_rate", &self.limiter.max_rate())
            .finish()
    }
}

impl<P> RateLimitedLogProcessor<P> {
    pub(crate) fn new(inner: P, max_rate: u32, exporter: ExporterKind, instance_id: u64) -> Self {
        let meter = global::meter("apollo-opentelemetry");
        let counter = meter
            .u64_counter("otel.sdk.processor.log.processed")
            .with_description("The number of log records for which the processing has finished")
            .with_unit("{log_record}")
            .build();
        // Use instance_id to match the associated exporter's component name
        let component_name = format!("rate_limited_log_processor/{}", instance_id);
        let attributes = vec![
            KeyValue::new("otel.component.type", "rate_limited_log_processor"),
            KeyValue::new("otel.component.name", component_name),
        ];

        Self {
            inner,
            limiter: Arc::new(RateLimiter::new(max_rate, counter, attributes, exporter)),
        }
    }
}

impl<P: LogProcessor> LogProcessor for RateLimitedLogProcessor<P> {
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        if self.limiter.check() {
            self.inner.emit(record, instrumentation);
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        self.inner.force_flush()
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.inner.shutdown_with_timeout(timeout)
    }

    fn set_resource(&mut self, resource: &Resource) {
        self.inner.set_resource(resource);
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use opentelemetry::logs::{Logger, LoggerProvider};
    use opentelemetry_sdk::logs::SdkLoggerProvider;

    use super::*;

    /// Mock processor that counts how many logs it receives.
    #[derive(Debug, Default)]
    struct MockLogProcessor {
        emit_count: AtomicUsize,
    }

    impl LogProcessor for MockLogProcessor {
        fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
            self.emit_count.fetch_add(1, Ordering::SeqCst);
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {}
    }

    /// Wrapper to allow sharing the mock between test and processor.
    #[derive(Debug, Clone, Default)]
    struct SharedMockLogProcessor {
        inner: Arc<MockLogProcessor>,
    }

    impl LogProcessor for SharedMockLogProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            self.inner.emit(record, instrumentation);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.inner.force_flush()
        }

        fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
            self.inner.shutdown_with_timeout(timeout)
        }

        fn set_resource(&mut self, _resource: &Resource) {}
    }

    fn create_test_scope() -> InstrumentationScope {
        InstrumentationScope::builder("test").build()
    }

    #[test]
    fn logs_within_limit_are_forwarded() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        // Create a logger provider to get proper log records
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let scope = create_test_scope();

        // Emit 50 logs - all should be forwarded
        for _ in 0..50 {
            let mut record = logger.create_log_record();
            processor.emit(&mut record, &scope);
        }

        assert_eq!(mock.inner.emit_count.load(Ordering::SeqCst), 50);
    }

    #[test]
    fn logs_over_limit_are_dropped() {
        let mock = SharedMockLogProcessor::default();
        // Very low limit to trigger drops
        let processor = RateLimitedLogProcessor::new(mock.clone(), 1, ExporterKind::OtlpHttp, 0);

        // Create a logger provider to get proper log records
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let scope = create_test_scope();

        // Emit 10 logs rapidly - most should be dropped
        for _ in 0..10 {
            let mut record = logger.create_log_record();
            processor.emit(&mut record, &scope);
        }

        // Only 1 should have been forwarded (the rate limit)
        assert_eq!(mock.inner.emit_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn force_flush_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let result = processor.force_flush();
        assert!(result.is_ok());
    }

    #[test]
    fn shutdown_with_timeout_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let result = processor.shutdown_with_timeout(Duration::from_secs(5));
        assert!(result.is_ok());
    }

    #[test]
    fn set_resource_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let mut processor = RateLimitedLogProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let resource = Resource::builder().build();
        processor.set_resource(&resource);
        // No panic means delegation worked
    }

    #[test]
    fn debug_format_shows_inner_and_max_rate() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let debug_str = format!("{:?}", processor);
        assert!(debug_str.contains("RateLimitedLogProcessor"));
        assert!(debug_str.contains("max_rate"));
        assert!(debug_str.contains("100"));
    }
}