use crate::helpers::arc_and_weak;
use crate::signal::Signal;
use crate::sync::Mutex;
use crate::types::{Callbacks, MaybeOwned, ObserveResult, Storage, SumType2};
use std::any::Any;
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[cfg(feature = "either")]
use crate::types::Either;
#[cfg(feature = "nightly")]
use crate::futures::StreamFuture;
/// Entry point for pushing values into a graph of streams.
///
/// A `Sink` owns a reference-counted callback list. Every [`Stream`] obtained
/// through [`Sink::stream`] shares that same list, so values sent here are
/// dispatched to the callbacks registered on those streams.
#[derive(Debug)]
pub struct Sink<T> {
    /// Callback list shared (through the `Arc`) with the streams of this sink.
    cbs: Arc<Callbacks<T>>,
}

impl<T> Sink<T> {
    /// Creates a sink backed by a default-constructed callback list.
    pub fn new() -> Self {
        let cbs = Default::default();
        Sink { cbs }
    }

    /// Returns a stream that shares this sink's callback list.
    ///
    /// The stream is created with `Source::None` because it is not derived
    /// from another stream.
    pub fn stream(&self) -> Stream<T> {
        let shared = self.cbs.clone();
        Stream::new(shared, Source::None)
    }

    /// Dispatches a single value to the callback list.
    ///
    /// Anything convertible into `MaybeOwned<'a, T>` is accepted; the tests in
    /// this file pass both owned values and references.
    #[inline]
    pub fn send<'a>(&self, val: impl Into<MaybeOwned<'a, T>>)
    where
        T: 'a,
    {
        self.cbs.call(val)
    }

    /// Sends every item produced by `iter`, one after another, in iteration order.
    #[inline]
    pub fn feed<'a, I, U>(&self, iter: I)
    where
        I: IntoIterator<Item = U>,
        U: Into<MaybeOwned<'a, T>>,
        T: 'a,
    {
        iter.into_iter().for_each(|item| self.send(item));
    }

    /// Dispatches `val` through `Callbacks::call_parallel`.
    ///
    /// NOTE(review): the name suggests the callbacks run concurrently; the exact
    /// semantics live in `Callbacks` and should be confirmed there.
    #[cfg(feature = "crossbeam-utils")]
    #[inline]
    pub fn send_parallel(&self, val: &T)
    where
        T: Sync,
    {
        self.cbs.call_parallel(val)
    }
}
impl<T> Default for Sink<T> {
#[inline]
fn default() -> Self {
Sink::new()
}
}
impl<T> Clone for Sink<T> {
fn clone(&self) -> Self {
Sink {
cbs: self.cbs.clone(),
}
}
}
/// Type-erased handle on the upstream stream(s) a derived stream was built from.
///
/// The erased value is a clone of the parent stream(s); holding it keeps the
/// parents' reference-counted callback lists alive for as long as the derived
/// stream exists.
#[derive(Debug, Clone)]
enum Source {
    /// The stream has no parent.
    None,
    /// One parent stream, or a tuple of two, hidden behind `dyn Any`.
    Erased(Arc<dyn Any + Send + Sync>),
}

impl Source {
    /// Wraps a clone of a single parent stream.
    fn stream<T: 'static>(s: &Stream<T>) -> Self {
        let parent: Arc<dyn Any + Send + Sync> = Arc::new(s.clone());
        Source::Erased(parent)
    }

    /// Wraps clones of two parent streams, stored together as a tuple.
    fn stream2<A: 'static, B: 'static>(s1: &Stream<A>, s2: &Stream<B>) -> Self {
        let parents: Arc<dyn Any + Send + Sync> = Arc::new((s1.clone(), s2.clone()));
        Source::Erased(parents)
    }
}
/// A stream of values of type `T` that observers and derived streams can
/// attach callbacks to.
#[derive(Debug)]
pub struct Stream<T> {
    /// Callbacks invoked for every value passing through this stream.
    cbs: Arc<Callbacks<T>>,
    /// Parent stream(s), retained so they outlive this derived stream.
    source: Source,
}

impl<T> Stream<T> {
    /// Assembles a stream from its callback list and its (possibly absent) parent.
    fn new(cbs: Arc<Callbacks<T>>, source: Source) -> Self {
        Self { cbs, source }
    }

    /// Creates a stream that is not connected to any sink or parent, so nothing
    /// in this module will ever send a value through it.
    pub fn never() -> Self {
        let cbs = Default::default();
        Self::new(cbs, Source::None)
    }

    /// Registers `f` to be called for every value of this stream.
    ///
    /// The closure's return value is converted with
    /// `ObserveResult::is_callback_alive`; presumably a `false` result makes the
    /// callback list drop the observer (confirm against `Callbacks::push`).
    pub fn observe<F, R>(&self, f: F)
    where
        F: Fn(MaybeOwned<'_, T>) -> R + Send + Sync + 'static,
        R: ObserveResult,
    {
        self.cbs.push(move |event| {
            let outcome = f(event);
            outcome.is_callback_alive()
        });
    }

    /// Like [`Stream::observe`], but the registered callback also owns a clone
    /// of this stream.
    ///
    /// Because the callback is stored in the very callback list the clone
    /// points to, the stream (and its parents) stays alive for as long as the
    /// callback remains registered, even after the caller drops its handle.
    pub fn observe_strong<F, R>(&self, f: F)
    where
        F: Fn(MaybeOwned<'_, T>) -> R + Send + Sync + 'static,
        T: 'static,
        R: ObserveResult,
    {
        let keepalive = self.clone();
        self.cbs.push(move |event| {
            // Referencing the clone forces the `move` closure to capture it.
            let _anchor = &keepalive;
            f(event).is_callback_alive()
        });
    }

    /// Attaches `f` as an observer and hands the stream back, which allows
    /// chaining in builder style.
    #[inline]
    pub fn inspect<F, R>(self, f: F) -> Self
    where
        F: Fn(MaybeOwned<'_, T>) -> R + Send + Sync + 'static,
        R: ObserveResult,
    {
        self.observe(f);
        self
    }
}
impl<T: 'static> Stream<T> {
/// Derives a stream whose values are the results of applying `f` to each
/// value of this stream.
///
/// Implemented as a `filter_map` that never rejects a value.
#[inline]
pub fn map<F, R>(&self, f: F) -> Stream<R>
where
    F: Fn(MaybeOwned<'_, T>) -> R + Send + Sync + 'static,
    R: 'static,
{
    self.filter_map(move |event| {
        let mapped = f(event);
        Some(mapped)
    })
}
/// Derives a stream that forwards only the values for which `pred` returns
/// `true`.
///
/// The parent holds just a weak reference to the derived callback list, so
/// the forwarding callback does nothing useful once the derived stream is
/// gone (the result of `with_weak!` reports that to the parent).
pub fn filter<F>(&self, pred: F) -> Self
where
    F: Fn(&T) -> bool + Send + Sync + 'static,
{
    let (derived, target) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |event| {
        with_weak!(target, |out| {
            if pred(&event) {
                out.call(event);
            }
        })
    });
    Stream::new(derived, Source::stream(self))
}
/// Derives a stream by applying `f` to each value and forwarding only the
/// `Some` results; `None` results are dropped.
///
/// The derived callback list is referenced weakly from the parent, and the
/// derived stream keeps a clone of the parent as its source.
pub fn filter_map<F, R>(&self, f: F) -> Stream<R>
where
    F: Fn(MaybeOwned<'_, T>) -> Option<R> + Send + Sync + 'static,
    R: 'static,
{
    let (derived, target) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |event| {
        with_weak!(target, |out| {
            if let Some(mapped) = f(event) {
                out.call(mapped);
            }
        })
    });
    Stream::new(derived, Source::stream(self))
}
/// Derives a stream that forwards every value of both `self` and `other`.
///
/// Each parent gets its own forwarding callback; both point weakly at the
/// same derived callback list.
pub fn merge(&self, other: &Stream<T>) -> Self {
    let (merged, from_self) = arc_and_weak(Callbacks::new());
    let from_other = from_self.clone();
    self.cbs.push(move |event| {
        with_weak!(from_self, |out| out.call(event))
    });
    other.cbs.push(move |event| {
        with_weak!(from_other, |out| out.call(event))
    });
    Stream::new(merged, Source::stream2(self, other))
}
/// Merges two streams of different value types into one stream of `R`.
///
/// Values of `self` are converted with `f1`, values of `other` with `f2`;
/// the converted values are forwarded to the same derived callback list.
pub fn merge_with<U, F1, F2, R>(&self, other: &Stream<U>, f1: F1, f2: F2) -> Stream<R>
where
    F1: Fn(MaybeOwned<'_, T>) -> R + Send + Sync + 'static,
    F2: Fn(MaybeOwned<'_, U>) -> R + Send + Sync + 'static,
    U: 'static,
    R: 'static,
{
    let (merged, from_self) = arc_and_weak(Callbacks::new());
    let from_other = from_self.clone();
    self.cbs.push(move |event| {
        with_weak!(from_self, |out| out.call(f1(event)))
    });
    other.cbs.push(move |event| {
        with_weak!(from_other, |out| out.call(f2(event)))
    });
    Stream::new(merged, Source::stream2(self, other))
}
/// Merges two streams using a single function that receives an `Either`:
/// `Left` for values of `self`, `Right` for values of `other`.
///
/// `f` is cloned once so that each side of the underlying `merge_with` owns
/// its own copy.
#[cfg(feature = "either")]
#[inline]
pub fn merge_with_either<U, F, R>(&self, other: &Stream<U>, f: F) -> Stream<R>
where
    F: Fn(Either<MaybeOwned<'_, T>, MaybeOwned<'_, U>>) -> R + Clone + Send + Sync + 'static,
    U: 'static,
    R: 'static,
{
    let on_right = f.clone();
    let on_left = f;
    self.merge_with(
        other,
        move |lhs| on_left(Either::Left(lhs)),
        move |rhs| on_right(Either::Right(rhs)),
    )
}
/// Accumulates the values of this stream into a `Signal`.
///
/// Starting from `initial`, every incoming value replaces the stored
/// accumulator with `f(previous, value)` through `Storage::replace`. The
/// storage is referenced weakly from the callback; the returned signal owns
/// it, together with a clone of this stream.
pub fn fold<A, F>(&self, initial: A, f: F) -> Signal<A>
where
    F: Fn(A, MaybeOwned<'_, T>) -> A + Send + Sync + 'static,
    A: Clone + Send + Sync + 'static,
{
    let (state, state_weak) = arc_and_weak(Storage::new(initial));
    self.cbs.push(move |event| {
        with_weak!(state_weak, |acc| {
            acc.replace(|prev| f(prev, event));
        })
    });
    Signal::from_storage(state, self.clone())
}
/// Same contract as `fold`, but the accumulator is updated through
/// `Storage::replace_clone` instead of `Storage::replace`.
///
/// NOTE(review): presumably `replace_clone` hands `f` a clone of the stored
/// value rather than moving it out; confirm against `Storage`.
pub fn fold_clone<A, F>(&self, initial: A, f: F) -> Signal<A>
where
    F: Fn(A, MaybeOwned<'_, T>) -> A + Send + Sync + 'static,
    A: Clone + Send + Sync + 'static,
{
    let (state, state_weak) = arc_and_weak(Storage::new(initial));
    self.cbs.push(move |event| {
        with_weak!(state_weak, |acc| {
            acc.replace_clone(|prev| f(prev, event));
        })
    });
    Signal::from_storage(state, self.clone())
}
/// Derives a stream where each input value may produce any number of output
/// values.
///
/// `f` receives the input value and a `Sender` bound to the derived callback
/// list; whatever it sends through the `Sender` appears on the derived stream.
pub fn map_n<F, R>(&self, f: F) -> Stream<R>
where
    F: Fn(MaybeOwned<'_, T>, Sender<R>) + Send + Sync + 'static,
    R: 'static,
{
    let (derived, target) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |event| {
        with_weak!(target, |out| {
            let sender = Sender::new(out);
            f(event, sender)
        })
    });
    Stream::new(derived, Source::stream(self))
}
/// Derives a stream of running accumulator values.
///
/// The accumulator starts at `initial`; for each input value it is updated
/// with `f(previous, value)` and the value returned by
/// `Storage::replace_fetch` is sent downstream. The accumulator is owned by
/// the forwarding callback itself.
pub fn scan<A, F>(&self, initial: A, f: F) -> Stream<A>
where
    F: Fn(A, MaybeOwned<'_, T>) -> A + Send + Sync + 'static,
    A: Clone + Send + Sync + 'static,
{
    let state = Storage::new(initial);
    let (derived, target) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |event| {
        with_weak!(target, |out| {
            let updated = state.replace_fetch(|prev| f(prev, event));
            out.call(updated)
        })
    });
    Stream::new(derived, Source::stream(self))
}
/// Stateful variant of `map_n`.
///
/// `f` receives the current accumulator, the input value and a `Sender` for
/// the derived stream. Its return value becomes the new accumulator, and
/// anything it sends through the `Sender` is emitted downstream.
pub fn scan_n<A, F, R>(&self, initial: A, f: F) -> Stream<R>
where
    F: Fn(A, MaybeOwned<'_, T>, Sender<R>) -> A + Send + Sync + 'static,
    A: Send + Sync + 'static,
    R: 'static,
{
    let state = Storage::new(initial);
    let (derived, target) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |event| {
        with_weak!(target, |out| {
            let sender = Sender::new(out);
            state.replace(|prev| f(prev, event, sender))
        })
    });
    Stream::new(derived, Source::stream(self))
}
/// Gathers every value of this stream into a collection held by a `Signal`.
///
/// The collection starts as `C::default()` and is extended by one owned
/// element per incoming value.
#[inline]
pub fn collect<C>(&self) -> Signal<C>
where
    C: Default + Extend<T> + Clone + Send + Sync + 'static,
    T: Clone,
{
    self.fold(C::default(), |mut acc, item| {
        acc.extend(std::iter::once(item.into_owned()));
        acc
    })
}
/// Derives a stream that emits only the value at position `index`
/// (zero-based, counted from the moment this method was called).
///
/// The forwarding callback returns `true` only while the counter is still
/// below `index`, so it reports itself as finished right after the wanted
/// element was seen, or as soon as the derived stream is gone.
pub fn element_at(&self, index: usize) -> Self {
    let (derived, target) = arc_and_weak(Callbacks::new());
    let seen = AtomicUsize::new(0);
    self.cbs.push(move |event| match target.upgrade() {
        Some(out) => {
            let position = seen.fetch_add(1, Ordering::Relaxed);
            if position == index {
                out.call(event);
            }
            position < index
        }
        // Derived stream dropped: nothing left to forward to.
        None => false,
    });
    Stream::new(derived, Source::stream(self))
}
/// Derives a stream that emits only the values whose zero-based position
/// (counted from the moment this method was called) lies inside `range`.
///
/// The forwarding callback keeps returning `true` while the position has not
/// passed the end of the range; it returns `false` on the first value beyond
/// the end, or when the derived stream is gone.
pub fn elements_between<B>(&self, range: B) -> Self
where
    B: RangeBounds<usize> + Send + Sync + 'static,
{
    let (derived, target) = arc_and_weak(Callbacks::new());
    let seen = AtomicUsize::new(0);
    self.cbs.push(move |event| {
        let out = match target.upgrade() {
            Some(out) => out,
            None => return false,
        };
        let position = seen.fetch_add(1, Ordering::Relaxed);
        let reached_start = match range.start_bound() {
            Bound::Unbounded => true,
            Bound::Included(&first) => position >= first,
            Bound::Excluded(&before) => position > before,
        };
        let within_end = match range.end_bound() {
            Bound::Unbounded => true,
            Bound::Included(&last) => position <= last,
            Bound::Excluded(&limit) => position < limit,
        };
        if reached_start && within_end {
            out.call(event);
        }
        within_end
    });
    Stream::new(derived, Source::stream(self))
}
}
impl<T: Clone + Send + 'static> Stream<T> {
/// Creates a `Signal` that starts at `initial` and afterwards holds the most
/// recent value of this stream.
///
/// Equivalent to `hold_if` with a predicate that accepts every value.
#[inline]
pub fn hold(&self, initial: T) -> Signal<T>
where
    T: Sync,
{
    self.hold_if(initial, |_candidate| true)
}
/// Creates a `Signal` that starts at `initial` and is overwritten by each
/// stream value for which `pred` returns `true`; other values are ignored.
///
/// The storage is referenced weakly from the callback; the returned signal
/// owns it, together with a clone of this stream.
pub fn hold_if<F>(&self, initial: T, pred: F) -> Signal<T>
where
    F: Fn(&T) -> bool + Send + Sync + 'static,
    T: Sync,
{
    let (state, state_weak) = arc_and_weak(Storage::new(initial));
    self.cbs.push(move |event| {
        with_weak!(state_weak, |slot| {
            if pred(&event) {
                slot.set(event.into_owned());
            }
        })
    });
    Signal::from_storage(state, self.clone())
}
/// Pairs the values of two streams in arrival order, producing tuples.
///
/// Shorthand for `zip_with` with a function that builds the tuple.
#[inline]
pub fn zip<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
where
    U: Clone + Send + 'static,
{
    self.zip_with(other, |lhs, rhs| (lhs, rhs))
}
/// Pairs the values of two streams in arrival order and combines each pair
/// with `f`.
///
/// A value arriving on one side is matched with the oldest unmatched value of
/// the other side; if there is none, it is queued until a partner arrives.
///
/// Both pending queues live behind a single mutex. Using one mutex per queue
/// and taking them in opposite order from the two callbacks (as a guard held
/// in an `if let` scrutinee does through the `else` branch) allows two
/// threads sending on the two parents at the same time to deadlock. The lock
/// is also released before the downstream callbacks run, so a callback that
/// sends back into one of the parents does not try to re-lock a mutex that is
/// already held by its own thread.
pub fn zip_with<U, F, R>(&self, other: &Stream<U>, f: F) -> Stream<R>
where
    F: Fn(T, U) -> R + Clone + Send + Sync + 'static,
    U: Clone + Send + 'static,
    R: 'static,
{
    let (new_cbs, weak1) = arc_and_weak(Callbacks::new());
    let weak2 = weak1.clone();
    // `.0` holds unmatched values of `self`, `.1` unmatched values of `other`.
    let queues = Arc::new(Mutex::new((VecDeque::<T>::new(), VecDeque::<U>::new())));
    let queues_ = queues.clone();
    let f_ = f.clone();
    self.cbs.push(move |arg| {
        with_weak!(weak1, |cb| {
            let lhs = arg.into_owned();
            // Decide under the lock, dispatch after releasing it.
            let pair = {
                let mut pending = queues.lock();
                match pending.1.pop_front() {
                    Some(rhs) => Some((lhs, rhs)),
                    None => {
                        pending.0.push_back(lhs);
                        None
                    }
                }
            };
            if let Some((lhs, rhs)) = pair {
                cb.call(f(lhs, rhs));
            }
        })
    });
    other.cbs.push(move |arg| {
        with_weak!(weak2, |cb| {
            let rhs = arg.into_owned();
            // Mirror image of the callback above, using the same mutex.
            let pair = {
                let mut pending = queues_.lock();
                match pending.0.pop_front() {
                    Some(lhs) => Some((lhs, rhs)),
                    None => {
                        pending.1.push_back(rhs);
                        None
                    }
                }
            };
            if let Some((lhs, rhs)) = pair {
                cb.call(f_(lhs, rhs));
            }
        })
    });
    Stream::new(new_cbs, Source::stream2(self, other))
}
/// Emits a tuple of the latest values of both streams whenever either of
/// them produces a value (once both have produced at least one).
///
/// Shorthand for `combine_with` with a function that builds the tuple.
#[inline]
pub fn combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
where
    U: Clone + Send + 'static,
{
    self.combine_with(other, |lhs, rhs| (lhs, rhs))
}
/// Combines the latest values of two streams with `f`.
///
/// Each side remembers its most recent value. Whenever a value arrives on one
/// side it is stored, and if the other side has already produced a value,
/// `f(latest_self, latest_other)` is emitted on the derived stream. Nothing is
/// emitted until both sides have produced at least one value.
///
/// The opposite side's value is cloned while its mutex is held and the guard
/// is released before the downstream callbacks run. Dispatching with the
/// guard still held would make a downstream callback that sends back into the
/// opposite parent re-lock a mutex already held by its own thread.
/// NOTE(review): this assumes `crate::sync::Mutex` is not re-entrant; confirm.
pub fn combine_with<U, F, R>(&self, other: &Stream<U>, f: F) -> Stream<R>
where
    F: Fn(T, U) -> R + Clone + Send + Sync + 'static,
    U: Clone + Send + 'static,
    R: 'static,
{
    let (new_cbs, weak1) = arc_and_weak(Callbacks::new());
    let weak2 = weak1.clone();
    let left = Arc::new(Mutex::new(None::<T>));
    let right = Arc::new(Mutex::new(None::<U>));
    let left1 = left.clone();
    let right1 = right.clone();
    let f_ = f.clone();
    self.cbs.push(move |arg| {
        with_weak!(weak1, |cb| {
            let arg = arg.into_owned();
            *left.lock() = Some(arg.clone());
            // The temporary guard is dropped at the end of this statement.
            let latest_right = right1.lock().as_ref().cloned();
            if let Some(val) = latest_right {
                cb.call(f(arg, val));
            }
        })
    });
    other.cbs.push(move |arg| {
        with_weak!(weak2, |cb| {
            let arg = arg.into_owned();
            *right.lock() = Some(arg.clone());
            // The temporary guard is dropped at the end of this statement.
            let latest_left = left1.lock().as_ref().cloned();
            if let Some(val) = latest_left {
                cb.call(f_(val, arg));
            }
        })
    });
    Stream::new(new_cbs, Source::stream2(self, other))
}
/// Builds a `StreamFuture` from a clone of this stream.
///
/// The test in this file awaits it to obtain the next value sent through the
/// stream.
#[cfg(feature = "nightly")]
#[inline]
pub fn next(&self) -> StreamFuture<T> {
    let stream = self.clone();
    StreamFuture::new(stream)
}
}
impl<T: Clone + 'static> Stream<Option<T>> {
    /// Derives a stream of the values contained in the `Some` items of this
    /// stream. Delegates to `filter_first`.
    #[inline]
    pub fn filter_some(&self) -> Stream<T> {
        Self::filter_first(self)
    }
}
impl<T: Clone + 'static, E: Clone + 'static> Stream<Result<T, E>> {
    /// Derives a stream of the values contained in the `Ok` items of this
    /// stream. Delegates to `filter_first`.
    #[inline]
    pub fn filter_ok(&self) -> Stream<T> {
        Self::filter_first(self)
    }

    /// Derives a stream of the errors contained in the `Err` items of this
    /// stream. Delegates to `filter_second`.
    #[inline]
    pub fn filter_err(&self) -> Stream<E> {
        Self::filter_second(self)
    }
}
impl<T: SumType2 + Clone + 'static> Stream<T>
where
T::Type1: 'static,
T::Type2: 'static,
{
/// Derives a stream carrying only the first-variant payloads of this stream.
///
/// The variant is checked before `into_owned` is called, so items of the
/// other variant are rejected without being converted to an owned value.
pub fn filter_first(&self) -> Stream<T::Type1> {
    self.filter_map(|item| {
        if !item.is_type1() {
            return None;
        }
        item.into_owned().into_type1()
    })
}
/// Derives a stream carrying only the second-variant payloads of this stream.
///
/// The variant is checked before `into_owned` is called, so items of the
/// other variant are rejected without being converted to an owned value.
pub fn filter_second(&self) -> Stream<T::Type2> {
    self.filter_map(|item| {
        if !item.is_type2() {
            return None;
        }
        item.into_owned().into_type2()
    })
}
/// Splits this stream into two: one carrying the first-variant payloads and
/// one carrying the second-variant payloads.
///
/// A single callback on the parent serves both outputs. It returns `false`
/// only when the output for the current item is gone AND the other output is
/// gone too, i.e. when nobody is left to receive anything.
pub fn split(&self) -> (Stream<T::Type1>, Stream<T::Type2>) {
    let (cbs_1, weak_1) = arc_and_weak(Callbacks::new());
    let (cbs_2, weak_2) = arc_and_weak(Callbacks::new());
    self.cbs.push(move |item| {
        if item.is_type1() {
            match weak_1.upgrade() {
                Some(first_out) => {
                    first_out.call(item.into_owned().into_type1().unwrap());
                    true
                }
                // First output dropped: stay registered only for the second.
                None => weak_2.upgrade().is_some(),
            }
        } else {
            match weak_2.upgrade() {
                Some(second_out) => {
                    second_out.call(item.into_owned().into_type2().unwrap());
                    true
                }
                // Second output dropped: stay registered only for the first.
                None => weak_1.upgrade().is_some(),
            }
        }
    });
    let parent = Source::stream(self);
    let first = Stream::new(cbs_1, parent.clone());
    let second = Stream::new(cbs_2, parent);
    (first, second)
}
}
impl<T: 'static> Stream<Stream<T>> {
    /// Flattens a stream of streams by following only the most recent inner
    /// stream.
    ///
    /// Every inner stream that arrives is given a fresh id taken from a shared
    /// counter, and a forwarding callback is attached to it. That callback
    /// forwards values only while its id equals the counter's current value;
    /// once a newer inner stream has arrived it returns `false` instead of
    /// forwarding.
    pub fn switch(&self) -> Stream<T> {
        let (flattened, weak) = arc_and_weak(Callbacks::new());
        let id = Arc::new(AtomicUsize::new(0));
        self.cbs.push(move |inner| {
            // Nobody listens to the flattened stream anymore.
            if weak.upgrade().is_none() {
                return false;
            }
            let generation = id.fetch_add(1, Ordering::Relaxed) + 1;
            let latest = Arc::clone(&id);
            let target = weak.clone();
            inner.cbs.push(move |event| {
                // A newer inner stream has replaced this one.
                if latest.load(Ordering::Relaxed) != generation {
                    return false;
                }
                with_weak!(target, |out| out.call(event))
            });
            true
        });
        Stream::new(flattened, Source::stream(self))
    }
}
impl<T> Clone for Stream<T> {
fn clone(&self) -> Self {
Stream {
cbs: self.cbs.clone(),
source: self.source.clone(),
}
}
}
impl<T> Default for Stream<T> {
#[inline]
fn default() -> Self {
Stream::never()
}
}
/// Handle given to `map_n` / `scan_n` closures for emitting values on the
/// derived stream.
///
/// It wraps a `Sink` pointing at the derived stream's callback list, but only
/// exposes sending of owned values.
#[derive(Debug)]
pub struct Sender<T>(Sink<T>);

impl<T> Sender<T> {
    /// Wraps an existing callback list.
    fn new(cbs: Arc<Callbacks<T>>) -> Self {
        let sink = Sink { cbs };
        Sender(sink)
    }

    /// Sends one owned value to the wrapped sink.
    #[inline]
    pub fn send(&self, val: T) {
        let Sender(sink) = self;
        sink.send(val)
    }

    /// Sends every item of `iter` to the wrapped sink, in iteration order.
    #[inline]
    pub fn feed(&self, iter: impl IntoIterator<Item = T>) {
        let Sender(sink) = self;
        sink.feed(iter)
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender(self.0.clone())
}
}
// Unit tests for `Sink`, `Stream` and `Sender`.
//
// Most tests observe a stream through a bounded std `mpsc` channel (see
// `as_sync_channel`) and then assert on what the channel received.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::mpsc;
// Test-only helper methods on `Stream`.
impl<T: Clone + Send + 'static> Stream<T> {
// Registers an observer that pushes an owned copy of every stream value into
// a bounded channel of capacity `bound`, and returns the receiving end.
// The observer returns the `Result` of `tx.send`, which is converted through
// `ObserveResult` (presumably an `Err`, i.e. a dropped receiver, marks the
// observer as no longer alive; confirm against `ObserveResult`).
fn as_sync_channel(&self, bound: usize) -> mpsc::Receiver<T> {
let (tx, rx) = mpsc::sync_channel(bound);
self.observe(move |arg| tx.send(arg.into_owned()));
rx
}
}
// Values passed to `send` and `feed` reach the observer in the order sent.
#[test]
fn stream_basic() {
let sink = Sink::new();
let stream = sink.stream();
let rx = stream.as_sync_channel(20);
sink.send(42);
sink.send(33);
sink.send(12);
sink.feed(0..5);
sink.feed(vec![11, 22, 33]);
let result: Vec<_> = rx.try_iter().collect();
assert_eq!(result, [42, 33, 12, 0, 1, 2, 3, 4, 11, 22, 33]);
}
// `send` and `feed` also accept references to values.
#[test]
fn stream_send_ref() {
#[derive(Debug, Clone, PartialEq, Eq)]
struct Test(i32);
let sink: Sink<Test> = Sink::new();
let stream = sink.stream();
let rx = stream.as_sync_channel(10);
// The borrowed values go out of scope before the channel is read; the
// observer stored owned copies.
{
let a = Test(42);
let b = [Test(33), Test(-1)];
sink.send(&a);
sink.feed(&b);
}
assert_eq!(rx.try_recv(), Ok(Test(42)));
assert_eq!(rx.try_recv(), Ok(Test(33)));
assert_eq!(rx.try_recv(), Ok(Test(-1)));
}
// `switch` forwards values only from the inner stream that was sent last.
#[test]
fn stream_switch() {
let stream_sink = Sink::new();
let sink1 = Sink::new();
let sink2 = Sink::new();
let switched = stream_sink.stream().switch();
let events = switched.as_sync_channel(10);
// No inner stream selected yet: 1 and 2 must not show up.
sink1.send(1);
sink2.send(2);
stream_sink.send(sink2.stream());
// Only sink2 is followed now, so 3 is ignored and 4 is the first event.
sink1.send(3);
sink2.send(4);
assert_eq!(events.try_recv(), Ok(4));
stream_sink.send(sink1.stream());
// After switching to sink1, its value 5 is the next event received.
sink1.send(5);
sink2.send(6);
assert_eq!(events.try_recv(), Ok(5));
}
// `Default` works for both `Sink` and `Stream`; merging with a default
// stream still forwards the values of the other side.
#[test]
fn stream_default() {
let sink: Sink<i32> = Default::default();
let stream1 = sink.stream();
let stream2: Stream<i32> = Default::default();
let merged = stream1.merge(&stream2);
let rx = merged.as_sync_channel(10);
sink.send(42);
sink.send(13);
assert_eq!(rx.try_recv(), Ok(42));
assert_eq!(rx.try_recv(), Ok(13));
}
// `scan` emits the running sum after every input value.
#[test]
fn stream_scan() {
let sink = Sink::new();
let stream = sink.stream().scan(0, |a, n| a + *n);
let rx = stream.as_sync_channel(10);
sink.send(1);
assert_eq!(rx.try_recv(), Ok(1));
sink.send(2);
sink.send(10);
assert_eq!(rx.try_recv(), Ok(3));
assert_eq!(rx.try_recv(), Ok(13));
}
// `scan_n` keeps the running maximum as state and emits a value only when a
// new maximum is seen.
#[test]
fn stream_scan_n() {
let sink = Sink::new();
let stream = sink.stream().scan_n(std::i32::MIN, |a, n, sender| {
let n = *n;
if n > a {
sender.send(n);
n
} else {
a
}
});
let rx = stream.as_sync_channel(10);
sink.feed(&[1, 2, -1, 10, 5, 7, 42]);
let result: Vec<_> = rx.try_iter().collect();
assert_eq!(result, [1, 2, 10, 42]);
}
// `observe_strong` keeps the temporary mapped stream alive even though no
// handle to it is stored, and everything is released once the observer's
// `tx.send` starts failing.
#[test]
fn stream_observe_strong() {
let sink = Sink::new();
let (tx, rx) = mpsc::sync_channel(10);
// `arc` is moved into the observer; `weak` lets the test detect when the
// observer closure has been dropped.
let (arc, weak) = arc_and_weak(Arc::new(()));
sink.stream().map(|x| *x * 2).observe_strong(move |x| {
let _a = &arc;
tx.send(*x)
});
sink.send(6);
assert_eq!(rx.try_recv(), Ok(12));
assert!(weak.upgrade().is_some());
// With the receiver gone, the next send makes `tx.send` return `Err`;
// afterwards the observer closure (and `arc` with it) has been dropped.
drop(rx);
sink.send(10);
assert_eq!(weak.upgrade(), None);
// One more send, after which the sink's own callback list is empty as well.
sink.send(42);
assert_eq!(sink.cbs.len(), 0);
}
// `send_parallel` with two observers that each sleep 50 ms must finish in
// under 100 ms, and the folded results must still be correct.
#[cfg(feature = "crossbeam-utils")]
#[test]
fn stream_send_parallel() {
use std::thread;
use std::time::{Duration, Instant};
let sink = Sink::new();
let s1 = sink.stream().map(|x| {
thread::sleep(Duration::from_millis(50));
*x + 1
});
let s2 = sink.stream().map(|x| {
thread::sleep(Duration::from_millis(50));
*x * 2
});
let result = s1.merge(&s2).fold(0, |a, n| a + *n);
let t = Instant::now();
sink.send_parallel(&10);
assert!(t.elapsed() < Duration::from_millis(100));
// (10 + 1) + (10 * 2) = 31
assert_eq!(result.sample(), 31);
sink.send_parallel(&1);
sink.send_parallel(&13);
// 31 + (1 + 1) + (1 * 2) + (13 + 1) + (13 * 2) = 75
assert_eq!(result.sample(), 75);
}
// `next` yields a future that resolves to a value sent later from another
// thread.
#[cfg(feature = "nightly")]
#[test]
fn stream_future() {
use futures::executor::LocalPool;
use futures::task::SpawnExt;
use std::thread;
use std::time::Duration;
let sink = Sink::new();
let future = sink.stream().map(|a| *a * 2).next();
let mut pool = LocalPool::new();
pool.spawner()
.spawn(
async {
let res = r#await!(future);
assert_eq!(res, 42);
},
)
.unwrap();
// Send the value from a second thread after a short delay, while the
// pool is already running the future.
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
sink.send(21);
});
pool.run();
}
// `zip` pairs values in arrival order and queues unmatched values.
#[test]
fn stream_zip() {
use std::sync::mpsc::TryRecvError::Empty;
let sink1: Sink<i32> = Sink::new();
let sink2: Sink<&str> = Sink::new();
let zipped = sink1.stream().zip(&sink2.stream());
let rx = zipped.as_sync_channel(10);
sink1.send(1);
assert_eq!(rx.try_recv(), Err(Empty));
sink2.send("foo");
assert_eq!(rx.try_recv(), Ok((1, "foo")));
sink2.send("bar");
assert_eq!(rx.try_recv(), Err(Empty));
// "asd" is queued behind "bar"; the next left value pairs with "bar".
sink2.send("asd");
sink1.send(2);
assert_eq!(rx.try_recv(), Ok((2, "bar")));
}
// `combine` emits the latest pair whenever either side updates, but only
// after both sides have produced a value.
#[test]
fn stream_combine() {
use std::sync::mpsc::TryRecvError::Empty;
let sink1: Sink<i32> = Sink::new();
let sink2: Sink<&str> = Sink::new();
let combined = sink1.stream().combine(&sink2.stream());
let rx = combined.as_sync_channel(10);
sink1.send(1);
assert_eq!(rx.try_recv(), Err(Empty));
sink2.send("foo");
assert_eq!(rx.try_recv(), Ok((1, "foo")));
sink1.send(2);
assert_eq!(rx.try_recv(), Ok((2, "foo")));
sink1.send(3);
assert_eq!(rx.try_recv(), Ok((3, "foo")));
}
// `element_at` emits exactly the element at the given zero-based position,
// and nothing if that position is never reached.
#[test]
fn stream_element_at() {
use std::sync::mpsc::TryRecvError::Empty;
let sink: Sink<i32> = Sink::new();
let stream1 = sink.stream().element_at(0);
let stream2 = sink.stream().element_at(2);
let stream3 = sink.stream().element_at(13);
let rx1 = stream1.as_sync_channel(10);
let rx2 = stream2.as_sync_channel(10);
let rx3 = stream3.as_sync_channel(10);
sink.feed(&[1, 12, 42, 7, 13]);
assert_eq!(rx1.try_recv(), Ok(1));
assert_eq!(rx1.try_recv(), Err(Empty));
assert_eq!(rx2.try_recv(), Ok(42));
assert_eq!(rx2.try_recv(), Err(Empty));
assert_eq!(rx3.try_recv(), Err(Empty));
}
// `elements_between` honours half-open, inclusive and open-ended ranges.
#[test]
fn stream_elements_between() {
let sink: Sink<i32> = Sink::new();
let stream1 = sink.stream().elements_between(..3);
let stream2 = sink.stream().elements_between(2..=4);
let stream3 = sink.stream().elements_between(3..);
let rx1 = stream1.as_sync_channel(10);
let rx2 = stream2.as_sync_channel(10);
let rx3 = stream3.as_sync_channel(10);
sink.feed(&[1, 12, 42, 7, 13, -6, 22]);
let result1: Vec<_> = rx1.try_iter().collect();
let result2: Vec<_> = rx2.try_iter().collect();
let result3: Vec<_> = rx3.try_iter().collect();
assert_eq!(result1, [1, 12, 42]);
assert_eq!(result2, [42, 7, 13]);
assert_eq!(result3, [7, 13, -6, 22]);
}
}