apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for Apollo platform
Documentation
//! Batch and rate-limited processor configuration traits.
//!
//! Provides logic shared by batch processors: export-throughput calculations,
//! validation of that throughput against a configured `max_rate`, and
//! conversions to the SDK `BatchConfig` types.

use std::time::Duration;

use apollo_configuration::ErrorCollector;
use apollo_errors::Error;
use miette::Diagnostic;

/// Rate-limited processor configuration errors.
#[derive(Debug, Error, Diagnostic)]
pub enum RateLimitProcessorConfigError {
    /// The batch processor throughput is insufficient for the configured max_rate.
    ///
    /// The rendered message reports both values as per-second rates.
    #[error("batch processor throughput ({throughput}/s) is less than max_rate ({max_rate}/s)")]
    #[diagnostic(code(telemetry::rate_limited_processor::configured_throughput_insufficient))]
    ThroughputInsufficient {
        /// Throughput the batch processor can sustain, in items per second
        /// (a whole number, since the field is a `u32`).
        throughput: u32,
        /// The configured rate limit, in items per second.
        max_rate: u32,
        /// Help text attached to the diagnostic through the `#[help]` attribute.
        #[help]
        help: String,
    },
}

/// SDK-default delay between scheduled exports, in milliseconds.
pub(crate) const DEFAULT_SCHEDULE_DELAY: u64 = 5_000;

/// SDK-default export timeout, in milliseconds.
pub(crate) const DEFAULT_EXPORT_TIMEOUT: u64 = 30_000;

/// SDK-default upper bound on the number of items in a single export batch.
pub(crate) const DEFAULT_MAX_EXPORT_BATCH_SIZE: u32 = 512;

/// SDK-default upper bound on the number of items held in the queue.
pub(crate) const DEFAULT_MAX_QUEUE_SIZE: u32 = 2_048;

/// Common interface for batch processor configurations.
///
/// Implementors expose the raw settings through the four required getters;
/// the provided methods derive the export frequency from them and build the
/// SDK batch configuration for logs and traces.
pub(crate) trait BatchProcessorConfig {
    /// Delay between scheduled exports, in milliseconds.
    fn schedule_delay(&self) -> u64;
    /// Export timeout (presumably milliseconds, like `DEFAULT_EXPORT_TIMEOUT`).
    ///
    /// None of the provided methods on this trait read this value.
    fn export_timeout(&self) -> u64;
    /// Maximum number of items exported in one batch.
    fn max_export_batch_size(&self) -> u32;
    /// Maximum number of items held in the queue.
    fn max_queue_size(&self) -> u32;

    /// Number of scheduled exports per second implied by `schedule_delay`.
    ///
    /// A `schedule_delay` of zero yields positive infinity.
    fn exports_per_second(&self) -> f64 {
        let delay_ms = self.schedule_delay() as f64;
        1000.0 / delay_ms
    }

    /// Builds the SDK batch configuration for log processors.
    ///
    /// Forwards the queue size, the export batch size and the schedule delay;
    /// `export_timeout` is not forwarded.
    fn to_logs_batch_config(&self) -> opentelemetry_sdk::logs::BatchConfig {
        use opentelemetry_sdk::logs::BatchConfigBuilder;

        let queue_capacity = self.max_queue_size() as usize;
        let batch_capacity = self.max_export_batch_size() as usize;
        let delay = Duration::from_millis(self.schedule_delay());

        BatchConfigBuilder::default()
            .with_max_queue_size(queue_capacity)
            .with_max_export_batch_size(batch_capacity)
            .with_scheduled_delay(delay)
            .build()
    }

    /// Builds the SDK batch configuration for span processors.
    ///
    /// Forwards the queue size, the export batch size and the schedule delay;
    /// `export_timeout` is not forwarded.
    fn to_trace_batch_config(&self) -> opentelemetry_sdk::trace::BatchConfig {
        use opentelemetry_sdk::trace::BatchConfigBuilder;

        let queue_capacity = self.max_queue_size() as usize;
        let batch_capacity = self.max_export_batch_size() as usize;
        let delay = Duration::from_millis(self.schedule_delay());

        BatchConfigBuilder::default()
            .with_max_queue_size(queue_capacity)
            .with_max_export_batch_size(batch_capacity)
            .with_scheduled_delay(delay)
            .build()
    }
}

/// Trait for rate-limited processor configurations.
///
/// Extends `BatchProcessorConfig` with the configured rate limit and the
/// calculations that relate it to the batch settings.
pub(crate) trait RateLimitedProcessorConfig: BatchProcessorConfig {
    /// Configured rate limit, in items per second.
    fn max_rate(&self) -> u32;

    /// Maximum number of items per second the batch processor can export:
    /// one batch of `max_export_batch_size` items every `schedule_delay` ms.
    ///
    /// The product is formed before the division so that only a single
    /// rounding step occurs. Dividing first (`1000 / delay * batch`) rounds
    /// the quotient and then the product; for example a 49000 ms delay with a
    /// batch of 49 would come out as 0.9999999999999999 instead of 1.0 and be
    /// treated as below a `max_rate` of 1.
    ///
    /// A `schedule_delay` of zero yields positive infinity (or NaN when the
    /// batch size is zero as well).
    fn max_throughput(&self) -> f64 {
        // `u32 * 1000` is exactly representable in an f64.
        let items_per_1000_delays = self.max_export_batch_size() as f64 * 1000.0;
        items_per_1000_delays / self.schedule_delay() as f64
    }

    /// Returns the batch size needed to achieve max_rate with current schedule_delay.
    ///
    /// This is `ceil(max_rate * schedule_delay / 1000)`, computed in integer
    /// arithmetic so the result is never off by one because of floating-point
    /// rounding. Results that do not fit in a `u32` saturate at `u32::MAX`.
    fn required_batch_size_for_rate(&self) -> u32 {
        // u32 * u64 always fits in a u128, so the product cannot overflow.
        let items_per_1000_delays =
            u128::from(self.max_rate()) * u128::from(self.schedule_delay());
        let batch_size = items_per_1000_delays.div_ceil(1000);
        u32::try_from(batch_size).unwrap_or(u32::MAX)
    }

    /// Returns the schedule delay needed to achieve max_rate with current batch size.
    ///
    /// This is `floor(1000 * max_export_batch_size / max_rate)` in milliseconds.
    /// The float-to-integer cast saturates, so a `max_rate` of zero with a
    /// non-zero batch size yields `u64::MAX`.
    fn required_schedule_delay_for_rate(&self) -> u64 {
        let items_per_1000_delays = 1000.0 * self.max_export_batch_size() as f64;
        (items_per_1000_delays / self.max_rate() as f64).floor() as u64
    }
}

/// Checks that the batch processor can export at least as fast as the rate
/// limiter admits items.
///
/// A batch processor exports `max_export_batch_size` items every
/// `schedule_delay` ms. When that throughput is below `max_rate`, items pile
/// up in the queue and are eventually dropped, so the mismatch is reported
/// to `errors` as `RateLimitProcessorConfigError::ThroughputInsufficient`
/// together with three ways of resolving it. Nothing is reported when the
/// throughput is sufficient.
pub(crate) fn validate_rate_limited_processor<T: RateLimitedProcessorConfig>(
    config: &T,
    mut errors: ErrorCollector<'_>,
) {
    let throughput = config.max_throughput();

    // Kept as a `>=` test (rather than a negated `<`) so that a NaN
    // throughput counts as "cannot keep up".
    let keeps_up = throughput >= config.max_rate() as f64;

    if !keeps_up {
        // Whole items per second, used both in the message and as the
        // suggested upper bound for max_rate.
        let whole_throughput = throughput.floor() as u32;

        let shorter_delay = format!(
            "decrease schedule_delay to at most {}ms",
            config.required_schedule_delay_for_rate()
        );
        let bigger_batch = format!(
            "increase max_export_batch_size to at least {}",
            config.required_batch_size_for_rate()
        );
        let lower_rate = format!("decrease max_rate to at most {whole_throughput}");

        let options = [shorter_delay, bigger_batch, lower_rate].join(", or\n  - ");

        errors.report(RateLimitProcessorConfigError::ThroughputInsufficient {
            throughput: whole_throughput,
            max_rate: config.max_rate(),
            help: format!(
                "To achieve max_rate of {}/s, either:\n  - {}",
                config.max_rate(),
                options
            ),
        });
    }
}

// End-to-end checks of the rate-limited processor validation: each test
// parses a YAML configuration and asserts on whether parsing succeeds.
#[cfg(test)]
mod test {
    use crate::OpenTelemetryConfig;
    use apollo_configuration::parse_yaml;

    /// A span processor whose default batch settings cannot sustain
    /// `max_rate` is rejected, and the error lists all three remedies.
    #[test]
    fn rate_limited_span_processor_rejects_insufficient_throughput() {
        // With defaults: batch_size=512, schedule_delay=5000ms
        // throughput = 512 * 0.2 = 102.4/s < 500/s
        let result: Result<OpenTelemetryConfig, _> = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
        "},
            &Default::default(),
        );

        assert!(result.is_err());
        // The checks below match on the Debug rendering of the error.
        let err = format!("{:?}", result.unwrap_err());
        assert!(
            err.contains("ThroughputInsufficient"),
            "should report throughput insufficient: {err}"
        );
        assert!(
            err.contains("increase max_export_batch_size"),
            "should suggest increasing batch_size: {err}"
        );
        assert!(
            err.contains("decrease schedule_delay"),
            "should suggest decreasing schedule_delay: {err}"
        );
        assert!(
            err.contains("decrease max_rate"),
            "should suggest decreasing max_rate: {err}"
        );
    }

    /// Raising the batch size until throughput equals `max_rate` is accepted.
    #[test]
    fn rate_limited_span_processor_valid_with_increased_batch_size() {
        // batch_size=2500 with schedule_delay=5000ms
        // throughput = 2500 * 0.2 = 500/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    max_export_batch_size: 2500
        "},
            &Default::default(),
        )
        .unwrap();
    }

    /// Shortening the schedule delay until throughput exceeds `max_rate` is accepted.
    #[test]
    fn rate_limited_span_processor_valid_with_decreased_schedule_delay() {
        // batch_size=512 (default) with schedule_delay=1000ms
        // throughput = 512 * 1.0 = 512/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    schedule_delay: 1000
        "},
            &Default::default(),
        )
        .unwrap();
    }

    /// A `max_rate` below the default throughput needs no further tuning.
    #[test]
    fn rate_limited_span_processor_valid_within_default_throughput() {
        // With defaults: batch_size=512, schedule_delay=5000ms
        // throughput = 512 * 0.2 = 102.4/s >= 100/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 100
        "},
            &Default::default(),
        )
        .unwrap();
    }

    /// Log-processor counterpart of the span rejection test.
    #[test]
    fn rate_limited_log_processor_rejects_insufficient_throughput() {
        // Presumably the log processor uses the same defaults as the span
        // processor (batch_size=512, schedule_delay=5000ms), giving
        // 102.4/s < 500/s — TODO confirm against the log processor config.
        let result: Result<OpenTelemetryConfig, _> = parse_yaml(
            indoc::indoc! {"
            logger_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
        "},
            &Default::default(),
        );

        assert!(result.is_err());
        let err = format!("{:?}", result.unwrap_err());
        assert!(
            err.contains("ThroughputInsufficient"),
            "should report throughput insufficient: {err}"
        );
    }

    /// Log-processor counterpart of the increased-batch-size test.
    #[test]
    fn rate_limited_log_processor_valid_with_increased_batch_size() {
        // batch_size=2500; assuming the default schedule_delay of 5000ms,
        // throughput = 2500 * 0.2 = 500/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            logger_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    max_export_batch_size: 2500
        "},
            &Default::default(),
        )
        .unwrap();
    }
}