1use std::iter::Sum;
2use std::collections::VecDeque;
3use std::pin::Pin;
4use std::marker::Unpin;
5use std::cmp::Ordering;
6use std::future::Future;
7use std::task::{Poll, Context};
8use futures_core::Stream;
9use futures_util::stream;
10use futures_util::stream::StreamExt;
11use pin_project::pin_project;
12
13use crate::signal::{Signal, Mutable, ReadOnlyMutable};
14
15
/// One incremental change to an ordered list of values.
///
/// The precise effect of every variant on a `Vec` is defined by
/// [`VecDiff::apply_to_vec`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum VecDiff<A> {
    /// Throw away the current contents and use `values` instead.
    Replace {
        values: Vec<A>,
    },

    /// Insert `value` so that it ends up at position `index`.
    InsertAt {
        index: usize,
        value: A,
    },

    /// Overwrite the element at `index` with `value`.
    UpdateAt {
        index: usize,
        value: A,
    },

    /// Remove the element at `index`.
    RemoveAt {
        index: usize,
    },

    /// Remove the element at `old_index` and re-insert it at `new_index`
    /// (`new_index` is interpreted after the removal).
    Move {
        old_index: usize,
        new_index: usize,
    },

    /// Append `value` at the end.
    Push {
        value: A,
    },

    /// Remove the last element.
    Pop {},

    /// Remove every element.
    Clear {},
}

impl<A> VecDiff<A> {
    /// Rebuilds this diff with every carried value passed through `callback`.
    ///
    /// Variants that only carry indexes (or nothing) are returned unchanged
    /// apart from the value type.
    fn map<B, F>(self, mut callback: F) -> VecDiff<B> where F: FnMut(A) -> B {
        match self {
            // Payload-free variants: nothing to convert.
            VecDiff::Clear {} => VecDiff::Clear {},
            VecDiff::Pop {} => VecDiff::Pop {},
            VecDiff::RemoveAt { index } => VecDiff::RemoveAt { index },
            VecDiff::Move { old_index, new_index } => VecDiff::Move { old_index, new_index },

            // Single-value variants.
            VecDiff::Push { value } => {
                let value = callback(value);
                VecDiff::Push { value }
            },
            VecDiff::InsertAt { index, value } => {
                let value = callback(value);
                VecDiff::InsertAt { index, value }
            },
            VecDiff::UpdateAt { index, value } => {
                let value = callback(value);
                VecDiff::UpdateAt { index, value }
            },

            // Whole-list variant: convert element by element, in order.
            VecDiff::Replace { values } => {
                let values = values.into_iter().map(callback).collect();
                VecDiff::Replace { values }
            },
        }
    }

    /// Applies this diff to `vec` in place.
    ///
    /// # Panics
    ///
    /// Panics when the diff does not fit `vec`: an out-of-range index for
    /// `InsertAt` / `UpdateAt` / `RemoveAt` / `Move`, or `Pop` on an empty
    /// vector.
    pub fn apply_to_vec(self, vec: &mut Vec<A>) {
        match self {
            VecDiff::Clear {} => vec.clear(),
            VecDiff::Pop {} => {
                // An empty vector here means the diff stream is inconsistent.
                vec.pop().unwrap();
            },
            VecDiff::Push { value } => vec.push(value),
            VecDiff::Replace { values } => *vec = values,
            VecDiff::InsertAt { index, value } => vec.insert(index, value),
            VecDiff::UpdateAt { index, value } => vec[index] = value,
            VecDiff::RemoveAt { index } => {
                vec.remove(index);
            },
            VecDiff::Move { old_index, new_index } => {
                let moved = vec.remove(old_index);
                vec.insert(new_index, moved);
            },
        }
    }
}
109
110
/// A pollable source of [`VecDiff`] changes describing how a list evolves.
///
/// The combinators in this file treat the return value of
/// [`poll_vec_change`](SignalVec::poll_vec_change) as follows:
/// `Poll::Ready(Some(diff))` is the next change, `Poll::Pending` means no
/// change is available yet, and `Poll::Ready(None)` means the source is
/// finished (they drop it and stop polling it).
#[must_use = "SignalVecs do nothing unless polled"]
pub trait SignalVec {
    /// The element type carried by the emitted diffs.
    type Item;

    /// Polls for the next change to the list.
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>>;
}
118
119
/// Forwards [`SignalVec`] through a mutable reference to an `Unpin` signal vec.
impl<'a, A> SignalVec for &'a mut A where A: ?Sized + SignalVec + Unpin {
    type Item = A::Item;

    #[inline]
    fn poll_vec_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        // `**self` goes through the `Pin` and then the `&mut A`; re-pinning is
        // allowed because `A: Unpin`.
        A::poll_vec_change(Pin::new(&mut **self), cx)
    }
}
129
/// Forwards [`SignalVec`] through a `Box` holding an `Unpin` signal vec.
impl<A> SignalVec for Box<A> where A: ?Sized + SignalVec + Unpin {
    type Item = A::Item;

    #[inline]
    fn poll_vec_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        // `&mut *self` is a `&mut Box<A>` that deref-coerces to the `&mut A`
        // expected by `Pin::new` here.
        A::poll_vec_change(Pin::new(&mut *self), cx)
    }
}
139
/// Forwards [`SignalVec`] through an already-pinned pointer (for example
/// `Pin<Box<S>>` or `Pin<&mut S>`).
impl<A> SignalVec for Pin<A>
    where A: Unpin + ::std::ops::DerefMut,
          A::Target: SignalVec {
    type Item = <<A as ::std::ops::Deref>::Target as SignalVec>::Item;

    #[inline]
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        // `A: Unpin`, so the outer pin can be removed; `as_mut` then yields
        // `Pin<&mut A::Target>` for the inner poll.
        Pin::get_mut(self).as_mut().poll_vec_change(cx)
    }
}
151
152
/// Combinator methods available on every [`SignalVec`].
///
/// Every method only builds the corresponding adapter value; no polling
/// happens here.
pub trait SignalVecExt: SignalVec {
    /// Wraps `self` in a [`Map`], which passes every value carried by a diff
    /// through `callback`.
    #[inline]
    fn map<A, F>(self, callback: F) -> Map<Self, F>
        where F: FnMut(Self::Item) -> A,
              Self: Sized {
        Map {
            signal: self,
            callback,
        }
    }

    /// Wraps `self` in a [`MapSignal`], which turns every value into a
    /// `Signal` via `callback` and emits the values those signals produce.
    #[inline]
    fn map_signal<A, F>(self, callback: F) -> MapSignal<Self, A, F>
        where A: Signal,
              F: FnMut(Self::Item) -> A,
              Self: Sized {
        MapSignal {
            signal: Some(self),
            signals: vec![],
            pending: VecDeque::new(),
            callback,
        }
    }

    /// Wraps `self` and `other` in a [`Chain`], which presents the items of
    /// `self` followed by the items of `other` as one list.
    #[inline]
    fn chain<S>(self, other: S) -> Chain<Self, S>
        where S: SignalVec<Item = Self::Item>,
              Self: Sized {
        Chain {
            left: Some(self),
            right: Some(other),
            left_len: 0,
            right_len: 0,
            pending: VecDeque::new(),
        }
    }

    /// Wraps `self` in a [`ToSignalMap`], a `Signal` that keeps the current
    /// list contents and reports `callback(&contents)` whenever they change.
    #[inline]
    fn to_signal_map<A, F>(self, callback: F) -> ToSignalMap<Self, F>
        where F: FnMut(&[Self::Item]) -> A,
              Self: Sized {
        ToSignalMap {
            signal: Some(self),
            first: true,
            values: vec![],
            callback,
        }
    }

    /// Wraps `self` in a [`ToSignalCloned`], a `Signal` whose value is a
    /// cloned `Vec` of the current list contents.
    #[inline]
    fn to_signal_cloned(self) -> ToSignalCloned<Self>
        where Self::Item: Clone,
              Self: Sized {
        ToSignalCloned {
            // The closure captures nothing, so it coerces to the `fn` pointer
            // type stored inside `ToSignalCloned`.
            signal: self.to_signal_map(|x| x.to_vec()),
        }
    }

    /// Wraps `self` in a `Filter` driven by the predicate `callback`.
    // NOTE(review): `Filter` is defined outside this view; presumably it keeps
    // only the items for which `callback` returns `true` — confirm.
    #[inline]
    fn filter<F>(self, callback: F) -> Filter<Self, F>
        where F: FnMut(&Self::Item) -> bool,
              Self: Sized {
        Filter {
            indexes: vec![],
            signal: self,
            callback,
        }
    }

    /// Wraps `self` in a [`FilterSignalCloned`], where `callback` produces a
    /// `Signal<Item = bool>` for every item.
    // NOTE(review): the implementation is outside this view; presumably an
    // item is shown while its signal's latest value is `true` — confirm.
    #[inline]
    fn filter_signal_cloned<A, F>(self, callback: F) -> FilterSignalCloned<Self, A, F>
        where A: Signal<Item = bool>,
              F: FnMut(&Self::Item) -> A,
              Self: Sized {
        FilterSignalCloned {
            signal: Some(self),
            signals: vec![],
            pending: VecDeque::new(),
            callback,
        }
    }

    /// Wraps `self` in a `FilterMap` driven by `callback`.
    // NOTE(review): `FilterMap` is defined outside this view; presumably items
    // mapped to `None` are dropped — confirm.
    #[inline]
    fn filter_map<A, F>(self, callback: F) -> FilterMap<Self, F>
        where F: FnMut(Self::Item) -> Option<A>,
              Self: Sized {
        FilterMap {
            indexes: vec![],
            signal: self,
            callback,
        }
    }

    /// Wraps `self` in a `SumSignal`.
    // NOTE(review): `SumSignal` is defined outside this view; presumably it is
    // a `Signal` of the sum of the current items — confirm.
    #[inline]
    fn sum(self) -> SumSignal<Self>
        where Self::Item: for<'a> Sum<&'a Self::Item>,
              Self: Sized {
        SumSignal {
            signal: Some(self),
            first: true,
            values: vec![],
        }
    }

    /// Wraps `self` (a signal vec of signal vecs) in a [`Flatten`], which
    /// presents the items of all inner signal vecs as one list.
    #[inline]
    fn flatten(self) -> Flatten<Self>
        where Self::Item: SignalVec,
              Self: Sized {
        Flatten {
            signal: Some(self),
            inner: vec![],
            pending: VecDeque::new(),
        }
    }

    /// Wraps `self` in a `SignalVecDebug` that records the caller's source
    /// location. Only available with the `debug` feature.
    // NOTE(review): `SignalVecDebug` is defined outside this view; presumably
    // it logs every diff — confirm.
    #[inline]
    #[track_caller]
    #[cfg(feature = "debug")]
    fn debug(self) -> SignalVecDebug<Self> where Self: Sized, Self::Item: std::fmt::Debug {
        SignalVecDebug {
            signal: self,
            location: std::panic::Location::caller(),
        }
    }

    /// Wraps `self` in a `SortByCloned` using the comparison function `compare`.
    // NOTE(review): `SortByCloned` is defined outside this view; presumably it
    // presents the items in sorted order — confirm.
    #[inline]
    fn sort_by_cloned<F>(self, compare: F) -> SortByCloned<Self, F>
        where F: FnMut(&Self::Item, &Self::Item) -> Ordering,
              Self: Sized {
        SortByCloned {
            pending: None,
            values: vec![],
            indexes: vec![],
            signal: self,
            compare,
        }
    }

    /// Wraps `self` in a `SignalVecStream`.
    // NOTE(review): `SignalVecStream` is defined outside this view; `for_each`
    // below uses it as a `Stream` of `VecDiff`s.
    #[inline]
    fn to_stream(self) -> SignalVecStream<Self> where Self: Sized {
        SignalVecStream {
            signal: self,
        }
    }

    /// Returns a [`ForEach`] future built by wrapping `self` in a
    /// `SignalVecStream` and calling `StreamExt::for_each` with `callback`.
    #[inline]
    fn for_each<U, F>(self, callback: F) -> ForEach<Self, U, F>
        where U: Future<Output = ()>,
              F: FnMut(VecDiff<Self::Item>) -> U,
              Self: Sized {
        ForEach {
            inner: SignalVecStream {
                signal: self,
            }.for_each(callback)
        }
    }

    /// Wraps `self` in a `Len`.
    // NOTE(review): `Len` is defined outside this view; presumably it is a
    // `Signal` of the current number of items — confirm.
    #[inline]
    fn len(self) -> Len<Self> where Self: Sized {
        Len {
            signal: Some(self),
            first: true,
            len: 0,
        }
    }

    /// Wraps `self.len()` in an `IsEmpty`.
    // NOTE(review): `IsEmpty` is defined outside this view; presumably it is a
    // `Signal<Item = bool>` that is `true` while the list is empty — confirm.
    #[inline]
    fn is_empty(self) -> IsEmpty<Self> where Self: Sized {
        IsEmpty {
            len: self.len(),
            old: None,
        }
    }

    /// Wraps `self` in an [`Enumerate`], which pairs every item with a
    /// `ReadOnlyMutable<Option<usize>>` tracking that item's current index.
    #[inline]
    fn enumerate(self) -> Enumerate<Self> where Self: Sized {
        Enumerate {
            signal: self,
            mutables: vec![],
        }
    }

    /// Wraps `self` in a `DelayRemove`, where `f` produces a future for an item.
    // NOTE(review): `DelayRemove` is defined outside this view; presumably the
    // removal of an item is postponed until its future completes — confirm.
    #[inline]
    fn delay_remove<A, F>(self, f: F) -> DelayRemove<Self, A, F>
        where A: Future<Output = ()>,
              F: FnMut(&Self::Item) -> A,
              Self: Sized {
        DelayRemove {
            signal: Some(self),
            futures: vec![],
            pending: VecDeque::new(),
            callback: f,
        }
    }

    /// Convenience for polling an `Unpin` signal vec through a plain `&mut`
    /// reference instead of a `Pin`.
    #[inline]
    fn poll_vec_change_unpin(&mut self, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> where Self: Unpin + Sized {
        Pin::new(self).poll_vec_change(cx)
    }

    /// Pins `self` on the heap and erases its type, keeping the `Send` bound.
    #[inline]
    fn boxed<'a>(self) -> Pin<Box<dyn SignalVec<Item = Self::Item> + Send + 'a>>
        where Self: Sized + Send + 'a {
        Box::pin(self)
    }

    /// Pins `self` on the heap and erases its type, without requiring `Send`.
    #[inline]
    fn boxed_local<'a>(self) -> Pin<Box<dyn SignalVec<Item = Self::Item> + 'a>>
        where Self: Sized + 'a {
        Box::pin(self)
    }
}
574
// Blanket impl: every `SignalVec` (including unsized ones) gets the
// `SignalVecExt` combinator methods.
impl<T: ?Sized> SignalVecExt for T where T: SignalVec {}
577
578
/// A pinned, boxed, type-erased [`SignalVec`] that is `Send`; the same type
/// returned by [`SignalVecExt::boxed`].
pub type BoxSignalVec<'a, T> = Pin<Box<dyn SignalVec<Item = T> + Send + 'a>>;

/// A pinned, boxed, type-erased [`SignalVec`] without a `Send` bound; the same
/// type returned by [`SignalVecExt::boxed_local`].
pub type LocalBoxSignalVec<'a, T> = Pin<Box<dyn SignalVec<Item = T> + 'a>>;
587
588
589#[derive(Debug)]
590#[must_use = "SignalVecs do nothing unless polled"]
591pub struct Always<A> {
592 values: Option<Vec<A>>,
593}
594
595impl<A> Unpin for Always<A> {}
596
597impl<A> SignalVec for Always<A> {
598 type Item = A;
599
600 fn poll_vec_change(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
601 Poll::Ready(self.values.take().map(|values| VecDiff::Replace { values }))
602 }
603}
604
/// Creates an [`Always`] that emits `values` as a single `VecDiff::Replace`
/// and then ends.
#[inline]
pub fn always<A>(values: Vec<A>) -> Always<A> {
    Always {
        values: Some(values),
    }
}
614
615
616#[pin_project(project = StreamSignalVecProj)]
617#[derive(Debug)]
618#[must_use = "SignalVecs do nothing unless polled"]
619pub struct StreamSignalVec<S> {
620 #[pin]
621 stream: S,
622}
623
624impl<S> SignalVec for StreamSignalVec<S> where S: Stream {
625 type Item = S::Item;
626
627 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
628 let StreamSignalVecProj { stream } = self.project();
629
630 stream.poll_next(cx).map(|some| some.map(|value| VecDiff::Push { value }))
631 }
632}
633
/// Wraps `stream` in a [`StreamSignalVec`], which reports every stream item
/// as a `VecDiff::Push`.
#[inline]
pub fn from_stream<S>(stream: S) -> StreamSignalVec<S> {
    StreamSignalVec {
        stream,
    }
}
643
644
/// Future returned by [`SignalVecExt::for_each`].
///
/// It is a thin wrapper around the `stream::ForEach` future that runs the
/// callback on every diff of the underlying `SignalVecStream`.
#[pin_project]
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
pub struct ForEach<A, B, C> {
    #[pin]
    inner: stream::ForEach<SignalVecStream<A>, B, C>,
}

impl<A, B, C> Future for ForEach<A, B, C>
    where A: SignalVec,
          B: Future<Output = ()>,
          C: FnMut(VecDiff<A::Item>) -> B {
    type Output = ();

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Delegates entirely to the wrapped stream future.
        self.project().inner.poll(cx)
    }
}
664
665
666#[pin_project(project = MapProj)]
667#[derive(Debug)]
668#[must_use = "SignalVecs do nothing unless polled"]
669pub struct Map<A, B> {
670 #[pin]
671 signal: A,
672 callback: B,
673}
674
675impl<A, B, F> SignalVec for Map<A, F>
676 where A: SignalVec,
677 F: FnMut(A::Item) -> B {
678 type Item = B;
679
680 #[inline]
682 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
683 let MapProj { signal, callback } = self.project();
684
685 signal.poll_vec_change(cx).map(|some| some.map(|change| change.map(|value| callback(value))))
686 }
687}
688
689
/// [`SignalVec`] returned by [`SignalVecExt::chain`]: the items of `left`
/// followed by the items of `right`, presented as one list.
#[pin_project(project = ChainProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct Chain<A, B> where A: SignalVec {
    // Left source; set to `None` once it reports `Poll::Ready(None)`.
    #[pin]
    left: Option<A>,
    // Right source; set to `None` once it reports `Poll::Ready(None)`.
    #[pin]
    right: Option<B>,
    // Number of items currently contributed by the left source.
    left_len: usize,
    // Number of items currently contributed by the right source.
    right_len: usize,
    // Diffs already computed but not yet returned to the caller.
    pending: VecDeque<VecDiff<A::Item>>,
}
703
// Helpers on the pin-projection of `Chain`. They translate diffs from either
// side into diffs on the combined list, keeping `left_len` / `right_len` up to
// date. A `Some` return value is a diff to emit right away; additional diffs
// may have been queued in `pending`.
impl<'a, A, B> ChainProj<'a, A, B> where A: SignalVec, B: SignalVec<Item = A::Item> {
    // Returns the values if `change` is a ready `Replace`; otherwise hands
    // `change` back unchanged.
    fn is_replace(change: Option<Poll<Option<VecDiff<A::Item>>>>) -> Result<Vec<A::Item>, Option<Poll<Option<VecDiff<A::Item>>>>> {
        match change {
            Some(Poll::Ready(Some(VecDiff::Replace { values }))) => Ok(values),
            _ => Err(change),
        }
    }

    // Returns `Ok(())` if `change` is a ready `Clear`; otherwise hands
    // `change` back unchanged.
    fn is_clear(change: Option<Poll<Option<VecDiff<A::Item>>>>) -> Result<(), Option<Poll<Option<VecDiff<A::Item>>>>> {
        match change {
            Some(Poll::Ready(Some(VecDiff::Clear {}))) => Ok(()),
            _ => Err(change),
        }
    }

    // Handles a `Replace` from the left side. The right side is polled too, so
    // that two simultaneous `Replace`s collapse into a single one.
    fn process_replace(&mut self, mut left_values: Vec<A::Item>, cx: &mut Context) -> Option<VecDiff<A::Item>> {
        match Self::is_replace(self.right.as_mut().as_pin_mut().map(|s| s.poll_vec_change(cx))) {
            Ok(mut right_values) => {
                *self.left_len = left_values.len();
                *self.right_len = right_values.len();
                left_values.append(&mut right_values);
                Some(VecDiff::Replace { values: left_values })
            },
            Err(change) => {
                let removing = *self.left_len;
                let adding = left_values.len();

                *self.left_len = adding;

                let output = if *self.right_len == 0 {
                    // Nothing on the right, so the whole list is just the left.
                    Some(VecDiff::Replace { values: left_values })

                } else {
                    // The right items must survive: remove the old left items
                    // back to front, then insert the new ones front to back.
                    self.pending.reserve(removing + adding);

                    for index in (0..removing).rev() {
                        self.pending.push_back(VecDiff::RemoveAt { index });
                    }

                    for (index, value) in left_values.into_iter().enumerate() {
                        self.pending.push_back(VecDiff::InsertAt { index, value });
                    }

                    None
                };

                // The right side was polled above; its change (if any) is
                // queued after the left-side diffs.
                if let Some(change) = self.process_right_change(change) {
                    self.pending.push_back(change);
                }

                output
            },
        }
    }

    // Handles a `Clear` from the left side. The right side is polled too, so
    // that two simultaneous `Clear`s collapse into a single one.
    fn process_clear(&mut self, cx: &mut Context) -> Option<VecDiff<A::Item>> {
        match Self::is_clear(self.right.as_mut().as_pin_mut().map(|s| s.poll_vec_change(cx))) {
            Ok(()) => {
                *self.left_len = 0;
                *self.right_len = 0;
                Some(VecDiff::Clear {})
            },
            Err(change) => {
                let removing = *self.left_len;

                *self.left_len = 0;

                let output = if *self.right_len == 0 {
                    Some(VecDiff::Clear {})

                } else {
                    // Only the left items go away; remove them back to front.
                    self.pending.reserve(removing);

                    for index in (0..removing).rev() {
                        self.pending.push_back(VecDiff::RemoveAt { index });
                    }

                    None
                };

                if let Some(change) = self.process_right_change(change) {
                    self.pending.push_back(change);
                }

                output
            },
        }
    }

    // Polls the left side once and translates its change. Left indexes map
    // directly onto the combined list.
    fn process_left(&mut self, cx: &mut Context) -> Option<VecDiff<A::Item>> {
        match self.left.as_mut().as_pin_mut().map(|s| s.poll_vec_change(cx)) {
            Some(Poll::Ready(Some(change))) => {
                match change {
                    VecDiff::Replace { values } => {
                        self.process_replace(values, cx)
                    },
                    VecDiff::InsertAt { index, value } => {
                        *self.left_len += 1;
                        Some(VecDiff::InsertAt { index, value })
                    },
                    VecDiff::UpdateAt { index, value } => {
                        Some(VecDiff::UpdateAt { index, value })
                    },
                    VecDiff::Move { old_index, new_index } => {
                        Some(VecDiff::Move { old_index, new_index })
                    },
                    VecDiff::RemoveAt { index } => {
                        *self.left_len -= 1;
                        Some(VecDiff::RemoveAt { index })
                    },
                    VecDiff::Push { value } => {
                        let index = *self.left_len;
                        *self.left_len += 1;

                        // A left push lands in the middle of the combined list
                        // whenever the right side has items.
                        if *self.right_len == 0 {
                            Some(VecDiff::Push { value })

                        } else {
                            Some(VecDiff::InsertAt { index, value })
                        }
                    },
                    VecDiff::Pop {} => {
                        *self.left_len -= 1;

                        // Same reasoning as for `Push`.
                        if *self.right_len == 0 {
                            Some(VecDiff::Pop {})

                        } else {
                            Some(VecDiff::RemoveAt { index: *self.left_len })
                        }
                    },
                    VecDiff::Clear {} => {
                        self.process_clear(cx)
                    },
                }
            },
            Some(Poll::Ready(None)) => {
                // The left side has ended; drop it so it is never polled again.
                self.left.set(None);
                None
            },
            Some(Poll::Pending) => None,
            None => None,
        }
    }

    // Translates an already-polled change from the right side. Right indexes
    // are shifted by `left_len`.
    fn process_right_change(&mut self, change: Option<Poll<Option<VecDiff<A::Item>>>>) -> Option<VecDiff<A::Item>> {
        match change {
            Some(Poll::Ready(Some(change))) => {
                match change {
                    VecDiff::Replace { values } => {
                        let removing = *self.right_len;
                        let adding = values.len();

                        *self.right_len = adding;

                        if *self.left_len == 0 {
                            Some(VecDiff::Replace { values })

                        } else {
                            // The left items must survive: pop the old right
                            // items and push the new ones.
                            self.pending.reserve(removing + adding);

                            for _ in 0..removing {
                                self.pending.push_back(VecDiff::Pop {});
                            }

                            for value in values.into_iter() {
                                self.pending.push_back(VecDiff::Push { value });
                            }

                            None
                        }
                    },
                    VecDiff::InsertAt { index, value } => {
                        *self.right_len += 1;
                        Some(VecDiff::InsertAt { index: index + *self.left_len, value })
                    },
                    VecDiff::UpdateAt { index, value } => {
                        Some(VecDiff::UpdateAt { index: index + *self.left_len, value })
                    },
                    VecDiff::Move { old_index, new_index } => {
                        Some(VecDiff::Move {
                            old_index: old_index + *self.left_len,
                            new_index: new_index + *self.left_len,
                        })
                    },
                    VecDiff::RemoveAt { index } => {
                        *self.right_len -= 1;
                        Some(VecDiff::RemoveAt { index: index + *self.left_len })
                    },
                    VecDiff::Push { value } => {
                        *self.right_len += 1;
                        Some(VecDiff::Push { value })
                    },
                    VecDiff::Pop {} => {
                        *self.right_len -= 1;
                        Some(VecDiff::Pop {})
                    },
                    VecDiff::Clear {} => {
                        let removing = *self.right_len;

                        *self.right_len = 0;

                        if *self.left_len == 0 {
                            Some(VecDiff::Clear {})

                        } else {
                            // Only the right items go away; pop them one by one.
                            self.pending.reserve(removing);

                            for _ in 0..removing {
                                self.pending.push_back(VecDiff::Pop {});
                            }

                            None
                        }
                    },
                }
            },
            Some(Poll::Ready(None)) => {
                // The right side has ended; drop it so it is never polled again.
                self.right.set(None);
                None
            },
            Some(Poll::Pending) => None,
            None => None,
        }
    }

    // Polls the right side once and translates its change.
    fn process_right(&mut self, cx: &mut Context) -> Option<VecDiff<A::Item>> {
        let change = self.right.as_mut().as_pin_mut().map(|s| s.poll_vec_change(cx));
        self.process_right_change(change)
    }
}
935
// Poll order: queued diffs first, then the left side, then the right side.
// The chain ends only when both sides have ended.
impl<A, B> SignalVec for Chain<A, B>
    where A: SignalVec,
          B: SignalVec<Item = A::Item> {
    type Item = A::Item;

    #[inline]
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        let mut this = self.project();

        // Every path through the body returns, so this `loop` runs at most once.
        // NOTE(review): a side can report a change that produces neither an
        // output nor a queued diff (e.g. an empty `Replace` on an empty left
        // side while the right side has items). In that case this may return
        // `Pending` without polling that side again — confirm whether a
        // wake-up can be missed.
        loop {
            if let Some(change) = this.pending.pop_front() {
                return Poll::Ready(Some(change));
            }

            if let Some(change) = this.process_left(cx) {
                return Poll::Ready(Some(change));

            // `process_left` may have queued diffs instead of returning one.
            } else if let Some(change) = this.pending.pop_front() {
                return Poll::Ready(Some(change));
            }

            if let Some(change) = this.process_right(cx) {
                return Poll::Ready(Some(change));

            // `process_right` may have queued diffs instead of returning one.
            } else if let Some(change) = this.pending.pop_front() {
                return Poll::Ready(Some(change));
            }

            if this.left.is_none() && this.right.is_none() {
                return Poll::Ready(None);

            } else {
                return Poll::Pending;
            }
        }
    }
}
973
974
// Per-inner-signal-vec bookkeeping used by `Flatten`.
#[derive(Debug)]
struct FlattenState<A> {
    // The inner signal vec; `None` once it has reported `Poll::Ready(None)`.
    signal_vec: Option<Pin<Box<A>>>,
    // Number of items this inner signal vec currently contributes.
    len: usize,
}

impl<A> FlattenState<A> where A: SignalVec {
    // Boxes and pins `signal_vec`; the state starts with no items.
    fn new(signal_vec: A) -> Self {
        Self {
            signal_vec: Some(Box::pin(signal_vec)),
            len: 0,
        }
    }

    // Adjusts `len` to reflect `diff`.
    fn update_len(&mut self, diff: &VecDiff<A::Item>) {
        match diff {
            VecDiff::Replace { values } => {
                self.len = values.len();
            },
            VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
                self.len += 1;
            },
            VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
                self.len -= 1;
            },
            VecDiff::Clear {} => {
                self.len = 0;
            },
            VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {},
        }
    }

    // Polls the inner signal vec; returns `None` if it has already ended.
    fn poll(&mut self, cx: &mut Context) -> Option<Poll<Option<VecDiff<A::Item>>>> {
        self.signal_vec.as_mut().map(|s| s.poll_vec_change_unpin(cx))
    }

    // Polls until the inner signal vec is pending or has ended, applying every
    // diff to a fresh `Vec`, and returns that `Vec`.
    fn poll_values(&mut self, cx: &mut Context) -> Vec<A::Item> {
        let mut output = vec![];

        loop {
            match self.poll(cx) {
                Some(Poll::Ready(Some(diff))) => {
                    self.update_len(&diff);
                    diff.apply_to_vec(&mut output);
                },
                Some(Poll::Ready(None)) => {
                    self.signal_vec = None;
                    break;
                },
                Some(Poll::Pending) | None => {
                    break;
                },
            }
        }

        output
    }

    // Polls until the inner signal vec is pending or has ended, pushing every
    // diff onto `pending` with its indexes shifted by `prev_len` (the number of
    // flattened items that come before this inner signal vec).
    //
    // Returns `true` if the inner signal vec has ended, `false` if it is pending.
    fn poll_pending(&mut self, cx: &mut Context, prev_len: usize, pending: &mut PendingBuilder<VecDiff<A::Item>>) -> bool {
        loop {
            return match self.poll(cx) {
                Some(Poll::Ready(Some(diff))) => {
                    // `Replace`, `Push`, `Pop` and `Clear` need the length from
                    // before the diff was applied.
                    let old_len = self.len;

                    self.update_len(&diff);

                    match diff {
                        VecDiff::Replace { values } => {
                            // Remove the old items back to front, then insert
                            // the new ones front to back.
                            for index in (0..old_len).rev() {
                                pending.push(VecDiff::RemoveAt { index: prev_len + index });
                            }

                            for (index, value) in values.into_iter().enumerate() {
                                pending.push(VecDiff::InsertAt { index: prev_len + index, value });
                            }
                        },
                        VecDiff::InsertAt { index, value } => {
                            pending.push(VecDiff::InsertAt { index: prev_len + index, value });
                        },
                        VecDiff::UpdateAt { index, value } => {
                            pending.push(VecDiff::UpdateAt { index: prev_len + index, value });
                        },
                        VecDiff::RemoveAt { index } => {
                            pending.push(VecDiff::RemoveAt { index: prev_len + index });
                        },
                        VecDiff::Move { old_index, new_index } => {
                            pending.push(VecDiff::Move { old_index: prev_len + old_index, new_index: prev_len + new_index });
                        },
                        VecDiff::Push { value } => {
                            // The end of this inner list may be the middle of
                            // the flattened list, so use an explicit index.
                            pending.push(VecDiff::InsertAt { index: prev_len + old_len, value });
                        },
                        VecDiff::Pop {} => {
                            pending.push(VecDiff::RemoveAt { index: prev_len + (old_len - 1) });
                        },
                        VecDiff::Clear {} => {
                            for index in (0..old_len).rev() {
                                pending.push(VecDiff::RemoveAt { index: prev_len + index });
                            }
                        },
                    }

                    continue;
                },
                Some(Poll::Ready(None)) => {
                    self.signal_vec = None;
                    true
                },
                Some(Poll::Pending) => {
                    false
                },
                None => {
                    true
                },
            };
        }
    }
}
1093
/// [`SignalVec`] returned by [`SignalVecExt::flatten`]: the items of every
/// inner signal vec, presented as one list.
#[pin_project(project = FlattenProj)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct Flatten<A> where A: SignalVec, A::Item: SignalVec {
    // The outer signal vec; set to `None` once it reports `Poll::Ready(None)`.
    #[pin]
    signal: Option<A>,
    // One state per inner signal vec, in the order given by the outer one.
    inner: Vec<FlattenState<A::Item>>,
    // Diffs already computed but not yet returned to the caller.
    pending: VecDeque<VecDiff<<A::Item as SignalVec>::Item>>,
}

// Written by hand so the `Debug` bounds can be placed on the associated types.
impl<A> std::fmt::Debug for Flatten<A>
    where A: SignalVec + std::fmt::Debug,
          A::Item: SignalVec + std::fmt::Debug,
          <A::Item as SignalVec>::Item: std::fmt::Debug {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Flatten")
            .field("signal", &self.signal)
            .field("inner", &self.inner)
            .field("pending", &self.pending)
            .finish()
    }
}
1115
1116impl<A> SignalVec for Flatten<A>
1117 where A: SignalVec,
1118 A::Item: SignalVec {
1119 type Item = <A::Item as SignalVec>::Item;
1120
1121 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
1123 let mut this = self.project();
1124
1125 if let Some(diff) = this.pending.pop_front() {
1126 return Poll::Ready(Some(diff));
1127 }
1128
1129 let mut pending: PendingBuilder<VecDiff<<A::Item as SignalVec>::Item>> = PendingBuilder::new();
1130
1131 let top_done = loop {
1132 break match this.signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1133 Some(Poll::Ready(Some(diff))) => {
1134 match diff {
1135 VecDiff::Replace { values } => {
1136 *this.inner = values.into_iter().map(FlattenState::new).collect();
1137
1138 let values = this.inner.iter_mut()
1139 .flat_map(|state| state.poll_values(cx))
1140 .collect();
1141
1142 return Poll::Ready(Some(VecDiff::Replace { values }));
1143 },
1144 VecDiff::InsertAt { index, value } => {
1145 this.inner.insert(index, FlattenState::new(value));
1146 },
1147 VecDiff::UpdateAt { index, value } => {
1148 this.inner[index] = FlattenState::new(value);
1149 },
1150 VecDiff::RemoveAt { index } => {
1151 this.inner.remove(index);
1152 },
1153 VecDiff::Move { old_index, new_index } => {
1154 let value = this.inner.remove(old_index);
1155 this.inner.insert(new_index, value);
1156 },
1157 VecDiff::Push { value } => {
1158 this.inner.push(FlattenState::new(value));
1159 },
1160 VecDiff::Pop {} => {
1161 this.inner.pop().unwrap();
1162 },
1163 VecDiff::Clear {} => {
1164 this.inner.clear();
1165 return Poll::Ready(Some(VecDiff::Clear {}));
1166 },
1167 }
1168
1169 continue;
1170 },
1171 Some(Poll::Ready(None)) => {
1172 this.signal.set(None);
1173 true
1174 },
1175 Some(Poll::Pending) => {
1176 false
1177 },
1178 None => {
1179 true
1180 },
1181 };
1182 };
1183
1184 let mut inner_done = true;
1185
1186 let mut prev_len = 0;
1187
1188 for state in this.inner.iter_mut() {
1189 let done = state.poll_pending(cx, prev_len, &mut pending);
1190
1191 if !done {
1192 inner_done = false;
1193 }
1194
1195 prev_len += state.len;
1196 }
1197
1198 if let Some(first) = pending.first {
1199 *this.pending = pending.rest;
1200 Poll::Ready(Some(first))
1201
1202 } else if inner_done && top_done {
1203 Poll::Ready(None)
1204
1205 } else {
1206 Poll::Pending
1207 }
1208 }
1209}
1210
1211
/// `Signal` returned by [`SignalVecExt::to_signal_cloned`]; its value is a
/// cloned `Vec` of the current list contents.
#[pin_project]
#[must_use = "Signals do nothing unless polled"]
pub struct ToSignalCloned<A> where A: SignalVec {
    // A `ToSignalMap` whose callback is a plain `fn` pointer, which keeps this
    // type nameable.
    #[pin]
    signal: ToSignalMap<A, fn(&[A::Item]) -> Vec<A::Item>>,
}

// Written by hand; it prints a fixed placeholder instead of the fields.
impl<A> std::fmt::Debug for ToSignalCloned<A> where A: SignalVec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ToSignalCloned { ... }")
    }
}

impl<A> Signal for ToSignalCloned<A>
    where A: SignalVec {
    type Item = Vec<A::Item>;

    #[inline]
    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Delegates entirely to the wrapped `ToSignalMap`.
        self.project().signal.poll_change(cx)
    }
}
1234
1235
1236#[pin_project(project = ToSignalMapProj)]
1237#[derive(Debug)]
1238#[must_use = "Signals do nothing unless polled"]
1239pub struct ToSignalMap<A, B> where A: SignalVec {
1240 #[pin]
1241 signal: Option<A>,
1242 first: bool,
1244 values: Vec<A::Item>,
1245 callback: B,
1246}
1247
1248impl<A, B, F> Signal for ToSignalMap<A, F>
1249 where A: SignalVec,
1250 F: FnMut(&[A::Item]) -> B {
1251 type Item = B;
1252
1253 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1254 let ToSignalMapProj { mut signal, first, values, callback } = self.project();
1255
1256 let mut changed = false;
1257
1258 let done = loop {
1259 break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1260 None => {
1261 true
1262 },
1263 Some(Poll::Ready(None)) => {
1264 signal.set(None);
1265 true
1266 },
1267 Some(Poll::Ready(Some(change))) => {
1268 match change {
1269 VecDiff::Replace { values: new_values } => {
1270 *values = new_values;
1272 },
1273
1274 VecDiff::InsertAt { index, value } => {
1275 values.insert(index, value);
1276 },
1277
1278 VecDiff::UpdateAt { index, value } => {
1279 values[index] = value;
1281 },
1282
1283 VecDiff::RemoveAt { index } => {
1284 values.remove(index);
1285 },
1286
1287 VecDiff::Move { old_index, new_index } => {
1288 let old = values.remove(old_index);
1289 values.insert(new_index, old);
1290 },
1291
1292 VecDiff::Push { value } => {
1293 values.push(value);
1294 },
1295
1296 VecDiff::Pop {} => {
1297 values.pop().unwrap();
1298 },
1299
1300 VecDiff::Clear {} => {
1301 values.clear();
1303 },
1304 }
1305
1306 changed = true;
1307
1308 continue;
1309 },
1310 Some(Poll::Pending) => {
1311 false
1312 },
1313 };
1314 };
1315
1316 if changed || *first {
1317 *first = false;
1318 Poll::Ready(Some(callback(&values)))
1319
1320 } else if done {
1321 Poll::Ready(None)
1322
1323 } else {
1324 Poll::Pending
1325 }
1326 }
1327}
1328
1329
/// [`SignalVec`] returned by [`SignalVecExt::enumerate`].
///
/// Every item is paired with a `ReadOnlyMutable<Option<usize>>` that holds the
/// item's current index, and is set to `None` once the item has been removed.
#[pin_project(project = EnumerateProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct Enumerate<A> {
    #[pin]
    signal: A,
    // One index cell per item, in list order.
    mutables: Vec<Mutable<Option<usize>>>,
}

impl<A> SignalVec for Enumerate<A> where A: SignalVec {
    type Item = (ReadOnlyMutable<Option<usize>>, A::Item);

    #[inline]
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        // Adds one to every `Some` index in `range`.
        fn increment_indexes(range: &[Mutable<Option<usize>>]) {
            for mutable in range {
                mutable.replace_with(|value| value.map(|value| value + 1));
            }
        }

        // Subtracts one from every `Some` index in `range`.
        fn decrement_indexes(range: &[Mutable<Option<usize>>]) {
            for mutable in range {
                mutable.replace_with(|value| value.map(|value| value - 1));
            }
        }

        let EnumerateProj { signal, mutables } = self.project();

        match signal.poll_vec_change(cx) {
            Poll::Ready(Some(change)) => Poll::Ready(Some(match change {
                VecDiff::Replace { values } => {
                    // All previous items are gone: mark their indexes as `None`.
                    for mutable in mutables.drain(..) {
                        mutable.set(None);
                    }

                    *mutables = Vec::with_capacity(values.len());

                    VecDiff::Replace {
                        values: values.into_iter().enumerate().map(|(index, value)| {
                            let mutable = Mutable::new(Some(index));
                            let read_only = mutable.read_only();
                            mutables.push(mutable);
                            (read_only, value)
                        }).collect()
                    }
                },

                VecDiff::InsertAt { index, value } => {
                    let mutable = Mutable::new(Some(index));
                    let read_only = mutable.read_only();

                    mutables.insert(index, mutable);

                    // Everything after the inserted item shifts up by one.
                    increment_indexes(&mutables[(index + 1)..]);

                    VecDiff::InsertAt { index, value: (read_only, value) }
                },

                VecDiff::UpdateAt { index, value } => {
                    // The position is unchanged, so the existing cell is reused.
                    VecDiff::UpdateAt { index, value: (mutables[index].read_only(), value) }
                },

                VecDiff::Push { value } => {
                    let mutable = Mutable::new(Some(mutables.len()));
                    let read_only = mutable.read_only();

                    mutables.push(mutable);

                    VecDiff::Push { value: (read_only, value) }
                },

                VecDiff::Move { old_index, new_index } => {
                    let mutable = mutables.remove(old_index);

                    // A clone goes into the list; `mutable` itself is still
                    // needed below to set the moved item's new index.
                    mutables.insert(new_index, mutable.clone());

                    if old_index < new_index {
                        // Items between the two positions shift down by one.
                        decrement_indexes(&mutables[old_index..new_index]);

                    } else if new_index < old_index {
                        // Items between the two positions shift up by one.
                        increment_indexes(&mutables[(new_index + 1)..(old_index + 1)]);
                    }

                    mutable.set(Some(new_index));

                    VecDiff::Move { old_index, new_index }
                },

                VecDiff::RemoveAt { index } => {
                    let mutable = mutables.remove(index);

                    // Everything after the removed item shifts down by one.
                    decrement_indexes(&mutables[index..]);

                    mutable.set(None);

                    VecDiff::RemoveAt { index }
                },

                VecDiff::Pop {} => {
                    let mutable = mutables.pop().unwrap();

                    mutable.set(None);

                    VecDiff::Pop {}
                },

                VecDiff::Clear {} => {
                    for mutable in mutables.drain(..) {
                        mutable.set(None);
                    }

                    VecDiff::Clear {}
                },
            })),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
1457
1458
// Collects values in insertion order while keeping the very first one apart
// from the rest. Callers return `first` immediately and store `rest` for later
// polls.
struct PendingBuilder<A> {
    // The first value pushed, if any.
    first: Option<A>,
    // Every value pushed after the first, in order.
    rest: VecDeque<A>,
}

impl<A> PendingBuilder<A> {
    // Creates an empty builder.
    fn new() -> Self {
        PendingBuilder {
            rest: VecDeque::new(),
            first: None,
        }
    }

    // Stores `value` in `first` if that slot is still free, otherwise appends
    // it to `rest`.
    fn push(&mut self, value: A) {
        if self.first.is_some() {
            self.rest.push_back(value);
        } else {
            self.first = Some(value);
        }
    }
}
1483
1484
// Extracts the value from a poll result that is required to be
// `Poll::Ready(Some(_))`.
//
// Panics with "Signal did not return a value" for `Poll::Pending` and
// `Poll::Ready(None)`.
fn unwrap<A>(x: Poll<Option<A>>) -> A {
    if let Poll::Ready(Some(value)) = x {
        value
    } else {
        panic!("Signal did not return a value")
    }
}
1491
/// [`SignalVec`] returned by [`SignalVecExt::map_signal`].
///
/// Every source item is turned into a `Signal` by the callback. The list shows
/// the latest value of each of those signals, and a signal producing a new
/// value is reported as an `UpdateAt` for its position.
#[pin_project(project = MapSignalProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct MapSignal<A, B, F> where B: Signal {
    // The source; set to `None` once it reports `Poll::Ready(None)`.
    #[pin]
    signal: Option<A>,
    // One signal per item, in list order; an entry becomes `None` once that
    // signal has ended.
    signals: Vec<Option<Pin<Box<B>>>>,
    // Diffs already computed but not yet returned to the caller.
    pending: VecDeque<VecDiff<B::Item>>,
    callback: F,
}

impl<A, B, F> SignalVec for MapSignal<A, B, F>
    where A: SignalVec,
          B: Signal,
          F: FnMut(A::Item) -> B {
    type Item = B::Item;

    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        let MapSignalProj { mut signal, signals, pending, callback } = self.project();

        // Deliver diffs computed by an earlier poll first.
        if let Some(diff) = pending.pop_front() {
            return Poll::Ready(Some(diff));
        }

        let mut new_pending = PendingBuilder::new();

        // Phase 1: drain the source. Each new item's signal is polled once
        // right away to obtain its initial value; `unwrap` panics if the
        // signal does not provide one.
        let done = loop {
            break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
                None => {
                    true
                },
                Some(Poll::Ready(None)) => {
                    signal.set(None);
                    true
                },
                Some(Poll::Ready(Some(change))) => {
                    new_pending.push(match change {
                        VecDiff::Replace { values } => {
                            *signals = Vec::with_capacity(values.len());

                            VecDiff::Replace {
                                values: values.into_iter().map(|value| {
                                    let mut signal = Box::pin(callback(value));
                                    let poll = unwrap(signal.as_mut().poll_change(cx));
                                    signals.push(Some(signal));
                                    poll
                                }).collect()
                            }
                        },

                        VecDiff::InsertAt { index, value } => {
                            let mut signal = Box::pin(callback(value));
                            let poll = unwrap(signal.as_mut().poll_change(cx));
                            signals.insert(index, Some(signal));
                            VecDiff::InsertAt { index, value: poll }
                        },

                        VecDiff::UpdateAt { index, value } => {
                            let mut signal = Box::pin(callback(value));
                            let poll = unwrap(signal.as_mut().poll_change(cx));
                            signals[index] = Some(signal);
                            VecDiff::UpdateAt { index, value: poll }
                        },

                        VecDiff::Push { value } => {
                            let mut signal = Box::pin(callback(value));
                            let poll = unwrap(signal.as_mut().poll_change(cx));
                            signals.push(Some(signal));
                            VecDiff::Push { value: poll }
                        },

                        VecDiff::Move { old_index, new_index } => {
                            let value = signals.remove(old_index);
                            signals.insert(new_index, value);
                            VecDiff::Move { old_index, new_index }
                        },

                        VecDiff::RemoveAt { index } => {
                            signals.remove(index);
                            VecDiff::RemoveAt { index }
                        },

                        VecDiff::Pop {} => {
                            signals.pop().unwrap();
                            VecDiff::Pop {}
                        },

                        VecDiff::Clear {} => {
                            signals.clear();
                            VecDiff::Clear {}
                        },
                    });

                    continue;
                },
                Some(Poll::Pending) => false,
            };
        };

        // Phase 2: poll every per-item signal once. Indexes are already
        // consistent with the diffs queued in phase 1.
        let mut has_pending = false;

        for (index, signal) in signals.as_mut_slice().into_iter().enumerate() {
            match signal.as_mut().map(|s| s.as_mut().poll_change(cx)) {
                Some(Poll::Ready(Some(value))) => {
                    new_pending.push(VecDiff::UpdateAt { index, value });
                },
                Some(Poll::Ready(None)) => {
                    // This signal has ended; never poll it again.
                    *signal = None;
                },
                Some(Poll::Pending) => {
                    has_pending = true;
                },
                None => {},
            }
        }

        if let Some(first) = new_pending.first {
            *pending = new_pending.rest;
            Poll::Ready(Some(first))

        // Finished only when the source and every per-item signal have ended.
        } else if done && !has_pending {
            Poll::Ready(None)

        } else {
            Poll::Pending
        }
    }
}
1624
1625
1626#[derive(Debug)]
1627struct FilterSignalClonedState<A, B> {
1628 signal: Option<Pin<Box<B>>>,
1630 value: A,
1631 exists: bool,
1632}
1633
1634#[pin_project(project = FilterSignalClonedProj)]
1635#[derive(Debug)]
1636#[must_use = "SignalVecs do nothing unless polled"]
1637pub struct FilterSignalCloned<A, B, F> where A: SignalVec {
1638 #[pin]
1639 signal: Option<A>,
1640 signals: Vec<FilterSignalClonedState<A::Item, B>>,
1641 pending: VecDeque<VecDiff<A::Item>>,
1642 callback: F,
1643}
1644
1645impl<A, B, F> FilterSignalCloned<A, B, F> where A: SignalVec {
1646 fn find_index(signals: &[FilterSignalClonedState<A::Item, B>], index: usize) -> usize {
1647 signals[0..index].into_iter().filter(|x| x.exists).count()
1648 }
1649}
1650
1651impl<A, B, F> SignalVec for FilterSignalCloned<A, B, F>
1652 where A: SignalVec,
1653 A::Item: Clone,
1654 B: Signal<Item = bool>,
1655 F: FnMut(&A::Item) -> B {
1656 type Item = A::Item;
1657
1658 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
1659 let FilterSignalClonedProj { mut signal, signals, pending, callback } = self.project();
1660
1661 if let Some(diff) = pending.pop_front() {
1662 return Poll::Ready(Some(diff));
1663 }
1664
1665 let mut new_pending = PendingBuilder::new();
1666
1667 let done = loop {
1669 break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1670 None => true,
1671 Some(Poll::Ready(None)) => {
1672 signal.set(None);
1673 true
1674 },
1675 Some(Poll::Ready(Some(change))) => {
1676 new_pending.push(match change {
1677 VecDiff::Replace { values } => {
1678 *signals = Vec::with_capacity(values.len());
1679
1680 VecDiff::Replace {
1681 values: values.into_iter().filter(|value| {
1682 let mut signal = Box::pin(callback(value));
1683 let poll = unwrap(signal.as_mut().poll_change(cx));
1684
1685 signals.push(FilterSignalClonedState {
1686 signal: Some(signal),
1687 value: value.clone(),
1688 exists: poll,
1689 });
1690
1691 poll
1692 }).collect()
1693 }
1694 },
1695
1696 VecDiff::InsertAt { index, value } => {
1697 let mut signal = Box::pin(callback(&value));
1698 let poll = unwrap(signal.as_mut().poll_change(cx));
1699
1700 signals.insert(index, FilterSignalClonedState {
1701 signal: Some(signal),
1702 value: value.clone(),
1703 exists: poll,
1704 });
1705
1706 if poll {
1707 VecDiff::InsertAt { index: Self::find_index(signals, index), value }
1708
1709 } else {
1710 continue;
1711 }
1712 },
1713
1714 VecDiff::UpdateAt { index, value } => {
1715 let mut signal = Box::pin(callback(&value));
1716 let new_poll = unwrap(signal.as_mut().poll_change(cx));
1717
1718 let old_poll = {
1719 let state = &mut signals[index];
1720
1721 let exists = state.exists;
1722
1723 state.signal = Some(signal);
1724 state.value = value.clone();
1725 state.exists = new_poll;
1726
1727 exists
1728 };
1729
1730 if new_poll {
1731 if old_poll {
1732 VecDiff::UpdateAt { index: Self::find_index(signals, index), value }
1733
1734 } else {
1735 VecDiff::InsertAt { index: Self::find_index(signals, index), value }
1736 }
1737
1738 } else {
1739 if old_poll {
1740 VecDiff::RemoveAt { index: Self::find_index(signals, index) }
1741
1742 } else {
1743 continue;
1744 }
1745 }
1746 },
1747
1748 VecDiff::Push { value } => {
1749 let mut signal = Box::pin(callback(&value));
1750 let poll = unwrap(signal.as_mut().poll_change(cx));
1751
1752 signals.push(FilterSignalClonedState {
1753 signal: Some(signal),
1754 value: value.clone(),
1755 exists: poll,
1756 });
1757
1758 if poll {
1759 VecDiff::Push { value }
1760
1761 } else {
1762 continue;
1763 }
1764 },
1765
1766 VecDiff::Move { old_index, new_index } => {
1768 let state = signals.remove(old_index);
1769 let exists = state.exists;
1770
1771 signals.insert(new_index, state);
1772
1773 if exists {
1774 VecDiff::Move {
1775 old_index: Self::find_index(signals, old_index),
1776 new_index: Self::find_index(signals, new_index),
1777 }
1778
1779 } else {
1780 continue;
1781 }
1782 },
1783
1784 VecDiff::RemoveAt { index } => {
1785 let state = signals.remove(index);
1786
1787 if state.exists {
1788 VecDiff::RemoveAt { index: Self::find_index(signals, index) }
1789
1790 } else {
1791 continue;
1792 }
1793 },
1794
1795 VecDiff::Pop {} => {
1796 let state = signals.pop().expect("Cannot pop from empty vec");
1797
1798 if state.exists {
1799 VecDiff::Pop {}
1800
1801 } else {
1802 continue;
1803 }
1804 },
1805
1806 VecDiff::Clear {} => {
1807 signals.clear();
1808 VecDiff::Clear {}
1809 },
1810 });
1811
1812 continue;
1813 },
1814 Some(Poll::Pending) => false,
1815 }
1816 };
1817
1818 let mut has_pending = false;
1819
1820 let mut real_index = 0;
1821
1822 for state in signals.as_mut_slice().into_iter() {
1827 let old = state.exists;
1828
1829 loop {
1831 match state.signal.as_mut().map(|s| s.as_mut().poll_change(cx)) {
1832 Some(Poll::Ready(Some(exists))) => {
1833 state.exists = exists;
1834 continue;
1835 },
1836 Some(Poll::Ready(None)) => {
1837 state.signal = None;
1838 },
1839 Some(Poll::Pending) => {
1840 has_pending = true;
1841 },
1842 None => {},
1843 }
1844 break;
1845 }
1846
1847 if state.exists != old {
1848 if state.exists {
1851 new_pending.push(VecDiff::InsertAt { index: real_index, value: state.value.clone() });
1852
1853 } else {
1854 new_pending.push(VecDiff::RemoveAt { index: real_index });
1855 }
1856 }
1857
1858 if state.exists {
1859 real_index += 1;
1860 }
1861 }
1862
1863 if let Some(first) = new_pending.first {
1864 *pending = new_pending.rest;
1865 Poll::Ready(Some(first))
1866
1867 } else if done && !has_pending {
1868 Poll::Ready(None)
1869
1870 } else {
1871 Poll::Pending
1872 }
1873 }
1874}
1875
1876
1877#[pin_project(project = IsEmptyProj)]
1878#[derive(Debug)]
1879#[must_use = "Signals do nothing unless polled"]
1880pub struct IsEmpty<A> {
1881 #[pin]
1882 len: Len<A>,
1883 old: Option<bool>,
1884}
1885
1886impl<A> Signal for IsEmpty<A> where A: SignalVec {
1887 type Item = bool;
1888
1889 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1890 let IsEmptyProj { len, old } = self.project();
1891
1892 match len.poll_change(cx) {
1893 Poll::Ready(Some(len)) => {
1894 let new = Some(len == 0);
1895
1896 if *old != new {
1897 *old = new;
1898 Poll::Ready(new)
1899
1900 } else {
1901 Poll::Pending
1902 }
1903 },
1904 Poll::Ready(None) => Poll::Ready(None),
1905 Poll::Pending => Poll::Pending,
1906 }
1907 }
1908}
1909
1910
1911#[pin_project(project = LenProj)]
1912#[derive(Debug)]
1913#[must_use = "Signals do nothing unless polled"]
1914pub struct Len<A> {
1915 #[pin]
1916 signal: Option<A>,
1917 first: bool,
1918 len: usize,
1919}
1920
1921impl<A> Signal for Len<A> where A: SignalVec {
1922 type Item = usize;
1923
1924 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1925 let LenProj { mut signal, first, len } = self.project();
1926
1927 let mut changed = false;
1928
1929 let done = loop {
1930 break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
1931 None => {
1932 true
1933 },
1934 Some(Poll::Ready(None)) => {
1935 signal.set(None);
1936 true
1937 },
1938 Some(Poll::Ready(Some(change))) => {
1939 match change {
1940 VecDiff::Replace { values } => {
1941 let new_len = values.len();
1942
1943 if *len != new_len {
1944 *len = new_len;
1945 changed = true;
1946 }
1947 },
1948
1949 VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
1950 *len += 1;
1951 changed = true;
1952 },
1953
1954 VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {},
1955
1956 VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
1957 *len -= 1;
1958 changed = true;
1959 },
1960
1961 VecDiff::Clear {} => {
1962 if *len != 0 {
1963 *len = 0;
1964 changed = true;
1965 }
1966 },
1967 }
1968
1969 continue;
1970 },
1971 Some(Poll::Pending) => {
1972 false
1973 },
1974 };
1975 };
1976
1977 if changed || *first {
1978 *first = false;
1979 Poll::Ready(Some(*len))
1981
1982 } else if done {
1983 Poll::Ready(None)
1984
1985 } else {
1986 Poll::Pending
1987 }
1988 }
1989}
1990
1991
1992#[pin_project]
1993#[derive(Debug)]
1994#[must_use = "Streams do nothing unless polled"]
1995pub struct SignalVecStream<A> {
1996 #[pin]
1997 signal: A,
1998}
1999
2000impl<A: SignalVec> Stream for SignalVecStream<A> {
2001 type Item = VecDiff<A::Item>;
2002
2003 #[inline]
2004 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
2005 self.project().signal.poll_vec_change(cx)
2006 }
2007}
2008
2009
/// Counts the `true` flags located before `index`, i.e. converts an index
/// into the unfiltered list to the matching index in the filtered list.
///
/// Panics if `index > indexes.len()`.
fn find_index(indexes: &[bool], index: usize) -> usize {
    indexes[..index].iter().filter(|&&kept| kept).count()
}
2013
2014fn poll_filter_map<A, S, F>(indexes: &mut Vec<bool>, mut signal: Pin<&mut S>, cx: &mut Context, mut callback: F) -> Poll<Option<VecDiff<A>>>
2015 where S: SignalVec,
2016 F: FnMut(S::Item) -> Option<A> {
2017
2018 loop {
2019 return match signal.as_mut().poll_vec_change(cx) {
2020 Poll::Pending => Poll::Pending,
2021 Poll::Ready(None) => Poll::Ready(None),
2022 Poll::Ready(Some(change)) => match change {
2023 VecDiff::Replace { values } => {
2024 *indexes = Vec::with_capacity(values.len());
2025
2026 Poll::Ready(Some(VecDiff::Replace {
2027 values: values.into_iter().filter_map(|value| {
2028 let value = callback(value);
2029 indexes.push(value.is_some());
2030 value
2031 }).collect()
2032 }))
2033 },
2034
2035 VecDiff::InsertAt { index, value } => {
2036 if let Some(value) = callback(value) {
2037 indexes.insert(index, true);
2038 Poll::Ready(Some(VecDiff::InsertAt { index: find_index(indexes, index), value }))
2039
2040 } else {
2041 indexes.insert(index, false);
2042 continue;
2043 }
2044 },
2045
2046 VecDiff::UpdateAt { index, value } => {
2047 if let Some(value) = callback(value) {
2048 if indexes[index] {
2049 Poll::Ready(Some(VecDiff::UpdateAt { index: find_index(indexes, index), value }))
2050
2051 } else {
2052 indexes[index] = true;
2053 Poll::Ready(Some(VecDiff::InsertAt { index: find_index(indexes, index), value }))
2054 }
2055
2056 } else {
2057 if indexes[index] {
2058 indexes[index] = false;
2059 Poll::Ready(Some(VecDiff::RemoveAt { index: find_index(indexes, index) }))
2060
2061 } else {
2062 continue;
2063 }
2064 }
2065 },
2066
2067 VecDiff::Move { old_index, new_index } => {
2069 if indexes.remove(old_index) {
2070 indexes.insert(new_index, true);
2071
2072 Poll::Ready(Some(VecDiff::Move {
2073 old_index: find_index(indexes, old_index),
2074 new_index: find_index(indexes, new_index),
2075 }))
2076
2077 } else {
2078 indexes.insert(new_index, false);
2079 continue;
2080 }
2081 },
2082
2083 VecDiff::RemoveAt { index } => {
2084 if indexes.remove(index) {
2085 Poll::Ready(Some(VecDiff::RemoveAt { index: find_index(indexes, index) }))
2086
2087 } else {
2088 continue;
2089 }
2090 },
2091
2092 VecDiff::Push { value } => {
2093 if let Some(value) = callback(value) {
2094 indexes.push(true);
2095 Poll::Ready(Some(VecDiff::Push { value }))
2096
2097 } else {
2098 indexes.push(false);
2099 continue;
2100 }
2101 },
2102
2103 VecDiff::Pop {} => {
2104 if indexes.pop().expect("Cannot pop from empty vec") {
2105 Poll::Ready(Some(VecDiff::Pop {}))
2106
2107 } else {
2108 continue;
2109 }
2110 },
2111
2112 VecDiff::Clear {} => {
2113 indexes.clear();
2114 Poll::Ready(Some(VecDiff::Clear {}))
2115 },
2116 },
2117 }
2118 }
2119}
2120
2121
2122#[pin_project(project = FilterProj)]
2123#[derive(Debug)]
2124#[must_use = "SignalVecs do nothing unless polled"]
2125pub struct Filter<A, B> {
2126 indexes: Vec<bool>,
2128 #[pin]
2129 signal: A,
2130 callback: B,
2131}
2132
2133impl<A, F> SignalVec for Filter<A, F>
2134 where A: SignalVec,
2135 F: FnMut(&A::Item) -> bool {
2136 type Item = A::Item;
2137
2138 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
2139 let FilterProj { indexes, signal, callback } = self.project();
2140
2141 poll_filter_map(indexes, signal, cx, move |value| {
2142 if callback(&value) {
2143 Some(value)
2144 } else {
2145 None
2146 }
2147 })
2148 }
2149}
2150
2151
2152#[pin_project(project = FilterMapProj)]
2153#[derive(Debug)]
2154#[must_use = "SignalVecs do nothing unless polled"]
2155pub struct FilterMap<S, F> {
2156 indexes: Vec<bool>,
2158 #[pin]
2159 signal: S,
2160 callback: F,
2161}
2162
2163impl<S, A, F> SignalVec for FilterMap<S, F>
2164 where S: SignalVec,
2165 F: FnMut(S::Item) -> Option<A> {
2166 type Item = A;
2167
2168 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
2169 let FilterMapProj { indexes, signal, callback } = self.project();
2170 poll_filter_map(indexes, signal, cx, callback)
2171 }
2172}
2173
2174
2175#[pin_project(project = SumSignalProj)]
2176#[derive(Debug)]
2177#[must_use = "Signals do nothing unless polled"]
2178pub struct SumSignal<A> where A: SignalVec {
2179 #[pin]
2180 signal: Option<A>,
2181 first: bool,
2182 values: Vec<A::Item>,
2183}
2184
2185impl<A> Signal for SumSignal<A>
2186 where A: SignalVec,
2187 A::Item: for<'a> Sum<&'a A::Item> {
2188 type Item = A::Item;
2189
2190 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
2191 let SumSignalProj { mut signal, first, values } = self.project();
2192
2193 let mut changed = false;
2194
2195 let done = loop {
2196 break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
2197 None => {
2198 true
2199 },
2200 Some(Poll::Ready(None)) => {
2201 signal.set(None);
2202 true
2203 },
2204 Some(Poll::Ready(Some(change))) => {
2205 match change {
2206 VecDiff::Replace { values: new_values } => {
2207 *values = new_values;
2209 },
2210
2211 VecDiff::InsertAt { index, value } => {
2212 values.insert(index, value);
2214 },
2215
2216 VecDiff::Push { value } => {
2217 values.push(value);
2219 },
2220
2221 VecDiff::UpdateAt { index, value } => {
2222 values[index] = value;
2224 },
2225
2226 VecDiff::Move { old_index, new_index } => {
2227 let value = values.remove(old_index);
2228 values.insert(new_index, value);
2229 continue;
2231 },
2232
2233 VecDiff::RemoveAt { index } => {
2234 values.remove(index);
2236 },
2237
2238 VecDiff::Pop {} => {
2239 values.pop().unwrap();
2241 },
2242
2243 VecDiff::Clear {} => {
2244 values.clear();
2246 },
2247 }
2248
2249 changed = true;
2250 continue;
2251 },
2252 Some(Poll::Pending) => {
2253 false
2254 },
2255 };
2256 };
2257
2258 if changed || *first {
2259 *first = false;
2260
2261 Poll::Ready(Some(Sum::sum(values.iter())))
2262
2263 } else if done {
2264 Poll::Ready(None)
2265
2266 } else {
2267 Poll::Pending
2268 }
2269 }
2270}
2271
2272
/// `SignalVec` wrapper that logs every poll result at trace level.
#[pin_project(project = SignalVecDebugProj)]
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
#[cfg(feature = "debug")]
pub struct SignalVecDebug<A> {
    /// The wrapped `SignalVec`.
    #[pin]
    signal: A,
    /// Source location printed in front of every log line.
    location: &'static std::panic::Location<'static>,
}

#[cfg(feature = "debug")]
impl<A> SignalVec for SignalVecDebug<A> where A: SignalVec, A::Item: std::fmt::Debug {
    type Item = A::Item;

    /// Polls the wrapped `SignalVec`, logs the result and passes it on unchanged.
    fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        let this = self.project();

        let result = this.signal.poll_vec_change(cx);

        log::trace!("[{}] {:#?}", this.location, result);

        result
    }
}
2297
2298
2299#[pin_project(project = SortByClonedProj)]
2300#[derive(Debug)]
2301#[must_use = "SignalVecs do nothing unless polled"]
2302pub struct SortByCloned<A, B> where A: SignalVec {
2303 pending: Option<VecDiff<A::Item>>,
2304 values: Vec<A::Item>,
2305 indexes: Vec<usize>,
2306 #[pin]
2307 signal: A,
2308 compare: B,
2309}
2310
2311impl<A, F> SortByCloned<A, F>
2312 where A: SignalVec,
2313 F: FnMut(&A::Item, &A::Item) -> Ordering {
2314
2315 fn binary_search(values: &[A::Item], indexes: &[usize], compare: &mut F, index: usize) -> Result<usize, usize> {
2317 let value = &values[index];
2318
2319 indexes.binary_search_by(|i| compare(&values[*i], value).then_with(|| i.cmp(&index)))
2321 }
2322
2323 fn binary_search_insert(values: &[A::Item], indexes: &[usize], compare: &mut F, index: usize) -> usize {
2324 match Self::binary_search(values, indexes, compare, index) {
2325 Ok(_) => panic!("Value already exists"),
2326 Err(new_index) => new_index,
2327 }
2328 }
2329
2330 fn binary_search_remove(values: &[A::Item], indexes: &[usize], compare: &mut F, index: usize) -> usize {
2331 Self::binary_search(values, indexes, compare, index).expect("Could not find value")
2332 }
2333
2334 fn increment_indexes(indexes: &mut Vec<usize>, start: usize) {
2335 for index in indexes {
2336 let i = *index;
2337
2338 if i >= start {
2339 *index = i + 1;
2340 }
2341 }
2342 }
2343
2344 fn decrement_indexes(indexes: &mut Vec<usize>, start: usize) {
2345 for index in indexes {
2346 let i = *index;
2347
2348 if i > start {
2349 *index = i - 1;
2350 }
2351 }
2352 }
2353
2354 fn insert_at(indexes: &mut Vec<usize>, sorted_index: usize, index: usize, value: A::Item) -> VecDiff<A::Item> {
2355 if sorted_index == indexes.len() {
2356 indexes.push(index);
2357
2358 VecDiff::Push {
2359 value,
2360 }
2361
2362 } else {
2363 indexes.insert(sorted_index, index);
2364
2365 VecDiff::InsertAt {
2366 index: sorted_index,
2367 value,
2368 }
2369 }
2370 }
2371
2372 fn remove_at(indexes: &mut Vec<usize>, sorted_index: usize) -> Poll<Option<VecDiff<A::Item>>> {
2373 if sorted_index == (indexes.len() - 1) {
2374 indexes.pop();
2375
2376 Poll::Ready(Some(VecDiff::Pop {}))
2377
2378 } else {
2379 indexes.remove(sorted_index);
2380
2381 Poll::Ready(Some(VecDiff::RemoveAt {
2382 index: sorted_index,
2383 }))
2384 }
2385 }
2386}
2387
2388impl<A, F> SignalVec for SortByCloned<A, F>
2390 where A: SignalVec,
2391 F: FnMut(&A::Item, &A::Item) -> Ordering,
2392 A::Item: Clone {
2393 type Item = A::Item;
2394
2395 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
2397 let SortByClonedProj { pending, values, indexes, mut signal, compare } = self.project();
2398
2399 match pending.take() {
2400 Some(value) => Poll::Ready(Some(value)),
2401 None => loop {
2402 return match signal.as_mut().poll_vec_change(cx) {
2403 Poll::Pending => Poll::Pending,
2404 Poll::Ready(None) => Poll::Ready(None),
2405 Poll::Ready(Some(change)) => match change {
2406 VecDiff::Replace { values: new_values } => {
2407 let mut new_indexes: Vec<usize> = (0..new_values.len()).collect();
2409
2410 new_indexes.sort_unstable_by(|a, b| compare(&new_values[*a], &new_values[*b]).then_with(|| a.cmp(b)));
2412
2413 let output = Poll::Ready(Some(VecDiff::Replace {
2414 values: new_indexes.iter().map(|i| new_values[*i].clone()).collect()
2416 }));
2417
2418 *values = new_values;
2419 *indexes = new_indexes;
2420
2421 output
2422 },
2423
2424 VecDiff::InsertAt { index, value } => {
2425 let new_value = value.clone();
2426
2427 values.insert(index, value);
2428
2429 Self::increment_indexes(indexes, index);
2430
2431 let sorted_index = Self::binary_search_insert(values, indexes, compare, index);
2432
2433 Poll::Ready(Some(Self::insert_at(indexes, sorted_index, index, new_value)))
2434 },
2435
2436 VecDiff::Push { value } => {
2437 let new_value = value.clone();
2438
2439 let index = values.len();
2440
2441 values.push(value);
2442
2443 let sorted_index = Self::binary_search_insert(values, indexes, compare, index);
2444
2445 Poll::Ready(Some(Self::insert_at(indexes, sorted_index, index, new_value)))
2446 },
2447
2448 VecDiff::UpdateAt { index, value } => {
2449 let old_index = Self::binary_search_remove(values, indexes, compare, index);
2450
2451 let old_output = Self::remove_at(indexes, old_index);
2452
2453 let new_value = value.clone();
2454
2455 values[index] = value;
2456
2457 let new_index = Self::binary_search_insert(values, indexes, compare, index);
2458
2459 if old_index == new_index {
2460 indexes.insert(new_index, index);
2461
2462 Poll::Ready(Some(VecDiff::UpdateAt {
2463 index: new_index,
2464 value: new_value,
2465 }))
2466
2467 } else {
2468 let new_output = Self::insert_at(indexes, new_index, index, new_value);
2469 *pending = Some(new_output);
2470
2471 old_output
2472 }
2473 },
2474
2475 VecDiff::RemoveAt { index } => {
2476 let sorted_index = Self::binary_search_remove(values, indexes, compare, index);
2477
2478 values.remove(index);
2479
2480 Self::decrement_indexes(indexes, index);
2481
2482 Self::remove_at(indexes, sorted_index)
2483 },
2484
2485 VecDiff::Move { old_index, new_index } => {
2487 let old_sorted_index = Self::binary_search_remove(values, indexes, compare, old_index);
2488
2489 let value = values.remove(old_index);
2490
2491 Self::decrement_indexes(indexes, old_index);
2492
2493 indexes.remove(old_sorted_index);
2494
2495 values.insert(new_index, value);
2496
2497 Self::increment_indexes(indexes, new_index);
2498
2499 let new_sorted_index = Self::binary_search_insert(values, indexes, compare, new_index);
2500
2501 indexes.insert(new_sorted_index, new_index);
2502
2503 if old_sorted_index == new_sorted_index {
2504 continue;
2505
2506 } else {
2507 Poll::Ready(Some(VecDiff::Move {
2508 old_index: old_sorted_index,
2509 new_index: new_sorted_index,
2510 }))
2511 }
2512 },
2513
2514 VecDiff::Pop {} => {
2515 let index = values.len() - 1;
2516
2517 let sorted_index = Self::binary_search_remove(values, indexes, compare, index);
2518
2519 values.pop();
2520
2521 Self::remove_at(indexes, sorted_index)
2522 },
2523
2524 VecDiff::Clear {} => {
2525 values.clear();
2526 indexes.clear();
2527 Poll::Ready(Some(VecDiff::Clear {}))
2528 },
2529 },
2530 }
2531 },
2532 }
2533 }
2534}
2535
2536
/// Per-element bookkeeping for `DelayRemove`.
#[derive(Debug)]
struct DelayRemoveState<A> {
    /// The future that has to complete before the element is really removed.
    future: Pin<Box<A>>,
    /// `true` once removal was requested while `future` was still pending.
    is_removing: bool,
}

impl<A> DelayRemoveState<A> {
    /// Boxes and pins `future`; the element starts out as not being removed.
    #[inline]
    fn new(future: A) -> Self {
        let future = Box::pin(future);

        Self { future, is_removing: false }
    }
}
2553
2554#[pin_project(project = DelayRemoveProj)]
2555#[derive(Debug)]
2556#[must_use = "SignalVecs do nothing unless polled"]
2557pub struct DelayRemove<A, B, F> where A: SignalVec {
2558 #[pin]
2559 signal: Option<A>,
2560 futures: Vec<DelayRemoveState<B>>,
2561 pending: VecDeque<VecDiff<A::Item>>,
2562 callback: F,
2563}
2564
2565impl<S, A, F> DelayRemove<S, A, F>
2566 where S: SignalVec,
2567 A: Future<Output = ()>,
2568 F: FnMut(&S::Item) -> A {
2569
2570 fn remove_index(futures: &mut Vec<DelayRemoveState<A>>, index: usize) -> VecDiff<S::Item> {
2571 if index == (futures.len() - 1) {
2572 futures.pop();
2573 VecDiff::Pop {}
2574
2575 } else {
2576 futures.remove(index);
2577 VecDiff::RemoveAt { index }
2578 }
2579 }
2580
2581 fn should_remove(state: &mut DelayRemoveState<A>, cx: &mut Context) -> bool {
2582 assert!(!state.is_removing);
2583
2584 if state.future.as_mut().poll(cx).is_ready() {
2585 true
2586
2587 } else {
2588 state.is_removing = true;
2589 false
2590 }
2591 }
2592
2593 fn find_index(futures: &[DelayRemoveState<A>], parent_index: usize) -> Option<usize> {
2594 let mut seen = 0;
2595
2596 futures.into_iter().position(|state| {
2598 if state.is_removing {
2599 false
2600
2601 } else if seen == parent_index {
2602 true
2603
2604 } else {
2605 seen += 1;
2606 false
2607 }
2608 })
2609 }
2610
2611 fn find_last_index(futures: &[DelayRemoveState<A>]) -> Option<usize> {
2612 futures.into_iter().rposition(|state| !state.is_removing)
2613 }
2614
2615 fn remove_existing_futures(futures: &mut Vec<DelayRemoveState<A>>, pending: &mut PendingBuilder<VecDiff<S::Item>>, cx: &mut Context) {
2616 let mut indexes = vec![];
2617
2618 for (index, future) in futures.iter_mut().enumerate() {
2619 if !future.is_removing {
2620 if Self::should_remove(future, cx) {
2621 indexes.push(index);
2622 }
2623 }
2624 }
2625
2626 for index in indexes.into_iter().rev() {
2628 pending.push(Self::remove_index(futures, index));
2629 }
2630 }
2631}
2632
2633impl<S, A, F> SignalVec for DelayRemove<S, A, F>
2634 where S: SignalVec,
2635 A: Future<Output = ()>,
2636 F: FnMut(&S::Item) -> A {
2637 type Item = S::Item;
2638
2639 fn poll_vec_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
2641 let DelayRemoveProj { mut signal, futures, pending, callback } = self.project();
2642
2643 if let Some(diff) = pending.pop_front() {
2644 return Poll::Ready(Some(diff));
2645 }
2646
2647 let mut new_pending = PendingBuilder::new();
2648
2649 let done = loop {
2651 break match signal.as_mut().as_pin_mut().map(|signal| signal.poll_vec_change(cx)) {
2652 None => true,
2653 Some(Poll::Ready(None)) => {
2654 signal.set(None);
2655 true
2656 },
2657 Some(Poll::Ready(Some(change))) => {
2658 match change {
2659 VecDiff::Replace { values } => {
2660 if futures.len() == 0 {
2662 *futures = values.iter().map(|value| DelayRemoveState::new(callback(value))).collect();
2663 new_pending.push(VecDiff::Replace { values });
2664
2665 } else {
2667 Self::remove_existing_futures(futures, &mut new_pending, cx);
2668
2669 for value in values {
2671 let state = DelayRemoveState::new(callback(&value));
2672 futures.push(state);
2673 new_pending.push(VecDiff::Push { value });
2674 }
2675 }
2676 },
2677
2678 VecDiff::InsertAt { index, value } => {
2679 let index = Self::find_index(futures, index).unwrap_or_else(|| futures.len());
2680 let state = DelayRemoveState::new(callback(&value));
2681 futures.insert(index, state);
2682 new_pending.push(VecDiff::InsertAt { index, value });
2683 },
2684
2685 VecDiff::Push { value } => {
2686 let state = DelayRemoveState::new(callback(&value));
2687 futures.push(state);
2688 new_pending.push(VecDiff::Push { value });
2689 },
2690
2691 VecDiff::UpdateAt { index, value } => {
2692 let index = Self::find_index(futures, index).expect("Could not find value");
2694 let state = DelayRemoveState::new(callback(&value));
2695 futures[index] = state;
2696 new_pending.push(VecDiff::UpdateAt { index, value });
2697 },
2698
2699 VecDiff::Move { old_index, new_index } => {
2702 let old_index = Self::find_index(futures, old_index).expect("Could not find value");
2703
2704 let state = futures.remove(old_index);
2705
2706 let new_index = Self::find_index(futures, new_index).unwrap_or_else(|| futures.len());
2707
2708 futures.insert(new_index, state);
2709
2710 new_pending.push(VecDiff::Move { old_index, new_index });
2711 },
2712
2713 VecDiff::RemoveAt { index } => {
2714 let index = Self::find_index(futures, index).expect("Could not find value");
2715
2716 if Self::should_remove(&mut futures[index], cx) {
2717 new_pending.push(Self::remove_index(futures, index));
2718 }
2719 },
2720
2721 VecDiff::Pop {} => {
2722 let index = Self::find_last_index(futures).expect("Cannot pop from empty vec");
2723
2724 if Self::should_remove(&mut futures[index], cx) {
2725 new_pending.push(Self::remove_index(futures, index));
2726 }
2727 },
2728
2729 VecDiff::Clear {} => {
2730 Self::remove_existing_futures(futures, &mut new_pending, cx);
2731 },
2732 }
2733
2734 continue;
2735 },
2736 Some(Poll::Pending) => {
2737 false
2738 },
2739 };
2740 };
2741
2742 let mut pending_removals = false;
2743
2744 let mut indexes = vec![];
2745
2746 for (index, state) in futures.iter_mut().enumerate() {
2748 if state.is_removing {
2749 if state.future.as_mut().poll(cx).is_ready() {
2750 indexes.push(index);
2751
2752 } else {
2753 pending_removals = true;
2754 }
2755 }
2756 }
2757
2758 for index in indexes.into_iter().rev() {
2760 new_pending.push(Self::remove_index(futures, index));
2761 }
2762
2763 if let Some(first) = new_pending.first {
2764 *pending = new_pending.rest;
2765 Poll::Ready(Some(first))
2766
2767 } else if done && !pending_removals {
2768 Poll::Ready(None)
2769
2770 } else {
2771 Poll::Pending
2772 }
2773 }
2774}
2775
2776
2777mod mutable_vec {
2779 use super::{SignalVec, VecDiff};
2780 use std::pin::Pin;
2781 use std::marker::Unpin;
2782 use std::fmt;
2783 use std::ops::{Deref, Index, Range, RangeBounds, Bound};
2784 use std::slice::SliceIndex;
2785 use std::vec::Drain;
2786 use std::borrow::Borrow;
2787 use std::cmp::{Ord, Ordering};
2788 use std::hash::{Hash, Hasher};
2789 use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
2790 use std::task::{Poll, Context};
2791 use futures_channel::mpsc;
2792 use futures_util::stream::StreamExt;
2793
2794
2795 fn convert_range<R>(range: R, len: usize) -> Range<usize> where R: RangeBounds<usize> {
2797 let start = match range.start_bound() {
2798 Bound::Included(&start) => start,
2799 Bound::Excluded(start) => {
2800 start.checked_add(1).unwrap_or_else(|| panic!("attempted to index slice from after maximum usize"))
2801 }
2802 Bound::Unbounded => 0,
2803 };
2804
2805 let end = match range.end_bound() {
2806 Bound::Included(end) => {
2807 end.checked_add(1).unwrap_or_else(|| panic!("attempted to index slice up to maximum usize"))
2808 }
2809 Bound::Excluded(&end) => end,
2810 Bound::Unbounded => len,
2811 };
2812
2813 if start > end {
2814 panic!("slice index starts at {} but ends at {}", start, end);
2815 }
2816 if end > len {
2817 panic!("range end index {} out of range for slice of length {}", end, len);
2818 }
2819
2820 Range { start, end }
2821 }
2822
2823
2824 #[derive(Debug)]
2825 struct MutableVecState<A> {
2826 values: Vec<A>,
2827 senders: Vec<mpsc::UnboundedSender<VecDiff<A>>>,
2828 }
2829
2830 impl<A> MutableVecState<A> {
2831 #[inline]
2833 fn notify<B: FnMut() -> VecDiff<A>>(&mut self, mut change: B) {
2834 self.senders.retain(|sender| {
2835 sender.unbounded_send(change()).is_ok()
2836 });
2837 }
2838
2839 fn notify_with<B, C, D, E>(&mut self, value: B, mut clone: C, change: D, mut notify: E)
2841 where C: FnMut(&B) -> B,
2842 D: FnOnce(&mut Self, B),
2843 E: FnMut(B) -> VecDiff<A> {
2844
2845 let mut len = self.senders.len();
2846
2847 if len == 0 {
2848 change(self, value);
2849
2850 } else {
2851 let mut copy = Some(clone(&value));
2852
2853 change(self, value);
2854
2855 self.senders.retain(move |sender| {
2856 let value = copy.take().unwrap();
2857
2858 len -= 1;
2859
2860 let value = if len == 0 {
2861 value
2862
2863 } else {
2864 let v = clone(&value);
2865 copy = Some(value);
2866 v
2867 };
2868
2869 sender.unbounded_send(notify(value)).is_ok()
2870 });
2871 }
2872 }
2873
2874 fn pop(&mut self) -> Option<A> {
2875 let value = self.values.pop();
2876
2877 if value.is_some() {
2878 self.notify(|| VecDiff::Pop {});
2879 }
2880
2881 value
2882 }
2883
2884 fn remove(&mut self, index: usize) -> A {
2885 let len = self.values.len();
2886
2887 let value = self.values.remove(index);
2888
2889 if index == (len - 1) {
2890 self.notify(|| VecDiff::Pop {});
2891
2892 } else {
2893 self.notify(|| VecDiff::RemoveAt { index });
2894 }
2895
2896 value
2897 }
2898
2899 fn move_from_to(&mut self, old_index: usize, new_index: usize) {
2900 if old_index != new_index {
2901 let value = self.values.remove(old_index);
2902 self.values.insert(new_index, value);
2903 self.notify(|| VecDiff::Move { old_index, new_index });
2904 }
2905 }
2906
2907 fn clear(&mut self) {
2908 if self.values.len() > 0 {
2909 self.values.clear();
2910
2911 self.notify(|| VecDiff::Clear {});
2912 }
2913 }
2914
2915 fn retain<F>(&mut self, mut f: F) where F: FnMut(&A) -> bool {
2916 let mut len = self.values.len();
2917
2918 if len > 0 {
2919 let mut index = 0;
2920
2921 let mut removals = vec![];
2922
2923 self.values.retain(|value| {
2924 let output = f(value);
2925
2926 if !output {
2927 removals.push(index);
2928 }
2929
2930 index += 1;
2931
2932 output
2933 });
2934
2935 if self.values.len() == 0 {
2936 self.notify(|| VecDiff::Clear {});
2937
2938 } else {
2939 for index in removals.into_iter().rev() {
2941 len -= 1;
2942
2943 if index == len {
2944 self.notify(|| VecDiff::Pop {});
2945
2946 } else {
2947 self.notify(|| VecDiff::RemoveAt { index });
2948 }
2949 }
2950 }
2951 }
2952 }
2953
2954 fn remove_range(&mut self, range: Range<usize>, mut len: usize) {
2955 if range.end > range.start {
2956 if range.start == 0 && range.end == len {
2957 self.notify(|| VecDiff::Clear {});
2958
2959 } else {
2960 for index in range.into_iter().rev() {
2962 len -= 1;
2963
2964 if index == len {
2965 self.notify(|| VecDiff::Pop {});
2966
2967 } else {
2968 self.notify(|| VecDiff::RemoveAt { index });
2969 }
2970 }
2971 }
2972 }
2973 }
2974
2975 fn drain<R>(&mut self, range: R) -> Drain<'_, A> where R: RangeBounds<usize> {
2976 let len = self.values.len();
2977 let range = convert_range(range, len);
2978 self.remove_range(range.clone(), len);
2979 self.values.drain(range)
2980 }
2981
2982 fn truncate(&mut self, len: usize) {
2983 let end = self.values.len();
2984 let range = Range {
2985 start: len,
2986 end: end,
2987 };
2988 self.remove_range(range, end);
2989 self.values.truncate(len)
2990 }
2991 }
2992
2993 impl<A: Copy> MutableVecState<A> {
2994 fn copy_values(values: &Vec<A>) -> Vec<A> {
2998 let mut output: Vec<A> = vec![];
2999 output.extend(values);
3000 output
3001 }
3002
3003 fn signal_vec_copy(&mut self) -> MutableSignalVec<A> {
3004 let (sender, receiver) = mpsc::unbounded();
3005
3006 if self.values.len() > 0 {
3007 sender.unbounded_send(VecDiff::Replace { values: Self::copy_values(&self.values) }).unwrap();
3008 }
3009
3010 self.senders.push(sender);
3011
3012 MutableSignalVec {
3013 receiver
3014 }
3015 }
3016
3017 fn push_copy(&mut self, value: A) {
3018 self.values.push(value);
3019 self.notify(|| VecDiff::Push { value });
3020 }
3021
3022 fn insert_copy(&mut self, index: usize, value: A) {
3023 if index == self.values.len() {
3024 self.push_copy(value);
3025
3026 } else {
3027 self.values.insert(index, value);
3028 self.notify(|| VecDiff::InsertAt { index, value });
3029 }
3030 }
3031
3032 fn set_copy(&mut self, index: usize, value: A) {
3033 self.values[index] = value;
3034 self.notify(|| VecDiff::UpdateAt { index, value });
3035 }
3036
3037 fn replace_copy(&mut self, values: Vec<A>) {
3038 self.notify_with(values,
3039 Self::copy_values,
3040 |this, values| this.values = values,
3041 |values| VecDiff::Replace { values });
3042 }
3043 }
3044
3045 impl<A: Clone> MutableVecState<A> {
3046 #[inline]
3047 fn notify_clone<B, C, D>(&mut self, value: B, change: C, notify: D)
3048 where B: Clone,
3049 C: FnOnce(&mut Self, B),
3050 D: FnMut(B) -> VecDiff<A> {
3051
3052 self.notify_with(value, |a| a.clone(), change, notify)
3053 }
3054
3055 fn signal_vec_clone(&mut self) -> MutableSignalVec<A> {
3057 let (sender, receiver) = mpsc::unbounded();
3058
3059 if self.values.len() > 0 {
3060 sender.unbounded_send(VecDiff::Replace { values: self.values.clone() }).unwrap();
3061 }
3062
3063 self.senders.push(sender);
3064
3065 MutableSignalVec {
3066 receiver
3067 }
3068 }
3069
3070 fn push_clone(&mut self, value: A) {
3071 self.notify_clone(value,
3072 |this, value| this.values.push(value),
3073 |value| VecDiff::Push { value });
3074 }
3075
3076 fn insert_clone(&mut self, index: usize, value: A) {
3077 if index == self.values.len() {
3078 self.push_clone(value);
3079
3080 } else {
3081 self.notify_clone(value,
3082 |this, value| this.values.insert(index, value),
3083 |value| VecDiff::InsertAt { index, value });
3084 }
3085 }
3086
3087 fn set_clone(&mut self, index: usize, value: A) {
3088 self.notify_clone(value,
3089 |this, value| this.values[index] = value,
3090 |value| VecDiff::UpdateAt { index, value });
3091 }
3092
3093 fn replace_clone(&mut self, values: Vec<A>) {
3094 self.notify_clone(values,
3095 |this, values| this.values = values,
3096 |values| VecDiff::Replace { values });
3097 }
3098
3099 fn extend<I>(&mut self, iter: I) where I: IntoIterator<Item = A> {
3101 for value in iter {
3102 self.push_clone(value);
3103 }
3104 }
3105 }
3106
3107
// Generates the read-only, slice-like API shared by both lock guard types.
//
// `$t` is the guard type receiving the impls (written with `'a` and `A`).
// `$r` is the same guard type written with `'b` and `B`; it is the
// right-hand side of the guard-to-guard `PartialEq` impl.
//
// The guard type must have a `lock` field that exposes `values: Vec<A>`.
macro_rules! make_shared {
    ($t:ty, $r:ty) => {
        impl<'a, A> $t {
            // Borrows the contents as a slice (through the `Deref` impl below).
            #[inline]
            pub fn as_slice(&self) -> &[A] {
                self
            }

            // Capacity of the underlying `Vec`.
            #[inline]
            pub fn capacity(&self) -> usize {
                self.lock.values.capacity()
            }
        }

        // Element-wise comparison against a borrowed slice.
        impl<'a, 'b, A, B> PartialEq<&'b [B]> for $t where A: PartialEq<B> {
            #[inline] fn eq(&self, other: &&'b [B]) -> bool { self[..] == other[..] }
            #[inline] fn ne(&self, other: &&'b [B]) -> bool { self[..] != other[..] }
        }

        // Element-wise comparison against another guard of the same kind.
        impl<'a, 'b, A, B> PartialEq<$r> for $t where A: PartialEq<B> {
            #[inline] fn eq(&self, other: &$r) -> bool { self[..] == other[..] }
            #[inline] fn ne(&self, other: &$r) -> bool { self[..] != other[..] }
        }

        impl<'a, A> Eq for $t where A: Eq {}

        impl<'a, A> Borrow<[A]> for $t {
            #[inline]
            fn borrow(&self) -> &[A] {
                &self[..]
            }
        }

        // Ordering delegates to the ordering of the underlying slices.
        impl<'a, A> PartialOrd for $t where A: PartialOrd {
            #[inline]
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                PartialOrd::partial_cmp(&**self, &**other)
            }
        }

        impl<'a, A> Ord for $t where A: Ord {
            #[inline]
            fn cmp(&self, other: &Self) -> Ordering {
                Ord::cmp(&**self, &**other)
            }
        }

        // Indexing accepts anything that can index a slice (`usize`, ranges, ...).
        impl<'a, A, I> Index<I> for $t where I: SliceIndex<[A]> {
            type Output = I::Output;

            #[inline]
            fn index(&self, index: I) -> &Self::Output {
                Index::index(&**self, index)
            }
        }

        // Dereferences to the slice of current values; the other impls build on this.
        impl<'a, A> Deref for $t {
            type Target = [A];

            #[inline]
            fn deref(&self) -> &Self::Target {
                &self.lock.values
            }
        }

        // Hashes the underlying slice.
        impl<'a, A> Hash for $t where A: Hash {
            #[inline]
            fn hash<H>(&self, state: &mut H) where H: Hasher {
                Hash::hash(&**self, state)
            }
        }

        impl<'a, A> AsRef<$t> for $t {
            #[inline]
            fn as_ref(&self) -> &$t {
                self
            }
        }

        impl<'a, A> AsRef<[A]> for $t {
            #[inline]
            fn as_ref(&self) -> &[A] {
                self
            }
        }
    };
}
3196
3197
/// Read-only guard over the contents of a `MutableVec`, returned by
/// `MutableVec::lock_ref`.
///
/// It wraps the read guard of the inner `RwLock` and gets its slice-like API
/// from `make_shared!`.
#[derive(Debug)]
pub struct MutableVecLockRef<'a, A> where A: 'a {
    /// Read guard on the shared state.
    lock: RwLockReadGuard<'a, MutableVecState<A>>,
}

// Slice-like read API (Deref, Index, comparisons, hashing, ...) for the read guard.
make_shared!(MutableVecLockRef<'a, A>, MutableVecLockRef<'b, B>);
3204
3205
/// Read-write guard over the contents of a `MutableVec`, returned by
/// `MutableVec::lock_mut`.
///
/// It wraps the write guard of the inner `RwLock`. Its mutating methods
/// forward to `MutableVecState`, which sends the matching `VecDiff` to the
/// subscribed signals.
#[derive(Debug)]
pub struct MutableVecLockMut<'a, A> where A: 'a {
    /// Write guard on the shared state.
    lock: RwLockWriteGuard<'a, MutableVecState<A>>,
}
3215
3216 impl<'a, A> MutableVecLockMut<'a, A> {
3217 #[inline]
3218 pub fn pop(&mut self) -> Option<A> {
3219 self.lock.pop()
3220 }
3221
3222 #[inline]
3223 pub fn remove(&mut self, index: usize) -> A {
3224 self.lock.remove(index)
3225 }
3226
3227 #[inline]
3228 pub fn clear(&mut self) {
3229 self.lock.clear()
3230 }
3231
3232 #[inline]
3233 pub fn move_from_to(&mut self, old_index: usize, new_index: usize) {
3234 self.lock.move_from_to(old_index, new_index);
3235 }
3236
3237 pub fn swap(&mut self, a: usize, b: usize) {
3238 if a < b {
3239 self.move_from_to(a, b);
3240 self.move_from_to(b - 1, a);
3241
3242 } else if a > b {
3243 self.move_from_to(a, b);
3244 self.move_from_to(b + 1, a);
3245 }
3246 }
3247
3248 #[inline]
3249 pub fn retain<F>(&mut self, f: F) where F: FnMut(&A) -> bool {
3250 self.lock.retain(f)
3251 }
3252
3253 #[inline]
3255 pub fn drain<R>(&mut self, range: R) -> Drain<'_, A> where R: RangeBounds<usize> {
3256 self.lock.drain(range)
3257 }
3258
3259 #[inline]
3260 pub fn truncate(&mut self, len: usize) {
3261 self.lock.truncate(len)
3262 }
3263
3264 pub fn reverse(&mut self) {
3265 let len = self.len();
3266
3267 if len > 1 {
3268 let end = len - 1;
3269 let mut i = 0;
3270
3271 while i < end {
3272 self.move_from_to(end, i);
3273 i += 1;
3274 }
3275 }
3276 }
3277
3278 #[inline]
3279 pub fn reserve(&mut self, additional: usize) {
3280 self.lock.values.reserve(additional)
3281 }
3282
3283 #[inline]
3284 pub fn reserve_exact(&mut self, additional: usize) {
3285 self.lock.values.reserve_exact(additional)
3286 }
3287
3288 #[inline]
3289 pub fn shrink_to_fit(&mut self) {
3290 self.lock.values.shrink_to_fit()
3291 }
3292 }
3293
/// Mutations for `Copy` element types; subscribers receive copies of the values.
impl<'a, A> MutableVecLockMut<'a, A> where A: Copy {
    /// Appends `value` to the end of the vector.
    #[inline]
    pub fn push(&mut self, value: A) {
        self.lock.push_copy(value)
    }

    /// Inserts `value` at `index`.
    #[inline]
    pub fn insert(&mut self, index: usize, value: A) {
        self.lock.insert_copy(index, value)
    }

    /// Overwrites the value at `index` with `value`.
    #[inline]
    pub fn set(&mut self, index: usize, value: A) {
        self.lock.set_copy(index, value)
    }

    /// Replaces the whole contents with `values`.
    #[inline]
    pub fn replace(&mut self, values: Vec<A>) {
        self.lock.replace_copy(values)
    }
}
3316
3317 impl<'a, A> MutableVecLockMut<'a, A> where A: Clone {
3318 #[inline]
3319 pub fn push_cloned(&mut self, value: A) {
3320 self.lock.push_clone(value)
3321 }
3322
3323 #[inline]
3324 pub fn insert_cloned(&mut self, index: usize, value: A) {
3325 self.lock.insert_clone(index, value)
3326 }
3327
3328 #[inline]
3330 pub fn set_cloned(&mut self, index: usize, value: A) {
3331 self.lock.set_clone(index, value)
3332 }
3333
3334 #[inline]
3335 pub fn replace_cloned(&mut self, values: Vec<A>) {
3336 self.lock.replace_clone(values)
3337 }
3338
3339 pub fn apply_vec_diff(this: &mut Self, diff: VecDiff<A>) {
3340 match diff {
3341 VecDiff::Replace { values } => this.replace_cloned(values),
3342 VecDiff::InsertAt { index, value } => this.insert_cloned(index, value),
3343 VecDiff::UpdateAt { index, value } => this.set_cloned(index, value),
3344 VecDiff::RemoveAt { index } => { this.remove(index); },
3345 VecDiff::Move { old_index, new_index } => this.move_from_to(old_index, new_index),
3346 VecDiff::Push { value } => this.push_cloned(value),
3347 VecDiff::Pop {} => { this.pop().unwrap(); },
3348 VecDiff::Clear {} => this.clear(),
3349 }
3350 }
3351 }
3352
// Slice-like read API (Deref, Index, comparisons, hashing, ...) for the write guard.
make_shared!(MutableVecLockMut<'a, A>, MutableVecLockMut<'b, B>);
3354
/// Lets a write guard be used with `Extend`; forwards to
/// `MutableVecState::extend`, which pushes the items one by one.
impl<'a, A> Extend<A> for MutableVecLockMut<'a, A> where A: Clone {
    #[inline]
    fn extend<I>(&mut self, iter: I) where I: IntoIterator<Item = A> {
        self.lock.extend(iter)
    }
}
3362
3363
/// A vector whose changes can be observed as a stream of `VecDiff`s.
///
/// The state lives in an `Arc<RwLock<..>>`, so cloning a `MutableVec` yields
/// another handle to the same underlying vector.
pub struct MutableVec<A>(Arc<RwLock<MutableVecState<A>>>);
3367
3368 impl<A> MutableVec<A> {
3369 #[inline]
3371 pub fn new_with_values(values: Vec<A>) -> Self {
3372 Self::from(values)
3373 }
3374
3375 #[inline]
3376 pub fn new() -> Self {
3377 Self::new_with_values(vec![])
3378 }
3379
3380 #[inline]
3381 pub fn with_capacity(capacity: usize) -> Self {
3382 Self::new_with_values(Vec::with_capacity(capacity))
3383 }
3384
3385 #[inline]
3387 pub fn lock_ref(&self) -> MutableVecLockRef<A> {
3388 MutableVecLockRef {
3389 lock: self.0.read().unwrap(),
3390 }
3391 }
3392
3393 #[inline]
3395 pub fn lock_mut(&self) -> MutableVecLockMut<A> {
3396 MutableVecLockMut {
3397 lock: self.0.write().unwrap(),
3398 }
3399 }
3400 }
3401
3402 impl<T, A> From<T> for MutableVec<A> where Vec<A>: From<T> {
3403 #[inline]
3404 fn from(values: T) -> Self {
3405 MutableVec(Arc::new(RwLock::new(MutableVecState {
3406 values: values.into(),
3407 senders: vec![],
3408 })))
3409 }
3410 }
3411
3412 impl<A: Copy> MutableVec<A> {
3413 #[inline]
3414 pub fn signal_vec(&self) -> MutableSignalVec<A> {
3415 self.0.write().unwrap().signal_vec_copy()
3416 }
3417 }
3418
3419 impl<A: Clone> MutableVec<A> {
3420 #[inline]
3421 pub fn signal_vec_cloned(&self) -> MutableSignalVec<A> {
3422 self.0.write().unwrap().signal_vec_clone()
3423 }
3424 }
3425
3426 impl<A> fmt::Debug for MutableVec<A> where A: fmt::Debug {
3427 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
3428 let state = self.0.read().unwrap();
3429
3430 fmt.debug_tuple("MutableVec")
3431 .field(&state.values)
3432 .finish()
3433 }
3434 }
3435
/// Serializes exactly like the underlying `Vec` of values.
#[cfg(feature = "serde")]
impl<T> serde::Serialize for MutableVec<T> where T: serde::Serialize {
    #[inline]
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
        // Hold the read lock for the duration of the serialization.
        let state = self.0.read().unwrap();
        state.values.serialize(serializer)
    }
}
3443
/// Deserializes a `Vec<T>` and wraps it in a new `MutableVec`.
#[cfg(feature = "serde")]
impl<'de, T> serde::Deserialize<'de> for MutableVec<T> where T: serde::Deserialize<'de> {
    #[inline]
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer<'de> {
        let values = <Vec<T>>::deserialize(deserializer)?;
        Ok(MutableVec::new_with_values(values))
    }
}
3451
3452 impl<T> Default for MutableVec<T> {
3453 #[inline]
3454 fn default() -> Self {
3455 MutableVec::new()
3456 }
3457 }
3458
3459 impl<T> Clone for MutableVec<T> {
3460 #[inline]
3461 fn clone(&self) -> Self {
3462 MutableVec(self.0.clone())
3463 }
3464 }
3465
/// The `SignalVec` returned by `MutableVec::signal_vec` and
/// `MutableVec::signal_vec_cloned`.
///
/// It yields the `VecDiff`s that the owning vector sends over its channel.
#[derive(Debug)]
#[must_use = "SignalVecs do nothing unless polled"]
pub struct MutableSignalVec<A> {
    /// Receiving end of the channel whose sender is stored in the vector's state.
    receiver: mpsc::UnboundedReceiver<VecDiff<A>>,
}
3471
// Declared `Unpin` for every `A`; `poll_vec_change` relies on this to call
// `poll_next_unpin` on the receiver through the pinned `self`.
impl<A> Unpin for MutableSignalVec<A> {}
3473
/// Exposes the channel of diffs as a `SignalVec`.
impl<A> SignalVec for MutableSignalVec<A> {
    type Item = A;

    /// Polls the underlying channel receiver for the next `VecDiff` and
    /// returns its result unchanged.
    #[inline]
    fn poll_vec_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<VecDiff<Self::Item>>> {
        self.receiver.poll_next_unpin(cx)
    }
}
3482}
3483
3484pub use self::mutable_vec::*;