callback_server/server.rs
1//! HTTP server for receiving UPnP event notifications.
2
3use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
4use std::sync::Arc;
5use tokio::sync::mpsc;
6use tracing::{debug, error, info, trace, warn};
7use warp::Filter;
8
9use super::router::{EventRouter, NotificationPayload};
10
11/// HTTP callback server for receiving UPnP event notifications.
12///
13/// The `CallbackServer` binds to a local port and provides an HTTP endpoint
14/// for receiving UPnP NOTIFY requests. It validates UPnP headers and routes
15/// events through an `EventRouter` to a channel.
16///
17/// # Example
18///
19/// ```no_run
20/// use tokio::sync::mpsc;
21/// use callback_server::{CallbackServer, NotificationPayload};
22///
23/// #[tokio::main]
24/// async fn main() {
25/// let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
26///
27/// let server = CallbackServer::new((3400, 3500), tx)
28/// .await
29/// .expect("Failed to create callback server");
30///
31/// println!("Server listening at: {}", server.base_url());
32///
33/// // Process notifications
34/// while let Some(notification) = rx.recv().await {
35/// println!("Received event for subscription: {}", notification.subscription_id);
36/// }
37/// }
38/// ```
39pub struct CallbackServer {
40 /// The port the server is bound to
41 port: u16,
42 /// The base URL for callback registration
43 base_url: String,
44 /// Event router for handling incoming events
45 event_router: Arc<EventRouter>,
46 /// Shutdown signal sender
47 shutdown_tx: Option<mpsc::Sender<()>>,
48 /// Server task handle
49 server_handle: Option<tokio::task::JoinHandle<()>>,
50}
51
52impl CallbackServer {
53 /// Create and start a new unified callback server.
54 ///
55 /// This method creates a single HTTP server that efficiently handles all UPnP
56 /// event notifications from multiple speakers and services. The server:
57 /// - Finds an available port in the specified range
58 /// - Detects the local IP address for callback URLs
59 /// - Starts an HTTP server to receive all UPnP NOTIFY requests
60 /// - Routes events through a unified event router to registered handlers
61 ///
62 /// # Unified Event Stream Processing
63 ///
64 /// The callback server is designed to support the unified event stream processor
65 /// pattern where a single HTTP endpoint receives events from multiple UPnP
66 /// services and speakers, then routes them to appropriate handlers based on
67 /// subscription IDs.
68 ///
69 /// # Arguments
70 ///
71 /// * `port_range` - Range of ports to try binding to (start, end)
72 /// * `event_sender` - Channel for sending notification payloads to the unified processor
73 ///
74 /// # Returns
75 ///
76 /// Returns the callback server instance or an error if no port could be bound
77 /// or the local IP address could not be detected.
78 ///
79 /// # Example
80 ///
81 /// ```no_run
82 /// # use tokio::sync::mpsc;
83 /// # use callback_server::{CallbackServer, NotificationPayload};
84 /// # #[tokio::main]
85 /// # async fn main() {
86 /// let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
87 /// let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
88 /// println!("Unified callback server listening at: {}", server.base_url());
89 /// # }
90 /// ```
91 pub async fn new(
92 port_range: (u16, u16),
93 event_sender: mpsc::UnboundedSender<NotificationPayload>,
94 ) -> Result<Self, String> {
95 // Find an available port in the range
96 let port = Self::find_available_port(port_range.0, port_range.1).ok_or_else(|| {
97 format!(
98 "No available port found in range {}-{}",
99 port_range.0, port_range.1
100 )
101 })?;
102
103 // Detect local IP address
104 let local_ip = Self::detect_local_ip()
105 .ok_or_else(|| "Failed to detect local IP address".to_string())?;
106
107 eprintln!("Detected local IP address: {local_ip}");
108 let base_url = format!("http://{local_ip}:{port}");
109 eprintln!("Callback server base URL: {base_url}");
110
111 // Create event router
112 let event_router = Arc::new(EventRouter::new(event_sender));
113
114 // Create shutdown channel
115 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
116
117 // Create ready signal channel
118 let (ready_tx, mut ready_rx) = mpsc::channel::<()>(1);
119
120 // Start the HTTP server
121 let server_handle = Self::start_server(port, event_router.clone(), shutdown_rx, ready_tx);
122
123 // Wait for server to be ready
124 ready_rx
125 .recv()
126 .await
127 .ok_or_else(|| "Server failed to start".to_string())?;
128
129 Ok(Self {
130 port,
131 base_url,
132 event_router,
133 shutdown_tx: Some(shutdown_tx),
134 server_handle: Some(server_handle),
135 })
136 }
137
138 /// Get the unified callback URL for subscription registration.
139 ///
140 /// This URL should be used when subscribing to UPnP events from any speaker
141 /// or service. The unified callback server will route all incoming events
142 /// based on their subscription IDs to the appropriate handlers.
143 ///
144 /// The format is `http://<local_ip>:<port>` and this same URL is used for
145 /// all subscriptions, enabling the unified event stream processing pattern.
146 ///
147 /// # Example
148 ///
149 /// ```no_run
150 /// # use tokio::sync::mpsc;
151 /// # use callback_server::{CallbackServer, NotificationPayload};
152 /// # #[tokio::main]
153 /// # async fn main() {
154 /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
155 /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
156 /// let callback_url = server.base_url();
157 /// println!("Use this URL for all subscriptions: {}", callback_url);
158 /// # }
159 /// ```
160 pub fn base_url(&self) -> &str {
161 &self.base_url
162 }
163
164 /// Get the port the server is bound to.
165 pub fn port(&self) -> u16 {
166 self.port
167 }
168
169 /// Get a reference to the event router.
170 ///
171 /// The router can be used to register and unregister subscription IDs
172 /// for event routing.
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// # use tokio::sync::mpsc;
178 /// # use callback_server::{CallbackServer, NotificationPayload};
179 /// # #[tokio::main]
180 /// # async fn main() {
181 /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
182 /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
183 /// server.router().register("uuid:subscription-123".to_string()).await;
184 /// # }
185 /// ```
186 pub fn router(&self) -> &Arc<EventRouter> {
187 &self.event_router
188 }
189
190 /// Shutdown the callback server gracefully.
191 ///
192 /// Sends a shutdown signal to the HTTP server and waits for it to complete
193 /// any in-flight requests.
194 ///
195 /// # Example
196 ///
197 /// ```no_run
198 /// # use tokio::sync::mpsc;
199 /// # use callback_server::{CallbackServer, NotificationPayload};
200 /// # #[tokio::main]
201 /// # async fn main() {
202 /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
203 /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
204 /// server.shutdown().await.unwrap();
205 /// # }
206 /// ```
207 pub async fn shutdown(mut self) -> Result<(), String> {
208 // Send shutdown signal to HTTP server
209 if let Some(tx) = self.shutdown_tx.take() {
210 let _ = tx.send(()).await;
211 }
212
213 // Wait for server task to complete
214 if let Some(handle) = self.server_handle.take() {
215 let _ = handle.await;
216 }
217
218 Ok(())
219 }
220
221 /// Find an available port in the given range.
222 fn find_available_port(start: u16, end: u16) -> Option<u16> {
223 (start..=end).find(|&port| Self::is_port_available(port))
224 }
225
226 /// Check if a port is available for binding.
227 fn is_port_available(port: u16) -> bool {
228 TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)).is_ok()
229 }
230
231 /// Detect the local IP address for callback URLs.
232 ///
233 /// This uses a UDP socket connection to determine the local IP address
234 /// that would be used for outbound connections. No data is actually sent.
235 fn detect_local_ip() -> Option<IpAddr> {
236 // Try to connect to a public IP to determine our local IP
237 // We don't actually send data, just use the socket to determine routing
238 let socket = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
239 socket.connect("8.8.8.8:80").ok()?;
240 let local_addr = socket.local_addr().ok()?;
241 Some(local_addr.ip())
242 }
243
244 /// Start the HTTP server on the given port.
245 fn start_server(
246 port: u16,
247 event_router: Arc<EventRouter>,
248 mut shutdown_rx: mpsc::Receiver<()>,
249 ready_tx: mpsc::Sender<()>,
250 ) -> tokio::task::JoinHandle<()> {
251 tokio::spawn(async move {
252 // Create the NOTIFY endpoint that accepts any path (like the old code)
253 let notify_route = warp::method()
254 .and(warp::path::full())
255 .and(warp::header::optional::<String>("sid"))
256 .and(warp::header::optional::<String>("nt"))
257 .and(warp::header::optional::<String>("nts"))
258 .and(warp::body::bytes())
259 .and_then({
260 let router = event_router.clone();
261 move |method: warp::http::Method,
262 path: warp::path::FullPath,
263 sid: Option<String>,
264 nt: Option<String>,
265 nts: Option<String>,
266 body: bytes::Bytes| {
267 let router = router.clone();
268 async move {
269 // Only handle NOTIFY method
270 if method != warp::http::Method::from_bytes(b"NOTIFY").unwrap() {
271 return Err(warp::reject::not_found());
272 }
273
274 // Log incoming request details for unified event stream monitoring
275 debug!(
276 method = %method,
277 path = %path.as_str(),
278 body_size = body.len(),
279 sid = ?sid,
280 nt = ?nt,
281 nts = ?nts,
282 "Received UPnP NOTIFY event"
283 );
284
285 // Convert body to string and log content at trace level only
286 let event_xml = String::from_utf8_lossy(&body).to_string();
287 if event_xml.len() > 200 {
288 trace!(
289 event_xml_preview = %&event_xml[..200],
290 total_length = event_xml.len(),
291 "UPnP event XML content (truncated)"
292 );
293 } else {
294 trace!(
295 event_xml = %event_xml,
296 "UPnP event XML content (full)"
297 );
298 }
299
300 // Validate UPnP headers
301 if !Self::validate_upnp_headers(&sid, &nt, &nts) {
302 error!(
303 sid = ?sid,
304 nt = ?nt,
305 nts = ?nts,
306 "Invalid UPnP headers in NOTIFY request"
307 );
308 return Err(warp::reject::custom(InvalidUpnpHeaders));
309 }
310
311 // Extract subscription ID from SID header (required for UPnP events)
312 let sub_id = sid.ok_or_else(|| {
313 error!("Missing required SID header in UPnP NOTIFY request");
314 warp::reject::custom(InvalidUpnpHeaders)
315 })?;
316
317 // Route the event through the unified event stream
318 let routed = router.route_event(sub_id.clone(), event_xml).await;
319
320 if routed {
321 debug!(
322 subscription_id = %sub_id,
323 "UPnP event routed successfully"
324 );
325 Ok::<_, warp::Rejection>(warp::reply::with_status(
326 "",
327 warp::http::StatusCode::OK,
328 ))
329 } else {
330 warn!(
331 subscription_id = %sub_id,
332 "UPnP event routing failed - subscription not found"
333 );
334 Err(warp::reject::not_found())
335 }
336 }
337 }
338 });
339
340 // Configure routes with just the NOTIFY endpoint
341 let routes = notify_route.recover(handle_rejection);
342
343 // Create server with graceful shutdown
344 let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(
345 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
346 async move {
347 shutdown_rx.recv().await;
348 },
349 );
350
351 info!(
352 address = %addr,
353 "CallbackServer listening - ready to process UPnP events"
354 );
355 // Signal that server is ready
356 let _ = ready_tx.send(()).await;
357 server.await;
358 })
359 }
360
361 /// Validate UPnP event notification headers.
362 ///
363 /// Checks that the required SID header is present and validates optional
364 /// NT and NTS headers if they are provided.
365 fn validate_upnp_headers(
366 sid: &Option<String>,
367 nt: &Option<String>,
368 nts: &Option<String>,
369 ) -> bool {
370 // SID header is required for event notifications
371 if sid.is_none() {
372 return false;
373 }
374
375 // For UPnP events, NT and NTS headers are typically present
376 // If present, validate they have expected values
377 if let (Some(nt_val), Some(nts_val)) = (nt, nts) {
378 if nt_val != "upnp:event" || nts_val != "upnp:propchange" {
379 return false;
380 }
381 }
382
383 true
384 }
385}
386
387/// Custom rejection for invalid UPnP headers.
388#[derive(Debug)]
389struct InvalidUpnpHeaders;
390
391impl warp::reject::Reject for InvalidUpnpHeaders {}
392
393/// Handle rejections and convert them to HTTP responses.
394async fn handle_rejection(
395 err: warp::Rejection,
396) -> Result<impl warp::Reply, std::convert::Infallible> {
397 let code;
398 let message;
399
400 if err.is_not_found() {
401 code = warp::http::StatusCode::NOT_FOUND;
402 message = "Subscription not found";
403 } else if err.find::<InvalidUpnpHeaders>().is_some() {
404 code = warp::http::StatusCode::BAD_REQUEST;
405 message = "Invalid UPnP headers";
406 } else {
407 code = warp::http::StatusCode::INTERNAL_SERVER_ERROR;
408 message = "Internal server error";
409 }
410
411 Ok(warp::reply::with_status(message, code))
412}
413
414#[cfg(test)]
415mod tests {
416 use super::*;
417
418 #[test]
419 fn test_is_port_available() {
420 // Port 0 should always be available (OS assigns a free port)
421 assert!(CallbackServer::is_port_available(0));
422
423 // Bind to a port and verify it's no longer available
424 let _listener = TcpListener::bind("0.0.0.0:0").unwrap();
425 let port = _listener.local_addr().unwrap().port();
426 // While the listener is held, the port should not be available
427 assert!(!CallbackServer::is_port_available(port));
428 // Keep listener alive for the assertion
429 drop(_listener);
430 }
431
432 #[test]
433 fn test_find_available_port() {
434 // Should find a port in a reasonable range
435 let port = CallbackServer::find_available_port(50000, 50100);
436 assert!(port.is_some());
437 assert!(port.unwrap() >= 50000 && port.unwrap() <= 50100);
438 }
439
440 #[test]
441 fn test_detect_local_ip() {
442 let ip = CallbackServer::detect_local_ip();
443 assert!(ip.is_some());
444
445 // Should not be localhost
446 if let Some(IpAddr::V4(addr)) = ip {
447 assert_ne!(addr, Ipv4Addr::new(127, 0, 0, 1));
448 }
449 }
450
451 #[test]
452 fn test_validate_upnp_headers() {
453 // Valid headers with NT and NTS
454 assert!(CallbackServer::validate_upnp_headers(
455 &Some("uuid:123".to_string()),
456 &Some("upnp:event".to_string()),
457 &Some("upnp:propchange".to_string()),
458 ));
459
460 // Valid headers without NT and NTS (event notification)
461 assert!(CallbackServer::validate_upnp_headers(
462 &Some("uuid:123".to_string()),
463 &None,
464 &None,
465 ));
466
467 // Invalid: missing SID
468 assert!(!CallbackServer::validate_upnp_headers(
469 &None,
470 &Some("upnp:event".to_string()),
471 &Some("upnp:propchange".to_string()),
472 ));
473
474 // Invalid: wrong NT value
475 assert!(!CallbackServer::validate_upnp_headers(
476 &Some("uuid:123".to_string()),
477 &Some("wrong".to_string()),
478 &Some("upnp:propchange".to_string()),
479 ));
480
481 // Invalid: wrong NTS value
482 assert!(!CallbackServer::validate_upnp_headers(
483 &Some("uuid:123".to_string()),
484 &Some("upnp:event".to_string()),
485 &Some("wrong".to_string()),
486 ));
487 }
488
489 #[tokio::test]
490 async fn test_callback_server_creation() {
491 let (tx, _rx) = mpsc::unbounded_channel();
492
493 let server = CallbackServer::new((50000, 50100), tx).await;
494 assert!(server.is_ok());
495
496 let server = server.unwrap();
497 assert!(server.port() >= 50000 && server.port() <= 50100);
498 assert!(server.base_url().contains(&server.port().to_string()));
499
500 // Cleanup
501 server.shutdown().await.unwrap();
502 }
503
504 #[tokio::test]
505 async fn test_callback_server_register_unregister() {
506 let (tx, _rx) = mpsc::unbounded_channel();
507 let server = CallbackServer::new((51000, 51100), tx).await.unwrap();
508
509 let sub_id = "test-sub-123".to_string();
510
511 // Register subscription via router
512 server.router().register(sub_id.clone()).await;
513
514 // Unregister subscription via router
515 server.router().unregister(&sub_id).await;
516
517 // Plugin system has been removed
518
519 // Cleanup
520 server.shutdown().await.unwrap();
521 }
522}