use std::pin::Pin;
use std::marker::Unpin;
use std::future::Future;
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_util::stream;
use futures_util::stream::StreamExt;
use crate::internal::Map2;
use crate::signal_vec::{VecDiff, SignalVec};
/// A value that changes over time and is observed by polling.
///
/// The combinators in this file interpret the result of `poll_change` as:
/// * `Poll::Ready(Some(value))` — the signal produced a new value,
/// * `Poll::Ready(None)` — the signal has ended,
/// * `Poll::Pending` — nothing new is available right now.
#[must_use = "Signals do nothing unless polled"]
pub trait Signal {
/// The type of value carried by the signal.
type Item;
/// Polls the pinned signal for its next change, using `cx` for the poll.
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
}
/// A mutable reference to an `Unpin` signal is itself a signal.
impl<'a, A> Signal for &'a mut A where A: ?Sized + Signal + Unpin {
    type Item = A::Item;

    /// Re-pins the referenced signal and forwards the poll to it.
    #[inline]
    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Peel off both the `Pin` and the outer reference to reach the signal itself.
        let target: &mut A = &mut **self;
        <A as Signal>::poll_change(Pin::new(target), cx)
    }
}
/// A boxed `Unpin` signal is itself a signal.
impl<A> Signal for Box<A> where A: ?Sized + Signal + Unpin {
    type Item = A::Item;

    /// Re-pins the boxed signal and forwards the poll to it.
    #[inline]
    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Deref through the `Pin` and then through the `Box` to reach the signal.
        let target: &mut A = &mut **self;
        <A as Signal>::poll_change(Pin::new(target), cx)
    }
}
/// A pinned pointer to a signal is itself a signal.
impl<A> Signal for Pin<A>
    where A: Unpin + ::std::ops::DerefMut,
          A::Target: Signal {
    type Item = <<A as ::std::ops::Deref>::Target as Signal>::Item;

    /// Forwards the poll to the signal behind the pinned pointer.
    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // `Pin<A>` is `Unpin` because `A` is, so the outer pin can be removed safely.
        let pointer: &mut Pin<A> = self.get_mut();
        <A::Target as Signal>::poll_change(pointer.as_mut(), cx)
    }
}
pub trait SignalExt: Signal {
#[inline]
fn to_stream(self) -> SignalStream<Self>
where Self: Sized {
SignalStream {
signal: self,
}
}
#[inline]
fn to_future(self) -> SignalFuture<Self>
where Self: Sized {
SignalFuture {
signal: self,
value: None,
}
}
#[inline]
fn map<A, B>(self, callback: B) -> Map<Self, B>
where B: FnMut(Self::Item) -> A,
Self: Sized {
Map {
signal: self,
callback,
}
}
#[inline]
fn inspect<A>(self, callback: A) -> Inspect<Self, A>
where A: FnMut(&Self::Item),
Self: Sized {
Inspect {
signal: self,
callback,
}
}
#[inline]
fn dedupe_map<A, B>(self, callback: B) -> DedupeMap<Self, B>
where B: FnMut(&mut Self::Item) -> A,
Self: Sized {
DedupeMap {
old_value: None,
signal: self,
callback,
}
}
#[inline]
fn dedupe(self) -> Dedupe<Self> where Self: Sized {
Dedupe {
old_value: None,
signal: self,
}
}
#[inline]
fn dedupe_cloned(self) -> DedupeCloned<Self> where Self: Sized {
DedupeCloned {
old_value: None,
signal: self,
}
}
#[inline]
fn map_future<A, B>(self, callback: B) -> MapFuture<Self, A, B>
where A: Future,
B: FnMut(Self::Item) -> A,
Self: Sized {
MapFuture {
signal: Some(self),
future: None,
callback,
first: true,
}
}
#[inline]
fn filter_map<A, B>(self, callback: B) -> FilterMap<Self, B>
where B: FnMut(Self::Item) -> Option<A>,
Self: Sized {
FilterMap {
signal: self,
callback,
first: true,
}
}
#[inline]
fn flatten(self) -> Flatten<Self>
where Self::Item: Signal,
Self: Sized {
Flatten {
signal: Some(self),
inner: None,
}
}
#[inline]
fn switch<A, B>(self, callback: B) -> Switch<Self, A, B>
where A: Signal,
B: FnMut(Self::Item) -> A,
Self: Sized {
Switch {
inner: self.map(callback).flatten()
}
}
#[inline]
fn for_each<U, F>(self, callback: F) -> ForEach<Self, U, F>
where U: Future<Output = ()>,
F: FnMut(Self::Item) -> U,
Self: Sized {
ForEach {
inner: SignalStream {
signal: self,
}.for_each(callback)
}
}
#[inline]
fn to_signal_vec(self) -> SignalSignalVec<Self>
where Self: Sized {
SignalSignalVec {
signal: self
}
}
#[inline]
fn wait_for(self, value: Self::Item) -> WaitFor<Self>
where Self::Item: PartialEq,
Self: Sized {
WaitFor {
signal: self,
value: value,
}
}
#[inline]
fn first(self) -> First<Self> where Self: Sized {
First {
signal: Some(self),
}
}
#[inline]
fn poll_change_unpin(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> where Self: Unpin + Sized {
Pin::new(self).poll_change(cx)
}
}
// Blanket impl: every `Signal`, sized or not, gets the `SignalExt` methods.
impl<T: ?Sized> SignalExt for T where T: Signal {}
#[inline]
pub fn not<A>(signal: A) -> impl Signal<Item = bool>
where A: Signal<Item = bool> {
signal.map(|x| !x)
}
/// Combines two boolean signals through `Map2`, applying logical AND to their values.
#[inline]
pub fn and<A, B>(left: A, right: B) -> impl Signal<Item = bool>
    where A: Signal<Item = bool>,
          B: Signal<Item = bool> {
    Map2::new(left, right, |lhs, rhs| *lhs && *rhs)
}
/// Combines two boolean signals through `Map2`, applying logical OR to their values.
#[inline]
pub fn or<A, B>(left: A, right: B) -> impl Signal<Item = bool>
    where A: Signal<Item = bool>,
          B: Signal<Item = bool> {
    Map2::new(left, right, |lhs, rhs| *lhs || *rhs)
}
/// Signal adapter around a future; built by `from_future`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FromFuture<A> {
/// The wrapped future; `poll_change` resets it to `None` once it has completed.
future: Option<A>,
/// `true` until the placeholder `None` has been emitted for a still-pending future.
first: bool,
}
// Only `future` is projected as pinned by `poll_change`, so `Unpin` depends on `A` alone.
impl<A> Unpin for FromFuture<A> where A: Unpin {}
impl<A> Signal for FromFuture<A> where A: Future {
    type Item = Option<A::Output>;

    /// Polls the wrapped future.
    ///
    /// * While the future is pending, emits `None` exactly once and is `Pending` afterwards.
    /// * When the future completes, emits `Some(output)` and drops the future.
    /// * Once the future has been dropped, the signal has ended (`Poll::Ready(None)`).
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin future,
            mut first,
        });

        let polled = match future.as_mut().as_pin_mut() {
            Some(pending) => pending.poll(cx),
            // The future already completed on an earlier poll.
            None => return Poll::Ready(None),
        };

        match polled {
            Poll::Ready(output) => {
                future.set(None);
                Poll::Ready(Some(Some(output)))
            },
            Poll::Pending if *first => {
                *first = false;
                Poll::Ready(Some(None))
            },
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Wraps `future` in a [`FromFuture`] signal that has not yet emitted anything.
#[inline]
pub fn from_future<A>(future: A) -> FromFuture<A>
    where A: Future {
    FromFuture {
        future: Some(future),
        first: true,
    }
}
/// Signal adapter around a stream; built by `from_stream`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FromStream<A> {
/// The wrapped stream, polled through a pinned projection.
stream: A,
/// Whether the placeholder `None` may still be emitted; `from_stream` starts it at `true`.
first: bool,
}
// Only `stream` is projected as pinned by `poll_change`, so `Unpin` depends on `A` alone.
impl<A> Unpin for FromStream<A> where A: Unpin {}
impl<A> Signal for FromStream<A> where A: Stream {
    type Item = Option<A::Item>;

    /// Polls the wrapped stream.
    ///
    /// * An item from the stream is emitted as `Some(item)`.
    /// * If the stream is pending before anything has been emitted, a single
    ///   placeholder `None` is emitted so the signal has an initial value.
    /// * When the stream ends, the signal ends (`Poll::Ready(None)`).
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin stream,
            mut first,
        });

        match stream.poll_next(cx) {
            Poll::Ready(None) => Poll::Ready(None),

            Poll::Ready(Some(value)) => {
                // A real value has been emitted, so the placeholder `None` must never
                // follow it. Without this, a stream that yields an item and then goes
                // `Pending` would make the signal regress to `None`.
                *first = false;
                Poll::Ready(Some(Some(value)))
            },

            Poll::Pending => {
                if *first {
                    *first = false;
                    Poll::Ready(Some(None))
                } else {
                    Poll::Pending
                }
            },
        }
    }
}
/// Wraps `stream` in a [`FromStream`] signal that has not yet emitted anything.
#[inline]
pub fn from_stream<A>(stream: A) -> FromStream<A>
    where A: Stream {
    FromStream {
        stream,
        first: true,
    }
}
/// Signal that emits a single fixed value and then ends; built by `always`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Always<A> {
/// The value still to be emitted; `None` once it has been taken.
value: Option<A>,
}
// `poll_change` never pins `value`, so `Always` is `Unpin` for every `A`.
impl<A> Unpin for Always<A> {}
impl<A> Signal for Always<A> {
    type Item = A;

    /// Emits the stored value on the first poll and reports the end of the
    /// signal (`Poll::Ready(None)`) on every later poll. Never returns `Pending`.
    #[inline]
    fn poll_change(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
        // `Always` is unconditionally `Unpin`, so the pin can be removed safely.
        let this = self.get_mut();
        let emitted = this.value.take();
        Poll::Ready(emitted)
    }
}
/// Creates a signal that emits `value` once and then ends.
#[inline]
pub fn always<A>(value: A) -> Always<A> {
    let value = Some(value);
    Always { value }
}
/// Signal that forwards a single poll of the wrapped signal and then ends;
/// built by `SignalExt::first`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct First<A> {
/// The wrapped signal; reset to `None` after it has been polled once.
signal: Option<A>,
}
// `signal` is projected as pinned by `poll_change`, so `Unpin` requires `A: Unpin`.
impl<A> Unpin for First<A> where A: Unpin {}
impl<A> Signal for First<A> where A: Signal {
    type Item = A::Item;

    /// Polls the wrapped signal once, returns whatever that poll produced, and
    /// drops the wrapped signal. Every later poll reports `Poll::Ready(None)`.
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
        });

        let polled = signal.as_mut().as_pin_mut().map(|source| source.poll_change(cx));

        match polled {
            Some(poll) => {
                // The wrapped signal is dropped regardless of what the poll returned.
                signal.set(None);
                poll
            },
            None => Poll::Ready(None),
        }
    }
}
/// Signal returned by `SignalExt::switch`: a `map` followed by a `flatten`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Switch<A, B, C> where A: Signal, C: FnMut(A::Item) -> B {
/// The composed `Flatten<Map<..>>` that does all the work.
inner: Flatten<Map<A, C>>,
}
// `inner` is projected as pinned by `poll_change`, so `Unpin` needs both the outer signal
// `A` and the produced signals `B` to be `Unpin`.
impl<A, B, C> Unpin for Switch<A, B, C> where A: Signal + Unpin, B: Unpin, C: FnMut(A::Item) -> B {}
impl<A, B, C> Signal for Switch<A, B, C>
where A: Signal,
B: Signal,
C: FnMut(A::Item) -> B {
type Item = B::Item;
#[inline]
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe_project!(self => {
pin inner,
});
inner.poll_change(cx)
}
}
/// Future returned by `SignalExt::for_each`.
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct ForEach<A, B, C> {
/// The `futures_util` stream `ForEach` driving the signal as a stream.
inner: stream::ForEach<SignalStream<A>, B, C>,
}
// `inner` is projected as pinned by `poll`, so `Unpin` needs both the signal `A` and the
// callback's futures `B` to be `Unpin`.
impl<A, B, C> Unpin for ForEach<A, B, C> where A: Signal + Unpin, B: Future<Output = ()> + Unpin, C: FnMut(A::Item) -> B {}
impl<A, B, C> Future for ForEach<A, B, C>
    where A: Signal,
          B: Future<Output = ()>,
          C: FnMut(A::Item) -> B {
    type Output = ();

    /// Delegates the poll to the inner stream `ForEach` future.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        unsafe_project!(self => {
            pin inner,
        });

        Future::poll(inner, cx)
    }
}
/// Stream adapter over a signal; built by `SignalExt::to_stream`.
#[derive(Debug)]
#[must_use = "Streams do nothing unless polled"]
pub struct SignalStream<A> {
/// The wrapped signal.
signal: A,
}
// `signal` is projected as pinned by `poll_next`, so `Unpin` requires `A: Unpin`.
impl<A> Unpin for SignalStream<A> where A: Unpin {}
impl<A: Signal> Stream for SignalStream<A> {
    type Item = A::Item;

    /// Forwards to the signal's `poll_change`: each change becomes a stream item
    /// and the end of the signal becomes the end of the stream.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
        });

        A::poll_change(signal, cx)
    }
}
/// Future adapter over a signal; built by `SignalExt::to_future`.
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct SignalFuture<A> where A: Signal {
/// The wrapped signal.
signal: A,
/// The most recent value emitted by `signal`, if any.
value: Option<A::Item>,
}
// Only `signal` is projected as pinned by `poll`, so `Unpin` depends on `A` alone.
impl<A> Unpin for SignalFuture<A> where A: Unpin + Signal {}
impl<A> Future for SignalFuture<A> where A: Signal {
    type Output = A::Item;

    /// Drains the signal, remembering the latest value, and resolves with that
    /// value once the signal ends.
    ///
    /// # Panics
    ///
    /// Panics if the signal ends without ever having emitted a value.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        unsafe_project!(self => {
            pin signal,
            mut value,
        });

        // Keep polling for as long as the signal has something ready.
        while let Poll::Ready(change) = signal.as_mut().poll_change(cx) {
            match change {
                Some(latest) => {
                    *value = Some(latest);
                },
                None => {
                    return Poll::Ready(value.take().unwrap());
                },
            }
        }

        Poll::Pending
    }
}
/// Signal returned by `SignalExt::map`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Map<A, B> {
/// The wrapped signal.
signal: A,
/// The function applied to every value of `signal`.
callback: B,
}
// Only `signal` is projected as pinned by `poll_change`; the callback never is.
impl<A, B> Unpin for Map<A, B> where A: Unpin {}
impl<A, B, C> Signal for Map<A, B>
    where A: Signal,
          B: FnMut(A::Item) -> C {
    type Item = C;

    /// Polls the wrapped signal and applies the callback to a newly emitted value;
    /// `Pending` and the end of the signal are passed through untouched.
    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
            mut callback,
        });

        match signal.poll_change(cx) {
            Poll::Ready(Some(value)) => Poll::Ready(Some(callback(value))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Signal returned by `SignalExt::inspect`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Inspect<A, B> {
/// The wrapped signal.
signal: A,
/// The observer called with a reference to every value of `signal`.
callback: B,
}
// Only `signal` is projected as pinned by `poll_change`; the callback never is.
impl<A, B> Unpin for Inspect<A, B> where A: Unpin {}
impl<A, B> Signal for Inspect<A, B>
    where A: Signal,
          B: FnMut(&A::Item) {
    type Item = A::Item;

    /// Polls the wrapped signal, shows a newly emitted value to the callback,
    /// and returns the poll result unchanged.
    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
            mut callback,
        });

        let poll = signal.poll_change(cx);

        match &poll {
            Poll::Ready(Some(value)) => callback(value),
            Poll::Ready(None) | Poll::Pending => {},
        }

        poll
    }
}
/// Signal returned by `SignalExt::map_future`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct MapFuture<A, B, C> {
/// The source signal; reset to `None` once it has ended.
signal: Option<A>,
/// The future built from the latest source value; `None` when no future is in flight.
future: Option<B>,
/// Turns each source value into a future.
callback: C,
/// `true` until this signal has emitted anything (a real output or the placeholder `None`).
first: bool,
}
// `signal` and `future` are projected as pinned by `poll_change`; the callback is not.
impl<A, B, C> Unpin for MapFuture<A, B, C> where A: Unpin, B: Unpin {}
impl<A, B, C> Signal for MapFuture<A, B, C>
    where A: Signal,
          B: Future,
          C: FnMut(A::Item) -> B {
    type Item = Option<B::Output>;

    /// Drives both the source signal and the in-flight future.
    ///
    /// 1. The source is drained; every new value replaces the in-flight future.
    /// 2. The in-flight future (if any) is polled; on completion `Some(output)` is emitted.
    /// 3. Otherwise a placeholder `None` is emitted if nothing has been emitted yet,
    ///    the signal ends if the source has ended and no future is in flight,
    ///    and in all other cases the result is `Pending`.
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
            pin future,
            mut callback,
            mut first,
        });

        // Step 1: consume every value the source has ready, keeping only the newest future.
        let source_done = loop {
            match signal.as_mut().as_pin_mut().map(|source| source.poll_change(cx)) {
                Some(Poll::Ready(Some(value))) => {
                    let next = Some(callback(value));
                    future.set(next);
                },
                Some(Poll::Ready(None)) => {
                    signal.set(None);
                    break true;
                },
                Some(Poll::Pending) => break false,
                None => break true,
            }
        };

        // Step 2: a future that is still pending keeps the signal alive even if the
        // source has already ended.
        let finished = match future.as_mut().as_pin_mut().map(|pending| pending.poll(cx)) {
            Some(Poll::Ready(output)) => {
                future.set(None);
                *first = false;
                return Poll::Ready(Some(Some(output)));
            },
            Some(Poll::Pending) => false,
            None => source_done,
        };

        // Step 3.
        if *first {
            *first = false;
            Poll::Ready(Some(None))
        } else if finished {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
/// Future returned by `SignalExt::wait_for`.
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct WaitFor<A>
where A: Signal,
A::Item: PartialEq {
/// The signal being watched.
signal: A,
/// The value to wait for.
value: A::Item,
}
// Only `signal` is projected as pinned by `poll`, so `Unpin` depends on `A` alone.
impl<A> Unpin for WaitFor<A> where A: Unpin + Signal, A::Item: PartialEq {}
impl<A> Future for WaitFor<A>
    where A: Signal,
          A::Item: PartialEq {
    type Output = Option<A::Item>;

    /// Polls the signal until it emits an item equal to the awaited value.
    ///
    /// Resolves with `Some(item)` for the matching item, or with `None` if the
    /// signal ends first. Non-matching items are discarded.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        unsafe_project!(self => {
            pin signal,
            mut value,
        });

        loop {
            let poll = signal.as_mut().poll_change(cx);

            let mismatch = match poll {
                Poll::Ready(Some(ref candidate)) => candidate != value,
                _ => false,
            };

            // Anything other than a non-matching item is the final answer for this poll.
            if !mismatch {
                return poll;
            }
        }
    }
}
/// `SignalVec` adapter over a signal of `Vec`s; built by `SignalExt::to_signal_vec`.
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct SignalSignalVec<A> {
/// The wrapped signal.
signal: A,
}
// `signal` is projected as pinned by `poll_vec_change`, so `Unpin` requires `A: Unpin`.
impl<A> Unpin for SignalSignalVec<A> where A: Unpin {}
impl<A, B> SignalVec for SignalSignalVec<A>
    where A: Signal<Item = Vec<B>> {
    type Item = B;

    /// Reports every `Vec` emitted by the wrapped signal as a full
    /// `VecDiff::Replace`; `Pending` and the end of the signal pass through.
    #[inline]
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        unsafe_project!(self => {
            pin signal,
        });

        match signal.poll_change(cx) {
            Poll::Ready(Some(values)) => Poll::Ready(Some(VecDiff::Replace { values })),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
// Shared body of the `Dedupe*` signals' `poll_change`.
//
// Arguments:
// * `$signal` — the pinned source signal,
// * `$cx`     — the task context,
// * `$value`  — mutable reference to the stored previous value (`Option<Item>`),
// * `$pat`    — pattern that binds the freshly polled item,
// * `$name`   — the identifier bound by `$pat`,
// * `$output` — expression producing the emitted value; it may use `$name`.
//
// The expansion `return`s from the enclosing function, so it must be the tail of
// a `poll_change` body.
macro_rules! dedupe {
($signal:expr, $cx:expr, $value:expr, $pat:pat, $name:ident => $output:expr) => {
loop {
return match $signal.as_mut().poll_change($cx) {
Poll::Ready(Some($pat)) => {
// A first value always counts as changed.
let has_changed = match $value {
Some(ref old_value) => *old_value != $name,
None => true,
};
if has_changed {
// `$output` is evaluated before the new item is moved into storage.
let output = $output;
*$value = Some($name);
Poll::Ready(Some(output))
} else {
// Duplicate: drop it and poll the source again.
continue;
}
},
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
};
}
/// Signal returned by `SignalExt::dedupe_map`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct DedupeMap<A, B> where A: Signal {
/// The last accepted source value, used for the equality check.
old_value: Option<A::Item>,
/// The wrapped signal.
signal: A,
/// Computes the emitted value from an accepted source value.
callback: B,
}
// Only `signal` is projected as pinned by `poll_change`.
impl<A, B> Unpin for DedupeMap<A, B> where A: Unpin + Signal {}
impl<A, B, C> Signal for DedupeMap<A, B>
where A: Signal,
A::Item: PartialEq,
B: FnMut(&mut A::Item) -> C {
type Item = C;
/// Skips source values equal to the stored one; for any other value, emits
/// `callback(&mut value)` and then stores the value for the next comparison.
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe_project!(self => {
mut old_value,
pin signal,
mut callback,
});
// The comparison happens before the callback runs; the value is stored afterwards,
// so any mutation made by the callback ends up in `old_value`.
dedupe!(signal, cx, old_value, mut value, value => callback(&mut value))
}
}
/// Signal returned by `SignalExt::dedupe`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Dedupe<A> where A: Signal {
/// The last emitted value, used for the equality check.
old_value: Option<A::Item>,
/// The wrapped signal.
signal: A,
}
// Only `signal` is projected as pinned by `poll_change`.
impl<A> Unpin for Dedupe<A> where A: Unpin + Signal {}
impl<A> Signal for Dedupe<A>
where A: Signal,
A::Item: PartialEq + Copy {
type Item = A::Item;
/// Skips source values equal to the previously emitted one; any other value
/// is both emitted and stored, which is why `Item` must be `Copy`.
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe_project!(self => {
mut old_value,
pin signal,
});
dedupe!(signal, cx, old_value, value, value => value)
}
}
/// Signal returned by `SignalExt::dedupe_cloned`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct DedupeCloned<A> where A: Signal {
/// The last emitted value, used for the equality check.
old_value: Option<A::Item>,
/// The wrapped signal.
signal: A,
}
// Only `signal` is projected as pinned by `poll_change`.
impl<A> Unpin for DedupeCloned<A> where A: Unpin + Signal {}
impl<A> Signal for DedupeCloned<A>
where A: Signal,
A::Item: PartialEq + Clone {
type Item = A::Item;
/// Skips source values equal to the previously emitted one; for any other value,
/// a clone is emitted and the original is stored for the next comparison.
fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe_project!(self => {
mut old_value,
pin signal,
});
dedupe!(signal, cx, old_value, value, value => value.clone())
}
}
/// Signal returned by `SignalExt::filter_map`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct FilterMap<A, B> {
/// The wrapped signal.
signal: A,
/// Maps a source value to `Some(output)`, or to `None` to filter it out.
callback: B,
/// `true` until this signal has emitted anything.
first: bool,
}
// Only `signal` is projected as pinned by `poll_change`; the callback never is.
impl<A, B> Unpin for FilterMap<A, B> where A: Unpin {}
impl<A, B, C> Signal for FilterMap<A, B>
    where A: Signal,
          B: FnMut(A::Item) -> Option<C> {
    type Item = Option<C>;

    /// Polls the source and runs each value through the callback.
    ///
    /// * A `Some(mapped)` result is emitted as `Some(mapped)`.
    /// * A `None` result is emitted only if nothing has been emitted before;
    ///   otherwise it is skipped and the source is polled again.
    /// * `Pending` and the end of the source pass through.
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
            mut callback,
            mut first,
        });

        loop {
            let mapped = match signal.as_mut().poll_change(cx) {
                Poll::Ready(Some(value)) => callback(value),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            };

            // A filtered-out value may be emitted only as the very first value.
            if mapped.is_some() || *first {
                *first = false;
                return Poll::Ready(Some(mapped));
            }
        }
    }
}
/// Signal returned by `SignalExt::flatten`.
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct Flatten<A> where A: Signal {
/// The outer signal of signals; reset to `None` once it has ended.
signal: Option<A>,
/// The inner signal currently being followed; `None` before the first one arrives
/// and after the current one has ended.
inner: Option<A::Item>,
}
// Both `signal` and `inner` are projected as pinned by `poll_change`.
impl<A> Unpin for Flatten<A> where A: Unpin + Signal, A::Item: Unpin {}
impl<A> Signal for Flatten<A>
    where A: Signal,
          A::Item: Signal {
    type Item = <A::Item as Signal>::Item;

    /// Polls the outer signal once, switching to a newly emitted inner signal,
    /// then polls the current inner signal.
    ///
    /// The flattened signal ends only when the outer signal has ended and no
    /// inner signal is left to produce values.
    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        unsafe_project!(self => {
            pin signal,
            pin inner,
        });

        let outer_poll = signal.as_mut().as_pin_mut().map(|outer| outer.poll_change(cx));

        let done = match outer_poll {
            Some(Poll::Ready(Some(new_inner))) => {
                // The previous inner signal (if any) is replaced.
                inner.set(Some(new_inner));
                false
            },
            Some(Poll::Ready(None)) => {
                signal.set(None);
                true
            },
            Some(Poll::Pending) => false,
            None => true,
        };

        if let Some(poll) = inner.as_mut().as_pin_mut().map(|current| current.poll_change(cx)) {
            match poll {
                // The inner signal ended; fall through to decide between end and `Pending`.
                Poll::Ready(None) => inner.set(None),
                other => return other,
            }
        }

        if done {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}