use std::rc::Rc;
use std::cell::Cell;
use std::task::Poll;
use futures_signals::cancelable_future;
use futures_signals::signal::{Signal, SignalExt, Mutable, channel};
use futures_signals::signal_vec::VecDiff;
use futures_util::future::{ready, poll_fn};
use pin_utils::pin_mut;
mod util;
/// Checks that both the `signal()` and `signal_cloned()` views of a `Mutable`
/// report the initial value, report each later `set`, stay pending in between,
/// and finish with `None` once the `Mutable` has been dropped.
#[test]
fn test_mutable() {
    let source = Mutable::new(1);
    let mut by_copy = source.signal();
    let mut by_clone = source.signal_cloned();

    util::with_noop_context(|cx| {
        // Initial value: reported exactly once per signal.
        assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(1)));
        assert_eq!(by_copy.poll_change_unpin(cx), Poll::Pending);
        assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(1)));
        assert_eq!(by_clone.poll_change_unpin(cx), Poll::Pending);

        // A new value is reported once, then the signals go quiet again.
        source.set(5);
        assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(5)));
        assert_eq!(by_copy.poll_change_unpin(cx), Poll::Pending);
        assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(5)));
        assert_eq!(by_clone.poll_change_unpin(cx), Poll::Pending);

        // With the `Mutable` gone and nothing unread, both signals end.
        drop(source);
        assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(None));
        assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(None));
    });
}
/// Exercises three orderings of dropping a `Mutable` relative to polling its
/// signals. In every case a value that has not been read yet is still
/// delivered before the signal finishes with `None`.
#[test]
fn test_mutable_drop() {
    // Scenario 1: dropped before the first poll.
    {
        let source = Mutable::new(1);
        let mut by_copy = source.signal();
        let mut by_clone = source.signal_cloned();
        drop(source);

        util::with_noop_context(|cx| {
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(None));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(None));
        });
    }

    // Scenario 2: a value is set and the `Mutable` is dropped before either
    // signal has had a chance to observe that value.
    {
        let source = Mutable::new(1);
        let mut by_copy = source.signal();
        let mut by_clone = source.signal_cloned();

        util::with_noop_context(|cx| {
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Pending);
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Pending);

            source.set(5);
            drop(source);

            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(5)));
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(None));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(5)));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(None));
        });
    }

    // Scenario 3: one signal reads the new value before the drop, the other
    // only afterwards.
    {
        let source = Mutable::new(1);
        let mut by_copy = source.signal();
        let mut by_clone = source.signal_cloned();

        util::with_noop_context(|cx| {
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Pending);
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(1)));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Pending);

            source.set(5);
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(Some(5)));
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Pending);

            drop(source);

            // The late reader still gets the value it has not seen yet.
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(Some(5)));
            assert_eq!(by_clone.poll_change_unpin(cx), Poll::Ready(None));
            // The early reader has nothing left to read and simply ends.
            assert_eq!(by_copy.poll_change_unpin(cx), Poll::Ready(None));
        });
    }
}
/// Compile-time check that the listed values can be boxed as
/// `Send + Sync` trait objects. The test body has no runtime assertions.
#[test]
fn test_send_sync() {
    // Only type-checks when `T` coerces into a `Send + Sync` trait object.
    fn require_send_sync<T: Send + Sync + 'static>(value: T) {
        let _: Box<dyn Send + Sync> = Box::new(value);
    }

    // Both halves returned by `cancelable_future`.
    let (cancel_handle, cancelable) = cancelable_future(ready(()), || ());
    require_send_sync(cancel_handle);
    require_send_sync(cancelable);

    // A `Mutable` and both of its signal flavours.
    require_send_sync(Mutable::new(1));
    require_send_sync(Mutable::new(1).signal());
    require_send_sync(Mutable::new(1).signal_cloned());

    // Both halves returned by `channel`.
    let (sender, receiver) = channel(1);
    require_send_sync(sender);
    require_send_sync(receiver);
}
/// Drives `map_future` with futures that stay pending while a shared gate is
/// `true`, and checks the three changes observed through `util::ForEachSignal`:
/// `Some(None)` first, then `Pending`, then `Some(Some(3))` once the gate opens.
#[test]
fn test_map_future() {
    let state = Rc::new(Mutable::new(1));
    // While this holds `true`, every future created below returns `Pending`.
    let gate = Rc::new(Cell::new(true));

    let mapped = {
        let gate = gate.clone();

        state.signal().map_future(move |value| {
            // Each future gets its own handle to the shared gate.
            let gate = gate.clone();

            poll_fn(move |_| {
                if gate.get() {
                    return Poll::Pending;
                }

                Poll::Ready(value)
            })
        })
    };

    util::ForEachSignal::new(mapped)
        .next({
            let state = state.clone();

            move |_, change| {
                // No future has completed yet, so the mapped value is `None`.
                assert_eq!(change, Poll::Ready(Some(None)));
                state.set(2);
            }
        })
        .next({
            let state = state.clone();

            move |_, change| {
                // The gate is still closed, so nothing new is reported.
                assert_eq!(change, Poll::Pending);
                // Open the gate, then publish the value the last step expects.
                gate.set(false);
                state.set(3);
            }
        })
        .next(|_, change| {
            assert_eq!(change, Poll::Ready(Some(Some(3))));
        })
        .run();
}
/// Feeds a scripted boolean signal into `switch_signal_vec`, selecting one of
/// two scripted signal vecs per value, and compares the combined output with
/// the expected sequence of `VecDiff`s.
#[test]
fn test_switch_signal_vec() {
    // Scripted outer signal: each `Ready` value selects an inner signal vec.
    let selector = util::Source::new(vec![
        Poll::Ready(true),
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(false),
        Poll::Ready(false),
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(false),
        Poll::Ready(true),
        Poll::Pending,
    ]);

    let switched = selector.switch_signal_vec(move |use_short| {
        // Choose the diff script first, then wrap it in a `Source` once.
        let script = if use_short {
            vec![
                Poll::Ready(VecDiff::Push { value: 10 }),
            ]
        } else {
            vec![
                Poll::Ready(VecDiff::Replace { values: vec![0, 1, 2, 3, 4, 5] }),
                Poll::Ready(VecDiff::Push { value: 6 }),
                Poll::Pending,
                Poll::Pending,
                Poll::Ready(VecDiff::InsertAt { index: 0, value: 7 }),
            ]
        };

        util::Source::new(script)
    });

    // Note the empty `Replace` expected just before the second `Push { value: 10 }`.
    let expected = vec![
        Poll::Ready(Some(VecDiff::Push { value: 10 })),
        Poll::Pending,
        Poll::Ready(Some(VecDiff::Replace { values: vec![0, 1, 2, 3, 4, 5] })),
        Poll::Ready(Some(VecDiff::Push { value: 6 })),
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(Some(VecDiff::InsertAt { index: 0, value: 7 })),
        Poll::Pending,
        Poll::Ready(Some(VecDiff::Replace { values: vec![] })),
        Poll::Ready(Some(VecDiff::Push { value: 10 })),
        Poll::Ready(None),
    ];

    util::assert_signal_vec_eq(switched, expected);
}
/// Throttles a scripted boolean signal with a delay future that returns
/// `Pending` once (waking itself) and `Ready` on the following poll, then
/// compares the output with the expected sequence of polls.
#[test]
fn test_throttle() {
    let scripted = util::Source::new(vec![
        Poll::Ready(true),
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(false),
        Poll::Ready(false),
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(false),
        Poll::Ready(true),
        Poll::Pending,
    ]);

    let throttled = scripted.throttle(move || {
        // Flips on every poll: `true` means "just deferred", `false` means "finished".
        let mut deferred = false;

        poll_fn(move |context| {
            deferred = !deferred;

            if deferred {
                // Request another poll so the next call can complete.
                context.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
    });

    let expected = vec![
        Poll::Ready(Some(true)),
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(Some(false)),
        Poll::Ready(Some(false)),
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(Some(false)),
        Poll::Ready(Some(true)),
        Poll::Pending,
        Poll::Ready(None),
    ];

    util::assert_signal_eq(throttled, expected);
}
/// Checks when `throttle` calls its delay factory and starts the returned
/// future, by counting both events and polling the throttled signal step by
/// step while a `Mutable<bool>` decides whether the delay future may finish.
#[test]
fn test_throttle_timing() {
    let scripted = util::Source::new(vec![
        Poll::Ready(0),
        Poll::Ready(1),
        Poll::Ready(2),
        Poll::Ready(3),
        Poll::Ready(4),
        Poll::Ready(5),
    ]);

    struct Probe {
        // The delay future waits until this holds `true`.
        ready: Mutable<bool>,
        // Number of times the delay factory closure has been called.
        factory_calls: Cell<u32>,
        // Number of delay futures whose body has started running.
        futures_started: Cell<u32>,
    }

    let probe = Rc::new(Probe {
        ready: Mutable::new(true),
        factory_calls: Cell::new(0),
        futures_started: Cell::new(0),
    });

    let throttled = scripted.throttle({
        let probe = probe.clone();

        move || {
            probe.factory_calls.set(probe.factory_calls.get() + 1);

            let probe = probe.clone();

            async move {
                probe.futures_started.set(probe.futures_started.get() + 1);
                probe.ready.signal().wait_for(true).await;
            }
        }
    });

    pin_mut!(throttled);

    // Snapshot of `(factory_calls, futures_started)`.
    let counts = || (probe.factory_calls.get(), probe.futures_started.get());

    util::with_noop_context(|cx| {
        // Nothing happens before the first poll.
        assert_eq!(counts(), (0, 0));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(0)));
        assert_eq!(counts(), (1, 1));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(1)));
        assert_eq!(counts(), (2, 2));

        // The next value is still emitted; the delay created after it now waits.
        probe.ready.set(false);

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(2)));
        assert_eq!(counts(), (3, 3));

        // While the delay is waiting, no value comes out and no new delay is made.
        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Pending);
        assert_eq!(counts(), (3, 3));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Pending);
        assert_eq!(counts(), (3, 3));

        // Flipping the flag alone does not change the counters; polling is required.
        probe.ready.set(true);
        assert_eq!(counts(), (3, 3));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(3)));
        assert_eq!(counts(), (4, 4));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(4)));
        assert_eq!(counts(), (5, 5));

        assert_eq!(throttled.as_mut().poll_change(cx), Poll::Ready(Some(5)));
        assert_eq!(counts(), (6, 6));

        // The scripted input is exhausted, so the throttled signal ends too.
        assert_eq!(throttled.poll_change(cx), Poll::Ready(None));
    });
}