1use std::pin::Pin;
2use std::marker::Unpin;
3use std::future::Future;
4use std::task::{Context, Poll};
5use futures_core::stream::Stream;
6use futures_util::stream;
7use futures_util::stream::StreamExt;
8use pin_project::pin_project;
9
10use crate::signal::Broadcaster;
11use crate::signal_vec::{VecDiff, SignalVec};
12
13
/// A value that changes over time and is observed by polling.
///
/// The combinators in this file interpret the result of
/// [`Signal::poll_change`] as follows:
///
/// * `Poll::Ready(Some(value))` — a new value is available.
/// * `Poll::Ready(None)` — the signal has ended and will produce nothing more.
/// * `Poll::Pending` — nothing new is available yet.
#[must_use = "Signals do nothing unless polled"]
pub trait Signal {
    /// The type of the values produced by this signal.
    type Item;

    /// Polls the signal for a change, using `cx` for task wake-up.
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
}
29
30
31impl<'a, A> Signal for &'a mut A where A: ?Sized + Signal + Unpin {
33 type Item = A::Item;
34
35 #[inline]
36 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
37 A::poll_change(Pin::new(&mut **self), cx)
38 }
39}
40
41impl<A> Signal for Box<A> where A: ?Sized + Signal + Unpin {
43 type Item = A::Item;
44
45 #[inline]
46 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
47 A::poll_change(Pin::new(&mut *self), cx)
48 }
49}
50
51impl<A> Signal for Pin<A>
53 where A: Unpin + ::std::ops::DerefMut,
54 A::Target: Signal {
55 type Item = <<A as ::std::ops::Deref>::Target as Signal>::Item;
56
57 #[inline]
58 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
59 Pin::get_mut(self).as_mut().poll_change(cx)
60 }
61}
62
63
/// Combinator methods available on every [`Signal`].
///
/// Each method consumes the signal and wraps it in an adapter type defined in
/// this file; nothing happens until the returned value is polled.
pub trait SignalExt: Signal {
    /// Converts the signal into a `Stream` that yields exactly what
    /// `poll_change` yields.
    #[inline]
    fn to_stream(self) -> SignalStream<Self>
        where Self: Sized {
        SignalStream {
            signal: self,
        }
    }

    /// Converts the signal into a `Future` that resolves with the last value
    /// produced once the signal has ended.
    ///
    /// # Panics
    ///
    /// The returned future panics when polled if the signal ends without
    /// having produced any value.
    #[inline]
    fn to_future(self) -> SignalFuture<Self>
        where Self: Sized {
        SignalFuture {
            signal: self,
            value: None,
        }
    }

    /// Transforms every value of the signal with `callback`.
    #[inline]
    fn map<A, B>(self, callback: B) -> Map<Self, B>
        where B: FnMut(Self::Item) -> A,
              Self: Sized {
        Map {
            signal: self,
            callback,
        }
    }

    /// Calls `callback` with a reference to every value and then passes the
    /// value through unchanged.
    #[inline]
    fn inspect<A>(self, callback: A) -> Inspect<Self, A>
        where A: FnMut(&Self::Item),
              Self: Sized {
        Inspect {
            signal: self,
            callback,
        }
    }

    /// Produces a `bool` signal telling whether the current value equals
    /// `value`. A new `bool` is only emitted when the comparison result
    /// differs from the previously emitted one.
    #[inline]
    fn eq(self, value: Self::Item) -> Eq<Self>
        where Self::Item: PartialEq,
              Self: Sized {
        Eq {
            signal: self,
            matches: None,
            value,
        }
    }

    /// Produces a `bool` signal telling whether the current value differs from
    /// `value`. A new `bool` is only emitted when the comparison result
    /// differs from the previously emitted one.
    #[inline]
    fn neq(self, value: Self::Item) -> Neq<Self>
        where Self::Item: PartialEq,
              Self: Sized {
        Neq {
            signal: self,
            matches: None,
            value,
        }
    }

    /// Skips values equal to the previously seen value and maps the remaining
    /// ones with `callback`, which receives a mutable reference to the new
    /// value before it is stored for the next comparison.
    #[inline]
    fn dedupe_map<A, B>(self, callback: B) -> DedupeMap<Self, B>
        where B: FnMut(&mut Self::Item) -> A,
              Self::Item: PartialEq,
              Self: Sized {
        DedupeMap {
            old_value: None,
            signal: self,
            callback,
        }
    }

    /// Skips values equal to the previously seen value.
    ///
    /// NOTE(review): `Dedupe` only implements `Signal` when `Self::Item` is
    /// also `Copy`; that bound is not required here, so the mismatch only
    /// surfaces when the result is used as a signal.
    #[inline]
    fn dedupe(self) -> Dedupe<Self>
        where Self::Item: PartialEq,
              Self: Sized {
        Dedupe {
            old_value: None,
            signal: self,
        }
    }

    /// Skips values equal to the previously seen value, cloning each value
    /// that is let through.
    ///
    /// NOTE(review): `DedupeCloned` only implements `Signal` when
    /// `Self::Item` is also `Clone`; that bound is not required here.
    #[inline]
    fn dedupe_cloned(self) -> DedupeCloned<Self>
        where Self::Item: PartialEq,
              Self: Sized {
        DedupeCloned {
            old_value: None,
            signal: self,
        }
    }

    /// Turns every value into a future via `callback` and produces the
    /// future's output wrapped in `Some`.
    ///
    /// `None` is produced on the first poll if no future has completed by
    /// then. A new value of the signal replaces a future that is still
    /// running.
    #[inline]
    fn map_future<A, B>(self, callback: B) -> MapFuture<Self, A, B>
        where A: Future,
              B: FnMut(Self::Item) -> A,
              Self: Sized {
        MapFuture {
            signal: Some(self),
            future: None,
            callback,
            first: true,
        }
    }

    /// Maps every value with `callback` and produces the `Option` it returns.
    ///
    /// A `None` result is only forwarded if nothing has been produced yet;
    /// later `None` results are skipped.
    #[inline]
    fn filter_map<A, B>(self, callback: B) -> FilterMap<Self, B>
        where B: FnMut(Self::Item) -> Option<A>,
              Self: Sized {
        FilterMap {
            signal: self,
            callback,
            first: true,
        }
    }

    /// Rate-limits the signal: after each value is produced, `callback` is
    /// called to create a future, and the signal is not polled again until
    /// that future has completed.
    #[inline]
    fn throttle<A, B>(self, callback: B) -> Throttle<Self, A, B>
        where A: Future<Output = ()>,
              B: FnMut() -> A,
              Self: Sized {
        Throttle {
            signal: Some(self),
            future: None,
            callback,
        }
    }

    /// Flattens a signal of signals: produces the values of the most recent
    /// inner signal.
    #[inline]
    fn flatten(self) -> Flatten<Self>
        where Self::Item: Signal,
              Self: Sized {
        Flatten {
            signal: Some(self),
            inner: None,
        }
    }

    /// Maps every value to a signal with `callback` and flattens the result;
    /// equivalent to `self.map(callback).flatten()`.
    #[inline]
    fn switch<A, B>(self, callback: B) -> Switch<Self, A, B>
        where A: Signal,
              B: FnMut(Self::Item) -> A,
              Self: Sized {
        Switch {
            inner: self.map(callback).flatten()
        }
    }

    /// Maps every value to a `SignalVec` with `callback` and produces the
    /// changes of the most recent one as a single `SignalVec`.
    #[inline]
    fn switch_signal_vec<A, F>(self, callback: F) -> SwitchSignalVec<Self, A, F>
        where A: SignalVec,
              F: FnMut(Self::Item) -> A,
              Self: Sized {
        SwitchSignalVec {
            signal: Some(self),
            signal_vec: None,
            callback,
            len: 0,
        }
    }

    /// Pairs every item of `stream` with a clone of the signal's latest value.
    ///
    /// NOTE(review): the `A::Item: Clone` bound below constrains the
    /// *stream's* item, whereas the `Stream` impl of `SampleStreamCloned`
    /// requires the *signal's* item to be `Clone` — confirm which was intended.
    #[inline]
    fn sample_stream_cloned<A>(self, stream: A) -> SampleStreamCloned<Self, A>
        where A: Stream,
              A::Item: Clone,
              Self: Sized {
        SampleStreamCloned {
            signal: Some(self),
            stream: stream,
            value: None,
        }
    }

    /// Returns a future that feeds every value of the signal to `callback`,
    /// by converting the signal to a stream and using `StreamExt::for_each`.
    #[inline]
    fn for_each<U, F>(self, callback: F) -> ForEach<Self, U, F>
        where U: Future<Output = ()>,
              F: FnMut(Self::Item) -> U,
              Self: Sized {
        ForEach {
            inner: SignalStream {
                signal: self,
            }.for_each(callback)
        }
    }

    /// Converts a signal of `Vec`s into a `SignalVec` that reports every new
    /// `Vec` as a `VecDiff::Replace`.
    #[inline]
    fn to_signal_vec(self) -> SignalSignalVec<Self>
        where Self: Sized {
        SignalSignalVec {
            signal: self
        }
    }

    /// Returns a future that resolves with `Some(value)` as soon as the signal
    /// produces a value equal to `value`, or with `None` if the signal ends
    /// first.
    #[inline]
    fn wait_for(self, value: Self::Item) -> WaitFor<Self>
        where Self::Item: PartialEq,
              Self: Sized {
        WaitFor {
            signal: self,
            value: value,
        }
    }

    /// Produces only the first value of the signal and then ends.
    #[inline]
    fn first(self) -> First<Self> where Self: Sized {
        First {
            signal: Some(self),
        }
    }

    /// Passes values through until `test` returns `true` for one of them;
    /// that value is still produced, after which the signal ends.
    #[inline]
    fn stop_if<F>(self, test: F) -> StopIf<Self, F>
        where F: FnMut(&Self::Item) -> bool,
              Self: Sized {
        StopIf {
            signal: self,
            stopped: false,
            test,
        }
    }


    /// Logs every poll result with `log::trace!`, tagged with the source
    /// location of the call to `debug`. Only available with the `debug`
    /// feature.
    #[inline]
    #[track_caller]
    #[cfg(feature = "debug")]
    fn debug(self) -> SignalDebug<Self> where Self: Sized, Self::Item: std::fmt::Debug {
        SignalDebug {
            signal: self,
            location: std::panic::Location::caller(),
        }
    }

    /// Wraps the signal in a `Broadcaster`.
    #[inline]
    fn broadcast(self) -> Broadcaster<Self> where Self: Sized {
        Broadcaster::new(self)
    }

    /// Convenience for polling an `Unpin` signal without pinning it manually.
    #[inline]
    fn poll_change_unpin(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> where Self: Unpin + Sized {
        Pin::new(self).poll_change(cx)
    }

    /// Boxes and pins the signal as a `Send` trait object.
    #[inline]
    fn boxed<'a>(self) -> Pin<Box<dyn Signal<Item = Self::Item> + Send + 'a>>
        where Self: Sized + Send + 'a {
        Box::pin(self)
    }

    /// Boxes and pins the signal as a trait object without requiring `Send`.
    #[inline]
    fn boxed_local<'a>(self) -> Pin<Box<dyn Signal<Item = Self::Item> + 'a>>
        where Self: Sized + 'a {
        Box::pin(self)
    }
}
589
// Blanket implementation: every `Signal` (sized or not) gets the `SignalExt` methods.
impl<T: ?Sized> SignalExt for T where T: Signal {}
592
593
/// A pinned, boxed, `Send` signal trait object; the return type of `SignalExt::boxed`.
pub type BoxSignal<'a, T> = Pin<Box<dyn Signal<Item = T> + Send + 'a>>;
599
/// A pinned, boxed signal trait object without a `Send` bound; the return type of `SignalExt::boxed_local`.
pub type LocalBoxSignal<'a, T> = Pin<Box<dyn Signal<Item = T> + 'a>>;
602
603
604#[inline]
606pub fn not<A>(signal: A) -> impl Signal<Item = bool>
607 where A: Signal<Item = bool> {
608 signal.map(|x| !x)
609}
610
611#[inline]
614pub fn and<A, B>(left: A, right: B) -> impl Signal<Item = bool>
615 where A: Signal<Item = bool>,
616 B: Signal<Item = bool> {
617 crate::map_ref! {
618 let a = left,
619 let b = right =>
620 *a && *b
621 }
622}
623
624#[inline]
627pub fn or<A, B>(left: A, right: B) -> impl Signal<Item = bool>
628 where A: Signal<Item = bool>,
629 B: Signal<Item = bool> {
630 crate::map_ref! {
631 let a = left,
632 let b = right =>
633 *a || *b
634 }
635}
636
637
638#[pin_project(project = MaybeSignalStateProj)]
639#[derive(Debug)]
640enum MaybeSignalState<S, E> {
641 Signal(#[pin] S),
642 Value(Option<E>),
643}
644
645impl<S, E> MaybeSignalState<S, E> where S: Signal {
646 fn poll<A, B, C>(self: Pin<&mut Self>, cx: &mut Context, map_signal: A, map_value: B) -> Poll<Option<C>>
647 where A: FnOnce(S::Item) -> C,
648 B: FnOnce(E) -> C {
649 match self.project() {
650 MaybeSignalStateProj::Signal(signal) => {
651 signal.poll_change(cx).map(|value| value.map(map_signal))
652 },
653 MaybeSignalStateProj::Value(value) => {
654 match value.take() {
655 Some(value) => Poll::Ready(Some(map_value(value))),
656 None => Poll::Ready(None),
657 }
658 },
659 }
660 }
661}
662
663
664#[pin_project]
665#[derive(Debug)]
666#[must_use = "Signals do nothing unless polled"]
667pub struct ResultSignal<S, E> {
668 #[pin]
669 state: MaybeSignalState<S, E>,
670}
671
672impl<S, E> Signal for ResultSignal<S, E> where S: Signal {
673 type Item = Result<S::Item, E>;
674
675 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
676 self.project().state.poll(cx, Ok, Err)
677 }
678}
679
680pub fn result<S, E>(value: Result<S, E>) -> ResultSignal<S, E> where S: Signal {
690 match value {
691 Ok(signal) => ResultSignal {
692 state: MaybeSignalState::Signal(signal),
693 },
694 Err(value) => ResultSignal {
695 state: MaybeSignalState::Value(Some(value)),
696 },
697 }
698}
699
700
701#[pin_project]
702#[derive(Debug)]
703#[must_use = "Signals do nothing unless polled"]
704pub struct OptionSignal<S> where S: Signal {
705 #[pin]
706 state: MaybeSignalState<S, Option<S::Item>>,
707}
708
709impl<S> Signal for OptionSignal<S> where S: Signal {
710 type Item = Option<S::Item>;
711
712 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
713 self.project().state.poll(cx, Some, |x| x)
714 }
715}
716
717pub fn option<S>(value: Option<S>) -> OptionSignal<S> where S: Signal {
726 match value {
727 Some(signal) => OptionSignal {
728 state: MaybeSignalState::Signal(signal),
729 },
730 None => OptionSignal {
731 state: MaybeSignalState::Value(Some(None)),
732 },
733 }
734}
735
736
/// Signal created by `SignalExt::debug`: logs every poll result together with
/// the source location recorded at construction.
#[pin_project]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
#[cfg(feature = "debug")]
pub struct SignalDebug<A> {
    #[pin]
    signal: A,
    location: &'static std::panic::Location<'static>,
}

#[cfg(feature = "debug")]
impl<A> Signal for SignalDebug<A> where A: Signal, A::Item: std::fmt::Debug {
    type Item = A::Item;

    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // Poll first, then log the outcome before handing it to the caller.
        let outcome = this.signal.poll_change(cx);
        log::trace!("[{}] {:#?}", this.location, outcome);

        outcome
    }
}
761
762
763#[pin_project]
764#[derive(Debug)]
765#[must_use = "Signals do nothing unless polled"]
766pub struct FromFuture<A> {
767 #[pin]
769 future: Option<A>,
770 first: bool,
771}
772
773impl<A> Signal for FromFuture<A> where A: Future {
774 type Item = Option<A::Output>;
775
776 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
777 let mut this = self.project();
778
779 match this.future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
781 None => {
782 Poll::Ready(None)
783 },
784
785 Some(Poll::Ready(value)) => {
786 this.future.set(None);
787 Poll::Ready(Some(Some(value)))
788 },
789
790 Some(Poll::Pending) => {
791 if *this.first {
792 *this.first = false;
793 Poll::Ready(Some(None))
794
795 } else {
796 Poll::Pending
797 }
798 },
799 }
800 }
801}
802
803#[inline]
804pub fn from_future<A>(future: A) -> FromFuture<A> where A: Future {
805 FromFuture { future: Some(future), first: true }
806}
807
808
809#[pin_project]
810#[derive(Debug)]
811#[must_use = "Signals do nothing unless polled"]
812pub struct FromStream<A> {
813 #[pin]
814 stream: Option<A>,
815 first: bool,
816}
817
818impl<A> Signal for FromStream<A> where A: Stream {
819 type Item = Option<A::Item>;
820
821 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
822 let mut this = self.project();
823
824 let mut value = None;
825
826 let done = loop {
827 match this.stream.as_mut().as_pin_mut().map(|stream| stream.poll_next(cx)) {
828 None => {
829 break true;
830 },
831
832 Some(Poll::Ready(None)) => {
833 this.stream.set(None);
834 break true;
835 },
836
837 Some(Poll::Ready(Some(new_value))) => {
838 value = Some(new_value);
839 continue;
840 },
841
842 Some(Poll::Pending) => {
843 break false;
844 },
845 }
846 };
847
848 match value {
849 Some(value) => {
850 *this.first = false;
851 Poll::Ready(Some(Some(value)))
852 },
853 None => {
854 if *this.first {
855 *this.first = false;
856 Poll::Ready(Some(None))
857
858 } else if done {
859 Poll::Ready(None)
860
861 } else {
862 Poll::Pending
863 }
864 },
865 }
866 }
867}
868
869#[inline]
870pub fn from_stream<A>(stream: A) -> FromStream<A> where A: Stream {
871 FromStream { stream: Some(stream), first: true }
872}
873
874
875#[derive(Debug)]
876#[must_use = "Signals do nothing unless polled"]
877pub struct Always<A> {
878 value: Option<A>,
879}
880
881impl<A> Unpin for Always<A> {}
882
883impl<A> Signal for Always<A> {
884 type Item = A;
885
886 #[inline]
887 fn poll_change(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
888 Poll::Ready(self.value.take())
889 }
890}
891
892#[inline]
893pub fn always<A>(value: A) -> Always<A> {
894 Always {
895 value: Some(value),
896 }
897}
898
899
900#[pin_project]
901#[derive(Debug)]
902#[must_use = "Signals do nothing unless polled"]
903pub struct First<A> {
904 #[pin]
905 signal: Option<A>,
906}
907
908impl<A> Signal for First<A> where A: Signal {
909 type Item = A::Item;
910
911 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
912 let mut this = self.project();
913
914 if let Some(poll) = this.signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
916 this.signal.set(None);
917 poll
918
919 } else {
920 Poll::Ready(None)
921 }
922 }
923}
924
925
926#[pin_project]
927#[derive(Debug)]
928#[must_use = "Signals do nothing unless polled"]
929pub struct Switch<A, B, C> where A: Signal, C: FnMut(A::Item) -> B {
930 #[pin]
931 inner: Flatten<Map<A, C>>,
932}
933
934impl<A, B, C> Signal for Switch<A, B, C>
935 where A: Signal,
936 B: Signal,
937 C: FnMut(A::Item) -> B {
938 type Item = B::Item;
939
940 #[inline]
941 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
942 self.project().inner.poll_change(cx)
943 }
944}
945
946
947#[pin_project]
948#[derive(Debug)]
949#[must_use = "Streams do nothing unless polled"]
950pub struct SampleStreamCloned<A, B> where A: Signal, B: Stream {
951 #[pin]
952 signal: Option<A>,
953 #[pin]
954 stream: B,
955 value: Option<A::Item>,
956}
957
958impl<A, B> Stream for SampleStreamCloned<A, B>
959 where A: Signal,
960 A::Item: Clone,
961 B: Stream {
962 type Item = (A::Item, B::Item);
963
964 #[inline]
965 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
966 let mut this = self.project();
967
968 this.stream.as_mut().poll_next(cx).map(|option| {
969 option.map(|value| {
970 loop {
971 break match this.signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
972 Some(Poll::Ready(Some(value))) => {
973 *this.value = Some(value);
974 continue;
975 },
976 Some(Poll::Ready(None)) => {
977 this.signal.set(None);
978 },
979 _ => {},
980 };
981 }
982
983 match this.value {
984 Some(ref signal) => {
985 (signal.clone(), value)
986 },
987 None => {
988 unreachable!()
989 },
990 }
991 })
992 })
993 }
994}
995
996
997#[pin_project]
999#[derive(Debug)]
1000#[must_use = "Futures do nothing unless polled"]
1001pub struct ForEach<A, B, C> {
1002 #[pin]
1003 inner: stream::ForEach<SignalStream<A>, B, C>,
1004}
1005
1006impl<A, B, C> Future for ForEach<A, B, C>
1007 where A: Signal,
1008 B: Future<Output = ()>,
1009 C: FnMut(A::Item) -> B {
1010 type Output = ();
1011
1012 #[inline]
1013 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1014 self.project().inner.poll(cx)
1015 }
1016}
1017
1018
1019#[pin_project]
1020#[derive(Debug)]
1021#[must_use = "Streams do nothing unless polled"]
1022pub struct SignalStream<A> {
1023 #[pin]
1024 signal: A,
1025}
1026
1027impl<A: Signal> Stream for SignalStream<A> {
1028 type Item = A::Item;
1029
1030 #[inline]
1031 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1032 self.project().signal.poll_change(cx)
1033 }
1034}
1035
1036
1037#[pin_project]
1039#[derive(Debug)]
1040#[must_use = "Futures do nothing unless polled"]
1041pub struct SignalFuture<A> where A: Signal {
1042 #[pin]
1043 signal: A,
1044 value: Option<A::Item>,
1045}
1046
1047impl<A> Future for SignalFuture<A> where A: Signal {
1048 type Output = A::Item;
1049
1050 #[inline]
1051 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1052 let mut this = self.project();
1053
1054 loop {
1055 return match this.signal.as_mut().poll_change(cx) {
1056 Poll::Ready(None) => {
1057 Poll::Ready(this.value.take().unwrap())
1058 },
1059 Poll::Ready(Some(new_value)) => {
1060 *this.value = Some(new_value);
1061 continue;
1062 },
1063 Poll::Pending => {
1064 Poll::Pending
1065 },
1066 }
1067 }
1068 }
1069}
1070
1071
1072#[pin_project(project = MapProj)]
1073#[derive(Debug)]
1074#[must_use = "Signals do nothing unless polled"]
1075pub struct Map<A, B> {
1076 #[pin]
1077 signal: A,
1078 callback: B,
1079}
1080
1081impl<A, B, C> Signal for Map<A, B>
1082 where A: Signal,
1083 B: FnMut(A::Item) -> C {
1084 type Item = C;
1085
1086 #[inline]
1087 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1088 let MapProj { signal, callback } = self.project();
1089
1090 signal.poll_change(cx).map(|opt| opt.map(|value| callback(value)))
1091 }
1092}
1093
1094
1095#[pin_project(project = StopIfProj)]
1096#[derive(Debug)]
1097#[must_use = "Signals do nothing unless polled"]
1098pub struct StopIf<A, B> {
1099 #[pin]
1100 signal: A,
1101 stopped: bool,
1102 test: B,
1103}
1104
1105impl<A, B> Signal for StopIf<A, B>
1106 where A: Signal,
1107 B: FnMut(&A::Item) -> bool {
1108 type Item = A::Item;
1109
1110 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1111 let StopIfProj { signal, stopped, test } = self.project();
1112
1113 if *stopped {
1114 Poll::Ready(None)
1115
1116 } else {
1117 match signal.poll_change(cx) {
1118 Poll::Ready(Some(value)) => {
1119 if test(&value) {
1120 *stopped = true;
1121 }
1122
1123 Poll::Ready(Some(value))
1124 },
1125 Poll::Ready(None) => {
1126 *stopped = true;
1127 Poll::Ready(None)
1128 },
1129 Poll::Pending => Poll::Pending,
1130 }
1131 }
1132 }
1133}
1134
1135
1136#[pin_project(project = EqProj)]
1137#[derive(Debug)]
1138#[must_use = "Signals do nothing unless polled"]
1139pub struct Eq<A> where A: Signal {
1140 #[pin]
1141 signal: A,
1142 matches: Option<bool>,
1143 value: A::Item,
1144}
1145
1146impl<A> Signal for Eq<A>
1147 where A: Signal,
1148 A::Item: PartialEq {
1149 type Item = bool;
1150
1151 #[inline]
1152 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1153 let EqProj { mut signal, matches, value } = self.project();
1154
1155 loop {
1156 return match signal.as_mut().poll_change(cx) {
1157 Poll::Ready(Some(new_value)) => {
1158 let new = Some(new_value == *value);
1159
1160 if *matches != new {
1161 *matches = new;
1162 Poll::Ready(new)
1163
1164 } else {
1165 continue;
1166 }
1167 },
1168 Poll::Ready(None) => Poll::Ready(None),
1169 Poll::Pending => Poll::Pending,
1170 }
1171 }
1172 }
1173}
1174
1175
1176#[pin_project(project = NeqProj)]
1177#[derive(Debug)]
1178#[must_use = "Signals do nothing unless polled"]
1179pub struct Neq<A> where A: Signal {
1180 #[pin]
1181 signal: A,
1182 matches: Option<bool>,
1183 value: A::Item,
1184}
1185
1186impl<A> Signal for Neq<A>
1187 where A: Signal,
1188 A::Item: PartialEq {
1189 type Item = bool;
1190
1191 #[inline]
1192 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1193 let NeqProj { mut signal, matches, value } = self.project();
1194
1195 loop {
1196 return match signal.as_mut().poll_change(cx) {
1197 Poll::Ready(Some(new_value)) => {
1198 let new = Some(new_value != *value);
1199
1200 if *matches != new {
1201 *matches = new;
1202 Poll::Ready(new)
1203
1204 } else {
1205 continue;
1206 }
1207 },
1208 Poll::Ready(None) => Poll::Ready(None),
1209 Poll::Pending => Poll::Pending,
1210 }
1211 }
1212 }
1213}
1214
1215
1216#[pin_project]
1217#[derive(Debug)]
1218#[must_use = "Signals do nothing unless polled"]
1219pub struct Inspect<A, B> {
1220 #[pin]
1221 signal: A,
1222 callback: B,
1223}
1224
1225impl<A, B> Signal for Inspect<A, B>
1226 where A: Signal,
1227 B: FnMut(&A::Item) {
1228 type Item = A::Item;
1229
1230 #[inline]
1231 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1232 let this = self.project();
1233
1234 let poll = this.signal.poll_change(cx);
1235
1236 if let Poll::Ready(Some(ref value)) = poll {
1237 (this.callback)(value);
1238 }
1239
1240 poll
1241 }
1242}
1243
1244
1245#[pin_project(project = MapFutureProj)]
1246#[derive(Debug)]
1247#[must_use = "Signals do nothing unless polled"]
1248pub struct MapFuture<A, B, C> {
1249 #[pin]
1250 signal: Option<A>,
1251 #[pin]
1252 future: Option<B>,
1253 callback: C,
1254 first: bool,
1255}
1256
1257impl<A, B, C> Signal for MapFuture<A, B, C>
1258 where A: Signal,
1259 B: Future,
1260 C: FnMut(A::Item) -> B {
1261 type Item = Option<B::Output>;
1262
1263 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1264 let MapFutureProj { mut signal, mut future, callback, first } = self.project();
1265
1266 let mut done = false;
1267
1268 loop {
1269 match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1270 None => {
1271 done = true;
1272 },
1273 Some(Poll::Ready(None)) => {
1274 signal.set(None);
1275 done = true;
1276 },
1277 Some(Poll::Ready(Some(value))) => {
1278 let value = Some(callback(value));
1279 future.set(value);
1280 continue;
1281 },
1282 Some(Poll::Pending) => {},
1283 }
1284 break;
1285 }
1286
1287 match future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1288 None => {},
1289 Some(Poll::Ready(value)) => {
1290 future.set(None);
1291 *first = false;
1292 return Poll::Ready(Some(Some(value)));
1293 },
1294 Some(Poll::Pending) => {
1295 done = false;
1296 },
1297 }
1298
1299 if *first {
1300 *first = false;
1301 Poll::Ready(Some(None))
1302
1303 } else if done {
1304 Poll::Ready(None)
1305
1306 } else {
1307 Poll::Pending
1308 }
1309 }
1310}
1311
1312
1313#[pin_project(project = ThrottleProj)]
1314#[derive(Debug)]
1315#[must_use = "Signals do nothing unless polled"]
1316pub struct Throttle<A, B, C> where A: Signal {
1317 #[pin]
1318 signal: Option<A>,
1319 #[pin]
1320 future: Option<B>,
1321 callback: C,
1322}
1323
1324impl<A, B, C> Signal for Throttle<A, B, C>
1325 where A: Signal,
1326 B: Future<Output = ()>,
1327 C: FnMut() -> B {
1328 type Item = A::Item;
1329
1330 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1331 let ThrottleProj { mut signal, mut future, callback } = self.project();
1332
1333 match future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1334 None => {},
1335 Some(Poll::Ready(())) => {
1336 future.set(None);
1337 },
1338 Some(Poll::Pending) => {
1339 return Poll::Pending;
1341 },
1342 }
1343
1344 match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1345 None => {
1346 Poll::Ready(None)
1347 },
1348 Some(Poll::Ready(None)) => {
1349 signal.set(None);
1351 Poll::Ready(None)
1352 },
1353 Some(Poll::Ready(Some(value))) => {
1354 future.set(Some(callback()));
1355
1356 if let Some(Poll::Ready(())) = future.as_mut().as_pin_mut().map(|future| future.poll(cx)) {
1357 future.set(None);
1358 }
1359
1360 Poll::Ready(Some(value))
1361 },
1362 Some(Poll::Pending) => {
1363 Poll::Pending
1364 },
1365 }
1366 }
1367}
1368
1369
1370#[pin_project]
1371#[derive(Debug)]
1372#[must_use = "Futures do nothing unless polled"]
1373pub struct WaitFor<A>
1374 where A: Signal,
1375 A::Item: PartialEq {
1376 #[pin]
1377 signal: A,
1378 value: A::Item,
1379}
1380
1381impl<A> Future for WaitFor<A>
1382 where A: Signal,
1383 A::Item: PartialEq {
1384
1385 type Output = Option<A::Item>;
1386
1387 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1388 let mut this = self.project();
1389
1390 loop {
1391 let poll = this.signal.as_mut().poll_change(cx);
1392
1393 if let Poll::Ready(Some(ref new_value)) = poll {
1394 if new_value != this.value {
1395 continue;
1396 }
1397 }
1398
1399 return poll;
1400 }
1401 }
1402}
1403
1404
1405#[pin_project]
1406#[derive(Debug)]
1407#[must_use = "SignalVecs do nothing unless polled"]
1408pub struct SignalSignalVec<A> {
1409 #[pin]
1410 signal: A,
1411}
1412
1413impl<A, B> SignalVec for SignalSignalVec<A>
1414 where A: Signal<Item = Vec<B>> {
1415 type Item = B;
1416
1417 #[inline]
1418 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
1419 self.project().signal.poll_change(cx).map(|opt| opt.map(|values| VecDiff::Replace { values }))
1420 }
1421}
1422
1423
1424fn dedupe<A, S, F>(mut signal: Pin<&mut S>, cx: &mut Context, old_value: &mut Option<S::Item>, f: F) -> Poll<Option<A>>
1426 where S: Signal,
1427 S::Item: PartialEq,
1428 F: FnOnce(&mut S::Item) -> A {
1429 loop {
1430 return match signal.as_mut().poll_change(cx) {
1431 Poll::Ready(Some(mut new_value)) => {
1432 let has_changed = match old_value {
1433 Some(old_value) => *old_value != new_value,
1434 None => true,
1435 };
1436
1437 if has_changed {
1438 let output = f(&mut new_value);
1439 *old_value = Some(new_value);
1440 Poll::Ready(Some(output))
1441
1442 } else {
1443 continue;
1444 }
1445 },
1446 Poll::Ready(None) => Poll::Ready(None),
1447 Poll::Pending => Poll::Pending,
1448 }
1449 }
1450}
1451
1452
1453#[pin_project(project = DedupeMapProj)]
1454#[derive(Debug)]
1455#[must_use = "Signals do nothing unless polled"]
1456pub struct DedupeMap<A, B> where A: Signal {
1457 old_value: Option<A::Item>,
1458 #[pin]
1459 signal: A,
1460 callback: B,
1461}
1462
1463impl<A, B, C> Signal for DedupeMap<A, B>
1464 where A: Signal,
1465 A::Item: PartialEq,
1466 B: FnMut(&mut A::Item) -> C {
1469
1470 type Item = C;
1471
1472 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1474 let DedupeMapProj { old_value, signal, callback } = self.project();
1475
1476 dedupe(signal, cx, old_value, callback)
1477 }
1478}
1479
1480
1481#[pin_project(project = DedupeProj)]
1482#[derive(Debug)]
1483#[must_use = "Signals do nothing unless polled"]
1484pub struct Dedupe<A> where A: Signal {
1485 old_value: Option<A::Item>,
1486 #[pin]
1487 signal: A,
1488}
1489
1490impl<A> Signal for Dedupe<A>
1491 where A: Signal,
1492 A::Item: PartialEq + Copy {
1493
1494 type Item = A::Item;
1495
1496 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1498 let DedupeProj { old_value, signal } = self.project();
1499
1500 dedupe(signal, cx, old_value, |value| *value)
1501 }
1502}
1503
1504
1505#[pin_project(project = DedupeClonedProj)]
1506#[derive(Debug)]
1507#[must_use = "Signals do nothing unless polled"]
1508pub struct DedupeCloned<A> where A: Signal {
1509 old_value: Option<A::Item>,
1510 #[pin]
1511 signal: A,
1512}
1513
1514impl<A> Signal for DedupeCloned<A>
1515 where A: Signal,
1516 A::Item: PartialEq + Clone {
1517
1518 type Item = A::Item;
1519
1520 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1522 let DedupeClonedProj { old_value, signal } = self.project();
1523
1524 dedupe(signal, cx, old_value, |value| value.clone())
1525 }
1526}
1527
1528
1529#[pin_project(project = FilterMapProj)]
1530#[derive(Debug)]
1531#[must_use = "Signals do nothing unless polled"]
1532pub struct FilterMap<A, B> {
1533 #[pin]
1534 signal: A,
1535 callback: B,
1536 first: bool,
1537}
1538
1539impl<A, B, C> Signal for FilterMap<A, B>
1540 where A: Signal,
1541 B: FnMut(A::Item) -> Option<C> {
1542 type Item = Option<C>;
1543
1544 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1546 let FilterMapProj { mut signal, callback, first } = self.project();
1547
1548 loop {
1549 return match signal.as_mut().poll_change(cx) {
1550 Poll::Ready(Some(value)) => match callback(value) {
1551 Some(value) => {
1552 *first = false;
1553 Poll::Ready(Some(Some(value)))
1554 },
1555
1556 None => {
1557 if *first {
1558 *first = false;
1559 Poll::Ready(Some(None))
1560
1561 } else {
1562 continue;
1563 }
1564 },
1565 },
1566 Poll::Ready(None) => Poll::Ready(None),
1567 Poll::Pending => Poll::Pending,
1568 }
1569 }
1570 }
1571}
1572
1573
1574#[pin_project(project = FlattenProj)]
1577#[derive(Debug)]
1578#[must_use = "Signals do nothing unless polled"]
1579pub struct Flatten<A> where A: Signal {
1580 #[pin]
1581 signal: Option<A>,
1582 #[pin]
1583 inner: Option<A::Item>,
1584}
1585
1586impl<A> Signal for Flatten<A>
1600 where A: Signal,
1601 A::Item: Signal {
1602 type Item = <A::Item as Signal>::Item;
1603
1604 #[inline]
1605 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1606 let FlattenProj { mut signal, mut inner } = self.project();
1607
1608 let done = match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
1609 None => true,
1610 Some(Poll::Ready(None)) => {
1611 signal.set(None);
1612 true
1613 },
1614 Some(Poll::Ready(Some(new_inner))) => {
1615 inner.set(Some(new_inner));
1616 false
1617 },
1618 Some(Poll::Pending) => false,
1619 };
1620
1621 match inner.as_mut().as_pin_mut().map(|inner| inner.poll_change(cx)) {
1622 Some(Poll::Ready(None)) => {
1623 inner.set(None);
1624 },
1625 Some(poll) => {
1626 return poll;
1627 },
1628 None => {},
1629 }
1630
1631 if done {
1632 Poll::Ready(None)
1633
1634 } else {
1635 Poll::Pending
1636 }
1637 }
1638}
1639
1640
/// `SignalVec` created by `SignalExt::switch_signal_vec`: maps every value of
/// a signal to a `SignalVec` and forwards the changes of the most recent one.
#[pin_project(project = SwitchSignalVecProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct SwitchSignalVec<A, B, C> where B: SignalVec {
    /// The outer signal; cleared once it has ended.
    #[pin]
    signal: Option<A>,
    /// The `SignalVec` for the latest signal value; cleared once it has ended.
    #[pin]
    signal_vec: Option<B>,
    /// Creates a `SignalVec` from a signal value.
    callback: C,
    /// Number of items implied by the diffs emitted so far.
    len: usize,
}

impl<A, B, C> SignalVec for SwitchSignalVec<A, B, C>
    where A: Signal,
          B: SignalVec,
          C: FnMut(A::Item) -> B {
    type Item = B::Item;

    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        let SwitchSignalVecProj { mut signal, mut signal_vec, callback, len } = self.project();

        // The newest value the outer signal produced during this poll, if any.
        let mut signal_value = None;

        // Drain the outer signal, keeping only its latest value.
        let signal_done = loop {
            break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_change(cx)) {
                None => {
                    true
                },
                Some(Poll::Pending) => {
                    false
                },
                Some(Poll::Ready(None)) => {
                    signal.set(None);
                    true
                },
                Some(Poll::Ready(Some(value))) => {
                    signal_value = Some(value);
                    continue;
                },
            }
        };

        // Result to report when a freshly created SignalVec has nothing to
        // offer yet: if items were emitted before, clear them with an empty
        // `Replace`; otherwise there is nothing to report.
        fn new_signal_vec<A>(len: &mut usize) -> Poll<Option<VecDiff<A>>> {
            if *len == 0 {
                Poll::Pending

            } else {
                *len = 0;
                Poll::Ready(Some(VecDiff::Replace { values: vec![] }))
            }
        }

        // Updates the tracked length to reflect `vec_diff`.
        fn calculate_len<A>(len: &mut usize, vec_diff: &VecDiff<A>) {
            match vec_diff {
                VecDiff::Replace { values } => {
                    *len = values.len();
                },
                VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
                    *len += 1;
                },
                VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
                    *len -= 1;
                },
                VecDiff::Clear {} => {
                    *len = 0;
                },
                VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {},
            }
        }

        if let Some(value) = signal_value {
            // The outer signal changed: switch to a new SignalVec and poll it
            // right away.
            signal_vec.set(Some(callback(value)));

            match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
                // NOTE(review): `signal_vec` was set to `Some` just above, so
                // this arm looks unreachable — confirm.
                None => {
                    if signal_done {
                        Poll::Ready(None)

                    } else {
                        new_signal_vec(len)
                    }
                },

                Some(Poll::Pending) => {
                    new_signal_vec(len)
                },

                Some(Poll::Ready(None)) => {
                    signal_vec.set(None);

                    if signal_done {
                        Poll::Ready(None)

                    } else {
                        new_signal_vec(len)
                    }
                },

                Some(Poll::Ready(Some(vec_diff))) => {
                    if *len == 0 {
                        // Nothing was emitted before, so the diff can be
                        // forwarded unchanged.
                        calculate_len(len, &vec_diff);
                        Poll::Ready(Some(vec_diff))

                    } else {
                        // Items from the previous SignalVec were emitted:
                        // apply the diff to an empty Vec and emit the result
                        // as a `Replace` so the old items are discarded.
                        let mut values = vec![];

                        vec_diff.apply_to_vec(&mut values);

                        *len = values.len();

                        Poll::Ready(Some(VecDiff::Replace { values }))
                    }
                },
            }

        } else {
            // The outer signal did not change: keep forwarding the current
            // SignalVec.
            match signal_vec.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
                None => {
                    if signal_done {
                        Poll::Ready(None)

                    } else {
                        Poll::Pending
                    }
                },

                Some(Poll::Pending) => {
                    Poll::Pending
                },

                Some(Poll::Ready(None)) => {
                    signal_vec.set(None);

                    if signal_done {
                        Poll::Ready(None)

                    } else {
                        Poll::Pending
                    }
                },

                Some(Poll::Ready(Some(vec_diff))) => {
                    calculate_len(len, &vec_diff);
                    Poll::Ready(Some(vec_diff))
                },
            }
        }
    }
}