1use std::sync::Arc;
7use tokio::sync::{mpsc, RwLock};
8use tracing::{debug, error, info, trace, warn};
9
10use callback_server::{
11 router::{EventRouter, NotificationPayload},
12 FirewallDetectionCoordinator,
13};
14use sonos_api::events::EventProcessor as ApiEventProcessor;
15
16use crate::error::{EventProcessingError, EventProcessingResult};
17use crate::events::types::{EnrichedEvent, EventData, EventSource};
18use crate::subscription::manager::SubscriptionManager;
19
20pub struct EventProcessor {
22 api_processor: ApiEventProcessor,
24
25 subscription_manager: Arc<SubscriptionManager>,
27
28 event_sender: mpsc::UnboundedSender<EnrichedEvent>,
30
31 stats: Arc<RwLock<EventProcessorStats>>,
33
34 firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
36}
37
38impl EventProcessor {
39 pub fn new(
41 subscription_manager: Arc<SubscriptionManager>,
42 event_sender: mpsc::UnboundedSender<EnrichedEvent>,
43 firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
44 ) -> Self {
45 Self {
46 api_processor: ApiEventProcessor::with_default_parsers(),
47 subscription_manager,
48 event_sender,
49 stats: Arc::new(RwLock::new(EventProcessorStats::new())),
50 firewall_coordinator,
51 }
52 }
53
54 pub async fn process_upnp_notification(
56 &self,
57 payload: NotificationPayload,
58 ) -> EventProcessingResult<()> {
59 {
61 let mut stats = self.stats.write().await;
62 stats.upnp_events_received += 1;
63 }
64
65 let subscription_wrapper = self
67 .subscription_manager
68 .get_subscription_by_sid(&payload.subscription_id)
69 .await
70 .ok_or_else(|| {
71 EventProcessingError::Enrichment(format!(
72 "No subscription found for SID: {}",
73 payload.subscription_id
74 ))
75 })?;
76
77 let pair = subscription_wrapper.speaker_service_pair();
79 let registration_id = subscription_wrapper.registration_id();
80
81 subscription_wrapper.record_event_received().await;
83 self.subscription_manager
84 .record_event_received(&payload.subscription_id)
85 .await;
86
87 if let Some(coordinator) = &self.firewall_coordinator {
89 coordinator.on_event_received(pair.speaker_ip).await;
90 }
91
92 let api_enriched_event = self
94 .api_processor
95 .process_upnp_event(
96 pair.speaker_ip, pair.service,
98 payload.subscription_id.clone(),
99 &payload.event_xml,
100 )
101 .map_err(|e| EventProcessingError::Parsing(format!("API processing failed: {e}")))?;
102
103 let event_data =
105 self.convert_api_event_data(&pair.service, api_enriched_event.event_data)?;
106
107 let enriched_event = EnrichedEvent::new(
109 registration_id,
110 pair.speaker_ip,
111 pair.service,
112 EventSource::UPnPNotification {
113 subscription_id: payload.subscription_id,
114 },
115 event_data,
116 );
117
118 debug!(
120 speaker_ip = %enriched_event.speaker_ip,
121 service = ?enriched_event.service,
122 event_source = ?enriched_event.event_source,
123 "Routing event to EventIterator channel"
124 );
125 self.event_sender
126 .send(enriched_event)
127 .map_err(|_| EventProcessingError::ChannelClosed)?;
128
129 {
131 let mut stats = self.stats.write().await;
132 stats.events_processed += 1;
133 }
134
135 Ok(())
136 }
137
138 pub async fn process_polling_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
140 {
142 let mut stats = self.stats.write().await;
143 stats.polling_events_received += 1;
144 }
145
146 debug!(
148 speaker_ip = %event.speaker_ip,
149 service = ?event.service,
150 event_source = ?event.event_source,
151 "Routing polling event to EventIterator channel"
152 );
153 self.event_sender
154 .send(event)
155 .map_err(|_| EventProcessingError::ChannelClosed)?;
156
157 {
159 let mut stats = self.stats.write().await;
160 stats.events_processed += 1;
161 }
162
163 Ok(())
164 }
165
166 pub async fn process_resync_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
168 {
170 let mut stats = self.stats.write().await;
171 stats.resync_events_received += 1;
172 }
173
174 debug!(
176 speaker_ip = %event.speaker_ip,
177 service = ?event.service,
178 event_source = ?event.event_source,
179 "Routing resync event to EventIterator channel"
180 );
181 self.event_sender
182 .send(event)
183 .map_err(|_| EventProcessingError::ChannelClosed)?;
184
185 {
187 let mut stats = self.stats.write().await;
188 stats.events_processed += 1;
189 }
190
191 Ok(())
192 }
193
194 fn convert_api_event_data(
199 &self,
200 service: &sonos_api::Service,
201 api_event_data: Box<dyn std::any::Any + Send + Sync>,
202 ) -> EventProcessingResult<EventData> {
203 match service {
204 sonos_api::Service::AVTransport => {
205 let event = api_event_data
206 .downcast::<sonos_api::services::av_transport::AVTransportEvent>()
207 .map_err(|_| {
208 EventProcessingError::Parsing(
209 "Failed to downcast AVTransport event".to_string(),
210 )
211 })?;
212 Ok(EventData::AVTransport(event.into_state()))
213 }
214 sonos_api::Service::RenderingControl => {
215 let event = api_event_data
216 .downcast::<sonos_api::services::rendering_control::RenderingControlEvent>()
217 .map_err(|_| {
218 EventProcessingError::Parsing(
219 "Failed to downcast RenderingControl event".to_string(),
220 )
221 })?;
222 Ok(EventData::RenderingControl(event.into_state()))
223 }
224 sonos_api::Service::GroupRenderingControl => {
225 let event = api_event_data
226 .downcast::<sonos_api::services::group_rendering_control::GroupRenderingControlEvent>()
227 .map_err(|_| EventProcessingError::Parsing("Failed to downcast GroupRenderingControl event".to_string()))?;
228 Ok(EventData::GroupRenderingControl(event.into_state()))
229 }
230 sonos_api::Service::ZoneGroupTopology => {
231 let event = api_event_data
232 .downcast::<sonos_api::services::zone_group_topology::ZoneGroupTopologyEvent>()
233 .map_err(|_| {
234 EventProcessingError::Parsing(
235 "Failed to downcast ZoneGroupTopology event".to_string(),
236 )
237 })?;
238 Ok(EventData::ZoneGroupTopology(event.into_state()))
239 }
240 sonos_api::Service::GroupManagement => {
241 let event = api_event_data
242 .downcast::<sonos_api::services::group_management::GroupManagementEvent>()
243 .map_err(|_| {
244 EventProcessingError::Parsing(
245 "Failed to downcast GroupManagement event".to_string(),
246 )
247 })?;
248 Ok(EventData::GroupManagement(event.into_state()))
249 }
250 }
251 }
252
253 pub async fn start_upnp_processing(
255 &self,
256 mut upnp_receiver: mpsc::UnboundedReceiver<NotificationPayload>,
257 ) {
258 info!("Starting UPnP event processing using sonos-api framework");
259
260 let mut event_count = 0;
261 loop {
262 tokio::select! {
263 maybe_payload = upnp_receiver.recv() => {
264 match maybe_payload {
265 Some(payload) => {
266 event_count += 1;
267 debug!(
268 event_count,
269 subscription_id = %payload.subscription_id,
270 "Processing UPnP event"
271 );
272
273 match self.process_upnp_notification(payload).await {
274 Ok(()) => {
275 trace!(event_count, "UPnP event processed successfully");
276 }
277 Err(e) => {
278 error!(
279 event_count,
280 error = %e,
281 "Failed to process UPnP event"
282 );
283 let mut stats = self.stats.write().await;
284 stats.processing_errors += 1;
285 }
286 }
287 }
288 None => {
289 warn!("UPnP receiver channel closed");
290 break;
291 }
292 }
293 }
294 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
295 trace!(
296 events_processed = event_count,
297 "UPnP processor waiting for events"
298 );
299 }
300 }
301 }
302
303 info!("UPnP event processing stopped");
304 }
305
306 pub async fn start_polling_processing(
308 &self,
309 mut polling_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
310 ) {
311 info!("Starting polling event processing");
312
313 while let Some(event) = polling_receiver.recv().await {
314 match self.process_polling_event(event).await {
315 Ok(()) => {
316 }
318 Err(e) => {
319 error!(
320 error = %e,
321 "Failed to process polling event"
322 );
323 let mut stats = self.stats.write().await;
324 stats.processing_errors += 1;
325 }
326 }
327 }
328
329 info!("Polling event processing stopped");
330 }
331
332 pub async fn start_resync_processing(
334 &self,
335 mut resync_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
336 ) {
337 info!("Starting resync event processing");
338
339 while let Some(event) = resync_receiver.recv().await {
340 match self.process_resync_event(event).await {
341 Ok(()) => {
342 }
344 Err(e) => {
345 error!(
346 error = %e,
347 "Failed to process resync event"
348 );
349 let mut stats = self.stats.write().await;
350 stats.processing_errors += 1;
351 }
352 }
353 }
354
355 info!("Resync event processing stopped");
356 }
357
358 pub async fn stats(&self) -> EventProcessorStats {
360 let stats = self.stats.read().await;
361 stats.clone()
362 }
363
364 pub fn supported_services(&self) -> Vec<sonos_api::Service> {
366 self.api_processor.supported_services()
367 }
368
369 pub fn is_service_supported(&self, service: &sonos_api::Service) -> bool {
371 self.api_processor.supports_service(service)
372 }
373}
374
/// Counters describing how many events were received, forwarded and failed.
///
/// `Default` yields all-zero counters; `PartialEq`/`Eq` allow snapshots to be
/// compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EventProcessorStats {
    /// Events that were successfully sent on the outgoing channel.
    pub events_processed: u64,

    /// UPnP notifications handed to the processor, including ones that later failed.
    pub upnp_events_received: u64,

    /// Polling events handed to the processor.
    pub polling_events_received: u64,

    /// Resync events handed to the processor.
    pub resync_events_received: u64,

    /// Processing failures counted by the `start_*_processing` loops.
    pub processing_errors: u64,

    /// Count of events for unsupported services.
    /// NOTE(review): nothing in this file increments this counter — confirm
    /// whether it is still needed.
    pub unsupported_services: u64,
}

impl EventProcessorStats {
    /// Creates statistics with every counter at zero.
    fn new() -> Self {
        Self::default()
    }

    /// Sum of all received events across the UPnP, polling and resync sources.
    pub fn total_events_received(&self) -> u64 {
        self.upnp_events_received + self.polling_events_received + self.resync_events_received
    }

    /// Fraction of received events that were processed successfully.
    ///
    /// Returns `1.0` when nothing has been received yet, so an idle processor
    /// is not reported as failing.
    pub fn success_rate(&self) -> f64 {
        let total = self.total_events_received();
        if total == 0 {
            1.0
        } else {
            self.events_processed as f64 / total as f64
        }
    }
}
424
425impl std::fmt::Display for EventProcessorStats {
426 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427 writeln!(f, "Event Processor Stats:")?;
428 writeln!(f, " Total processed: {}", self.events_processed)?;
429 writeln!(f, " Success rate: {:.1}%", self.success_rate() * 100.0)?;
430 writeln!(f, " Event sources:")?;
431 writeln!(f, " UPnP events: {}", self.upnp_events_received)?;
432 writeln!(f, " Polling events: {}", self.polling_events_received)?;
433 writeln!(f, " Resync events: {}", self.resync_events_received)?;
434 writeln!(f, " Errors:")?;
435 writeln!(f, " Processing errors: {}", self.processing_errors)?;
436 writeln!(f, " Unsupported services: {}", self.unsupported_services)?;
437 Ok(())
438 }
439}
440
441pub async fn create_integrated_event_router(
443 _event_processor: Arc<EventProcessor>,
444) -> (
445 Arc<EventRouter>,
446 mpsc::UnboundedReceiver<NotificationPayload>,
447) {
448 let (upnp_sender, upnp_receiver) = mpsc::unbounded_channel();
449 let router = Arc::new(EventRouter::new(upnp_sender));
450
451 (router, upnp_receiver)
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor with no firewall coordinator, returning the event
    /// receiver as well so the outgoing channel stays open for the test's duration.
    fn new_processor() -> (EventProcessor, mpsc::UnboundedReceiver<EnrichedEvent>) {
        let (event_sender, event_receiver) = mpsc::unbounded_channel();
        let subscription_manager =
            Arc::new(SubscriptionManager::new("http://callback.url".to_string()));

        (
            EventProcessor::new(subscription_manager, event_sender, None),
            event_receiver,
        )
    }

    /// A new processor reports exactly the five known services as supported.
    #[test]
    fn test_event_processor_creation() {
        let (processor, _event_receiver) = new_processor();

        assert_eq!(processor.supported_services().len(), 5);

        let expected = [
            sonos_api::Service::AVTransport,
            sonos_api::Service::RenderingControl,
            sonos_api::Service::GroupRenderingControl,
            sonos_api::Service::ZoneGroupTopology,
            sonos_api::Service::GroupManagement,
        ];
        for service in expected.iter() {
            assert!(processor.is_service_supported(service));
        }
    }

    /// A new processor starts with zeroed counters and a perfect success rate.
    #[tokio::test]
    async fn test_event_processor_stats() {
        let (processor, _event_receiver) = new_processor();

        let snapshot = processor.stats().await;
        assert_eq!(snapshot.events_processed, 0);
        assert_eq!(snapshot.total_events_received(), 0);
        assert_eq!(snapshot.success_rate(), 1.0);
    }
}