Skip to main content

sonos_stream/
broker.rs

1//! Main EventBroker implementation
2//!
3//! This is the central component that integrates all other components and provides
4//! the primary user interface for the sonos-stream crate. It coordinates subscription
5//! management, event processing, polling, and firewall detection.
6
7use std::net::{IpAddr, Ipv4Addr, UdpSocket};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use callback_server::{
14    CallbackServer, FirewallDetectionConfig, FirewallDetectionCoordinator, FirewallStatus,
15};
16use sonos_api::Service;
17
18use crate::config::BrokerConfig;
19use crate::error::{BrokerError, BrokerResult};
20use crate::events::{iterator::EventIterator, processor::EventProcessor, types::EnrichedEvent};
21use crate::polling::scheduler::PollingScheduler;
22use crate::registry::{RegistrationId, SpeakerServicePair, SpeakerServiceRegistry};
23use crate::subscription::{
24    event_detector::{EventDetector, PollingAction, PollingRequest},
25    manager::SubscriptionManager,
26};
27
28/// Result type for registration operations with enhanced feedback
29#[derive(Debug, Clone)]
30pub struct RegistrationResult {
31    /// The registration ID (new or existing)
32    pub registration_id: RegistrationId,
33
34    /// Current firewall status
35    pub firewall_status: FirewallStatus,
36
37    /// Reason for polling if polling was activated
38    pub polling_reason: Option<PollingReason>,
39
40    /// Whether this was a new registration or existing duplicate
41    pub was_duplicate: bool,
42}
43
44/// Reason why polling was activated
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum PollingReason {
47    /// Proactively detected firewall blocking
48    FirewallBlocked,
49    /// Events stopped arriving (fallback case)
50    EventTimeout,
51    /// UPnP subscription failed
52    SubscriptionFailed,
53    /// Network connectivity problems
54    NetworkIssues,
55    /// Forced polling mode (config-driven, e.g. firewall simulation)
56    ForcedPolling,
57}
58
59impl std::fmt::Display for PollingReason {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            PollingReason::FirewallBlocked => write!(f, "firewall blocked"),
63            PollingReason::EventTimeout => write!(f, "event timeout"),
64            PollingReason::SubscriptionFailed => write!(f, "subscription failed"),
65            PollingReason::NetworkIssues => write!(f, "network issues"),
66            PollingReason::ForcedPolling => write!(f, "forced polling"),
67        }
68    }
69}
70
71/// Main EventBroker that coordinates all components
72pub struct EventBroker {
73    /// Speaker/service registration registry
74    registry: Arc<SpeakerServiceRegistry>,
75
76    /// Subscription lifecycle manager
77    subscription_manager: Arc<SubscriptionManager>,
78
79    /// Event processor for parsing and enriching events
80    event_processor: Arc<EventProcessor>,
81
82    /// Callback server for receiving UPnP events (kept alive via Arc)
83    _callback_server: Arc<CallbackServer>,
84
85    /// Per-device firewall detection coordinator
86    firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
87
88    /// Event activity detector
89    event_detector: Arc<EventDetector>,
90
91    /// Polling scheduler
92    polling_scheduler: Arc<PollingScheduler>,
93
94    /// Main event stream sender (kept alive for channel)
95    _event_sender: mpsc::UnboundedSender<EnrichedEvent>,
96
97    /// Event receiver for the iterator (taken when creating iterator)
98    event_receiver: Option<mpsc::UnboundedReceiver<EnrichedEvent>>,
99
100    /// Configuration
101    config: BrokerConfig,
102
103    /// Shutdown signal
104    shutdown_signal: Arc<AtomicBool>,
105
106    /// Background task handles
107    background_tasks: Vec<tokio::task::JoinHandle<()>>,
108
109    /// UPnP event receiver for routing events from callback server to event processor
110    upnp_receiver: Option<mpsc::UnboundedReceiver<callback_server::router::NotificationPayload>>,
111
112    /// Event router for registering subscription IDs
113    event_router: Option<Arc<callback_server::router::EventRouter>>,
114
115    /// Polling request channel receiver (taken during background processing startup)
116    polling_request_receiver: Option<mpsc::UnboundedReceiver<PollingRequest>>,
117}
118
119/// Get the local IP address that can be reached by devices on the network
120fn get_local_ip() -> Result<Ipv4Addr, std::io::Error> {
121    // Create a UDP socket and connect to a remote address to determine the local interface
122    // This doesn't actually send data, just determines which interface would be used
123    let socket = UdpSocket::bind("0.0.0.0:0")?;
124    socket.connect("8.8.8.8:53")?; // Connect to Google DNS
125
126    match socket.local_addr()? {
127        std::net::SocketAddr::V4(addr) => Ok(*addr.ip()),
128        std::net::SocketAddr::V6(_) => {
129            // Fallback to IPv4 localhost if we got IPv6
130            Ok(Ipv4Addr::new(127, 0, 0, 1))
131        }
132    }
133}
134
135impl EventBroker {
136    /// Create a new EventBroker with the specified configuration
137    pub async fn new(config: BrokerConfig) -> BrokerResult<Self> {
138        // Validate configuration
139        config.validate()?;
140
141        info!(config = ?config, "Initializing EventBroker");
142
143        // Create main event channel
144        let (event_sender, event_receiver) = mpsc::unbounded_channel();
145
146        // Initialize registry
147        let registry = Arc::new(SpeakerServiceRegistry::new(config.max_registrations));
148
149        // Create channel for UPnP events from callback server to event processor
150        let (upnp_sender, upnp_receiver) = mpsc::unbounded_channel();
151
152        // Initialize callback server which creates its own internal EventRouter
153        let callback_server =
154            Self::create_callback_server_with_routing(&config, upnp_sender).await?;
155
156        // Get the event router from the callback server for subscription registration
157        let event_router = Arc::clone(callback_server.router());
158
159        // Get the actual network IP address so Sonos devices can reach the callback server
160        let local_ip = get_local_ip().map_err(|e| {
161            BrokerError::Configuration(format!("Failed to determine local IP address: {e}"))
162        })?;
163        let server_url = format!("http://{}:{}", local_ip, callback_server.port());
164
165        // Initialize subscription manager with correct callback URL
166        let subscription_manager = Arc::new(SubscriptionManager::new(server_url.clone()));
167
168        // Initialize firewall detection coordinator if enabled
169        let firewall_coordinator = if config.enable_proactive_firewall_detection {
170            let coordinator_config = FirewallDetectionConfig {
171                event_wait_timeout: config.firewall_event_wait_timeout,
172                enable_caching: config.enable_firewall_caching,
173                max_cached_devices: config.max_cached_device_states,
174            };
175
176            let coordinator = Arc::new(FirewallDetectionCoordinator::new(coordinator_config));
177
178            info!(
179                timeout = ?config.firewall_event_wait_timeout,
180                "Firewall detection coordinator enabled"
181            );
182
183            Some(coordinator)
184        } else {
185            debug!("Firewall detection disabled");
186            None
187        };
188
189        // Initialize event processor with the correct subscription manager and firewall coordinator
190        let event_processor = Arc::new(EventProcessor::new(
191            Arc::clone(&subscription_manager),
192            event_sender.clone(),
193            firewall_coordinator.clone(),
194        ));
195
196        // Initialize polling scheduler
197        let polling_scheduler = Arc::new(PollingScheduler::new(
198            event_sender.clone(),
199            config.base_polling_interval,
200            config.max_polling_interval,
201            config.adaptive_polling,
202            config.max_concurrent_polls,
203        ));
204
205        // Create polling request channel (sender kept alive for EventDetector)
206        let (polling_request_sender, polling_request_receiver) = mpsc::unbounded_channel();
207
208        // Initialize event detector and connect to firewall coordinator + polling channel
209        let mut event_detector =
210            EventDetector::new(config.event_timeout, config.polling_activation_delay);
211        if let Some(ref coordinator) = firewall_coordinator {
212            event_detector.set_firewall_coordinator(Arc::clone(coordinator));
213        }
214        event_detector.set_polling_request_sender(polling_request_sender);
215        let event_detector = Arc::new(event_detector);
216
217        let mut broker = Self {
218            registry,
219            subscription_manager,
220            event_processor,
221            _callback_server: callback_server,
222            firewall_coordinator,
223            event_detector,
224            polling_scheduler,
225            _event_sender: event_sender,
226            event_receiver: Some(event_receiver),
227            config,
228            shutdown_signal: Arc::new(AtomicBool::new(false)),
229            background_tasks: Vec::new(),
230            upnp_receiver: Some(upnp_receiver),
231            event_router: Some(event_router),
232            polling_request_receiver: Some(polling_request_receiver),
233        };
234
235        // Start background processing
236        broker.start_background_processing().await?;
237
238        info!("EventBroker initialized successfully");
239
240        Ok(broker)
241    }
242
243    /// Create callback server with proper event routing
244    async fn create_callback_server_with_routing(
245        config: &BrokerConfig,
246        event_sender: mpsc::UnboundedSender<callback_server::router::NotificationPayload>,
247    ) -> BrokerResult<Arc<CallbackServer>> {
248        let server = CallbackServer::new(config.callback_port_range, event_sender)
249            .await
250            .map_err(|e| BrokerError::CallbackServer(e.to_string()))?;
251
252        Ok(Arc::new(server))
253    }
254
255    /// Check if this is the first subscription for a given device IP
256    /// This should be called BEFORE creating the new subscription
257    async fn is_first_subscription_for_device(&self, device_ip: IpAddr) -> bool {
258        // Check all currently registered speaker/service pairs
259        let registered_pairs = self.registry.list_registrations().await;
260
261        // Count how many are for this device IP
262        let existing_count = registered_pairs
263            .iter()
264            .filter(|(_, pair)| pair.speaker_ip == device_ip)
265            .count();
266
267        // If there are no existing pairs for this device, it will be the first
268        // If there's 1, it means we just registered it, so this is still the first
269        existing_count <= 1
270    }
271
272    /// Start all background processing tasks
273    async fn start_background_processing(&mut self) -> BrokerResult<()> {
274        debug!("Starting background processing tasks");
275
276        // Start UPnP event processing using the pre-connected receiver
277        if let Some(upnp_receiver) = self.upnp_receiver.take() {
278            let upnp_processor = Arc::clone(&self.event_processor);
279            let upnp_task = tokio::spawn(async move {
280                upnp_processor.start_upnp_processing(upnp_receiver).await;
281            });
282            self.background_tasks.push(upnp_task);
283        }
284
285        // Start polling request processing using pre-created channel
286        if let Some(polling_request_receiver) = self.polling_request_receiver.take() {
287            self.start_polling_request_processing(polling_request_receiver)
288                .await;
289        }
290
291        // Start event activity monitoring
292        let monitoring_handle = self.event_detector.start_monitoring().await;
293        self.background_tasks.push(monitoring_handle);
294
295        // Start subscription renewal monitoring
296        self.start_subscription_renewal_monitoring().await;
297
298        debug!("Background processing tasks started");
299
300        Ok(())
301    }
302
303    /// Start processing polling requests
304    async fn start_polling_request_processing(
305        &mut self,
306        mut receiver: mpsc::UnboundedReceiver<PollingRequest>,
307    ) {
308        let polling_scheduler = Arc::clone(&self.polling_scheduler);
309        let subscription_manager = Arc::clone(&self.subscription_manager);
310
311        let task = tokio::spawn(async move {
312            info!("Starting polling request processing");
313
314            while let Some(request) = receiver.recv().await {
315                match request.action {
316                    PollingAction::Start => {
317                        debug!(
318                            speaker_ip = %request.speaker_service_pair.speaker_ip,
319                            service = ?request.speaker_service_pair.service,
320                            reason = ?request.reason,
321                            registration_id = %request.registration_id,
322                            "Starting polling for speaker service"
323                        );
324
325                        if let Err(e) = polling_scheduler
326                            .start_polling(
327                                request.registration_id,
328                                request.speaker_service_pair.clone(),
329                            )
330                            .await
331                        {
332                            error!(
333                                registration_id = %request.registration_id,
334                                speaker_ip = %request.speaker_service_pair.speaker_ip,
335                                service = ?request.speaker_service_pair.service,
336                                error = %e,
337                                "Failed to start polling"
338                            );
339                        } else {
340                            // Mark polling as active in subscription
341                            if let Some(subscription) = subscription_manager
342                                .get_subscription(request.registration_id)
343                                .await
344                            {
345                                subscription.set_polling_active(true);
346                            }
347                        }
348                    }
349                    PollingAction::Stop => {
350                        debug!(
351                            speaker_ip = %request.speaker_service_pair.speaker_ip,
352                            service = ?request.speaker_service_pair.service,
353                            registration_id = %request.registration_id,
354                            "Stopping polling for speaker service"
355                        );
356
357                        if let Err(e) = polling_scheduler
358                            .stop_polling(request.registration_id)
359                            .await
360                        {
361                            error!(
362                                registration_id = %request.registration_id,
363                                speaker_ip = %request.speaker_service_pair.speaker_ip,
364                                service = ?request.speaker_service_pair.service,
365                                error = %e,
366                                "Failed to stop polling"
367                            );
368                        } else {
369                            // Mark polling as inactive in subscription
370                            if let Some(subscription) = subscription_manager
371                                .get_subscription(request.registration_id)
372                                .await
373                            {
374                                subscription.set_polling_active(false);
375                            }
376                        }
377                    }
378                }
379            }
380
381            info!("Polling request processing stopped");
382        });
383
384        self.background_tasks.push(task);
385    }
386
387    /// Start subscription renewal monitoring
388    async fn start_subscription_renewal_monitoring(&mut self) {
389        let subscription_manager = Arc::clone(&self.subscription_manager);
390        let renewal_threshold = self.config.renewal_threshold;
391
392        let task = tokio::spawn(async move {
393            info!("Starting subscription renewal monitoring");
394
395            let mut interval = tokio::time::interval(renewal_threshold / 2); // Check twice as often as threshold
396
397            loop {
398                interval.tick().await;
399
400                match subscription_manager.check_renewals().await {
401                    Ok(renewed_count) => {
402                        if renewed_count > 0 {
403                            debug!(renewed_count = renewed_count, "Renewed subscriptions");
404                        }
405                    }
406                    Err(e) => {
407                        error!(
408                            error = %e,
409                            "Error during subscription renewal check"
410                        );
411                    }
412                }
413            }
414        });
415
416        self.background_tasks.push(task);
417    }
418
419    /// Register a speaker/service pair for event streaming
420    pub async fn register_speaker_service(
421        &self,
422        speaker_ip: IpAddr,
423        service: Service,
424    ) -> BrokerResult<RegistrationResult> {
425        debug!(
426            speaker_ip = %speaker_ip,
427            service = ?service,
428            "Registering speaker service"
429        );
430
431        // Check for duplicates and register
432        let registration_id = self.registry.register(speaker_ip, service).await?;
433        let was_duplicate = self.registry.is_registered(speaker_ip, service).await;
434
435        if was_duplicate {
436            debug!(
437                registration_id = %registration_id,
438                "Registration already exists"
439            );
440        }
441
442        let pair = SpeakerServicePair::new(speaker_ip, service);
443
444        let mut polling_reason = None;
445        let firewall_status;
446
447        if self.config.force_polling_mode {
448            // Force polling mode: skip UPnP subscription entirely, go straight to polling
449            debug!(
450                registration_id = %registration_id,
451                speaker_ip = %speaker_ip,
452                service = ?service,
453                "Force polling mode: skipping UPnP subscription"
454            );
455
456            firewall_status = FirewallStatus::Blocked;
457            polling_reason = Some(PollingReason::ForcedPolling);
458
459            // Skip EventDetector registration — no UPnP events will arrive,
460            // so monitoring would just detect a false timeout.
461
462            // Start polling immediately
463            if let Err(e) = self
464                .polling_scheduler
465                .start_polling(registration_id, pair.clone())
466                .await
467            {
468                error!(
469                    registration_id = %registration_id,
470                    error = %e,
471                    "Failed to start forced polling"
472                );
473                let _ = self.registry.unregister(registration_id).await;
474                return Err(BrokerError::Polling(e));
475            }
476        } else {
477            // Normal mode: attempt UPnP subscription with firewall detection
478
479            // Check if this is the first subscription for this device
480            let is_first_for_device = self.is_first_subscription_for_device(speaker_ip).await;
481
482            // Get or trigger firewall detection for this device
483            firewall_status = if let Some(coordinator) = &self.firewall_coordinator {
484                if is_first_for_device {
485                    debug!(
486                        speaker_ip = %speaker_ip,
487                        "First subscription for device, triggering firewall detection"
488                    );
489                    coordinator.on_first_subscription(speaker_ip).await
490                } else {
491                    coordinator.get_device_status(speaker_ip).await
492                }
493            } else {
494                FirewallStatus::Unknown
495            };
496
497            // Create subscription
498            let subscription_result = self
499                .subscription_manager
500                .create_subscription(registration_id, pair.clone())
501                .await;
502
503            match subscription_result {
504                Ok(subscription) => {
505                    debug!(
506                        subscription_id = %subscription.subscription_id(),
507                        "Created UPnP subscription"
508                    );
509
510                    // Register subscription ID with EventRouter for event routing
511                    if let Some(router) = &self.event_router {
512                        router
513                            .register(subscription.subscription_id().to_string())
514                            .await;
515                        debug!(
516                            subscription_id = %subscription.subscription_id(),
517                            "Registered subscription with EventRouter"
518                        );
519                    }
520
521                    // Register with event detector for timeout monitoring
522                    self.event_detector
523                        .register_subscription(registration_id, pair.clone())
524                        .await;
525
526                    // Evaluate firewall status for immediate polling decision
527                    if let Some(request) = self
528                        .event_detector
529                        .evaluate_firewall_status(registration_id, &pair)
530                        .await
531                    {
532                        polling_reason = Some(request.reason.clone());
533
534                        // Start polling immediately
535                        if let Err(e) = self
536                            .polling_scheduler
537                            .start_polling(registration_id, pair.clone())
538                            .await
539                        {
540                            error!(
541                                registration_id = %registration_id,
542                                error = %e,
543                                "Failed to start immediate polling"
544                            );
545                        } else {
546                            subscription.set_polling_active(true);
547                            debug!(
548                                registration_id = %registration_id,
549                                reason = ?request.reason,
550                                "Started immediate polling"
551                            );
552                        }
553                    }
554                }
555                Err(e) => {
556                    error!(
557                        registration_id = %registration_id,
558                        error = %e,
559                        "Failed to create subscription, falling back to polling"
560                    );
561                    polling_reason = Some(PollingReason::SubscriptionFailed);
562
563                    // Start polling as fallback
564                    if let Err(e) = self
565                        .polling_scheduler
566                        .start_polling(registration_id, pair.clone())
567                        .await
568                    {
569                        error!(
570                            registration_id = %registration_id,
571                            error = %e,
572                            "Failed to start fallback polling"
573                        );
574                        // Remove registration since both subscription and polling failed
575                        let _ = self.registry.unregister(registration_id).await;
576                        return Err(BrokerError::Polling(e));
577                    } else {
578                        debug!(
579                            registration_id = %registration_id,
580                            "Started fallback polling due to subscription failure"
581                        );
582                    }
583                }
584            }
585        }
586
587        let result = RegistrationResult {
588            registration_id,
589            firewall_status,
590            polling_reason,
591            was_duplicate,
592        };
593
594        debug!(
595            registration_id = %result.registration_id,
596            firewall_status = ?result.firewall_status,
597            polling_reason = ?result.polling_reason,
598            was_duplicate = result.was_duplicate,
599            "Registration completed"
600        );
601
602        Ok(result)
603    }
604
605    /// Unregister a speaker/service pair
606    pub async fn unregister_speaker_service(
607        &self,
608        registration_id: RegistrationId,
609    ) -> BrokerResult<SpeakerServicePair> {
610        debug!(registration_id = %registration_id, "Unregistering subscription");
611
612        // Get the pair before removing
613        let pair = self.registry.get_pair(registration_id).await.ok_or({
614            BrokerError::Registry(crate::error::RegistryError::NotFound(registration_id))
615        })?;
616
617        // Stop polling if active
618        if let Err(e) = self.polling_scheduler.stop_polling(registration_id).await {
619            warn!(
620                registration_id = %registration_id,
621                error = %e,
622                "Failed to stop polling during unregistration"
623            );
624        }
625
626        // Remove subscription
627        if let Err(e) = self
628            .subscription_manager
629            .remove_subscription(registration_id)
630            .await
631        {
632            warn!(
633                registration_id = %registration_id,
634                error = %e,
635                "Failed to remove subscription during unregistration"
636            );
637        }
638
639        // Unregister from event detector
640        self.event_detector
641            .unregister_subscription(registration_id)
642            .await;
643
644        // Remove from registry
645        let removed_pair = self.registry.unregister(registration_id).await?;
646
647        debug!(
648            speaker_ip = %pair.speaker_ip,
649            service = ?pair.service,
650            registration_id = %registration_id,
651            "Unregistration completed"
652        );
653
654        Ok(removed_pair)
655    }
656
657    /// Get an event iterator for consuming events
658    /// This consumes the broker's event receiver, so it can only be called once
659    pub fn event_iterator(&mut self) -> BrokerResult<EventIterator> {
660        let receiver = self.event_receiver.take().ok_or_else(|| {
661            BrokerError::Configuration("Event iterator already created".to_string())
662        })?;
663
664        let iterator = EventIterator::new(receiver);
665
666        Ok(iterator)
667    }
668
669    /// Get comprehensive statistics about the broker
670    pub async fn stats(&self) -> BrokerStats {
671        let registry_stats = self.registry.stats().await;
672        let subscription_stats = self.subscription_manager.stats().await;
673        let polling_stats = self.polling_scheduler.stats().await;
674        let event_processor_stats = self.event_processor.stats().await;
675        let event_detector_stats = self.event_detector.stats().await;
676
677        BrokerStats {
678            registry_stats,
679            subscription_stats,
680            polling_stats,
681            event_processor_stats,
682            event_detector_stats,
683            firewall_status: FirewallStatus::Unknown, // Status is now per-device
684            background_tasks_count: self.background_tasks.len(),
685        }
686    }
687
688    /// Get current firewall status (returns Unknown since status is now per-device)
689    pub async fn firewall_status(&self) -> FirewallStatus {
690        // Since firewall status is now per-device, this method returns Unknown
691        // Use get_device_firewall_status() for specific device status
692        FirewallStatus::Unknown
693    }
694
695    /// Get firewall status for a specific device
696    pub async fn get_device_firewall_status(&self, device_ip: IpAddr) -> FirewallStatus {
697        if let Some(coordinator) = &self.firewall_coordinator {
698            coordinator.get_device_status(device_ip).await
699        } else {
700            FirewallStatus::Unknown
701        }
702    }
703
704    /// Manually trigger firewall detection for a specific device
705    pub async fn trigger_firewall_detection(
706        &self,
707        device_ip: IpAddr,
708    ) -> BrokerResult<FirewallStatus> {
709        if let Some(coordinator) = &self.firewall_coordinator {
710            // Trigger detection by calling on_first_subscription
711            // This will start monitoring for the device
712            Ok(coordinator.on_first_subscription(device_ip).await)
713        } else {
714            Err(BrokerError::Configuration(
715                "Firewall detection is disabled".to_string(),
716            ))
717        }
718    }
719
720    /// Shutdown the broker and all background tasks
721    pub async fn shutdown(self) -> BrokerResult<()> {
722        info!("Shutting down EventBroker");
723
724        // Signal shutdown
725        self.shutdown_signal.store(true, Ordering::Relaxed);
726
727        // Shutdown polling scheduler
728        if let Err(e) = self.polling_scheduler.shutdown_all().await {
729            warn!(error = %e, "Error during polling shutdown");
730        }
731
732        // Shutdown subscription manager
733        if let Err(e) = self.subscription_manager.shutdown().await {
734            warn!(error = %e, "Error during subscription shutdown");
735        }
736
737        // Cancel background tasks
738        for task in self.background_tasks {
739            task.abort();
740        }
741
742        // Clear registry
743        self.registry.clear().await;
744
745        info!("EventBroker shutdown complete");
746
747        Ok(())
748    }
749}
750
751/// Comprehensive statistics about the broker
752#[derive(Debug)]
753pub struct BrokerStats {
754    pub registry_stats: crate::registry::RegistryStats,
755    pub subscription_stats: crate::subscription::manager::SubscriptionStats,
756    pub polling_stats: crate::polling::scheduler::PollingSchedulerStats,
757    pub event_processor_stats: crate::events::processor::EventProcessorStats,
758    pub event_detector_stats: crate::subscription::event_detector::EventDetectorStats,
759    pub firewall_status: FirewallStatus,
760    pub background_tasks_count: usize,
761}
762
763impl std::fmt::Display for BrokerStats {
764    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
765        writeln!(f, "=== EventBroker Stats ===")?;
766        writeln!(f, "Firewall Status: {:?}", self.firewall_status)?;
767        writeln!(f, "Background Tasks: {}", self.background_tasks_count)?;
768        writeln!(f)?;
769        write!(f, "{}", self.registry_stats)?;
770        writeln!(f)?;
771        write!(f, "{}", self.subscription_stats)?;
772        writeln!(f)?;
773        write!(f, "{}", self.polling_stats)?;
774        writeln!(f)?;
775        write!(f, "{}", self.event_processor_stats)?;
776        writeln!(f)?;
777        write!(f, "{}", self.event_detector_stats)?;
778        Ok(())
779    }
780}
781
782#[cfg(test)]
783mod tests {
784    use super::*;
785
786    #[tokio::test]
787    async fn test_broker_creation() {
788        let config = BrokerConfig::no_firewall_detection();
789        let broker = EventBroker::new(config).await;
790
791        // Note: This test might fail without proper callback server setup
792        // In a real implementation, we'd need to mock the callback server
793        assert!(broker.is_ok() || broker.is_err()); // Either works or fails gracefully
794    }
795
796    #[test]
797    fn test_registration_result() {
798        let result = RegistrationResult {
799            registration_id: RegistrationId::new(1),
800            firewall_status: FirewallStatus::Accessible,
801            polling_reason: Some(PollingReason::FirewallBlocked),
802            was_duplicate: false,
803        };
804
805        assert_eq!(result.registration_id.as_u64(), 1);
806        assert_eq!(result.firewall_status, FirewallStatus::Accessible);
807        assert_eq!(result.polling_reason, Some(PollingReason::FirewallBlocked));
808        assert!(!result.was_duplicate);
809    }
810
811    #[test]
812    fn test_polling_reason_display() {
813        assert_eq!(
814            PollingReason::FirewallBlocked.to_string(),
815            "firewall blocked"
816        );
817        assert_eq!(PollingReason::EventTimeout.to_string(), "event timeout");
818        assert_eq!(
819            PollingReason::SubscriptionFailed.to_string(),
820            "subscription failed"
821        );
822        assert_eq!(PollingReason::NetworkIssues.to_string(), "network issues");
823        assert_eq!(PollingReason::ForcedPolling.to_string(), "forced polling");
824    }
825}