sonos-sdk-callback-server 0.5.1

Internal HTTP callback server for sonos-sdk UPnP event reception
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
//! Firewall detection coordinator for callback server.
//!
//! This module implements a per-device firewall detection system that monitors
//! real UPnP event delivery to determine whether callback servers can receive
//! external requests from Sonos devices on the local network.

use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};

/// Status of firewall detection for a device.
///
/// The default value is [`FirewallStatus::Unknown`]. In this module the
/// coordinator moves a device to `Accessible` when an event arrives while
/// detection is still pending, and to `Blocked` when the wait timeout elapses
/// first.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FirewallStatus {
    /// Detection has not been performed yet (also reported while it is in progress)
    #[default]
    Unknown,
    /// Server can receive external requests from this device
    Accessible,
    /// Server appears to be blocked by firewall for this device
    Blocked,
    /// Detection failed due to other errors
    // NOTE(review): no code visible in this file assigns this variant; it is
    // only counted by the statistics accessor.
    Error,
}

/// Tunable settings for firewall detection.
///
/// The [`Default`] implementation waits 15 seconds for the first event, keeps
/// caching enabled, and allows up to 100 cached device states.
#[derive(Debug, Clone)]
pub struct FirewallDetectionConfig {
    /// How long to wait for the first event from a device before giving up
    pub event_wait_timeout: Duration,
    /// Whether a completed per-device result is reused for later subscriptions
    pub enable_caching: bool,
    /// Upper bound on the number of device states kept in the cache
    pub max_cached_devices: usize,
}

impl Default for FirewallDetectionConfig {
    /// Returns the stock configuration: 15 s event wait, caching on, 100 cached devices.
    fn default() -> Self {
        let event_wait_timeout = Duration::from_secs(15);
        let max_cached_devices = 100;

        FirewallDetectionConfig {
            event_wait_timeout,
            enable_caching: true,
            max_cached_devices,
        }
    }
}

/// Per-device firewall detection state.
///
/// One instance is created each time detection is started for a device and is
/// then updated in place when an event arrives or the timeout elapses.
#[derive(Debug, Clone)]
pub struct DeviceFirewallState {
    /// Address of the device this state belongs to
    pub device_ip: IpAddr,
    /// Current verdict; stays `Unknown` until detection completes
    pub status: FirewallStatus,
    /// Wall-clock time at which detection was started; timeouts are measured from here
    pub first_subscription_time: SystemTime,
    /// Wall-clock time of the event that completed detection, `None` until one arrives
    pub first_event_time: Option<SystemTime>,
    /// `true` once `status` has been decided by an event or by the timeout
    pub detection_completed: bool,
    /// How long to wait for the first event; copied from the configured
    /// `event_wait_timeout` when detection starts
    pub timeout_duration: Duration,
}

/// Result of a firewall detection operation.
///
/// A value of this type is sent on the coordinator's completion channel each
/// time detection finishes for a device.
#[derive(Debug, Clone)]
pub struct DetectionResult {
    /// Device the result applies to
    pub device_ip: IpAddr,
    /// Status that was decided for the device
    pub status: FirewallStatus,
    /// Why detection finished
    pub reason: DetectionReason,
}

/// Reason for detection completion.
///
/// Carried inside a `DetectionResult` and used to pick the log message and
/// level when the result is reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DetectionReason {
    /// First event arrived within timeout
    EventReceived,
    /// No events received within timeout
    Timeout,
    /// Subscription creation failed
    // NOTE(review): the code visible in this file handles this variant when
    // logging but never produces it.
    SubscriptionFailed,
}

/// Coordinates per-device firewall detection by monitoring real UPnP event delivery.
///
/// The coordinator tracks firewall status on a per-device basis, triggering detection
/// when the first subscription is created for a device and monitoring for event arrivals
/// to determine connectivity status.
///
/// Dropping the coordinator aborts its background timeout monitor, so the
/// monitor does not keep running (and keep the device state map alive) after
/// the coordinator is gone.
pub struct FirewallDetectionCoordinator {
    /// Per-device detection states
    device_states: Arc<RwLock<HashMap<IpAddr, Arc<RwLock<DeviceFirewallState>>>>>,

    /// Configuration for detection behavior
    config: FirewallDetectionConfig,

    /// Channel for notifying when detection completes
    detection_complete_tx: mpsc::UnboundedSender<DetectionResult>,

    /// Handle for the timeout monitoring task; aborted when the coordinator is dropped
    _timeout_task_handle: tokio::task::JoinHandle<()>,
}

impl Drop for FirewallDetectionCoordinator {
    /// Stops the timeout monitor so it cannot outlive the coordinator.
    fn drop(&mut self) {
        // Dropping a `JoinHandle` only detaches the task. The monitor is an
        // infinite loop holding an `Arc` to the state map and a clone of the
        // completion sender, so without an explicit abort it would run forever
        // and also keep the result-logging task's channel open.
        self._timeout_task_handle.abort();
    }
}

impl FirewallDetectionCoordinator {
    /// Create a new firewall detection coordinator.
    pub fn new(config: FirewallDetectionConfig) -> Self {
        let (detection_complete_tx, mut detection_complete_rx) = mpsc::unbounded_channel();

        let device_states = Arc::new(RwLock::new(HashMap::new()));

        // Spawn background task for timeout monitoring
        let timeout_task_handle = {
            let device_states = device_states.clone();
            let detection_complete_tx = detection_complete_tx.clone();
            tokio::spawn(async move {
                Self::monitor_timeouts(device_states, detection_complete_tx).await;
            })
        };

        // Spawn task to handle detection results (logging for now)
        tokio::spawn(async move {
            while let Some(result) = detection_complete_rx.recv().await {
                match result.reason {
                    DetectionReason::EventReceived => {
                        info!(
                            device_ip = %result.device_ip,
                            reason = ?result.reason,
                            status = ?result.status,
                            "Firewall detection: Events accessible from device"
                        );
                    }
                    DetectionReason::Timeout => {
                        warn!(
                            device_ip = %result.device_ip,
                            reason = ?result.reason,
                            status = ?result.status,
                            "Firewall detection: No events received within timeout"
                        );
                    }
                    DetectionReason::SubscriptionFailed => {
                        warn!(
                            device_ip = %result.device_ip,
                            reason = ?result.reason,
                            status = ?result.status,
                            "Firewall detection: Subscription failed for device"
                        );
                    }
                }
            }
        });

        Self {
            device_states,
            config,
            detection_complete_tx,
            _timeout_task_handle: timeout_task_handle,
        }
    }

    /// Notify the coordinator that a subscription was created for `device_ip`.
    ///
    /// With caching enabled, a device whose detection has already completed
    /// yields its stored status immediately. In every other case (caching
    /// disabled, unknown device, or detection still pending) a fresh detection
    /// is started for the device and `FirewallStatus::Unknown` is returned.
    pub async fn on_first_subscription(&self, device_ip: IpAddr) -> FirewallStatus {
        let caching = self.config.enable_caching;

        if caching {
            let states = self.device_states.read().await;

            let mut cached = None;
            if let Some(entry) = states.get(&device_ip) {
                let tracked = entry.read().await;
                if tracked.detection_completed {
                    cached = Some(tracked.status);
                }
            }

            // The read lock must be gone before detection takes the write lock.
            drop(states);

            if let Some(status) = cached {
                debug!(
                    device_ip = %device_ip,
                    status = ?status,
                    "Firewall detection: Using cached status for device"
                );
                return status;
            }
        }

        // No usable cached answer: begin (or restart) monitoring this device.
        self.start_detection_for_device(device_ip).await;

        if caching {
            debug!(
                device_ip = %device_ip,
                timeout = ?self.config.event_wait_timeout,
                "Firewall detection: Started monitoring device for events"
            );
        }

        FirewallStatus::Unknown
    }

    /// Called when any event is received from a device.
    ///
    /// If detection is in progress for this device, marks it as accessible.
    pub async fn on_event_received(&self, device_ip: IpAddr) {
        let device_states = self.device_states.read().await;

        if let Some(state_arc) = device_states.get(&device_ip) {
            let mut state = state_arc.write().await;

            if !state.detection_completed {
                // First event received - mark as accessible
                state.first_event_time = Some(SystemTime::now());
                state.status = FirewallStatus::Accessible;
                state.detection_completed = true;

                let elapsed = SystemTime::now()
                    .duration_since(state.first_subscription_time)
                    .unwrap_or(Duration::ZERO);

                // Notify completion
                let _ = self.detection_complete_tx.send(DetectionResult {
                    device_ip,
                    status: FirewallStatus::Accessible,
                    reason: DetectionReason::EventReceived,
                });

                info!(
                    device_ip = %device_ip,
                    elapsed = ?elapsed,
                    status = ?FirewallStatus::Accessible,
                    "Firewall detection: Event received from device, marking as accessible"
                );
            }
        }
    }

    /// Look up the stored status for `device_ip`.
    ///
    /// Returns `FirewallStatus::Unknown` when the device has no entry; a device
    /// whose detection is still pending also reports `Unknown`.
    pub async fn get_device_status(&self, device_ip: IpAddr) -> FirewallStatus {
        let states = self.device_states.read().await;

        match states.get(&device_ip) {
            None => FirewallStatus::Unknown,
            Some(entry) => {
                let tracked = entry.read().await;
                tracked.status
            }
        }
    }

    /// Forget everything stored for `device_ip` (useful for testing).
    ///
    /// Removing an entry whose detection is still pending abandons that
    /// detection, because the timeout monitor only inspects entries in the map.
    pub async fn clear_device_cache(&self, device_ip: IpAddr) {
        self.device_states.write().await.remove(&device_ip);

        debug!(
            device_ip = %device_ip,
            "Firewall detection: Cleared cache for device"
        );
    }

    /// Start detection monitoring for a specific device.
    ///
    /// Installs a fresh pending state for `device_ip`, replacing any state the
    /// device already had (which restarts its timeout). When the device is new
    /// and the cache is already at `max_cached_devices`, the entry whose
    /// detection started earliest is evicted first to make room.
    async fn start_detection_for_device(&self, device_ip: IpAddr) {
        let mut device_states = self.device_states.write().await;

        // Replacing an existing entry does not grow the map, so eviction is
        // only needed when this device is not cached yet.
        let is_new_entry = !device_states.contains_key(&device_ip);

        if is_new_entry && device_states.len() >= self.config.max_cached_devices {
            // Map iteration order is arbitrary, so find the genuinely oldest
            // entry by its start time. Per-device locks are only ever taken
            // while the map lock is held, and we hold it exclusively, so these
            // reads cannot block.
            let mut oldest: Option<(IpAddr, SystemTime)> = None;
            for (ip, state_arc) in device_states.iter() {
                let started = state_arc.read().await.first_subscription_time;
                let is_older = match oldest {
                    Some((_, earliest)) => started < earliest,
                    None => true,
                };
                if is_older {
                    oldest = Some((*ip, started));
                }
            }

            if let Some((oldest_ip, _)) = oldest {
                device_states.remove(&oldest_ip);
                debug!(
                    oldest_ip = %oldest_ip,
                    cache_size = self.config.max_cached_devices,
                    "Firewall detection: Removed oldest cached entry due to cache being full"
                );
            }
        }

        // Create new detection state
        let new_state = Arc::new(RwLock::new(DeviceFirewallState {
            device_ip,
            status: FirewallStatus::Unknown,
            first_subscription_time: SystemTime::now(),
            first_event_time: None,
            detection_completed: false,
            timeout_duration: self.config.event_wait_timeout,
        }));

        device_states.insert(device_ip, new_state);
    }

    /// Background loop that turns overdue pending detections into `Blocked`.
    ///
    /// Wakes on a one-second interval, walks every tracked device, and for each
    /// detection that is still pending after its `timeout_duration` marks it
    /// completed with `FirewallStatus::Blocked` and sends a
    /// `DetectionReason::Timeout` result. The loop never returns on its own.
    async fn monitor_timeouts(
        device_states: Arc<RwLock<HashMap<IpAddr, Arc<RwLock<DeviceFirewallState>>>>>,
        detection_complete_tx: mpsc::UnboundedSender<DetectionResult>,
    ) {
        let mut ticker = tokio::time::interval(Duration::from_secs(1));

        loop {
            ticker.tick().await;

            let states = device_states.read().await;

            for (ip, entry) in states.iter() {
                let mut tracked = entry.write().await;

                if tracked.detection_completed {
                    continue;
                }

                // A clock that moved backwards counts as no time having passed.
                let waited = SystemTime::now()
                    .duration_since(tracked.first_subscription_time)
                    .unwrap_or(Duration::ZERO);

                if waited < tracked.timeout_duration {
                    continue;
                }

                // Deadline passed without a single event: treat as firewalled.
                tracked.status = FirewallStatus::Blocked;
                tracked.detection_completed = true;

                // A closed channel only means nobody is listening for results.
                let _ = detection_complete_tx.send(DetectionResult {
                    device_ip: *ip,
                    status: FirewallStatus::Blocked,
                    reason: DetectionReason::Timeout,
                });

                warn!(
                    device_ip = %ip,
                    timeout = ?tracked.timeout_duration,
                    status = ?FirewallStatus::Blocked,
                    "Firewall detection: No events received within timeout, marking as blocked"
                );
            }
        }
    }

    /// Get statistics about the coordinator state.
    pub async fn get_stats(&self) -> CoordinatorStats {
        let device_states = self.device_states.read().await;

        let mut stats = CoordinatorStats {
            total_devices: device_states.len(),
            accessible_devices: 0,
            blocked_devices: 0,
            unknown_devices: 0,
            error_devices: 0,
        };

        for state_arc in device_states.values() {
            let state = state_arc.read().await;
            match state.status {
                FirewallStatus::Accessible => stats.accessible_devices += 1,
                FirewallStatus::Blocked => stats.blocked_devices += 1,
                FirewallStatus::Unknown => stats.unknown_devices += 1,
                FirewallStatus::Error => stats.error_devices += 1,
            }
        }

        stats
    }
}

/// Statistics about the firewall detection coordinator.
///
/// A point-in-time count of tracked devices, broken down by their current
/// `FirewallStatus`.
#[derive(Debug, Clone)]
pub struct CoordinatorStats {
    /// Number of devices that currently have a state entry
    pub total_devices: usize,
    /// Devices whose status is `FirewallStatus::Accessible`
    pub accessible_devices: usize,
    /// Devices whose status is `FirewallStatus::Blocked`
    pub blocked_devices: usize,
    /// Devices whose status is `FirewallStatus::Unknown` (including pending detections)
    pub unknown_devices: usize,
    /// Devices whose status is `FirewallStatus::Error`
    pub error_devices: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Address on the 192.168.1.0/24 test network with the given final octet.
    fn device(last_octet: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet))
    }

    /// Coordinator built from the default configuration.
    ///
    /// Must be called from inside a Tokio runtime because construction spawns tasks.
    fn default_coordinator() -> FirewallDetectionCoordinator {
        FirewallDetectionCoordinator::new(FirewallDetectionConfig::default())
    }

    #[tokio::test]
    async fn test_coordinator_creation() {
        // Construction alone must not panic.
        let _coordinator = default_coordinator();
    }

    #[tokio::test]
    async fn test_first_subscription_starts_monitoring() {
        let coordinator = default_coordinator();
        let device_ip = device(100);

        // Detection has only just started, so the answer is still Unknown.
        assert_eq!(
            coordinator.on_first_subscription(device_ip).await,
            FirewallStatus::Unknown
        );

        // The pending entry is visible through the status accessor as well.
        assert_eq!(
            coordinator.get_device_status(device_ip).await,
            FirewallStatus::Unknown
        );
    }

    #[tokio::test]
    async fn test_event_received_marks_accessible() {
        let coordinator = default_coordinator();
        let device_ip = device(100);

        // Begin monitoring, then deliver a simulated event.
        coordinator.on_first_subscription(device_ip).await;
        coordinator.on_event_received(device_ip).await;

        assert_eq!(
            coordinator.get_device_status(device_ip).await,
            FirewallStatus::Accessible
        );
    }

    #[tokio::test]
    async fn test_timeout_marks_blocked() {
        // Very short timeout so the monitor's next pass sees it as expired.
        let coordinator = FirewallDetectionCoordinator::new(FirewallDetectionConfig {
            event_wait_timeout: Duration::from_millis(100),
            ..Default::default()
        });
        let device_ip = device(100);

        coordinator.on_first_subscription(device_ip).await;

        // The monitor runs once per second; wait past both it and the timeout.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        assert_eq!(
            coordinator.get_device_status(device_ip).await,
            FirewallStatus::Blocked
        );
    }

    #[tokio::test]
    async fn test_cached_status_reused() {
        let coordinator = default_coordinator();
        let device_ip = device(100);

        // Complete detection as accessible.
        coordinator.on_first_subscription(device_ip).await;
        coordinator.on_event_received(device_ip).await;

        // A later subscription gets the stored answer instead of Unknown.
        assert_eq!(
            coordinator.on_first_subscription(device_ip).await,
            FirewallStatus::Accessible
        );
    }

    #[tokio::test]
    async fn test_clear_device_cache() {
        let coordinator = default_coordinator();
        let device_ip = device(100);

        // Populate the cache with a completed result.
        coordinator.on_first_subscription(device_ip).await;
        coordinator.on_event_received(device_ip).await;
        assert_eq!(
            coordinator.get_device_status(device_ip).await,
            FirewallStatus::Accessible
        );

        coordinator.clear_device_cache(device_ip).await;

        // With the entry gone the device is Unknown again.
        assert_eq!(
            coordinator.get_device_status(device_ip).await,
            FirewallStatus::Unknown
        );
    }

    #[tokio::test]
    async fn test_stats() {
        let coordinator = default_coordinator();
        let reachable = device(100);
        let pending = device(101);

        // One device completes as accessible, the other stays pending.
        coordinator.on_first_subscription(reachable).await;
        coordinator.on_event_received(reachable).await;
        coordinator.on_first_subscription(pending).await;

        let stats = coordinator.get_stats().await;
        assert_eq!(stats.total_devices, 2);
        assert_eq!(stats.accessible_devices, 1);
        assert_eq!(stats.unknown_devices, 1);
        assert_eq!(stats.blocked_devices, 0);
        assert_eq!(stats.error_devices, 0);
    }
}