use std::time::Duration;
use crossbeam::channel::{self, Receiver, TryRecvError};
use crate::{
signal::Signal,
subscription::SubscriptionGuard,
traits::{CellValue, Watchable},
};
/// Consumer handle for values forwarded from a watchable source through a
/// bounded crossbeam channel.
///
/// Keep the field order: Rust drops struct fields in declaration order, so
/// `receiver` is released before `_guard`. Dropping the last receiver
/// disconnects the channel, which makes a pending blocking `send` in the
/// subscription callback return an error instead of waiting indefinitely.
pub struct BoundedOutput<T> {
/// Receiving half of the bounded channel filled by the subscription callback.
receiver: Receiver<T>,
/// Never read; stored so the subscription lives as long as this value.
/// NOTE(review): presumably dropping the guard cancels the subscription —
/// confirm against `SubscriptionGuard`.
_guard: SubscriptionGuard,
}
impl<T: CellValue> BoundedOutput<T> {
    /// Subscribes to `source` and forwards a clone of every `Signal::Value`
    /// payload into a bounded channel that queues at most `capacity` values.
    ///
    /// The callback uses a blocking `send`: while the channel is full it waits
    /// until the consumer makes room, which applies backpressure to whoever
    /// delivers the signal. Signals other than `Signal::Value` are ignored.
    ///
    /// NOTE(review): with `capacity == 0` crossbeam creates a rendezvous
    /// channel whose `send` waits for a concurrent receive. The tests in this
    /// file read the initial value immediately after construction, which
    /// suggests `subscribe` runs the callback before returning; if so, a zero
    /// capacity would block inside this constructor — confirm before passing 0.
    pub fn new<W: Watchable<T>>(source: &W, capacity: usize) -> Self {
        let (sender, receiver) = channel::bounded(capacity);
        let guard = source.subscribe(move |signal| {
            if let Signal::Value(value) = signal {
                // `send` only fails once every receiver is gone; there is
                // nobody left to notify, so the error is deliberately ignored.
                let _ = sender.send((**value).clone());
            }
        });
        Self {
            receiver,
            _guard: guard,
        }
    }

    /// Subscribes to `source` like `new`, but the callback never blocks:
    /// values are offered with `try_send` and discarded when they do not fit.
    ///
    /// A rejected value is offered exactly one more time and then dropped.
    /// Nothing is removed from the queue in between, so the retry only
    /// succeeds if the consumer drained a slot in the meantime; otherwise the
    /// already-queued (older) values are kept and the incoming one is lost.
    pub fn dropping<W: Watchable<T>>(source: &W, capacity: usize) -> Self {
        let (sender, receiver) = channel::bounded(capacity);
        let guard = source.subscribe(move |signal| {
            if let Signal::Value(value) = signal {
                if let Err(rejected) = sender.try_send((**value).clone()) {
                    // The error hands the unsent value back, so the retry
                    // reuses it instead of cloning the payload a second time.
                    let _ = sender.try_send(rejected.into_inner());
                }
            }
        });
        Self {
            receiver,
            _guard: guard,
        }
    }

    /// Blocks until a value is available and returns it.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` when the channel is empty and disconnected.
    pub fn recv(&self) -> Result<T, channel::RecvError> {
        self.receiver.recv()
    }

    /// Waits up to `timeout` for the next value.
    ///
    /// # Errors
    ///
    /// Returns `RecvTimeoutError` when no value arrived within `timeout` or
    /// the channel is empty and disconnected.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, channel::RecvTimeoutError> {
        self.receiver.recv_timeout(timeout)
    }

    /// Returns the next queued value without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TryRecvError` when nothing is queued right now or the channel
    /// is empty and disconnected.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }

    /// Returns `true` when no values are currently queued.
    pub fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }

    /// Returns the number of values currently queued.
    pub fn len(&self) -> usize {
        self.receiver.len()
    }

    /// Returns a blocking iterator: each step waits for the next value and the
    /// iteration ends once the channel is empty and disconnected.
    pub fn iter(&self) -> channel::Iter<'_, T> {
        self.receiver.iter()
    }

    /// Returns a non-blocking iterator that yields the values queued right now
    /// and stops as soon as none is immediately available.
    pub fn try_iter(&self) -> channel::TryIter<'_, T> {
        self.receiver.try_iter()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Cell, Mutable};

    /// Values come out in the order they were set, starting with the value
    /// the cell held when the output was created.
    #[test]
    fn test_bounded_output_basic() {
        let source = Cell::new(0);
        let out = BoundedOutput::new(&source, 16);

        // The initial value is queued before any update.
        assert_eq!(out.try_recv(), Ok(0));

        for next in 1..=3 {
            source.set(next);
        }
        for expected in 1..=3 {
            assert_eq!(out.try_recv(), Ok(expected));
        }

        // Queue is drained now.
        assert!(out.try_recv().is_err());
    }

    /// `recv_timeout` returns a queued value and errors once nothing is left.
    #[test]
    fn test_bounded_output_timeout() {
        let wait = Duration::from_millis(10);
        let source = Cell::new(42);
        let out = BoundedOutput::new(&source, 16);

        assert_eq!(out.recv_timeout(wait), Ok(42));
        assert!(out.recv_timeout(wait).is_err());
    }

    /// `len` follows the number of queued values as they are added and taken.
    #[test]
    fn test_bounded_output_len() {
        let source = Cell::new(0);
        let out = BoundedOutput::new(&source, 16);
        assert_eq!(out.len(), 1);

        source.set(1);
        source.set(2);
        assert_eq!(out.len(), 3);

        // Taking one value shrinks the queue by one.
        let _ = out.try_recv();
        assert_eq!(out.len(), 2);
    }

    /// `try_iter` drains everything queued so far and leaves the output empty.
    #[test]
    fn test_bounded_output_try_iter() {
        let source = Cell::new(0);
        let out = BoundedOutput::new(&source, 16);

        source.set(1);
        source.set(2);

        let drained: Vec<_> = out.try_iter().collect();
        assert_eq!(drained, [0, 1, 2]);
        assert!(out.is_empty());
    }
}