use std::collections::HashSet;
use compact_str::CompactString;
use super::attention::UrgencyBasedPolicy;
use super::queue::SignalQueue;
use crate::types::policy::{AttentionPolicy, SignalDisposition};
use crate::types::signal::RuntimeSignal;
/// Front door for incoming runtime signals.
///
/// Combines a set of already-seen dedupe keys, a bounded signal queue and an
/// attention policy that decides what should happen to each signal.
pub struct SignalRouter {
/// Dedupe keys recorded by `ingest`; a signal whose key is already present is ignored.
seen: HashSet<CompactString>,
/// Holds signals whose disposition was `Queue` until they are popped.
queue: SignalQueue,
/// Policy consulted by `ingest` to choose a disposition for a signal.
attention: Box<dyn AttentionPolicy>,
}
impl SignalRouter {
    /// Creates a router that uses the default [`UrgencyBasedPolicy`].
    ///
    /// `max_queue_size` is passed straight to [`SignalQueue::new`].
    pub fn new(max_queue_size: usize) -> Self {
        Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
    }

    /// Creates a router that consults the supplied attention `policy`.
    ///
    /// `max_queue_size` is passed straight to [`SignalQueue::new`]; the set of
    /// seen dedupe keys starts out empty.
    pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
        Self {
            seen: HashSet::new(),
            queue: SignalQueue::new(max_queue_size),
            attention: policy,
        }
    }

    /// Routes one signal and reports what happened to it.
    ///
    /// 1. If the signal carries a dedupe key that was already recorded, it is
    ///    discarded and [`SignalDisposition::Ignore`] is returned without
    ///    consulting the policy.
    /// 2. Otherwise the attention policy is evaluated with `is_running`.
    /// 3. A `Queue` disposition pushes the signal onto the queue; when the
    ///    queue refuses it, [`SignalDisposition::Dropped`] is returned.
    /// 4. Any other disposition is returned as-is and the signal is not queued.
    ///
    /// The dedupe key is recorded only when the signal was *not* dropped, so a
    /// sender that retries after `Dropped` is not silently ignored.
    pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
        if let Some(key) = &signal.dedupe_key {
            if self.seen.contains(key) {
                return SignalDisposition::Ignore;
            }
        }

        let disposition = self.attention.evaluate(&signal, is_running);

        // `signal` is moved into the queue below, so keep a copy of the key
        // to record once we know the signal was not dropped.
        let dedupe_key = signal.dedupe_key.clone();

        if disposition == SignalDisposition::Queue && !self.queue.push(signal) {
            // Deliberately leave the key unrecorded: the signal never made it
            // into the queue, so a later retry must be allowed through.
            return SignalDisposition::Dropped;
        }

        if let Some(key) = dedupe_key {
            self.seen.insert(key);
        }
        disposition
    }

    /// Pops the next queued signal, or `None` when the queue yields nothing.
    pub fn next(&mut self) -> Option<RuntimeSignal> {
        self.queue.pop()
    }

    /// Pops the next queued signal for `recipient` by delegating to
    /// [`SignalQueue::pop_for`].
    ///
    /// The filtering rules live in the queue. The tests in this file exercise
    /// `Some(id)` returning signals addressed to `id` or to nobody, and `None`
    /// returning a signal regardless of its recipient.
    pub fn next_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
        self.queue.pop_for(recipient)
    }

    /// Returns the number of signals currently held in the queue.
    pub fn depth(&self) -> usize {
        self.queue.len()
    }

    /// Forgets every recorded dedupe key so previously seen signals can be
    /// ingested again.
    pub fn clear_dedup(&mut self) {
        self.seen.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    /// Builds a normal-urgency cron event with the given summary.
    fn cron_event(summary: &'static str) -> RuntimeSignal {
        RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Normal, summary)
    }

    /// Builds a normal-urgency gateway event with the given summary.
    fn gateway_event(summary: &'static str) -> RuntimeSignal {
        RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, summary)
    }

    /// A second signal with the same dedupe key is ignored.
    #[test]
    fn deduplicates_signals() {
        let mut router = SignalRouter::new(100);
        let signal = cron_event("tick").with_dedupe("cron-tick-1");

        let first_outcome = router.ingest(signal.clone(), false);
        assert_ne!(first_outcome, SignalDisposition::Ignore);

        let repeat_outcome = router.ingest(signal, false);
        assert_eq!(repeat_outcome, SignalDisposition::Ignore);
    }

    /// A normal-urgency signal lands in the queue and can be popped.
    #[test]
    fn normal_signal_queued() {
        let mut router = SignalRouter::new(100);

        let outcome = router.ingest(cron_event("job"), false);

        assert_eq!(outcome, SignalDisposition::Queue);
        assert_eq!(router.depth(), 1);
        assert!(router.next().is_some());
    }

    /// A critical alert while running interrupts instead of queueing.
    #[test]
    fn interrupt_signals_not_queued() {
        let mut router = SignalRouter::new(100);
        let alert = RuntimeSignal::new(
            SignalSource::Gateway,
            SignalType::Alert,
            Urgency::Critical,
            "fire",
        );

        let outcome = router.ingest(alert, true);

        assert_eq!(outcome, SignalDisposition::InterruptNow);
        assert_eq!(router.depth(), 0);
    }

    /// With room for one signal, the second one is reported as dropped.
    #[test]
    fn full_queue_drops_signal() {
        let mut router = SignalRouter::new(1);
        let first = cron_event("first");
        let second = cron_event("second");

        assert_eq!(router.ingest(first, false), SignalDisposition::Queue);
        assert_eq!(router.ingest(second, false), SignalDisposition::Dropped);
    }

    /// `next_for(Some(id))` yields signals addressed to `id` plus broadcasts,
    /// and leaves signals addressed to someone else in the queue.
    #[test]
    fn next_for_drains_recipient_plus_broadcast_only() {
        let mut router = SignalRouter::new(100);
        let to_a = gateway_event("a").with_recipient("sess-a").with_timestamp(1);
        let to_b = gateway_event("b").with_recipient("sess-b").with_timestamp(2);
        let broadcast =
            RuntimeSignal::new(SignalSource::Cron, SignalType::Job, Urgency::Normal, "all")
                .with_timestamp(3);

        router.ingest(to_a, false);
        router.ingest(to_b, false);
        router.ingest(broadcast, false);

        // Order between the addressed signal and the broadcast is not pinned,
        // so compare the sorted summaries.
        let mut delivered: Vec<String> = (0..2)
            .map(|_| {
                let signal = router.next_for(Some("sess-a")).unwrap();
                signal.summary.as_str().to_string()
            })
            .collect();
        delivered.sort();
        assert_eq!(delivered, ["a", "all"]);

        assert!(router.next_for(Some("sess-a")).is_none());

        let remaining = router.next_for(Some("sess-b")).unwrap();
        assert_eq!(remaining.summary.as_str(), "b");
    }

    /// `next_for(None)` returns a signal even when it has a recipient.
    #[test]
    fn next_for_none_drains_anything() {
        let mut router = SignalRouter::new(100);
        let addressed = gateway_event("x").with_recipient("sess-a");
        router.ingest(addressed, false);

        let drained = router.next_for(None).unwrap();
        assert_eq!(drained.summary.as_str(), "x");
    }

    /// After `clear_dedup`, a previously seen key is accepted again.
    #[test]
    fn clear_dedup_allows_reingest() {
        let mut router = SignalRouter::new(100);
        let signal = cron_event("tick").with_dedupe("key-1");

        router.ingest(signal.clone(), false);
        assert_eq!(
            router.ingest(signal.clone(), false),
            SignalDisposition::Ignore
        );

        router.clear_dedup();
        assert_ne!(router.ingest(signal, false), SignalDisposition::Ignore);
    }
}