use std::time::Duration;
use apollo_configuration::ErrorCollector;
use apollo_errors::Error;
use miette::Diagnostic;
/// Errors reported while validating a rate-limited processor configuration.
#[derive(Debug, Error, Diagnostic)]
pub enum RateLimitProcessorConfigError {
/// The batch processor's computed throughput is lower than the configured
/// `max_rate`.
#[error("batch processor throughput ({throughput}/s) is less than max_rate ({max_rate}/s)")]
#[diagnostic(code(telemetry::rate_limited_processor::configured_throughput_insufficient))]
ThroughputInsufficient {
/// Achievable throughput per second, as a whole number.
throughput: u32,
/// Configured rate limit per second.
max_rate: u32,
/// Help text attached to the diagnostic.
#[help]
help: String,
},
}
// NOTE(review): none of these constants is referenced in the visible part of
// the file; the descriptions below follow their names — confirm at the use sites.

/// Default schedule delay; presumably milliseconds, matching how
/// `schedule_delay` values are fed to `Duration::from_millis` elsewhere in this file.
pub(crate) const DEFAULT_SCHEDULE_DELAY: u64 = 5000;
/// Default export timeout; presumably milliseconds — TODO confirm.
pub(crate) const DEFAULT_EXPORT_TIMEOUT: u64 = 30000;
/// Default maximum export batch size.
pub(crate) const DEFAULT_MAX_EXPORT_BATCH_SIZE: u32 = 512;
/// Default maximum queue size.
pub(crate) const DEFAULT_MAX_QUEUE_SIZE: u32 = 2048;
/// Settings shared by batching processors, plus conversions into the
/// OpenTelemetry SDK batch configurations for logs and traces.
pub(crate) trait BatchProcessorConfig {
    /// Delay between scheduled exports, in milliseconds.
    fn schedule_delay(&self) -> u64;

    /// Export timeout. Not consumed by any default method of this trait;
    /// presumably milliseconds — TODO confirm against implementors.
    fn export_timeout(&self) -> u64;

    /// Largest number of items placed in a single export batch.
    fn max_export_batch_size(&self) -> u32;

    /// Capacity of the processor queue.
    fn max_queue_size(&self) -> u32;

    /// Number of scheduled exports per second implied by `schedule_delay`.
    ///
    /// A `schedule_delay` of zero yields `f64::INFINITY`.
    fn exports_per_second(&self) -> f64 {
        let delay_ms = self.schedule_delay() as f64;
        1000.0 / delay_ms
    }

    /// Builds the SDK batch configuration for the logs pipeline from the
    /// queue size, batch size and schedule delay of this configuration.
    fn to_logs_batch_config(&self) -> opentelemetry_sdk::logs::BatchConfig {
        use opentelemetry_sdk::logs::BatchConfigBuilder;

        let queue_capacity = self.max_queue_size() as usize;
        let batch_capacity = self.max_export_batch_size() as usize;
        let delay = Duration::from_millis(self.schedule_delay());

        BatchConfigBuilder::default()
            .with_max_queue_size(queue_capacity)
            .with_max_export_batch_size(batch_capacity)
            .with_scheduled_delay(delay)
            .build()
    }

    /// Builds the SDK batch configuration for the trace pipeline from the
    /// queue size, batch size and schedule delay of this configuration.
    fn to_trace_batch_config(&self) -> opentelemetry_sdk::trace::BatchConfig {
        use opentelemetry_sdk::trace::BatchConfigBuilder;

        let queue_capacity = self.max_queue_size() as usize;
        let batch_capacity = self.max_export_batch_size() as usize;
        let delay = Duration::from_millis(self.schedule_delay());

        BatchConfigBuilder::default()
            .with_max_queue_size(queue_capacity)
            .with_max_export_batch_size(batch_capacity)
            .with_scheduled_delay(delay)
            .build()
    }
}
/// A batch processor configuration that additionally carries a `max_rate`
/// (per second) the processor must be able to sustain.
///
/// The derived quantities below are each computed with a single
/// floating-point division of exactly representable operands. Chaining through
/// an already-rounded "exports per second" value can land just below the true
/// result (e.g. `(1000 / 49000) * 49` evaluates to `0.9999999999999999`), which
/// would make a configuration sitting exactly on the limit look insufficient.
pub(crate) trait RateLimitedProcessorConfig: BatchProcessorConfig {
    /// Configured rate limit, per second.
    fn max_rate(&self) -> u32;

    /// Maximum throughput per second: one batch of `max_export_batch_size`
    /// every `schedule_delay` milliseconds.
    ///
    /// With a `schedule_delay` of zero the result is `f64::INFINITY`, or NaN
    /// when the batch size is zero as well.
    fn max_throughput(&self) -> f64 {
        // `1000 * batch_size` is exact in f64, so only the division rounds.
        1000.0 * f64::from(self.max_export_batch_size()) / self.schedule_delay() as f64
    }

    /// Smallest `max_export_batch_size` that reaches `max_rate` at the current
    /// `schedule_delay`.
    fn required_batch_size_for_rate(&self) -> u32 {
        // batch >= max_rate * delay_ms / 1000; the float-to-int cast saturates.
        (f64::from(self.max_rate()) * self.schedule_delay() as f64 / 1000.0).ceil() as u32
    }

    /// Largest `schedule_delay` (milliseconds) that reaches `max_rate` at the
    /// current `max_export_batch_size`.
    ///
    /// May be `0` when not even a 1 ms delay would be enough.
    fn required_schedule_delay_for_rate(&self) -> u64 {
        // delay_ms <= 1000 * batch / max_rate; the float-to-int cast saturates.
        (1000.0 * f64::from(self.max_export_batch_size()) / f64::from(self.max_rate())).floor()
            as u64
    }
}
/// Checks that a rate-limited processor can sustain its configured `max_rate`.
///
/// Nothing is reported when `config.max_throughput()` is at least
/// `config.max_rate()`. Otherwise a single
/// `RateLimitProcessorConfigError::ThroughputInsufficient` is reported to
/// `errors`, carrying the throughput rounded down to a whole number, the
/// configured rate, and a help text listing three alternative remedies.
pub(crate) fn validate_rate_limited_processor<T: RateLimitedProcessorConfig>(
    config: &T,
    mut errors: ErrorCollector<'_>,
) {
    let achievable = config.max_throughput();
    let target_rate = config.max_rate();

    // Kept as a `>=` guard (not a negated `<`) so a NaN throughput falls
    // through and is reported.
    if achievable >= target_rate as f64 {
        return;
    }

    let achievable_whole = achievable.floor() as u32;

    let shorter_delay = format!(
        "decrease schedule_delay to at most {}ms",
        config.required_schedule_delay_for_rate()
    );
    let bigger_batch = format!(
        "increase max_export_batch_size to at least {}",
        config.required_batch_size_for_rate()
    );
    let lower_rate = format!("decrease max_rate to at most {achievable_whole}");

    let remedies = [shorter_delay, bigger_batch, lower_rate].join(", or\n - ");
    let help = format!("To achieve max_rate of {target_rate}/s, either:\n - {remedies}");

    errors.report(RateLimitProcessorConfigError::ThroughputInsufficient {
        throughput: achievable_whole,
        max_rate: target_rate,
        help,
    });
}
// End-to-end checks that parse a YAML `OpenTelemetryConfig` and observe
// whether parsing accepts or rejects a `rate_limited` processor.
#[cfg(test)]
mod test {
use crate::OpenTelemetryConfig;
use apollo_configuration::parse_yaml;
// Span processor: only `max_rate: 500` is set. Parsing must fail with
// `ThroughputInsufficient`, and the error text must mention all three
// suggested remedies.
#[test]
fn rate_limited_span_processor_rejects_insufficient_throughput() {
let result: Result<OpenTelemetryConfig, _> = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 500
"},
&Default::default(),
);
assert!(result.is_err());
let err = format!("{:?}", result.unwrap_err());
assert!(
err.contains("ThroughputInsufficient"),
"should report throughput insufficient: {err}"
);
assert!(
err.contains("increase max_export_batch_size"),
"should suggest increasing batch_size: {err}"
);
assert!(
err.contains("decrease schedule_delay"),
"should suggest decreasing schedule_delay: {err}"
);
assert!(
err.contains("decrease max_rate"),
"should suggest decreasing max_rate: {err}"
);
}
// Span processor: `max_rate: 500` parses once `max_export_batch_size` is
// raised to 2500.
#[test]
fn rate_limited_span_processor_valid_with_increased_batch_size() {
let _config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 500
max_export_batch_size: 2500
"},
&Default::default(),
)
.unwrap();
}
// Span processor: `max_rate: 500` parses once `schedule_delay` is lowered
// to 1000.
#[test]
fn rate_limited_span_processor_valid_with_decreased_schedule_delay() {
let _config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 500
schedule_delay: 1000
"},
&Default::default(),
)
.unwrap();
}
// Span processor: `max_rate: 100` with no other settings must parse.
#[test]
fn rate_limited_span_processor_valid_within_default_throughput() {
let _config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 100
"},
&Default::default(),
)
.unwrap();
}
// Log processor: only `max_rate: 500` is set; parsing must fail with
// `ThroughputInsufficient`.
#[test]
fn rate_limited_log_processor_rejects_insufficient_throughput() {
let result: Result<OpenTelemetryConfig, _> = parse_yaml(
indoc::indoc! {"
logger_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 500
"},
&Default::default(),
);
assert!(result.is_err());
let err = format!("{:?}", result.unwrap_err());
assert!(
err.contains("ThroughputInsufficient"),
"should report throughput insufficient: {err}"
);
}
// Log processor: `max_rate: 500` parses once `max_export_batch_size` is
// raised to 2500.
#[test]
fn rate_limited_log_processor_valid_with_increased_batch_size() {
let _config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
logger_provider:
processors:
- rate_limited:
exporter:
console: {}
max_rate: 500
max_export_batch_size: 2500
"},
&Default::default(),
)
.unwrap();
}
}