use std::sync::Mutex;
/// Category of a rate-limited log message.
///
/// Every variant owns an independent rate-limit bucket. The explicit
/// discriminants double as array indices (see `LogKind::index`), so they must
/// stay dense and start at zero, and `LogKind::COUNT` must match the number of
/// variants.
///
/// NOTE(review): what each variant reports is inferred from its name only;
/// confirm against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogKind {
    /// Bucket 0.
    FileExportIo = 0,
    /// Bucket 1.
    AuditIo = 1,
    /// Bucket 2.
    PromServe = 2,
    /// Bucket 3.
    HeartbeatIo = 3,
    /// Bucket 4.
    AuditRingWarn = 4,
    /// Bucket 5.
    AuditRingCritical = 5,
}
impl LogKind {
    /// Number of `LogKind` variants; used as the length of per-kind arrays.
    ///
    /// Computed from the last variant's discriminant, which is valid because
    /// the discriminants are dense and zero-based. If a variant is ever added
    /// after `AuditRingCritical`, this expression must name the new last one.
    pub const COUNT: usize = Self::AuditRingCritical as usize + 1;

    /// Returns this kind's slot in a per-kind array, i.e. its discriminant.
    ///
    /// The result is always `< LogKind::COUNT`.
    #[inline]
    pub fn index(self) -> usize {
        self as usize
    }
}
/// Rate-limit bookkeeping for a single log kind.
///
/// All counters start at zero.
#[derive(Default, Clone, Copy)]
struct BucketState {
    /// Timestamp (nanoseconds) of the most recent emitted message.
    /// Zero doubles as the "nothing emitted yet" marker.
    last_emit_ns: u64,
    /// Messages suppressed since the most recent emission; handed to the
    /// caller and reset to zero when the next message is emitted.
    queued_drops: u64,
    /// Running count of every suppressed message; never reset.
    total_suppressed: u64,
}
/// Rate limiter that tracks emission and suppression state separately for
/// every `LogKind`.
///
/// Callers ask `should_emit` before logging; suppressed messages are counted
/// and the counts can be read back with `snapshot_totals`.
pub struct LogRateLimiter {
/// One bucket per `LogKind`, addressed by `LogKind::index()`.
buckets: [BucketState; LogKind::COUNT],
}
impl LogRateLimiter {
    /// Minimum spacing, in nanoseconds, between two emitted messages of the
    /// same kind (one second).
    const COOLDOWN_NS: u64 = 1_000_000_000;

    /// Creates a limiter in which no kind has emitted or suppressed anything.
    ///
    /// This is a `const fn` so it can initialise a `static`; for the same
    /// reason the buckets are written out field by field instead of going
    /// through the derived `Default`, which cannot be called in a const
    /// context.
    const fn new() -> Self {
        Self {
            buckets: [BucketState {
                last_emit_ns: 0,
                queued_drops: 0,
                total_suppressed: 0,
            }; LogKind::COUNT],
        }
    }

    /// Decides whether a message of `kind` observed at `now_ns` may be logged.
    ///
    /// # Returns
    ///
    /// * `Some(n)` - emit the message. `n` is how many messages of this kind
    ///   were suppressed since the previous emission (0 if none). The bucket's
    ///   window is re-anchored at `now_ns` and its pending drop count is reset.
    /// * `None` - suppress the message. The suppression is added to both the
    ///   pending drop count and the running total.
    ///
    /// # Clock behaviour
    ///
    /// `now_ns` normally comes from `rl_now_ns`, i.e. the wall clock, which may
    /// be stepped backwards. The window is therefore considered expired when
    /// `now_ns` is at least one cooldown away from the last emission in
    /// *either* direction. A purely forward-looking comparison would treat a
    /// backwards step as "zero time elapsed" and silence the kind until the
    /// clock caught up again. Timestamps that are only slightly older than the
    /// last emission (for example from a thread that sampled the clock before
    /// another thread emitted) are still inside the window and are suppressed.
    ///
    /// A stored timestamp of 0 means "never emitted", so a message is always
    /// allowed while the bucket holds 0. As a consequence, a clock that keeps
    /// reporting 0 is never rate limited.
    pub fn should_emit(&mut self, kind: LogKind, now_ns: u64) -> Option<u64> {
        let bucket = &mut self.buckets[kind.index()];

        let never_emitted = bucket.last_emit_ns == 0;
        let outside_window = now_ns.abs_diff(bucket.last_emit_ns) >= Self::COOLDOWN_NS;

        if never_emitted || outside_window {
            let drops = bucket.queued_drops;
            bucket.queued_drops = 0;
            bucket.last_emit_ns = now_ns;
            return Some(drops);
        }

        // Saturating adds: a stuck counter is preferable to a panic or a wrap
        // inside the logging path.
        bucket.queued_drops = bucket.queued_drops.saturating_add(1);
        bucket.total_suppressed = bucket.total_suppressed.saturating_add(1);
        None
    }

    /// Returns the lifetime suppressed-message count of every kind, indexed by
    /// `LogKind::index()`.
    pub fn snapshot_totals(&self) -> [u64; LogKind::COUNT] {
        self.buckets.map(|bucket| bucket.total_suppressed)
    }
}
/// Process-wide limiter instance, guarded by a mutex.
///
/// The `varta_warn_rl!` and `varta_error_rl!` macros in this file lock it for
/// the duration of a single `should_emit` call.
pub static LOG_RATE_LIMITER: Mutex<LogRateLimiter> = Mutex::new(LogRateLimiter::new());
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// The value saturates at `u64::MAX` if the nanosecond count does not fit in a
/// `u64`, and is 0 when the system clock reads earlier than the epoch.
#[inline]
pub fn rl_now_ns() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => {
            let nanos = since_epoch.as_nanos();
            if nanos > u128::from(u64::MAX) {
                u64::MAX
            } else {
                nanos as u64
            }
        }
        // `elapsed` only fails when "now" is before the epoch.
        Err(_) => 0,
    }
}
/// Rate-limited warning: forwards to `varta_warn!` only when the shared
/// limiter allows a message of the given kind.
///
/// * `$kind` - an expression passed to `LogRateLimiter::should_emit`, so it
///   must evaluate to a `LogKind`.
/// * remaining tokens - ordinary format arguments (format string first).
///
/// When the message is suppressed the format arguments are not evaluated.
/// When it is emitted and earlier messages of the same kind were suppressed,
/// the text ` (suppressed N)` is appended.
///
/// The macro refers to this module as `$crate::log_ratelimit`.
#[macro_export]
macro_rules! varta_warn_rl {
($kind:expr, $($arg:tt)*) => {{
// The timestamp is sampled before the lock is taken.
let _now = $crate::log_ratelimit::rl_now_ns();
// The mutex guard is a temporary of this statement, so the lock is released
// before any formatting happens. A poisoned lock is recovered with
// `into_inner` instead of panicking.
let _drops = $crate::log_ratelimit::LOG_RATE_LIMITER
.lock()
.unwrap_or_else(|p| p.into_inner())
.should_emit($kind, _now);
if let Some(_d) = _drops {
// Formats into a `StackFmt::<320>` from `$crate::log`.
// NOTE(review): presumably a fixed-capacity stack buffer; what happens when
// the message does not fit is decided by `StackFmt` - confirm.
let mut _buf = $crate::log::StackFmt::<320>::new();
// Write errors are deliberately ignored.
let _ = ::core::fmt::Write::write_fmt(&mut _buf, ::core::format_args!($($arg)*));
if _d > 0 {
let _ = ::core::fmt::Write::write_fmt(
&mut _buf,
::core::format_args!(" (suppressed {_d})"),
);
}
$crate::varta_warn!("{}", _buf.as_str());
}
}};
}
/// Rate-limited error: forwards to `varta_error!` only when the shared
/// limiter allows a message of the given kind.
///
/// * `$kind` - an expression passed to `LogRateLimiter::should_emit`, so it
///   must evaluate to a `LogKind`.
/// * remaining tokens - ordinary format arguments (format string first).
///
/// When the message is suppressed the format arguments are not evaluated.
/// When it is emitted and earlier messages of the same kind were suppressed,
/// the text ` (suppressed N)` is appended.
///
/// The macro refers to this module as `$crate::log_ratelimit`.
#[macro_export]
macro_rules! varta_error_rl {
($kind:expr, $($arg:tt)*) => {{
// The timestamp is sampled before the lock is taken.
let _now = $crate::log_ratelimit::rl_now_ns();
// The mutex guard is a temporary of this statement, so the lock is released
// before any formatting happens. A poisoned lock is recovered with
// `into_inner` instead of panicking.
let _drops = $crate::log_ratelimit::LOG_RATE_LIMITER
.lock()
.unwrap_or_else(|p| p.into_inner())
.should_emit($kind, _now);
if let Some(_d) = _drops {
// Formats into a `StackFmt::<320>` from `$crate::log`.
// NOTE(review): presumably a fixed-capacity stack buffer; what happens when
// the message does not fit is decided by `StackFmt` - confirm.
let mut _buf = $crate::log::StackFmt::<320>::new();
// Write errors are deliberately ignored.
let _ = ::core::fmt::Write::write_fmt(&mut _buf, ::core::format_args!($($arg)*));
if _d > 0 {
let _ = ::core::fmt::Write::write_fmt(
&mut _buf,
::core::format_args!(" (suppressed {_d})"),
);
}
$crate::varta_error!("{}", _buf.as_str());
}
}};
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a limiter with no history so tests never touch the global one.
    fn fresh_limiter() -> LogRateLimiter {
        LogRateLimiter::new()
    }

    /// A kind that has never emitted is allowed through immediately.
    #[test]
    fn first_message_always_emitted() {
        let mut limiter = fresh_limiter();
        let verdict = limiter.should_emit(LogKind::FileExportIo, 1_000_000_000);
        assert!(verdict.is_some());
    }

    /// A repeat half a second later falls inside the cooldown window.
    #[test]
    fn second_message_within_cooldown_suppressed() {
        let mut limiter = fresh_limiter();
        limiter.should_emit(LogKind::FileExportIo, 1_000_000_000);
        let verdict = limiter.should_emit(LogKind::FileExportIo, 1_500_000_000);
        assert!(verdict.is_none());
    }

    /// Once the cooldown has passed, the message is emitted and reports how
    /// many were dropped in between.
    #[test]
    fn message_allowed_after_cooldown() {
        let mut limiter = fresh_limiter();
        // The first call emits; the next two are suppressed.
        for at_ns in [1_000_000_000, 1_100_000_000, 1_200_000_000] {
            limiter.should_emit(LogKind::FileExportIo, at_ns);
        }
        let verdict = limiter.should_emit(LogKind::FileExportIo, 2_000_000_001);
        assert_eq!(verdict, Some(2));
    }

    /// Every suppression is reflected in the lifetime totals.
    #[test]
    fn total_suppressed_accumulates() {
        let mut limiter = fresh_limiter();
        // The first call emits; the next two are suppressed.
        for at_ns in [1_000_000_000, 1_000_000_100, 1_000_000_200] {
            limiter.should_emit(LogKind::AuditIo, at_ns);
        }
        let totals = limiter.snapshot_totals();
        assert_eq!(totals[LogKind::AuditIo.index()], 2);
    }

    /// Emitting one kind does not consume another kind's allowance.
    #[test]
    fn kinds_are_independent() {
        let mut limiter = fresh_limiter();
        let kinds = [LogKind::FileExportIo, LogKind::AuditIo];
        for kind in kinds {
            assert!(limiter.should_emit(kind, 1_000).is_some());
        }
        for kind in kinds {
            assert!(limiter.should_emit(kind, 2_000).is_none());
        }
    }
}