use std::pin::Pin;
use std::marker::Unpin;
use std::future::Future;
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_util::stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use crate::internal::Map2;
use crate::signal_vec::{VecDiff, SignalVec};
/// A value that changes over time and is observed by polling.
///
/// As used by the implementations in this file, `poll_change` returns
/// `Poll::Ready(Some(value))` to report a new value, `Poll::Ready(None)` to
/// report that the signal has ended, and `Poll::Pending` when there is
/// nothing new to report yet.
#[must_use = "Signals do nothing unless polled"]
pub trait Signal {
/// The type of value produced by this signal.
type Item;
/// Polls the signal for its next change.
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
}
/// A mutable reference to an `Unpin` signal is itself a signal: every poll is
/// forwarded to the referenced signal.
impl<'a, A> Signal for &'a mut A where A: ?Sized + Signal + Unpin {
    type Item = A::Item;

    #[inline]
    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Reborrow through both the `Pin` and the `&mut` to reach the target.
        let target: &mut A = &mut **self;
        // `A: Unpin`, so re-pinning the plain reference is allowed.
        <A as Signal>::poll_change(Pin::new(target), cx)
    }
}
/// A boxed `Unpin` signal is itself a signal: every poll is forwarded to the
/// boxed value.
impl<A> Signal for Box<A> where A: ?Sized + Signal + Unpin {
    type Item = A::Item;

    #[inline]
    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Dereference the `Pin`, then the `Box`, to reach the boxed signal.
        let boxed: &mut A = &mut **self;
        // `A: Unpin`, so re-pinning the plain reference is allowed.
        <A as Signal>::poll_change(Pin::new(boxed), cx)
    }
}
/// A pinned pointer to a signal is itself a signal: every poll is forwarded to
/// the pointee.
impl<A> Signal for Pin<A>
    where A: Unpin + ::std::ops::DerefMut,
          A::Target: Signal {
    type Item = <<A as ::std::ops::Deref>::Target as Signal>::Item;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // `Pin<A>` is `Unpin` because `A` is, so the outer pin can be removed.
        let pointer: &mut Pin<A> = Pin::get_mut(self);
        // Reborrow the pinned pointer as `Pin<&mut A::Target>` and forward.
        let pointee = pointer.as_mut();
        <A::Target as Signal>::poll_change(pointee, cx)
    }
}
pub trait SignalExt: Signal {
#[inline]
fn to_stream(self) -> SignalStream<Self>
where Self: Sized {
SignalStream {
signal: self,
}
}
#[inline]
fn to_future(self) -> SignalFuture<Self>
where Self: Sized {
SignalFuture {
signal: self,
value: None,
}
}
#[inline]
fn map<A, B>(self, callback: B) -> Map<Self, B>
where B: FnMut(Self::Item) -> A,
Self: Sized {
Map {
signal: self,
callback,
}
}
#[inline]
fn inspect<A>(self, callback: A) -> Inspect<Self, A>
where A: FnMut(&Self::Item),
Self: Sized {
Inspect {
signal: self,
callback,
}
}
#[inline]
fn dedupe_map<A, B>(self, callback: B) -> DedupeMap<Self, B>
where B: FnMut(&mut Self::Item) -> A,
Self: Sized {
DedupeMap {
old_value: None,
signal: self,
callback,
}
}
#[inline]
fn dedupe(self) -> Dedupe<Self> where Self: Sized {
Dedupe {
old_value: None,
signal: self,
}
}
#[inline]
fn dedupe_cloned(self) -> DedupeCloned<Self> where Self: Sized {
DedupeCloned {
old_value: None,
signal: self,
}
}
#[inline]
fn map_future<A, B>(self, callback: B) -> MapFuture<Self, A, B>
where A: Future,
B: FnMut(Self::Item) -> A,
Self: Sized {
MapFuture {
signal: Some(self),
future: None,
callback,
first: true,
}
}
#[inline]
fn filter_map<A, B>(self, callback: B) -> FilterMap<Self, B>
where B: FnMut(Self::Item) -> Option<A>,
Self: Sized {
FilterMap {
signal: self,
callback,
first: true,
}
}
#[inline]
fn throttle<A, B>(self, callback: B) -> Throttle<Self, A, B>
where A: Future<Output = ()>,
B: FnMut() -> A,
Self: Sized {
Throttle {
signal: Some(self),
future: None,
callback,
}
}
#[inline]
fn flatten(self) -> Flatten<Self>
where Self::Item: Signal,
Self: Sized {
Flatten {
signal: Some(self),
inner: None,
}
}
#[inline]
fn switch<A, B>(self, callback: B) -> Switch<Self, A, B>
where A: Signal,
B: FnMut(Self::Item) -> A,
Self: Sized {
Switch {
inner: self.map(callback).flatten()
}
}
#[inline]
fn switch_signal_vec<A, F>(self, callback: F) -> SwitchSignalVec<Self, A, F>
where A: SignalVec,
F: FnMut(Self::Item) -> A,
Self: Sized {
SwitchSignalVec {
signal: Some(self),
signal_vec: None,
callback,
is_empty: true,
pending: None,
}
}
#[inline]
fn for_each<U, F>(self, callback: F) -> ForEach<Self, U, F>
where U: Future<Output = ()>,
F: FnMut(Self::Item) -> U,
Self: Sized {
ForEach {
inner: SignalStream {
signal: self,
}.for_each(callback)
}
}
#[inline]
fn to_signal_vec(self) -> SignalSignalVec<Self>
where Self: Sized {
SignalSignalVec {
signal: self
}
}
#[inline]
fn wait_for(self, value: Self::Item) -> WaitFor<Self>
where Self::Item: PartialEq,
Self: Sized {
WaitFor {
signal: self,
value: value,
}
}
#[inline]
fn first(self) -> First<Self> where Self: Sized {
First {
signal: Some(self),
}
}
#[inline]
fn poll_change_unpin(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> where Self: Unpin + Sized {
Pin::new(self).poll_change(cx)
}
}
// Blanket impl: every type implementing `Signal` (sized or not) gets the
// `SignalExt` combinators.
impl<T: ?Sized> SignalExt for T where T: Signal {}
#[inline]
pub fn not<A>(signal: A) -> impl Signal<Item = bool>
where A: Signal<Item = bool> {
signal.map(|x| !x)
}
/// Combines two boolean signals through `Map2`, computing the logical AND
/// (`&&`) of their values.
#[inline]
pub fn and<A, B>(left: A, right: B) -> impl Signal<Item = bool>
    where A: Signal<Item = bool>,
          B: Signal<Item = bool> {
    Map2::new(left, right, |lhs, rhs| *lhs && *rhs)
}
/// Combines two boolean signals through `Map2`, computing the logical OR
/// (`||`) of their values.
#[inline]
pub fn or<A, B>(left: A, right: B) -> impl Signal<Item = bool>
    where A: Signal<Item = bool>,
          B: Signal<Item = bool> {
    Map2::new(left, right, |lhs, rhs| *lhs || *rhs)
}
/// Signal adapter around a future; created by `from_future`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FromFuture<A> {
/// The wrapped future; set to `None` once it has completed.
#[pin]
future: Option<A>,
/// `true` until the "not ready yet" placeholder has been emitted.
first: bool,
}
/// Emits `Some(output)` when the future completes and then ends. If the
/// future is still pending on the first poll, `None` is emitted once as a
/// placeholder.
impl<A> Signal for FromFuture<A> where A: Future {
    type Item = Option<A::Output>;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // No future left means the output was already delivered.
        let polled = match this.future.as_mut().as_pin_mut() {
            Some(future) => future.poll(cx),
            None => return Poll::Ready(None),
        };

        match polled {
            Poll::Ready(output) => {
                // Drop the finished future so that the next poll ends the signal.
                this.future.set(None);
                Poll::Ready(Some(Some(output)))
            },
            Poll::Pending if *this.first => {
                *this.first = false;
                Poll::Ready(Some(None))
            },
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Creates a [`FromFuture`] signal from `future`.
#[inline]
pub fn from_future<A>(future: A) -> FromFuture<A> where A: Future {
    FromFuture {
        future: Some(future),
        first: true,
    }
}
/// Signal adapter around a stream; created by `from_stream`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FromStream<A> {
/// The wrapped stream.
#[pin]
stream: A,
/// `true` until the "no value yet" placeholder has been emitted.
first: bool,
}
/// Emits `Some(value)` for every item of the stream and ends when the stream
/// ends. If the stream has nothing ready on the very first poll, `None` is
/// emitted once as a "no value yet" placeholder.
impl<A> Signal for FromStream<A> where A: Stream {
    type Item = Option<A::Item>;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();

        match this.stream.poll_next(cx) {
            Poll::Ready(None) => {
                Poll::Ready(None)
            },
            Poll::Ready(Some(value)) => {
                // A real value has been reported, so the placeholder must never
                // be emitted afterwards: otherwise a later `Pending` would make
                // the signal fall back from `Some(value)` to `None`.
                *this.first = false;
                Poll::Ready(Some(Some(value)))
            },
            Poll::Pending => {
                if *this.first {
                    *this.first = false;
                    Poll::Ready(Some(None))
                } else {
                    Poll::Pending
                }
            },
        }
    }
}
/// Creates a [`FromStream`] signal from `stream`.
#[inline]
pub fn from_stream<A>(stream: A) -> FromStream<A> where A: Stream {
    FromStream {
        stream,
        first: true,
    }
}
/// Signal that emits a single fixed value; created by `always`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Always<A> {
/// The value to emit; `None` once it has been taken by a poll.
value: Option<A>,
}
// `Always` is declared `Unpin` for every `A`, including `A` that is not
// `Unpin` itself; its `poll_change` only moves the stored value out.
impl<A> Unpin for Always<A> {}
/// Emits the stored value on the first poll and ends on every later poll.
impl<A> Signal for Always<A> {
    type Item = A;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
        // `Always` is unconditionally `Unpin`, so the pin can be removed.
        let this = self.get_mut();
        Poll::Ready(this.value.take())
    }
}
/// Creates an [`Always`] signal that emits `value` once and then ends.
#[inline]
pub fn always<A>(value: A) -> Always<A> {
    let value = Some(value);
    Always { value }
}
/// Signal adapter created by `SignalExt::first`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct First<A> {
/// The wrapped signal; set to `None` after it has been polled once.
#[pin]
signal: Option<A>,
}
/// Polls the wrapped signal exactly once, returns that result unchanged, and
/// reports the end of the signal on every later poll.
impl<A> Signal for First<A> where A: Signal {
    type Item = A::Item;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let first_poll = match this.signal.as_mut().as_pin_mut() {
            Some(signal) => signal.poll_change(cx),
            // The wrapped signal was already consumed by an earlier poll.
            None => return Poll::Ready(None),
        };

        // Drop the wrapped signal regardless of what the poll returned.
        this.signal.set(None);
        first_poll
    }
}
/// Signal adapter created by `SignalExt::switch`: a `Map` followed by a
/// `Flatten`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Switch<A, B, C> where A: Signal, C: FnMut(A::Item) -> B {
/// The map-then-flatten pipeline that does the actual work.
#[pin]
inner: Flatten<Map<A, C>>,
}
/// Delegates every poll to the inner `Flatten<Map<..>>` pipeline.
impl<A, B, C> Signal for Switch<A, B, C>
    where A: Signal,
          B: Signal,
          C: FnMut(A::Item) -> B {
    type Item = B::Item;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_change(cx)
    }
}
/// Future created by `SignalExt::for_each`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct ForEach<A, B, C> {
/// The underlying stream `for_each` future over the signal's stream form.
#[pin]
inner: stream::ForEach<SignalStream<A>, B, C>,
}
/// Delegates every poll to the inner stream `for_each` future.
impl<A, B, C> Future for ForEach<A, B, C>
    where A: Signal,
          B: Future<Output = ()>,
          C: FnMut(A::Item) -> B {
    type Output = ();

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();
        this.inner.poll(cx)
    }
}
/// Stream adapter created by `SignalExt::to_stream`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Streams do nothing unless polled"]
pub struct SignalStream<A> {
/// The wrapped signal.
#[pin]
signal: A,
}
/// Exposes the wrapped signal as a stream: `poll_next` returns exactly what
/// `poll_change` returns.
impl<A> Stream for SignalStream<A> where A: Signal {
    type Item = A::Item;

    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.signal.poll_change(cx)
    }
}
/// Future created by `SignalExt::to_future`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct SignalFuture<A> where A: Signal {
/// The wrapped signal.
#[pin]
signal: A,
/// The most recent value emitted by the signal, if any.
value: Option<A::Item>,
}
/// Resolves with the last value the signal emitted once the signal ends.
///
/// # Panics
///
/// Panics if the signal ends without ever having emitted a value.
impl<A> Future for SignalFuture<A> where A: Signal {
    type Output = A::Item;

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let mut this = self.project();

        loop {
            match this.signal.as_mut().poll_change(cx) {
                // Remember the newest value and keep draining the signal.
                Poll::Ready(Some(latest)) => {
                    *this.value = Some(latest);
                },
                Poll::Ready(None) => {
                    return Poll::Ready(this.value.take().unwrap());
                },
                Poll::Pending => {
                    return Poll::Pending;
                },
            }
        }
    }
}
/// Signal adapter created by `SignalExt::map`.
#[pin_project(project = MapProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Map<A, B> {
/// The wrapped signal.
#[pin]
signal: A,
/// Function applied to every value of the wrapped signal.
callback: B,
}
/// Applies the callback to every value of the wrapped signal; `Pending` and
/// the end of the signal are passed through untouched.
impl<A, B, C> Signal for Map<A, B>
    where A: Signal,
          B: FnMut(A::Item) -> C {
    type Item = C;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let MapProj { signal, callback } = self.project();

        match signal.poll_change(cx) {
            Poll::Ready(Some(value)) => Poll::Ready(Some(callback(value))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Signal adapter created by `SignalExt::inspect`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Inspect<A, B> {
/// The wrapped signal.
#[pin]
signal: A,
/// Function called with a reference to every value of the wrapped signal.
callback: B,
}
/// Passes every poll result through unchanged, calling the callback with a
/// reference to each emitted value first.
impl<A, B> Signal for Inspect<A, B>
    where A: Signal,
          B: FnMut(&A::Item) {
    type Item = A::Item;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.signal.poll_change(cx);

        match &poll {
            Poll::Ready(Some(value)) => (this.callback)(value),
            _ => {},
        }

        poll
    }
}
/// Signal adapter created by `SignalExt::map_future`.
#[pin_project(project = MapFutureProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct MapFuture<A, B, C> {
/// The wrapped signal; set to `None` once it has ended.
#[pin]
signal: Option<A>,
/// The future created from the most recent signal value, if still running.
#[pin]
future: Option<B>,
/// Function that turns a signal value into a future.
callback: C,
/// `true` until this signal has emitted anything.
first: bool,
}
/// Emits `Some(output)` whenever the future for the most recent signal value
/// completes. Emits `None` once if nothing has completed by the first poll.
/// Ends when the wrapped signal has ended and no future is left running.
impl<A, B, C> Signal for MapFuture<A, B, C>
where A: Signal,
B: Future,
C: FnMut(A::Item) -> B {
type Item = Option<B::Output>;
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let MapFutureProj { mut signal, mut future, callback, first } = self.project();
// Becomes `true` when the wrapped signal has ended (now or earlier).
let mut done = false;
// Drain the wrapped signal: every new value replaces the stored future, so
// only the future for the latest value survives the loop.
loop {
match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
None => {
done = true;
},
Some(Poll::Ready(None)) => {
signal.set(None);
done = true;
},
Some(Poll::Ready(Some(value))) => {
let value = Some(callback(value));
future.set(value);
continue;
},
Some(Poll::Pending) => {},
}
break;
}
// Poll the (possibly just replaced) future.
match future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
None => {},
Some(Poll::Ready(value)) => {
future.set(None);
*first = false;
return Poll::Ready(Some(Some(value)));
},
Some(Poll::Pending) => {
// A future is still running, so this signal cannot end yet.
done = false;
},
}
if *first {
// Nothing has completed yet: emit the `None` placeholder exactly once.
*first = false;
Poll::Ready(Some(None))
} else if done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
/// Signal adapter created by `SignalExt::throttle`.
#[pin_project(project = ThrottleProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Throttle<A, B, C> where A: Signal {
/// The wrapped signal; set to `None` once it has ended.
#[pin]
signal: Option<A>,
/// The delay future started after the last emitted value, if still running.
#[pin]
future: Option<B>,
/// Function that creates a new delay future.
callback: C,
}
/// Forwards values of the wrapped signal, but after each forwarded value a
/// delay future is created and the wrapped signal is not polled again until
/// that future has completed.
impl<A, B, C> Signal for Throttle<A, B, C>
    where A: Signal,
          B: Future<Output = ()>,
          C: FnMut() -> B {
    type Item = A::Item;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let ThrottleProj { mut signal, future: mut delay, callback } = self.project();

        // While a delay is running, nothing is forwarded.
        let delay_poll = delay.as_mut().as_pin_mut().map(|running| running.poll(cx));
        match delay_poll {
            Some(Poll::Pending) => return Poll::Pending,
            Some(Poll::Ready(())) => delay.set(None),
            None => {},
        }

        let change = match signal.as_mut().as_pin_mut() {
            Some(source) => source.poll_change(cx),
            // The wrapped signal ended on an earlier poll.
            None => return Poll::Ready(None),
        };

        match change {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => {
                signal.set(None);
                Poll::Ready(None)
            },
            Poll::Ready(Some(value)) => {
                // Start a new delay and poll it once right away; if it is
                // already finished there is nothing to wait for.
                delay.set(Some(callback()));
                let started = delay.as_mut().as_pin_mut().map(|running| running.poll(cx));
                if let Some(Poll::Ready(())) = started {
                    delay.set(None);
                }
                Poll::Ready(Some(value))
            },
        }
    }
}
/// Future created by `SignalExt::wait_for`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct WaitFor<A>
where A: Signal,
A::Item: PartialEq {
/// The wrapped signal.
#[pin]
signal: A,
/// The value being waited for.
value: A::Item,
}
/// Resolves with `Some(value)` as soon as the signal emits a value equal to
/// the awaited one, or with `None` if the signal ends first.
impl<A> Future for WaitFor<A>
    where A: Signal,
          A::Item: PartialEq {
    type Output = Option<A::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let mut this = self.project();

        loop {
            let poll = this.signal.as_mut().poll_change(cx);

            // Only a value that differs from the awaited one is skipped.
            let mismatch = match poll {
                Poll::Ready(Some(ref current)) => *current != *this.value,
                _ => false,
            };

            if !mismatch {
                return poll;
            }
        }
    }
}
/// `SignalVec` adapter created by `SignalExt::to_signal_vec`.
#[pin_project]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct SignalSignalVec<A> {
/// The wrapped signal.
#[pin]
signal: A,
}
/// Reports every `Vec` emitted by the wrapped signal as a
/// `VecDiff::Replace` carrying that `Vec`.
impl<A, B> SignalVec for SignalSignalVec<A>
    where A: Signal<Item = Vec<B>> {
    type Item = B;

    #[inline]
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        let this = self.project();

        match this.signal.poll_change(cx) {
            Poll::Ready(Some(values)) => Poll::Ready(Some(VecDiff::Replace { values })),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
// Shared body of the `Dedupe*` signals.
//
// Arguments:
//   $signal - pinned projection of the wrapped signal
//   $cx     - the task context
//   $value  - `&mut Option<Item>` holding the previously seen value
//   $pat    - pattern that binds each new value (e.g. `value` or `mut value`)
//   $name   - identifier bound by `$pat`
//   $output - expression producing the value to emit; may use `$name`
//
// The macro expands to a `loop` that `return`s from the enclosing function.
macro_rules! dedupe {
($signal:expr, $cx:expr, $value:expr, $pat:pat, $name:ident => $output:expr) => {
loop {
return match $signal.as_mut().poll_change($cx) {
Poll::Ready(Some($pat)) => {
// The very first value always counts as changed.
let has_changed = match $value {
Some(ref old_value) => *old_value != $name,
None => true,
};
if has_changed {
// The output is computed before the new value is stored, so
// `$output` may still borrow or mutate `$name`.
let output = $output;
*$value = Some($name);
Poll::Ready(Some(output))
} else {
// Same as the previous value: skip it and poll again.
continue;
}
},
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
};
}
/// Signal adapter created by `SignalExt::dedupe_map`.
#[pin_project(project = DedupeMapProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct DedupeMap<A, B> where A: Signal {
/// The previously seen value, used to detect repeats.
old_value: Option<A::Item>,
/// The wrapped signal.
#[pin]
signal: A,
/// Function that produces the emitted value from each non-repeated value.
callback: B,
}
/// Skips values equal to the previously seen one. For every other value the
/// callback is called with a mutable reference to it, the callback's result
/// is emitted, and the (possibly modified) value is stored for the next
/// comparison.
impl<A, B, C> Signal for DedupeMap<A, B>
    where A: Signal,
          A::Item: PartialEq,
          B: FnMut(&mut A::Item) -> C {
    type Item = C;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let DedupeMapProj {
            old_value: previous,
            signal: mut source,
            callback: transform,
        } = self.project();

        dedupe!(source, cx, previous, mut current, current => transform(&mut current))
    }
}
/// Signal adapter created by `SignalExt::dedupe`.
#[pin_project(project = DedupeProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Dedupe<A> where A: Signal {
/// The previously seen value, used to detect repeats.
old_value: Option<A::Item>,
/// The wrapped signal.
#[pin]
signal: A,
}
/// Skips values equal to the previously seen one; every other value is
/// emitted and a copy of it is stored for the next comparison.
impl<A> Signal for Dedupe<A>
    where A: Signal,
          A::Item: PartialEq + Copy {
    type Item = A::Item;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let DedupeProj {
            old_value: previous,
            signal: mut source,
        } = self.project();

        dedupe!(source, cx, previous, current, current => current)
    }
}
/// Signal adapter created by `SignalExt::dedupe_cloned`.
#[pin_project(project = DedupeClonedProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct DedupeCloned<A> where A: Signal {
/// The previously seen value, used to detect repeats.
old_value: Option<A::Item>,
/// The wrapped signal.
#[pin]
signal: A,
}
/// Skips values equal to the previously seen one; for every other value a
/// clone is emitted and the original is stored for the next comparison.
impl<A> Signal for DedupeCloned<A>
    where A: Signal,
          A::Item: PartialEq + Clone {
    type Item = A::Item;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let DedupeClonedProj {
            old_value: previous,
            signal: mut source,
        } = self.project();

        dedupe!(source, cx, previous, current, current => current.clone())
    }
}
/// Signal adapter created by `SignalExt::filter_map`.
#[pin_project(project = FilterMapProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FilterMap<A, B> {
/// The wrapped signal.
#[pin]
signal: A,
/// Function that maps a value to `Some(output)` or rejects it with `None`.
callback: B,
/// `true` until this signal has emitted anything.
first: bool,
}
/// Emits `Some(mapped)` for every value the callback accepts. A rejected
/// value is reported as `None` only if nothing has been emitted yet;
/// afterwards rejected values are skipped.
impl<A, B, C> Signal for FilterMap<A, B>
    where A: Signal,
          B: FnMut(A::Item) -> Option<C> {
    type Item = Option<C>;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let FilterMapProj { mut signal, callback, first } = self.project();

        loop {
            match signal.as_mut().poll_change(cx) {
                Poll::Ready(Some(value)) => {
                    let mapped = callback(value);

                    // Accepted values are always emitted; a rejection is only
                    // emitted when it would be the very first output.
                    if mapped.is_some() || *first {
                        *first = false;
                        return Poll::Ready(Some(mapped));
                    }
                },
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
/// Signal adapter created by `SignalExt::flatten`.
#[pin_project(project = FlattenProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Flatten<A> where A: Signal {
/// The outer signal of signals; set to `None` once it has ended.
#[pin]
signal: Option<A>,
/// The most recent inner signal; set to `None` once it has ended.
#[pin]
inner: Option<A::Item>,
}
/// Emits the values of the most recent inner signal produced by the outer
/// signal. Ends once the outer signal has ended and the current inner signal
/// has ended as well.
impl<A> Signal for Flatten<A>
    where A: Signal,
          A::Item: Signal {
    type Item = <A::Item as Signal>::Item;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let FlattenProj { signal: mut outer, inner: mut current } = self.project();

        // The outer signal is polled first so that a freshly produced inner
        // signal replaces the old one before the inner poll below.
        let outer_poll = outer.as_mut().as_pin_mut().map(|outer| outer.poll_change(cx));
        let outer_done = match outer_poll {
            Some(Poll::Ready(Some(replacement))) => {
                current.set(Some(replacement));
                false
            },
            Some(Poll::Ready(None)) => {
                outer.set(None);
                true
            },
            Some(Poll::Pending) => false,
            None => true,
        };

        let inner_poll = current.as_mut().as_pin_mut().map(|inner| inner.poll_change(cx));
        match inner_poll {
            // The inner signal ended: drop it and fall through.
            Some(Poll::Ready(None)) => current.set(None),
            Some(poll) => return poll,
            None => {},
        }

        if outer_done {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
/// `SignalVec` adapter created by `SignalExt::switch_signal_vec`.
#[pin_project(project = SwitchSignalVecProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct SwitchSignalVec<A, B, C> where B: SignalVec {
/// The wrapped signal; set to `None` once it has ended.
#[pin]
signal: Option<A>,
/// The `SignalVec` created from the most recent signal value; set to `None`
/// once it has ended.
#[pin]
signal_vec: Option<B>,
/// Function that turns a signal value into a `SignalVec`.
callback: C,
/// Whether the output seen by the consumer is currently considered empty.
is_empty: bool,
/// A diff held back while a clearing `Replace` was emitted first; it is
/// returned by the next poll.
pending: Option<VecDiff<B::Item>>,
}
/// Emits the diffs of the `SignalVec` created from the most recent value of
/// the wrapped signal. When a new `SignalVec` replaces one whose contents
/// were non-empty, the output is first cleared with an empty
/// `VecDiff::Replace`.
impl<A, B, C> SignalVec for SwitchSignalVec<A, B, C>
where A: Signal,
B: SignalVec,
C: FnMut(A::Item) -> B {
type Item = B::Item;
fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
let SwitchSignalVecProj { mut signal, mut signal_vec, callback, is_empty, pending } = self.project();
// A diff held back by the previous poll is delivered before anything else.
match pending.take() {
Some(value) => Poll::Ready(Some(value)),
None => {
let mut signal_value = None;
// Drain the wrapped signal, keeping only its latest value. The loop result
// is `Ready(None)` if the signal has ended and `Pending` otherwise.
let signal_done = loop {
break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
None => {
Poll::Ready(None)
},
Some(Poll::Pending) => {
Poll::Pending
},
Some(Poll::Ready(None)) => {
signal.set(None);
Poll::Ready(None)
},
Some(Poll::Ready(Some(value))) => {
signal_value = Some(value);
continue;
},
}
};
// Result when the current `SignalVec` has nothing to report: if the output
// is already empty, return `signal_done`; otherwise clear the output first.
fn done<A>(is_empty: &mut bool, signal_done: Poll<Option<VecDiff<A>>>) -> Poll<Option<VecDiff<A>>> {
if *is_empty {
signal_done
} else {
*is_empty = true;
Poll::Ready(Some(VecDiff::Replace { values: vec![] }))
}
}
// Forwards a `Replace`, except that replacing an empty output with another
// empty list is suppressed and reported as `Pending`.
fn replace<A>(is_empty: &mut bool, values: Vec<A>) -> Poll<Option<VecDiff<A>>> {
let new_is_empty = values.is_empty();
if *is_empty && new_is_empty {
Poll::Pending
} else {
*is_empty = new_is_empty;
Poll::Ready(Some(VecDiff::Replace { values }))
}
}
if let Some(value) = signal_value {
// The signal changed: switch to a new `SignalVec` and poll it right away.
signal_vec.set(Some(callback(value)));
match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
None => {
done(is_empty, signal_done)
},
Some(Poll::Pending) => {
done(is_empty, Poll::Pending)
},
Some(Poll::Ready(None)) => {
signal_vec.set(None);
done(is_empty, signal_done)
},
Some(Poll::Ready(Some(VecDiff::Replace { values }))) => {
replace(is_empty, values)
},
Some(Poll::Ready(Some(vec_diff))) => {
if *is_empty {
*is_empty = false;
Poll::Ready(Some(vec_diff))
} else {
// The old contents must be cleared first; the new diff is stored in
// `pending` and returned by the next poll.
*pending = Some(vec_diff);
*is_empty = true;
Poll::Ready(Some(VecDiff::Replace { values: vec![] }))
}
},
}
} else {
// No new signal value: keep forwarding the current `SignalVec`.
match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
None => {
signal_done
},
Some(Poll::Pending) => {
Poll::Pending
},
Some(Poll::Ready(None)) => {
signal_vec.set(None);
signal_done
},
Some(Poll::Ready(Some(VecDiff::Replace { values }))) => {
replace(is_empty, values)
},
Some(Poll::Ready(Some(vec_diff))) => {
*is_empty = false;
Poll::Ready(Some(vec_diff))
},
}
}
},
}
}
}