1use super::types::{EnrichedEvent, EventSource};
7use crate::{Result, Service};
8use std::net::IpAddr;
9
10pub struct EventProcessor;
12
13impl Default for EventProcessor {
14 fn default() -> Self {
15 Self::new()
16 }
17}
18
19impl EventProcessor {
20 pub fn new() -> Self {
22 Self
23 }
24
25 pub fn with_default_parsers() -> Self {
27 Self::new()
28 }
29
30 pub fn process_upnp_event(
34 &self,
35 speaker_ip: IpAddr,
36 service: Service,
37 subscription_id: String,
38 event_xml: &str,
39 ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
40 let event_data = self.parse_event_for_service(&service, event_xml)?;
41 let event_source = EventSource::UPnPNotification { subscription_id };
42
43 Ok(EnrichedEvent::new(
44 speaker_ip,
45 service,
46 event_source,
47 event_data,
48 ))
49 }
50
51 pub fn process_polling_event(
53 &self,
54 speaker_ip: IpAddr,
55 service: Service,
56 poll_interval: std::time::Duration,
57 event_xml: &str,
58 ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
59 let event_data = self.parse_event_for_service(&service, event_xml)?;
60 let event_source = EventSource::PollingDetection { poll_interval };
61
62 Ok(EnrichedEvent::new(
63 speaker_ip,
64 service,
65 event_source,
66 event_data,
67 ))
68 }
69
70 pub fn process_resync_event(
72 &self,
73 speaker_ip: IpAddr,
74 service: Service,
75 event_xml: &str,
76 ) -> Result<EnrichedEvent<Box<dyn std::any::Any + Send + Sync>>> {
77 let event_data = self.parse_event_for_service(&service, event_xml)?;
78 let event_source = EventSource::ResyncOperation;
79
80 Ok(EnrichedEvent::new(
81 speaker_ip,
82 service,
83 event_source,
84 event_data,
85 ))
86 }
87
88 fn parse_event_for_service(
90 &self,
91 service: &Service,
92 event_xml: &str,
93 ) -> Result<Box<dyn std::any::Any + Send + Sync>> {
94 match service {
95 Service::AVTransport => {
96 let event = crate::services::av_transport::AVTransportEvent::from_xml(event_xml)?;
97 Ok(Box::new(event))
98 }
99 Service::RenderingControl => {
100 let event =
101 crate::services::rendering_control::RenderingControlEvent::from_xml(event_xml)?;
102 Ok(Box::new(event))
103 }
104 Service::GroupRenderingControl => {
105 let event =
106 crate::services::group_rendering_control::GroupRenderingControlEvent::from_xml(
107 event_xml,
108 )?;
109 Ok(Box::new(event))
110 }
111 Service::ZoneGroupTopology => {
112 let event = crate::services::zone_group_topology::ZoneGroupTopologyEvent::from_xml(
113 event_xml,
114 )?;
115 Ok(Box::new(event))
116 }
117 Service::GroupManagement => {
118 let event =
119 crate::services::group_management::GroupManagementEvent::from_xml(event_xml)?;
120 Ok(Box::new(event))
121 }
122 }
123 }
124
125 pub fn supports_service(&self, service: &Service) -> bool {
127 matches!(
128 service,
129 Service::AVTransport
130 | Service::RenderingControl
131 | Service::GroupRenderingControl
132 | Service::ZoneGroupTopology
133 | Service::GroupManagement
134 )
135 }
136
137 pub fn supported_services(&self) -> Vec<Service> {
139 vec![
140 Service::AVTransport,
141 Service::RenderingControl,
142 Service::GroupRenderingControl,
143 Service::ZoneGroupTopology,
144 Service::GroupManagement,
145 ]
146 }
147}
148
149#[derive(Debug, Clone, Default)]
151pub struct EventProcessorStats {
152 pub events_processed: u64,
154
155 pub upnp_events: u64,
157
158 pub polling_events: u64,
160
161 pub resync_events: u64,
163
164 pub processing_errors: u64,
166
167 pub unsupported_services: u64,
169}
170
171impl EventProcessorStats {
172 pub fn total_events(&self) -> u64 {
174 self.upnp_events + self.polling_events + self.resync_events
175 }
176
177 pub fn success_rate(&self) -> f64 {
179 let total = self.total_events();
180 if total == 0 {
181 1.0
182 } else {
183 self.events_processed as f64 / total as f64
184 }
185 }
186}
187
188impl std::fmt::Display for EventProcessorStats {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 writeln!(f, "Event Processor Stats:")?;
191 writeln!(f, " Total processed: {}", self.events_processed)?;
192 writeln!(f, " Success rate: {:.1}%", self.success_rate() * 100.0)?;
193 writeln!(f, " Event sources:")?;
194 writeln!(f, " UPnP events: {}", self.upnp_events)?;
195 writeln!(f, " Polling events: {}", self.polling_events)?;
196 writeln!(f, " Resync events: {}", self.resync_events)?;
197 writeln!(f, " Errors:")?;
198 writeln!(f, " Processing errors: {}", self.processing_errors)?;
199 writeln!(f, " Unsupported services: {}", self.unsupported_services)?;
200 Ok(())
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207
208 #[test]
209 fn test_event_processor_creation() {
210 let processor = EventProcessor::new();
211
212 assert_eq!(processor.supported_services().len(), 5); }
215
216 #[test]
217 fn test_event_processor_with_default_parsers() {
218 let processor = EventProcessor::with_default_parsers();
219
220 assert_eq!(processor.supported_services().len(), 5); assert!(processor.supports_service(&Service::AVTransport));
224 assert!(processor.supports_service(&Service::RenderingControl));
225 assert!(processor.supports_service(&Service::GroupRenderingControl));
226 assert!(processor.supports_service(&Service::ZoneGroupTopology));
227 assert!(processor.supports_service(&Service::GroupManagement));
228 }
229
230 #[test]
231 fn test_supported_services() {
232 let processor = EventProcessor::new();
233
234 assert!(processor.supports_service(&Service::AVTransport));
236 assert!(processor.supports_service(&Service::RenderingControl));
237 assert!(processor.supports_service(&Service::GroupRenderingControl));
238 assert!(processor.supports_service(&Service::ZoneGroupTopology));
239 assert!(processor.supports_service(&Service::GroupManagement));
240 }
241
242 #[test]
243 fn test_event_parsing_functionality() {
244 let processor = EventProcessor::new();
245
246 let av_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
248 <e:property>
249 <LastChange><Event xmlns="urn:schemas-upnp-org:metadata-1-0/AVT/">
250 <InstanceID val="0">
251 <TransportState val="PLAYING"/>
252 </InstanceID>
253 </Event></LastChange>
254 </e:property>
255 </e:propertyset>"#;
256
257 let result = processor.process_upnp_event(
258 "192.168.1.100".parse().unwrap(),
259 Service::AVTransport,
260 "uuid:123".to_string(),
261 av_xml,
262 );
263
264 assert!(result.is_ok());
265
266 let rc_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
268 <e:property>
269 <LastChange><Event xmlns="urn:schemas-upnp-org:metadata-1-0/RCS/">
270 <InstanceID val="0">
271 <Volume val="50"/>
272 </InstanceID>
273 </Event></LastChange>
274 </e:property>
275 </e:propertyset>"#;
276
277 let result = processor.process_upnp_event(
278 "192.168.1.100".parse().unwrap(),
279 Service::RenderingControl,
280 "uuid:456".to_string(),
281 rc_xml,
282 );
283
284 assert!(result.is_ok());
285
286 let grc_xml = r#"<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0"><e:property><GroupVolume>14</GroupVolume></e:property><e:property><GroupMute>0</GroupMute></e:property><e:property><GroupVolumeChangeable>1</GroupVolumeChangeable></e:property></e:propertyset>"#;
288
289 let result = processor.process_upnp_event(
290 "192.168.1.100".parse().unwrap(),
291 Service::GroupRenderingControl,
292 "uuid:789".to_string(),
293 grc_xml,
294 );
295
296 assert!(result.is_ok());
297 let enriched = result.unwrap();
298 let grc_event = enriched
299 .event_data
300 .downcast::<crate::services::group_rendering_control::GroupRenderingControlEvent>()
301 .expect("Should downcast to GroupRenderingControlEvent");
302 assert_eq!(grc_event.group_volume(), Some(14));
303 assert_eq!(grc_event.group_mute(), Some(false));
304 assert_eq!(grc_event.group_volume_changeable(), Some(true));
305 }
306
307 #[test]
308 fn test_event_processor_stats() {
309 let stats = EventProcessorStats::default();
310 assert_eq!(stats.total_events(), 0);
311 assert_eq!(stats.success_rate(), 1.0);
312
313 let stats = EventProcessorStats {
314 events_processed: 8,
315 upnp_events: 5,
316 polling_events: 3,
317 resync_events: 2,
318 processing_errors: 2,
319 unsupported_services: 0,
320 };
321
322 assert_eq!(stats.total_events(), 10);
323 assert_eq!(stats.success_rate(), 0.8);
324 }
325}