Skip to main content

sonos_stream/events/
processor.rs

1//! Simplified event processor that delegates to sonos-api event framework
2//!
3//! This processor replaces the old service-specific processing logic with
4//! a simple delegation to the sonos-api EventProcessor.
5
6use std::sync::Arc;
7use tokio::sync::{mpsc, RwLock};
8use tracing::{debug, error, info, trace, warn};
9
10use callback_server::{
11    router::{EventRouter, NotificationPayload},
12    FirewallDetectionCoordinator,
13};
14use sonos_api::events::EventProcessor as ApiEventProcessor;
15
16use crate::error::{EventProcessingError, EventProcessingResult};
17use crate::events::types::{EnrichedEvent, EventData, EventSource};
18use crate::subscription::manager::SubscriptionManager;
19
/// Simplified event processor that delegates to sonos-api event framework.
///
/// Accepts raw UPnP notifications as well as already-enriched polling and
/// resync events, forwards every successfully handled event on a single
/// `EnrichedEvent` channel, and keeps running counters about what it saw.
pub struct EventProcessor {
    /// The sonos-api event processor that handles service-specific parsing
    /// of the raw UPnP event XML.
    api_processor: ApiEventProcessor,

    /// Subscription manager for looking up subscriptions by SID and for
    /// recording that an event arrived on a subscription.
    subscription_manager: Arc<SubscriptionManager>,

    /// Sender for enriched events (maintains compatibility with existing code).
    /// A failed send is reported as `EventProcessingError::ChannelClosed`.
    event_sender: mpsc::UnboundedSender<EnrichedEvent>,

    /// Statistics tracking, shared behind an async read/write lock.
    stats: Arc<RwLock<EventProcessorStats>>,

    /// Firewall detection coordinator for event arrival notifications.
    /// When `None`, no arrival notification is sent.
    firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
}
37
38impl EventProcessor {
39    /// Create a new event processor
40    pub fn new(
41        subscription_manager: Arc<SubscriptionManager>,
42        event_sender: mpsc::UnboundedSender<EnrichedEvent>,
43        firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
44    ) -> Self {
45        Self {
46            api_processor: ApiEventProcessor::with_default_parsers(),
47            subscription_manager,
48            event_sender,
49            stats: Arc::new(RwLock::new(EventProcessorStats::new())),
50            firewall_coordinator,
51        }
52    }
53
54    /// Process a UPnP notification payload from the callback server
55    pub async fn process_upnp_notification(
56        &self,
57        payload: NotificationPayload,
58    ) -> EventProcessingResult<()> {
59        // Update stats
60        {
61            let mut stats = self.stats.write().await;
62            stats.upnp_events_received += 1;
63        }
64
65        // Look up subscription by SID
66        let subscription_wrapper = self
67            .subscription_manager
68            .get_subscription_by_sid(&payload.subscription_id)
69            .await
70            .ok_or_else(|| {
71                EventProcessingError::Enrichment(format!(
72                    "No subscription found for SID: {}",
73                    payload.subscription_id
74                ))
75            })?;
76
77        // Get speaker/service pair from subscription
78        let pair = subscription_wrapper.speaker_service_pair();
79        let registration_id = subscription_wrapper.registration_id();
80
81        // Record that we received an event for this subscription
82        subscription_wrapper.record_event_received().await;
83        self.subscription_manager
84            .record_event_received(&payload.subscription_id)
85            .await;
86
87        // Notify firewall coordinator that an event was received
88        if let Some(coordinator) = &self.firewall_coordinator {
89            coordinator.on_event_received(pair.speaker_ip).await;
90        }
91
92        // Parse the event using sonos-api event processor
93        let api_enriched_event = self
94            .api_processor
95            .process_upnp_event(
96                pair.speaker_ip, // speaker_ip is already an IpAddr
97                pair.service,
98                payload.subscription_id.clone(),
99                &payload.event_xml,
100            )
101            .map_err(|e| EventProcessingError::Parsing(format!("API processing failed: {e}")))?;
102
103        // Convert from sonos-api enriched event to sonos-stream compatible format
104        let event_data =
105            self.convert_api_event_data(&pair.service, api_enriched_event.event_data)?;
106
107        // Create enriched event compatible with existing sonos-stream code
108        let enriched_event = EnrichedEvent::new(
109            registration_id,
110            pair.speaker_ip,
111            pair.service,
112            EventSource::UPnPNotification {
113                subscription_id: payload.subscription_id,
114            },
115            event_data,
116        );
117
118        // Send enriched event
119        debug!(
120            speaker_ip = %enriched_event.speaker_ip,
121            service = ?enriched_event.service,
122            event_source = ?enriched_event.event_source,
123            "Routing event to EventIterator channel"
124        );
125        self.event_sender
126            .send(enriched_event)
127            .map_err(|_| EventProcessingError::ChannelClosed)?;
128
129        // Update success stats
130        {
131            let mut stats = self.stats.write().await;
132            stats.events_processed += 1;
133        }
134
135        Ok(())
136    }
137
138    /// Process a synthetic event from polling (already enriched)
139    pub async fn process_polling_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
140        // Update stats
141        {
142            let mut stats = self.stats.write().await;
143            stats.polling_events_received += 1;
144        }
145
146        // Send the event (it's already enriched)
147        debug!(
148            speaker_ip = %event.speaker_ip,
149            service = ?event.service,
150            event_source = ?event.event_source,
151            "Routing polling event to EventIterator channel"
152        );
153        self.event_sender
154            .send(event)
155            .map_err(|_| EventProcessingError::ChannelClosed)?;
156
157        // Update success stats
158        {
159            let mut stats = self.stats.write().await;
160            stats.events_processed += 1;
161        }
162
163        Ok(())
164    }
165
166    /// Process a resync event (already enriched)
167    pub async fn process_resync_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
168        // Update stats
169        {
170            let mut stats = self.stats.write().await;
171            stats.resync_events_received += 1;
172        }
173
174        // Send the event (it's already enriched)
175        debug!(
176            speaker_ip = %event.speaker_ip,
177            service = ?event.service,
178            event_source = ?event.event_source,
179            "Routing resync event to EventIterator channel"
180        );
181        self.event_sender
182            .send(event)
183            .map_err(|_| EventProcessingError::ChannelClosed)?;
184
185        // Update success stats
186        {
187            let mut stats = self.stats.write().await;
188            stats.events_processed += 1;
189        }
190
191        Ok(())
192    }
193
194    /// Convert from sonos-api event data to sonos-stream compatible EventData.
195    ///
196    /// Each match arm downcasts the type-erased event and calls `into_state()`
197    /// to produce the canonical State type used by EventData.
198    fn convert_api_event_data(
199        &self,
200        service: &sonos_api::Service,
201        api_event_data: Box<dyn std::any::Any + Send + Sync>,
202    ) -> EventProcessingResult<EventData> {
203        match service {
204            sonos_api::Service::AVTransport => {
205                let event = api_event_data
206                    .downcast::<sonos_api::services::av_transport::AVTransportEvent>()
207                    .map_err(|_| {
208                        EventProcessingError::Parsing(
209                            "Failed to downcast AVTransport event".to_string(),
210                        )
211                    })?;
212                Ok(EventData::AVTransport(event.into_state()))
213            }
214            sonos_api::Service::RenderingControl => {
215                let event = api_event_data
216                    .downcast::<sonos_api::services::rendering_control::RenderingControlEvent>()
217                    .map_err(|_| {
218                        EventProcessingError::Parsing(
219                            "Failed to downcast RenderingControl event".to_string(),
220                        )
221                    })?;
222                Ok(EventData::RenderingControl(event.into_state()))
223            }
224            sonos_api::Service::GroupRenderingControl => {
225                let event = api_event_data
226                    .downcast::<sonos_api::services::group_rendering_control::GroupRenderingControlEvent>()
227                    .map_err(|_| EventProcessingError::Parsing("Failed to downcast GroupRenderingControl event".to_string()))?;
228                Ok(EventData::GroupRenderingControl(event.into_state()))
229            }
230            sonos_api::Service::ZoneGroupTopology => {
231                let event = api_event_data
232                    .downcast::<sonos_api::services::zone_group_topology::ZoneGroupTopologyEvent>()
233                    .map_err(|_| {
234                        EventProcessingError::Parsing(
235                            "Failed to downcast ZoneGroupTopology event".to_string(),
236                        )
237                    })?;
238                Ok(EventData::ZoneGroupTopology(event.into_state()))
239            }
240            sonos_api::Service::GroupManagement => {
241                let event = api_event_data
242                    .downcast::<sonos_api::services::group_management::GroupManagementEvent>()
243                    .map_err(|_| {
244                        EventProcessingError::Parsing(
245                            "Failed to downcast GroupManagement event".to_string(),
246                        )
247                    })?;
248                Ok(EventData::GroupManagement(event.into_state()))
249            }
250        }
251    }
252
253    /// Start processing UPnP events from the callback server
254    pub async fn start_upnp_processing(
255        &self,
256        mut upnp_receiver: mpsc::UnboundedReceiver<NotificationPayload>,
257    ) {
258        info!("Starting UPnP event processing using sonos-api framework");
259
260        let mut event_count = 0;
261        loop {
262            tokio::select! {
263                maybe_payload = upnp_receiver.recv() => {
264                    match maybe_payload {
265                        Some(payload) => {
266                            event_count += 1;
267                            debug!(
268                                event_count,
269                                subscription_id = %payload.subscription_id,
270                                "Processing UPnP event"
271                            );
272
273                            match self.process_upnp_notification(payload).await {
274                                Ok(()) => {
275                                    trace!(event_count, "UPnP event processed successfully");
276                                }
277                                Err(e) => {
278                                    error!(
279                                        event_count,
280                                        error = %e,
281                                        "Failed to process UPnP event"
282                                    );
283                                    let mut stats = self.stats.write().await;
284                                    stats.processing_errors += 1;
285                                }
286                            }
287                        }
288                        None => {
289                            warn!("UPnP receiver channel closed");
290                            break;
291                        }
292                    }
293                }
294                _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
295                    trace!(
296                        events_processed = event_count,
297                        "UPnP processor waiting for events"
298                    );
299                }
300            }
301        }
302
303        info!("UPnP event processing stopped");
304    }
305
306    /// Start processing polling events
307    pub async fn start_polling_processing(
308        &self,
309        mut polling_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
310    ) {
311        info!("Starting polling event processing");
312
313        while let Some(event) = polling_receiver.recv().await {
314            match self.process_polling_event(event).await {
315                Ok(()) => {
316                    // Event processed successfully
317                }
318                Err(e) => {
319                    error!(
320                        error = %e,
321                        "Failed to process polling event"
322                    );
323                    let mut stats = self.stats.write().await;
324                    stats.processing_errors += 1;
325                }
326            }
327        }
328
329        info!("Polling event processing stopped");
330    }
331
332    /// Start processing resync events
333    pub async fn start_resync_processing(
334        &self,
335        mut resync_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
336    ) {
337        info!("Starting resync event processing");
338
339        while let Some(event) = resync_receiver.recv().await {
340            match self.process_resync_event(event).await {
341                Ok(()) => {
342                    // Event processed successfully
343                }
344                Err(e) => {
345                    error!(
346                        error = %e,
347                        "Failed to process resync event"
348                    );
349                    let mut stats = self.stats.write().await;
350                    stats.processing_errors += 1;
351                }
352            }
353        }
354
355        info!("Resync event processing stopped");
356    }
357
358    /// Get event processor statistics
359    pub async fn stats(&self) -> EventProcessorStats {
360        let stats = self.stats.read().await;
361        stats.clone()
362    }
363
    /// Get list of supported service types.
    ///
    /// Delegates to `supported_services()` on the underlying sonos-api processor.
    pub fn supported_services(&self) -> Vec<sonos_api::Service> {
        self.api_processor.supported_services()
    }
368
    /// Check if a service type is supported.
    ///
    /// Delegates to `supports_service()` on the underlying sonos-api processor.
    pub fn is_service_supported(&self, service: &sonos_api::Service) -> bool {
        self.api_processor.supports_service(service)
    }
373}
374
/// Statistics about event processing (maintained for compatibility).
///
/// A plain value type: every counter starts at zero (`Default`), snapshots
/// can be cloned and compared, and `Display` renders a multi-line report.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EventProcessorStats {
    /// Total events processed successfully
    pub events_processed: u64,

    /// UPnP events received from callback server
    pub upnp_events_received: u64,

    /// Polling events received
    pub polling_events_received: u64,

    /// Resync events received
    pub resync_events_received: u64,

    /// Processing errors encountered
    pub processing_errors: u64,

    /// Events for unsupported services.
    ///
    /// NOTE(review): nothing visible in this file increments this counter —
    /// confirm whether another module does.
    pub unsupported_services: u64,
}

impl EventProcessorStats {
    /// Create a zeroed set of counters; identical to `Default::default()`.
    fn new() -> Self {
        Self::default()
    }

    /// Get total events received (all sources): UPnP + polling + resync.
    pub fn total_events_received(&self) -> u64 {
        self.upnp_events_received + self.polling_events_received + self.resync_events_received
    }

    /// Get processing success rate as `events_processed / total received`.
    ///
    /// Returns `1.0` when nothing has been received yet, which avoids a
    /// division by zero and treats "no events" as "no failures".
    pub fn success_rate(&self) -> f64 {
        match self.total_events_received() {
            0 => 1.0,
            total => self.events_processed as f64 / total as f64,
        }
    }
}

impl std::fmt::Display for EventProcessorStats {
    /// Write a human-readable, multi-line report. Every line, including the
    /// last, ends with a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Event Processor Stats:")?;
        writeln!(f, "  Total processed: {}", self.events_processed)?;
        writeln!(f, "  Success rate: {:.1}%", self.success_rate() * 100.0)?;
        writeln!(f, "  Event sources:")?;
        writeln!(f, "    UPnP events: {}", self.upnp_events_received)?;
        writeln!(f, "    Polling events: {}", self.polling_events_received)?;
        writeln!(f, "    Resync events: {}", self.resync_events_received)?;
        writeln!(f, "  Errors:")?;
        writeln!(f, "    Processing errors: {}", self.processing_errors)?;
        writeln!(f, "    Unsupported services: {}", self.unsupported_services)?;
        Ok(())
    }
}
440
441/// Helper function to create an EventRouter integrated with EventProcessor
442pub async fn create_integrated_event_router(
443    _event_processor: Arc<EventProcessor>,
444) -> (
445    Arc<EventRouter>,
446    mpsc::UnboundedReceiver<NotificationPayload>,
447) {
448    let (upnp_sender, upnp_receiver) = mpsc::unbounded_channel();
449    let router = Arc::new(EventRouter::new(upnp_sender));
450
451    (router, upnp_receiver)
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a processor without a firewall coordinator. The receiver is
    /// returned so the event channel stays open for the whole test.
    fn build_processor() -> (EventProcessor, mpsc::UnboundedReceiver<EnrichedEvent>) {
        let (event_sender, event_receiver) = mpsc::unbounded_channel();
        let subscription_manager =
            Arc::new(SubscriptionManager::new("http://callback.url".to_string()));

        (
            EventProcessor::new(subscription_manager, event_sender, None),
            event_receiver,
        )
    }

    #[test]
    fn test_event_processor_creation() {
        let (processor, _event_receiver) = build_processor();

        // The default sonos-api parsers cover exactly these five services.
        let expected = [
            sonos_api::Service::AVTransport,
            sonos_api::Service::RenderingControl,
            sonos_api::Service::GroupRenderingControl,
            sonos_api::Service::ZoneGroupTopology,
            sonos_api::Service::GroupManagement,
        ];

        assert_eq!(processor.supported_services().len(), 5);
        for service in &expected {
            assert!(
                processor.is_service_supported(service),
                "{service:?} should be supported"
            );
        }
    }

    #[tokio::test]
    async fn test_event_processor_stats() {
        let (processor, _event_receiver) = build_processor();

        // A fresh processor has seen nothing, so the success rate is 1.0.
        let snapshot = processor.stats().await;
        assert_eq!(snapshot.events_processed, 0);
        assert_eq!(snapshot.total_events_received(), 0);
        assert_eq!(snapshot.success_rate(), 1.0);
    }
}