1use std::sync::{mpsc, Arc, Mutex};
31use std::time::Duration;
32
33use crate::state::ChangeEvent;
34
35pub struct ChangeIterator {
40 rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
41}
42
43impl ChangeIterator {
44 pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>) -> Self {
46 Self { rx }
47 }
48
49 pub fn recv(&self) -> Option<ChangeEvent> {
53 self.rx.lock().ok()?.recv().ok()
54 }
55
56 pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent> {
60 self.rx.lock().ok()?.recv_timeout(timeout).ok()
61 }
62
63 pub fn try_recv(&self) -> Option<ChangeEvent> {
67 self.rx.lock().ok()?.try_recv().ok()
68 }
69
70 pub fn try_iter(&self) -> TryIter<'_> {
75 TryIter { inner: self }
76 }
77
78 pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
83 TimeoutIter {
84 inner: self,
85 timeout,
86 }
87 }
88}
89
90impl Iterator for ChangeIterator {
91 type Item = ChangeEvent;
92
93 fn next(&mut self) -> Option<Self::Item> {
97 self.recv()
98 }
99}
100
101pub struct TryIter<'a> {
103 inner: &'a ChangeIterator,
104}
105
106impl<'a> Iterator for TryIter<'a> {
107 type Item = ChangeEvent;
108
109 fn next(&mut self) -> Option<Self::Item> {
110 self.inner.try_recv()
111 }
112}
113
114pub struct TimeoutIter<'a> {
116 inner: &'a ChangeIterator,
117 timeout: Duration,
118}
119
120impl<'a> Iterator for TimeoutIter<'a> {
121 type Item = ChangeEvent;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 self.inner.recv_timeout(self.timeout)
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::SpeakerId;
    use sonos_api::Service;
    use std::thread;
    use std::time::Instant;

    /// Builds a volume-change event for the fixed test speaker.
    fn create_test_event() -> ChangeEvent {
        ChangeEvent {
            speaker_id: SpeakerId::new("test-speaker"),
            property_key: "volume",
            service: Service::RenderingControl,
            timestamp: Instant::now(),
        }
    }

    /// Creates a fresh channel and wraps its receiving half in a `ChangeIterator`.
    fn channel_pair() -> (mpsc::Sender<ChangeEvent>, ChangeIterator) {
        let (tx, rx) = mpsc::channel();
        (tx, ChangeIterator::new(Arc::new(Mutex::new(rx))))
    }

    #[test]
    fn test_try_recv_empty() {
        // `_tx` keeps the channel open for the duration of the test.
        let (_tx, iter) = channel_pair();

        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_try_recv_with_event() {
        let (tx, iter) = channel_pair();

        tx.send(create_test_event()).unwrap();

        let event = iter.try_recv().unwrap();
        assert_eq!(event.property_key, "volume");
        assert_eq!(event.speaker_id.as_str(), "test-speaker");

        // The single event has been consumed.
        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_recv_timeout() {
        // `_tx` keeps the channel open so the call has to wait out the timeout.
        let (_tx, iter) = channel_pair();

        let start = Instant::now();
        let result = iter.recv_timeout(Duration::from_millis(50));
        assert!(result.is_none());
        // Small tolerance below the requested 50 ms.
        assert!(start.elapsed() >= Duration::from_millis(45));
    }

    #[test]
    fn test_recv_timeout_with_event() {
        let (tx, iter) = channel_pair();

        let tx_clone = tx.clone();
        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx_clone.send(create_test_event()).unwrap();
        });

        let result = iter.recv_timeout(Duration::from_millis(100));
        assert!(result.is_some());

        // Surface a panic from the sender thread instead of losing it.
        sender.join().expect("sender thread panicked");
        drop(tx);
    }

    #[test]
    fn test_try_iter() {
        let (tx, iter) = channel_pair();

        for _ in 0..3 {
            tx.send(create_test_event()).unwrap();
        }

        let events: Vec<_> = iter.try_iter().collect();
        assert_eq!(events.len(), 3);

        // Everything buffered has been drained.
        assert!(iter.try_recv().is_none());

        drop(tx);
    }

    #[test]
    fn test_blocking_recv() {
        let (tx, iter) = channel_pair();

        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx.send(create_test_event()).unwrap();
        });

        let event = iter.recv().unwrap();
        assert_eq!(event.property_key, "volume");

        // Surface a panic from the sender thread instead of losing it.
        sender.join().expect("sender thread panicked");
    }

    #[test]
    fn test_channel_closed() {
        let (tx, iter) = channel_pair();

        drop(tx);

        // With every sender gone, a blocking receive returns immediately.
        assert!(iter.recv().is_none());
    }
}