use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};
use tracing::debug;
/// Maximum age of a buffered event; pending entries older than this are
/// eligible for eviction from the router's buffer.
const BUFFER_TTL: Duration = Duration::from_secs(5);
/// An event handed to the consumer channel by [`EventRouter`].
#[derive(Debug, Clone)]
pub struct NotificationPayload {
/// Identifier of the subscription the event was routed for.
pub subscription_id: String,
/// Event body, passed through unmodified (XML by convention; the router
/// itself never parses it).
pub event_xml: String,
}
/// Mutable routing state kept behind the lock inside [`EventRouter`].
struct RouterState {
/// Subscription IDs that have been registered and not yet unregistered.
subscriptions: HashSet<String>,
/// Events received for IDs that were not registered at the time, stored as
/// `(subscription_id, event_xml, buffered_at)`.
pending: Vec<(String, String, Instant)>,
}
/// Forwards events for registered subscriptions to a channel and buffers
/// events whose subscription has not been registered yet.
///
/// Clones share the same underlying state through the `Arc`.
#[derive(Clone)]
pub struct EventRouter {
/// Shared registration set and pending-event buffer.
state: Arc<RwLock<RouterState>>,
/// Channel on which routed and replayed events are sent.
event_sender: mpsc::UnboundedSender<NotificationPayload>,
}
impl EventRouter {
    /// Creates a router with no registered subscriptions and an empty buffer.
    ///
    /// Every routed or replayed event is sent on `event_sender`.
    pub fn new(event_sender: mpsc::UnboundedSender<NotificationPayload>) -> Self {
        Self {
            state: Arc::new(RwLock::new(RouterState {
                subscriptions: HashSet::new(),
                pending: Vec::new(),
            })),
            event_sender,
        }
    }

    /// Registers `subscription_id` and replays every event buffered for it.
    ///
    /// Buffered events for this ID are sent in the order they were received
    /// and removed from the buffer. While scanning, entries belonging to other
    /// IDs that are older than [`BUFFER_TTL`] are dropped; the remaining
    /// entries keep their relative order.
    ///
    /// Send failures (receiver dropped) are ignored: delivery is best-effort.
    pub async fn register(&self, subscription_id: String) {
        let mut state = self.state.write().await;
        let now = Instant::now();

        // Walk the buffer front-to-back in a single pass. An in-place
        // `swap_remove` would move the last element into the vacated slot and
        // replay events for the same subscription out of arrival order.
        let buffered = std::mem::take(&mut state.pending);
        let mut retained = Vec::new();
        for (sid, xml, buffered_at) in buffered {
            if sid == subscription_id {
                debug!(sid = %subscription_id, "Replayed buffered event");
                let payload = NotificationPayload {
                    subscription_id: sid,
                    event_xml: xml,
                };
                let _ = self.event_sender.send(payload);
            } else if now.duration_since(buffered_at) <= BUFFER_TTL {
                retained.push((sid, xml, buffered_at));
            }
        }
        state.pending = retained;

        // The write lock is held for the whole call, so inserting after the
        // replay is indistinguishable from inserting first and avoids a clone.
        state.subscriptions.insert(subscription_id);
    }

    /// Removes `subscription_id` from the registered set and discards any
    /// events still buffered for it.
    pub async fn unregister(&self, subscription_id: &str) {
        let mut state = self.state.write().await;
        state.subscriptions.remove(subscription_id);
        state.pending.retain(|(sid, _, _)| sid != subscription_id);
    }

    /// Routes an event to the channel, or buffers it if its subscription is
    /// not registered yet.
    ///
    /// When buffering, entries older than [`BUFFER_TTL`] are evicted first so
    /// that events for subscriptions that never register cannot accumulate
    /// without bound.
    ///
    /// Send failures (receiver dropped) are ignored: delivery is best-effort.
    pub async fn route_event(&self, subscription_id: String, event_xml: String) {
        let mut state = self.state.write().await;

        if state.subscriptions.contains(&subscription_id) {
            let payload = NotificationPayload {
                subscription_id,
                event_xml,
            };
            let _ = self.event_sender.send(payload);
            return;
        }

        let now = Instant::now();
        // Expire stale entries here as well as in `register`; otherwise the
        // buffer only shrinks when some subscription happens to register.
        state
            .pending
            .retain(|(_, _, buffered_at)| now.duration_since(*buffered_at) <= BUFFER_TTL);

        debug!(sid = %subscription_id, "Buffered event for pending SID");
        state.pending.push((subscription_id, event_xml, now));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh router together with the receiving end of its channel.
    fn router_with_channel() -> (EventRouter, mpsc::UnboundedReceiver<NotificationPayload>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (EventRouter::new(sender), receiver)
    }

    /// An event for an already-registered subscription is forwarded as-is.
    #[tokio::test]
    async fn test_event_router_register_and_route() {
        let (router, mut events) = router_with_channel();
        let sid = String::from("test-sub-123");
        let body = String::from("<event>test</event>");

        router.register(sid.clone()).await;
        router.route_event(sid.clone(), body.clone()).await;

        let received = events.recv().await.unwrap();
        assert_eq!(received.subscription_id, sid);
        assert_eq!(received.event_xml, body);
    }

    /// After unregistering, events for that subscription are not forwarded.
    #[tokio::test]
    async fn test_event_router_unregister() {
        let (router, mut events) = router_with_channel();
        let sid = String::from("test-sub-123");

        router.register(sid.clone()).await;
        router.unregister(&sid).await;
        router
            .route_event(sid, String::from("<event>test</event>"))
            .await;

        assert!(events.try_recv().is_err());
    }

    /// An event for an unknown subscription is held back, not forwarded.
    #[tokio::test]
    async fn test_event_router_unknown_subscription_buffers() {
        let (router, mut events) = router_with_channel();

        let sid = String::from("unknown-sub");
        let body = String::from("<event>test</event>");
        router.route_event(sid, body).await;

        assert!(events.try_recv().is_err());
    }

    /// An event that arrives before registration is replayed on register.
    #[tokio::test]
    async fn test_event_buffered_and_replayed_on_late_register() {
        let (router, mut events) = router_with_channel();
        let sid = String::from("uuid:late-register");
        let body = String::from(
            "<e:propertyset><CurrentPlayMode>NORMAL</CurrentPlayMode></e:propertyset>",
        );

        router.route_event(sid.clone(), body.clone()).await;
        router.register(sid.clone()).await;

        let replayed = events.try_recv().expect("expected replayed event");
        assert_eq!(replayed.subscription_id, sid);
        assert_eq!(replayed.event_xml, body);
    }

    /// Registering any subscription evicts expired entries of other IDs.
    #[tokio::test]
    async fn test_stale_buffer_entries_cleaned_on_register() {
        let (router, mut events) = router_with_channel();

        // Inject an entry that is already older than the TTL; the guard is
        // scoped so the lock is released before `register` runs.
        {
            let stale_entry = (
                String::from("uuid:stale-sid"),
                String::from("<event>stale</event>"),
                Instant::now() - Duration::from_secs(10),
            );
            let mut guard = router.state.write().await;
            guard.pending.push(stale_entry);
        }

        router.register(String::from("uuid:fresh-sid")).await;

        assert!(events.try_recv().is_err());
        let guard = router.state.read().await;
        assert!(guard.pending.is_empty(), "stale entry should be cleaned up");
    }

    /// Unregistering discards buffered events so they are never replayed.
    #[tokio::test]
    async fn test_unregister_drains_buffer() {
        let (router, mut events) = router_with_channel();
        let sid = String::from("uuid:drain-test");

        router
            .route_event(sid.clone(), String::from("<event>buffered</event>"))
            .await;
        router.unregister(&sid).await;
        router.register(sid.clone()).await;

        assert!(events.try_recv().is_err());
    }

    /// Several buffered events for one subscription are all replayed in order.
    #[tokio::test]
    async fn test_multiple_buffered_events_replayed() {
        let (router, mut events) = router_with_channel();
        let sid = String::from("uuid:multi");

        for body in ["<event>first</event>", "<event>second</event>"] {
            router.route_event(sid.clone(), body.to_string()).await;
        }
        router.register(sid.clone()).await;

        let first = events.try_recv().expect("expected first replayed event");
        assert!(first.event_xml.contains("first"));
        let second = events.try_recv().expect("expected second replayed event");
        assert!(second.event_xml.contains("second"));
        assert!(events.try_recv().is_err());
    }

    /// Registering one subscription replays only that subscription's events.
    #[tokio::test]
    async fn test_buffer_isolates_different_sids() {
        let (router, mut events) = router_with_channel();

        let buffered = [
            ("uuid:sid-a", "<event>a</event>"),
            ("uuid:sid-b", "<event>b</event>"),
        ];
        for (sid, body) in buffered {
            router.route_event(sid.to_string(), body.to_string()).await;
        }

        router.register(String::from("uuid:sid-a")).await;
        let for_a = events
            .try_recv()
            .expect("expected replayed event for sid-a");
        assert_eq!(for_a.subscription_id, "uuid:sid-a");
        assert!(for_a.event_xml.contains("a"));
        assert!(events.try_recv().is_err());

        router.register(String::from("uuid:sid-b")).await;
        let for_b = events
            .try_recv()
            .expect("expected replayed event for sid-b");
        assert_eq!(for_b.subscription_id, "uuid:sid-b");
        assert!(for_b.event_xml.contains("b"));
    }
}