futures_signals/signal/
signal.rs

1use std::pin::Pin;
2use std::marker::Unpin;
3use std::future::Future;
4use std::task::{Context, Poll};
5use futures_core::stream::Stream;
6use futures_util::stream;
7use futures_util::stream::StreamExt;
8use pin_project::pin_project;
9
10use crate::signal::Broadcaster;
11use crate::signal_vec::{VecDiff, SignalVec};
12
13
14// TODO impl for AssertUnwindSafe ?
15// TODO documentation for Signal contract:
16// * a Signal must always return Poll::Ready(Some(...)) the first time it is polled, no exceptions
17// * after the first time it can then return Poll::Ready(None) which means that the Signal is ended (i.e. there won't be any future changes)
18// * or it can return Poll::Pending, which means the Signal hasn't changed from its previous value
19// * whenever the Signal's value has changed, it must call cx.waker().wake_by_ref() which will notify the consumer that the Signal has changed
20// * If wake_by_ref() hasn't been called, then the consumer assumes that nothing has changed, so it won't re-poll the Signal
21// * unlike Streams, the consumer does not poll again if it receives Poll::Ready(Some(...)), it will only repoll if wake_by_ref() is called
22// * If the Signal returns Poll::Ready(None) then the consumer must not re-poll the Signal
23#[must_use = "Signals do nothing unless polled"]
24pub trait Signal {
25    type Item;
26
27    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
28}
29
30
31// Copied from Future in the Rust stdlib
32impl<'a, A> Signal for &'a mut A where A: ?Sized + Signal + Unpin {
33    type Item = A::Item;
34
35    #[inline]
36    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
37        A::poll_change(Pin::new(&mut **self), cx)
38    }
39}
40
41// Copied from Future in the Rust stdlib
42impl<A> Signal for Box<A> where A: ?Sized + Signal + Unpin {
43    type Item = A::Item;
44
45    #[inline]
46    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
47        A::poll_change(Pin::new(&mut *self), cx)
48    }
49}
50
51// Copied from Future in the Rust stdlib
52impl<A> Signal for Pin<A>
53    where A: Unpin + ::std::ops::DerefMut,
54          A::Target: Signal {
55    type Item = <<A as ::std::ops::Deref>::Target as Signal>::Item;
56
57    #[inline]
58    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
59        Pin::get_mut(self).as_mut().poll_change(cx)
60    }
61}
62
63
64// TODO Seal this
65pub trait SignalExt: Signal {
66    /// Creates a `Stream` which contains the values of `self`.
67    ///
68    /// When the output `Stream` is spawned:
69    ///
70    /// 1. It immediately outputs the current value of `self`.
71    ///
72    /// 2. Whenever `self` changes it outputs the new value of `self`.
73    ///
74    /// Like *all* of the `Signal` methods, `to_stream` might skip intermediate changes.
75    /// So you ***cannot*** rely upon it containing every intermediate change.
76    /// But you ***can*** rely upon it always containing the most recent change.
77    ///
78    /// # Performance
79    ///
80    /// This is ***extremely*** efficient: it is *guaranteed* constant time, and it does not do
81    /// any heap allocation.
82    #[inline]
83    fn to_stream(self) -> SignalStream<Self>
84        where Self: Sized {
85        SignalStream {
86            signal: self,
87        }
88    }
89
90    // TODO maybe remove this ?
91    #[inline]
92    fn to_future(self) -> SignalFuture<Self>
93        where Self: Sized {
94        SignalFuture {
95            signal: self,
96            value: None,
97        }
98    }
99
100    /// Creates a `Signal` which uses a closure to transform the value.
101    ///
102    /// When the output `Signal` is spawned:
103    ///
104    /// 1. It calls the closure with the current value of `self`.
105    ///
106    /// 2. Then it puts the return value of the closure into the output `Signal`.
107    ///
108    /// 3. Whenever `self` changes it repeats the above steps.
109    ///
110    ///    This happens automatically and efficiently.
111    ///
112    /// It will call the closure at most once for each change in `self`.
113    ///
114    /// Like *all* of the `Signal` methods, `map` might skip intermediate changes.
115    /// So you ***cannot*** rely upon the closure being called for every intermediate change.
116    /// But you ***can*** rely upon it always being called with the most recent change.
117    ///
118    /// # Examples
119    ///
120    /// Add `1` to the value:
121    ///
122    /// ```rust
123    /// # use futures_signals::signal::{always, SignalExt};
124    /// # let input = always(1);
125    /// let mapped = input.map(|value| value + 1);
126    /// ```
127    ///
128    /// `mapped` will always contain the current value of `input`, except with `1` added to it.
129    ///
130    /// If `input` has the value `10`, then `mapped` will have the value `11`.
131    ///
132    /// If `input` has the value `5`, then `mapped` will have the value `6`, etc.
133    ///
134    /// ----
135    ///
136    /// Formatting to a `String`:
137    ///
138    /// ```rust
139    /// # use futures_signals::signal::{always, SignalExt};
140    /// # let input = always(1);
141    /// let mapped = input.map(|value| format!("{}", value));
142    /// ```
143    ///
144    /// `mapped` will always contain the current value of `input`, except formatted as a `String`.
145    ///
146    /// If `input` has the value `10`, then `mapped` will have the value `"10"`.
147    ///
148    /// If `input` has the value `5`, then `mapped` will have the value `"5"`, etc.
149    ///
150    /// # Performance
151    ///
152    /// This is ***extremely*** efficient: it is *guaranteed* constant time, and it does not do
153    /// any heap allocation.
154    #[inline]
155    fn map<A, B>(self, callback: B) -> Map<Self, B>
156        where B: FnMut(Self::Item) -> A,
157              Self: Sized {
158        Map {
159            signal: self,
160            callback,
161        }
162    }
163
164    #[inline]
165    fn inspect<A>(self, callback: A) -> Inspect<Self, A>
166        where A: FnMut(&Self::Item),
167              Self: Sized {
168        Inspect {
169            signal: self,
170            callback,
171        }
172    }
173
174    #[inline]
175    fn eq(self, value: Self::Item) -> Eq<Self>
176        where Self::Item: PartialEq,
177              Self: Sized {
178        Eq {
179            signal: self,
180            matches: None,
181            value,
182        }
183    }
184
185    #[inline]
186    fn neq(self, value: Self::Item) -> Neq<Self>
187        where Self::Item: PartialEq,
188              Self: Sized {
189        Neq {
190            signal: self,
191            matches: None,
192            value,
193        }
194    }
195
196    /// Creates a `Signal` which uses a closure to transform the value.
197    ///
198    /// This is exactly the same as `map`, except:
199    ///
200    /// 1. It calls the closure with a mutable reference to the input value.
201    ///
202    /// 2. If the new input value is the same as the old input value, it will ***not*** call the closure, instead
203    ///    it will completely ignore the new value, like as if it never happened.
204    ///
205    ///    It uses the `PartialEq` implementation to determine whether the new value is the same as the old value.
206    ///
207    ///    It only keeps track of the most recent value: that means that it ***won't*** call the closure for consecutive
208    ///    duplicates, however it ***will*** call the closure for non-consecutive duplicates.
209    ///
210    /// Because `dedupe_map` has the same behavior as `map`, it is useful solely as a performance optimization.
211    ///
212    /// # Performance
213    ///
214    /// The performance is the same as `map`, except with an additional call to `eq`.
215    ///
216    /// If the `eq` call is fast, then `dedupe_map` can be faster than `map`, because it doesn't call the closure
217    /// when the new and old values are the same, and it also doesn't update any child Signals.
218    ///
219    /// On the other hand, if the `eq` call is slow, then `dedupe_map` is probably slower than `map`.
220    #[inline]
221    fn dedupe_map<A, B>(self, callback: B) -> DedupeMap<Self, B>
222        // TODO should this use & instead of &mut ?
223        where B: FnMut(&mut Self::Item) -> A,
224              Self::Item: PartialEq,
225              Self: Sized {
226        DedupeMap {
227            old_value: None,
228            signal: self,
229            callback,
230        }
231    }
232
233    #[inline]
234    fn dedupe(self) -> Dedupe<Self>
235        where Self::Item: PartialEq,
236              Self: Sized {
237        Dedupe {
238            old_value: None,
239            signal: self,
240        }
241    }
242
243    #[inline]
244    fn dedupe_cloned(self) -> DedupeCloned<Self>
245        where Self::Item: PartialEq,
246              Self: Sized {
247        DedupeCloned {
248            old_value: None,
249            signal: self,
250        }
251    }
252
253    /// Creates a `Signal` which uses a closure to asynchronously transform the value.
254    ///
255    /// When the output `Signal` is spawned:
256    ///
257    /// 1. It calls the closure with the current value of `self`.
258    ///
259    /// 2. The closure returns a `Future`. It waits for that `Future` to finish, and then
260    ///    it puts the return value of the `Future` into the output `Signal`.
261    ///
262    /// 3. Whenever `self` changes it repeats the above steps.
263    ///
264    /// It will call the closure at most once for each change in `self`.
265    ///
266    /// Because Signals must always have a current value, if the `Future` is not ready yet, then the
267    /// output `Signal` will start with the value `None`. When the `Future` finishes it then changes
268    /// to `Some`. This can be used to detect whether the `Future` has finished or not.
269    ///
270    /// If `self` changes before the old `Future` is finished, it will cancel the old `Future`.
271    /// That means if `self` changes faster than the `Future`, then it will never output any values.
272    ///
273    /// Like *all* of the `Signal` methods, `map_future` might skip intermediate changes.
274    /// So you ***cannot*** rely upon the closure being called for every intermediate change.
275    /// But you ***can*** rely upon it always being called with the most recent change.
276    ///
277    /// # Examples
278    ///
279    /// Call an asynchronous network API whenever the input changes:
280    ///
281    /// ```rust
282    /// # use futures_signals::signal::{always, SignalExt};
283    /// # use futures_util::future::{ready, Ready};
284    /// # fn call_network_api(value: u32) -> Ready<()> { ready(()) }
285    /// # fn main() {
286    /// # let input = always(1);
287    /// #
288    /// let mapped = input.map_future(|value| call_network_api(value));
289    /// # }
290    /// ```
291    ///
292    /// # Performance
293    ///
294    /// This is ***extremely*** efficient: it does not do any heap allocation, and it has *very* little overhead.
295    ///
296    /// Of course the performance will also depend upon the `Future` which is returned from the closure.
297    #[inline]
298    fn map_future<A, B>(self, callback: B) -> MapFuture<Self, A, B>
299        where A: Future,
300              B: FnMut(Self::Item) -> A,
301              Self: Sized {
302        MapFuture {
303            signal: Some(self),
304            future: None,
305            callback,
306            first: true,
307        }
308    }
309
310    /// Creates a `Signal` which uses a closure to filter and transform the value.
311    ///
312    /// When the output `Signal` is spawned:
313    ///
314    /// 1. The output `Signal` starts with the value `None`.
315    ///
316    /// 2. It calls the closure with the current value of `self`.
317    ///
318    /// 3. If the closure returns `Some`, then it puts the return value of the closure into the output `Signal`.
319    ///
320    /// 4. If the closure returns `None`, then it does nothing.
321    ///
322    /// 5. Whenever `self` changes it repeats steps 2 - 4.
323    ///
324    /// The output `Signal` will only be `None` for the initial value. After that it will always be `Some`.
325    ///
326    /// If the closure returns `Some` for the initial value, then the output `Signal` will never be `None`.
327    ///
328    /// It will call the closure at most once for each change in `self`.
329    ///
330    /// Like *all* of the `Signal` methods, `filter_map` might skip intermediate changes.
331    /// So you ***cannot*** rely upon the closure being called for every intermediate change.
332    /// But you ***can*** rely upon it always being called with the most recent change.
333    ///
334    /// # Examples
335    ///
336    /// Add `1` to the value, but only if the value is less than `5`:
337    ///
338    /// ```rust
339    /// # use futures_signals::signal::{always, SignalExt};
340    /// # let input = always(1);
341    /// let mapped = input.filter_map(|value| {
342    ///     if value < 5 {
343    ///         Some(value + 1)
344    ///
345    ///     } else {
346    ///         None
347    ///     }
348    /// });
349    /// ```
350    ///
351    /// If the initial value of `input` is `5` or greater then `mapped` will be `None`.
352    ///
353    /// If the current value of `input` is `5` or greater then `mapped` will keep its old value.
354    ///
355    /// Otherwise `mapped` will be `Some(input + 1)`.
356    ///
357    /// # Performance
358    ///
359    /// This is ***extremely*** efficient: it does not do any heap allocation, and it has *very* little overhead.
360    #[inline]
361    fn filter_map<A, B>(self, callback: B) -> FilterMap<Self, B>
362        where B: FnMut(Self::Item) -> Option<A>,
363              Self: Sized {
364        FilterMap {
365            signal: self,
366            callback,
367            first: true,
368        }
369    }
370
371    /// Creates a `Signal` which delays updates until a `Future` finishes.
372    ///
373    /// This can be used to throttle a `Signal` so that it only updates once every X seconds.
374    ///
375    /// If multiple updates happen while it's being delayed, it will only output the most recent
376    /// value.
377    ///
378    /// # Examples
379    ///
380    /// Wait 1 second between each update:
381    ///
382    /// ```rust
383    /// # use core::future::Future;
384    /// # use futures_signals::signal::{always, SignalExt};
385    /// # fn sleep(ms: i32) -> impl Future<Output = ()> { async {} }
386    /// # let input = always(1);
387    /// let output = input.throttle(|| sleep(1_000));
388    /// ```
389    ///
390    /// # Performance
391    ///
392    /// This is ***extremely*** efficient: it does not do any heap allocation, and it has *very* little overhead.
393    #[inline]
394    fn throttle<A, B>(self, callback: B) -> Throttle<Self, A, B>
395        where A: Future<Output = ()>,
396              B: FnMut() -> A,
397              Self: Sized {
398        Throttle {
399            signal: Some(self),
400            future: None,
401            callback,
402        }
403    }
404
405    /// Creates a `Signal` which flattens `self`.
406    ///
407    /// When the output `Signal` is spawned:
408    ///
409    /// 1. It retrieves the current value of `self` (this value is also a `Signal`).
410    ///
411    /// 2. Then it puts the current value of the inner `Signal` into the output `Signal`.
412    ///
413    /// 3. Whenever the inner `Signal` changes it puts the new value into the output `Signal`.
414    ///
415    /// 4. Whenever `self` changes it repeats the above steps.
416    ///
417    ///    This happens automatically and efficiently.
418    ///
419    /// Like *all* of the `Signal` methods, `flatten` might skip intermediate changes.
420    /// So you ***cannot*** rely upon it containing every intermediate change.
421    /// But you ***can*** rely upon it always containing the most recent change.
422    ///
423    /// # Performance
424    ///
425    /// This is very efficient: it is *guaranteed* constant time, and it does not do
426    /// any heap allocation.
427    #[inline]
428    fn flatten(self) -> Flatten<Self>
429        where Self::Item: Signal,
430              Self: Sized {
431        Flatten {
432            signal: Some(self),
433            inner: None,
434        }
435    }
436
437    #[inline]
438    fn switch<A, B>(self, callback: B) -> Switch<Self, A, B>
439        where A: Signal,
440              B: FnMut(Self::Item) -> A,
441              Self: Sized {
442        Switch {
443            inner: self.map(callback).flatten()
444        }
445    }
446
447    #[inline]
448    fn switch_signal_vec<A, F>(self, callback: F) -> SwitchSignalVec<Self, A, F>
449        where A: SignalVec,
450              F: FnMut(Self::Item) -> A,
451              Self: Sized {
452        SwitchSignalVec {
453            signal: Some(self),
454            signal_vec: None,
455            callback,
456            len: 0,
457        }
458    }
459
460    /// Creates a `Stream` which samples the value of `self` whenever the `Stream` has a new value.
461    ///
462    /// # Performance
463    ///
464    /// This is ***extremely*** efficient: it does not do any heap allocation, and it has *very* little overhead.
465    #[inline]
466    fn sample_stream_cloned<A>(self, stream: A) -> SampleStreamCloned<Self, A>
467        where A: Stream,
468              A::Item: Clone,
469              Self: Sized {
470        SampleStreamCloned {
471            signal: Some(self),
472            stream: stream,
473            value: None,
474        }
475    }
476
477    #[inline]
478    // TODO file Rust bug about bad error message when `callback` isn't marked as `mut`
479    fn for_each<U, F>(self, callback: F) -> ForEach<Self, U, F>
480        where U: Future<Output = ()>,
481              F: FnMut(Self::Item) -> U,
482              Self: Sized {
483        // TODO a bit hacky
484        ForEach {
485            inner: SignalStream {
486                signal: self,
487            }.for_each(callback)
488        }
489    }
490
491    #[inline]
492    fn to_signal_vec(self) -> SignalSignalVec<Self>
493        where Self: Sized {
494        SignalSignalVec {
495            signal: self
496        }
497    }
498
499    #[inline]
500    fn wait_for(self, value: Self::Item) -> WaitFor<Self>
501        where Self::Item: PartialEq,
502              Self: Sized {
503        WaitFor {
504            signal: self,
505            value: value,
506        }
507    }
508
509    #[inline]
510    fn first(self) -> First<Self> where Self: Sized {
511        First {
512            signal: Some(self),
513        }
514    }
515
516    /// Conditionally stops the `Signal`.
517    ///
518    /// For each value in `self` it will call the `test` function.
519    ///
520    /// If `test` returns `true` then the `Signal` will stop emitting
521    /// any future values.
522    ///
523    /// The value which is passed to `test` is always emitted no matter
524    /// what.
525    ///
526    /// # Examples
527    ///
528    /// ```rust
529    /// # use futures_signals::signal::{always, SignalExt};
530    /// # let input = always(1);
531    /// // Stops the signal when x is above 5
532    /// let output = input.stop_if(|x| *x > 5);
533    /// ```
534    ///
535    /// # Performance
536    ///
537    /// This is ***extremely*** efficient: it is *guaranteed* constant time, and it does not do
538    /// any heap allocation.
539    #[inline]
540    fn stop_if<F>(self, test: F) -> StopIf<Self, F>
541        where F: FnMut(&Self::Item) -> bool,
542              Self: Sized {
543        StopIf {
544            signal: self,
545            stopped: false,
546            test,
547        }
548    }
549
550
551    #[inline]
552    #[track_caller]
553    #[cfg(feature = "debug")]
554    fn debug(self) -> SignalDebug<Self> where Self: Sized, Self::Item: std::fmt::Debug {
555        SignalDebug {
556            signal: self,
557            location: std::panic::Location::caller(),
558        }
559    }
560
561    /// A convenience method for calling [`Broadcaster::new`].
562    ///
563    /// This allows you to clone / split a `Signal` into multiple `Signal`s.
564    ///
565    /// See the documentation for [`Broadcaster`] for more details.
566    #[inline]
567    fn broadcast(self) -> Broadcaster<Self> where Self: Sized {
568        Broadcaster::new(self)
569    }
570
571    /// A convenience for calling `Signal::poll_change` on `Unpin` types.
572    #[inline]
573    fn poll_change_unpin(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> where Self: Unpin + Sized {
574        Pin::new(self).poll_change(cx)
575    }
576
577    #[inline]
578    fn boxed<'a>(self) -> Pin<Box<dyn Signal<Item = Self::Item> + Send + 'a>>
579        where Self: Sized + Send + 'a {
580        Box::pin(self)
581    }
582
583    #[inline]
584    fn boxed_local<'a>(self) -> Pin<Box<dyn Signal<Item = Self::Item> + 'a>>
585        where Self: Sized + 'a {
586        Box::pin(self)
587    }
588}
589
590// TODO why is this ?Sized
591impl<T: ?Sized> SignalExt for T where T: Signal {}
592
593
594/// An owned dynamically typed [`Signal`].
595///
596/// This is useful if you don't know the static type, or if you need
597/// indirection.
598pub type BoxSignal<'a, T> = Pin<Box<dyn Signal<Item = T> + Send + 'a>>;
599
600/// Same as [`BoxSignal`], but without the `Send` requirement.
601pub type LocalBoxSignal<'a, T> = Pin<Box<dyn Signal<Item = T> + 'a>>;
602
603
604// TODO make this into a method later
605#[inline]
606pub fn not<A>(signal: A) -> impl Signal<Item = bool>
607    where A: Signal<Item = bool> {
608    signal.map(|x| !x)
609}
610
611// TODO make this into a method later
612// TODO use short-circuiting if the left signal returns false ?
613#[inline]
614pub fn and<A, B>(left: A, right: B) -> impl Signal<Item = bool>
615    where A: Signal<Item = bool>,
616          B: Signal<Item = bool> {
617    crate::map_ref! {
618        let a = left,
619        let b = right =>
620        *a && *b
621    }
622}
623
624// TODO make this into a method later
625// TODO use short-circuiting if the left signal returns true ?
626#[inline]
627pub fn or<A, B>(left: A, right: B) -> impl Signal<Item = bool>
628    where A: Signal<Item = bool>,
629          B: Signal<Item = bool> {
630    crate::map_ref! {
631        let a = left,
632        let b = right =>
633        *a || *b
634    }
635}
636
637
638#[pin_project(project = MaybeSignalStateProj)]
639#[derive(Debug)]
640enum MaybeSignalState<S, E> {
641    Signal(#[pin] S),
642    Value(Option<E>),
643}
644
645impl<S, E> MaybeSignalState<S, E> where S: Signal {
646    fn poll<A, B, C>(self: Pin<&mut Self>, cx: &mut Context, map_signal: A, map_value: B) -> Poll<Option<C>>
647        where A: FnOnce(S::Item) -> C,
648              B: FnOnce(E) -> C {
649        match self.project() {
650            MaybeSignalStateProj::Signal(signal) => {
651                signal.poll_change(cx).map(|value| value.map(map_signal))
652            },
653            MaybeSignalStateProj::Value(value) => {
654                match value.take() {
655                    Some(value) => Poll::Ready(Some(map_value(value))),
656                    None => Poll::Ready(None),
657                }
658            },
659        }
660    }
661}
662
663
664#[pin_project]
665#[derive(Debug)]
666#[must_use = "Signals do nothing unless polled"]
667pub struct ResultSignal<S, E> {
668    #[pin]
669    state: MaybeSignalState<S, E>,
670}
671
672impl<S, E> Signal for ResultSignal<S, E> where S: Signal {
673    type Item = Result<S::Item, E>;
674
675    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
676        self.project().state.poll(cx, Ok, Err)
677    }
678}
679
680/// Converts a `Result<Signal<A>, B>` into a `Signal<Result<A, B>>`.
681///
682/// This is mostly useful with [`SignalExt::switch`] or [`SignalExt::flatten`].
683///
684/// If the value is `Err(value)` then it behaves like [`always`], it just returns that
685/// value.
686///
687/// If the value is `Ok(signal)` then it will return the result of the signal,
688/// except wrapped in `Ok`.
689pub fn result<S, E>(value: Result<S, E>) -> ResultSignal<S, E> where S: Signal {
690    match value {
691        Ok(signal) => ResultSignal {
692            state: MaybeSignalState::Signal(signal),
693        },
694        Err(value) => ResultSignal {
695            state: MaybeSignalState::Value(Some(value)),
696        },
697    }
698}
699
700
701#[pin_project]
702#[derive(Debug)]
703#[must_use = "Signals do nothing unless polled"]
704pub struct OptionSignal<S> where S: Signal {
705    #[pin]
706    state: MaybeSignalState<S, Option<S::Item>>,
707}
708
709impl<S> Signal for OptionSignal<S> where S: Signal {
710    type Item = Option<S::Item>;
711
712    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
713        self.project().state.poll(cx, Some, |x| x)
714    }
715}
716
717/// Converts an `Option<Signal<A>>` into a `Signal<Option<A>>`.
718///
719/// This is mostly useful with [`SignalExt::switch`] or [`SignalExt::flatten`].
720///
721/// If the value is `None` then it behaves like [`always`], it just returns `None`.
722///
723/// If the value is `Some(signal)` then it will return the result of the signal,
724/// except wrapped in `Some`.
725pub fn option<S>(value: Option<S>) -> OptionSignal<S> where S: Signal {
726    match value {
727        Some(signal) => OptionSignal {
728            state: MaybeSignalState::Signal(signal),
729        },
730        None => OptionSignal {
731            state: MaybeSignalState::Value(Some(None)),
732        },
733    }
734}
735
736
737#[pin_project]
738#[derive(Debug)]
739#[must_use = "Signals do nothing unless polled"]
740#[cfg(feature = "debug")]
741pub struct SignalDebug<A> {
742    #[pin]
743    signal: A,
744    location: &'static std::panic::Location<'static>,
745}
746
747#[cfg(feature = "debug")]
748impl<A> Signal for SignalDebug<A> where A: Signal, A::Item: std::fmt::Debug {
749    type Item = A::Item;
750
751    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
752        let this = self.project();
753
754        let poll = this.signal.poll_change(cx);
755
756        log::trace!("[{}] {:#?}", this.location, poll);
757
758        poll
759    }
760}
761
762
763#[pin_project]
764#[derive(Debug)]
765#[must_use = "Signals do nothing unless polled"]
766pub struct FromFuture<A> {
767    // TODO is this valid with pinned types ?
768    #[pin]
769    future: Option<A>,
770    first: bool,
771}
772
773impl<A> Signal for FromFuture<A> where A: Future {
774    type Item = Option<A::Output>;
775
776    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
777        let mut this = self.project();
778
779        // TODO is this valid with pinned types ?
780        match this.future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
781            None => {
782                Poll::Ready(None)
783            },
784
785            Some(Poll::Ready(value)) => {
786                this.future.set(None);
787                Poll::Ready(Some(Some(value)))
788            },
789
790            Some(Poll::Pending) => {
791                if *this.first {
792                    *this.first = false;
793                    Poll::Ready(Some(None))
794
795                } else {
796                    Poll::Pending
797                }
798            },
799        }
800    }
801}
802
803#[inline]
804pub fn from_future<A>(future: A) -> FromFuture<A> where A: Future {
805    FromFuture { future: Some(future), first: true }
806}
807
808
809#[pin_project]
810#[derive(Debug)]
811#[must_use = "Signals do nothing unless polled"]
812pub struct FromStream<A> {
813    #[pin]
814    stream: Option<A>,
815    first: bool,
816}
817
818impl<A> Signal for FromStream<A> where A: Stream {
819    type Item = Option<A::Item>;
820
821    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
822        let mut this = self.project();
823
824        let mut value = None;
825
826        let done = loop {
827            match this.stream.as_mut().as_pin_mut().map(|stream| stream.poll_next(cx)) {
828                None => {
829                    break true;
830                },
831
832                Some(Poll::Ready(None)) => {
833                    this.stream.set(None);
834                    break true;
835                },
836
837                Some(Poll::Ready(Some(new_value))) => {
838                    value = Some(new_value);
839                    continue;
840                },
841
842                Some(Poll::Pending) => {
843                    break false;
844                },
845            }
846        };
847
848        match value {
849            Some(value) => {
850                *this.first = false;
851                Poll::Ready(Some(Some(value)))
852            },
853            None => {
854                if *this.first {
855                    *this.first = false;
856                    Poll::Ready(Some(None))
857
858                } else if done {
859                    Poll::Ready(None)
860
861                } else {
862                    Poll::Pending
863                }
864            },
865        }
866    }
867}
868
869#[inline]
870pub fn from_stream<A>(stream: A) -> FromStream<A> where A: Stream {
871    FromStream { stream: Some(stream), first: true }
872}
873
874
875#[derive(Debug)]
876#[must_use = "Signals do nothing unless polled"]
877pub struct Always<A> {
878    value: Option<A>,
879}
880
881impl<A> Unpin for Always<A> {}
882
883impl<A> Signal for Always<A> {
884    type Item = A;
885
886    #[inline]
887    fn poll_change(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
888        Poll::Ready(self.value.take())
889    }
890}
891
892#[inline]
893pub fn always<A>(value: A) -> Always<A> {
894    Always {
895        value: Some(value),
896    }
897}
898
899
900#[pin_project]
901#[derive(Debug)]
902#[must_use = "Signals do nothing unless polled"]
903pub struct First<A> {
904    #[pin]
905    signal: Option<A>,
906}
907
908impl<A> Signal for First<A> where A: Signal {
909    type Item = A::Item;
910
911    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
912        let mut this = self.project();
913
914        // TODO maybe it's safe to replace this with take ?
915        if let Some(poll) = this.signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
916            this.signal.set(None);
917            poll
918
919        } else {
920            Poll::Ready(None)
921        }
922    }
923}
924
925
926#[pin_project]
927#[derive(Debug)]
928#[must_use = "Signals do nothing unless polled"]
929pub struct Switch<A, B, C> where A: Signal, C: FnMut(A::Item) -> B {
930    #[pin]
931    inner: Flatten<Map<A, C>>,
932}
933
934impl<A, B, C> Signal for Switch<A, B, C>
935    where A: Signal,
936          B: Signal,
937          C: FnMut(A::Item) -> B {
938    type Item = B::Item;
939
940    #[inline]
941    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
942        self.project().inner.poll_change(cx)
943    }
944}
945
946
947#[pin_project]
948#[derive(Debug)]
949#[must_use = "Streams do nothing unless polled"]
950pub struct SampleStreamCloned<A, B> where A: Signal, B: Stream {
951    #[pin]
952    signal: Option<A>,
953    #[pin]
954    stream: B,
955    value: Option<A::Item>,
956}
957
958impl<A, B> Stream for SampleStreamCloned<A, B>
959    where A: Signal,
960          A::Item: Clone,
961          B: Stream {
962    type Item = (A::Item, B::Item);
963
964    #[inline]
965    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
966        let mut this = self.project();
967
968        this.stream.as_mut().poll_next(cx).map(|option| {
969            option.map(|value| {
970                loop {
971                    break match this.signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
972                        Some(Poll::Ready(Some(value))) => {
973                            *this.value = Some(value);
974                            continue;
975                        },
976                        Some(Poll::Ready(None)) => {
977                            this.signal.set(None);
978                        },
979                        _ => {},
980                    };
981                }
982
983                match this.value {
984                    Some(ref signal) => {
985                        (signal.clone(), value)
986                    },
987                    None => {
988                        unreachable!()
989                    },
990                }
991            })
992        })
993    }
994}
995
996
997// TODO faster for_each which doesn't poll twice on Poll::Ready
998#[pin_project]
999#[derive(Debug)]
1000#[must_use = "Futures do nothing unless polled"]
1001pub struct ForEach<A, B, C> {
1002    #[pin]
1003    inner: stream::ForEach<SignalStream<A>, B, C>,
1004}
1005
1006impl<A, B, C> Future for ForEach<A, B, C>
1007    where A: Signal,
1008          B: Future<Output = ()>,
1009          C: FnMut(A::Item) -> B {
1010    type Output = ();
1011
1012    #[inline]
1013    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1014        self.project().inner.poll(cx)
1015    }
1016}
1017
1018
1019#[pin_project]
1020#[derive(Debug)]
1021#[must_use = "Streams do nothing unless polled"]
1022pub struct SignalStream<A> {
1023    #[pin]
1024    signal: A,
1025}
1026
1027impl<A: Signal> Stream for SignalStream<A> {
1028    type Item = A::Item;
1029
1030    #[inline]
1031    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1032        self.project().signal.poll_change(cx)
1033    }
1034}
1035
1036
1037// TODO maybe remove this ?
1038#[pin_project]
1039#[derive(Debug)]
1040#[must_use = "Futures do nothing unless polled"]
1041pub struct SignalFuture<A> where A: Signal {
1042    #[pin]
1043    signal: A,
1044    value: Option<A::Item>,
1045}
1046
1047impl<A> Future for SignalFuture<A> where A: Signal {
1048    type Output = A::Item;
1049
1050    #[inline]
1051    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1052        let mut this = self.project();
1053
1054        loop {
1055            return match this.signal.as_mut().poll_change(cx) {
1056                Poll::Ready(None) => {
1057                    Poll::Ready(this.value.take().unwrap())
1058                },
1059                Poll::Ready(Some(new_value)) => {
1060                    *this.value = Some(new_value);
1061                    continue;
1062                },
1063                Poll::Pending => {
1064                    Poll::Pending
1065                },
1066            }
1067        }
1068    }
1069}
1070
1071
1072#[pin_project(project = MapProj)]
1073#[derive(Debug)]
1074#[must_use = "Signals do nothing unless polled"]
1075pub struct Map<A, B> {
1076    #[pin]
1077    signal: A,
1078    callback: B,
1079}
1080
1081impl<A, B, C> Signal for Map<A, B>
1082    where A: Signal,
1083          B: FnMut(A::Item) -> C {
1084    type Item = C;
1085
1086    #[inline]
1087    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1088        let MapProj { signal, callback } = self.project();
1089
1090        signal.poll_change(cx).map(|opt| opt.map(|value| callback(value)))
1091    }
1092}
1093
1094
1095#[pin_project(project = StopIfProj)]
1096#[derive(Debug)]
1097#[must_use = "Signals do nothing unless polled"]
1098pub struct StopIf<A, B> {
1099    #[pin]
1100    signal: A,
1101    stopped: bool,
1102    test: B,
1103}
1104
1105impl<A, B> Signal for StopIf<A, B>
1106    where A: Signal,
1107          B: FnMut(&A::Item) -> bool {
1108    type Item = A::Item;
1109
1110    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1111        let StopIfProj { signal, stopped, test } = self.project();
1112
1113        if *stopped {
1114            Poll::Ready(None)
1115
1116        } else {
1117            match signal.poll_change(cx) {
1118                Poll::Ready(Some(value)) => {
1119                    if test(&value) {
1120                        *stopped = true;
1121                    }
1122
1123                    Poll::Ready(Some(value))
1124                },
1125                Poll::Ready(None) => {
1126                    *stopped = true;
1127                    Poll::Ready(None)
1128                },
1129                Poll::Pending => Poll::Pending,
1130            }
1131        }
1132    }
1133}
1134
1135
1136#[pin_project(project = EqProj)]
1137#[derive(Debug)]
1138#[must_use = "Signals do nothing unless polled"]
1139pub struct Eq<A> where A: Signal {
1140    #[pin]
1141    signal: A,
1142    matches: Option<bool>,
1143    value: A::Item,
1144}
1145
1146impl<A> Signal for Eq<A>
1147    where A: Signal,
1148          A::Item: PartialEq {
1149    type Item = bool;
1150
1151    #[inline]
1152    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1153        let EqProj { mut signal, matches, value } = self.project();
1154
1155        loop {
1156            return match signal.as_mut().poll_change(cx) {
1157                Poll::Ready(Some(new_value)) => {
1158                    let new = Some(new_value == *value);
1159
1160                    if *matches != new {
1161                        *matches = new;
1162                        Poll::Ready(new)
1163
1164                    } else {
1165                        continue;
1166                    }
1167                },
1168                Poll::Ready(None) => Poll::Ready(None),
1169                Poll::Pending => Poll::Pending,
1170            }
1171        }
1172    }
1173}
1174
1175
1176#[pin_project(project = NeqProj)]
1177#[derive(Debug)]
1178#[must_use = "Signals do nothing unless polled"]
1179pub struct Neq<A> where A: Signal {
1180    #[pin]
1181    signal: A,
1182    matches: Option<bool>,
1183    value: A::Item,
1184}
1185
1186impl<A> Signal for Neq<A>
1187    where A: Signal,
1188          A::Item: PartialEq {
1189    type Item = bool;
1190
1191    #[inline]
1192    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1193        let NeqProj { mut signal, matches, value } = self.project();
1194
1195        loop {
1196            return match signal.as_mut().poll_change(cx) {
1197                Poll::Ready(Some(new_value)) => {
1198                    let new = Some(new_value != *value);
1199
1200                    if *matches != new {
1201                        *matches = new;
1202                        Poll::Ready(new)
1203
1204                    } else {
1205                        continue;
1206                    }
1207                },
1208                Poll::Ready(None) => Poll::Ready(None),
1209                Poll::Pending => Poll::Pending,
1210            }
1211        }
1212    }
1213}
1214
1215
1216#[pin_project]
1217#[derive(Debug)]
1218#[must_use = "Signals do nothing unless polled"]
1219pub struct Inspect<A, B> {
1220    #[pin]
1221    signal: A,
1222    callback: B,
1223}
1224
1225impl<A, B> Signal for Inspect<A, B>
1226    where A: Signal,
1227          B: FnMut(&A::Item) {
1228    type Item = A::Item;
1229
1230    #[inline]
1231    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1232        let this = self.project();
1233
1234        let poll = this.signal.poll_change(cx);
1235
1236        if let Poll::Ready(Some(ref value)) = poll {
1237            (this.callback)(value);
1238        }
1239
1240        poll
1241    }
1242}
1243
1244
1245#[pin_project(project = MapFutureProj)]
1246#[derive(Debug)]
1247#[must_use = "Signals do nothing unless polled"]
1248pub struct MapFuture<A, B, C> {
1249    #[pin]
1250    signal: Option<A>,
1251    #[pin]
1252    future: Option<B>,
1253    callback: C,
1254    first: bool,
1255}
1256
1257impl<A, B, C> Signal for MapFuture<A, B, C>
1258    where A: Signal,
1259          B: Future,
1260          C: FnMut(A::Item) -> B {
1261    type Item = Option<B::Output>;
1262
1263    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1264        let MapFutureProj { mut signal, mut future, callback, first } = self.project();
1265
1266        let mut done = false;
1267
1268        loop {
1269            match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1270                None => {
1271                    done = true;
1272                },
1273                Some(Poll::Ready(None)) => {
1274                    signal.set(None);
1275                    done = true;
1276                },
1277                Some(Poll::Ready(Some(value))) => {
1278                    let value = Some(callback(value));
1279                    future.set(value);
1280                    continue;
1281                },
1282                Some(Poll::Pending) => {},
1283            }
1284            break;
1285        }
1286
1287        match future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1288            None => {},
1289            Some(Poll::Ready(value)) => {
1290                future.set(None);
1291                *first = false;
1292                return Poll::Ready(Some(Some(value)));
1293            },
1294            Some(Poll::Pending) => {
1295                done = false;
1296            },
1297        }
1298
1299        if *first {
1300            *first = false;
1301            Poll::Ready(Some(None))
1302
1303        } else if done {
1304            Poll::Ready(None)
1305
1306        } else {
1307            Poll::Pending
1308        }
1309    }
1310}
1311
1312
1313#[pin_project(project = ThrottleProj)]
1314#[derive(Debug)]
1315#[must_use = "Signals do nothing unless polled"]
1316pub struct Throttle<A, B, C> where A: Signal {
1317    #[pin]
1318    signal: Option<A>,
1319    #[pin]
1320    future: Option<B>,
1321    callback: C,
1322}
1323
1324impl<A, B, C> Signal for Throttle<A, B, C>
1325    where A: Signal,
1326          B: Future<Output = ()>,
1327          C: FnMut() -> B {
1328    type Item = A::Item;
1329
1330    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1331        let ThrottleProj { mut signal, mut future, callback } = self.project();
1332
1333        match future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1334            None => {},
1335            Some(Poll::Ready(())) => {
1336                future.set(None);
1337            },
1338            Some(Poll::Pending) => {
1339                // TODO does this need to poll the Signal as well ?
1340                return Poll::Pending;
1341            },
1342        }
1343
1344        match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1345            None => {
1346                Poll::Ready(None)
1347            },
1348            Some(Poll::Ready(None)) => {
1349                // TODO maybe remove the future too ?
1350                signal.set(None);
1351                Poll::Ready(None)
1352            },
1353            Some(Poll::Ready(Some(value))) => {
1354                future.set(Some(callback()));
1355
1356                if let Some(Poll::Ready(())) = future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1357                    future.set(None);
1358                }
1359
1360                Poll::Ready(Some(value))
1361            },
1362            Some(Poll::Pending) => {
1363                Poll::Pending
1364            },
1365        }
1366    }
1367}
1368
1369
1370#[pin_project]
1371#[derive(Debug)]
1372#[must_use = "Futures do nothing unless polled"]
1373pub struct WaitFor<A>
1374    where A: Signal,
1375          A::Item: PartialEq {
1376    #[pin]
1377    signal: A,
1378    value: A::Item,
1379}
1380
1381impl<A> Future for WaitFor<A>
1382    where A: Signal,
1383          A::Item: PartialEq {
1384
1385    type Output = Option<A::Item>;
1386
1387    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1388        let mut this = self.project();
1389
1390        loop {
1391            let poll = this.signal.as_mut().poll_change(cx);
1392
1393            if let Poll::Ready(Some(ref new_value)) = poll {
1394                if new_value != this.value {
1395                    continue;
1396                }
1397            }
1398
1399            return poll;
1400        }
1401    }
1402}
1403
1404
1405#[pin_project]
1406#[derive(Debug)]
1407#[must_use = "SignalVecs do nothing unless polled"]
1408pub struct SignalSignalVec<A> {
1409    #[pin]
1410    signal: A,
1411}
1412
1413impl<A, B> SignalVec for SignalSignalVec<A>
1414    where A: Signal<Item = Vec<B>> {
1415    type Item = B;
1416
1417    #[inline]
1418    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
1419        self.project().signal.poll_change(cx).map(|opt| opt.map(|values| VecDiff::Replace { values }))
1420    }
1421}
1422
1423
1424// TODO should this inline ?
1425fn dedupe<A, S, F>(mut signal: Pin<&mut S>, cx: &mut Context, old_value: &mut Option<S::Item>, f: F) -> Poll<Option<A>>
1426    where S: Signal,
1427          S::Item: PartialEq,
1428          F: FnOnce(&mut S::Item) -> A {
1429    loop {
1430        return match signal.as_mut().poll_change(cx) {
1431            Poll::Ready(Some(mut new_value)) => {
1432                let has_changed = match old_value {
1433                    Some(old_value) => *old_value != new_value,
1434                    None => true,
1435                };
1436
1437                if has_changed {
1438                    let output = f(&mut new_value);
1439                    *old_value = Some(new_value);
1440                    Poll::Ready(Some(output))
1441
1442                } else {
1443                    continue;
1444                }
1445            },
1446            Poll::Ready(None) => Poll::Ready(None),
1447            Poll::Pending => Poll::Pending,
1448        }
1449    }
1450}
1451
1452
1453#[pin_project(project = DedupeMapProj)]
1454#[derive(Debug)]
1455#[must_use = "Signals do nothing unless polled"]
1456pub struct DedupeMap<A, B> where A: Signal {
1457    old_value: Option<A::Item>,
1458    #[pin]
1459    signal: A,
1460    callback: B,
1461}
1462
1463impl<A, B, C> Signal for DedupeMap<A, B>
1464    where A: Signal,
1465          A::Item: PartialEq,
1466          // TODO should this use & instead of &mut ?
1467          // TODO should this use Fn instead ?
1468          B: FnMut(&mut A::Item) -> C {
1469
1470    type Item = C;
1471
1472    // TODO should this use #[inline] ?
1473    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1474        let DedupeMapProj { old_value, signal, callback } = self.project();
1475
1476        dedupe(signal, cx, old_value, callback)
1477    }
1478}
1479
1480
1481#[pin_project(project = DedupeProj)]
1482#[derive(Debug)]
1483#[must_use = "Signals do nothing unless polled"]
1484pub struct Dedupe<A> where A: Signal {
1485    old_value: Option<A::Item>,
1486    #[pin]
1487    signal: A,
1488}
1489
1490impl<A> Signal for Dedupe<A>
1491    where A: Signal,
1492          A::Item: PartialEq + Copy {
1493
1494    type Item = A::Item;
1495
1496    // TODO should this use #[inline] ?
1497    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1498        let DedupeProj { old_value, signal } = self.project();
1499
1500        dedupe(signal, cx, old_value, |value| *value)
1501    }
1502}
1503
1504
1505#[pin_project(project = DedupeClonedProj)]
1506#[derive(Debug)]
1507#[must_use = "Signals do nothing unless polled"]
1508pub struct DedupeCloned<A> where A: Signal {
1509    old_value: Option<A::Item>,
1510    #[pin]
1511    signal: A,
1512}
1513
1514impl<A> Signal for DedupeCloned<A>
1515    where A: Signal,
1516          A::Item: PartialEq + Clone {
1517
1518    type Item = A::Item;
1519
1520    // TODO should this use #[inline] ?
1521    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1522        let DedupeClonedProj { old_value, signal } = self.project();
1523
1524        dedupe(signal, cx, old_value, |value| value.clone())
1525    }
1526}
1527
1528
1529#[pin_project(project = FilterMapProj)]
1530#[derive(Debug)]
1531#[must_use = "Signals do nothing unless polled"]
1532pub struct FilterMap<A, B> {
1533    #[pin]
1534    signal: A,
1535    callback: B,
1536    first: bool,
1537}
1538
1539impl<A, B, C> Signal for FilterMap<A, B>
1540    where A: Signal,
1541          B: FnMut(A::Item) -> Option<C> {
1542    type Item = Option<C>;
1543
1544    // TODO should this use #[inline] ?
1545    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1546        let FilterMapProj { mut signal, callback, first } = self.project();
1547
1548        loop {
1549            return match signal.as_mut().poll_change(cx) {
1550                Poll::Ready(Some(value)) => match callback(value) {
1551                    Some(value) => {
1552                        *first = false;
1553                        Poll::Ready(Some(Some(value)))
1554                    },
1555
1556                    None => {
1557                        if *first {
1558                            *first = false;
1559                            Poll::Ready(Some(None))
1560
1561                        } else {
1562                            continue;
1563                        }
1564                    },
1565                },
1566                Poll::Ready(None) => Poll::Ready(None),
1567                Poll::Pending => Poll::Pending,
1568            }
1569        }
1570    }
1571}
1572
1573
1574// TODO test the Unpin impl of this
1575//      impl<A> Unpin for Flatten<A> where A: Unpin + Signal, A::Item: Unpin {}
1576#[pin_project(project = FlattenProj)]
1577#[derive(Debug)]
1578#[must_use = "Signals do nothing unless polled"]
1579pub struct Flatten<A> where A: Signal {
1580    #[pin]
1581    signal: Option<A>,
1582    #[pin]
1583    inner: Option<A::Item>,
1584}
1585
1586// Poll parent => Has inner   => Poll inner  => Output
1587// --------------------------------------------------------
1588// Some(inner) =>             => Some(value) => Some(value)
1589// Some(inner) =>             => None        => Pending
1590// Some(inner) =>             => Pending     => Pending
1591// None        => Some(inner) => Some(value) => Some(value)
1592// None        => Some(inner) => None        => None
1593// None        => Some(inner) => Pending     => Pending
1594// None        => None        =>             => None
1595// Pending     => Some(inner) => Some(value) => Some(value)
1596// Pending     => Some(inner) => None        => Pending
1597// Pending     => Some(inner) => Pending     => Pending
1598// Pending     => None        =>             => Pending
1599impl<A> Signal for Flatten<A>
1600    where A: Signal,
1601          A::Item: Signal {
1602    type Item = <A::Item as Signal>::Item;
1603
1604    #[inline]
1605    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1606        let FlattenProj { mut signal, mut inner } = self.project();
1607
1608        let done = match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1609            None => true,
1610            Some(Poll::Ready(None)) => {
1611                signal.set(None);
1612                true
1613            },
1614            Some(Poll::Ready(Some(new_inner))) => {
1615                inner.set(Some(new_inner));
1616                false
1617            },
1618            Some(Poll::Pending) => false,
1619        };
1620
1621        match inner.as_mut().as_pin_mut().map(|inner| inner.poll_change(cx)) {
1622            Some(Poll::Ready(None)) => {
1623                inner.set(None);
1624            },
1625            Some(poll) => {
1626                return poll;
1627            },
1628            None => {},
1629        }
1630
1631        if done {
1632            Poll::Ready(None)
1633
1634        } else {
1635            Poll::Pending
1636        }
1637    }
1638}
1639
1640
1641#[pin_project(project = SwitchSignalVecProj)]
1642#[derive(Debug)]
1643#[must_use = "SignalVecs do nothing unless polled"]
1644pub struct SwitchSignalVec<A, B, C> where B: SignalVec {
1645    #[pin]
1646    signal: Option<A>,
1647    #[pin]
1648    signal_vec: Option<B>,
1649    callback: C,
1650    len: usize,
1651}
1652
1653impl<A, B, C> SignalVec for SwitchSignalVec<A, B, C>
1654    where A: Signal,
1655          B: SignalVec,
1656          C: FnMut(A::Item) -> B {
1657    type Item = B::Item;
1658
1659    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
1660        let SwitchSignalVecProj { mut signal, mut signal_vec, callback, len } = self.project();
1661
1662        let mut signal_value = None;
1663
1664        let signal_done = loop {
1665            break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1666                None => {
1667                    true
1668                },
1669                Some(Poll::Pending) => {
1670                    false
1671                },
1672                Some(Poll::Ready(None)) => {
1673                    signal.set(None);
1674                    true
1675                },
1676                Some(Poll::Ready(Some(value))) => {
1677                    signal_value = Some(value);
1678                    continue;
1679                },
1680            }
1681        };
1682
1683        fn new_signal_vec<A>(len: &mut usize) -> Poll<Option<VecDiff<A>>> {
1684            if *len == 0 {
1685                Poll::Pending
1686
1687            } else {
1688                *len = 0;
1689                Poll::Ready(Some(VecDiff::Replace { values: vec![] }))
1690            }
1691        }
1692
1693        fn calculate_len<A>(len: &mut usize, vec_diff: &VecDiff<A>) {
1694            match vec_diff {
1695                VecDiff::Replace { values } => {
1696                    *len = values.len();
1697                },
1698                VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
1699                    *len += 1;
1700                },
1701                VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
1702                    *len -= 1;
1703                },
1704                VecDiff::Clear {} => {
1705                    *len = 0;
1706                },
1707                VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {},
1708            }
1709        }
1710
1711        if let Some(value) = signal_value {
1712            signal_vec.set(Some(callback(value)));
1713
1714            match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1715                None => {
1716                    if signal_done {
1717                        Poll::Ready(None)
1718
1719                    } else {
1720                        new_signal_vec(len)
1721                    }
1722                },
1723
1724                Some(Poll::Pending) => {
1725                    new_signal_vec(len)
1726                },
1727
1728                Some(Poll::Ready(None)) => {
1729                    signal_vec.set(None);
1730
1731                    if signal_done {
1732                        Poll::Ready(None)
1733
1734                    } else {
1735                        new_signal_vec(len)
1736                    }
1737                },
1738
1739                Some(Poll::Ready(Some(vec_diff))) => {
1740                    if *len == 0 {
1741                        calculate_len(len, &vec_diff);
1742                        Poll::Ready(Some(vec_diff))
1743
1744                    } else {
1745                        let mut values = vec![];
1746
1747                        vec_diff.apply_to_vec(&mut values);
1748
1749                        *len = values.len();
1750
1751                        Poll::Ready(Some(VecDiff::Replace { values }))
1752                    }
1753                },
1754            }
1755
1756        } else {
1757            match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1758                None => {
1759                    if signal_done {
1760                        Poll::Ready(None)
1761
1762                    } else {
1763                        Poll::Pending
1764                    }
1765                },
1766
1767                Some(Poll::Pending) => {
1768                    Poll::Pending
1769                },
1770
1771                Some(Poll::Ready(None)) => {
1772                    signal_vec.set(None);
1773
1774                    if signal_done {
1775                        Poll::Ready(None)
1776
1777                    } else {
1778                        Poll::Pending
1779                    }
1780                },
1781
1782                Some(Poll::Ready(Some(vec_diff))) => {
1783                    calculate_len(len, &vec_diff);
1784                    Poll::Ready(Some(vec_diff))
1785                },
1786            }
1787        }
1788    }
1789}