use crate::Observer;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
trait ErasedObserver: Debug + Send {
fn clone_box(&self) -> Box<dyn ErasedObserver>;
fn observe_if_distinct(&mut self) -> bool;
fn register(&self, waker: &Waker);
fn is_dirty(&self) -> bool;
}
impl<T> ErasedObserver for Observer<T>
where
T: PartialEq + Clone + Debug + Send + 'static,
{
fn clone_box(&self) -> Box<dyn ErasedObserver> {
Box::new(self.clone())
}
fn observe_if_distinct(&mut self) -> bool {
self.observe_if_distinct()
}
fn register(&self, waker: &Waker) {
self.active_observation.register(waker)
}
fn is_dirty(&self) -> bool {
self.is_dirty()
}
}
#[derive(Debug)]
pub struct AggregateObserver {
observers: Vec<Box<dyn ErasedObserver>>,
}
impl AggregateObserver {
pub fn new() -> Self {
AggregateObserver {
observers: Vec::new(),
}
}
pub fn add_observer<T>(&mut self, observer: Observer<T>)
where
T: 'static + PartialEq + Clone + Debug + Send,
{
self.observers.push(Box::new(observer));
}
pub fn is_dirty(&self) -> bool {
self.observers.iter().any(|e| e.is_dirty())
}
}
impl futures_core::Stream for AggregateObserver {
type Item = usize;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
for (o, observer) in self.observers.iter_mut().enumerate() {
observer.register(cx.waker());
if observer.observe_if_distinct() {
return Poll::Ready(Some(o)); }
}
Poll::Pending
}
}
impl Clone for AggregateObserver {
fn clone(&self) -> Self {
Self {
observers: self.observers.iter().map(|obs| obs.clone_box()).collect(),
}
}
}
impl Default for AggregateObserver {
fn default() -> Self {
Self::new()
}
}
impl Display for AggregateObserver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AggregateObserver({} observers)", self.observers.len())
}
}
impl<T> From<Observer<T>> for AggregateObserver
where
T: 'static + PartialEq + Clone + Debug + Send,
{
fn from(observer: Observer<T>) -> Self {
let mut aggregate = AggregateObserver::new();
aggregate.add_observer(observer);
aggregate
}
}
#[cfg(test)]
mod tests {
use super::AggregateObserver;
use crate::Value;
use futures_util::StreamExt;
use test_executors::async_test;
#[cfg(not(target_arch = "wasm32"))]
use std::thread;
#[cfg(target_arch = "wasm32")]
use wasm_thread as thread;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
#[cfg(target_arch = "wasm32")]
use web_time::Instant;
#[async_test]
async fn test_aggregate_observer() {
let value = Value::new(2);
let value2 = Value::new(0.3);
let mut o = AggregateObserver::new();
o.add_observer(value.observe());
o.add_observer(value2.observe());
let _ = o.next().await;
let _ = o.next().await;
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(100));
let value = value;
value.set(3);
std::mem::forget(value);
});
_ = o.next().await;
}
#[async_test]
async fn test_repeat_values() {
let v = Value::new(0);
let mut o = AggregateObserver::new();
o.add_observer(v.observe());
let o1 = o.next().await;
assert_eq!(o1, Some(0));
thread::spawn(move || {
let v = v;
for _ in 0..5 {
thread::sleep(std::time::Duration::from_millis(10));
v.set(0);
}
v.set(1);
std::mem::forget(v);
});
let begin = Instant::now();
let o2 = o.next().await;
assert!(
begin.elapsed().as_millis() > 49,
"Should have waited for the next value"
);
assert_eq!(o2, Some(0));
}
#[test]
fn test_aggregate_display() {
let value = Value::new(42);
let mut aggregate = AggregateObserver::new();
let empty_str = format!("{}", aggregate);
assert_eq!(empty_str, "AggregateObserver(0 observers)");
aggregate.add_observer(value.observe());
let one_str = format!("{}", aggregate);
assert_eq!(one_str, "AggregateObserver(1 observers)");
let value2 = Value::new("test");
aggregate.add_observer(value2.observe());
let two_str = format!("{}", aggregate);
assert_eq!(two_str, "AggregateObserver(2 observers)");
}
}