use crate::Observer;
use crate::active_observation::ActiveObservation;
use std::fmt::Debug;
/// Type-erased interface over an observer, so that observers of different
/// value types can be stored together as `Box<dyn ErasedObserver>`.
trait ErasedObserver: Debug + Send {
    /// Offers `observation` to the observer.
    ///
    /// Returns `Ok` carrying an [`ActiveObservation`] or `Err(())`; the caller
    /// in this file treats `Ok` as "this observer is ready".
    fn aggregate_poll(&mut self, observation: ActiveObservation) -> Result<ActiveObservation, ()>;

    /// Returns `true` when the observer reports a distinct value.
    // NOTE(review): presumably "distinct" means different from the value last
    // observed - confirm against `Observer::observe_if_distinct`.
    fn observe_if_distinct(&mut self) -> bool;

    /// Returns whether the observer currently considers itself dirty.
    fn is_dirty(&self) -> bool;

    /// Clones the observer into a fresh box; this is what lets a collection of
    /// boxed trait objects be cloned.
    fn clone_box(&self) -> Box<dyn ErasedObserver>;
}
impl<T> ErasedObserver for Observer<T>
where
T: PartialEq + Clone + Debug + Send + 'static,
{
fn clone_box(&self) -> Box<dyn ErasedObserver> {
Box::new(self.clone())
}
fn aggregate_poll(&mut self, observation: ActiveObservation) -> Result<ActiveObservation, ()> {
match self.aggregate_poll_impl(observation) {
Ok(f) => Ok(f.0), Err(_) => Err(()),
}
}
fn observe_if_distinct(&mut self) -> bool {
self.observe_if_distinct()
}
fn is_dirty(&self) -> bool {
self.is_dirty()
}
}
/// A group of observers, possibly over different value types, that can be
/// awaited and queried together.
///
/// Each observer is stored behind the type-erased `ErasedObserver` trait
/// object, which is what allows mixing value types in one collection.
#[derive(Debug)]
pub struct AggregateObserver {
// Boxed observers; new ones are pushed at the end, so a position in this
// vector identifies an observer by the order in which it was added.
observers: Vec<Box<dyn ErasedObserver>>,
}
impl AggregateObserver {
pub fn new() -> Self {
AggregateObserver {
observers: Vec::new(),
}
}
pub fn add_observer<T>(&mut self, observer: Observer<T>)
where
T: 'static + PartialEq + Clone + Debug + Send,
{
self.observers.push(Box::new(observer));
}
pub async fn next(&mut self) -> usize {
loop {
let (active_observation, active_future) = crate::active_observation::observation();
for (o, observer) in &mut self.observers.iter_mut().enumerate() {
let r = observer.aggregate_poll(active_observation.clone());
match r {
Ok(future) => {
drop(future);
return o; }
Err(_) => continue, }
}
_ = active_future.await;
for (o, observer) in &mut self.observers.iter_mut().enumerate() {
if observer.observe_if_distinct() {
return o; }
}
}
}
pub fn is_dirty(&self) -> bool {
self.observers.iter().any(|e| e.is_dirty())
}
}
/// Cloning duplicates every contained observer through `clone_box`, because
/// boxed trait objects cannot be cloned directly.
impl Clone for AggregateObserver {
    fn clone(&self) -> Self {
        let observers = self
            .observers
            .iter()
            .map(|boxed| boxed.clone_box())
            .collect();
        Self { observers }
    }
}
impl Default for AggregateObserver {
fn default() -> Self {
Self::new()
}
}
impl<T> From<Observer<T>> for AggregateObserver
where
T: 'static + PartialEq + Clone + Debug + Send,
{
fn from(observer: Observer<T>) -> Self {
let mut aggregate = AggregateObserver::new();
aggregate.add_observer(observer);
aggregate
}
}
#[cfg(test)]
mod tests {
    use super::AggregateObserver;
    use crate::Value;
    use test_executors::async_test;

    // Smoke test: two observers over different value types can be awaited
    // through one aggregate, including a value set later from another thread.
    #[async_test]
    async fn test_aggregate_observer() {
        let value = Value::new(2);
        let value2 = Value::new(0.3);
        let mut o = AggregateObserver::new();
        o.add_observer(value.observe());
        o.add_observer(value2.observe());
        let _ = o.next().await;
        let _ = o.next().await;
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(100));
            let value = value;
            value.set(3);
            // Leak the value so its destructor never runs.
            // NOTE(review): presumably dropping it would affect the pending
            // observer - confirm against `Value`'s `Drop` behavior.
            std::mem::forget(value);
        });
        _ = o.next().await;
    }

    // Setting the same value repeatedly must not wake the aggregate; only the
    // final, different value may complete `next`.
    #[async_test]
    async fn test_repeat_values() {
        let v = Value::new(0);
        let mut o = AggregateObserver::new();
        o.add_observer(v.observe());
        let o1 = o.next().await;
        assert_eq!(o1, 0);
        // Start the clock BEFORE spawning the writer. The writer sleeps for a
        // total of at least 50 ms; if the clock started after `spawn`, the
        // thread could already be part-way into its first sleep and the
        // measured time could fall just short of 50 ms, failing spuriously.
        let begin = std::time::Instant::now();
        std::thread::spawn(move || {
            let v = v;
            for _ in 0..5 {
                std::thread::sleep(std::time::Duration::from_millis(10));
                v.set(0);
            }
            v.set(1);
            // Leak the value so its destructor never runs (see note in the
            // test above).
            std::mem::forget(v);
        });
        let o2 = o.next().await;
        assert!(
            begin.elapsed().as_millis() > 49,
            "Should have waited for the next value"
        );
        assert_eq!(o2, 0);
    }
}