use super::traits::*;
use super::notify_fn::*;
use futures::*;
use futures::task;
use futures::task::{Poll};
use std::pin::{Pin};
use std::sync::*;
/// Whether the followed binding has changed since the stream last produced a value.
#[derive(Copy, Clone)]
enum FollowState {
    /// No change notification has arrived since the last value was returned
    Unchanged,

    /// The binding has signalled a change (or the stream has not yet been read)
    Changed,
}
/// State shared between a `FollowStream` and the change notification registered on its binding.
struct FollowCore<Binding>
where
    Binding: Bound,
{
    /// Whether the binding has changed since the stream last returned a value
    state: FollowState,

    /// Waker stored by the most recent poll that returned `Pending`, taken and woken on the next change
    notify: Option<task::Waker>,

    /// The binding whose value is read each time the stream yields an item
    binding: Arc<Binding>,
}
/// Stream that yields the value of a binding each time the binding reports a change.
///
/// Created by the `follow` function in this module.
pub struct FollowStream<Binding: Bound>
where
    Binding::Value: Send,
{
    /// State shared with the change notification callback
    core: Arc<Mutex<FollowCore<Binding>>>,

    /// Handle returned by `when_changed`; held for the lifetime of the stream
    /// (presumably dropping it releases the notification -- confirm against `Releasable`)
    _watcher: Box<dyn Releasable>,
}
impl<Binding> Stream for FollowStream<Binding>
where
    Binding: 'static + Bound,
    Binding::Value: 'static + Send,
{
    type Item = Binding::Value;

    /// Yields the binding's current value if it has changed since the last item was
    /// returned; otherwise stores the task's waker and returns `Poll::Pending`.
    ///
    /// This implementation never returns `Poll::Ready(None)`, so the stream does not end.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
        // Inspect and update the shared state inside a short scope, so the lock is
        // released before the binding is read
        let changed_binding = {
            let mut shared = self.core.lock().unwrap();

            match shared.state {
                FollowState::Changed => {
                    // Mark the change as consumed before reading: a change arriving while
                    // the value is being read flips the state back and is seen by the next poll
                    shared.state = FollowState::Unchanged;
                    Some(Arc::clone(&shared.binding))
                }

                FollowState::Unchanged => {
                    // Nothing new: remember who to wake when the binding next changes
                    shared.notify = Some(cx.waker().clone());
                    None
                }
            }
        };

        match changed_binding {
            Some(source) => Poll::Ready(Some(source.get())),
            None => Poll::Pending,
        }
    }
}
/// Creates a stream that follows the values of a binding.
///
/// The returned stream starts in the changed state, so its first item is the value the
/// binding has when the stream is first polled. After that it yields a new item each time
/// the binding signals a change, reading the value at the time of the poll. Several changes
/// between two polls therefore produce a single item.
///
/// # Panics
///
/// The change callback and `poll_next` both panic if the internal mutex has been poisoned.
pub fn follow<Binding>(binding: Binding) -> FollowStream<Binding>
where
    Binding: 'static + Bound,
    Binding::Value: 'static + Send,
{
    // Keep a handle to the binding outside the mutex so the notification can be
    // registered without holding the core lock
    let binding = Arc::new(binding);

    // Starting as `Changed` makes the first poll return the binding's current value
    let core = Arc::new(Mutex::new(FollowCore {
        state: FollowState::Changed,
        notify: None,
        binding: Arc::clone(&binding),
    }));

    // The callback only holds a weak reference, so it does not keep the core alive
    // once the stream has been dropped
    let weak_core = Arc::downgrade(&core);

    // The callback below locks the core. Registering it while this function held that
    // same (non-reentrant) lock would deadlock if a binding ever ran the callback
    // synchronously from inside `when_changed`.
    let watcher = binding.when_changed(notify(move || {
        if let Some(core) = weak_core.upgrade() {
            // Update the state under the lock, but wake the task after releasing it
            let waker = {
                let mut core = core.lock().unwrap();
                core.state = FollowState::Changed;
                core.notify.take()
            };

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }));

    FollowStream {
        core,
        _watcher: watcher,
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use super::super::*;

    use futures::executor;
    use futures::task::{ArcWake, Context, waker_ref};
    use ::desync::*;

    use std::thread;
    use std::time::Duration;

    /// Waker that ignores every wake-up, for tests that poll the stream by hand.
    struct NotifyNothing;

    impl ArcWake for NotifyNothing {
        fn wake_by_ref(_arc_self: &Arc<Self>) {
        }
    }

    /// The first item of a followed binding is its current value.
    #[test]
    fn follow_stream_has_initial_value() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());
        let mut stream = follow(bind_ref);

        executor::block_on(async {
            assert_eq!(stream.next().await, Some(1));
        });
    }

    /// Setting the binding after the first read produces a second item with the new value.
    #[test]
    fn follow_stream_updates() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());
        let mut stream = follow(bind_ref);

        executor::block_on(async {
            assert_eq!(stream.next().await, Some(1));
            binding.set(2);
            assert_eq!(stream.next().await, Some(2));
        });
    }

    /// A change that lands while a slow computed value is being read must not be lost.
    #[test]
    fn computed_updates_during_read() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());

        // The sleep makes each read of the computed value slow, so the update below is
        // intended to arrive while the first read is still in progress
        let computed = computed(move || {
            let val = bind_ref.get();
            thread::sleep(Duration::from_millis(300));
            val
        });

        let mut stream = follow(computed);

        // Read two items from the stream in the background and store them in `reader`
        let reader = Desync::new(vec![]);
        let read_values = reader.after(async move {
            let result = vec![
                stream.next().await,
                stream.next().await
            ];
            result
        }, |val, read_val| { *val = read_val; });

        // Give the background read a moment to start, then change the source binding
        thread::sleep(Duration::from_millis(10));
        binding.set(2);

        let values_read_from_stream = reader.sync(|val| val.clone());

        assert_eq!(values_read_from_stream[0], Some(1));
        assert_eq!(values_read_from_stream[1], Some(2));

        executor::block_on(read_values).unwrap();
    }

    /// Once the initial value has been consumed, the stream is pending until a change.
    #[test]
    fn stream_is_unready_after_first_read() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());
        let waker = Arc::new(NotifyNothing);
        let waker = waker_ref(&waker);
        let mut context = Context::from_waker(&waker);
        let mut stream = follow(bind_ref);

        assert_eq!(stream.poll_next_unpin(&mut context), Poll::Ready(Some(1)));
        assert_eq!(stream.poll_next_unpin(&mut context), Poll::Pending);
    }

    /// A write between two polls makes the second poll ready straight away.
    #[test]
    fn stream_is_immediately_ready_after_write() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());
        let waker = Arc::new(NotifyNothing);
        let waker = waker_ref(&waker);
        let mut context = Context::from_waker(&waker);
        let mut stream = follow(bind_ref);

        assert_eq!(stream.poll_next_unpin(&mut context), Poll::Ready(Some(1)));
        binding.set(2);
        assert_eq!(stream.poll_next_unpin(&mut context), Poll::Ready(Some(2)));
    }

    /// A task waiting on the stream is woken when another thread updates the binding.
    #[test]
    fn will_wake_when_binding_is_updated() {
        let binding = bind(1);
        let bind_ref = BindRef::from(binding.clone());
        let mut stream = follow(bind_ref);

        // Update the binding from another thread after the stream has gone pending
        let setter = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            binding.set(2);
        });

        executor::block_on(async {
            assert_eq!(stream.next().await, Some(1));
            assert_eq!(stream.next().await, Some(2));
        });

        // Join so a panic in the setter thread fails the test instead of going unnoticed
        setter.join().expect("setter thread panicked");
    }
}