use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info, trace, warn};
use callback_server::{
router::{EventRouter, NotificationPayload},
FirewallDetectionCoordinator,
};
use sonos_api::events::EventProcessor as ApiEventProcessor;
use crate::error::{EventProcessingError, EventProcessingResult};
use crate::events::types::{EnrichedEvent, EventData, EventSource};
use crate::subscription::manager::SubscriptionManager;
/// Funnels Sonos events from several sources into one stream of [`EnrichedEvent`]s.
///
/// UPnP notifications are parsed through the `sonos_api` event processor and
/// converted before being forwarded; polling and resync events arrive already
/// enriched and are forwarded unchanged. Per-source counters are kept in
/// [`EventProcessorStats`].
pub struct EventProcessor {
/// `sonos_api` parser used to decode the raw XML of a UPnP notification.
api_processor: ApiEventProcessor,
/// Resolves a subscription ID to its subscription and records event receipt.
subscription_manager: Arc<SubscriptionManager>,
/// Outgoing channel on which every successfully processed event is sent.
event_sender: mpsc::UnboundedSender<EnrichedEvent>,
/// Shared, lock-protected processing counters.
stats: Arc<RwLock<EventProcessorStats>>,
/// Optional coordinator that is told which speaker IP a UPnP event came from.
firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
}
impl EventProcessor {
/// Creates a processor that forwards enriched events on `event_sender`.
///
/// The API-level parser is built with its default parser set and all
/// statistics start at zero. Pass `None` for `firewall_coordinator` to skip
/// firewall-detection notifications.
pub fn new(
    subscription_manager: Arc<SubscriptionManager>,
    event_sender: mpsc::UnboundedSender<EnrichedEvent>,
    firewall_coordinator: Option<Arc<FirewallDetectionCoordinator>>,
) -> Self {
    let api_processor = ApiEventProcessor::with_default_parsers();
    let stats = Arc::new(RwLock::new(EventProcessorStats::new()));

    Self {
        api_processor,
        subscription_manager,
        event_sender,
        stats,
        firewall_coordinator,
    }
}
/// Handles a single UPnP notification end to end and forwards the result.
///
/// Steps, in order:
/// 1. count the notification in `upnp_events_received`;
/// 2. resolve the subscription registered for `payload.subscription_id`;
/// 3. record receipt on the subscription and on the manager, and notify the
///    firewall coordinator (when one is configured) of the speaker's IP;
/// 4. parse `payload.event_xml` with the `sonos_api` processor and convert
///    the result into [`EventData`];
/// 5. send an [`EnrichedEvent`] tagged `EventSource::UPnPNotification` on the
///    outgoing channel and count it in `events_processed`.
///
/// # Errors
/// - `EventProcessingError::Enrichment` when no subscription matches the SID.
/// - `EventProcessingError::Parsing` when the API processor rejects the event
///   or the parsed payload cannot be converted.
/// - `EventProcessingError::ChannelClosed` when the outgoing channel has no
///   receiver.
pub async fn process_upnp_notification(
    &self,
    payload: NotificationPayload,
) -> EventProcessingResult<()> {
    // The write guard is a temporary, released at the end of this statement.
    self.stats.write().await.upnp_events_received += 1;

    let lookup = self
        .subscription_manager
        .get_subscription_by_sid(&payload.subscription_id)
        .await;
    let subscription_wrapper = match lookup {
        Some(found) => found,
        None => {
            return Err(EventProcessingError::Enrichment(format!(
                "No subscription found for SID: {}",
                payload.subscription_id
            )));
        }
    };

    let pair = subscription_wrapper.speaker_service_pair();
    let registration_id = subscription_wrapper.registration_id();
    let speaker_ip = pair.speaker_ip;
    let service = pair.service;

    // Receipt is recorded both on the subscription itself and on the manager.
    subscription_wrapper.record_event_received().await;
    self.subscription_manager
        .record_event_received(&payload.subscription_id)
        .await;

    if let Some(coordinator) = self.firewall_coordinator.as_ref() {
        coordinator.on_event_received(speaker_ip).await;
    }

    // The SID is cloned here because `payload.subscription_id` is moved into
    // the event source below.
    let parsed = self
        .api_processor
        .process_upnp_event(
            speaker_ip,
            service,
            payload.subscription_id.clone(),
            &payload.event_xml,
        )
        .map_err(|e| EventProcessingError::Parsing(format!("API processing failed: {e}")))?;
    let event_data = self.convert_api_event_data(&service, parsed.event_data)?;

    let event_source = EventSource::UPnPNotification {
        subscription_id: payload.subscription_id,
    };
    let enriched_event =
        EnrichedEvent::new(registration_id, speaker_ip, service, event_source, event_data);

    debug!(
        speaker_ip = %enriched_event.speaker_ip,
        service = ?enriched_event.service,
        event_source = ?enriched_event.event_source,
        "Routing event to EventIterator channel"
    );

    if self.event_sender.send(enriched_event).is_err() {
        return Err(EventProcessingError::ChannelClosed);
    }

    self.stats.write().await.events_processed += 1;
    Ok(())
}
/// Forwards an already-enriched polling event on the outgoing channel.
///
/// Increments `polling_events_received` before sending, and
/// `events_processed` only once the send has succeeded.
///
/// # Errors
/// Returns `EventProcessingError::ChannelClosed` when the outgoing channel
/// has no receiver.
pub async fn process_polling_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
    // The write guard is a temporary, released at the end of this statement.
    self.stats.write().await.polling_events_received += 1;

    debug!(
        speaker_ip = %event.speaker_ip,
        service = ?event.service,
        event_source = ?event.event_source,
        "Routing polling event to EventIterator channel"
    );

    if self.event_sender.send(event).is_err() {
        return Err(EventProcessingError::ChannelClosed);
    }

    self.stats.write().await.events_processed += 1;
    Ok(())
}
/// Forwards an already-enriched resync event on the outgoing channel.
///
/// Increments `resync_events_received` before sending, and
/// `events_processed` only once the send has succeeded.
///
/// # Errors
/// Returns `EventProcessingError::ChannelClosed` when the outgoing channel
/// has no receiver.
pub async fn process_resync_event(&self, event: EnrichedEvent) -> EventProcessingResult<()> {
    // The write guard is a temporary, released at the end of this statement.
    self.stats.write().await.resync_events_received += 1;

    debug!(
        speaker_ip = %event.speaker_ip,
        service = ?event.service,
        event_source = ?event.event_source,
        "Routing resync event to EventIterator channel"
    );

    if self.event_sender.send(event).is_err() {
        return Err(EventProcessingError::ChannelClosed);
    }

    self.stats.write().await.events_processed += 1;
    Ok(())
}
/// Turns the type-erased payload from the API processor into [`EventData`].
///
/// `service` selects which concrete event type the payload is downcast to;
/// the downcast event's state is then wrapped in the matching `EventData`
/// variant. The `match` has no wildcard arm, so a new `Service` variant
/// will not compile until it is handled here.
///
/// # Errors
/// Returns `EventProcessingError::Parsing` when the payload is not of the
/// type expected for `service`.
fn convert_api_event_data(
    &self,
    service: &sonos_api::Service,
    api_event_data: Box<dyn std::any::Any + Send + Sync>,
) -> EventProcessingResult<EventData> {
    match service {
        sonos_api::Service::AVTransport => api_event_data
            .downcast::<sonos_api::services::av_transport::AVTransportEvent>()
            .map(|event| EventData::AVTransport(event.into_state()))
            .map_err(|_| {
                EventProcessingError::Parsing("Failed to downcast AVTransport event".to_string())
            }),
        sonos_api::Service::RenderingControl => api_event_data
            .downcast::<sonos_api::services::rendering_control::RenderingControlEvent>()
            .map(|event| EventData::RenderingControl(event.into_state()))
            .map_err(|_| {
                EventProcessingError::Parsing(
                    "Failed to downcast RenderingControl event".to_string(),
                )
            }),
        sonos_api::Service::GroupRenderingControl => api_event_data
            .downcast::<sonos_api::services::group_rendering_control::GroupRenderingControlEvent>()
            .map(|event| EventData::GroupRenderingControl(event.into_state()))
            .map_err(|_| {
                EventProcessingError::Parsing(
                    "Failed to downcast GroupRenderingControl event".to_string(),
                )
            }),
        sonos_api::Service::ZoneGroupTopology => api_event_data
            .downcast::<sonos_api::services::zone_group_topology::ZoneGroupTopologyEvent>()
            .map(|event| EventData::ZoneGroupTopology(event.into_state()))
            .map_err(|_| {
                EventProcessingError::Parsing(
                    "Failed to downcast ZoneGroupTopology event".to_string(),
                )
            }),
        sonos_api::Service::GroupManagement => api_event_data
            .downcast::<sonos_api::services::group_management::GroupManagementEvent>()
            .map(|event| EventData::GroupManagement(event.into_state()))
            .map_err(|_| {
                EventProcessingError::Parsing(
                    "Failed to downcast GroupManagement event".to_string(),
                )
            }),
    }
}
/// Drains `upnp_receiver`, processing each notification until the channel closes.
///
/// Every payload is handed to `process_upnp_notification`; a failure is
/// logged and counted in `processing_errors`, and the loop keeps running.
/// When no payload arrives for five seconds a trace line is emitted and the
/// wait starts again. The function returns once the sending side of the
/// channel has been dropped.
pub async fn start_upnp_processing(
    &self,
    mut upnp_receiver: mpsc::UnboundedReceiver<NotificationPayload>,
) {
    info!("Starting UPnP event processing using sonos-api framework");

    // Explicitly `u64`, matching the counters in `EventProcessorStats`. An
    // unannotated integer literal falls back to `i32`, which a long-running
    // processor could overflow (panic in debug builds, wrap-around in release).
    let mut event_count: u64 = 0;

    loop {
        tokio::select! {
            maybe_payload = upnp_receiver.recv() => {
                match maybe_payload {
                    Some(payload) => {
                        event_count += 1;
                        debug!(
                            event_count,
                            subscription_id = %payload.subscription_id,
                            "Processing UPnP event"
                        );
                        match self.process_upnp_notification(payload).await {
                            Ok(()) => {
                                trace!(event_count, "UPnP event processed successfully");
                            }
                            Err(e) => {
                                error!(
                                    event_count,
                                    error = %e,
                                    "Failed to process UPnP event"
                                );
                                let mut stats = self.stats.write().await;
                                stats.processing_errors += 1;
                            }
                        }
                    }
                    None => {
                        // All senders are gone; nothing more can arrive.
                        warn!("UPnP receiver channel closed");
                        break;
                    }
                }
            }
            // A fresh timer is created on every loop iteration, so this arm
            // only fires after five seconds without a payload.
            _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
                trace!(
                    events_processed = event_count,
                    "UPnP processor waiting for events"
                );
            }
        }
    }

    info!("UPnP event processing stopped");
}
/// Drains `polling_receiver`, forwarding each polling event until the channel closes.
///
/// A failure from `process_polling_event` is logged and counted in
/// `processing_errors`; the loop then continues with the next event.
pub async fn start_polling_processing(
    &self,
    mut polling_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
) {
    info!("Starting polling event processing");

    loop {
        let event = match polling_receiver.recv().await {
            Some(event) => event,
            // All senders are gone; nothing more can arrive.
            None => break,
        };

        if let Err(failure) = self.process_polling_event(event).await {
            error!(
                error = %failure,
                "Failed to process polling event"
            );
            self.stats.write().await.processing_errors += 1;
        }
    }

    info!("Polling event processing stopped");
}
/// Drains `resync_receiver`, forwarding each resync event until the channel closes.
///
/// A failure from `process_resync_event` is logged and counted in
/// `processing_errors`; the loop then continues with the next event.
pub async fn start_resync_processing(
    &self,
    mut resync_receiver: mpsc::UnboundedReceiver<EnrichedEvent>,
) {
    info!("Starting resync event processing");

    loop {
        let event = match resync_receiver.recv().await {
            Some(event) => event,
            // All senders are gone; nothing more can arrive.
            None => break,
        };

        if let Err(failure) = self.process_resync_event(event).await {
            error!(
                error = %failure,
                "Failed to process resync event"
            );
            self.stats.write().await.processing_errors += 1;
        }
    }

    info!("Resync event processing stopped");
}
/// Returns a point-in-time copy of the processing counters.
///
/// The read lock is held only for the duration of the clone.
pub async fn stats(&self) -> EventProcessorStats {
    let snapshot = {
        let guard = self.stats.read().await;
        guard.clone()
    };
    snapshot
}
/// Lists the services reported as supported by the underlying API processor.
pub fn supported_services(&self) -> Vec<sonos_api::Service> {
    let api = &self.api_processor;
    api.supported_services()
}
/// Reports whether the underlying API processor supports `service`.
pub fn is_service_supported(&self, service: &sonos_api::Service) -> bool {
    let api = &self.api_processor;
    api.supports_service(service)
}
}
/// Counters describing what an event processor has received and forwarded.
///
/// All counters start at zero; `Default` yields the same all-zero value.
#[derive(Debug, Clone, Default)]
pub struct EventProcessorStats {
    /// Events successfully sent on the outgoing channel, from any source.
    pub events_processed: u64,
    /// UPnP notifications received, whether or not processing succeeded.
    pub upnp_events_received: u64,
    /// Polling events received, whether or not forwarding succeeded.
    pub polling_events_received: u64,
    /// Resync events received, whether or not forwarding succeeded.
    pub resync_events_received: u64,
    /// Events whose processing returned an error.
    pub processing_errors: u64,
    /// Count of events for unsupported services.
    // NOTE(review): nothing in this file increments this counter — confirm
    // whether it is updated elsewhere.
    pub unsupported_services: u64,
}

impl EventProcessorStats {
    /// Creates a stats value with every counter at zero.
    fn new() -> Self {
        Self::default()
    }

    /// Total number of events received across the UPnP, polling and resync sources.
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing, so this never
    /// panics in debug builds or wraps around in release builds.
    pub fn total_events_received(&self) -> u64 {
        self.upnp_events_received
            .saturating_add(self.polling_events_received)
            .saturating_add(self.resync_events_received)
    }

    /// Fraction of received events that were processed successfully.
    ///
    /// Returns `1.0` when nothing has been received yet, so an idle
    /// processor is not reported as failing.
    pub fn success_rate(&self) -> f64 {
        match self.total_events_received() {
            0 => 1.0,
            total => self.events_processed as f64 / total as f64,
        }
    }
}

/// Multi-line, human-readable report of every counter; the success rate is
/// shown as a percentage with one decimal place.
impl std::fmt::Display for EventProcessorStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Event Processor Stats:")?;
        writeln!(f, " Total processed: {}", self.events_processed)?;
        writeln!(f, " Success rate: {:.1}%", self.success_rate() * 100.0)?;
        writeln!(f, " Event sources:")?;
        writeln!(f, " UPnP events: {}", self.upnp_events_received)?;
        writeln!(f, " Polling events: {}", self.polling_events_received)?;
        writeln!(f, " Resync events: {}", self.resync_events_received)?;
        writeln!(f, " Errors:")?;
        writeln!(f, " Processing errors: {}", self.processing_errors)?;
        writeln!(f, " Unsupported services: {}", self.unsupported_services)?;
        Ok(())
    }
}
/// Builds an [`EventRouter`] together with the receiver for the notifications it routes.
///
/// A new unbounded channel is created; the router gets the sending half and
/// the caller gets the receiving half. The `_event_processor` argument is
/// accepted but not used by this function.
pub async fn create_integrated_event_router(
    _event_processor: Arc<EventProcessor>,
) -> (
    Arc<EventRouter>,
    mpsc::UnboundedReceiver<NotificationPayload>,
) {
    let (notification_tx, notification_rx) = mpsc::unbounded_channel();
    (Arc::new(EventRouter::new(notification_tx)), notification_rx)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor with no firewall coordinator.
    ///
    /// The receiving half of the channel is returned as well so that it
    /// stays alive for the duration of the calling test.
    fn processor_fixture() -> (EventProcessor, mpsc::UnboundedReceiver<EnrichedEvent>) {
        let (tx, rx) = mpsc::unbounded_channel();
        let manager = Arc::new(SubscriptionManager::new("http://callback.url".to_string()));
        (EventProcessor::new(manager, tx, None), rx)
    }

    /// A new processor reports exactly the five known services as supported.
    #[test]
    fn test_event_processor_creation() {
        let (processor, _event_receiver) = processor_fixture();

        assert_eq!(processor.supported_services().len(), 5);
        assert!(processor.is_service_supported(&sonos_api::Service::AVTransport));
        assert!(processor.is_service_supported(&sonos_api::Service::RenderingControl));
        assert!(processor.is_service_supported(&sonos_api::Service::GroupRenderingControl));
        assert!(processor.is_service_supported(&sonos_api::Service::ZoneGroupTopology));
        assert!(processor.is_service_supported(&sonos_api::Service::GroupManagement));
    }

    /// A new processor starts with zeroed counters and a success rate of 1.0.
    #[tokio::test]
    async fn test_event_processor_stats() {
        let (processor, _event_receiver) = processor_fixture();

        let snapshot = processor.stats().await;
        assert_eq!(snapshot.events_processed, 0);
        assert_eq!(snapshot.total_events_received(), 0);
        assert_eq!(snapshot.success_rate(), 1.0);
    }
}