1use std::sync::{mpsc, Arc, Mutex};
31use std::time::Duration;
32
33use crate::state::ChangeEvent;
34
35pub struct ChangeIterator {
40 rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
41}
42
43impl ChangeIterator {
44 pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>) -> Self {
46 Self { rx }
47 }
48
49 pub fn recv(&self) -> Option<ChangeEvent> {
53 let event = self.rx.lock().ok()?.recv().ok();
54 if let Some(ref e) = event {
55 tracing::trace!(
56 "ChangeIterator::recv yielded {} for {}",
57 e.property_key,
58 e.speaker_id.as_str()
59 );
60 }
61 event
62 }
63
64 pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent> {
68 let event = self.rx.lock().ok()?.recv_timeout(timeout).ok();
69 if let Some(ref e) = event {
70 tracing::trace!(
71 "ChangeIterator::recv_timeout yielded {} for {}",
72 e.property_key,
73 e.speaker_id.as_str()
74 );
75 }
76 event
77 }
78
79 pub fn try_recv(&self) -> Option<ChangeEvent> {
83 let event = self.rx.lock().ok()?.try_recv().ok();
84 if let Some(ref e) = event {
85 tracing::trace!(
86 "ChangeIterator::try_recv yielded {} for {}",
87 e.property_key,
88 e.speaker_id.as_str()
89 );
90 }
91 event
92 }
93
94 pub fn try_iter(&self) -> TryIter<'_> {
99 TryIter { inner: self }
100 }
101
102 pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
107 TimeoutIter {
108 inner: self,
109 timeout,
110 }
111 }
112}
113
114impl Iterator for ChangeIterator {
115 type Item = ChangeEvent;
116
117 fn next(&mut self) -> Option<Self::Item> {
121 self.recv()
122 }
123}
124
125pub struct TryIter<'a> {
127 inner: &'a ChangeIterator,
128}
129
130impl<'a> Iterator for TryIter<'a> {
131 type Item = ChangeEvent;
132
133 fn next(&mut self) -> Option<Self::Item> {
134 self.inner.try_recv()
135 }
136}
137
138pub struct TimeoutIter<'a> {
140 inner: &'a ChangeIterator,
141 timeout: Duration,
142}
143
144impl<'a> Iterator for TimeoutIter<'a> {
145 type Item = ChangeEvent;
146
147 fn next(&mut self) -> Option<Self::Item> {
148 self.inner.recv_timeout(self.timeout)
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::SpeakerId;
    use sonos_api::Service;
    use std::thread;
    use std::time::Instant;

    /// Builds the fixed event every test sends: a `volume` change on
    /// `test-speaker`.
    fn create_test_event() -> ChangeEvent {
        let speaker_id = SpeakerId::new("test-speaker");
        ChangeEvent {
            speaker_id,
            property_key: "volume",
            service: Service::RenderingControl,
            timestamp: Instant::now(),
        }
    }

    /// Opens a fresh channel and wraps its receiving half in a
    /// `ChangeIterator`, returning the sender alongside it.
    fn channel_pair() -> (mpsc::Sender<ChangeEvent>, ChangeIterator) {
        let (sender, receiver) = mpsc::channel();
        let changes = ChangeIterator::new(Arc::new(Mutex::new(receiver)));
        (sender, changes)
    }

    #[test]
    fn test_try_recv_empty() {
        let (sender, changes) = channel_pair();

        // Nothing has been sent yet.
        assert!(changes.try_recv().is_none());

        // The sender is kept alive until after the assertion.
        drop(sender);
    }

    #[test]
    fn test_try_recv_with_event() {
        let (sender, changes) = channel_pair();

        sender.send(create_test_event()).unwrap();

        let received = changes.try_recv().unwrap();
        assert_eq!(received.property_key, "volume");
        assert_eq!(received.speaker_id.as_str(), "test-speaker");

        // Only one event was queued.
        assert!(changes.try_recv().is_none());
    }

    #[test]
    fn test_recv_timeout() {
        let (sender, changes) = channel_pair();

        let started = Instant::now();
        let outcome = changes.recv_timeout(Duration::from_millis(50));
        assert!(outcome.is_none());
        // Leaves a little slack below the 50 ms timeout.
        assert!(started.elapsed() >= Duration::from_millis(45));

        drop(sender);
    }

    #[test]
    fn test_recv_timeout_with_event() {
        let (sender, changes) = channel_pair();

        let background_sender = sender.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            background_sender.send(create_test_event()).unwrap();
        });

        let outcome = changes.recv_timeout(Duration::from_millis(100));
        assert!(outcome.is_some());

        drop(sender);
    }

    #[test]
    fn test_try_iter() {
        let (sender, changes) = channel_pair();

        (0..3).for_each(|_| sender.send(create_test_event()).unwrap());

        let drained = changes.try_iter().count();
        assert_eq!(drained, 3);

        // The queue is empty once the iterator has run dry.
        assert!(changes.try_recv().is_none());

        drop(sender);
    }

    #[test]
    fn test_blocking_recv() {
        let (sender, changes) = channel_pair();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sender.send(create_test_event()).unwrap();
        });

        let received = changes.recv().unwrap();
        assert_eq!(received.property_key, "volume");
    }

    #[test]
    fn test_channel_closed() {
        let (sender, changes) = channel_pair();

        // With the only sender gone, a blocking receive must give up.
        drop(sender);

        assert!(changes.recv().is_none());
    }
}