1use futures::Stream;
7use std::collections::VecDeque;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tokio::time::timeout;
13
14use crate::error::{EventProcessingError, EventProcessingResult};
15use crate::events::types::{EnrichedEvent, EventSource};
16use crate::registry::RegistrationId;
17
18pub struct EventIterator {
20 receiver: Option<mpsc::UnboundedReceiver<EnrichedEvent>>,
22
23 buffered_events: VecDeque<EnrichedEvent>,
25
26 runtime_handle: tokio::runtime::Handle,
28
29 stats: EventIteratorStats,
31
32 consumed: bool,
34}
35
36impl EventIterator {
37 pub fn new(receiver: mpsc::UnboundedReceiver<EnrichedEvent>) -> Self {
39 let runtime_handle = tokio::runtime::Handle::try_current()
40 .expect("EventIterator must be created within a Tokio runtime");
41
42 Self {
43 receiver: Some(receiver),
44 buffered_events: VecDeque::new(),
45 runtime_handle,
46 stats: EventIteratorStats::new(),
47 consumed: false,
48 }
49 }
50
51 pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
54 if self.consumed {
55 return None;
56 }
57
58 if let Some(event) = self.buffered_events.pop_front() {
60 self.stats.events_delivered += 1;
61 return Some(event);
62 }
63
64 if let Some(resync_event) = self.check_and_emit_resync().await {
66 self.stats.resync_events_emitted += 1;
67 self.stats.events_delivered += 1;
68 return Some(resync_event);
69 }
70
71 if let Some(receiver) = &mut self.receiver {
73 match receiver.recv().await {
74 Some(event) => {
75 self.stats.events_received += 1;
76 self.stats.events_delivered += 1;
77 Some(event)
78 }
79 None => {
80 self.consumed = true;
82 None
83 }
84 }
85 } else {
86 None
87 }
88 }
89
90 pub async fn next_timeout(
92 &mut self,
93 timeout_duration: Duration,
94 ) -> EventProcessingResult<Option<EnrichedEvent>> {
95 match timeout(timeout_duration, self.next_async()).await {
96 Ok(event) => Ok(event),
97 Err(_) => {
98 self.stats.timeouts += 1;
99 Err(EventProcessingError::Timeout)
100 }
101 }
102 }
103
104 pub fn try_next(&mut self) -> EventProcessingResult<Option<EnrichedEvent>> {
106 if self.consumed {
107 return Ok(None);
108 }
109
110 if let Some(event) = self.buffered_events.pop_front() {
112 self.stats.events_delivered += 1;
113 return Ok(Some(event));
114 }
115
116 if let Some(receiver) = &mut self.receiver {
118 match receiver.try_recv() {
119 Ok(event) => {
120 self.stats.events_received += 1;
121 self.stats.events_delivered += 1;
122 Ok(Some(event))
123 }
124 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
125 Err(mpsc::error::TryRecvError::Disconnected) => {
126 self.consumed = true;
127 Ok(None)
128 }
129 }
130 } else {
131 Ok(None)
132 }
133 }
134
135 pub fn iter(&mut self) -> SyncEventIterator<'_> {
141 SyncEventIterator::new(self)
142 }
143
144 async fn check_and_emit_resync(&mut self) -> Option<EnrichedEvent> {
146 None
150 }
151
152 pub async fn next_batch(&mut self, max_count: usize, max_wait: Duration) -> Vec<EnrichedEvent> {
154 let mut events = Vec::new();
155 let start = tokio::time::Instant::now();
156
157 if let Some(first_event) = self.next_async().await {
159 events.push(first_event);
160 } else {
161 return events; }
163
164 while events.len() < max_count && start.elapsed() < max_wait {
166 match self.try_next() {
167 Ok(Some(event)) => events.push(event),
168 Ok(None) => break, Err(_) => break, }
171 }
172
173 events
174 }
175
176 pub fn stats(&self) -> &EventIteratorStats {
178 &self.stats
179 }
180
181 pub fn is_consumed(&self) -> bool {
183 self.consumed
184 }
185
186 pub async fn peek(&mut self) -> Option<&EnrichedEvent> {
188 if self.buffered_events.is_empty() {
190 if let Some(event) = self.next_async().await {
191 self.buffered_events.push_back(event);
192 self.stats.events_delivered -= 1; }
194 }
195
196 self.buffered_events.front()
197 }
198
199 pub fn filter_by_registration(self, registration_id: RegistrationId) -> FilteredEventIterator {
201 FilteredEventIterator::new(self, move |event| event.registration_id == registration_id)
202 }
203
204 pub fn filter_by_service(self, service: sonos_api::Service) -> FilteredEventIterator {
206 FilteredEventIterator::new(self, move |event| event.service == service)
207 }
208
209 pub fn filter_by_source_type(self, source_type: EventSourceType) -> FilteredEventIterator {
211 FilteredEventIterator::new(self, move |event| {
212 matches!(
213 (&event.event_source, source_type),
214 (EventSource::UPnPNotification { .. }, EventSourceType::UPnP)
215 | (
216 EventSource::PollingDetection { .. },
217 EventSourceType::Polling
218 )
219 )
220 })
221 }
222}
223
224pub struct SyncEventIterator<'a> {
227 inner: &'a mut EventIterator,
228}
229
230impl<'a> SyncEventIterator<'a> {
231 fn new(inner: &'a mut EventIterator) -> Self {
232 Self { inner }
233 }
234}
235
236impl<'a> Iterator for SyncEventIterator<'a> {
237 type Item = EnrichedEvent;
238
239 fn next(&mut self) -> Option<Self::Item> {
240 let runtime_handle = self.inner.runtime_handle.clone();
242 runtime_handle.block_on(self.inner.next_async())
243 }
244}
245
246impl Stream for EventIterator {
248 type Item = EnrichedEvent;
249
250 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
251 if self.consumed {
252 return Poll::Ready(None);
253 }
254
255 if let Some(event) = self.buffered_events.pop_front() {
257 self.stats.events_delivered += 1;
258 return Poll::Ready(Some(event));
259 }
260
261 if let Some(receiver) = &mut self.receiver {
263 match receiver.poll_recv(cx) {
264 Poll::Ready(Some(event)) => {
265 self.stats.events_received += 1;
266 self.stats.events_delivered += 1;
267 Poll::Ready(Some(event))
268 }
269 Poll::Ready(None) => {
270 self.consumed = true;
271 Poll::Ready(None)
272 }
273 Poll::Pending => Poll::Pending,
274 }
275 } else {
276 Poll::Ready(None)
277 }
278 }
279}
280
/// Coarse classification of where an event came from, used to select events
/// by source when filtering.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventSourceType {
    /// Selects events whose source is a UPnP notification.
    UPnP,
    /// Selects events whose source is a polling detection.
    Polling,
}
287
288pub struct FilteredEventIterator {
290 inner: EventIterator,
291 predicate: Box<dyn Fn(&EnrichedEvent) -> bool + Send>,
292}
293
294impl FilteredEventIterator {
295 fn new<F>(inner: EventIterator, predicate: F) -> Self
296 where
297 F: Fn(&EnrichedEvent) -> bool + Send + 'static,
298 {
299 Self {
300 inner,
301 predicate: Box::new(predicate),
302 }
303 }
304
305 pub async fn next_async(&mut self) -> Option<EnrichedEvent> {
307 loop {
308 match self.inner.next_async().await {
309 Some(event) => {
310 if (self.predicate)(&event) {
311 return Some(event);
312 }
313 }
315 None => return None,
316 }
317 }
318 }
319
320 pub fn iter(&mut self) -> FilteredSyncIterator<'_> {
322 FilteredSyncIterator::new(self)
323 }
324}
325
326pub struct FilteredSyncIterator<'a> {
328 inner: &'a mut FilteredEventIterator,
329}
330
331impl<'a> FilteredSyncIterator<'a> {
332 fn new(inner: &'a mut FilteredEventIterator) -> Self {
333 Self { inner }
334 }
335}
336
337impl<'a> Iterator for FilteredSyncIterator<'a> {
338 type Item = EnrichedEvent;
339
340 fn next(&mut self) -> Option<Self::Item> {
341 let runtime_handle = self.inner.inner.runtime_handle.clone();
342 runtime_handle.block_on(self.inner.next_async())
343 }
344}
345
/// Counters describing the activity of an event iterator.
#[derive(Debug, Clone)]
pub struct EventIteratorStats {
    /// Number of events taken off the channel.
    pub events_received: u64,

    /// Number of events handed to the caller.
    pub events_delivered: u64,

    /// Number of resync events handed to the caller.
    pub resync_events_emitted: u64,

    /// Number of timed waits that expired before an event arrived.
    pub timeouts: u64,
}

impl EventIteratorStats {
    /// Returns a set of counters that all start at zero.
    fn new() -> Self {
        Self {
            events_received: 0,
            events_delivered: 0,
            resync_events_emitted: 0,
            timeouts: 0,
        }
    }

    /// Ratio of delivered to received events.
    ///
    /// Reports `1.0` while nothing has been received yet, which avoids a
    /// division by zero.
    pub fn delivery_rate(&self) -> f64 {
        match self.events_received {
            0 => 1.0,
            received => self.events_delivered as f64 / received as f64,
        }
    }
}

/// Multi-line, human-readable summary; every line ends with a newline.
impl std::fmt::Display for EventIteratorStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rate_percent = self.delivery_rate() * 100.0;

        writeln!(f, "Event Iterator Stats:")?;
        writeln!(f, " Events received: {}", self.events_received)?;
        writeln!(f, " Events delivered: {}", self.events_delivered)?;
        writeln!(f, " Resync events: {}", self.resync_events_emitted)?;
        writeln!(f, " Timeouts: {}", self.timeouts)?;
        writeln!(f, " Delivery rate: {:.1}%", rate_percent)
    }
}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::types::{AVTransportState, EventData, EventSource};

    use std::time::SystemTime;

    /// Builds an AVTransport event in the "PLAYING" state, sourced from a
    /// UPnP notification, for the given registration.
    fn create_test_event(registration_id: RegistrationId) -> EnrichedEvent {
        let transport = AVTransportState {
            transport_state: Some("PLAYING".to_string()),
            transport_status: None,
            speed: None,
            current_track_uri: None,
            track_duration: None,
            track_metadata: None,
            rel_time: None,
            abs_time: None,
            rel_count: None,
            abs_count: None,
            play_mode: None,
            next_track_uri: None,
            next_track_metadata: None,
            queue_length: None,
        };

        EnrichedEvent {
            registration_id,
            speaker_ip: "192.168.1.100".parse().unwrap(),
            service: sonos_api::Service::AVTransport,
            event_source: EventSource::UPnPNotification {
                subscription_id: "test-sid".to_string(),
            },
            timestamp: SystemTime::now(),
            event_data: EventData::AVTransport(transport),
        }
    }

    /// A freshly created iterator is live and has empty counters.
    #[tokio::test]
    async fn test_event_iterator_creation() {
        // The sender is kept alive so the channel stays open.
        let (_sender, receiver) = mpsc::unbounded_channel();
        let iterator = EventIterator::new(receiver);

        assert!(!iterator.is_consumed());
        let stats = iterator.stats();
        assert_eq!(stats.events_received, 0);
        assert_eq!(stats.events_delivered, 0);
    }

    /// An event sent on the channel comes out of `next_async` and is counted.
    #[tokio::test]
    async fn test_async_iteration() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let mut iterator = EventIterator::new(receiver);

        let expected = create_test_event(RegistrationId::new(1));
        sender.send(expected.clone()).unwrap();

        let delivered = iterator.next_async().await.unwrap();
        assert_eq!(delivered.registration_id, expected.registration_id);
        assert_eq!(iterator.stats().events_received, 1);
        assert_eq!(iterator.stats().events_delivered, 1);
    }

    /// `try_next` reports nothing on an empty channel and the event once sent.
    #[tokio::test]
    async fn test_try_next() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let mut iterator = EventIterator::new(receiver);

        // Nothing has been queued yet.
        assert!(iterator.try_next().unwrap().is_none());

        let expected = create_test_event(RegistrationId::new(1));
        sender.send(expected.clone()).unwrap();

        let delivered = iterator.try_next().unwrap().unwrap();
        assert_eq!(delivered.registration_id, expected.registration_id);
    }

    /// Waiting on an open but silent channel times out and is counted.
    #[tokio::test]
    async fn test_next_timeout() {
        // The sender is kept alive so the wait cannot end with end-of-stream.
        let (_sender, receiver) = mpsc::unbounded_channel();
        let mut iterator = EventIterator::new(receiver);

        let outcome = iterator.next_timeout(Duration::from_millis(100)).await;

        assert!(matches!(outcome, Err(EventProcessingError::Timeout)));
        assert_eq!(iterator.stats().timeouts, 1);
    }

    /// A batch is capped at `max_count` and preserves send order.
    #[tokio::test]
    async fn test_next_batch() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let mut iterator = EventIterator::new(receiver);

        for id in 1..=5 {
            sender
                .send(create_test_event(RegistrationId::new(id)))
                .unwrap();
        }

        let batch = iterator.next_batch(3, Duration::from_millis(100)).await;

        let ids: Vec<_> = batch
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 2, 3]);
    }

    /// The blocking adapter drains a closed channel in order.
    #[test]
    fn test_sync_iteration() {
        let rt = tokio::runtime::Runtime::new().unwrap();

        // The iterator must be constructed inside the runtime context.
        let (sender, mut iterator) = rt.block_on(async {
            let (sender, receiver) = mpsc::unbounded_channel();
            (sender, EventIterator::new(receiver))
        });

        for id in 1..=3 {
            sender
                .send(create_test_event(RegistrationId::new(id)))
                .unwrap();
        }
        // Closing the channel lets the blocking iteration terminate.
        drop(sender);

        let ids: Vec<_> = iterator
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 2, 3]);
    }

    /// Filtering by registration drops events of other registrations.
    #[test]
    fn test_filtered_iterator() {
        let rt = tokio::runtime::Runtime::new().unwrap();

        let (sender, iterator) = rt.block_on(async {
            let (sender, receiver) = mpsc::unbounded_channel();
            (sender, EventIterator::new(receiver))
        });

        let first_match = create_test_event(RegistrationId::new(1));
        let other = create_test_event(RegistrationId::new(2));
        let second_match = create_test_event(RegistrationId::new(1));

        sender.send(first_match).unwrap();
        sender.send(other).unwrap();
        sender.send(second_match).unwrap();
        drop(sender);

        let mut filtered = iterator.filter_by_registration(RegistrationId::new(1));

        let ids: Vec<_> = filtered
            .iter()
            .map(|event| event.registration_id.as_u64())
            .collect();
        assert_eq!(ids, [1, 1]);
    }

    /// Peeking shows the next event without consuming it.
    #[tokio::test]
    async fn test_peek() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let mut iterator = EventIterator::new(receiver);

        let expected = create_test_event(RegistrationId::new(1));
        sender.send(expected.clone()).unwrap();

        let peeked = iterator.peek().await.unwrap();
        assert_eq!(peeked.registration_id, expected.registration_id);

        // The peeked event is still delivered by the next regular read.
        let delivered = iterator.next_async().await.unwrap();
        assert_eq!(delivered.registration_id, expected.registration_id);
    }

    /// The delivery rate is 1.0 with no data and delivered/received otherwise.
    #[test]
    fn test_stats() {
        let empty = EventIteratorStats::new();
        assert_eq!(empty.delivery_rate(), 1.0);

        let populated = EventIteratorStats {
            events_received: 10,
            events_delivered: 8,
            resync_events_emitted: 1,
            timeouts: 2,
        };
        assert_eq!(populated.delivery_rate(), 0.8);
    }
}