1use std::net::{IpAddr, Ipv4Addr, UdpSocket};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use callback_server::{
14 CallbackServer, FirewallDetectionConfig, FirewallDetectionCoordinator, FirewallStatus,
15};
16use sonos_api::Service;
17
18use crate::config::BrokerConfig;
19use crate::error::{BrokerError, BrokerResult};
20use crate::events::{iterator::EventIterator, processor::EventProcessor, types::EnrichedEvent};
21use crate::polling::scheduler::PollingScheduler;
22use crate::registry::{RegistrationId, SpeakerServicePair, SpeakerServiceRegistry};
23use crate::subscription::{
24 event_detector::{EventDetector, PollingAction, PollingRequest},
25 manager::SubscriptionManager,
26};
27
28#[derive(Debug, Clone)]
30pub struct RegistrationResult {
31 pub registration_id: RegistrationId,
33
34 pub firewall_status: FirewallStatus,
36
37 pub polling_reason: Option<PollingReason>,
39
40 pub was_duplicate: bool,
42}
43
/// Reason attached to a registration when the broker polls a speaker instead
/// of (or in addition to) relying on pushed UPnP events.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PollingReason {
    /// Displayed as "firewall blocked".
    FirewallBlocked,
    /// Displayed as "event timeout".
    EventTimeout,
    /// The UPnP subscription could not be created.
    SubscriptionFailed,
    /// Displayed as "network issues".
    NetworkIssues,
    /// Polling was requested through the broker configuration.
    ForcedPolling,
}

impl std::fmt::Display for PollingReason {
    /// Writes the lowercase, human-readable label of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::FirewallBlocked => "firewall blocked",
            Self::EventTimeout => "event timeout",
            Self::SubscriptionFailed => "subscription failed",
            Self::NetworkIssues => "network issues",
            Self::ForcedPolling => "forced polling",
        };
        f.write_str(label)
    }
}
70
71pub struct EventBroker {
73 registry: Arc<SpeakerServiceRegistry>,
75
76 subscription_manager: Arc<SubscriptionManager>,
78
79 event_processor: Arc<EventProcessor>,
81
82 _callback_server: Arc<CallbackServer>,
84
85 firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
87
88 event_detector: Arc<EventDetector>,
90
91 polling_scheduler: Arc<PollingScheduler>,
93
94 _event_sender: mpsc::UnboundedSender<EnrichedEvent>,
96
97 event_receiver: Option<mpsc::UnboundedReceiver<EnrichedEvent>>,
99
100 config: BrokerConfig,
102
103 shutdown_signal: Arc<AtomicBool>,
105
106 background_tasks: Vec<tokio::task::JoinHandle<()>>,
108
109 upnp_receiver: Option<mpsc::UnboundedReceiver<callback_server::router::NotificationPayload>>,
111
112 event_router: Option<Arc<callback_server::router::EventRouter>>,
114
115 polling_request_receiver: Option<mpsc::UnboundedReceiver<PollingRequest>>,
117}
118
/// Determines the IPv4 address of the local interface used for outbound
/// traffic.
///
/// A UDP socket is bound to an ephemeral port and "connected" to
/// `8.8.8.8:53`; the socket's resulting local address is then read back.
///
/// # Errors
/// Returns the underlying I/O error if binding, connecting, or querying the
/// local address fails.
fn get_local_ip() -> Result<Ipv4Addr, std::io::Error> {
    let probe = UdpSocket::bind("0.0.0.0:0")?;
    probe.connect("8.8.8.8:53")?;

    let local = match probe.local_addr()?.ip() {
        IpAddr::V4(v4) => v4,
        // The socket is bound to an IPv4 wildcard, so an IPv6 answer is not
        // expected; loopback is used as the fallback value.
        IpAddr::V6(_) => Ipv4Addr::LOCALHOST,
    };

    Ok(local)
}
134
135impl EventBroker {
136 pub async fn new(config: BrokerConfig) -> BrokerResult<Self> {
138 config.validate()?;
140
141 info!(config = ?config, "Initializing EventBroker");
142
143 let (event_sender, event_receiver) = mpsc::unbounded_channel();
145
146 let registry = Arc::new(SpeakerServiceRegistry::new(config.max_registrations));
148
149 let (upnp_sender, upnp_receiver) = mpsc::unbounded_channel();
151
152 let callback_server =
154 Self::create_callback_server_with_routing(&config, upnp_sender).await?;
155
156 let event_router = Arc::clone(callback_server.router());
158
159 let local_ip = get_local_ip().map_err(|e| {
161 BrokerError::Configuration(format!("Failed to determine local IP address: {e}"))
162 })?;
163 let server_url = format!("http://{}:{}", local_ip, callback_server.port());
164
165 let subscription_manager = Arc::new(SubscriptionManager::new(server_url.clone()));
167
168 let firewall_coordinator = if config.enable_proactive_firewall_detection {
170 let coordinator_config = FirewallDetectionConfig {
171 event_wait_timeout: config.firewall_event_wait_timeout,
172 enable_caching: config.enable_firewall_caching,
173 max_cached_devices: config.max_cached_device_states,
174 };
175
176 let coordinator = Arc::new(FirewallDetectionCoordinator::new(coordinator_config));
177
178 info!(
179 timeout = ?config.firewall_event_wait_timeout,
180 "Firewall detection coordinator enabled"
181 );
182
183 Some(coordinator)
184 } else {
185 debug!("Firewall detection disabled");
186 None
187 };
188
189 let event_processor = Arc::new(EventProcessor::new(
191 Arc::clone(&subscription_manager),
192 event_sender.clone(),
193 firewall_coordinator.clone(),
194 ));
195
196 let polling_scheduler = Arc::new(PollingScheduler::new(
198 event_sender.clone(),
199 config.base_polling_interval,
200 config.max_polling_interval,
201 config.adaptive_polling,
202 config.max_concurrent_polls,
203 ));
204
205 let (polling_request_sender, polling_request_receiver) = mpsc::unbounded_channel();
207
208 let mut event_detector =
210 EventDetector::new(config.event_timeout, config.polling_activation_delay);
211 if let Some(ref coordinator) = firewall_coordinator {
212 event_detector.set_firewall_coordinator(Arc::clone(coordinator));
213 }
214 event_detector.set_polling_request_sender(polling_request_sender);
215 let event_detector = Arc::new(event_detector);
216
217 let mut broker = Self {
218 registry,
219 subscription_manager,
220 event_processor,
221 _callback_server: callback_server,
222 firewall_coordinator,
223 event_detector,
224 polling_scheduler,
225 _event_sender: event_sender,
226 event_receiver: Some(event_receiver),
227 config,
228 shutdown_signal: Arc::new(AtomicBool::new(false)),
229 background_tasks: Vec::new(),
230 upnp_receiver: Some(upnp_receiver),
231 event_router: Some(event_router),
232 polling_request_receiver: Some(polling_request_receiver),
233 };
234
235 broker.start_background_processing().await?;
237
238 info!("EventBroker initialized successfully");
239
240 Ok(broker)
241 }
242
243 async fn create_callback_server_with_routing(
245 config: &BrokerConfig,
246 event_sender: mpsc::UnboundedSender<callback_server::router::NotificationPayload>,
247 ) -> BrokerResult<Arc<CallbackServer>> {
248 let server = CallbackServer::new(config.callback_port_range, event_sender)
249 .await
250 .map_err(|e| BrokerError::CallbackServer(e.to_string()))?;
251
252 Ok(Arc::new(server))
253 }
254
255 async fn is_first_subscription_for_device(&self, device_ip: IpAddr) -> bool {
258 let registered_pairs = self.registry.list_registrations().await;
260
261 let existing_count = registered_pairs
263 .iter()
264 .filter(|(_, pair)| pair.speaker_ip == device_ip)
265 .count();
266
267 existing_count <= 1
270 }
271
272 async fn start_background_processing(&mut self) -> BrokerResult<()> {
274 debug!("Starting background processing tasks");
275
276 if let Some(upnp_receiver) = self.upnp_receiver.take() {
278 let upnp_processor = Arc::clone(&self.event_processor);
279 let upnp_task = tokio::spawn(async move {
280 upnp_processor.start_upnp_processing(upnp_receiver).await;
281 });
282 self.background_tasks.push(upnp_task);
283 }
284
285 if let Some(polling_request_receiver) = self.polling_request_receiver.take() {
287 self.start_polling_request_processing(polling_request_receiver)
288 .await;
289 }
290
291 let monitoring_handle = self.event_detector.start_monitoring().await;
293 self.background_tasks.push(monitoring_handle);
294
295 self.start_subscription_renewal_monitoring().await;
297
298 debug!("Background processing tasks started");
299
300 Ok(())
301 }
302
303 async fn start_polling_request_processing(
305 &mut self,
306 mut receiver: mpsc::UnboundedReceiver<PollingRequest>,
307 ) {
308 let polling_scheduler = Arc::clone(&self.polling_scheduler);
309 let subscription_manager = Arc::clone(&self.subscription_manager);
310
311 let task = tokio::spawn(async move {
312 info!("Starting polling request processing");
313
314 while let Some(request) = receiver.recv().await {
315 match request.action {
316 PollingAction::Start => {
317 debug!(
318 speaker_ip = %request.speaker_service_pair.speaker_ip,
319 service = ?request.speaker_service_pair.service,
320 reason = ?request.reason,
321 registration_id = %request.registration_id,
322 "Starting polling for speaker service"
323 );
324
325 if let Err(e) = polling_scheduler
326 .start_polling(
327 request.registration_id,
328 request.speaker_service_pair.clone(),
329 )
330 .await
331 {
332 error!(
333 registration_id = %request.registration_id,
334 speaker_ip = %request.speaker_service_pair.speaker_ip,
335 service = ?request.speaker_service_pair.service,
336 error = %e,
337 "Failed to start polling"
338 );
339 } else {
340 if let Some(subscription) = subscription_manager
342 .get_subscription(request.registration_id)
343 .await
344 {
345 subscription.set_polling_active(true);
346 }
347 }
348 }
349 PollingAction::Stop => {
350 debug!(
351 speaker_ip = %request.speaker_service_pair.speaker_ip,
352 service = ?request.speaker_service_pair.service,
353 registration_id = %request.registration_id,
354 "Stopping polling for speaker service"
355 );
356
357 if let Err(e) = polling_scheduler
358 .stop_polling(request.registration_id)
359 .await
360 {
361 error!(
362 registration_id = %request.registration_id,
363 speaker_ip = %request.speaker_service_pair.speaker_ip,
364 service = ?request.speaker_service_pair.service,
365 error = %e,
366 "Failed to stop polling"
367 );
368 } else {
369 if let Some(subscription) = subscription_manager
371 .get_subscription(request.registration_id)
372 .await
373 {
374 subscription.set_polling_active(false);
375 }
376 }
377 }
378 }
379 }
380
381 info!("Polling request processing stopped");
382 });
383
384 self.background_tasks.push(task);
385 }
386
387 async fn start_subscription_renewal_monitoring(&mut self) {
389 let subscription_manager = Arc::clone(&self.subscription_manager);
390 let renewal_threshold = self.config.renewal_threshold;
391
392 let task = tokio::spawn(async move {
393 info!("Starting subscription renewal monitoring");
394
395 let mut interval = tokio::time::interval(renewal_threshold / 2); loop {
398 interval.tick().await;
399
400 match subscription_manager.check_renewals().await {
401 Ok(renewed_count) => {
402 if renewed_count > 0 {
403 debug!(renewed_count = renewed_count, "Renewed subscriptions");
404 }
405 }
406 Err(e) => {
407 error!(
408 error = %e,
409 "Error during subscription renewal check"
410 );
411 }
412 }
413 }
414 });
415
416 self.background_tasks.push(task);
417 }
418
419 pub async fn register_speaker_service(
421 &self,
422 speaker_ip: IpAddr,
423 service: Service,
424 ) -> BrokerResult<RegistrationResult> {
425 debug!(
426 speaker_ip = %speaker_ip,
427 service = ?service,
428 "Registering speaker service"
429 );
430
431 let registration_id = self.registry.register(speaker_ip, service).await?;
433 let was_duplicate = self.registry.is_registered(speaker_ip, service).await;
434
435 if was_duplicate {
436 debug!(
437 registration_id = %registration_id,
438 "Registration already exists"
439 );
440 }
441
442 let pair = SpeakerServicePair::new(speaker_ip, service);
443
444 let mut polling_reason = None;
445 let firewall_status;
446
447 if self.config.force_polling_mode {
448 debug!(
450 registration_id = %registration_id,
451 speaker_ip = %speaker_ip,
452 service = ?service,
453 "Force polling mode: skipping UPnP subscription"
454 );
455
456 firewall_status = FirewallStatus::Blocked;
457 polling_reason = Some(PollingReason::ForcedPolling);
458
459 if let Err(e) = self
464 .polling_scheduler
465 .start_polling(registration_id, pair.clone())
466 .await
467 {
468 error!(
469 registration_id = %registration_id,
470 error = %e,
471 "Failed to start forced polling"
472 );
473 let _ = self.registry.unregister(registration_id).await;
474 return Err(BrokerError::Polling(e));
475 }
476 } else {
477 let is_first_for_device = self.is_first_subscription_for_device(speaker_ip).await;
481
482 firewall_status = if let Some(coordinator) = &self.firewall_coordinator {
484 if is_first_for_device {
485 debug!(
486 speaker_ip = %speaker_ip,
487 "First subscription for device, triggering firewall detection"
488 );
489 coordinator.on_first_subscription(speaker_ip).await
490 } else {
491 coordinator.get_device_status(speaker_ip).await
492 }
493 } else {
494 FirewallStatus::Unknown
495 };
496
497 let subscription_result = self
499 .subscription_manager
500 .create_subscription(registration_id, pair.clone())
501 .await;
502
503 match subscription_result {
504 Ok(subscription) => {
505 debug!(
506 subscription_id = %subscription.subscription_id(),
507 "Created UPnP subscription"
508 );
509
510 if let Some(router) = &self.event_router {
512 router
513 .register(subscription.subscription_id().to_string())
514 .await;
515 debug!(
516 subscription_id = %subscription.subscription_id(),
517 "Registered subscription with EventRouter"
518 );
519 }
520
521 self.event_detector
523 .register_subscription(registration_id, pair.clone())
524 .await;
525
526 if let Some(request) = self
528 .event_detector
529 .evaluate_firewall_status(registration_id, &pair)
530 .await
531 {
532 polling_reason = Some(request.reason.clone());
533
534 if let Err(e) = self
536 .polling_scheduler
537 .start_polling(registration_id, pair.clone())
538 .await
539 {
540 error!(
541 registration_id = %registration_id,
542 error = %e,
543 "Failed to start immediate polling"
544 );
545 } else {
546 subscription.set_polling_active(true);
547 debug!(
548 registration_id = %registration_id,
549 reason = ?request.reason,
550 "Started immediate polling"
551 );
552 }
553 }
554 }
555 Err(e) => {
556 error!(
557 registration_id = %registration_id,
558 error = %e,
559 "Failed to create subscription, falling back to polling"
560 );
561 polling_reason = Some(PollingReason::SubscriptionFailed);
562
563 if let Err(e) = self
565 .polling_scheduler
566 .start_polling(registration_id, pair.clone())
567 .await
568 {
569 error!(
570 registration_id = %registration_id,
571 error = %e,
572 "Failed to start fallback polling"
573 );
574 let _ = self.registry.unregister(registration_id).await;
576 return Err(BrokerError::Polling(e));
577 } else {
578 debug!(
579 registration_id = %registration_id,
580 "Started fallback polling due to subscription failure"
581 );
582 }
583 }
584 }
585 }
586
587 let result = RegistrationResult {
588 registration_id,
589 firewall_status,
590 polling_reason,
591 was_duplicate,
592 };
593
594 debug!(
595 registration_id = %result.registration_id,
596 firewall_status = ?result.firewall_status,
597 polling_reason = ?result.polling_reason,
598 was_duplicate = result.was_duplicate,
599 "Registration completed"
600 );
601
602 Ok(result)
603 }
604
605 pub async fn unregister_speaker_service(
607 &self,
608 registration_id: RegistrationId,
609 ) -> BrokerResult<SpeakerServicePair> {
610 debug!(registration_id = %registration_id, "Unregistering subscription");
611
612 let pair = self.registry.get_pair(registration_id).await.ok_or({
614 BrokerError::Registry(crate::error::RegistryError::NotFound(registration_id))
615 })?;
616
617 if let Err(e) = self.polling_scheduler.stop_polling(registration_id).await {
619 warn!(
620 registration_id = %registration_id,
621 error = %e,
622 "Failed to stop polling during unregistration"
623 );
624 }
625
626 if let Err(e) = self
628 .subscription_manager
629 .remove_subscription(registration_id)
630 .await
631 {
632 warn!(
633 registration_id = %registration_id,
634 error = %e,
635 "Failed to remove subscription during unregistration"
636 );
637 }
638
639 self.event_detector
641 .unregister_subscription(registration_id)
642 .await;
643
644 let removed_pair = self.registry.unregister(registration_id).await?;
646
647 debug!(
648 speaker_ip = %pair.speaker_ip,
649 service = ?pair.service,
650 registration_id = %registration_id,
651 "Unregistration completed"
652 );
653
654 Ok(removed_pair)
655 }
656
657 pub fn event_iterator(&mut self) -> BrokerResult<EventIterator> {
660 let receiver = self.event_receiver.take().ok_or_else(|| {
661 BrokerError::Configuration("Event iterator already created".to_string())
662 })?;
663
664 let iterator = EventIterator::new(receiver);
665
666 Ok(iterator)
667 }
668
669 pub async fn stats(&self) -> BrokerStats {
671 let registry_stats = self.registry.stats().await;
672 let subscription_stats = self.subscription_manager.stats().await;
673 let polling_stats = self.polling_scheduler.stats().await;
674 let event_processor_stats = self.event_processor.stats().await;
675 let event_detector_stats = self.event_detector.stats().await;
676
677 BrokerStats {
678 registry_stats,
679 subscription_stats,
680 polling_stats,
681 event_processor_stats,
682 event_detector_stats,
683 firewall_status: FirewallStatus::Unknown, background_tasks_count: self.background_tasks.len(),
685 }
686 }
687
688 pub async fn firewall_status(&self) -> FirewallStatus {
690 FirewallStatus::Unknown
693 }
694
695 pub async fn get_device_firewall_status(&self, device_ip: IpAddr) -> FirewallStatus {
697 if let Some(coordinator) = &self.firewall_coordinator {
698 coordinator.get_device_status(device_ip).await
699 } else {
700 FirewallStatus::Unknown
701 }
702 }
703
704 pub async fn trigger_firewall_detection(
706 &self,
707 device_ip: IpAddr,
708 ) -> BrokerResult<FirewallStatus> {
709 if let Some(coordinator) = &self.firewall_coordinator {
710 Ok(coordinator.on_first_subscription(device_ip).await)
713 } else {
714 Err(BrokerError::Configuration(
715 "Firewall detection is disabled".to_string(),
716 ))
717 }
718 }
719
720 pub async fn shutdown(self) -> BrokerResult<()> {
722 info!("Shutting down EventBroker");
723
724 self.shutdown_signal.store(true, Ordering::Relaxed);
726
727 if let Err(e) = self.polling_scheduler.shutdown_all().await {
729 warn!(error = %e, "Error during polling shutdown");
730 }
731
732 if let Err(e) = self.subscription_manager.shutdown().await {
734 warn!(error = %e, "Error during subscription shutdown");
735 }
736
737 for task in self.background_tasks {
739 task.abort();
740 }
741
742 self.registry.clear().await;
744
745 info!("EventBroker shutdown complete");
746
747 Ok(())
748 }
749}
750
751#[derive(Debug)]
753pub struct BrokerStats {
754 pub registry_stats: crate::registry::RegistryStats,
755 pub subscription_stats: crate::subscription::manager::SubscriptionStats,
756 pub polling_stats: crate::polling::scheduler::PollingSchedulerStats,
757 pub event_processor_stats: crate::events::processor::EventProcessorStats,
758 pub event_detector_stats: crate::subscription::event_detector::EventDetectorStats,
759 pub firewall_status: FirewallStatus,
760 pub background_tasks_count: usize,
761}
762
763impl std::fmt::Display for BrokerStats {
764 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
765 writeln!(f, "=== EventBroker Stats ===")?;
766 writeln!(f, "Firewall Status: {:?}", self.firewall_status)?;
767 writeln!(f, "Background Tasks: {}", self.background_tasks_count)?;
768 writeln!(f)?;
769 write!(f, "{}", self.registry_stats)?;
770 writeln!(f)?;
771 write!(f, "{}", self.subscription_stats)?;
772 writeln!(f)?;
773 write!(f, "{}", self.polling_stats)?;
774 writeln!(f)?;
775 write!(f, "{}", self.event_processor_stats)?;
776 writeln!(f)?;
777 write!(f, "{}", self.event_detector_stats)?;
778 Ok(())
779 }
780}
781
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: constructing a broker must complete without panicking.
    #[tokio::test]
    async fn test_broker_creation() {
        let outcome = EventBroker::new(BrokerConfig::no_firewall_detection()).await;

        // NOTE(review): this assertion holds for any `Result`, so the test
        // only proves that construction returns. Presumably both outcomes
        // are accepted because success depends on the environment.
        assert!(outcome.is_ok() || outcome.is_err());
    }

    /// A `RegistrationResult` exposes exactly the values it was built with.
    #[test]
    fn test_registration_result() {
        let RegistrationResult {
            registration_id,
            firewall_status,
            polling_reason,
            was_duplicate,
        } = RegistrationResult {
            registration_id: RegistrationId::new(1),
            firewall_status: FirewallStatus::Accessible,
            polling_reason: Some(PollingReason::FirewallBlocked),
            was_duplicate: false,
        };

        assert_eq!(registration_id.as_u64(), 1);
        assert_eq!(firewall_status, FirewallStatus::Accessible);
        assert_eq!(polling_reason, Some(PollingReason::FirewallBlocked));
        assert!(!was_duplicate);
    }

    /// Every `PollingReason` renders its fixed lowercase label.
    #[test]
    fn test_polling_reason_display() {
        let cases = [
            (PollingReason::FirewallBlocked, "firewall blocked"),
            (PollingReason::EventTimeout, "event timeout"),
            (PollingReason::SubscriptionFailed, "subscription failed"),
            (PollingReason::NetworkIssues, "network issues"),
            (PollingReason::ForcedPolling, "forced polling"),
        ];

        for (reason, expected) in cases.iter() {
            assert_eq!(reason.to_string(), *expected);
        }
    }
}