Skip to main content

sonos_stream/events/
iterator.rs

1//! Event iterator interfaces for consuming events
2//!
3//! This module provides both sync and async iterator interfaces for consuming events,
4//! with sync being the best practice for local state management and async for real-time processing.
5
6use futures::Stream;
7use std::collections::VecDeque;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tokio::time::timeout;
13
14use crate::error::{EventProcessingError, EventProcessingResult};
15use crate::events::types::{EnrichedEvent, EventSource};
16use crate::registry::RegistrationId;
17
18/// Main event iterator that provides both sync and async interfaces
19pub struct EventIterator {
20    /// Receiver for enriched events
21    receiver: Option<mpsc::UnboundedReceiver<EnrichedEvent>>,
22
23    /// Buffer for events when using sync iteration
24    buffered_events: VecDeque<EnrichedEvent>,
25
26    /// Tokio runtime handle for sync iteration
27    runtime_handle: tokio::runtime::Handle,
28
29    /// Statistics tracking
30    stats: EventIteratorStats,
31
32    /// Whether the iterator has been consumed
33    consumed: bool,
34}
35
36impl EventIterator {
37    /// Create a new event iterator
38    pub fn new(receiver: mpsc::UnboundedReceiver<EnrichedEvent>) -> Self {
39        let runtime_handle = tokio::runtime::Handle::try_current()
40            .expect("EventIterator must be created within a Tokio runtime");
41
42        Self {
43            receiver: Some(receiver),
44            buffered_events: VecDeque::new(),
45            runtime_handle,
46            stats: EventIteratorStats::new(),
47            consumed: false,
48        }
49    }
50
51    /// ASYNC INTERFACE - Get the next event asynchronously
52    /// Best for real-time event processing where you want to handle events as they arrive
53    pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
54        if self.consumed {
55            return None;
56        }
57
58        // First check buffered events
59        if let Some(event) = self.buffered_events.pop_front() {
60            self.stats.events_delivered += 1;
61            return Some(event);
62        }
63
64        // Check for automatic resync needs
65        if let Some(resync_event) = self.check_and_emit_resync().await {
66            self.stats.resync_events_emitted += 1;
67            self.stats.events_delivered += 1;
68            return Some(resync_event);
69        }
70
71        // Get next event from receiver
72        if let Some(receiver) = &mut self.receiver {
73            match receiver.recv().await {
74                Some(event) => {
75                    self.stats.events_received += 1;
76                    self.stats.events_delivered += 1;
77                    Some(event)
78                }
79                None => {
80                    // Channel closed
81                    self.consumed = true;
82                    None
83                }
84            }
85        } else {
86            None
87        }
88    }
89
90    /// ASYNC INTERFACE - Get next event with timeout
91    pub async fn next_timeout(
92        &mut self,
93        timeout_duration: Duration,
94    ) -> EventProcessingResult<Option<EnrichedEvent>> {
95        match timeout(timeout_duration, self.next_async()).await {
96            Ok(event) => Ok(event),
97            Err(_) => {
98                self.stats.timeouts += 1;
99                Err(EventProcessingError::Timeout)
100            }
101        }
102    }
103
104    /// ASYNC INTERFACE - Try to get next event without blocking
105    pub fn try_next(&mut self) -> EventProcessingResult<Option<EnrichedEvent>> {
106        if self.consumed {
107            return Ok(None);
108        }
109
110        // Check buffered events first
111        if let Some(event) = self.buffered_events.pop_front() {
112            self.stats.events_delivered += 1;
113            return Ok(Some(event));
114        }
115
116        // Try to receive from channel without blocking
117        if let Some(receiver) = &mut self.receiver {
118            match receiver.try_recv() {
119                Ok(event) => {
120                    self.stats.events_received += 1;
121                    self.stats.events_delivered += 1;
122                    Ok(Some(event))
123                }
124                Err(mpsc::error::TryRecvError::Empty) => Ok(None),
125                Err(mpsc::error::TryRecvError::Disconnected) => {
126                    self.consumed = true;
127                    Ok(None)
128                }
129            }
130        } else {
131            Ok(None)
132        }
133    }
134
135    /// SYNC INTERFACE - Get iterator for simple loop patterns
136    /// **BEST PRACTICE for local state management**
137    ///
138    /// This is the recommended interface for maintaining local state from events.
139    /// Use like: `for event in events.iter() { /* handle event */ }`
140    pub fn iter(&mut self) -> SyncEventIterator<'_> {
141        SyncEventIterator::new(self)
142    }
143
144    /// Check for automatic resync needs
145    async fn check_and_emit_resync(&mut self) -> Option<EnrichedEvent> {
146        // This is a placeholder implementation
147        // In a real implementation, this would coordinate with ResyncDetector
148        // to check if any registrations need resync events
149        None
150    }
151
152    /// Buffer multiple events for batch processing
153    pub async fn next_batch(&mut self, max_count: usize, max_wait: Duration) -> Vec<EnrichedEvent> {
154        let mut events = Vec::new();
155        let start = tokio::time::Instant::now();
156
157        // Get first event (wait for it)
158        if let Some(first_event) = self.next_async().await {
159            events.push(first_event);
160        } else {
161            return events; // No events available
162        }
163
164        // Try to get additional events without blocking
165        while events.len() < max_count && start.elapsed() < max_wait {
166            match self.try_next() {
167                Ok(Some(event)) => events.push(event),
168                Ok(None) => break, // No more events available
169                Err(_) => break,   // Error occurred
170            }
171        }
172
173        events
174    }
175
176    /// Get iterator statistics
177    pub fn stats(&self) -> &EventIteratorStats {
178        &self.stats
179    }
180
181    /// Check if the iterator has been consumed (channel closed)
182    pub fn is_consumed(&self) -> bool {
183        self.consumed
184    }
185
186    /// Peek at the next event without consuming it
187    pub async fn peek(&mut self) -> Option<&EnrichedEvent> {
188        // If we don't have buffered events, try to get one
189        if self.buffered_events.is_empty() {
190            if let Some(event) = self.next_async().await {
191                self.buffered_events.push_back(event);
192                self.stats.events_delivered -= 1; // Don't count peeked events as delivered
193            }
194        }
195
196        self.buffered_events.front()
197    }
198
199    /// Filter events by registration ID
200    pub fn filter_by_registration(self, registration_id: RegistrationId) -> FilteredEventIterator {
201        FilteredEventIterator::new(self, move |event| event.registration_id == registration_id)
202    }
203
204    /// Filter events by service type
205    pub fn filter_by_service(self, service: sonos_api::Service) -> FilteredEventIterator {
206        FilteredEventIterator::new(self, move |event| event.service == service)
207    }
208
209    /// Filter events by source type (UPnP or polling)
210    pub fn filter_by_source_type(self, source_type: EventSourceType) -> FilteredEventIterator {
211        FilteredEventIterator::new(self, move |event| {
212            matches!(
213                (&event.event_source, source_type),
214                (EventSource::UPnPNotification { .. }, EventSourceType::UPnP)
215                    | (
216                        EventSource::PollingDetection { .. },
217                        EventSourceType::Polling
218                    )
219            )
220        })
221    }
222}
223
224/// Synchronous event iterator for simple loop patterns
225/// **This is the best practice for local state management**
226pub struct SyncEventIterator<'a> {
227    inner: &'a mut EventIterator,
228}
229
230impl<'a> SyncEventIterator<'a> {
231    fn new(inner: &'a mut EventIterator) -> Self {
232        Self { inner }
233    }
234}
235
236impl<'a> Iterator for SyncEventIterator<'a> {
237    type Item = EnrichedEvent;
238
239    fn next(&mut self) -> Option<Self::Item> {
240        // Block on async next() for sync interface
241        let runtime_handle = self.inner.runtime_handle.clone();
242        runtime_handle.block_on(self.inner.next_async())
243    }
244}
245
246/// Implement Stream trait for EventIterator for advanced async usage
247impl Stream for EventIterator {
248    type Item = EnrichedEvent;
249
250    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
251        if self.consumed {
252            return Poll::Ready(None);
253        }
254
255        // Check buffered events first
256        if let Some(event) = self.buffered_events.pop_front() {
257            self.stats.events_delivered += 1;
258            return Poll::Ready(Some(event));
259        }
260
261        // Poll the receiver
262        if let Some(receiver) = &mut self.receiver {
263            match receiver.poll_recv(cx) {
264                Poll::Ready(Some(event)) => {
265                    self.stats.events_received += 1;
266                    self.stats.events_delivered += 1;
267                    Poll::Ready(Some(event))
268                }
269                Poll::Ready(None) => {
270                    self.consumed = true;
271                    Poll::Ready(None)
272                }
273                Poll::Pending => Poll::Pending,
274            }
275        } else {
276            Poll::Ready(None)
277        }
278    }
279}
280
281/// Filter criteria for event source types
282#[derive(Debug, Clone, Copy, PartialEq, Eq)]
283pub enum EventSourceType {
284    UPnP,
285    Polling,
286}
287
288/// Filtered event iterator that applies a predicate to events
289pub struct FilteredEventIterator {
290    inner: EventIterator,
291    predicate: Box<dyn Fn(&EnrichedEvent) -> bool + Send>,
292}
293
294impl FilteredEventIterator {
295    fn new<F>(inner: EventIterator, predicate: F) -> Self
296    where
297        F: Fn(&EnrichedEvent) -> bool + Send + 'static,
298    {
299        Self {
300            inner,
301            predicate: Box::new(predicate),
302        }
303    }
304
305    /// Get the next filtered event asynchronously
306    pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
307        loop {
308            match self.inner.next_async().await {
309                Some(event) => {
310                    if (self.predicate)(&event) {
311                        return Some(event);
312                    }
313                    // Event doesn't match filter, continue
314                }
315                None => return None,
316            }
317        }
318    }
319
320    /// Get sync iterator for the filtered events
321    pub fn iter(&mut self) -> FilteredSyncIterator<'_> {
322        FilteredSyncIterator::new(self)
323    }
324}
325
326/// Sync iterator for filtered events
327pub struct FilteredSyncIterator<'a> {
328    inner: &'a mut FilteredEventIterator,
329}
330
331impl<'a> FilteredSyncIterator<'a> {
332    fn new(inner: &'a mut FilteredEventIterator) -> Self {
333        Self { inner }
334    }
335}
336
337impl<'a> Iterator for FilteredSyncIterator<'a> {
338    type Item = EnrichedEvent;
339
340    fn next(&mut self) -> Option<Self::Item> {
341        let runtime_handle = self.inner.inner.runtime_handle.clone();
342        runtime_handle.block_on(self.inner.next_async())
343    }
344}
345
346/// Statistics for event iterator usage
347#[derive(Debug, Clone)]
348pub struct EventIteratorStats {
349    /// Events received from the channel
350    pub events_received: u64,
351
352    /// Events delivered to the consumer
353    pub events_delivered: u64,
354
355    /// Resync events generated
356    pub resync_events_emitted: u64,
357
358    /// Timeouts occurred
359    pub timeouts: u64,
360}
361
362impl EventIteratorStats {
363    fn new() -> Self {
364        Self {
365            events_received: 0,
366            events_delivered: 0,
367            resync_events_emitted: 0,
368            timeouts: 0,
369        }
370    }
371
372    /// Get the delivery rate (events delivered / events received)
373    pub fn delivery_rate(&self) -> f64 {
374        if self.events_received == 0 {
375            1.0
376        } else {
377            self.events_delivered as f64 / self.events_received as f64
378        }
379    }
380}
381
382impl std::fmt::Display for EventIteratorStats {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        writeln!(f, "Event Iterator Stats:")?;
385        writeln!(f, "  Events received: {}", self.events_received)?;
386        writeln!(f, "  Events delivered: {}", self.events_delivered)?;
387        writeln!(f, "  Resync events: {}", self.resync_events_emitted)?;
388        writeln!(f, "  Timeouts: {}", self.timeouts)?;
389        writeln!(f, "  Delivery rate: {:.1}%", self.delivery_rate() * 100.0)?;
390        Ok(())
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397    use crate::events::types::{AVTransportState, EventData, EventSource};
398
399    use std::time::SystemTime;
400
401    fn create_test_event(registration_id: RegistrationId) -> EnrichedEvent {
402        EnrichedEvent {
403            registration_id,
404            speaker_ip: "192.168.1.100".parse().unwrap(),
405            service: sonos_api::Service::AVTransport,
406            event_source: EventSource::UPnPNotification {
407                subscription_id: "test-sid".to_string(),
408            },
409            timestamp: SystemTime::now(),
410            event_data: EventData::AVTransport(AVTransportState {
411                transport_state: Some("PLAYING".to_string()),
412                transport_status: None,
413                speed: None,
414                current_track_uri: None,
415                track_duration: None,
416                track_metadata: None,
417                rel_time: None,
418                abs_time: None,
419                rel_count: None,
420                abs_count: None,
421                play_mode: None,
422                next_track_uri: None,
423                next_track_metadata: None,
424                queue_length: None,
425            }),
426        }
427    }
428
429    #[tokio::test]
430    async fn test_event_iterator_creation() {
431        let (_sender, receiver) = mpsc::unbounded_channel();
432        let iterator = EventIterator::new(receiver);
433
434        assert!(!iterator.is_consumed());
435        assert_eq!(iterator.stats().events_received, 0);
436        assert_eq!(iterator.stats().events_delivered, 0);
437    }
438
439    #[tokio::test]
440    async fn test_async_iteration() {
441        let (sender, receiver) = mpsc::unbounded_channel();
442        let mut iterator = EventIterator::new(receiver);
443
444        // Send test event
445        let test_event = create_test_event(RegistrationId::new(1));
446        sender.send(test_event.clone()).unwrap();
447
448        // Receive event
449        let received = iterator.next_async().await;
450        assert!(received.is_some());
451        let event = received.unwrap();
452        assert_eq!(event.registration_id, test_event.registration_id);
453        assert_eq!(iterator.stats().events_received, 1);
454        assert_eq!(iterator.stats().events_delivered, 1);
455    }
456
457    #[tokio::test]
458    async fn test_try_next() {
459        let (sender, receiver) = mpsc::unbounded_channel();
460        let mut iterator = EventIterator::new(receiver);
461
462        // Try without any events
463        let result = iterator.try_next().unwrap();
464        assert!(result.is_none());
465
466        // Send event and try again
467        let test_event = create_test_event(RegistrationId::new(1));
468        sender.send(test_event.clone()).unwrap();
469
470        let result = iterator.try_next().unwrap();
471        assert!(result.is_some());
472        assert_eq!(result.unwrap().registration_id, test_event.registration_id);
473    }
474
475    #[tokio::test]
476    async fn test_next_timeout() {
477        let (_sender, receiver) = mpsc::unbounded_channel();
478        let mut iterator = EventIterator::new(receiver);
479
480        // Should timeout since no events are sent
481        let result = iterator.next_timeout(Duration::from_millis(100)).await;
482        assert!(result.is_err());
483        assert!(matches!(result.unwrap_err(), EventProcessingError::Timeout));
484        assert_eq!(iterator.stats().timeouts, 1);
485    }
486
487    #[tokio::test]
488    async fn test_next_batch() {
489        let (sender, receiver) = mpsc::unbounded_channel();
490        let mut iterator = EventIterator::new(receiver);
491
492        // Send multiple events
493        for i in 1..=5 {
494            let event = create_test_event(RegistrationId::new(i));
495            sender.send(event).unwrap();
496        }
497
498        // Get batch of 3 events
499        let batch = iterator.next_batch(3, Duration::from_millis(100)).await;
500        assert_eq!(batch.len(), 3);
501        assert_eq!(batch[0].registration_id.as_u64(), 1);
502        assert_eq!(batch[1].registration_id.as_u64(), 2);
503        assert_eq!(batch[2].registration_id.as_u64(), 3);
504    }
505
506    #[test]
507    fn test_sync_iteration() {
508        let rt = tokio::runtime::Runtime::new().unwrap();
509
510        // Create iterator inside runtime context to capture the handle
511        let (sender, mut iterator) = rt.block_on(async {
512            let (sender, receiver) = mpsc::unbounded_channel();
513            let iterator = EventIterator::new(receiver);
514            (sender, iterator)
515        });
516
517        // Send test events and sync-iterate OUTSIDE block_on(),
518        // so SyncEventIterator::block_on() doesn't nest runtimes
519        for i in 1..=3 {
520            let event = create_test_event(RegistrationId::new(i));
521            sender.send(event).unwrap();
522        }
523        drop(sender);
524
525        let events: Vec<_> = iterator.iter().collect();
526        assert_eq!(events.len(), 3);
527        assert_eq!(events[0].registration_id.as_u64(), 1);
528        assert_eq!(events[1].registration_id.as_u64(), 2);
529        assert_eq!(events[2].registration_id.as_u64(), 3);
530    }
531
532    #[test]
533    fn test_filtered_iterator() {
534        let rt = tokio::runtime::Runtime::new().unwrap();
535
536        // Create iterator inside runtime context to capture the handle
537        let (sender, iterator) = rt.block_on(async {
538            let (sender, receiver) = mpsc::unbounded_channel();
539            let iterator = EventIterator::new(receiver);
540            (sender, iterator)
541        });
542
543        // Send events and sync-iterate OUTSIDE block_on(),
544        // so FilteredSyncIterator::block_on() doesn't nest runtimes
545        let event1 = create_test_event(RegistrationId::new(1));
546        let event2 = create_test_event(RegistrationId::new(2));
547        let event3 = create_test_event(RegistrationId::new(1));
548
549        sender.send(event1).unwrap();
550        sender.send(event2).unwrap();
551        sender.send(event3).unwrap();
552        drop(sender);
553
554        let mut filtered = iterator.filter_by_registration(RegistrationId::new(1));
555
556        let events: Vec<_> = filtered.iter().collect();
557        assert_eq!(events.len(), 2);
558        assert_eq!(events[0].registration_id.as_u64(), 1);
559        assert_eq!(events[1].registration_id.as_u64(), 1);
560    }
561
562    #[tokio::test]
563    async fn test_peek() {
564        let (sender, receiver) = mpsc::unbounded_channel();
565        let mut iterator = EventIterator::new(receiver);
566
567        let test_event = create_test_event(RegistrationId::new(1));
568        sender.send(test_event.clone()).unwrap();
569
570        // Peek at the event
571        let peeked = iterator.peek().await;
572        assert!(peeked.is_some());
573        assert_eq!(peeked.unwrap().registration_id, test_event.registration_id);
574
575        // Event should still be available for next()
576        let next = iterator.next_async().await;
577        assert!(next.is_some());
578        assert_eq!(next.unwrap().registration_id, test_event.registration_id);
579    }
580
581    #[test]
582    fn test_stats() {
583        let stats = EventIteratorStats::new();
584        assert_eq!(stats.delivery_rate(), 1.0);
585
586        let stats_with_data = EventIteratorStats {
587            events_received: 10,
588            events_delivered: 8,
589            resync_events_emitted: 1,
590            timeouts: 2,
591        };
592        assert_eq!(stats_with_data.delivery_rate(), 0.8);
593    }
594}