1use std::sync::{mpsc, Arc, Mutex};
9use std::time::Duration;
10
11use crate::event::ChangeEvent;
12
13pub struct ChangeIterator<Id> {
37 rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>,
38}
39
40impl<Id> ChangeIterator<Id> {
41 pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>) -> Self {
43 Self { rx }
44 }
45
46 pub fn recv(&self) -> Option<ChangeEvent<Id>> {
50 self.rx.lock().ok()?.recv().ok()
51 }
52
53 pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent<Id>> {
57 self.rx.lock().ok()?.recv_timeout(timeout).ok()
58 }
59
60 pub fn try_recv(&self) -> Option<ChangeEvent<Id>> {
64 self.rx.lock().ok()?.try_recv().ok()
65 }
66
67 pub fn try_iter(&self) -> TryIter<'_, Id> {
72 TryIter { inner: self }
73 }
74
75 pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_, Id> {
80 TimeoutIter {
81 inner: self,
82 timeout,
83 }
84 }
85}
86
87impl<Id> Iterator for ChangeIterator<Id> {
88 type Item = ChangeEvent<Id>;
89
90 fn next(&mut self) -> Option<Self::Item> {
94 self.recv()
95 }
96}
97
98pub struct TryIter<'a, Id> {
100 inner: &'a ChangeIterator<Id>,
101}
102
103impl<'a, Id> Iterator for TryIter<'a, Id> {
104 type Item = ChangeEvent<Id>;
105
106 fn next(&mut self) -> Option<Self::Item> {
107 self.inner.try_recv()
108 }
109}
110
111pub struct TimeoutIter<'a, Id> {
113 inner: &'a ChangeIterator<Id>,
114 timeout: Duration,
115}
116
117impl<'a, Id> Iterator for TimeoutIter<'a, Id> {
118 type Item = ChangeEvent<Id>;
119
120 fn next(&mut self) -> Option<Self::Item> {
121 self.inner.recv_timeout(self.timeout)
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// Builds the event every test sends through the channel.
    fn create_test_event() -> ChangeEvent<String> {
        ChangeEvent::new("test-entity".to_string(), "test-property")
    }

    /// Wraps a receiver in the `Arc<Mutex<..>>` that `ChangeIterator::new` expects.
    fn iterator_over(rx: mpsc::Receiver<ChangeEvent<String>>) -> ChangeIterator<String> {
        ChangeIterator::new(Arc::new(Mutex::new(rx)))
    }

    #[test]
    fn test_try_recv_empty() {
        // `_tx` stays alive until the end of the test so the `None` below
        // comes from an empty queue, not from a disconnected channel.
        let (_tx, rx) = mpsc::channel::<ChangeEvent<String>>();
        let iter = iterator_over(rx);

        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_try_recv_with_event() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        tx.send(create_test_event()).unwrap();

        let event = iter.try_recv().unwrap();
        assert_eq!(event.property_key, "test-property");
        assert_eq!(event.entity_id, "test-entity");

        // The single queued event has been consumed.
        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_recv_timeout() {
        // Keep the sender alive so the call has to wait out the timeout.
        let (_tx, rx) = mpsc::channel::<ChangeEvent<String>>();
        let iter = iterator_over(rx);

        let start = Instant::now();
        let result = iter.recv_timeout(Duration::from_millis(50));
        assert!(result.is_none());
        // Small tolerance below the requested 50 ms.
        assert!(start.elapsed() >= Duration::from_millis(45));
    }

    #[test]
    fn test_recv_timeout_with_event() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        let tx_clone = tx.clone();
        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx_clone.send(create_test_event()).unwrap();
        });

        let result = iter.recv_timeout(Duration::from_millis(100));
        // Join so a panic in the sender thread fails this test instead of
        // going unnoticed.
        sender.join().expect("sender thread panicked");

        let event = result.expect("event should arrive before the timeout");
        assert_eq!(event.property_key, "test-property");

        drop(tx);
    }

    #[test]
    fn test_recv_timeout_channel_closed() {
        let (tx, rx) = mpsc::channel::<ChangeEvent<String>>();
        let iter = iterator_over(rx);

        drop(tx);

        assert!(iter.recv_timeout(Duration::from_millis(50)).is_none());
    }

    #[test]
    fn test_try_iter() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        for _ in 0..3 {
            tx.send(create_test_event()).unwrap();
        }

        let events: Vec<_> = iter.try_iter().collect();
        assert_eq!(events.len(), 3);

        assert!(iter.try_recv().is_none());

        drop(tx);
    }

    #[test]
    fn test_timeout_iter() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        for _ in 0..2 {
            tx.send(create_test_event()).unwrap();
        }

        // Both queued events are yielded; the third `next` times out and
        // ends the iteration.
        let received = iter.timeout_iter(Duration::from_millis(10)).count();
        assert_eq!(received, 2);

        drop(tx);
    }

    #[test]
    fn test_blocking_recv() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx.send(create_test_event()).unwrap();
        });

        let event = iter.recv().unwrap();
        sender.join().expect("sender thread panicked");
        assert_eq!(event.property_key, "test-property");
    }

    #[test]
    fn test_channel_closed() {
        let (tx, rx) = mpsc::channel::<ChangeEvent<String>>();
        let iter = iterator_over(rx);

        drop(tx);

        assert!(iter.recv().is_none());
    }

    #[test]
    fn test_iterator_impl() {
        let (tx, rx) = mpsc::channel();
        let iter = iterator_over(rx);

        for _ in 0..3 {
            tx.send(create_test_event()).unwrap();
        }
        // Closing the channel lets the blocking iterator terminate.
        drop(tx);

        assert_eq!(iter.count(), 3);
    }
}