apollo_opentelemetry/config/
processor.rs1use std::time::Duration;
7
8use apollo_configuration::ErrorCollector;
9use apollo_errors::Error;
10use miette::Diagnostic;
11
12#[derive(Debug, Error, Diagnostic)]
14pub enum RateLimitProcessorConfigError {
15 #[error("batch processor throughput ({throughput}/s) is less than max_rate ({max_rate}/s)")]
17 #[diagnostic(code(telemetry::rate_limited_processor::configured_throughput_insufficient))]
18 ThroughputInsufficient {
19 throughput: u32,
20 max_rate: u32,
21 #[help]
22 help: String,
23 },
24}
25
/// Default delay between scheduled exports.
///
/// NOTE(review): presumably milliseconds, matching how `schedule_delay()` is
/// passed to `Duration::from_millis` elsewhere in this file; confirm at the
/// use site.
pub(crate) const DEFAULT_SCHEDULE_DELAY: u64 = 5_000;

/// Default export timeout.
///
/// NOTE(review): presumably milliseconds, like the schedule delay; confirm at
/// the use site.
pub(crate) const DEFAULT_EXPORT_TIMEOUT: u64 = 30_000;

/// Default value for `max_export_batch_size` (items per export batch).
pub(crate) const DEFAULT_MAX_EXPORT_BATCH_SIZE: u32 = 512;

/// Default value for `max_queue_size` (items buffered by the processor).
pub(crate) const DEFAULT_MAX_QUEUE_SIZE: u32 = 2_048;
37
38pub(crate) trait BatchProcessorConfig {
43 fn schedule_delay(&self) -> u64;
44 fn export_timeout(&self) -> u64;
45 fn max_export_batch_size(&self) -> u32;
46 fn max_queue_size(&self) -> u32;
47
48 fn exports_per_second(&self) -> f64 {
49 1000.0 / self.schedule_delay() as f64
50 }
51
52 fn to_logs_batch_config(&self) -> opentelemetry_sdk::logs::BatchConfig {
53 opentelemetry_sdk::logs::BatchConfigBuilder::default()
54 .with_max_queue_size(self.max_queue_size() as usize)
55 .with_max_export_batch_size(self.max_export_batch_size() as usize)
56 .with_scheduled_delay(Duration::from_millis(self.schedule_delay()))
57 .build()
58 }
59
60 fn to_trace_batch_config(&self) -> opentelemetry_sdk::trace::BatchConfig {
61 opentelemetry_sdk::trace::BatchConfigBuilder::default()
62 .with_max_queue_size(self.max_queue_size() as usize)
63 .with_max_export_batch_size(self.max_export_batch_size() as usize)
64 .with_scheduled_delay(Duration::from_millis(self.schedule_delay()))
65 .build()
66 }
67}
68
69pub(crate) trait RateLimitedProcessorConfig: BatchProcessorConfig {
73 fn max_rate(&self) -> u32;
74
75 fn max_throughput(&self) -> f64 {
76 self.exports_per_second() * self.max_export_batch_size() as f64
77 }
78
79 fn required_batch_size_for_rate(&self) -> u32 {
81 (self.max_rate() as f64 / self.exports_per_second()).ceil() as u32
82 }
83
84 fn required_schedule_delay_for_rate(&self) -> u64 {
86 (1000.0 * self.max_export_batch_size() as f64 / self.max_rate() as f64).floor() as u64
87 }
88}
89
90pub(crate) fn validate_rate_limited_processor<T: RateLimitedProcessorConfig>(
96 config: &T,
97 mut errors: ErrorCollector<'_>,
98) {
99 let max_throughput = config.max_throughput();
100
101 if max_throughput >= config.max_rate() as f64 {
102 return;
103 }
104
105 let max_throughput_int = max_throughput.floor() as u32;
106
107 let suggestions = [
108 format!(
109 "decrease schedule_delay to at most {}ms",
110 config.required_schedule_delay_for_rate()
111 ),
112 format!(
113 "increase max_export_batch_size to at least {}",
114 config.required_batch_size_for_rate()
115 ),
116 format!("decrease max_rate to at most {}", max_throughput_int),
117 ];
118
119 errors.report(RateLimitProcessorConfigError::ThroughputInsufficient {
120 throughput: max_throughput_int,
121 max_rate: config.max_rate(),
122 help: format!(
123 "To achieve max_rate of {}/s, either:\n - {}",
124 config.max_rate(),
125 suggestions.join(", or\n - ")
126 ),
127 });
128}
129
// Configuration-level tests: each case parses a YAML snippet into
// `OpenTelemetryConfig` and checks whether the rate-limited processor
// settings are accepted or rejected.
130#[cfg(test)]
131mod test {
132 use crate::OpenTelemetryConfig;
133 use apollo_configuration::parse_yaml;
134
 // A span processor with `max_rate: 500` and no batch overrides must be
 // rejected, and the error must name the variant and carry all three
 // remediation suggestions.
 // NOTE(review): presumably rejected because the default batch settings
 // sustain only about 102 items/s (512 per 5000 ms) — confirm the defaults
 // are wired to the DEFAULT_* constants.
135 #[test]
136 fn rate_limited_span_processor_rejects_insufficient_throughput() {
137 let result: Result<OpenTelemetryConfig, _> = parse_yaml(
140 indoc::indoc! {"
141 tracer_provider:
142 processors:
143 - rate_limited:
144 exporter:
145 console: {}
146 max_rate: 500
147 "},
148 &Default::default(),
149 );
150
151 assert!(result.is_err());
 // The Debug rendering is inspected, so the variant name itself is matched.
152 let err = format!("{:?}", result.unwrap_err());
153 assert!(
154 err.contains("ThroughputInsufficient"),
155 "should report throughput insufficient: {err}"
156 );
157 assert!(
158 err.contains("increase max_export_batch_size"),
159 "should suggest increasing batch_size: {err}"
160 );
161 assert!(
162 err.contains("decrease schedule_delay"),
163 "should suggest decreasing schedule_delay: {err}"
164 );
165 assert!(
166 err.contains("decrease max_rate"),
167 "should suggest decreasing max_rate: {err}"
168 );
169 }
170
 // Raising `max_export_batch_size` to 2500 makes `max_rate: 500` parse
 // successfully for a span processor.
171 #[test]
172 fn rate_limited_span_processor_valid_with_increased_batch_size() {
173 let _config: OpenTelemetryConfig = parse_yaml(
176 indoc::indoc! {"
177 tracer_provider:
178 processors:
179 - rate_limited:
180 exporter:
181 console: {}
182 max_rate: 500
183 max_export_batch_size: 2500
184 "},
185 &Default::default(),
186 )
187 .unwrap();
188 }
189
 // Lowering `schedule_delay` to 1000 makes `max_rate: 500` parse
 // successfully for a span processor.
190 #[test]
191 fn rate_limited_span_processor_valid_with_decreased_schedule_delay() {
192 let _config: OpenTelemetryConfig = parse_yaml(
195 indoc::indoc! {"
196 tracer_provider:
197 processors:
198 - rate_limited:
199 exporter:
200 console: {}
201 max_rate: 500
202 schedule_delay: 1000
203 "},
204 &Default::default(),
205 )
206 .unwrap();
207 }
208
 // `max_rate: 100` with no batch overrides parses successfully for a span
 // processor.
209 #[test]
210 fn rate_limited_span_processor_valid_within_default_throughput() {
211 let _config: OpenTelemetryConfig = parse_yaml(
214 indoc::indoc! {"
215 tracer_provider:
216 processors:
217 - rate_limited:
218 exporter:
219 console: {}
220 max_rate: 100
221 "},
222 &Default::default(),
223 )
224 .unwrap();
225 }
226
 // Same rejection check as the span case, applied to a log processor; only
 // the variant name is asserted here.
227 #[test]
228 fn rate_limited_log_processor_rejects_insufficient_throughput() {
229 let result: Result<OpenTelemetryConfig, _> = parse_yaml(
230 indoc::indoc! {"
231 logger_provider:
232 processors:
233 - rate_limited:
234 exporter:
235 console: {}
236 max_rate: 500
237 "},
238 &Default::default(),
239 );
240
241 assert!(result.is_err());
242 let err = format!("{:?}", result.unwrap_err());
243 assert!(
244 err.contains("ThroughputInsufficient"),
245 "should report throughput insufficient: {err}"
246 );
247 }
248
 // Raising `max_export_batch_size` to 2500 makes `max_rate: 500` parse
 // successfully for a log processor.
249 #[test]
250 fn rate_limited_log_processor_valid_with_increased_batch_size() {
251 let _config: OpenTelemetryConfig = parse_yaml(
252 indoc::indoc! {"
253 logger_provider:
254 processors:
255 - rate_limited:
256 exporter:
257 console: {}
258 max_rate: 500
259 max_export_batch_size: 2500
260 "},
261 &Default::default(),
262 )
263 .unwrap();
264 }
265}