callback_server/
router.rs1use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{mpsc, RwLock};
13use tracing::debug;
14
15const BUFFER_TTL: Duration = Duration::from_secs(5);
19
20#[derive(Debug, Clone)]
26pub struct NotificationPayload {
27 pub subscription_id: String,
29 pub event_xml: String,
31}
32
33struct RouterState {
35 subscriptions: HashSet<String>,
36 pending: Vec<(String, String, Instant)>,
40}
41
42#[derive(Clone)]
52pub struct EventRouter {
53 state: Arc<RwLock<RouterState>>,
54 event_sender: mpsc::UnboundedSender<NotificationPayload>,
56}
57
58impl EventRouter {
59 pub fn new(event_sender: mpsc::UnboundedSender<NotificationPayload>) -> Self {
75 Self {
76 state: Arc::new(RwLock::new(RouterState {
77 subscriptions: HashSet::new(),
78 pending: Vec::new(),
79 })),
80 event_sender,
81 }
82 }
83
84 pub async fn register(&self, subscription_id: String) {
90 let mut state = self.state.write().await;
91 state.subscriptions.insert(subscription_id.clone());
92
93 let now = Instant::now();
95 let mut i = 0;
96 while i < state.pending.len() {
97 let (ref sid, _, buffered_at) = state.pending[i];
98 if sid == &subscription_id {
99 let (_, xml, _) = state.pending.swap_remove(i);
100 debug!(sid = %subscription_id, "Replayed buffered event");
101 let payload = NotificationPayload {
102 subscription_id: subscription_id.clone(),
103 event_xml: xml,
104 };
105 let _ = self.event_sender.send(payload);
106 } else if now.duration_since(buffered_at) > BUFFER_TTL {
108 state.pending.swap_remove(i);
109 } else {
111 i += 1;
112 }
113 }
114 }
115
116 pub async fn unregister(&self, subscription_id: &str) {
121 let mut state = self.state.write().await;
122 state.subscriptions.remove(subscription_id);
123 state.pending.retain(|(sid, _, _)| sid != subscription_id);
124 }
125
126 pub async fn route_event(&self, subscription_id: String, event_xml: String) {
133 let mut state = self.state.write().await;
134 if state.subscriptions.contains(&subscription_id) {
135 let payload = NotificationPayload {
136 subscription_id,
137 event_xml,
138 };
139 let _ = self.event_sender.send(payload);
140 } else {
141 debug!(sid = %subscription_id, "Buffered event for pending SID");
142 state
143 .pending
144 .push((subscription_id, event_xml, Instant::now()));
145 }
146 }
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152
153 #[tokio::test]
154 async fn test_event_router_register_and_route() {
155 let (tx, mut rx) = mpsc::unbounded_channel();
156 let router = EventRouter::new(tx);
157
158 let sub_id = "test-sub-123".to_string();
159
160 router.register(sub_id.clone()).await;
162
163 let event_xml = "<event>test</event>".to_string();
165 router.route_event(sub_id.clone(), event_xml.clone()).await;
166
167 let payload = rx.recv().await.unwrap();
169 assert_eq!(payload.subscription_id, sub_id);
170 assert_eq!(payload.event_xml, event_xml);
171 }
172
173 #[tokio::test]
174 async fn test_event_router_unregister() {
175 let (tx, mut rx) = mpsc::unbounded_channel();
176 let router = EventRouter::new(tx);
177
178 let sub_id = "test-sub-123".to_string();
179
180 router.register(sub_id.clone()).await;
182 router.unregister(&sub_id).await;
183
184 let event_xml = "<event>test</event>".to_string();
186 router.route_event(sub_id, event_xml).await;
187
188 assert!(rx.try_recv().is_err());
190 }
191
192 #[tokio::test]
193 async fn test_event_router_unknown_subscription_buffers() {
194 let (tx, mut rx) = mpsc::unbounded_channel();
195 let router = EventRouter::new(tx);
196
197 router
199 .route_event("unknown-sub".to_string(), "<event>test</event>".to_string())
200 .await;
201
202 assert!(rx.try_recv().is_err());
204 }
205
206 #[tokio::test]
209 async fn test_event_buffered_and_replayed_on_late_register() {
210 let (tx, mut rx) = mpsc::unbounded_channel();
211 let router = EventRouter::new(tx);
212
213 let sub_id = "uuid:late-register".to_string();
214 let event_xml =
215 "<e:propertyset><CurrentPlayMode>NORMAL</CurrentPlayMode></e:propertyset>".to_string();
216
217 router.route_event(sub_id.clone(), event_xml.clone()).await;
219
220 router.register(sub_id.clone()).await;
222
223 let payload = rx.try_recv().expect("expected replayed event");
225 assert_eq!(payload.subscription_id, sub_id);
226 assert_eq!(payload.event_xml, event_xml);
227 }
228
229 #[tokio::test]
231 async fn test_stale_buffer_entries_cleaned_on_register() {
232 let (tx, mut rx) = mpsc::unbounded_channel();
233 let router = EventRouter::new(tx);
234
235 {
237 let mut state = router.state.write().await;
238 state.pending.push((
239 "uuid:stale-sid".to_string(),
240 "<event>stale</event>".to_string(),
241 Instant::now() - Duration::from_secs(10), ));
243 }
244
245 router.register("uuid:fresh-sid".to_string()).await;
247
248 assert!(rx.try_recv().is_err());
250
251 let state = router.state.read().await;
253 assert!(state.pending.is_empty(), "stale entry should be cleaned up");
254 }
255
256 #[tokio::test]
258 async fn test_unregister_drains_buffer() {
259 let (tx, mut rx) = mpsc::unbounded_channel();
260 let router = EventRouter::new(tx);
261
262 let sub_id = "uuid:drain-test".to_string();
263
264 router
266 .route_event(sub_id.clone(), "<event>buffered</event>".to_string())
267 .await;
268
269 router.unregister(&sub_id).await;
271
272 router.register(sub_id.clone()).await;
274
275 assert!(rx.try_recv().is_err());
277 }
278
279 #[tokio::test]
281 async fn test_multiple_buffered_events_replayed() {
282 let (tx, mut rx) = mpsc::unbounded_channel();
283 let router = EventRouter::new(tx);
284
285 let sub_id = "uuid:multi".to_string();
286
287 router
289 .route_event(sub_id.clone(), "<event>first</event>".to_string())
290 .await;
291 router
292 .route_event(sub_id.clone(), "<event>second</event>".to_string())
293 .await;
294
295 router.register(sub_id.clone()).await;
297
298 let p1 = rx.try_recv().expect("expected first replayed event");
299 assert!(p1.event_xml.contains("first"));
300
301 let p2 = rx.try_recv().expect("expected second replayed event");
302 assert!(p2.event_xml.contains("second"));
303
304 assert!(rx.try_recv().is_err());
306 }
307
308 #[tokio::test]
310 async fn test_buffer_isolates_different_sids() {
311 let (tx, mut rx) = mpsc::unbounded_channel();
312 let router = EventRouter::new(tx);
313
314 router
316 .route_event("uuid:sid-a".to_string(), "<event>a</event>".to_string())
317 .await;
318 router
319 .route_event("uuid:sid-b".to_string(), "<event>b</event>".to_string())
320 .await;
321
322 router.register("uuid:sid-a".to_string()).await;
324
325 let p = rx.try_recv().expect("expected replayed event for sid-a");
327 assert_eq!(p.subscription_id, "uuid:sid-a");
328 assert!(p.event_xml.contains("a"));
329
330 assert!(rx.try_recv().is_err());
332
333 router.register("uuid:sid-b".to_string()).await;
335
336 let p2 = rx.try_recv().expect("expected replayed event for sid-b");
337 assert_eq!(p2.subscription_id, "uuid:sid-b");
338 assert!(p2.event_xml.contains("b"));
339 }
340}