Skip to main content

callback_server/
server.rs

1//! HTTP server for receiving UPnP event notifications.
2
3use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
4use std::sync::Arc;
5use tokio::sync::mpsc;
6use tracing::{debug, error, info, trace, warn};
7use warp::Filter;
8
9use super::router::{EventRouter, NotificationPayload};
10
11/// HTTP callback server for receiving UPnP event notifications.
12///
13/// The `CallbackServer` binds to a local port and provides an HTTP endpoint
14/// for receiving UPnP NOTIFY requests. It validates UPnP headers and routes
15/// events through an `EventRouter` to a channel.
16///
17/// # Example
18///
19/// ```no_run
20/// use tokio::sync::mpsc;
21/// use callback_server::{CallbackServer, NotificationPayload};
22///
23/// #[tokio::main]
24/// async fn main() {
25///     let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
26///     
27///     let server = CallbackServer::new((3400, 3500), tx)
28///         .await
29///         .expect("Failed to create callback server");
30///     
31///     println!("Server listening at: {}", server.base_url());
32///     
33///     // Process notifications
34///     while let Some(notification) = rx.recv().await {
35///         println!("Received event for subscription: {}", notification.subscription_id);
36///     }
37/// }
38/// ```
39pub struct CallbackServer {
40    /// The port the server is bound to
41    port: u16,
42    /// The base URL for callback registration
43    base_url: String,
44    /// Event router for handling incoming events
45    event_router: Arc<EventRouter>,
46    /// Shutdown signal sender
47    shutdown_tx: Option<mpsc::Sender<()>>,
48    /// Server task handle
49    server_handle: Option<tokio::task::JoinHandle<()>>,
50}
51
52impl CallbackServer {
53    /// Create and start a new unified callback server.
54    ///
55    /// This method creates a single HTTP server that efficiently handles all UPnP
56    /// event notifications from multiple speakers and services. The server:
57    /// - Finds an available port in the specified range
58    /// - Detects the local IP address for callback URLs
59    /// - Starts an HTTP server to receive all UPnP NOTIFY requests
60    /// - Routes events through a unified event router to registered handlers
61    ///
62    /// # Unified Event Stream Processing
63    ///
64    /// The callback server is designed to support the unified event stream processor
65    /// pattern where a single HTTP endpoint receives events from multiple UPnP
66    /// services and speakers, then routes them to appropriate handlers based on
67    /// subscription IDs.
68    ///
69    /// # Arguments
70    ///
71    /// * `port_range` - Range of ports to try binding to (start, end)
72    /// * `event_sender` - Channel for sending notification payloads to the unified processor
73    ///
74    /// # Returns
75    ///
76    /// Returns the callback server instance or an error if no port could be bound
77    /// or the local IP address could not be detected.
78    ///
79    /// # Example
80    ///
81    /// ```no_run
82    /// # use tokio::sync::mpsc;
83    /// # use callback_server::{CallbackServer, NotificationPayload};
84    /// # #[tokio::main]
85    /// # async fn main() {
86    /// let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
87    /// let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
88    /// println!("Unified callback server listening at: {}", server.base_url());
89    /// # }
90    /// ```
91    pub async fn new(
92        port_range: (u16, u16),
93        event_sender: mpsc::UnboundedSender<NotificationPayload>,
94    ) -> Result<Self, String> {
95        // Find an available port in the range
96        let port = Self::find_available_port(port_range.0, port_range.1).ok_or_else(|| {
97            format!(
98                "No available port found in range {}-{}",
99                port_range.0, port_range.1
100            )
101        })?;
102
103        // Detect local IP address
104        let local_ip = Self::detect_local_ip()
105            .ok_or_else(|| "Failed to detect local IP address".to_string())?;
106
107        eprintln!("Detected local IP address: {local_ip}");
108        let base_url = format!("http://{local_ip}:{port}");
109        eprintln!("Callback server base URL: {base_url}");
110
111        // Create event router
112        let event_router = Arc::new(EventRouter::new(event_sender));
113
114        // Create shutdown channel
115        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
116
117        // Create ready signal channel
118        let (ready_tx, mut ready_rx) = mpsc::channel::<()>(1);
119
120        // Start the HTTP server
121        let server_handle = Self::start_server(port, event_router.clone(), shutdown_rx, ready_tx);
122
123        // Wait for server to be ready
124        ready_rx
125            .recv()
126            .await
127            .ok_or_else(|| "Server failed to start".to_string())?;
128
129        Ok(Self {
130            port,
131            base_url,
132            event_router,
133            shutdown_tx: Some(shutdown_tx),
134            server_handle: Some(server_handle),
135        })
136    }
137
138    /// Get the unified callback URL for subscription registration.
139    ///
140    /// This URL should be used when subscribing to UPnP events from any speaker
141    /// or service. The unified callback server will route all incoming events
142    /// based on their subscription IDs to the appropriate handlers.
143    ///
144    /// The format is `http://<local_ip>:<port>` and this same URL is used for
145    /// all subscriptions, enabling the unified event stream processing pattern.
146    ///
147    /// # Example
148    ///
149    /// ```no_run
150    /// # use tokio::sync::mpsc;
151    /// # use callback_server::{CallbackServer, NotificationPayload};
152    /// # #[tokio::main]
153    /// # async fn main() {
154    /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
155    /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
156    /// let callback_url = server.base_url();
157    /// println!("Use this URL for all subscriptions: {}", callback_url);
158    /// # }
159    /// ```
160    pub fn base_url(&self) -> &str {
161        &self.base_url
162    }
163
164    /// Get the port the server is bound to.
165    pub fn port(&self) -> u16 {
166        self.port
167    }
168
169    /// Get a reference to the event router.
170    ///
171    /// The router can be used to register and unregister subscription IDs
172    /// for event routing.
173    ///
174    /// # Example
175    ///
176    /// ```no_run
177    /// # use tokio::sync::mpsc;
178    /// # use callback_server::{CallbackServer, NotificationPayload};
179    /// # #[tokio::main]
180    /// # async fn main() {
181    /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
182    /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
183    /// server.router().register("uuid:subscription-123".to_string()).await;
184    /// # }
185    /// ```
186    pub fn router(&self) -> &Arc<EventRouter> {
187        &self.event_router
188    }
189
190    /// Shutdown the callback server gracefully.
191    ///
192    /// Sends a shutdown signal to the HTTP server and waits for it to complete
193    /// any in-flight requests.
194    ///
195    /// # Example
196    ///
197    /// ```no_run
198    /// # use tokio::sync::mpsc;
199    /// # use callback_server::{CallbackServer, NotificationPayload};
200    /// # #[tokio::main]
201    /// # async fn main() {
202    /// # let (tx, _rx) = mpsc::unbounded_channel::<NotificationPayload>();
203    /// # let server = CallbackServer::new((3400, 3500), tx).await.unwrap();
204    /// server.shutdown().await.unwrap();
205    /// # }
206    /// ```
207    pub async fn shutdown(mut self) -> Result<(), String> {
208        // Send shutdown signal to HTTP server
209        if let Some(tx) = self.shutdown_tx.take() {
210            let _ = tx.send(()).await;
211        }
212
213        // Wait for server task to complete
214        if let Some(handle) = self.server_handle.take() {
215            let _ = handle.await;
216        }
217
218        Ok(())
219    }
220
221    /// Find an available port in the given range.
222    fn find_available_port(start: u16, end: u16) -> Option<u16> {
223        (start..=end).find(|&port| Self::is_port_available(port))
224    }
225
226    /// Check if a port is available for binding.
227    fn is_port_available(port: u16) -> bool {
228        TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)).is_ok()
229    }
230
231    /// Detect the local IP address for callback URLs.
232    ///
233    /// This uses a UDP socket connection to determine the local IP address
234    /// that would be used for outbound connections. No data is actually sent.
235    fn detect_local_ip() -> Option<IpAddr> {
236        // Try to connect to a public IP to determine our local IP
237        // We don't actually send data, just use the socket to determine routing
238        let socket = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
239        socket.connect("8.8.8.8:80").ok()?;
240        let local_addr = socket.local_addr().ok()?;
241        Some(local_addr.ip())
242    }
243
244    /// Start the HTTP server on the given port.
245    fn start_server(
246        port: u16,
247        event_router: Arc<EventRouter>,
248        mut shutdown_rx: mpsc::Receiver<()>,
249        ready_tx: mpsc::Sender<()>,
250    ) -> tokio::task::JoinHandle<()> {
251        tokio::spawn(async move {
252            // Create the NOTIFY endpoint that accepts any path (like the old code)
253            let notify_route = warp::method()
254                .and(warp::path::full())
255                .and(warp::header::optional::<String>("sid"))
256                .and(warp::header::optional::<String>("nt"))
257                .and(warp::header::optional::<String>("nts"))
258                .and(warp::body::bytes())
259                .and_then({
260                    let router = event_router.clone();
261                    move |method: warp::http::Method,
262                          path: warp::path::FullPath,
263                          sid: Option<String>,
264                          nt: Option<String>,
265                          nts: Option<String>,
266                          body: bytes::Bytes| {
267                        let router = router.clone();
268                        async move {
269                            // Only handle NOTIFY method
270                            if method != warp::http::Method::from_bytes(b"NOTIFY").unwrap() {
271                                return Err(warp::reject::not_found());
272                            }
273
274                            // Log incoming request details for unified event stream monitoring
275                            debug!(
276                                method = %method,
277                                path = %path.as_str(),
278                                body_size = body.len(),
279                                sid = ?sid,
280                                nt = ?nt,
281                                nts = ?nts,
282                                "Received UPnP NOTIFY event"
283                            );
284
285                            // Convert body to string and log content at trace level only
286                            let event_xml = String::from_utf8_lossy(&body).to_string();
287                            if event_xml.len() > 200 {
288                                trace!(
289                                    event_xml_preview = %&event_xml[..200],
290                                    total_length = event_xml.len(),
291                                    "UPnP event XML content (truncated)"
292                                );
293                            } else {
294                                trace!(
295                                    event_xml = %event_xml,
296                                    "UPnP event XML content (full)"
297                                );
298                            }
299
300                            // Validate UPnP headers
301                            if !Self::validate_upnp_headers(&sid, &nt, &nts) {
302                                error!(
303                                    sid = ?sid,
304                                    nt = ?nt,
305                                    nts = ?nts,
306                                    "Invalid UPnP headers in NOTIFY request"
307                                );
308                                return Err(warp::reject::custom(InvalidUpnpHeaders));
309                            }
310
311                            // Extract subscription ID from SID header (required for UPnP events)
312                            let sub_id = sid.ok_or_else(|| {
313                                error!("Missing required SID header in UPnP NOTIFY request");
314                                warp::reject::custom(InvalidUpnpHeaders)
315                            })?;
316
317                            // Route the event through the unified event stream
318                            let routed = router.route_event(sub_id.clone(), event_xml).await;
319
320                            if routed {
321                                debug!(
322                                    subscription_id = %sub_id,
323                                    "UPnP event routed successfully"
324                                );
325                                Ok::<_, warp::Rejection>(warp::reply::with_status(
326                                    "",
327                                    warp::http::StatusCode::OK,
328                                ))
329                            } else {
330                                warn!(
331                                    subscription_id = %sub_id,
332                                    "UPnP event routing failed - subscription not found"
333                                );
334                                Err(warp::reject::not_found())
335                            }
336                        }
337                    }
338                });
339
340            // Configure routes with just the NOTIFY endpoint
341            let routes = notify_route.recover(handle_rejection);
342
343            // Create server with graceful shutdown
344            let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(
345                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
346                async move {
347                    shutdown_rx.recv().await;
348                },
349            );
350
351            info!(
352                address = %addr,
353                "CallbackServer listening - ready to process UPnP events"
354            );
355            // Signal that server is ready
356            let _ = ready_tx.send(()).await;
357            server.await;
358        })
359    }
360
361    /// Validate UPnP event notification headers.
362    ///
363    /// Checks that the required SID header is present and validates optional
364    /// NT and NTS headers if they are provided.
365    fn validate_upnp_headers(
366        sid: &Option<String>,
367        nt: &Option<String>,
368        nts: &Option<String>,
369    ) -> bool {
370        // SID header is required for event notifications
371        if sid.is_none() {
372            return false;
373        }
374
375        // For UPnP events, NT and NTS headers are typically present
376        // If present, validate they have expected values
377        if let (Some(nt_val), Some(nts_val)) = (nt, nts) {
378            if nt_val != "upnp:event" || nts_val != "upnp:propchange" {
379                return false;
380            }
381        }
382
383        true
384    }
385}
386
387/// Custom rejection for invalid UPnP headers.
388#[derive(Debug)]
389struct InvalidUpnpHeaders;
390
391impl warp::reject::Reject for InvalidUpnpHeaders {}
392
393/// Handle rejections and convert them to HTTP responses.
394async fn handle_rejection(
395    err: warp::Rejection,
396) -> Result<impl warp::Reply, std::convert::Infallible> {
397    let code;
398    let message;
399
400    if err.is_not_found() {
401        code = warp::http::StatusCode::NOT_FOUND;
402        message = "Subscription not found";
403    } else if err.find::<InvalidUpnpHeaders>().is_some() {
404        code = warp::http::StatusCode::BAD_REQUEST;
405        message = "Invalid UPnP headers";
406    } else {
407        code = warp::http::StatusCode::INTERNAL_SERVER_ERROR;
408        message = "Internal server error";
409    }
410
411    Ok(warp::reply::with_status(message, code))
412}
413
/// Unit and integration-style tests for the callback server.
///
/// Tests that bind real ports each use their own port range so they do not
/// contend with one another when the test harness runs them in parallel.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_port_available() {
        // Port 0 asks the OS for any free port, so the bind succeeds.
        assert!(CallbackServer::is_port_available(0));

        // Hold a listener on an OS-assigned port; while it is alive that
        // port must be reported as unavailable.
        let listener = TcpListener::bind("0.0.0.0:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        assert!(!CallbackServer::is_port_available(port));
        drop(listener);
    }

    #[test]
    fn test_find_available_port() {
        // Should find a port in a reasonable range
        let port = CallbackServer::find_available_port(52000, 52100);
        assert!(port.is_some());
        assert!((52000..=52100).contains(&port.unwrap()));
    }

    #[test]
    fn test_detect_local_ip() {
        let ip = CallbackServer::detect_local_ip();
        assert!(ip.is_some());

        // Should not be localhost
        if let Some(IpAddr::V4(addr)) = ip {
            assert_ne!(addr, Ipv4Addr::new(127, 0, 0, 1));
        }
    }

    #[test]
    fn test_validate_upnp_headers() {
        // Valid headers with NT and NTS
        assert!(CallbackServer::validate_upnp_headers(
            &Some("uuid:123".to_string()),
            &Some("upnp:event".to_string()),
            &Some("upnp:propchange".to_string()),
        ));

        // Valid headers without NT and NTS (event notification)
        assert!(CallbackServer::validate_upnp_headers(
            &Some("uuid:123".to_string()),
            &None,
            &None,
        ));

        // Invalid: missing SID
        assert!(!CallbackServer::validate_upnp_headers(
            &None,
            &Some("upnp:event".to_string()),
            &Some("upnp:propchange".to_string()),
        ));

        // Invalid: wrong NT value
        assert!(!CallbackServer::validate_upnp_headers(
            &Some("uuid:123".to_string()),
            &Some("wrong".to_string()),
            &Some("upnp:propchange".to_string()),
        ));

        // Invalid: wrong NTS value
        assert!(!CallbackServer::validate_upnp_headers(
            &Some("uuid:123".to_string()),
            &Some("upnp:event".to_string()),
            &Some("wrong".to_string()),
        ));
    }

    #[tokio::test]
    async fn test_callback_server_creation() {
        let (tx, _rx) = mpsc::unbounded_channel();

        let server = CallbackServer::new((50000, 50100), tx).await;
        assert!(server.is_ok());

        let server = server.unwrap();
        assert!((50000..=50100).contains(&server.port()));
        assert!(server.base_url().contains(&server.port().to_string()));

        // Cleanup
        server.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_callback_server_register_unregister() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let server = CallbackServer::new((51000, 51100), tx).await.unwrap();

        let sub_id = "test-sub-123".to_string();

        // Register subscription via router
        server.router().register(sub_id.clone()).await;

        // Unregister subscription via router
        server.router().unregister(&sub_id).await;

        // Cleanup
        server.shutdown().await.unwrap();
    }
}