use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use crate::state::ChangeEvent;
/// Receiving handle for [`ChangeEvent`]s delivered over a shared `mpsc` channel.
///
/// The receiver sits behind `Arc<Mutex<..>>`, so other owners of the same
/// `Arc` may exist; all access goes through the mutex.
///
/// `Debug` is derived so the handle can appear in diagnostics and in
/// `#[derive(Debug)]` containers. `mpsc::Receiver<T>` implements `Debug` for
/// every `T`, so this places no requirement on `ChangeEvent`.
#[derive(Debug)]
pub struct ChangeIterator {
    /// Shared receiving end of the change-event channel.
    rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
}
impl ChangeIterator {
    /// Wraps an already-shared receiver of [`ChangeEvent`]s.
    pub(crate) fn new(rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>) -> Self {
        Self { rx }
    }

    /// Locks the shared receiver, recovering the guard when the mutex is poisoned.
    ///
    /// Poisoning used to be reported as `None`, which callers cannot tell apart
    /// from "channel closed" or "nothing queued", so a single panic elsewhere
    /// silently ended event delivery. Every use of the guard in this type is a
    /// single receive call, so there is no multi-step update a panic could
    /// leave half-finished here.
    /// NOTE(review): assumes other holders of the `Arc` likewise only receive
    /// through the guard — confirm against the rest of the crate.
    fn lock_receiver(&self) -> std::sync::MutexGuard<'_, mpsc::Receiver<ChangeEvent>> {
        self.rx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Blocks until the next event arrives.
    ///
    /// Returns `None` when the underlying `recv` fails, i.e. the channel is
    /// disconnected. The receiver mutex stays locked for the whole wait, so
    /// other callers sharing the same receiver are blocked until this returns.
    pub fn recv(&self) -> Option<ChangeEvent> {
        // The guard is a temporary: it is released at the end of this statement,
        // before the trace call below.
        let event = self.lock_receiver().recv().ok();
        if let Some(e) = &event {
            tracing::trace!(
                "ChangeIterator::recv yielded {} for {}",
                e.property_key,
                e.speaker_id.as_str()
            );
        }
        event
    }

    /// Waits up to `timeout` for the next event.
    ///
    /// Returns `None` when the underlying `recv_timeout` fails, which covers
    /// both an elapsed timeout and a disconnected channel. The receiver mutex
    /// stays locked while waiting.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<ChangeEvent> {
        let event = self.lock_receiver().recv_timeout(timeout).ok();
        if let Some(e) = &event {
            tracing::trace!(
                "ChangeIterator::recv_timeout yielded {} for {}",
                e.property_key,
                e.speaker_id.as_str()
            );
        }
        event
    }

    /// Returns an already-queued event without waiting on the channel.
    ///
    /// Returns `None` when the underlying `try_recv` fails, which covers both
    /// an empty queue and a disconnected channel. Acquiring the receiver mutex
    /// can still block while another caller is inside [`recv`](Self::recv) or
    /// [`recv_timeout`](Self::recv_timeout).
    pub fn try_recv(&self) -> Option<ChangeEvent> {
        let event = self.lock_receiver().try_recv().ok();
        if let Some(e) = &event {
            tracing::trace!(
                "ChangeIterator::try_recv yielded {} for {}",
                e.property_key,
                e.speaker_id.as_str()
            );
        }
        event
    }

    /// Returns an iterator whose `next` calls [`try_recv`](Self::try_recv).
    pub fn try_iter(&self) -> TryIter<'_> {
        TryIter { inner: self }
    }

    /// Returns an iterator whose `next` calls
    /// [`recv_timeout`](Self::recv_timeout) with the given `timeout`.
    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
        TimeoutIter {
            inner: self,
            timeout,
        }
    }
}
impl Iterator for ChangeIterator {
type Item = ChangeEvent;
fn next(&mut self) -> Option<Self::Item> {
self.recv()
}
}
/// Borrowing iterator handed out by `ChangeIterator::try_iter`.
pub struct TryIter<'iter> {
    /// The iterator whose receiver is polled on each step.
    inner: &'iter ChangeIterator,
}
impl<'a> Iterator for TryIter<'a> {
type Item = ChangeEvent;
fn next(&mut self) -> Option<Self::Item> {
self.inner.try_recv()
}
}
/// Borrowing iterator handed out by `ChangeIterator::timeout_iter`.
pub struct TimeoutIter<'iter> {
    /// The iterator whose receiver is waited on at each step.
    inner: &'iter ChangeIterator,
    /// Wait limit passed to `recv_timeout` on every step.
    timeout: Duration,
}
impl<'a> Iterator for TimeoutIter<'a> {
type Item = ChangeEvent;
fn next(&mut self) -> Option<Self::Item> {
self.inner.recv_timeout(self.timeout)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::SpeakerId;
    use sonos_api::Service;
    use std::thread;
    use std::time::Instant;

    /// Builds the single event shape used by every test: a `"volume"` change
    /// on speaker `"test-speaker"`.
    fn create_test_event() -> ChangeEvent {
        let speaker_id = SpeakerId::new("test-speaker");
        ChangeEvent {
            speaker_id,
            property_key: "volume",
            service: Service::RenderingControl,
            timestamp: Instant::now(),
        }
    }

    /// Opens a fresh channel and wraps its receiving half in a `ChangeIterator`.
    fn iterator_with_sender() -> (mpsc::Sender<ChangeEvent>, ChangeIterator) {
        let (sender, receiver) = mpsc::channel();
        let changes = ChangeIterator::new(Arc::new(Mutex::new(receiver)));
        (sender, changes)
    }

    #[test]
    fn test_try_recv_empty() {
        let (sender, changes) = iterator_with_sender();
        // The sender is still alive, so `None` here means "empty", not "closed".
        assert!(changes.try_recv().is_none());
        drop(sender);
    }

    #[test]
    fn test_try_recv_with_event() {
        let (sender, changes) = iterator_with_sender();
        sender.send(create_test_event()).unwrap();

        let received = changes.try_recv().unwrap();
        assert_eq!(received.property_key, "volume");
        assert_eq!(received.speaker_id.as_str(), "test-speaker");

        // Only one event was queued.
        assert!(changes.try_recv().is_none());
    }

    #[test]
    fn test_recv_timeout() {
        let (sender, changes) = iterator_with_sender();

        let started = Instant::now();
        let outcome = changes.recv_timeout(Duration::from_millis(50));
        let waited = started.elapsed();

        assert!(outcome.is_none());
        // Allow a little slack below the requested 50 ms.
        assert!(waited >= Duration::from_millis(45));
        // Keep the sender alive until after the wait so the call times out
        // instead of seeing a disconnected channel.
        drop(sender);
    }

    #[test]
    fn test_recv_timeout_with_event() {
        let (sender, changes) = iterator_with_sender();
        let producer = sender.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            producer.send(create_test_event()).unwrap();
        });

        assert!(changes.recv_timeout(Duration::from_millis(100)).is_some());
        drop(sender);
    }

    #[test]
    fn test_try_iter() {
        let (sender, changes) = iterator_with_sender();
        for event in std::iter::repeat_with(create_test_event).take(3) {
            sender.send(event).unwrap();
        }

        assert_eq!(changes.try_iter().count(), 3);
        // The queue must be fully drained afterwards.
        assert!(changes.try_recv().is_none());
        drop(sender);
    }

    #[test]
    fn test_blocking_recv() {
        let (sender, changes) = iterator_with_sender();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sender.send(create_test_event()).unwrap();
        });

        let received = changes.recv().unwrap();
        assert_eq!(received.property_key, "volume");
    }

    #[test]
    fn test_channel_closed() {
        let (sender, changes) = iterator_with_sender();
        // With every sender gone, a blocking receive returns immediately.
        drop(sender);
        assert!(changes.recv().is_none());
    }
}