use crate::signal::{self, sample_raw, Signal};
use crate::source::{with_weak, CallbackError, CallbackResult, Source};
use crate::transaction::commit;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::thread;
mod coalesce;
/// Entry point for pushing events of type `A` into the stream network.
///
/// A sink is a thin handle around a shared, lock-protected `Source<A>`;
/// `Sink::stream` hands out streams that refer to the very same source.
pub struct Sink<A> {
// Shared source that `send` writes to and that derived streams register on.
source: Arc<RwLock<Source<A>>>,
}
impl<A> Clone for Sink<A> {
fn clone(&self) -> Sink<A> {
Sink {
source: self.source.clone(),
}
}
}
impl<A: Send + Sync> Sink<A> {
    /// Creates a sink backed by a freshly constructed `Source`.
    pub fn new() -> Sink<A> {
        Sink {
            source: Arc::new(RwLock::new(Source::new())),
        }
    }

    /// Returns a stream that shares this sink's source.
    ///
    /// The stream carries a unit `keep_alive` value: it has no parent stream
    /// that it would need to hold on to.
    pub fn stream(&self) -> Stream<A> {
        Stream {
            source: Arc::clone(&self.source),
            keep_alive: Box::new(()),
        }
    }
}

/// `Sink::new` takes no arguments, so the type also offers the conventional
/// `Default` constructor; it simply delegates to `new`.
impl<A: Send + Sync> Default for Sink<A> {
    fn default() -> Self {
        Self::new()
    }
}
impl<A: Send + Sync + Clone + 'static> Sink<A> {
pub fn send_async(&self, a: A) {
let clone = self.clone();
thread::spawn(move || clone.send(a));
}
pub fn feed<I: IntoIterator<Item = A>>(&self, iterator: I) {
for event in iterator {
self.send(event);
}
}
pub fn feed_async<I: IntoIterator<Item = A> + Send + 'static>(&self, iterator: I) {
let clone = self.clone();
thread::spawn(move || clone.feed(iterator));
}
pub fn send(&self, a: A) {
commit(|| self.source.write().unwrap().send(a))
}
}
/// Cloning for type-erased values: lets a `Box<dyn BoxClone>` be duplicated
/// without knowing the concrete type behind it.
pub trait BoxClone: Send + Sync {
    /// Returns a boxed clone of `self` as a trait object.
    fn box_clone(&self) -> Box<dyn BoxClone>;
}

/// Every cloneable, thread-safe, `'static` type gets `BoxClone` for free.
impl<T> BoxClone for T
where
    T: Clone + Send + Sync + 'static,
{
    fn box_clone(&self) -> Box<dyn BoxClone> {
        let duplicate: T = self.clone();
        Box::new(duplicate)
    }
}
pub fn source<A>(stream: &Stream<A>) -> &Arc<RwLock<Source<A>>> {
&stream.source
}
/// A stream of events of type `A`.
///
/// Holds a handle to the shared source its events come from, plus a
/// type-erased value that is owned for as long as the stream exists.
pub struct Stream<A> {
// Shared, lock-protected source; derived streams register callbacks on it.
source: Arc<RwLock<Source<A>>>,
// Never read, hence the lint exemption: the field exists only to own a value
// for the stream's lifetime (e.g. `map` stores a clone of its parent stream
// here, while the parent only refers to the child through a `Weak`).
#[allow(dead_code)]
keep_alive: Box<dyn BoxClone>,
}
impl<A> Clone for Stream<A> {
fn clone(&self) -> Stream<A> {
Stream {
source: self.source.clone(),
keep_alive: self.keep_alive.box_clone(),
}
}
}
/// Combinators available on every stream of cloneable, thread-safe events.
///
/// Each derived stream owns a fresh source. The parent only gets a callback
/// holding a `Weak` reference to that source, while the child keeps the
/// parent alive through its `keep_alive` field.
impl<A: Clone + Send + Sync + 'static> Stream<A> {
    /// Creates a stream over a brand-new source that has no `Sink` attached.
    pub fn never() -> Stream<A> {
        let source = Arc::new(RwLock::new(Source::new()));
        Stream {
            source,
            keep_alive: Box::new(()),
        }
    }

    /// Returns a stream that carries `f(event)` for every event of `self`.
    pub fn map<B, F>(&self, f: F) -> Stream<B>
    where
        B: Send + Sync + Clone + 'static,
        F: Fn(A) -> B + Send + Sync + 'static,
    {
        commit(|| {
            let target = Arc::new(RwLock::new(Source::new()));
            let downstream = Arc::downgrade(&target);
            self.source
                .write()
                .unwrap()
                .register(move |a| with_weak(&downstream, |out| out.send(f(a))));
            Stream {
                source: target,
                keep_alive: Box::new(self.clone()),
            }
        })
    }

    /// Returns a stream containing only the events for which `f` is `true`.
    pub fn filter<F>(&self, f: F) -> Stream<A>
    where
        F: Fn(&A) -> bool + Send + Sync + 'static,
    {
        self.filter_map(move |a| {
            if !f(&a) {
                return None;
            }
            Some(a)
        })
    }

    /// Maps every event through `f` and keeps the payloads of the `Some` results.
    pub fn filter_map<B, F>(&self, f: F) -> Stream<B>
    where
        B: Send + Sync + Clone + 'static,
        F: Fn(A) -> Option<B> + Send + Sync + 'static,
    {
        let mapped: Stream<Option<B>> = self.map(f);
        mapped.filter_some()
    }

    /// Returns a stream that forwards the events of both `self` and `other`.
    pub fn merge(&self, other: &Stream<A>) -> Stream<A> {
        commit(|| {
            let target = Arc::new(RwLock::new(Source::new()));
            // Register one forwarding callback per parent, `self` first.
            [self, other].iter().for_each(|parent| {
                let downstream = Arc::downgrade(&target);
                parent
                    .source
                    .write()
                    .unwrap()
                    .register(move |a| with_weak(&downstream, |out| out.send(a)));
            });
            Stream {
                source: target,
                keep_alive: Box::new((self.clone(), other.clone())),
            }
        })
    }

    /// Builds the coalesced stream via `coalesce::stream` inside a `commit`.
    ///
    /// NOTE(review): judging by this module's `coalesce` test, `reducer`
    /// combines events that arrive together; see the `coalesce` submodule
    /// for the exact semantics.
    pub fn coalesce<F>(&self, reducer: F) -> Stream<A>
    where
        F: Fn(A, A) -> A + Send + Sync + 'static,
    {
        let build = || coalesce::stream(self, reducer);
        commit(build)
    }

    /// Turns the stream into a signal via `signal::hold`, starting from `initial`.
    pub fn hold(&self, initial: A) -> Signal<A> {
        signal::hold(initial, self)
    }

    /// Returns an iterator over the events of this stream.
    pub fn events(&self) -> Events<A> {
        Events::new(self)
    }

    /// Accumulates events into a signal: the signal is snapshotted with `f`
    /// on this stream's events and the result is held, starting at `initial`.
    pub fn fold<B, F>(&self, initial: B, f: F) -> Signal<B>
    where
        B: Send + Sync + Clone + 'static,
        F: Fn(B, A) -> B + Send + Sync + 'static,
    {
        Signal::cyclic(|accumulator| {
            let steps = accumulator.snapshot(self, f);
            steps.hold(initial)
        })
    }
}
impl<A: Clone + Send + Sync + 'static> Stream<Option<A>> {
    /// Returns a stream of the payloads of all `Some` events; `None` events
    /// are acknowledged with `Ok(())` and not forwarded.
    pub fn filter_some(&self) -> Stream<A> {
        commit(|| {
            let target = Arc::new(RwLock::new(Source::new()));
            let downstream = Arc::downgrade(&target);
            self.source.write().unwrap().register(move |event| match event {
                Some(value) => with_weak(&downstream, |out| out.send(value)),
                None => Ok(()),
            });
            Stream {
                source: target,
                keep_alive: Box::new(self.clone()),
            }
        })
    }
}
impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>> {
    /// Flattens a stream of streams: the result forwards the events of the
    /// most recently received inner stream.
    pub fn switch(&self) -> Stream<A> {
        /// Points the output `source` at `new_stream`.
        ///
        /// Replacing `*terminate` drops the previous token, so callbacks
        /// registered for earlier inner streams can no longer upgrade their
        /// weak token and answer with `CallbackError::Disappeared`.
        fn rewire_callbacks<A>(
            new_stream: Stream<A>,
            source: Weak<RwLock<Source<A>>>,
            terminate: &mut Arc<()>,
        ) -> CallbackResult
        where
            A: Send + Sync + Clone + 'static,
        {
            *terminate = Arc::new(());
            let token = Arc::downgrade(&*terminate);
            new_stream
                .source
                .write()
                .unwrap()
                .register(move |a| match token.upgrade() {
                    // The strong handle stays alive for the duration of the send.
                    Some(_alive) => with_weak(&source, |out| out.send(a)),
                    None => Err(CallbackError::Disappeared),
                });
            Ok(())
        }

        commit(|| {
            let target = Arc::new(RwLock::new(Source::new()));
            let downstream = Arc::downgrade(&target);
            // Liveness token owned by the callback; swapped on every rewire.
            let mut terminate = Arc::new(());
            self.source.write().unwrap().register(move |inner| {
                rewire_callbacks(inner, downstream.clone(), &mut terminate)
            });
            Stream {
                source: target,
                keep_alive: Box::new(self.clone()),
            }
        })
    }
}
/// Combines a signal with a stream: for every event `b` of `stream`, the
/// result carries `f(sample, b)`, where `sample` is obtained from `signal`
/// via `sample_raw` at the time the event is processed.
///
/// The returned stream keeps clones of both `stream` and `signal` alive.
pub fn snapshot<A, B, C, F>(signal: &Signal<A>, stream: &Stream<B>, f: F) -> Stream<C>
where
    A: Clone + Send + Sync + 'static,
    B: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
    F: Fn(A, B) -> C + Send + Sync + 'static,
{
    commit(|| {
        let target = Arc::new(RwLock::new(Source::new()));
        let downstream = Arc::downgrade(&target);
        stream.source.write().unwrap().register({
            // The callback owns its own handle to the signal.
            let sampled = signal.clone();
            move |b| {
                with_weak(&downstream, |out| {
                    let current = sample_raw(&sampled);
                    out.send(f(current, b))
                })
            }
        });
        let keep_alive = Box::new((stream.clone(), signal.clone()));
        Stream {
            source: target,
            keep_alive,
        }
    })
}
/// Iterator over the events of a stream, delivered through an mpsc channel.
pub struct Events<A> {
// Receiving end of the channel that the registered callback sends into.
receiver: Receiver<A>,
// Never read, hence the lint exemption: `Events::new` stores a clone of the
// originating stream here so that it is owned for as long as the iterator.
#[allow(dead_code)]
keep_alive: Box<dyn BoxClone>,
}
impl<A: Send + Sync + 'static> Events<A> {
    /// Registers a callback on `stream` that pushes every event into a
    /// channel and returns the iterator wrapping the receiving end.
    ///
    /// Once the receiver is gone the callback reports
    /// `CallbackError::Disappeared`.
    fn new(stream: &Stream<A>) -> Events<A> {
        commit(|| {
            let (sender, receiver) = channel();
            // The sender is wrapped in a mutex before being moved into the callback.
            let sender = Mutex::new(sender);
            stream.source.write().unwrap().register(move |a| {
                let delivery = sender.lock().unwrap().send(a);
                delivery.map_err(|_| CallbackError::Disappeared)
            });
            Events {
                receiver,
                keep_alive: Box::new(stream.clone()),
            }
        })
    }
}
impl<A: Send + Sync + 'static> Iterator for Events<A> {
    type Item = A;

    /// Blocks until the next event arrives on the channel.
    ///
    /// Yields `None` only when receiving fails, i.e. when the sending side
    /// of the channel has been dropped.
    fn next(&mut self) -> Option<A> {
        match self.receiver.recv() {
            Ok(event) => Some(event),
            Err(_) => None,
        }
    }
}
/// Assembles a stream from an existing source and a value to keep alive.
///
/// `keep_alive` is only borrowed; a boxed clone of it is stored in the stream.
pub fn build<A, T: BoxClone + 'static>(src: Arc<RwLock<Source<A>>>, keep_alive: &T) -> Stream<A> {
    let keep_alive = keep_alive.box_clone();
    Stream {
        source: src,
        keep_alive,
    }
}
#[cfg(test)]
mod test {
    //! Unit tests for sinks, stream combinators and the algebraic laws
    //! (monoid under `merge`, functor under `map`) they are meant to satisfy.

    use quickcheck::quickcheck;
    use std::thread;

    use super::*;
    use crate::testing::{id, stream_eq};

    #[test]
    fn sink() {
        let sink = Sink::new();
        let mut events = sink.stream().events();
        sink.send(1);
        sink.send(2);
        assert_eq!(events.next(), Some(1));
        assert_eq!(events.next(), Some(2));
    }

    #[test]
    fn map() {
        let sink = Sink::new();
        let triple = sink.stream().map(|x| 3 * x);
        let mut events = triple.events();
        sink.send(1);
        assert_eq!(events.next(), Some(3));
    }

    #[test]
    fn filter_some() {
        let sink = Sink::new();
        let small = sink.stream().filter_some();
        let mut events = small.events();
        sink.send(None);
        sink.send(Some(9));
        assert_eq!(events.next(), Some(9));
    }

    #[test]
    fn chain_1() {
        let sink: Sink<i32> = Sink::new();
        let chain = sink.stream().map(|x| x / 2).filter(|&x| x < 3);
        let mut events = chain.events();
        // 7 / 2 == 3 is filtered out, 4 / 2 == 2 passes.
        sink.send(7);
        sink.send(4);
        assert_eq!(events.next(), Some(2));
    }

    #[test]
    fn merge() {
        let sink1 = Sink::new();
        let sink2 = Sink::new();
        let mut events = sink1.stream().merge(&sink2.stream()).events();
        sink1.send(12);
        sink2.send(9);
        assert_eq!(events.next(), Some(12));
        assert_eq!(events.next(), Some(9));
    }

    #[test]
    fn chain_2() {
        let sink1: Sink<i32> = Sink::new();
        let sink2: Sink<i32> = Sink::new();
        let mut events = sink1
            .stream()
            .map(|x| x + 4)
            .merge(
                &sink2
                    .stream()
                    .filter_map(|x| if x < 4 { Some(x) } else { None })
                    .map(|x| x * 5),
            )
            .events();
        sink1.send(12);
        sink2.send(3);
        assert_eq!(events.next(), Some(16));
        assert_eq!(events.next(), Some(15));
    }

    // Compile-time check: `map` accepts a closure that captures by move.
    #[test]
    fn move_closure() {
        let sink = Sink::<i32>::new();
        let x = 3;
        sink.stream().map(move |y| y + x);
    }

    // Builds folds while another thread is busy feeding the same sink.
    #[test]
    fn fold_race_condition() {
        let sink = Sink::new();
        sink.feed_async(0..100000);
        for _ in 0..10 {
            let _sum = sink.stream().fold(0, |a, b| a + b);
        }
    }

    #[test]
    fn sink_send_async() {
        let sink = Sink::new();
        let mut events = sink.stream().events();
        sink.send_async(1);
        assert_eq!(events.next(), Some(1));
    }

    #[test]
    fn sink_feed() {
        let sink = Sink::new();
        let events = sink.stream().events();
        sink.feed(0..10);
        for (n, m) in events.take(10).enumerate() {
            assert_eq!(n as i32, m);
        }
    }

    #[test]
    fn sink_feed_async() {
        let sink = Sink::new();
        let events = sink.stream().events();
        sink.feed_async(0..10);
        for (n, m) in events.take(10).enumerate() {
            assert_eq!(n as i32, m);
        }
    }

    #[test]
    fn coalesce() {
        let sink = Sink::new();
        let stream = sink.stream().merge(&sink.stream()).coalesce(|a, b| a + b);
        let mut events = stream.events();
        sink.send(1);
        assert_eq!(events.next(), Some(2));
    }

    #[test]
    fn monoid_left_identity() {
        fn check(input: Vec<i32>) -> Result<(), String> {
            let sink = Sink::new();
            let a = sink.stream();
            let eq = stream_eq(&Stream::never().merge(&a), &a);
            sink.feed(input.into_iter());
            eq.sample()
        }
        quickcheck(check as fn(Vec<i32>) -> Result<(), String>);
    }

    #[test]
    fn monoid_right_identity() {
        fn check(input: Vec<i32>) -> Result<(), String> {
            let sink = Sink::new();
            let a = sink.stream();
            let eq = stream_eq(&a.merge(&Stream::never()), &a);
            sink.feed(input.into_iter());
            eq.sample()
        }
        quickcheck(check as fn(Vec<i32>) -> Result<(), String>);
    }

    #[test]
    fn monoid_associative() {
        fn check(input_a: Vec<i32>, input_b: Vec<i32>, input_c: Vec<i32>) -> Result<(), String> {
            let sink_a = Sink::new();
            let sink_b = Sink::new();
            let sink_c = Sink::new();
            let a = sink_a.stream();
            let b = sink_b.stream();
            let c = sink_c.stream();
            let eq = stream_eq(&a.merge(&b.merge(&c)), &a.merge(&b).merge(&c));
            // Feed the three sinks concurrently, then wait for every feeder
            // to finish so the comparison covers the complete input instead
            // of whatever happened to arrive within a fixed sleep.
            let feeders = vec![
                thread::spawn(move || sink_a.feed(input_a.into_iter())),
                thread::spawn(move || sink_b.feed(input_b.into_iter())),
                thread::spawn(move || sink_c.feed(input_c.into_iter())),
            ];
            for feeder in feeders {
                feeder.join().expect("feeder thread panicked");
            }
            eq.sample()
        }
        quickcheck(check as fn(Vec<i32>, Vec<i32>, Vec<i32>) -> Result<(), String>);
    }

    #[test]
    fn functor_identity() {
        fn check(input: Vec<i32>) -> Result<(), String> {
            let sink = Sink::new();
            let a = sink.stream();
            let eq = stream_eq(&a.map(id), &a);
            sink.feed(input.into_iter());
            eq.sample()
        }
        quickcheck(check as fn(Vec<i32>) -> Result<(), String>);
    }

    #[test]
    fn functor_composition() {
        fn check(input: Vec<i32>) -> Result<(), String> {
            fn f(n: i32) -> i64 {
                (n + 3) as i64
            }
            fn g(n: i64) -> f64 {
                n as f64 / 2.5
            }
            let sink = Sink::new();
            let a = sink.stream();
            let eq = stream_eq(&a.map(f).map(g), &a.map(|n| g(f(n))));
            sink.feed(input.into_iter());
            eq.sample()
        }
        quickcheck(check as fn(Vec<i32>) -> Result<(), String>);
    }
}