use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use crate::event::ChangeEvent;
/// Consumer-side handle over a channel of [`ChangeEvent`]s.
///
/// The receiving half of the channel is kept behind an `Arc<Mutex<..>>`, so
/// the same underlying receiver may also be held elsewhere.
pub struct ChangeIterator<Id> {
    // Shared, mutex-guarded receiving end of the event channel.
    rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>,
}
impl<Id> ChangeIterator<Id> {
    /// Wraps an already-shared receiver in a `ChangeIterator`.
    pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent<Id>>>>) -> Self {
        Self { rx }
    }

    /// Locks the shared receiver, recovering the guard if the mutex is poisoned.
    ///
    /// Poisoning only records that some other holder panicked while it held
    /// the guard; the receiver behind the lock is still usable. Treating a
    /// poisoned lock as "no event" would silently and permanently end event
    /// delivery, so the guard is taken back out of the error instead.
    fn lock_receiver(&self) -> std::sync::MutexGuard<'_, mpsc::Receiver<ChangeEvent<Id>>> {
        self.rx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Blocks until the next event is available.
    ///
    /// Returns `None` once the channel is disconnected and no events remain.
    /// The receiver lock is held for the whole wait, so other users of the
    /// same shared receiver are blocked until this call returns.
    pub fn recv(&self) -> Option<ChangeEvent<Id>> {
        self.lock_receiver().recv().ok()
    }

    /// Waits up to `timeout` for the next event.
    ///
    /// Returns `None` both when the timeout elapses and when the channel is
    /// disconnected; the two cases are not distinguished. The timeout applies
    /// to the channel wait only, not to acquiring the receiver lock.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent<Id>> {
        self.lock_receiver().recv_timeout(timeout).ok()
    }

    /// Returns an event if one is already queued, without waiting on the channel.
    ///
    /// Returns `None` both when the queue is empty and when the channel is
    /// disconnected.
    pub fn try_recv(&self) -> Option<ChangeEvent<Id>> {
        self.lock_receiver().try_recv().ok()
    }

    /// Returns an iterator that yields queued events via [`Self::try_recv`]
    /// and stops at the first `None`.
    pub fn try_iter(&self) -> TryIter<'_, Id> {
        TryIter { inner: self }
    }

    /// Returns an iterator that waits up to `timeout` for each event via
    /// [`Self::recv_timeout`] and stops at the first `None`.
    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_, Id> {
        TimeoutIter {
            inner: self,
            timeout,
        }
    }
}
/// Iterating a `ChangeIterator` yields each event obtained from `recv` and
/// ends the first time `recv` returns `None`.
impl<Id> Iterator for ChangeIterator<Id> {
    type Item = ChangeEvent<Id>;

    /// Delegates to the inherent `recv` method.
    fn next(&mut self) -> Option<ChangeEvent<Id>> {
        Self::recv(self)
    }
}
/// Borrowing iterator created by `ChangeIterator::try_iter`.
pub struct TryIter<'a, Id> {
    // The `ChangeIterator` this view pulls events from.
    inner: &'a ChangeIterator<Id>,
}
impl<'a, Id> Iterator for TryIter<'a, Id> {
type Item = ChangeEvent<Id>;
fn next(&mut self) -> Option<Self::Item> {
self.inner.try_recv()
}
}
/// Borrowing iterator created by `ChangeIterator::timeout_iter`.
pub struct TimeoutIter<'a, Id> {
    // The `ChangeIterator` this view pulls events from.
    inner: &'a ChangeIterator<Id>,
    // Timeout passed to `recv_timeout` on every call to `next`.
    timeout: Duration,
}
/// Yields whatever `ChangeIterator::recv_timeout` returns for the stored
/// timeout and ends at its first `None`.
impl<Id> Iterator for TimeoutIter<'_, Id> {
    type Item = ChangeEvent<Id>;

    /// Forwards to `recv_timeout` on the borrowed `ChangeIterator`.
    fn next(&mut self) -> Option<ChangeEvent<Id>> {
        // Both fields are `Copy`, so they can be read out from behind `&mut self`.
        let Self { inner, timeout } = *self;
        inner.recv_timeout(timeout)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// Event fixture shared by every test in this module.
    fn create_test_event() -> ChangeEvent<String> {
        ChangeEvent::new(String::from("test-entity"), "test-property")
    }

    /// Opens a fresh channel and wraps its receiving half in a `ChangeIterator`.
    fn channel_pair() -> (mpsc::Sender<ChangeEvent<String>>, ChangeIterator<String>) {
        let (tx, rx) = mpsc::channel();
        (tx, ChangeIterator::new(Arc::new(Mutex::new(rx))))
    }

    /// `try_recv` on an empty, still-connected channel yields nothing.
    #[test]
    fn test_try_recv_empty() {
        // `_tx` keeps the sender alive until the end of the test.
        let (_tx, iter) = channel_pair();
        assert!(iter.try_recv().is_none());
    }

    /// A queued event is returned exactly once by `try_recv`.
    #[test]
    fn test_try_recv_with_event() {
        let (tx, iter) = channel_pair();
        tx.send(create_test_event()).unwrap();

        let event = iter.try_recv().unwrap();
        assert_eq!(event.property_key, "test-property");
        assert_eq!(event.entity_id, "test-entity");

        assert!(iter.try_recv().is_none());
    }

    /// `recv_timeout` waits roughly the requested time before giving up.
    #[test]
    fn test_recv_timeout() {
        // `_tx` keeps the channel connected so the call has to time out.
        let (_tx, iter) = channel_pair();
        let started = Instant::now();

        assert!(iter.recv_timeout(Duration::from_millis(50)).is_none());
        assert!(started.elapsed() >= Duration::from_millis(45));
    }

    /// `recv_timeout` returns an event that arrives before the deadline.
    #[test]
    fn test_recv_timeout_with_event() {
        let (tx, iter) = channel_pair();
        let sender = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sender.send(create_test_event()).unwrap();
        });

        let received = iter.recv_timeout(Duration::from_millis(100));
        assert!(received.is_some());
        drop(tx);
    }

    /// `try_iter` drains everything that is queued and leaves the channel empty.
    #[test]
    fn test_try_iter() {
        let (tx, iter) = channel_pair();
        (0..3).for_each(|_| tx.send(create_test_event()).unwrap());

        assert_eq!(iter.try_iter().count(), 3);
        assert!(iter.try_recv().is_none());
        drop(tx);
    }

    /// `recv` blocks until a producer thread delivers an event.
    #[test]
    fn test_blocking_recv() {
        let (tx, iter) = channel_pair();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx.send(create_test_event()).unwrap();
        });

        let event = iter.recv().unwrap();
        assert_eq!(event.property_key, "test-property");
    }

    /// `recv` reports `None` once every sender has been dropped.
    #[test]
    fn test_channel_closed() {
        let (tx, iter) = channel_pair();
        drop(tx);
        assert!(iter.recv().is_none());
    }
}