callback_server/router.rs
1//! Event routing for HTTP callback notifications.
2//!
3//! This module provides the `EventRouter` which maintains a set of active
4//! subscription IDs and routes incoming UPnP event notifications to a channel.
5
6use std::collections::HashSet;
7use std::sync::Arc;
8use tokio::sync::{mpsc, RwLock};
9
/// Raw, unparsed UPnP event notification received through an HTTP callback.
///
/// The payload carries only the subscription ID and the XML body exactly as
/// they were handed to the router; it holds no device-specific context.
///
/// Equality is structural: two payloads are equal when both the subscription
/// ID and the XML body match, which makes payloads directly comparable in
/// assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotificationPayload {
    /// The subscription ID from the UPnP SID header.
    pub subscription_id: String,
    /// The raw XML event body.
    pub event_xml: String,
}
22
23/// Routes events from HTTP callbacks to a channel.
24///
25/// The `EventRouter` maintains a set of active subscription IDs. When an event
26/// is received via HTTP callback, the router checks if the subscription is
27/// registered and sends the notification payload to the configured channel.
28#[derive(Clone)]
29pub struct EventRouter {
30 /// Set of active subscription IDs
31 subscriptions: Arc<RwLock<HashSet<String>>>,
32 /// Channel for sending notification payloads
33 event_sender: mpsc::UnboundedSender<NotificationPayload>,
34}
35
36impl EventRouter {
37 /// Create a new event router.
38 ///
39 /// # Arguments
40 ///
41 /// * `event_sender` - Channel for sending notification payloads
42 ///
43 /// # Example
44 ///
45 /// ```
46 /// use tokio::sync::mpsc;
47 /// use callback_server::router::{EventRouter, NotificationPayload};
48 ///
49 /// let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
50 /// let router = EventRouter::new(tx);
51 /// ```
52 pub fn new(event_sender: mpsc::UnboundedSender<NotificationPayload>) -> Self {
53 Self {
54 subscriptions: Arc::new(RwLock::new(HashSet::new())),
55 event_sender,
56 }
57 }
58
59 /// Register a subscription ID for event routing.
60 ///
61 /// This adds the subscription ID to the set of active subscriptions,
62 /// allowing incoming events for this subscription to be routed.
63 ///
64 /// # Arguments
65 ///
66 /// * `subscription_id` - The UPnP subscription ID to register
67 ///
68 /// # Example
69 ///
70 /// ```
71 /// # use tokio::sync::mpsc;
72 /// # use callback_server::router::{EventRouter, NotificationPayload};
73 /// # #[tokio::main]
74 /// # async fn main() {
75 /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
76 /// # let router = EventRouter::new(tx);
77 /// router.register("uuid:subscription-123".to_string()).await;
78 /// # }
79 /// ```
80 pub async fn register(&self, subscription_id: String) {
81 let mut subs = self.subscriptions.write().await;
82 subs.insert(subscription_id);
83 }
84
85 /// Unregister a subscription ID.
86 ///
87 /// Removes the subscription ID from the set of active subscriptions,
88 /// preventing future events for this subscription from being routed.
89 ///
90 /// # Arguments
91 ///
92 /// * `subscription_id` - The subscription ID to unregister
93 ///
94 /// # Example
95 ///
96 /// ```
97 /// # use tokio::sync::mpsc;
98 /// # use callback_server::router::{EventRouter, NotificationPayload};
99 /// # #[tokio::main]
100 /// # async fn main() {
101 /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
102 /// # let router = EventRouter::new(tx);
103 /// # router.register("uuid:subscription-123".to_string()).await;
104 /// router.unregister("uuid:subscription-123").await;
105 /// # }
106 /// ```
107 pub async fn unregister(&self, subscription_id: &str) {
108 let mut subs = self.subscriptions.write().await;
109 subs.remove(subscription_id);
110 }
111
112 /// Route an incoming event to the unified event stream.
113 ///
114 /// This method is the core of the unified event stream processing pattern.
115 /// It checks if the subscription ID is registered and sends a `NotificationPayload`
116 /// to the configured channel for further processing by the event stream processor.
117 ///
118 /// The unified approach means that all events from all speakers and services
119 /// flow through this single routing point, enabling efficient aggregation
120 /// and processing.
121 ///
122 /// # Arguments
123 ///
124 /// * `subscription_id` - The subscription ID from the UPnP SID header
125 /// * `event_xml` - The raw XML event body from the UPnP notification
126 ///
127 /// # Returns
128 ///
129 /// Returns `true` if the event was successfully routed to the unified stream,
130 /// `false` if the subscription ID was not registered.
131 ///
132 /// # Unified Event Processing
133 ///
134 /// This method enables the unified event stream processor pattern by:
135 /// 1. Validating that the subscription is registered and active
136 /// 2. Creating a generic notification payload with subscription context
137 /// 3. Forwarding to the unified event stream for service-specific processing
138 /// 4. Allowing downstream components to add speaker and service context
139 ///
140 /// # Example
141 ///
142 /// ```
143 /// # use tokio::sync::mpsc;
144 /// # use callback_server::router::{EventRouter, NotificationPayload};
145 /// # #[tokio::main]
146 /// # async fn main() {
147 /// # let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
148 /// # let router = EventRouter::new(tx);
149 /// # router.register("uuid:subscription-123".to_string()).await;
150 /// let routed = router.route_event(
151 /// "uuid:subscription-123".to_string(),
152 /// "<event>data</event>".to_string()
153 /// ).await;
154 /// assert!(routed);
155 /// # }
156 /// ```
157 pub async fn route_event(&self, subscription_id: String, event_xml: String) -> bool {
158 let subs = self.subscriptions.read().await;
159
160 if subs.contains(&subscription_id) {
161 let payload = NotificationPayload {
162 subscription_id,
163 event_xml,
164 };
165
166 // Send payload to channel (ignore errors if receiver is dropped)
167 let _ = self.event_sender.send(payload);
168 true
169 } else {
170 false
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Subscription ID shared by the register/unregister tests.
    const SUB_ID: &str = "test-sub-123";
    /// XML body used for every routed event.
    const EVENT_XML: &str = "<event>test</event>";

    /// Builds a router together with the receiving half of its channel.
    fn router_with_receiver() -> (EventRouter, mpsc::UnboundedReceiver<NotificationPayload>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (EventRouter::new(sender), receiver)
    }

    /// A registered subscription has its event delivered unchanged.
    #[tokio::test]
    async fn test_event_router_register_and_route() {
        let (router, mut receiver) = router_with_receiver();

        router.register(SUB_ID.to_string()).await;

        let accepted = router
            .route_event(SUB_ID.to_string(), EVENT_XML.to_string())
            .await;
        assert!(accepted);

        // The payload on the channel must carry exactly what was routed.
        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered.subscription_id, SUB_ID);
        assert_eq!(delivered.event_xml, EVENT_XML);
    }

    /// Events for a subscription that was unregistered are rejected.
    #[tokio::test]
    async fn test_event_router_unregister() {
        let (router, mut receiver) = router_with_receiver();

        router.register(SUB_ID.to_string()).await;
        router.unregister(SUB_ID).await;

        let accepted = router
            .route_event(SUB_ID.to_string(), EVENT_XML.to_string())
            .await;
        assert!(!accepted);

        // Nothing may have reached the channel.
        assert!(receiver.try_recv().is_err());
    }

    /// Events for a subscription that was never registered are rejected.
    #[tokio::test]
    async fn test_event_router_unknown_subscription() {
        let (router, mut receiver) = router_with_receiver();

        let accepted = router
            .route_event("unknown-sub".to_string(), EVENT_XML.to_string())
            .await;
        assert!(!accepted);

        // Nothing may have reached the channel.
        assert!(receiver.try_recv().is_err());
    }
}