use std::collections::HashMap;
use astrid_events::{AstridEvent, EventBus};
/// Length of one observation window; the monitor's report timer fires at this interval.
const BUS_ACTIVITY_WINDOW: std::time::Duration = std::time::Duration::from_secs(5);
/// Events-per-second rate at or above which a window is classified as a storm.
const BUS_STORM_RATE_THRESHOLD: f64 = 100.0;
/// Maximum number of topics listed in a storm report.
const BUS_STORM_TOP_TOPICS: usize = 5;
/// Pseudo-topic under which the count returned by the receiver's `drain_lagged()`
/// is recorded, so that it contributes to the window total like any other topic.
const LAGGED_LABEL: &str = "(dropped/lagged)";
/// Aggregate figures for one observation window, as produced by `summarize_window`.
struct WindowSummary {
/// Sum of all per-topic counts in the window.
total: u64,
/// `total` divided by the window length in seconds; `0.0` when that length is not positive.
rate: f64,
/// `true` when `rate` is at or above `BUS_STORM_RATE_THRESHOLD`.
is_storm: bool,
/// Comma-separated `topic=count` list of the busiest topics (at most
/// `BUS_STORM_TOP_TOPICS` entries); left empty unless `is_storm` is set.
top_topics: String,
}
/// Summarizes one observation window of per-topic event counts.
///
/// * `counts` — number of events recorded per topic during the window.
/// * `elapsed_secs` — length of the window in seconds.
///
/// Returns a [`WindowSummary`] holding the total event count, the average
/// rate in events per second (`0.0` when `elapsed_secs` is not positive, which
/// also avoids dividing by zero), whether that rate reaches
/// `BUS_STORM_RATE_THRESHOLD`, and — only for storms — the busiest topics
/// formatted as `topic=count`, joined by `", "` and capped at
/// `BUS_STORM_TOP_TOPICS` entries.
#[expect(
    clippy::cast_precision_loss,
    reason = "event counts in a 5s window stay far below 2^53, where f64 is exact"
)]
fn summarize_window(counts: &HashMap<String, u64>, elapsed_secs: f64) -> WindowSummary {
    // Individual counters are accumulated with `saturating_add`, so a single
    // entry may sit at `u64::MAX`. A plain `sum()` over such a map would
    // overflow (panic in debug builds, wrap to a tiny total in release builds);
    // saturate here as well so an extreme window still reads as a storm.
    let total = counts
        .values()
        .fold(0_u64, |acc, &count| acc.saturating_add(count));

    let rate = if elapsed_secs > 0.0 {
        total as f64 / elapsed_secs
    } else {
        0.0
    };
    let is_storm = rate >= BUS_STORM_RATE_THRESHOLD;

    // Ranking and formatting are only paid for when there is a storm to report.
    let top_topics = if is_storm {
        let mut ranked: Vec<(&str, u64)> = counts
            .iter()
            .map(|(topic, &count)| (topic.as_str(), count))
            .collect();
        // Highest count first; equal counts fall back to topic name so the
        // output does not depend on hash-map iteration order.
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        ranked
            .into_iter()
            .take(BUS_STORM_TOP_TOPICS)
            .map(|(topic, count)| format!("{topic}={count}"))
            .collect::<Vec<_>>()
            .join(", ")
    } else {
        String::new()
    };

    WindowSummary {
        total,
        rate,
        is_storm,
        top_topics,
    }
}
/// Returns the key under which `event` is counted.
///
/// IPC events are keyed by the topic carried in their message; every other
/// variant is keyed by whatever `event_type()` reports for it.
fn event_topic(event: &AstridEvent) -> &str {
    if let AstridEvent::Ipc { message, .. } = event {
        message.topic.as_str()
    } else {
        event.event_type()
    }
}
/// Adds `n` to the counter stored for `topic`, creating it at `n` if absent.
///
/// An existing counter is raised with saturating arithmetic, so it stops at
/// `u64::MAX` rather than overflowing. The lookup is done with the borrowed
/// key first, so an owned `String` is only allocated when a topic is new.
fn bump(counts: &mut HashMap<String, u64>, topic: &str, n: u64) {
    match counts.get_mut(topic) {
        Some(slot) => *slot = slot.saturating_add(n),
        None => {
            counts.insert(topic.to_owned(), n);
        }
    }
}
/// Spawns a background Tokio task that watches event-bus traffic.
///
/// The task counts received events per topic and, once per
/// `BUS_ACTIVITY_WINDOW`, summarizes the window: a storm is logged at `warn`
/// level together with its hottest topics, any other non-empty window is
/// logged at `debug` level, and an empty window is not logged at all.
///
/// The subscription is created synchronously, before the task is spawned.
/// The returned handle refers to the spawned task, which ends when the
/// receiver's `recv()` yields `None`; counts collected in the window that is
/// open at that moment are dropped without a report.
///
/// NOTE(review): `tokio::spawn` needs a running Tokio runtime — confirm all
/// callers invoke this from within one.
pub(crate) fn spawn_bus_activity_monitor(event_bus: &EventBus) -> tokio::task::JoinHandle<()> {
// Subscribe before spawning so the receiver exists by the time this function returns.
let mut receiver = event_bus.subscribe_as("bus_monitor");
tokio::spawn(async move {
// Per-topic counters for the window that is currently open.
let mut counts: HashMap<String, u64> = HashMap::new();
let mut window_start = tokio::time::Instant::now();
let mut tick = tokio::time::interval(BUS_ACTIVITY_WINDOW);
// After a late tick, space the following ticks a full period from it rather
// than firing a burst to catch up.
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
// A Tokio interval completes its first tick immediately; consume it here so
// the first report comes a full window after start-up.
tick.tick().await;
loop {
tokio::select! {
event = receiver.recv() => {
// `None` ends the monitor. Presumably this means the bus has shut
// down — verify against the receiver's contract.
let Some(first) = event else { break };
bump(&mut counts, event_topic(&first), 1);
// Count everything `try_recv()` hands back right now before going
// back to `select!`, instead of taking one event per loop turn.
while let Some(ev) = receiver.try_recv() {
bump(&mut counts, event_topic(&ev), 1);
}
// Presumably the number of events this subscriber missed since the
// last call (TODO confirm); recorded under `LAGGED_LABEL` so it
// counts toward the window's rate.
let lagged = receiver.drain_lagged();
if lagged > 0 {
bump(&mut counts, LAGGED_LABEL, lagged);
}
},
_ = tick.tick() => {
// Record one tick of the `bus_monitor` loop on the background-ticks counter.
metrics::counter!(
crate::METRIC_BACKGROUND_TICKS_TOTAL,
"loop" => "bus_monitor",
)
.increment(1);
// Use the measured window length rather than the nominal period, since
// the timer may have fired late.
let elapsed = window_start.elapsed().as_secs_f64();
let summary = summarize_window(&counts, elapsed);
if summary.is_storm {
tracing::warn!(
events_per_sec = summary.rate,
window_total = summary.total,
top_topics = %summary.top_topics,
"Event bus storm detected — sustained high publish rate \
(likely a feedback loop); hottest topics listed by volume"
);
} else if summary.total > 0 {
tracing::debug!(
events_per_sec = summary.rate,
window_total = summary.total,
"Event bus activity"
);
}
// Open a fresh window: reset the counters and restart the clock.
counts.clear();
window_start = tokio::time::Instant::now();
},
}
}
})
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned count map from borrowed `(topic, count)` pairs.
    fn counts(pairs: &[(&str, u64)]) -> HashMap<String, u64> {
        let mut map = HashMap::with_capacity(pairs.len());
        for &(topic, n) in pairs {
            map.insert(topic.to_owned(), n);
        }
        map
    }

    /// A window with no events has zero total, zero rate and no topic list.
    #[test]
    fn empty_window_is_not_a_storm() {
        let no_events = HashMap::new();
        let WindowSummary {
            total,
            rate,
            is_storm,
            top_topics,
        } = summarize_window(&no_events, 5.0);
        assert_eq!(total, 0);
        assert!(rate.abs() < f64::EPSILON);
        assert!(!is_storm);
        assert!(top_topics.is_empty());
    }

    /// Below the threshold the topic list is left empty.
    #[test]
    fn low_rate_is_not_a_storm_and_skips_topic_formatting() {
        let window = counts(&[("astrid.v1.watchdog.tick", 3)]);
        let summary = summarize_window(&window, 5.0);
        assert!(!summary.is_storm);
        assert!(summary.top_topics.is_empty());
    }

    /// 1000 events in 5 s is 200/s, which is a storm.
    #[test]
    fn sustained_high_rate_is_a_storm() {
        let window = counts(&[("react.v1.step", 1000)]);
        let WindowSummary {
            total,
            rate,
            is_storm,
            top_topics,
        } = summarize_window(&window, 5.0);
        assert!(is_storm);
        assert_eq!(total, 1000);
        assert!((rate - 200.0).abs() < f64::EPSILON);
        assert_eq!(top_topics, "react.v1.step=1000");
    }

    /// Topics are listed from highest to lowest count.
    #[test]
    fn storm_names_hottest_topics_in_deterministic_order() {
        let window = counts(&[
            ("a.low", 10),
            ("b.high", 900),
            ("c.mid", 100),
            ("d.zero", 1),
        ]);
        let summary = summarize_window(&window, 5.0);
        assert!(summary.is_storm);
        let expected = "b.high=900, c.mid=100, a.low=10, d.zero=1";
        assert_eq!(summary.top_topics, expected);
    }

    /// Equal counts are ordered by topic name.
    #[test]
    fn ties_break_on_topic_name_for_determinism() {
        let window = counts(&[("zzz", 600), ("aaa", 600)]);
        let summary = summarize_window(&window, 5.0);
        assert!(summary.is_storm);
        assert_eq!(summary.top_topics, "aaa=600, zzz=600");
    }

    /// No more than `BUS_STORM_TOP_TOPICS` entries are listed.
    #[test]
    fn top_topics_is_capped() {
        let mut map: HashMap<String, u64> = HashMap::new();
        for i in 0_u64..10 {
            map.insert(format!("topic.{i:02}"), 1000 - i);
        }
        let summary = summarize_window(&map, 5.0);
        assert!(summary.is_storm);
        let listed = summary.top_topics.split(", ").count();
        assert_eq!(listed, BUS_STORM_TOP_TOPICS);
    }

    /// A zero-length window yields a zero rate rather than a division by zero.
    #[test]
    fn zero_elapsed_does_not_divide_by_zero() {
        let window = counts(&[("x", 5)]);
        let summary = summarize_window(&window, 0.0);
        assert!(summary.rate.abs() < f64::EPSILON);
        assert!(!summary.is_storm);
    }

    /// Counts recorded under `LAGGED_LABEL` contribute to the rate like any topic.
    #[test]
    fn dropped_events_count_toward_the_rate() {
        let window = counts(&[(LAGGED_LABEL, 800)]);
        let summary = summarize_window(&window, 5.0);
        assert!(summary.is_storm);
        let expected = format!("{LAGGED_LABEL}=800");
        assert_eq!(summary.top_topics, expected);
    }
}