Skip to main content

callback_server/
router.rs

//! Event routing for HTTP callback notifications.
//!
//! This module provides the `EventRouter`, which maintains a set of active
//! subscription IDs and routes incoming UPnP event notifications to a channel.
//! Events for not-yet-registered SIDs are buffered and replayed when
//! registration completes, preventing the race between the SUBSCRIBE response
//! and the initial NOTIFY delivery.
8
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{mpsc, RwLock};
13use tracing::debug;
14
/// Upper bound on the age of a buffered event; older entries are discarded.
///
/// Buffering only has to cover the gap between a SUBSCRIBE response and the
/// matching `register()` call, which is normally a matter of microseconds.
/// Five seconds leaves ample headroom for pathological scheduling delays.
const BUFFER_TTL: Duration = Duration::from_secs(5);
19
/// A raw, still-unparsed UPnP event notification received via HTTP callback.
///
/// The payload is deliberately device-agnostic: it carries just the
/// subscription ID the notification was addressed to and the XML body exactly
/// as it was handed to the router.
#[derive(Clone, Debug)]
pub struct NotificationPayload {
    /// Subscription ID taken from the UPnP `SID` header.
    pub subscription_id: String,
    /// Raw XML body of the event.
    pub event_xml: String,
}
32
/// Mutable router state.
///
/// Both fields live behind one lock so that "is this SID registered?" and
/// "buffer or replay this event" are decided atomically, leaving no TOCTOU
/// window between the check and the action.
struct RouterState {
    /// SIDs that are currently registered for routing.
    subscriptions: HashSet<String>,
    /// Events awaiting registration, stored flat as
    /// `(subscription_id, event_xml, buffered_at)`.
    ///
    /// Expected to hold roughly 0-5 entries: it is only populated during the
    /// short race window between a SUBSCRIBE response and the `register()` call.
    pending: Vec<(String, String, Instant)>,
}
41
42/// Routes events from HTTP callbacks to a channel.
43///
44/// The `EventRouter` maintains a set of active subscription IDs. When an event
45/// is received via HTTP callback, the router checks if the subscription is
46/// registered and sends the notification payload to the configured channel.
47///
48/// Events for unregistered SIDs are buffered briefly and replayed when
49/// `register()` is called, preventing the race between SUBSCRIBE response
50/// and initial UPnP NOTIFY delivery.
51#[derive(Clone)]
52pub struct EventRouter {
53    state: Arc<RwLock<RouterState>>,
54    /// Channel for sending notification payloads
55    event_sender: mpsc::UnboundedSender<NotificationPayload>,
56}
57
58impl EventRouter {
59    /// Create a new event router.
60    ///
61    /// # Arguments
62    ///
63    /// * `event_sender` - Channel for sending notification payloads
64    ///
65    /// # Example
66    ///
67    /// ```
68    /// use tokio::sync::mpsc;
69    /// use callback_server::router::{EventRouter, NotificationPayload};
70    ///
71    /// let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
72    /// let router = EventRouter::new(tx);
73    /// ```
74    pub fn new(event_sender: mpsc::UnboundedSender<NotificationPayload>) -> Self {
75        Self {
76            state: Arc::new(RwLock::new(RouterState {
77                subscriptions: HashSet::new(),
78                pending: Vec::new(),
79            })),
80            event_sender,
81        }
82    }
83
84    /// Register a subscription ID for event routing.
85    ///
86    /// Adds the SID to the active set and replays any buffered events that
87    /// arrived before registration (the SUBSCRIBE/NOTIFY race window).
88    /// Also cleans up stale buffer entries older than `BUFFER_TTL`.
89    pub async fn register(&self, subscription_id: String) {
90        let mut state = self.state.write().await;
91        state.subscriptions.insert(subscription_id.clone());
92
93        // Replay buffered events for this SID and remove stale entries.
94        let now = Instant::now();
95        let mut i = 0;
96        while i < state.pending.len() {
97            let (ref sid, _, buffered_at) = state.pending[i];
98            if sid == &subscription_id {
99                let (_, xml, _) = state.pending.swap_remove(i);
100                debug!(sid = %subscription_id, "Replayed buffered event");
101                let payload = NotificationPayload {
102                    subscription_id: subscription_id.clone(),
103                    event_xml: xml,
104                };
105                let _ = self.event_sender.send(payload);
106                // Don't increment i — swap_remove moved the last element here
107            } else if now.duration_since(buffered_at) > BUFFER_TTL {
108                state.pending.swap_remove(i);
109                // Don't increment i
110            } else {
111                i += 1;
112            }
113        }
114    }
115
116    /// Unregister a subscription ID.
117    ///
118    /// Removes the SID from the active set and drains any buffered events
119    /// for it, preventing stale replays on future re-registration.
120    pub async fn unregister(&self, subscription_id: &str) {
121        let mut state = self.state.write().await;
122        state.subscriptions.remove(subscription_id);
123        state.pending.retain(|(sid, _, _)| sid != subscription_id);
124    }
125
126    /// Route an incoming event to the unified event stream.
127    ///
128    /// If the subscription is registered, the event is sent immediately.
129    /// If not, the event is buffered for replay when `register()` is called.
130    /// The caller should always return HTTP 200 OK — buffered events are
131    /// accepted for processing, not rejected.
132    pub async fn route_event(&self, subscription_id: String, event_xml: String) {
133        let mut state = self.state.write().await;
134        if state.subscriptions.contains(&subscription_id) {
135            let payload = NotificationPayload {
136                subscription_id,
137                event_xml,
138            };
139            let _ = self.event_sender.send(payload);
140        } else {
141            debug!(sid = %subscription_id, "Buffered event for pending SID");
142            state
143                .pending
144                .push((subscription_id, event_xml, Instant::now()));
145        }
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a router wired to a fresh unbounded channel and returns the
    /// router together with the receiving end of that channel.
    fn router_with_receiver() -> (EventRouter, mpsc::UnboundedReceiver<NotificationPayload>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (EventRouter::new(tx), rx)
    }

    /// A registered SID has its events delivered straight to the channel.
    #[tokio::test]
    async fn test_event_router_register_and_route() {
        let (router, mut rx) = router_with_receiver();
        let sid = String::from("test-sub-123");
        let xml = String::from("<event>test</event>");

        router.register(sid.clone()).await;
        router.route_event(sid.clone(), xml.clone()).await;

        let delivered = rx.recv().await.unwrap();
        assert_eq!(delivered.subscription_id, sid);
        assert_eq!(delivered.event_xml, xml);
    }

    /// After unregister(), events for that SID are buffered rather than delivered.
    #[tokio::test]
    async fn test_event_router_unregister() {
        let (router, mut rx) = router_with_receiver();
        let sid = String::from("test-sub-123");

        router.register(sid.clone()).await;
        router.unregister(&sid).await;

        // The SID is no longer active, so this event lands in the buffer.
        router
            .route_event(sid, String::from("<event>test</event>"))
            .await;

        assert!(rx.try_recv().is_err());
    }

    /// Events for a SID that was never registered are buffered, not delivered.
    #[tokio::test]
    async fn test_event_router_unknown_subscription_buffers() {
        let (router, mut rx) = router_with_receiver();

        router
            .route_event(
                String::from("unknown-sub"),
                String::from("<event>test</event>"),
            )
            .await;

        // Nothing reaches the channel while the SID is unknown.
        assert!(rx.try_recv().is_err());
    }

    /// Covers the registration race: an event that arrives before register()
    /// is held in the buffer and replayed once register() runs.
    #[tokio::test]
    async fn test_event_buffered_and_replayed_on_late_register() {
        let (router, mut rx) = router_with_receiver();
        let sid = String::from("uuid:late-register");
        let xml = String::from(
            "<e:propertyset><CurrentPlayMode>NORMAL</CurrentPlayMode></e:propertyset>",
        );

        // The NOTIFY wins the race and arrives first...
        router.route_event(sid.clone(), xml.clone()).await;
        // ...and registration follows shortly after.
        router.register(sid.clone()).await;

        let replayed = rx.try_recv().expect("expected replayed event");
        assert_eq!(replayed.subscription_id, sid);
        assert_eq!(replayed.event_xml, xml);
    }

    /// register() discards buffered entries older than BUFFER_TTL.
    #[tokio::test]
    async fn test_stale_buffer_entries_cleaned_on_register() {
        let (router, mut rx) = router_with_receiver();

        // Plant an entry that is already 10s old (well past the TTL) by
        // writing to the shared state directly; the guard is dropped at the
        // end of this statement.
        let ten_seconds_ago = Instant::now() - Duration::from_secs(10);
        router.state.write().await.pending.push((
            String::from("uuid:stale-sid"),
            String::from("<event>stale</event>"),
            ten_seconds_ago,
        ));

        // Registering an unrelated SID triggers the cleanup pass.
        router.register(String::from("uuid:fresh-sid")).await;

        // The expired entry belonged to another SID, so nothing is replayed.
        assert!(rx.try_recv().is_err());

        let guard = router.state.read().await;
        assert!(guard.pending.is_empty(), "stale entry should be cleaned up");
    }

    /// unregister() removes buffered events for the SID it drops.
    #[tokio::test]
    async fn test_unregister_drains_buffer() {
        let (router, mut rx) = router_with_receiver();
        let sid = String::from("uuid:drain-test");

        // Buffer one event, then unregister, which should drain it.
        router
            .route_event(sid.clone(), String::from("<event>buffered</event>"))
            .await;
        router.unregister(&sid).await;

        // Registering again must not resurrect the drained event.
        router.register(sid.clone()).await;

        assert!(rx.try_recv().is_err());
    }

    /// All events buffered for one SID are replayed on register().
    #[tokio::test]
    async fn test_multiple_buffered_events_replayed() {
        let (router, mut rx) = router_with_receiver();
        let sid = String::from("uuid:multi");

        for xml in ["<event>first</event>", "<event>second</event>"] {
            router.route_event(sid.clone(), xml.to_string()).await;
        }

        router.register(sid.clone()).await;

        let first = rx.try_recv().expect("expected first replayed event");
        assert!(first.event_xml.contains("first"));

        let second = rx.try_recv().expect("expected second replayed event");
        assert!(second.event_xml.contains("second"));

        // The buffer held exactly two events.
        assert!(rx.try_recv().is_err());
    }

    /// Registering one SID replays only that SID's buffered events.
    #[tokio::test]
    async fn test_buffer_isolates_different_sids() {
        let (router, mut rx) = router_with_receiver();

        router
            .route_event(String::from("uuid:sid-a"), String::from("<event>a</event>"))
            .await;
        router
            .route_event(String::from("uuid:sid-b"), String::from("<event>b</event>"))
            .await;

        // Register SID-A alone: only its event comes out.
        router.register(String::from("uuid:sid-a")).await;

        let from_a = rx.try_recv().expect("expected replayed event for sid-a");
        assert_eq!(from_a.subscription_id, "uuid:sid-a");
        assert!(from_a.event_xml.contains("a"));

        // SID-B's event is still waiting in the buffer.
        assert!(rx.try_recv().is_err());

        // Registering SID-B releases it.
        router.register(String::from("uuid:sid-b")).await;

        let from_b = rx.try_recv().expect("expected replayed event for sid-b");
        assert_eq!(from_b.subscription_id, "uuid:sid-b");
        assert!(from_b.event_xml.contains("b"));
    }
}