Skip to main content

apollo_opentelemetry/config/
processor.rs

//! Batch and rate-limited processor configuration traits.
//!
//! Provides shared logic for batch processors including effective value
//! calculations and conversions to SDK BatchConfig types.
5
6use std::time::Duration;
7
8use apollo_configuration::ErrorCollector;
9use apollo_errors::Error;
10use miette::Diagnostic;
11
12/// Rate-limited processor configuration errors.
13#[derive(Debug, Error, Diagnostic)]
14pub enum RateLimitProcessorConfigError {
15    /// The batch processor throughput is insufficient for the configured max_rate.
16    #[error("batch processor throughput ({throughput}/s) is less than max_rate ({max_rate}/s)")]
17    #[diagnostic(code(telemetry::rate_limited_processor::configured_throughput_insufficient))]
18    ThroughputInsufficient {
19        throughput: u32,
20        max_rate: u32,
21        #[help]
22        help: String,
23    },
24}
25
/// Default schedule delay in milliseconds (SDK default).
pub(crate) const DEFAULT_SCHEDULE_DELAY: u64 = 5_000;

/// Default export timeout in milliseconds (SDK default).
pub(crate) const DEFAULT_EXPORT_TIMEOUT: u64 = 30_000;

/// Default max export batch size (SDK default).
pub(crate) const DEFAULT_MAX_EXPORT_BATCH_SIZE: u32 = 512;

/// Default max queue size (SDK default).
pub(crate) const DEFAULT_MAX_QUEUE_SIZE: u32 = 2_048;
37
38/// Trait for batch processor configurations.
39///
40/// Provides default implementations for calculating effective values based
41/// on the raw configuration values.
42pub(crate) trait BatchProcessorConfig {
43    fn schedule_delay(&self) -> u64;
44    fn export_timeout(&self) -> u64;
45    fn max_export_batch_size(&self) -> u32;
46    fn max_queue_size(&self) -> u32;
47
48    fn exports_per_second(&self) -> f64 {
49        1000.0 / self.schedule_delay() as f64
50    }
51
52    fn to_logs_batch_config(&self) -> opentelemetry_sdk::logs::BatchConfig {
53        opentelemetry_sdk::logs::BatchConfigBuilder::default()
54            .with_max_queue_size(self.max_queue_size() as usize)
55            .with_max_export_batch_size(self.max_export_batch_size() as usize)
56            .with_scheduled_delay(Duration::from_millis(self.schedule_delay()))
57            .build()
58    }
59
60    fn to_trace_batch_config(&self) -> opentelemetry_sdk::trace::BatchConfig {
61        opentelemetry_sdk::trace::BatchConfigBuilder::default()
62            .with_max_queue_size(self.max_queue_size() as usize)
63            .with_max_export_batch_size(self.max_export_batch_size() as usize)
64            .with_scheduled_delay(Duration::from_millis(self.schedule_delay()))
65            .build()
66    }
67}
68
69/// Trait for rate-limited processor configurations.
70///
71/// Extends `BatchProcessorConfig` with rate limiting and throughput calculations.
72pub(crate) trait RateLimitedProcessorConfig: BatchProcessorConfig {
73    fn max_rate(&self) -> u32;
74
75    fn max_throughput(&self) -> f64 {
76        self.exports_per_second() * self.max_export_batch_size() as f64
77    }
78
79    /// Returns the batch size needed to achieve max_rate with current schedule_delay.
80    fn required_batch_size_for_rate(&self) -> u32 {
81        (self.max_rate() as f64 / self.exports_per_second()).ceil() as u32
82    }
83
84    /// Returns the schedule delay needed to achieve max_rate with current batch size.
85    fn required_schedule_delay_for_rate(&self) -> u64 {
86        (1000.0 * self.max_export_batch_size() as f64 / self.max_rate() as f64).floor() as u64
87    }
88}
89
90/// Validates that the batch processor can keep up with the rate limiter.
91///
92/// The batch processor exports `max_export_batch_size` items every `schedule_delay` ms.
93/// If this throughput is less than `max_rate`, items will back up in the queue and
94/// eventually be dropped.
95pub(crate) fn validate_rate_limited_processor<T: RateLimitedProcessorConfig>(
96    config: &T,
97    mut errors: ErrorCollector<'_>,
98) {
99    let max_throughput = config.max_throughput();
100
101    if max_throughput >= config.max_rate() as f64 {
102        return;
103    }
104
105    let max_throughput_int = max_throughput.floor() as u32;
106
107    let suggestions = [
108        format!(
109            "decrease schedule_delay to at most {}ms",
110            config.required_schedule_delay_for_rate()
111        ),
112        format!(
113            "increase max_export_batch_size to at least {}",
114            config.required_batch_size_for_rate()
115        ),
116        format!("decrease max_rate to at most {}", max_throughput_int),
117    ];
118
119    errors.report(RateLimitProcessorConfigError::ThroughputInsufficient {
120        throughput: max_throughput_int,
121        max_rate: config.max_rate(),
122        help: format!(
123            "To achieve max_rate of {}/s, either:\n  - {}",
124            config.max_rate(),
125            suggestions.join(", or\n  - ")
126        ),
127    });
128}
129
#[cfg(test)]
mod test {
    //! Configuration-level tests: each case parses a YAML snippet and checks
    //! whether rate-limited processor validation accepts or rejects it.

    use crate::OpenTelemetryConfig;
    use apollo_configuration::parse_yaml;

    #[test]
    fn rate_limited_span_processor_rejects_insufficient_throughput() {
        // With defaults: batch_size=512, schedule_delay=5000ms
        // throughput = 512 * 0.2 = 102.4/s < 500/s
        let result: Result<OpenTelemetryConfig, _> = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
        "},
            &Default::default(),
        );

        assert!(result.is_err());
        let err = format!("{:?}", result.unwrap_err());
        assert!(
            err.contains("ThroughputInsufficient"),
            "should report throughput insufficient: {err}"
        );
        assert!(
            err.contains("increase max_export_batch_size"),
            "should suggest increasing batch_size: {err}"
        );
        assert!(
            err.contains("decrease schedule_delay"),
            "should suggest decreasing schedule_delay: {err}"
        );
        assert!(
            err.contains("decrease max_rate"),
            "should suggest decreasing max_rate: {err}"
        );
    }

    #[test]
    fn rate_limited_span_processor_valid_with_increased_batch_size() {
        // batch_size=2500 with schedule_delay=5000ms
        // throughput = 2500 * 0.2 = 500/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    max_export_batch_size: 2500
        "},
            &Default::default(),
        )
        .unwrap();
    }

    #[test]
    fn rate_limited_span_processor_valid_with_decreased_schedule_delay() {
        // batch_size=512 (default) with schedule_delay=1000ms
        // throughput = 512 * 1.0 = 512/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    schedule_delay: 1000
        "},
            &Default::default(),
        )
        .unwrap();
    }

    #[test]
    fn rate_limited_span_processor_valid_within_default_throughput() {
        // With defaults: batch_size=512, schedule_delay=5000ms
        // throughput = 512 * 0.2 = 102.4/s >= 100/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            tracer_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 100
        "},
            &Default::default(),
        )
        .unwrap();
    }

    #[test]
    fn rate_limited_log_processor_rejects_insufficient_throughput() {
        // Same scenario as the span processor case, but for the logger
        // provider: defaults give 102.4/s < 500/s.
        let result: Result<OpenTelemetryConfig, _> = parse_yaml(
            indoc::indoc! {"
            logger_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
        "},
            &Default::default(),
        );

        assert!(result.is_err());
        let err = format!("{:?}", result.unwrap_err());
        assert!(
            err.contains("ThroughputInsufficient"),
            "should report throughput insufficient: {err}"
        );
    }

    #[test]
    fn rate_limited_log_processor_valid_with_increased_batch_size() {
        // batch_size=2500 with schedule_delay=5000ms (default)
        // throughput = 2500 * 0.2 = 500/s >= 500/s
        let _config: OpenTelemetryConfig = parse_yaml(
            indoc::indoc! {"
            logger_provider:
              processors:
                - rate_limited:
                    exporter:
                      console: {}
                    max_rate: 500
                    max_export_batch_size: 2500
        "},
            &Default::default(),
        )
        .unwrap();
    }
}