Skip to main content

sonos_state/
iter.rs

1//! Sync-first change iterator for property updates
2//!
3//! Provides a blocking iterator over property change events.
4//! Only emits events for properties that have been watched.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::StateManager;
10//!
11//! let manager = StateManager::new()?;
12//! // ... add devices and watch properties ...
13//!
14//! // Blocking iteration
15//! for event in manager.iter() {
16//!     println!("{} changed on {}", event.property_key, event.speaker_id);
17//! }
18//!
19//! // Non-blocking check
20//! for event in manager.iter().try_iter() {
21//!     println!("{} changed", event.property_key);
22//! }
23//!
24//! // With timeout
25//! if let Some(event) = manager.iter().recv_timeout(Duration::from_secs(1)) {
26//!     println!("Got event: {:?}", event);
27//! }
28//! ```
29
30use std::sync::{mpsc, Arc, Mutex};
31use std::time::Duration;
32
33use crate::state::ChangeEvent;
34
35/// Blocking iterator over property change events
36///
37/// Receives change events for watched properties via `std::sync::mpsc`.
38/// All methods are synchronous - no async/await required.
39pub struct ChangeIterator {
40    rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
41}
42
43impl ChangeIterator {
44    /// Create a new ChangeIterator from a shared receiver
45    pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>) -> Self {
46        Self { rx }
47    }
48
49    /// Block until the next event is available
50    ///
51    /// Returns `None` if the channel is closed.
52    pub fn recv(&self) -> Option<ChangeEvent> {
53        self.rx.lock().ok()?.recv().ok()
54    }
55
56    /// Block until the next event or timeout expires
57    ///
58    /// Returns `None` if the timeout expires or channel is closed.
59    pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent> {
60        self.rx.lock().ok()?.recv_timeout(timeout).ok()
61    }
62
63    /// Try to receive an event without blocking
64    ///
65    /// Returns `None` if no event is currently available.
66    pub fn try_recv(&self) -> Option<ChangeEvent> {
67        self.rx.lock().ok()?.try_recv().ok()
68    }
69
70    /// Get a non-blocking iterator over currently available events
71    ///
72    /// Returns an iterator that yields all events currently in the queue
73    /// without blocking. Useful for batch processing.
74    pub fn try_iter(&self) -> TryIter<'_> {
75        TryIter { inner: self }
76    }
77
78    /// Get a blocking iterator with timeout
79    ///
80    /// Returns an iterator that blocks for up to `timeout` on each call
81    /// to `next()`. Stops when timeout expires without events.
82    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
83        TimeoutIter {
84            inner: self,
85            timeout,
86        }
87    }
88}
89
90impl Iterator for ChangeIterator {
91    type Item = ChangeEvent;
92
93    /// Block until the next change event
94    ///
95    /// Returns `None` if the channel is closed.
96    fn next(&mut self) -> Option<Self::Item> {
97        self.recv()
98    }
99}
100
101/// Non-blocking iterator over currently available events
102pub struct TryIter<'a> {
103    inner: &'a ChangeIterator,
104}
105
106impl<'a> Iterator for TryIter<'a> {
107    type Item = ChangeEvent;
108
109    fn next(&mut self) -> Option<Self::Item> {
110        self.inner.try_recv()
111    }
112}
113
114/// Blocking iterator with timeout
115pub struct TimeoutIter<'a> {
116    inner: &'a ChangeIterator,
117    timeout: Duration,
118}
119
120impl<'a> Iterator for TimeoutIter<'a> {
121    type Item = ChangeEvent;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        self.inner.recv_timeout(self.timeout)
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use crate::model::SpeakerId;
132    use sonos_api::Service;
133    use std::thread;
134    use std::time::Instant;
135
136    fn create_test_event() -> ChangeEvent {
137        ChangeEvent {
138            speaker_id: SpeakerId::new("test-speaker"),
139            property_key: "volume",
140            service: Service::RenderingControl,
141            timestamp: Instant::now(),
142        }
143    }
144
145    #[test]
146    fn test_try_recv_empty() {
147        let (tx, rx) = mpsc::channel();
148        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
149
150        // Should return None when empty
151        assert!(iter.try_recv().is_none());
152
153        // Prevent unused warning
154        drop(tx);
155    }
156
157    #[test]
158    fn test_try_recv_with_event() {
159        let (tx, rx) = mpsc::channel();
160        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
161
162        // Send an event
163        tx.send(create_test_event()).unwrap();
164
165        // Should receive the event
166        let event = iter.try_recv().unwrap();
167        assert_eq!(event.property_key, "volume");
168        assert_eq!(event.speaker_id.as_str(), "test-speaker");
169
170        // Should return None now
171        assert!(iter.try_recv().is_none());
172    }
173
174    #[test]
175    fn test_recv_timeout() {
176        let (tx, rx) = mpsc::channel();
177        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
178
179        // Should timeout when empty
180        let start = Instant::now();
181        let result = iter.recv_timeout(Duration::from_millis(50));
182        assert!(result.is_none());
183        assert!(start.elapsed() >= Duration::from_millis(45));
184
185        // Prevent unused warning
186        drop(tx);
187    }
188
189    #[test]
190    fn test_recv_timeout_with_event() {
191        let (tx, rx) = mpsc::channel();
192        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
193
194        // Send event after a short delay
195        let tx_clone = tx.clone();
196        thread::spawn(move || {
197            thread::sleep(Duration::from_millis(10));
198            tx_clone.send(create_test_event()).unwrap();
199        });
200
201        // Should receive within timeout
202        let result = iter.recv_timeout(Duration::from_millis(100));
203        assert!(result.is_some());
204
205        drop(tx);
206    }
207
208    #[test]
209    fn test_try_iter() {
210        let (tx, rx) = mpsc::channel();
211        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
212
213        // Send multiple events
214        for _ in 0..3 {
215            tx.send(create_test_event()).unwrap();
216        }
217
218        // Should get all events via try_iter
219        let events: Vec<_> = iter.try_iter().collect();
220        assert_eq!(events.len(), 3);
221
222        // Should be empty now
223        assert!(iter.try_recv().is_none());
224
225        drop(tx);
226    }
227
228    #[test]
229    fn test_blocking_recv() {
230        let (tx, rx) = mpsc::channel();
231        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
232
233        // Send event from another thread
234        thread::spawn(move || {
235            thread::sleep(Duration::from_millis(10));
236            tx.send(create_test_event()).unwrap();
237        });
238
239        // Should block and receive
240        let event = iter.recv().unwrap();
241        assert_eq!(event.property_key, "volume");
242    }
243
244    #[test]
245    fn test_channel_closed() {
246        let (tx, rx) = mpsc::channel::<ChangeEvent>();
247        let iter = ChangeIterator::new(Arc::new(Mutex::new(rx)));
248
249        // Close the channel
250        drop(tx);
251
252        // Should return None
253        assert!(iter.recv().is_none());
254    }
255}