Skip to main content

sonos_state/
iter.rs

1//! Sync-first change iterator for property updates
2//!
3//! Provides a blocking iterator over property change events.
4//! Only emits events for properties that have been watched.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::StateManager;
10//!
11//! let manager = StateManager::new()?;
12//! // ... add devices and watch properties ...
13//!
14//! // Blocking iteration
15//! for event in manager.iter() {
16//!     println!("{} changed on {}", event.property_key, event.speaker_id);
17//! }
18//!
19//! // Non-blocking check
20//! for event in manager.iter().try_iter() {
21//!     println!("{} changed", event.property_key);
22//! }
23//!
24//! // With timeout
25//! if let Some(event) = manager.iter().recv_timeout(Duration::from_secs(1)) {
26//!     println!("Got event: {:?}", event);
27//! }
28//! ```
29
30use std::sync::{mpsc, Arc, Mutex};
31use std::time::Duration;
32
33use crate::state::ChangeEvent;
34
35/// Blocking iterator over property change events
36///
37/// Receives change events for watched properties via `std::sync::mpsc`.
38/// All methods are synchronous - no async/await required.
39pub struct ChangeIterator {
40    rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
41}
42
43impl ChangeIterator {
44    /// Create a new ChangeIterator from a shared receiver
45    pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>) -> Self {
46        Self { rx }
47    }
48
49    /// Block until the next event is available
50    ///
51    /// Returns `None` if the channel is closed.
52    pub fn recv(&self) -> Option<ChangeEvent> {
53        let event = self.rx.lock().ok()?.recv().ok();
54        if let Some(ref e) = event {
55            tracing::trace!(
56                "ChangeIterator::recv yielded {} for {}",
57                e.property_key,
58                e.speaker_id.as_str()
59            );
60        }
61        event
62    }
63
64    /// Block until the next event or timeout expires
65    ///
66    /// Returns `None` if the timeout expires or channel is closed.
67    pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent> {
68        let event = self.rx.lock().ok()?.recv_timeout(timeout).ok();
69        if let Some(ref e) = event {
70            tracing::trace!(
71                "ChangeIterator::recv_timeout yielded {} for {}",
72                e.property_key,
73                e.speaker_id.as_str()
74            );
75        }
76        event
77    }
78
79    /// Try to receive an event without blocking
80    ///
81    /// Returns `None` if no event is currently available.
82    pub fn try_recv(&self) -> Option<ChangeEvent> {
83        let event = self.rx.lock().ok()?.try_recv().ok();
84        if let Some(ref e) = event {
85            tracing::trace!(
86                "ChangeIterator::try_recv yielded {} for {}",
87                e.property_key,
88                e.speaker_id.as_str()
89            );
90        }
91        event
92    }
93
94    /// Get a non-blocking iterator over currently available events
95    ///
96    /// Returns an iterator that yields all events currently in the queue
97    /// without blocking. Useful for batch processing.
98    pub fn try_iter(&self) -> TryIter<'_> {
99        TryIter { inner: self }
100    }
101
102    /// Get a blocking iterator with timeout
103    ///
104    /// Returns an iterator that blocks for up to `timeout` on each call
105    /// to `next()`. Stops when timeout expires without events.
106    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
107        TimeoutIter {
108            inner: self,
109            timeout,
110        }
111    }
112}
113
114impl Iterator for ChangeIterator {
115    type Item = ChangeEvent;
116
117    /// Block until the next change event
118    ///
119    /// Returns `None` if the channel is closed.
120    fn next(&mut self) -> Option<Self::Item> {
121        self.recv()
122    }
123}
124
125/// Non-blocking iterator over currently available events
126pub struct TryIter<'a> {
127    inner: &'a ChangeIterator,
128}
129
130impl<'a> Iterator for TryIter<'a> {
131    type Item = ChangeEvent;
132
133    fn next(&mut self) -> Option<Self::Item> {
134        self.inner.try_recv()
135    }
136}
137
138/// Blocking iterator with timeout
139pub struct TimeoutIter<'a> {
140    inner: &'a ChangeIterator,
141    timeout: Duration,
142}
143
144impl<'a> Iterator for TimeoutIter<'a> {
145    type Item = ChangeEvent;
146
147    fn next(&mut self) -> Option<Self::Item> {
148        self.inner.recv_timeout(self.timeout)
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155    use crate::model::SpeakerId;
156    use sonos_api::Service;
157    use std::thread;
158    use std::time::Instant;
159
160    fn create_test_event() -> ChangeEvent {
161        ChangeEvent {
162            speaker_id: SpeakerId::new("test-speaker"),
163            property_key: "volume",
164            service: Service::RenderingControl,
165            timestamp: Instant::now(),
166        }
167    }
168
169    #[test]
170    fn test_try_recv_empty() {
171        let (tx, rx) = mpsc::channel();
172        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
173
174        // Should return None when empty
175        assert!(iter.try_recv().is_none());
176
177        // Prevent unused warning
178        drop(tx);
179    }
180
181    #[test]
182    fn test_try_recv_with_event() {
183        let (tx, rx) = mpsc::channel();
184        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
185
186        // Send an event
187        tx.send(create_test_event()).unwrap();
188
189        // Should receive the event
190        let event = iter.try_recv().unwrap();
191        assert_eq!(event.property_key, "volume");
192        assert_eq!(event.speaker_id.as_str(), "test-speaker");
193
194        // Should return None now
195        assert!(iter.try_recv().is_none());
196    }
197
198    #[test]
199    fn test_recv_timeout() {
200        let (tx, rx) = mpsc::channel();
201        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
202
203        // Should timeout when empty
204        let start = Instant::now();
205        let result = iter.recv_timeout(Duration::from_millis(50));
206        assert!(result.is_none());
207        assert!(start.elapsed() >= Duration::from_millis(45));
208
209        // Prevent unused warning
210        drop(tx);
211    }
212
213    #[test]
214    fn test_recv_timeout_with_event() {
215        let (tx, rx) = mpsc::channel();
216        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
217
218        // Send event after a short delay
219        let tx_clone = tx.clone();
220        thread::spawn(move || {
221            thread::sleep(Duration::from_millis(10));
222            tx_clone.send(create_test_event()).unwrap();
223        });
224
225        // Should receive within timeout
226        let result = iter.recv_timeout(Duration::from_millis(100));
227        assert!(result.is_some());
228
229        drop(tx);
230    }
231
232    #[test]
233    fn test_try_iter() {
234        let (tx, rx) = mpsc::channel();
235        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
236
237        // Send multiple events
238        for _ in 0..3 {
239            tx.send(create_test_event()).unwrap();
240        }
241
242        // Should get all events via try_iter
243        let events: Vec<_> = iter.try_iter().collect();
244        assert_eq!(events.len(), 3);
245
246        // Should be empty now
247        assert!(iter.try_recv().is_none());
248
249        drop(tx);
250    }
251
252    #[test]
253    fn test_blocking_recv() {
254        let (tx, rx) = mpsc::channel();
255        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
256
257        // Send event from another thread
258        thread::spawn(move || {
259            thread::sleep(Duration::from_millis(10));
260            tx.send(create_test_event()).unwrap();
261        });
262
263        // Should block and receive
264        let event = iter.recv().unwrap();
265        assert_eq!(event.property_key, "volume");
266    }
267
268    #[test]
269    fn test_channel_closed() {
270        let (tx, rx) = mpsc::channel::<ChangeEvent>();
271        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
272
273        // Close the channel
274        drop(tx);
275
276        // Should return None
277        assert!(iter.recv().is_none());
278    }
279}