Skip to main content

callback_server/
router.rs

//! Event routing for HTTP callback notifications.
//!
//! This module provides the [`EventRouter`], which maintains a set of active
//! subscription IDs and routes incoming UPnP event notifications to a channel.
5
6use std::collections::HashSet;
7use std::sync::Arc;
8use tokio::sync::{mpsc, RwLock};
9
/// Generic notification payload for UPnP event notifications.
///
/// Represents an unparsed UPnP event notification received via HTTP callback.
/// It carries only the subscription ID and the raw XML body, with no
/// device-specific context.
///
/// Payloads compare by value (`PartialEq`/`Eq`), so consumers and tests can
/// use `==` / `assert_eq!` on whole payloads instead of comparing field by
/// field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotificationPayload {
    /// The subscription ID from the UPnP SID header
    pub subscription_id: String,
    /// The raw XML event body
    pub event_xml: String,
}
22
23/// Routes events from HTTP callbacks to a channel.
24///
25/// The `EventRouter` maintains a set of active subscription IDs. When an event
26/// is received via HTTP callback, the router checks if the subscription is
27/// registered and sends the notification payload to the configured channel.
28#[derive(Clone)]
29pub struct EventRouter {
30    /// Set of active subscription IDs
31    subscriptions: Arc<RwLock<HashSet<String>>>,
32    /// Channel for sending notification payloads
33    event_sender: mpsc::UnboundedSender<NotificationPayload>,
34}
35
36impl EventRouter {
37    /// Create a new event router.
38    ///
39    /// # Arguments
40    ///
41    /// * `event_sender` - Channel for sending notification payloads
42    ///
43    /// # Example
44    ///
45    /// ```
46    /// use tokio::sync::mpsc;
47    /// use callback_server::router::{EventRouter, NotificationPayload};
48    ///
49    /// let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
50    /// let router = EventRouter::new(tx);
51    /// ```
52    pub fn new(event_sender: mpsc::UnboundedSender<NotificationPayload>) -> Self {
53        Self {
54            subscriptions: Arc::new(RwLock::new(HashSet::new())),
55            event_sender,
56        }
57    }
58
59    /// Register a subscription ID for event routing.
60    ///
61    /// This adds the subscription ID to the set of active subscriptions,
62    /// allowing incoming events for this subscription to be routed.
63    ///
64    /// # Arguments
65    ///
66    /// * `subscription_id` - The UPnP subscription ID to register
67    ///
68    /// # Example
69    ///
70    /// ```
71    /// # use tokio::sync::mpsc;
72    /// # use callback_server::router::{EventRouter, NotificationPayload};
73    /// # #[tokio::main]
74    /// # async fn main() {
75    /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
76    /// # let router = EventRouter::new(tx);
77    /// router.register("uuid:subscription-123".to_string()).await;
78    /// # }
79    /// ```
80    pub async fn register(&self, subscription_id: String) {
81        let mut subs = self.subscriptions.write().await;
82        subs.insert(subscription_id);
83    }
84
85    /// Unregister a subscription ID.
86    ///
87    /// Removes the subscription ID from the set of active subscriptions,
88    /// preventing future events for this subscription from being routed.
89    ///
90    /// # Arguments
91    ///
92    /// * `subscription_id` - The subscription ID to unregister
93    ///
94    /// # Example
95    ///
96    /// ```
97    /// # use tokio::sync::mpsc;
98    /// # use callback_server::router::{EventRouter, NotificationPayload};
99    /// # #[tokio::main]
100    /// # async fn main() {
101    /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
102    /// # let router = EventRouter::new(tx);
103    /// # router.register("uuid:subscription-123".to_string()).await;
104    /// router.unregister("uuid:subscription-123").await;
105    /// # }
106    /// ```
107    pub async fn unregister(&self, subscription_id: &str) {
108        let mut subs = self.subscriptions.write().await;
109        subs.remove(subscription_id);
110    }
111
112    /// Route an incoming event to the unified event stream.
113    ///
114    /// This method is the core of the unified event stream processing pattern.
115    /// It checks if the subscription ID is registered and sends a `NotificationPayload`
116    /// to the configured channel for further processing by the event stream processor.
117    ///
118    /// The unified approach means that all events from all speakers and services
119    /// flow through this single routing point, enabling efficient aggregation
120    /// and processing.
121    ///
122    /// # Arguments
123    ///
124    /// * `subscription_id` - The subscription ID from the UPnP SID header
125    /// * `event_xml` - The raw XML event body from the UPnP notification
126    ///
127    /// # Returns
128    ///
129    /// Returns `true` if the event was successfully routed to the unified stream,
130    /// `false` if the subscription ID was not registered.
131    ///
132    /// # Unified Event Processing
133    ///
134    /// This method enables the unified event stream processor pattern by:
135    /// 1. Validating that the subscription is registered and active
136    /// 2. Creating a generic notification payload with subscription context
137    /// 3. Forwarding to the unified event stream for service-specific processing
138    /// 4. Allowing downstream components to add speaker and service context
139    ///
140    /// # Example
141    ///
142    /// ```
143    /// # use tokio::sync::mpsc;
144    /// # use callback_server::router::{EventRouter, NotificationPayload};
145    /// # #[tokio::main]
146    /// # async fn main() {
147    /// # let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
148    /// # let router = EventRouter::new(tx);
149    /// # router.register("uuid:subscription-123".to_string()).await;
150    /// let routed = router.route_event(
151    ///     "uuid:subscription-123".to_string(),
152    ///     "<event>data</event>".to_string()
153    /// ).await;
154    /// assert!(routed);
155    /// # }
156    /// ```
157    pub async fn route_event(&self, subscription_id: String, event_xml: String) -> bool {
158        let subs = self.subscriptions.read().await;
159
160        if subs.contains(&subscription_id) {
161            let payload = NotificationPayload {
162                subscription_id,
163                event_xml,
164            };
165
166            // Send payload to channel (ignore errors if receiver is dropped)
167            let _ = self.event_sender.send(payload);
168            true
169        } else {
170            false
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Subscription ID shared by the register/unregister tests.
    const SUB_ID: &str = "test-sub-123";
    /// Event body used by every test.
    const EVENT_XML: &str = "<event>test</event>";

    /// A registered subscription has its event delivered on the channel unchanged.
    #[tokio::test]
    async fn test_event_router_register_and_route() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let router = EventRouter::new(sender);

        router.register(SUB_ID.to_string()).await;

        let accepted = router
            .route_event(SUB_ID.to_string(), EVENT_XML.to_string())
            .await;
        assert!(accepted);

        // The payload must carry exactly the ID and body that were routed.
        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered.subscription_id, SUB_ID);
        assert_eq!(delivered.event_xml, EVENT_XML);
    }

    /// After unregistering, events for that subscription are rejected and not sent.
    #[tokio::test]
    async fn test_event_router_unregister() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let router = EventRouter::new(sender);

        router.register(SUB_ID.to_string()).await;
        router.unregister(SUB_ID).await;

        let accepted = router
            .route_event(SUB_ID.to_string(), EVENT_XML.to_string())
            .await;
        assert!(!accepted);

        // Nothing may have reached the channel.
        assert!(receiver.try_recv().is_err());
    }

    /// Events for a subscription that was never registered are rejected and not sent.
    #[tokio::test]
    async fn test_event_router_unknown_subscription() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let router = EventRouter::new(sender);

        let accepted = router
            .route_event("unknown-sub".to_string(), EVENT_XML.to_string())
            .await;
        assert!(!accepted);

        // Nothing may have reached the channel.
        assert!(receiver.try_recv().is_err());
    }
}