use futures::Stream;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::error::{EventProcessingError, EventProcessingResult};
use crate::events::types::{EnrichedEvent, EventSource};
use crate::registry::RegistrationId;
/// Pull-style consumer of [`EnrichedEvent`]s arriving over an unbounded Tokio mpsc channel.
///
/// Events can be taken asynchronously, without blocking, in batches, through a
/// blocking [`Iterator`] adapter, or via the [`Stream`] implementation.
pub struct EventIterator {
/// Channel end the events are read from. Wrapped in `Option` so every read path
/// can treat a missing receiver as "no event".
receiver: Option<mpsc::UnboundedReceiver<EnrichedEvent>>,
/// Events already pulled from the source but not yet handed out (filled by `peek`);
/// every read path drains this queue before touching the channel.
buffered_events: VecDeque<EnrichedEvent>,
/// Handle of the runtime that was current at construction time; the blocking
/// iterator adapters use it to drive `next_async` to completion.
runtime_handle: tokio::runtime::Handle,
/// Running counters describing what this iterator has received and delivered.
stats: EventIteratorStats,
/// Set once the channel reports disconnection (`recv` yields `None`, `try_recv`
/// yields `Disconnected`); afterwards every read path returns "no event" immediately.
consumed: bool,
}
impl EventIterator {
pub fn new(receiver: mpsc::UnboundedReceiver<EnrichedEvent>) -> Self {
let runtime_handle = tokio::runtime::Handle::try_current()
.expect("EventIterator must be created within a Tokio runtime");
Self {
receiver: Some(receiver),
buffered_events: VecDeque::new(),
runtime_handle,
stats: EventIteratorStats::new(),
consumed: false,
}
}
pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
if self.consumed {
return None;
}
if let Some(event) = self.buffered_events.pop_front() {
self.stats.events_delivered += 1;
return Some(event);
}
if let Some(resync_event) = self.check_and_emit_resync().await {
self.stats.resync_events_emitted += 1;
self.stats.events_delivered += 1;
return Some(resync_event);
}
if let Some(receiver) = &mut self.receiver {
match receiver.recv().await {
Some(event) => {
self.stats.events_received += 1;
self.stats.events_delivered += 1;
Some(event)
}
None => {
self.consumed = true;
None
}
}
} else {
None
}
}
pub async fn next_timeout(
&mut self,
timeout_duration: Duration,
) -> EventProcessingResult<Option<EnrichedEvent>> {
match timeout(timeout_duration, self.next_async()).await {
Ok(event) => Ok(event),
Err(_) => {
self.stats.timeouts += 1;
Err(EventProcessingError::Timeout)
}
}
}
pub fn try_next(&mut self) -> EventProcessingResult<Option<EnrichedEvent>> {
if self.consumed {
return Ok(None);
}
if let Some(event) = self.buffered_events.pop_front() {
self.stats.events_delivered += 1;
return Ok(Some(event));
}
if let Some(receiver) = &mut self.receiver {
match receiver.try_recv() {
Ok(event) => {
self.stats.events_received += 1;
self.stats.events_delivered += 1;
Ok(Some(event))
}
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Disconnected) => {
self.consumed = true;
Ok(None)
}
}
} else {
Ok(None)
}
}
pub fn iter(&mut self) -> SyncEventIterator<'_> {
SyncEventIterator::new(self)
}
async fn check_and_emit_resync(&mut self) -> Option<EnrichedEvent> {
None
}
pub async fn next_batch(&mut self, max_count: usize, max_wait: Duration) -> Vec<EnrichedEvent> {
let mut events = Vec::new();
let start = tokio::time::Instant::now();
if let Some(first_event) = self.next_async().await {
events.push(first_event);
} else {
return events; }
while events.len() < max_count && start.elapsed() < max_wait {
match self.try_next() {
Ok(Some(event)) => events.push(event),
Ok(None) => break, Err(_) => break, }
}
events
}
pub fn stats(&self) -> &EventIteratorStats {
&self.stats
}
pub fn is_consumed(&self) -> bool {
self.consumed
}
pub async fn peek(&mut self) -> Option<&EnrichedEvent> {
if self.buffered_events.is_empty() {
if let Some(event) = self.next_async().await {
self.buffered_events.push_back(event);
self.stats.events_delivered -= 1; }
}
self.buffered_events.front()
}
pub fn filter_by_registration(self, registration_id: RegistrationId) -> FilteredEventIterator {
FilteredEventIterator::new(self, move |event| event.registration_id == registration_id)
}
pub fn filter_by_service(self, service: sonos_api::Service) -> FilteredEventIterator {
FilteredEventIterator::new(self, move |event| event.service == service)
}
pub fn filter_by_source_type(self, source_type: EventSourceType) -> FilteredEventIterator {
FilteredEventIterator::new(self, move |event| {
matches!(
(&event.event_source, source_type),
(EventSource::UPnPNotification { .. }, EventSourceType::UPnP)
| (
EventSource::PollingDetection { .. },
EventSourceType::Polling
)
)
})
}
}
/// Blocking [`Iterator`] adapter over a mutably borrowed [`EventIterator`].
///
/// Obtained from [`EventIterator::iter`].
pub struct SyncEventIterator<'a> {
/// The borrowed iterator whose `next_async` is driven on every `next` call.
inner: &'a mut EventIterator,
}
impl<'a> SyncEventIterator<'a> {
fn new(inner: &'a mut EventIterator) -> Self {
Self { inner }
}
}
impl<'a> Iterator for SyncEventIterator<'a> {
    type Item = EnrichedEvent;

    /// Blocks the calling thread until the wrapped iterator produces its next
    /// event, returning `None` once it is exhausted.
    ///
    /// # Panics
    ///
    /// Per Tokio's `Handle::block_on` documentation, this panics when invoked
    /// from within an asynchronous execution context.
    fn next(&mut self) -> Option<Self::Item> {
        // Clone the handle up front: borrowing it out of `inner` would conflict
        // with the mutable borrow that `next_async` needs.
        let handle = self.inner.runtime_handle.clone();
        let pending = self.inner.next_async();
        handle.block_on(pending)
    }
}
impl Stream for EventIterator {
    type Item = EnrichedEvent;

    /// Polls for the next event: buffered events first, then the channel.
    ///
    /// Unlike `next_async`, this path does not consult the resync hook. The stream
    /// ends (`Poll::Ready(None)`) once the channel is observed as closed, and
    /// stays ended afterwards.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.consumed {
            return Poll::Ready(None);
        }

        if let Some(buffered) = this.buffered_events.pop_front() {
            this.stats.events_delivered += 1;
            return Poll::Ready(Some(buffered));
        }

        let receiver = match this.receiver.as_mut() {
            Some(receiver) => receiver,
            None => return Poll::Ready(None),
        };

        match receiver.poll_recv(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => {
                // Closed and drained: remember it so later polls short-circuit.
                this.consumed = true;
                Poll::Ready(None)
            }
            Poll::Ready(Some(incoming)) => {
                this.stats.events_received += 1;
                this.stats.events_delivered += 1;
                Poll::Ready(Some(incoming))
            }
        }
    }
}
/// Coarse classification of where an event came from, used by
/// `EventIterator::filter_by_source_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventSourceType {
/// Selects events whose source is `EventSource::UPnPNotification`.
UPnP,
/// Selects events whose source is `EventSource::PollingDetection`.
Polling,
}
/// An [`EventIterator`] wrapper that only yields events accepted by a predicate.
///
/// Built by the `filter_by_*` methods on [`EventIterator`].
pub struct FilteredEventIterator {
/// The unfiltered iterator that events are pulled from.
inner: EventIterator,
/// Decides whether an event is yielded (`true`) or skipped (`false`).
predicate: Box<dyn Fn(&EnrichedEvent) -> bool + Send>,
}
impl FilteredEventIterator {
fn new<F>(inner: EventIterator, predicate: F) -> Self
where
F: Fn(&EnrichedEvent) -> bool + Send + 'static,
{
Self {
inner,
predicate: Box::new(predicate),
}
}
pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
loop {
match self.inner.next_async().await {
Some(event) => {
if (self.predicate)(&event) {
return Some(event);
}
}
None => return None,
}
}
}
pub fn iter(&mut self) -> FilteredSyncIterator<'_> {
FilteredSyncIterator::new(self)
}
}
/// Blocking [`Iterator`] adapter over a mutably borrowed [`FilteredEventIterator`].
///
/// Obtained from [`FilteredEventIterator::iter`].
pub struct FilteredSyncIterator<'a> {
/// The borrowed filtered iterator whose `next_async` is driven on every `next` call.
inner: &'a mut FilteredEventIterator,
}
impl<'a> FilteredSyncIterator<'a> {
fn new(inner: &'a mut FilteredEventIterator) -> Self {
Self { inner }
}
}
impl<'a> Iterator for FilteredSyncIterator<'a> {
    type Item = EnrichedEvent;

    /// Blocks the calling thread until the filtered iterator produces its next
    /// matching event, returning `None` once it is exhausted.
    ///
    /// # Panics
    ///
    /// Per Tokio's `Handle::block_on` documentation, this panics when invoked
    /// from within an asynchronous execution context.
    fn next(&mut self) -> Option<Self::Item> {
        // The handle lives on the wrapped `EventIterator`; clone it so the
        // mutable borrow needed by `next_async` stays available.
        let handle = self.inner.inner.runtime_handle.clone();
        let pending = self.inner.next_async();
        handle.block_on(pending)
    }
}
/// Counters maintained by [`EventIterator`] while events flow through it.
#[derive(Debug, Clone)]
pub struct EventIteratorStats {
/// Number of events taken off the channel.
pub events_received: u64,
/// Number of events handed to the caller (channel, buffered and resync events).
pub events_delivered: u64,
/// Number of events produced by the resync hook.
pub resync_events_emitted: u64,
/// Number of `next_timeout` calls that ran into their deadline.
pub timeouts: u64,
}
impl EventIteratorStats {
fn new() -> Self {
Self {
events_received: 0,
events_delivered: 0,
resync_events_emitted: 0,
timeouts: 0,
}
}
pub fn delivery_rate(&self) -> f64 {
if self.events_received == 0 {
1.0
} else {
self.events_delivered as f64 / self.events_received as f64
}
}
}
impl std::fmt::Display for EventIteratorStats {
    /// Renders the counters as a multi-line, human-readable report that ends
    /// with the delivery rate as a percentage with one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            events_received,
            events_delivered,
            resync_events_emitted,
            timeouts,
        } = self;
        let rate_percent = self.delivery_rate() * 100.0;

        writeln!(f, "Event Iterator Stats:")?;
        writeln!(f, " Events received: {}", events_received)?;
        writeln!(f, " Events delivered: {}", events_delivered)?;
        writeln!(f, " Resync events: {}", resync_events_emitted)?;
        writeln!(f, " Timeouts: {}", timeouts)?;
        writeln!(f, " Delivery rate: {:.1}%", rate_percent)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::types::{AVTransportState, EventData, EventSource};
    use std::time::SystemTime;

    /// Builds a minimal AVTransport "PLAYING" notification tagged with `registration_id`.
    fn create_test_event(registration_id: RegistrationId) -> EnrichedEvent {
        let transport = AVTransportState {
            transport_state: Some("PLAYING".to_string()),
            transport_status: None,
            speed: None,
            current_track_uri: None,
            track_duration: None,
            track_metadata: None,
            rel_time: None,
            abs_time: None,
            rel_count: None,
            abs_count: None,
            play_mode: None,
            next_track_uri: None,
            next_track_metadata: None,
            queue_length: None,
        };
        let source = EventSource::UPnPNotification {
            subscription_id: "test-sid".to_string(),
        };
        EnrichedEvent {
            registration_id,
            speaker_ip: "192.168.1.100".parse().unwrap(),
            service: sonos_api::Service::AVTransport,
            event_source: source,
            timestamp: SystemTime::now(),
            event_data: EventData::AVTransport(transport),
        }
    }

    /// A freshly created iterator is live and has empty counters.
    #[tokio::test]
    async fn test_event_iterator_creation() {
        let (_tx, rx) = mpsc::unbounded_channel();
        let events = EventIterator::new(rx);

        assert!(!events.is_consumed());
        let stats = events.stats();
        assert_eq!(stats.events_received, 0);
        assert_eq!(stats.events_delivered, 0);
    }

    /// An event sent on the channel comes out of `next_async` and is counted.
    #[tokio::test]
    async fn test_async_iteration() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut events = EventIterator::new(rx);
        let sent = create_test_event(RegistrationId::new(1));
        tx.send(sent.clone()).unwrap();

        let delivered = events.next_async().await;
        assert!(delivered.is_some());
        let delivered = delivered.unwrap();
        assert_eq!(delivered.registration_id, sent.registration_id);

        assert_eq!(events.stats().events_received, 1);
        assert_eq!(events.stats().events_delivered, 1);
    }

    /// `try_next` yields nothing on an empty channel and the event once one is queued.
    #[tokio::test]
    async fn test_try_next() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut events = EventIterator::new(rx);

        let before_send = events.try_next().unwrap();
        assert!(before_send.is_none());

        let sent = create_test_event(RegistrationId::new(1));
        tx.send(sent.clone()).unwrap();

        let after_send = events.try_next().unwrap();
        assert!(after_send.is_some());
        assert_eq!(after_send.unwrap().registration_id, sent.registration_id);
    }

    /// With no events pending, `next_timeout` reports a timeout and counts it.
    #[tokio::test]
    async fn test_next_timeout() {
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut events = EventIterator::new(rx);

        let outcome = events.next_timeout(Duration::from_millis(100)).await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(EventProcessingError::Timeout)));
        assert_eq!(events.stats().timeouts, 1);
    }

    /// A batch is capped at `max_count` and preserves send order.
    #[tokio::test]
    async fn test_next_batch() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut events = EventIterator::new(rx);
        (1..=5).for_each(|id| tx.send(create_test_event(RegistrationId::new(id))).unwrap());

        let batch = events.next_batch(3, Duration::from_millis(100)).await;
        assert_eq!(batch.len(), 3);
        let ids: Vec<_> = batch
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 2, 3]);
    }

    /// The blocking adapter drains all queued events and stops when the sender is gone.
    #[test]
    fn test_sync_iteration() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        // The iterator must be constructed inside the runtime to capture its handle.
        let (tx, mut events) = rt.block_on(async {
            let (tx, rx) = mpsc::unbounded_channel();
            (tx, EventIterator::new(rx))
        });

        (1..=3).for_each(|id| tx.send(create_test_event(RegistrationId::new(id))).unwrap());
        drop(tx);

        let collected: Vec<_> = events.iter().collect();
        assert_eq!(collected.len(), 3);
        let ids: Vec<_> = collected
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 2, 3]);
    }

    /// Filtering by registration only lets matching events through.
    #[test]
    fn test_filtered_iterator() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let (tx, events) = rt.block_on(async {
            let (tx, rx) = mpsc::unbounded_channel();
            (tx, EventIterator::new(rx))
        });

        for id in [1, 2, 1] {
            tx.send(create_test_event(RegistrationId::new(id))).unwrap();
        }
        drop(tx);

        let mut only_first = events.filter_by_registration(RegistrationId::new(1));
        let collected: Vec<_> = only_first.iter().collect();
        assert_eq!(collected.len(), 2);
        let ids: Vec<_> = collected
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 1]);
    }

    /// Peeking shows the next event without removing it from the iterator.
    #[tokio::test]
    async fn test_peek() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut events = EventIterator::new(rx);
        let sent = create_test_event(RegistrationId::new(1));
        tx.send(sent.clone()).unwrap();

        let peeked = events.peek().await;
        assert!(peeked.is_some());
        assert_eq!(peeked.unwrap().registration_id, sent.registration_id);

        let taken = events.next_async().await;
        assert!(taken.is_some());
        assert_eq!(taken.unwrap().registration_id, sent.registration_id);
    }

    /// The delivery rate is 1.0 with no data and delivered/received otherwise.
    #[test]
    fn test_stats() {
        let empty = EventIteratorStats::new();
        assert_eq!(empty.delivery_rate(), 1.0);

        let populated = EventIteratorStats {
            events_received: 10,
            events_delivered: 8,
            resync_events_emitted: 1,
            timeouts: 2,
        };
        assert_eq!(populated.delivery_rate(), 0.8);
    }
}