use dashmap::DashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use tokio::time::Instant;
use tracing_core::Metadata;
use tracing_core::callsite::Identifier;
/// Global events-per-second cap applied by [`RateLimiter::with_defaults`].
pub const DEFAULT_MAX_EVENTS_PER_SECOND: u64 = 1000;
/// Per-callsite events-per-second cap applied by
/// [`PerCallsiteRateLimiter::with_defaults`].
pub const DEFAULT_MAX_EVENTS_PER_CALLSITE_PER_SECOND: u64 = 30;
/// Maximum number of distinct callsites [`PerCallsiteRateLimiter`] will track.
/// Once the table holds this many entries, events from callsites that are not
/// already tracked are allowed through without any rate limiting.
pub const MAX_TRACKED_CALLSITES: usize = 4096;
/// Minimum spacing between the dropped-event summaries printed by
/// [`RateLimiter`].
const DROPPED_SUMMARY_INTERVAL: Duration = Duration::from_secs(10);
/// Shared state behind a [`RateLimiter`]; every clone of the limiter points at
/// the same instance through an `Arc`.
struct RateLimiterInner {
// Cap that `event_count` is compared against in each window.
max_events_per_second: u64,
// Events seen in the current window (allowed and dropped alike); reset to 0
// when the window rolls over.
event_count: AtomicU64,
// Events dropped since the last summary was printed; swapped back to 0 when
// a summary is emitted.
dropped_count: AtomicU64,
// Events dropped over the lifetime of the limiter; never reset here.
total_dropped: AtomicU64,
// Start of the current window, in nanoseconds since `start_instant`.
window_start_nanos: AtomicU64,
// Time of the last summary slot that was claimed, in nanoseconds since
// `start_instant`.
last_summary_nanos: AtomicU64,
// Reference point from which all `*_nanos` values are measured.
start_instant: Instant,
}
/// Global fixed-window rate limiter for log events.
///
/// Cloning is cheap: a clone shares the same counters as the original, so a
/// budget exhausted through one handle is exhausted for all of them.
#[derive(Clone)]
pub struct RateLimiter {
// Shared counters and configuration.
inner: Arc<RateLimiterInner>,
}
impl RateLimiter {
    /// Creates a limiter that admits at most `max_events_per_second` events in
    /// each one-second window.
    ///
    /// All counters start at zero and the limiter's internal clock starts at
    /// the moment of construction.
    pub fn new(max_events_per_second: u64) -> Self {
        Self {
            inner: Arc::new(RateLimiterInner {
                max_events_per_second,
                event_count: AtomicU64::new(0),
                dropped_count: AtomicU64::new(0),
                total_dropped: AtomicU64::new(0),
                window_start_nanos: AtomicU64::new(0),
                last_summary_nanos: AtomicU64::new(0),
                start_instant: Instant::now(),
            }),
        }
    }

    /// Creates a limiter capped at [`DEFAULT_MAX_EVENTS_PER_SECOND`].
    pub fn with_defaults() -> Self {
        Self::new(DEFAULT_MAX_EVENTS_PER_SECOND)
    }

    /// Nanoseconds elapsed since the limiter was created.
    ///
    /// Saturates at `u64::MAX` instead of silently truncating the `u128`
    /// returned by `Duration::as_nanos`.
    fn elapsed_nanos(&self) -> u64 {
        u64::try_from(self.inner.start_instant.elapsed().as_nanos()).unwrap_or(u64::MAX)
    }

    /// Records one event and reports whether it fits in the current window's
    /// budget.
    ///
    /// Returns `true` when the event is within the cap and `false` when it
    /// should be dropped. Dropped events are tallied, and a summary line may be
    /// printed to stderr as a side effect.
    pub fn should_allow(&self) -> bool {
        const ONE_SECOND_NANOS: u64 = 1_000_000_000;

        let state = &*self.inner;
        let now_nanos = self.elapsed_nanos();
        let window_start = state.window_start_nanos.load(Ordering::Relaxed);

        // Roll the window once a full second has passed. Only the caller that
        // wins the compare-exchange resets the counter, so concurrent callers
        // do not each wipe it.
        if now_nanos >= window_start.saturating_add(ONE_SECOND_NANOS)
            && state
                .window_start_nanos
                .compare_exchange(window_start, now_nanos, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
        {
            state.event_count.store(0, Ordering::Relaxed);
        }

        // The counter is bumped for every event, allowed or not; `fetch_add`
        // returns the value before the increment.
        let seen_before = state.event_count.fetch_add(1, Ordering::Relaxed);
        let allow = seen_before < state.max_events_per_second;
        if !allow {
            state.dropped_count.fetch_add(1, Ordering::Relaxed);
            state.total_dropped.fetch_add(1, Ordering::Relaxed);
        }
        self.maybe_log_summary();
        allow
    }

    /// Prints a summary of dropped events to stderr if any were dropped and at
    /// least [`DROPPED_SUMMARY_INTERVAL`] has passed since the last summary
    /// slot was claimed.
    fn maybe_log_summary(&self) {
        let state = &*self.inner;
        if state.dropped_count.load(Ordering::Relaxed) == 0 {
            return;
        }

        let now_nanos = self.elapsed_nanos();
        let last_summary = state.last_summary_nanos.load(Ordering::Relaxed);
        let interval_nanos =
            u64::try_from(DROPPED_SUMMARY_INTERVAL.as_nanos()).unwrap_or(u64::MAX);
        if now_nanos < last_summary.saturating_add(interval_nanos) {
            return;
        }

        // Claim this summary slot; a caller that loses the race leaves the
        // printing to the winner.
        if state
            .last_summary_nanos
            .compare_exchange(last_summary, now_nanos, Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        let dropped = state.dropped_count.swap(0, Ordering::Relaxed);
        let total = state.total_dropped.load(Ordering::Relaxed);
        if dropped > 0 {
            eprintln!(
                "[RATE LIMIT] Dropped {} log events in last {}s (total dropped: {}). \
                 Max rate: {}/sec. Consider investigating log spam.",
                dropped,
                DROPPED_SUMMARY_INTERVAL.as_secs(),
                total,
                state.max_events_per_second
            );
        }
    }
}
impl Default for RateLimiter {
fn default() -> Self {
Self::with_defaults()
}
}
/// Fixed-window rate limiter that keeps a separate budget for each tracing
/// callsite.
///
/// Cloning is cheap: a clone shares the same per-callsite table as the
/// original.
#[derive(Clone)]
pub struct PerCallsiteRateLimiter {
// Shared table and configuration.
inner: Arc<PerCallsiteInner>,
}
/// Shared state behind a [`PerCallsiteRateLimiter`].
struct PerCallsiteInner {
// Cap that each callsite's `event_count` is compared against per window.
max_events_per_second_per_callsite: u64,
// Per-callsite counters, keyed by callsite identifier. Growth is bounded by
// `MAX_TRACKED_CALLSITES`; entries are never removed in this file.
callsites: DashMap<Identifier, CallsiteState>,
// Time of the last summary slot that was claimed, in nanoseconds since
// `start_instant`.
last_summary_nanos: AtomicU64,
// Minimum spacing between summary passes.
summary_interval: Duration,
// Reference point from which all `*_nanos` values are measured.
start_instant: Instant,
// Latch so the "cap reached" warning is printed only once.
cap_hit_warned: AtomicBool,
}
/// Counters tracked for a single callsite.
struct CallsiteState {
// Start of this callsite's current window, in nanoseconds since the
// limiter's `start_instant`.
window_start_nanos: AtomicU64,
// Events seen in the current window (allowed and dropped alike).
event_count: AtomicU64,
// Events dropped since the last summary pass; swapped to 0 by the summary.
dropped_this_interval: AtomicU64,
// Events dropped over the lifetime of this entry.
total_dropped: AtomicU64,
// Name used to label this callsite in summary output.
name: &'static str,
}
impl PerCallsiteRateLimiter {
    /// Creates a limiter that admits at most
    /// `max_events_per_second_per_callsite` events per callsite in each
    /// one-second window, with summaries spaced at least 30 seconds apart.
    pub fn new(max_events_per_second_per_callsite: u64) -> Self {
        Self::with_summary_interval(max_events_per_second_per_callsite, Duration::from_secs(30))
    }

    /// Like [`PerCallsiteRateLimiter::new`], but with a caller-chosen minimum
    /// spacing between summary passes.
    ///
    /// An interval too large to express in `u64` nanoseconds saturates, which
    /// in practice disables summaries.
    pub fn with_summary_interval(
        max_events_per_second_per_callsite: u64,
        summary_interval: Duration,
    ) -> Self {
        Self {
            inner: Arc::new(PerCallsiteInner {
                max_events_per_second_per_callsite,
                callsites: DashMap::new(),
                last_summary_nanos: AtomicU64::new(0),
                summary_interval,
                start_instant: Instant::now(),
                cap_hit_warned: AtomicBool::new(false),
            }),
        }
    }

    /// Creates a limiter capped at
    /// [`DEFAULT_MAX_EVENTS_PER_CALLSITE_PER_SECOND`].
    pub fn with_defaults() -> Self {
        Self::new(DEFAULT_MAX_EVENTS_PER_CALLSITE_PER_SECOND)
    }

    /// Nanoseconds elapsed since the limiter was created.
    ///
    /// Saturates at `u64::MAX` instead of silently truncating the `u128`
    /// returned by `Duration::as_nanos`.
    fn elapsed_nanos(&self) -> u64 {
        u64::try_from(self.inner.start_instant.elapsed().as_nanos()).unwrap_or(u64::MAX)
    }

    /// Records one event for the callsite described by `metadata` and reports
    /// whether it fits in that callsite's budget for the current window.
    pub fn should_allow(&self, metadata: &Metadata<'_>) -> bool {
        self.should_allow_inner(metadata.callsite(), metadata.name())
    }

    /// Core of [`PerCallsiteRateLimiter::should_allow`], split out so it can be
    /// driven with a bare identifier and name.
    ///
    /// Returns `true` when the event is within the callsite's cap, and also
    /// when the callsite cannot be tracked because the table already holds
    /// [`MAX_TRACKED_CALLSITES`] entries. `name` is stored only when the entry
    /// is first created.
    fn should_allow_inner(&self, id: Identifier, name: &'static str) -> bool {
        const ONE_SECOND_NANOS: u64 = 1_000_000_000;

        let state = &*self.inner;
        let now_nanos = self.elapsed_nanos();

        let entry = match state.callsites.get(&id) {
            Some(existing) => existing,
            None => {
                if state.callsites.len() >= MAX_TRACKED_CALLSITES {
                    // Table is full: degrade to "allow" and warn exactly once.
                    if state
                        .cap_hit_warned
                        .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
                        .is_ok()
                    {
                        eprintln!(
                            "[RATE LIMIT per-callsite] tracked-callsite cap reached \
                             ({MAX_TRACKED_CALLSITES} entries). New callsites are \
                             passing through unrate-limited until existing entries \
                             are reclaimed (they aren't — there is no eviction). \
                             Investigate whether something is generating unbounded \
                             unique callsites."
                        );
                    }
                    return true;
                }
                // `id` is not needed after this point, so it is moved into the
                // map rather than cloned.
                state
                    .callsites
                    .entry(id)
                    .or_insert_with(|| CallsiteState {
                        window_start_nanos: AtomicU64::new(now_nanos),
                        event_count: AtomicU64::new(0),
                        dropped_this_interval: AtomicU64::new(0),
                        total_dropped: AtomicU64::new(0),
                        name,
                    })
                    .downgrade()
            }
        };

        // Roll this callsite's window once a full second has passed. Only the
        // caller that wins the compare-exchange resets the counter.
        let window_start = entry.window_start_nanos.load(Ordering::Relaxed);
        if now_nanos >= window_start.saturating_add(ONE_SECOND_NANOS)
            && entry
                .window_start_nanos
                .compare_exchange(window_start, now_nanos, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
        {
            entry.event_count.store(0, Ordering::Relaxed);
        }

        // The counter is bumped for every event, allowed or not; `fetch_add`
        // returns the value before the increment.
        let seen_before = entry.event_count.fetch_add(1, Ordering::Relaxed);
        let allow = seen_before < state.max_events_per_second_per_callsite;
        if !allow {
            entry.dropped_this_interval.fetch_add(1, Ordering::Relaxed);
            entry.total_dropped.fetch_add(1, Ordering::Relaxed);
        }

        // Release the map guard before the summary pass walks the whole map.
        // NOTE(review): presumably this avoids re-entrant shard locking —
        // confirm against dashmap's locking rules.
        drop(entry);
        self.maybe_log_summary(now_nanos);
        allow
    }

    /// Prints one stderr line per callsite that dropped events since the last
    /// summary pass, provided at least `summary_interval` has elapsed.
    ///
    /// Lines are ordered by the number of events dropped in the interval,
    /// largest first. Each callsite's interval counter is reset as it is read.
    fn maybe_log_summary(&self, now_nanos: u64) {
        let state = &*self.inner;
        let last = state.last_summary_nanos.load(Ordering::Relaxed);
        // Saturate rather than truncate: a huge caller-supplied interval must
        // mean "practically never", not an arbitrary shorter period.
        let interval_nanos = u64::try_from(state.summary_interval.as_nanos()).unwrap_or(u64::MAX);
        if now_nanos < last.saturating_add(interval_nanos) {
            return;
        }

        // Claim this summary slot; a caller that loses the race leaves the
        // reporting to the winner.
        if state
            .last_summary_nanos
            .compare_exchange(last, now_nanos, Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        // Names are `&'static str`, so rows can borrow them without allocating.
        let mut offenders: Vec<(&'static str, u64, u64)> = Vec::new();
        for entry in state.callsites.iter() {
            let dropped = entry.dropped_this_interval.swap(0, Ordering::Relaxed);
            if dropped == 0 {
                continue;
            }
            let total = entry.total_dropped.load(Ordering::Relaxed);
            offenders.push((entry.name, dropped, total));
        }

        // Stable sort, worst offender first.
        offenders.sort_by(|a, b| b.1.cmp(&a.1));
        for (site, dropped, total) in offenders {
            eprintln!(
                "[RATE LIMIT per-callsite] {site}: dropped {dropped} in last {}s (cumulative: {total}). \
                 Per-callsite cap: {}/sec.",
                state.summary_interval.as_secs(),
                state.max_events_per_second_per_callsite
            );
        }
    }
}
impl Default for PerCallsiteRateLimiter {
fn default() -> Self {
Self::with_defaults()
}
}
// Unit tests for both limiters. The global-limiter tests come first, followed
// by the per-callsite tests and the fixtures they share.
#[cfg(test)]
mod tests {
use super::*;
// Exactly `cap` events in a fresh window must all be allowed.
#[test]
fn test_rate_limiter_allows_under_limit() {
let limiter = RateLimiter::new(100);
for _ in 0..100 {
assert!(limiter.should_allow());
}
}
// Events beyond the cap are rejected and counted in both drop counters.
#[test]
fn test_rate_limiter_drops_over_limit() {
let limiter = RateLimiter::new(10);
for _ in 0..10 {
assert!(limiter.should_allow());
}
for _ in 0..10 {
assert!(!limiter.should_allow());
}
assert_eq!(limiter.inner.dropped_count.load(Ordering::Relaxed), 10);
assert_eq!(limiter.inner.total_dropped.load(Ordering::Relaxed), 10);
}
// Simulates a window rollover by zeroing `event_count` directly instead of
// waiting a real second, then checks the full budget is available again.
#[test]
fn test_rate_limiter_resets_after_window() {
let limiter = RateLimiter::new(5);
for _ in 0..5 {
assert!(limiter.should_allow());
}
assert!(!limiter.should_allow());
limiter.inner.event_count.store(0, Ordering::Relaxed);
assert!(limiter.should_allow());
for _ in 0..4 {
assert!(limiter.should_allow());
}
assert!(!limiter.should_allow());
}
// `with_defaults` must use the published default constant.
#[test]
fn test_default_limit() {
let limiter = RateLimiter::with_defaults();
assert_eq!(
limiter.inner.max_events_per_second,
DEFAULT_MAX_EVENTS_PER_SECOND
);
}
// A clone shares counters with the original: exhausting the budget through
// one handle makes the other reject.
#[test]
fn test_rate_limiter_is_cloneable() {
let limiter1 = RateLimiter::new(10);
let limiter2 = limiter1.clone();
for _ in 0..10 {
assert!(limiter1.should_allow());
}
assert!(!limiter2.should_allow());
}
use tracing_core::Metadata;
use tracing_core::callsite::Callsite;
// Minimal `Callsite` used only to mint distinct `Identifier`s for the
// per-callsite tests.
// NOTE(review): `_tag` presumably exists so each static is a distinct,
// non-zero-sized object — confirm.
struct TestCallsite {
_tag: usize,
}
impl Callsite for TestCallsite {
fn set_interest(&self, _: tracing_core::subscriber::Interest) {}
// The tests drive `should_allow_inner` directly, which takes an identifier
// and a name and never asks the callsite for its metadata.
fn metadata(&self) -> &Metadata<'_> {
unreachable!("limiter never invokes this in the inner fast path")
}
}
static CS_A: TestCallsite = TestCallsite { _tag: 1 };
static CS_B: TestCallsite = TestCallsite { _tag: 2 };
static CS_C: TestCallsite = TestCallsite { _tag: 3 };
static CS_D: TestCallsite = TestCallsite { _tag: 4 };
// Wraps a test callsite in the identifier type the limiter keys on.
fn id(cs: &'static TestCallsite) -> Identifier {
Identifier(cs)
}
// Sends `n` events for `cs` through the limiter and returns how many were
// allowed.
fn drive(
limiter: &PerCallsiteRateLimiter,
cs: &'static TestCallsite,
n: usize,
name: &'static str,
) -> usize {
(0..n)
.filter(|_| limiter.should_allow_inner(id(cs), name))
.count()
}
// Exactly `cap` events for one callsite must all be allowed.
#[test]
fn per_callsite_allows_under_limit() {
let limiter = PerCallsiteRateLimiter::new(50);
let allowed = drive(&limiter, &CS_A, 50, "site_under_limit");
assert_eq!(allowed, 50, "all 50 events under the cap must pass");
}
// Only the first `cap` events of a burst pass.
#[test]
fn per_callsite_drops_over_limit() {
let limiter = PerCallsiteRateLimiter::new(5);
let allowed = drive(&limiter, &CS_B, 20, "site_over_limit");
assert_eq!(allowed, 5, "only 5 of 20 events should pass the cap");
}
// Each callsite has its own budget; exhausting one does not affect another.
#[test]
fn per_callsite_isolates_sites_from_each_other() {
let limiter = PerCallsiteRateLimiter::new(3);
let allowed_a = drive(&limiter, &CS_C, 10, "site_c");
let allowed_b = drive(&limiter, &CS_D, 10, "site_d");
assert_eq!(allowed_a, 3, "site C should be capped at 3");
assert_eq!(
allowed_b, 3,
"site D should get its own 3 — not be starved by site C"
);
}
// Simulates a window rollover by rewinding every entry's window start and
// zeroing its counter, then checks the budget is restored.
#[test]
fn per_callsite_window_resets() {
let limiter = PerCallsiteRateLimiter::new(2);
static CS_W: TestCallsite = TestCallsite { _tag: 5 };
let first = drive(&limiter, &CS_W, 5, "window_reset");
assert_eq!(first, 2);
for entry in limiter.inner.callsites.iter() {
entry.window_start_nanos.store(0, Ordering::Relaxed);
entry.event_count.store(0, Ordering::Relaxed);
}
let second = drive(&limiter, &CS_W, 5, "window_reset");
assert_eq!(second, 2, "fresh window must restore full budget");
}
// Smoke test: a 1 ns summary interval makes the summary pass eligible on
// essentially every call; the test only checks that this completes.
#[test]
fn per_callsite_summary_path_smoke_single_threaded() {
static CS_S: TestCallsite = TestCallsite { _tag: 6 };
let limiter = PerCallsiteRateLimiter::with_summary_interval(2, Duration::from_nanos(1));
let _ = drive(&limiter, &CS_S, 10, "summary_smoke");
}
// Integration check (only built with the `trace` feature): when wired through
// `DynFilterFn`, the limiter is consulted for every event, so only `cap`
// events reach the downstream layer.
#[cfg(feature = "trace")]
#[test]
fn dynfilter_wiring_actually_filters_per_event() {
use std::sync::atomic::AtomicUsize;
use tracing_core::Event;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::{Context, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;
// Layer that simply counts the events it receives.
struct CountingLayer {
count: Arc<AtomicUsize>,
}
impl<S> Layer<S> for CountingLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, _event: &Event<'_>, _ctx: Context<'_, S>) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
let count = Arc::new(AtomicUsize::new(0));
let count_layer = CountingLayer {
count: count.clone(),
};
let limiter = PerCallsiteRateLimiter::new(3);
let pc = limiter.clone();
let filter =
tracing_subscriber::filter::DynFilterFn::new(move |meta, _cx| pc.should_allow(meta));
let subscriber = tracing_subscriber::registry()
.with(filter)
.with(count_layer);
tracing::subscriber::with_default(subscriber, || {
for _ in 0..20 {
tracing::warn!(target: "dynfilter_test", "spam");
}
});
let observed = count.load(Ordering::SeqCst);
assert_eq!(
observed, 3,
"DynFilterFn must call the limiter per-event; saw {observed} \
of 20 events. If this is 20, the limiter is wired through \
`filter_fn` (which caches Interest per callsite and only \
invokes the closure once per callsite) — switch to \
`DynFilterFn`. See codex review on PR #4273."
);
}
// `Default` must use the published per-callsite default constant.
#[test]
fn per_callsite_default_uses_canonical_constant() {
let limiter = PerCallsiteRateLimiter::default();
assert_eq!(
limiter.inner.max_events_per_second_per_callsite,
DEFAULT_MAX_EVENTS_PER_CALLSITE_PER_SECOND
);
}
// Fills the table to `MAX_TRACKED_CALLSITES` with leaked callsites, then
// checks that further new callsites are allowed, are not inserted, and set
// the warning latch.
#[test]
fn per_callsite_cap_hit_path_degrades_and_warns_once() {
let limiter = PerCallsiteRateLimiter::new(3);
for i in 0..MAX_TRACKED_CALLSITES {
// Leaked so each filler callsite has the `'static` lifetime `Identifier` needs.
let cs: &'static TestCallsite = Box::leak(Box::new(TestCallsite { _tag: 1_000 + i }));
limiter.should_allow_inner(Identifier(cs), "filler");
}
assert_eq!(
limiter.inner.callsites.len(),
MAX_TRACKED_CALLSITES,
"table should fill exactly to the cap"
);
assert!(
!limiter.inner.cap_hit_warned.load(Ordering::SeqCst),
"cap-hit latch should not fire while inserting still succeeds"
);
let overflow_a: &'static TestCallsite =
Box::leak(Box::new(TestCallsite { _tag: 9_000_001 }));
let allowed = limiter.should_allow_inner(Identifier(overflow_a), "overflow_a");
assert!(allowed, "cap-hit path must degrade to allow");
assert!(
limiter.inner.cap_hit_warned.load(Ordering::SeqCst),
"cap-hit must flip the warned latch"
);
assert_eq!(
limiter.inner.callsites.len(),
MAX_TRACKED_CALLSITES,
"cap-hit must NOT grow the table past the cap"
);
let overflow_b: &'static TestCallsite =
Box::leak(Box::new(TestCallsite { _tag: 9_000_002 }));
let allowed_again = limiter.should_allow_inner(Identifier(overflow_b), "overflow_b");
assert!(allowed_again);
assert!(
limiter.inner.cap_hit_warned.load(Ordering::SeqCst),
"latch must remain set"
);
assert_eq!(limiter.inner.callsites.len(), MAX_TRACKED_CALLSITES);
}
// Stress test: several writer threads plus one extra thread hammer the same
// callsite with a 1 ns summary interval, so summary passes interleave with
// counter updates. The test passes if all threads finish and the allowed
// count stays within loose bounds.
#[test]
fn per_callsite_concurrent_writers_and_summary_no_deadlock() {
use std::sync::atomic::AtomicUsize;
use std::thread;
static CS_CONC: TestCallsite = TestCallsite { _tag: 12345 };
const CAP: u64 = 50;
const THREADS: usize = 8;
const EVENTS_PER_THREAD: usize = 1000;
let limiter = PerCallsiteRateLimiter::with_summary_interval(CAP, Duration::from_nanos(1));
let limiter_arc = Arc::new(limiter);
let allowed_total = Arc::new(AtomicUsize::new(0));
let stop = Arc::new(AtomicBool::new(false));
let writers: Vec<_> = (0..THREADS)
.map(|_| {
let l = limiter_arc.clone();
let a = allowed_total.clone();
thread::spawn(move || {
for _ in 0..EVENTS_PER_THREAD {
if l.should_allow_inner(Identifier(&CS_CONC), "concurrent") {
a.fetch_add(1, Ordering::SeqCst);
}
}
})
})
.collect();
// Extra thread that keeps calling the limiter until told to stop; its
// allowed events are not added to `allowed_total`.
let summary_thread = {
let l = limiter_arc.clone();
let s = stop.clone();
thread::spawn(move || {
while !s.load(Ordering::SeqCst) {
let _ = l.should_allow_inner(Identifier(&CS_CONC), "concurrent");
}
})
};
for w in writers {
w.join().expect("writer thread panicked");
}
stop.store(true, Ordering::SeqCst);
summary_thread
.join()
.expect("summary thread panicked — likely deadlock with writers");
let allowed = allowed_total.load(Ordering::SeqCst);
let total_events = THREADS * EVENTS_PER_THREAD;
// The bounds are deliberately loose: the run may span more than one window
// and the extra thread consumes part of the budget.
assert!(
(allowed as u64) <= CAP * 10,
"allowed={allowed} exceeded cap*10={} — counter atomicity broken",
CAP * 10
);
assert!(
(allowed as u64) >= CAP / 2,
"allowed={allowed} suspiciously low — fewer than CAP/2={}",
CAP / 2
);
assert!(
allowed < total_events,
"every event passed (allowed={allowed} = total={total_events}) — \
cap not enforced"
);
}
// A clone shares the per-callsite table with the original.
#[test]
fn per_callsite_is_cloneable_and_shares_state() {
static CS_X: TestCallsite = TestCallsite { _tag: 7 };
let limiter1 = PerCallsiteRateLimiter::new(3);
let limiter2 = limiter1.clone();
let a = drive(&limiter1, &CS_X, 3, "shared_state");
let b = drive(&limiter2, &CS_X, 3, "shared_state");
assert_eq!(a, 3);
assert_eq!(b, 0, "limiter2 must observe limiter1's exhausted state");
}
}