use std::num::NonZeroU32;
use std::sync::atomic::{AtomicBool, Ordering};
use governor::clock::{Clock, DefaultClock};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter as GovernorRateLimiter};
use log::warn;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Counter;
use crate::error::ExporterKind;
/// Rate limiter that wraps a direct (un-keyed, in-memory) `governor` limiter and
/// records every decision on an OpenTelemetry counter.
///
/// The clock type `C` defaults to `DefaultClock`; a different clock can be
/// supplied through `with_clock`.
pub(crate) struct RateLimiter<C: Clock = DefaultClock> {
/// Underlying `governor` limiter that makes the allow/deny decision.
limiter: GovernorRateLimiter<NotKeyed, InMemoryState, C, NoOpMiddleware<C::Instant>>,
/// Rate as passed to the constructor; returned by `max_rate()` and printed in
/// the rate-limit warning.
max_rate: u32,
/// Counter incremented by one on every `check()` call.
counter: Counter<u64>,
/// Attributes recorded on the counter when a check is allowed.
success_attributes: Vec<KeyValue>,
/// Attributes recorded on the counter when a check is denied: the success
/// attributes plus `error.type = "rate_limited"`.
dropped_attributes: Vec<KeyValue>,
/// Set to `true` the first time a check is denied, so the warning is logged
/// only once per limiter.
logged_warning: AtomicBool,
/// Exporter kind whose name is included in the rate-limit warning.
exporter: ExporterKind,
}
impl RateLimiter<DefaultClock> {
    /// Creates a rate limiter driven by `DefaultClock::default()`.
    ///
    /// All arguments are forwarded unchanged to [`RateLimiter::with_clock`]:
    ///
    /// * `max_rate` - rate used to build the per-second quota.
    /// * `counter` - counter incremented on every check.
    /// * `attributes` - base attributes attached to each counter increment.
    /// * `exporter` - exporter kind named in the rate-limit warning.
    pub(crate) fn new(
        max_rate: u32,
        counter: Counter<u64>,
        attributes: Vec<KeyValue>,
        exporter: ExporterKind,
    ) -> Self {
        let clock = DefaultClock::default();
        Self::with_clock(max_rate, counter, attributes, exporter, clock)
    }
}
impl<C: Clock> RateLimiter<C> {
pub(crate) fn with_clock(
max_rate: u32,
counter: Counter<u64>,
attributes: Vec<KeyValue>,
exporter: ExporterKind,
clock: C,
) -> Self {
let quota = Quota::per_second(NonZeroU32::new(max_rate).unwrap_or(NonZeroU32::MIN));
let success_attributes = attributes.clone();
let mut dropped_attributes = attributes;
dropped_attributes.push(KeyValue::new("error.type", "rate_limited"));
Self {
limiter: GovernorRateLimiter::direct_with_clock(quota, clock),
max_rate,
counter,
success_attributes,
dropped_attributes,
logged_warning: AtomicBool::new(false),
exporter,
}
}
pub fn max_rate(&self) -> u32 {
self.max_rate
}
pub fn check(&self) -> bool {
if self.limiter.check().is_ok() {
self.counter.add(1, &self.success_attributes);
true
} else {
self.counter.add(1, &self.dropped_attributes);
if !self.logged_warning.swap(true, Ordering::Relaxed) {
warn!(
"Rate limit exceeded for {} exporter (max_rate={}), dropping telemetry. \
Check otel.sdk.processor.*.processed metrics for drop counts.",
self.exporter.as_str(),
self.max_rate
);
}
false
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use apollo_opentelemetry_test::{TelemetryContext, assert_metric};
    use governor::clock::FakeRelativeClock;
    use opentelemetry::global;
    use std::time::Duration;

    /// Base attributes shared by every limiter built in these tests.
    fn test_attributes() -> Vec<KeyValue> {
        vec![
            KeyValue::new("otel.component.type", "rate_limited_span_processor"),
            KeyValue::new("otel.component.name", "rate_limited_span_processor/test"),
        ]
    }

    /// Builds a limiter on the default clock, recording to the test counter.
    fn create_test_limiter(max_rate: u32) -> RateLimiter {
        let counter = global::meter_provider()
            .meter("test")
            .u64_counter("otel.sdk.processor.test.processed")
            .build();
        RateLimiter::new(
            max_rate,
            counter,
            test_attributes(),
            ExporterKind::OtlpHttp,
        )
    }

    /// Builds a limiter on a caller-controlled fake clock, recording to the
    /// test counter.
    fn create_test_limiter_with_clock(
        max_rate: u32,
        clock: FakeRelativeClock,
    ) -> RateLimiter<FakeRelativeClock> {
        let counter = global::meter_provider()
            .meter("test")
            .u64_counter("otel.sdk.processor.test.processed")
            .build();
        RateLimiter::with_clock(
            max_rate,
            counter,
            test_attributes(),
            ExporterKind::OtlpHttp,
            clock,
        )
    }

    /// Checks below the configured rate are all accepted and counted with the
    /// success attributes.
    #[test]
    fn test_rate_limiter_accepts_within_limit() {
        let ctx = TelemetryContext::new();
        let limiter = create_test_limiter(100);

        for _ in 0..50 {
            assert!(limiter.check());
        }

        assert_metric!(
            ctx,
            "otel.sdk.processor.test.processed",
            "otel.component.type" = "rate_limited_span_processor",
            "otel.component.name" = "rate_limited_span_processor/test"
        );
    }

    /// Checks beyond the configured rate are dropped and counted with the
    /// `error.type` attribute.
    #[test]
    fn test_rate_limiter_drops_over_limit() {
        let ctx = TelemetryContext::new();
        let limiter = create_test_limiter(1);

        assert!(limiter.check());

        // Every one of the ten checks runs; only the denied ones are counted.
        let drops = (0..10).filter(|_| !limiter.check()).count();
        assert!(drops > 0, "Should have dropped some items");

        assert_metric!(
            ctx,
            "otel.sdk.processor.test.processed",
            "otel.component.type" = "rate_limited_span_processor",
            "otel.component.name" = "rate_limited_span_processor/test",
            "error.type" = "rate_limited"
        );
    }

    /// Capacity comes back as the fake clock advances: fully after one second,
    /// and by a single item after 200 ms at a rate of five.
    #[test]
    fn test_rate_limiter_recovers_over_time() {
        let _ctx = TelemetryContext::new();
        let clock = FakeRelativeClock::default();
        let limiter = create_test_limiter_with_clock(5, clock.clone());

        // Initial burst.
        for i in 0..5 {
            assert!(limiter.check(), "request {} should succeed", i);
        }
        assert!(!limiter.check(), "should be rate limited after burst");

        // Full recovery.
        clock.advance(Duration::from_secs(1));
        for i in 0..5 {
            assert!(
                limiter.check(),
                "request {} should succeed after recovery",
                i
            );
        }
        assert!(
            !limiter.check(),
            "should be rate limited after second burst"
        );

        // Partial recovery.
        clock.advance(Duration::from_millis(200));
        assert!(
            limiter.check(),
            "should accept 1 request after partial recovery"
        );
        assert!(
            !limiter.check(),
            "should be rate limited after using recovered token"
        );
    }
}