use crate::traits::*;
use crate::watcher::*;
use crate::releasable::*;
use crate::binding_context::*;
use futures::prelude::*;
use ::desync::*;
use std::sync::*;
/// Creates a binding whose value is driven by the items read from a stream.
///
/// The binding starts out holding `initial_value`. Every item read from `stream` is handed to
/// `update` together with a clone of the current value; whatever `update` returns becomes the
/// new value. Registered notifications are marked as changed only when that result differs
/// from the value it replaces.
///
/// The processing closure unwraps the result of locking the value mutex, so it panics if that
/// mutex has been poisoned.
pub fn bind_stream<S, Value, UpdateFn>(stream: S, initial_value: Value, update: UpdateFn) -> StreamBinding<Value>
where
    S: 'static + Send + Stream + Unpin,
    Value: 'static + Send + Clone + PartialEq,
    UpdateFn: 'static + Send + FnMut(Value, S::Item) -> Value,
    S::Item: Send,
{
    // The value is shared: the returned binding reads it directly, while the core writes to
    // it from the stream-processing closure below
    let value = Arc::new(Mutex::new(initial_value));
    let core = Arc::new(Desync::new(StreamBindingCore {
        value: Arc::clone(&value),
        notifications: vec![],
    }));

    // Hand items to the closure in batches of up to 20 rather than one at a time
    let batches = stream.ready_chunks(20);
    let mut apply_update = update;

    pipe_in(Arc::clone(&core), batches, move |core, batch| {
        for item in batch {
            // The guard is confined to this block so that the lock is released before any
            // notification is fired
            let changed = {
                let mut current = core.value.lock().unwrap();
                let updated = apply_update((*current).clone(), item);
                let changed = updated != *current;

                if changed {
                    *current = updated;
                }

                changed
            };

            if changed {
                // Discard listeners that are no longer in use, then tell the rest
                core.filter_unused_notifications();
                for listener in core.notifications.iter() {
                    listener.mark_as_changed();
                }
            }
        }

        Box::pin(future::ready(()))
    });

    StreamBinding { core, value }
}
/// Binding whose value is updated from a stream (created by `bind_stream`).
///
/// Cloning produces another handle onto the same core and the same value, as both fields are
/// reference counted.
#[derive(Clone)]
pub struct StreamBinding<Value>
where
    Value: Send,
{
    /// State accessed through `Desync`; this is where the notification list lives
    core: Arc<Desync<StreamBindingCore<Value>>>,

    /// The current value of the binding, read directly by `get`
    value: Arc<Mutex<Value>>,
}
/// State that a `StreamBinding` keeps behind its `Desync`.
struct StreamBindingCore<Value>
where
    Value: Send,
{
    /// The current value of the binding
    value: Arc<Mutex<Value>>,

    /// Notifications to mark as changed when the value changes
    notifications: Vec<ReleasableNotifiable>,
}
impl<Value: Send> StreamBindingCore<Value> {
    /// Removes every notification that reports it is no longer in use, keeping the rest in
    /// their existing order.
    pub fn filter_unused_notifications(&mut self) {
        self.notifications.retain(|entry| entry.is_in_use());
    }
}
impl<TValue> Bound for StreamBinding<TValue>
where
    TValue: 'static + Send + Clone,
{
    type Value = TValue;

    /// Returns a clone of the current value.
    ///
    /// A clone of this binding is first registered as a dependency with the `BindingContext`.
    /// Panics if the value mutex has been poisoned.
    fn get(&self) -> Self::Value {
        BindingContext::add_dependency(self.clone());

        let guard = self.value.lock().unwrap();
        TValue::clone(&guard)
    }

    /// Creates a watcher for this binding that reads its value through `get`.
    ///
    /// The notifiable half returned by `NotifyWatcher::new` is added to the core's
    /// notification list, after which entries that are no longer in use are removed.
    fn watch(&self, what: Arc<dyn Notifiable>) -> Arc<dyn Watcher<Self::Value>> {
        let source = self.clone();
        let (watcher, registration) = NotifyWatcher::new(move || source.get(), what);

        self.core.sync(move |core| {
            core.notifications.push(registration);
            core.filter_unused_notifications();
        });

        Arc::new(watcher)
    }
}
impl<Value> Changeable for StreamBinding<Value>
where
    Value: 'static + Send,
{
    /// Registers `what` to be marked as changed when this binding's value changes.
    ///
    /// An owned clone of the releasable is stored in the core's notification list (unused
    /// entries are removed at the same time); the releasable itself is returned to the caller.
    fn when_changed(&self, what: Arc<dyn Notifiable>) -> Box<dyn Releasable> {
        let handle = ReleasableNotifiable::new(what);
        let registration = handle.clone_as_owned();

        self.core.sync(move |core| {
            core.notifications.push(registration);
            core.filter_unused_notifications();
        });

        Box::new(handle)
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use super::super::notify_fn::*;

    use futures::stream;
    use futures::executor;
    use futures::channel::mpsc;

    use std::thread;
    use std::time::Duration;

    /// After the whole stream has been read, the binding holds the last item.
    #[test]
    pub fn stream_in_all_values() {
        let stream = vec![1, 2, 3];
        let stream = stream::iter(stream.into_iter());
        let binding = bind_stream(stream, 0, |_old_value, new_value| new_value);

        // Nothing here waits for the stream to be consumed, so pause briefly before checking
        thread::sleep(Duration::from_millis(10));
        assert_eq!(binding.get(), 3);
    }

    /// The update function's result, not the raw stream item, becomes the bound value.
    #[test]
    pub fn stream_processes_updates() {
        let stream = vec![1, 2, 3];
        let stream = stream::iter(stream.into_iter());
        let binding = bind_stream(stream, 0, |_old_value, new_value| new_value + 42);

        // Nothing here waits for the stream to be consumed, so pause briefly before checking
        thread::sleep(Duration::from_millis(10));
        assert_eq!(binding.get(), 45);
    }

    /// A `when_changed` notification fires once a stream item changes the value.
    #[test]
    pub fn notifies_on_change() {
        let (mut sender, receiver) = mpsc::channel(0);
        let binding = bind_stream(receiver, 0, |_old_value, new_value| new_value);

        let notified = Arc::new(Mutex::new(false));
        let also_notified = Arc::clone(&notified);

        binding.when_changed(notify(move || *also_notified.lock().unwrap() = true)).keep_alive();

        // No item has been sent yet, so nothing should have been notified
        thread::sleep(Duration::from_millis(5));
        assert!(!*notified.lock().unwrap());

        executor::block_on(async {
            sender.send(42).await.unwrap();

            // Allow time for the item to be processed before checking the outcome
            thread::sleep(Duration::from_millis(5));
            assert!(*notified.lock().unwrap());
            assert_eq!(binding.get(), 42);
        });
    }

    /// A watcher created with `watch` is notified once a stream item changes the value.
    #[test]
    pub fn watcher_notifies_on_change() {
        let (mut sender, receiver) = mpsc::channel(0);
        let binding = bind_stream(receiver, 0, |_old_value, new_value| new_value);

        let notified = Arc::new(Mutex::new(false));
        let also_notified = Arc::clone(&notified);

        let watcher = binding.watch(notify(move || *also_notified.lock().unwrap() = true));

        // No item has been sent yet, so nothing should have been notified
        thread::sleep(Duration::from_millis(5));
        assert!(!*notified.lock().unwrap());

        // Read through the watcher before the change is sent
        watcher.get();

        executor::block_on(async {
            sender.send(42).await.unwrap();

            // Allow time for the item to be processed before checking the outcome
            thread::sleep(Duration::from_millis(5));
            assert!(*notified.lock().unwrap());
            assert_eq!(binding.get(), 42);
        });
    }

    /// An item that leaves the value unchanged must not trigger a notification.
    #[test]
    pub fn no_notification_on_no_change() {
        let (mut sender, receiver) = mpsc::channel(0);
        let binding = bind_stream(receiver, 0, |_old_value, new_value| new_value);

        let notified = Arc::new(Mutex::new(false));
        let also_notified = Arc::clone(&notified);

        binding.when_changed(notify(move || *also_notified.lock().unwrap() = true)).keep_alive();

        thread::sleep(Duration::from_millis(5));
        assert!(!*notified.lock().unwrap());

        executor::block_on(async {
            // Sending the initial value again produces an update equal to the current value
            sender.send(0).await.unwrap();

            thread::sleep(Duration::from_millis(5));
            assert!(!*notified.lock().unwrap());
            assert_eq!(binding.get(), 0);
        });
    }
}