Skip to main content

sonos_api/events/
processor.rs

1//! Generic event processor for handling UPnP events across all services
2//!
3//! This module provides a service-agnostic event processor that can handle
4//! events from any Sonos UPnP service using direct self-parsing methods.
5
6use super::types::{EnrichedEvent, EventSource};
7use crate::{Result, Service};
8use std::net::IpAddr;
9
/// Generic event processor that can handle events from any service.
///
/// This is a field-less unit struct: it carries no state of its own, so
/// creating or cloning one is free. `Debug` and `Clone` are derived so the
/// processor can be logged and embedded in types that derive those traits.
#[derive(Debug, Clone)]
pub struct EventProcessor;
12
13impl Default for EventProcessor {
14    fn default() -> Self {
15        Self::new()
16    }
17}
18
19impl EventProcessor {
20    /// Create a new event processor
21    pub fn new() -> Self {
22        Self
23    }
24
25    /// Create a new event processor (alias for compatibility)
26    pub fn with_default_parsers() -> Self {
27        Self::new()
28    }
29
30    /// Process a UPnP event notification using direct event type parsing
31    ///
32    /// This method calls EventType::from_xml() directly based on the service type.
33    pub fn process_upnp_event(
34        &self,
35        speaker_ip: IpAddr,
36        service: Service,
37        subscription_id: String,
38        event_xml: &str,
39    ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
40        let event_data = self.parse_event_for_service(&service, event_xml)?;
41        let event_source = EventSource::UPnPNotification { subscription_id };
42
43        Ok(EnrichedEvent::new(
44            speaker_ip,
45            service,
46            event_source,
47            event_data,
48        ))
49    }
50
51    /// Process a polling-detected event using direct event type parsing
52    pub fn process_polling_event(
53        &self,
54        speaker_ip: IpAddr,
55        service: Service,
56        poll_interval: std::time::Duration,
57        event_xml: &str,
58    ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
59        let event_data = self.parse_event_for_service(&service, event_xml)?;
60        let event_source = EventSource::PollingDetection { poll_interval };
61
62        Ok(EnrichedEvent::new(
63            speaker_ip,
64            service,
65            event_source,
66            event_data,
67        ))
68    }
69
70    /// Process a resync event using direct event type parsing
71    pub fn process_resync_event(
72        &self,
73        speaker_ip: IpAddr,
74        service: Service,
75        event_xml: &str,
76    ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
77        let event_data = self.parse_event_for_service(&service, event_xml)?;
78        let event_source = EventSource::ResyncOperation;
79
80        Ok(EnrichedEvent::new(
81            speaker_ip,
82            service,
83            event_source,
84            event_data,
85        ))
86    }
87
88    /// Parse event XML for the given service using direct EventType::from_xml() calls
89    fn parse_event_for_service(
90        &self,
91        service: &Service,
92        event_xml: &str,
93    ) -> Result<Box<dyn std::any::Any + Send + Sync>> {
94        match service {
95            Service::AVTransport => {
96                let event = crate::services::av_transport::AVTransportEvent::from_xml(event_xml)?;
97                Ok(Box::new(event))
98            }
99            Service::RenderingControl => {
100                let event =
101                    crate::services::rendering_control::RenderingControlEvent::from_xml(event_xml)?;
102                Ok(Box::new(event))
103            }
104            Service::GroupRenderingControl => {
105                let event =
106                    crate::services::group_rendering_control::GroupRenderingControlEvent::from_xml(
107                        event_xml,
108                    )?;
109                Ok(Box::new(event))
110            }
111            Service::ZoneGroupTopology => {
112                let event = crate::services::zone_group_topology::ZoneGroupTopologyEvent::from_xml(
113                    event_xml,
114                )?;
115                Ok(Box::new(event))
116            }
117            Service::GroupManagement => {
118                let event =
119                    crate::services::group_management::GroupManagementEvent::from_xml(event_xml)?;
120                Ok(Box::new(event))
121            }
122        }
123    }
124
125    /// Check if a service is supported by this processor
126    pub fn supports_service(&self, service: &Service) -> bool {
127        matches!(
128            service,
129            Service::AVTransport
130                | Service::RenderingControl
131                | Service::GroupRenderingControl
132                | Service::ZoneGroupTopology
133                | Service::GroupManagement
134        )
135    }
136
137    /// Get all supported services
138    pub fn supported_services(&self) -> Vec<Service> {
139        vec![
140            Service::AVTransport,
141            Service::RenderingControl,
142            Service::GroupRenderingControl,
143            Service::ZoneGroupTopology,
144            Service::GroupManagement,
145        ]
146    }
147}
148
/// Event processing statistics.
///
/// A plain bag of counters; nothing in this type updates them, so keeping
/// them current is the owner's responsibility.
#[derive(Debug, Clone, Default)]
pub struct EventProcessorStats {
    /// Total events processed successfully
    pub events_processed: u64,

    /// UPnP events processed
    pub upnp_events: u64,

    /// Polling events processed
    pub polling_events: u64,

    /// Resync events processed
    pub resync_events: u64,

    /// Processing errors encountered
    pub processing_errors: u64,

    /// Events for unsupported services
    pub unsupported_services: u64,
}

impl EventProcessorStats {
    /// Get total events received across all sources (UPnP, polling, resync).
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing, so extreme
    /// counter values can neither panic (debug builds) nor wrap around to a
    /// small number (release builds).
    pub fn total_events(&self) -> u64 {
        self.upnp_events
            .saturating_add(self.polling_events)
            .saturating_add(self.resync_events)
    }

    /// Get processing success rate: `events_processed / total_events()`.
    ///
    /// Returns `1.0` when no events have been received, so an idle processor
    /// reports as fully successful. The ratio is not clamped: it exceeds
    /// `1.0` if `events_processed` is larger than the per-source total.
    pub fn success_rate(&self) -> f64 {
        match self.total_events() {
            // Avoid dividing by zero when nothing has been received yet.
            0 => 1.0,
            total => self.events_processed as f64 / total as f64,
        }
    }
}
187
188impl std::fmt::Display for EventProcessorStats {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        writeln!(f, "Event Processor Stats:")?;
191        writeln!(f, "  Total processed: {}", self.events_processed)?;
192        writeln!(f, "  Success rate: {:.1}%", self.success_rate() * 100.0)?;
193        writeln!(f, "  Event sources:")?;
194        writeln!(f, "    UPnP events: {}", self.upnp_events)?;
195        writeln!(f, "    Polling events: {}", self.polling_events)?;
196        writeln!(f, "    Resync events: {}", self.resync_events)?;
197        writeln!(f, "  Errors:")?;
198        writeln!(f, "    Processing errors: {}", self.processing_errors)?;
199        writeln!(f, "    Unsupported services: {}", self.unsupported_services)?;
200        Ok(())
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Every service the processor is expected to handle.
    const ALL_SERVICES: [Service; 5] = [
        Service::AVTransport,
        Service::RenderingControl,
        Service::GroupRenderingControl,
        Service::ZoneGroupTopology,
        Service::GroupManagement,
    ];

    /// Assert that `processor` reports support for each expected service.
    fn assert_supports_all(processor: &EventProcessor) {
        for service in &ALL_SERVICES {
            assert!(processor.supports_service(service));
        }
    }

    #[test]
    fn test_event_processor_creation() {
        // A freshly created processor advertises all five implemented services.
        let supported = EventProcessor::new().supported_services();
        assert_eq!(supported.len(), 5);
    }

    #[test]
    fn test_event_processor_with_default_parsers() {
        // The compatibility constructor must yield the same capabilities.
        let processor = EventProcessor::with_default_parsers();

        assert_eq!(processor.supported_services().len(), 5);
        assert_supports_all(&processor);
    }

    #[test]
    fn test_supported_services() {
        // Every event-enabled service is reported as supported.
        assert_supports_all(&EventProcessor::new());
    }

    #[test]
    fn test_event_parsing_functionality() {
        use crate::services::group_rendering_control::GroupRenderingControlEvent;

        let processor = EventProcessor::new();
        let speaker: std::net::IpAddr = "192.168.1.100".parse().unwrap();
        // All three cases arrive as UPnP notifications from the same speaker.
        let notify = |service: Service, subscription: &str, xml: &str| {
            processor.process_upnp_event(speaker, service, subscription.to_string(), xml)
        };

        // AVTransport: state changes are wrapped in an escaped LastChange document.
        let av_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
            <e:property>
                <LastChange>&lt;Event xmlns="urn:schemas-upnp-org:metadata-1-0/AVT/"&gt;
                    &lt;InstanceID val="0"&gt;
                        &lt;TransportState val="PLAYING"/&gt;
                    &lt;/InstanceID&gt;
                &lt;/Event&gt;</LastChange>
            </e:property>
        </e:propertyset>"#;
        assert!(notify(Service::AVTransport, "uuid:123", av_xml).is_ok());

        // RenderingControl: same LastChange envelope, different namespace.
        let rc_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
            <e:property>
                <LastChange>&lt;Event xmlns="urn:schemas-upnp-org:metadata-1-0/RCS/"&gt;
                    &lt;InstanceID val="0"&gt;
                        &lt;Volume val="50"/&gt;
                    &lt;/InstanceID&gt;
                &lt;/Event&gt;</LastChange>
            </e:property>
        </e:propertyset>"#;
        assert!(notify(Service::RenderingControl, "uuid:456", rc_xml).is_ok());

        // GroupRenderingControl: direct properties, NOT LastChange.
        let grc_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0"><e:property><GroupVolume>14</GroupVolume></e:property><e:property><GroupMute>0</GroupMute></e:property><e:property><GroupVolumeChangeable>1</GroupVolumeChangeable></e:property></e:propertyset>"#;
        let outcome = notify(Service::GroupRenderingControl, "uuid:789", grc_xml);
        assert!(outcome.is_ok());

        // The type-erased payload must downcast back to the concrete event.
        let group_event = outcome
            .unwrap()
            .event_data
            .downcast::<GroupRenderingControlEvent>()
            .expect("Should downcast to GroupRenderingControlEvent");
        assert_eq!(group_event.group_volume(), Some(14));
        assert_eq!(group_event.group_mute(), Some(false));
        assert_eq!(group_event.group_volume_changeable(), Some(true));
    }

    #[test]
    fn test_event_processor_stats() {
        // With no events at all, the success rate defaults to 100%.
        let idle = EventProcessorStats::default();
        assert_eq!(idle.total_events(), 0);
        assert_eq!(idle.success_rate(), 1.0);

        // 5 + 3 + 2 events received, 8 of them processed successfully.
        let busy = EventProcessorStats {
            events_processed: 8,
            upnp_events: 5,
            polling_events: 3,
            resync_events: 2,
            processing_errors: 2,
            unsupported_services: 0,
        };
        assert_eq!(busy.total_events(), 10);
        assert_eq!(busy.success_rate(), 0.8);
    }
}