Skip to main content

state_store/
iter.rs

//! Iterators over property change events.
//!
//! Provides several iteration patterns for consuming change events:
//! - Blocking: `recv()`, `for event in iter`
//! - Non-blocking: `try_recv()`, `try_iter()`
//! - Timeout: `recv_timeout()`, `timeout_iter()`
7
8use std::sync::{mpsc, Arc, Mutex};
9use std::time::Duration;
10
11use crate::event::ChangeEvent;
12
13/// Blocking iterator over property change events
14///
15/// Receives change events for watched properties via `std::sync::mpsc`.
16/// All methods are synchronous - no async/await required.
17///
18/// # Example
19///
20/// ```rust,ignore
21/// // Blocking iteration
22/// for event in store.iter() {
23///     println!("{} changed on {:?}", event.property_key, event.entity_id);
24/// }
25///
26/// // Non-blocking check
27/// for event in store.iter().try_iter() {
28///     println!("{} changed", event.property_key);
29/// }
30///
31/// // With timeout
32/// if let Some(event) = store.iter().recv_timeout(Duration::from_secs(1)) {
33///     println!("Got event: {:?}", event);
34/// }
35/// ```
36pub struct ChangeIterator<Id> {
37    rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>,
38}
39
40impl<Id> ChangeIterator<Id> {
41    /// Create a new ChangeIterator from a shared receiver
42    pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>) -> Self {
43        Self { rx }
44    }
45
46    /// Block until the next event is available
47    ///
48    /// Returns `None` if the channel is closed.
49    pub fn recv(&self) -> Option<ChangeEvent<Id>> {
50        self.rx.lock().ok()?.recv().ok()
51    }
52
53    /// Block until the next event or timeout expires
54    ///
55    /// Returns `None` if the timeout expires or channel is closed.
56    pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent<Id>> {
57        self.rx.lock().ok()?.recv_timeout(timeout).ok()
58    }
59
60    /// Try to receive an event without blocking
61    ///
62    /// Returns `None` if no event is currently available.
63    pub fn try_recv(&self) -> Option<ChangeEvent<Id>> {
64        self.rx.lock().ok()?.try_recv().ok()
65    }
66
67    /// Get a non-blocking iterator over currently available events
68    ///
69    /// Returns an iterator that yields all events currently in the queue
70    /// without blocking. Useful for batch processing.
71    pub fn try_iter(&self) -> TryIter<'_, Id> {
72        TryIter { inner: self }
73    }
74
75    /// Get a blocking iterator with timeout
76    ///
77    /// Returns an iterator that blocks for up to `timeout` on each call
78    /// to `next()`. Stops when timeout expires without events.
79    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_, Id> {
80        TimeoutIter {
81            inner: self,
82            timeout,
83        }
84    }
85}
86
87impl<Id> Iterator for ChangeIterator<Id> {
88    type Item = ChangeEvent<Id>;
89
90    /// Block until the next change event
91    ///
92    /// Returns `None` if the channel is closed.
93    fn next(&mut self) -> Option<Self::Item> {
94        self.recv()
95    }
96}
97
98/// Non-blocking iterator over currently available events
99pub struct TryIter<'a, Id> {
100    inner: &'a ChangeIterator<Id>,
101}
102
103impl<'a, Id> Iterator for TryIter<'a, Id> {
104    type Item = ChangeEvent<Id>;
105
106    fn next(&mut self) -> Option<Self::Item> {
107        self.inner.try_recv()
108    }
109}
110
111/// Blocking iterator with timeout
112pub struct TimeoutIter<'a, Id> {
113    inner: &'a ChangeIterator<Id>,
114    timeout: Duration,
115}
116
117impl<'a, Id> Iterator for TimeoutIter<'a, Id> {
118    type Item = ChangeEvent<Id>;
119
120    fn next(&mut self) -> Option<Self::Item> {
121        self.inner.recv_timeout(self.timeout)
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// Build the event that every test pushes through the channel.
    fn create_test_event() -> ChangeEvent<String> {
        ChangeEvent::new(String::from("test-entity"), "test-property")
    }

    /// Open a fresh channel and wrap its receiving half in a `ChangeIterator`.
    fn channel_with_iter() -> (mpsc::Sender<ChangeEvent<String>>, ChangeIterator<String>) {
        let (sender, receiver) = mpsc::channel();
        (sender, ChangeIterator::new(Arc::new(Mutex::new(receiver))))
    }

    #[test]
    fn test_try_recv_empty() {
        // The sender stays alive so the channel is open but has nothing queued.
        let (_sender, iter) = channel_with_iter();

        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_try_recv_with_event() {
        let (sender, iter) = channel_with_iter();

        sender.send(create_test_event()).unwrap();

        // The queued event comes out first...
        let received = iter.try_recv().unwrap();
        assert_eq!(received.property_key, "test-property");
        assert_eq!(received.entity_id, "test-entity");

        // ...and afterwards the queue is empty again.
        assert!(iter.try_recv().is_none());
    }

    #[test]
    fn test_recv_timeout() {
        // The sender stays alive so the only way out is the timeout.
        let (_sender, iter) = channel_with_iter();

        let started = Instant::now();
        let outcome = iter.recv_timeout(Duration::from_millis(50));
        let waited = started.elapsed();

        assert!(outcome.is_none());
        assert!(waited >= Duration::from_millis(45));
    }

    #[test]
    fn test_recv_timeout_with_event() {
        let (sender, iter) = channel_with_iter();

        // A background thread delivers the event shortly after we start waiting.
        let background_sender = sender.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            background_sender.send(create_test_event()).unwrap();
        });

        // The event must arrive well inside the timeout window.
        let outcome = iter.recv_timeout(Duration::from_millis(100));
        assert!(outcome.is_some());

        drop(sender);
    }

    #[test]
    fn test_try_iter() {
        let (sender, iter) = channel_with_iter();

        // Queue three events up front.
        (0..3).for_each(|_| sender.send(create_test_event()).unwrap());

        // try_iter must hand back all of them in one pass.
        let drained = iter.try_iter().count();
        assert_eq!(drained, 3);

        // Nothing may be left behind.
        assert!(iter.try_recv().is_none());

        drop(sender);
    }

    #[test]
    fn test_blocking_recv() {
        let (sender, iter) = channel_with_iter();

        // The sender moves into the thread, which delivers after a short delay.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sender.send(create_test_event()).unwrap();
        });

        // recv blocks until that event shows up.
        let received = iter.recv().unwrap();
        assert_eq!(received.property_key, "test-property");
    }

    #[test]
    fn test_channel_closed() {
        let (sender, iter) = channel_with_iter();

        // Dropping the only sender closes the channel.
        drop(sender);

        assert!(iter.recv().is_none());
    }
}