1use crate::{
2 threads::spawn_listener,
3 traits::{
4 change::Change,
5 serial::Serial,
6 view::View,
7 watch::{Event, Watch},
8 },
9};
10
11use self::{
12 chain::Chain, filter::Filter, filter_inserter::FilterInserter, filter_map::FilterMap,
13 filter_reducer::FilterReducer, index::Index, inserter::Inserter, map::Map, reducer::Reducer,
14 transform::Transform, zip::Zip,
15};
16
17pub mod chain;
19pub mod filter;
21pub mod filter_inserter;
23pub mod filter_map;
25pub mod filter_reducer;
27pub mod index;
29pub mod inserter;
31pub mod map;
33pub mod reducer;
35pub mod transform;
37pub mod zip;
39
40pub trait Operate
42where
43 Self: Sized + Clone + Sync + Send,
44{
45 fn map<M, Mapped>(&self, mapper: M) -> Map<Self, Mapped>
47 where
48 Self: View + Watch,
49 M: 'static + Fn(&Self::Key, &Self::Value) -> Mapped + Sync + Send,
50 Mapped: 'static + Clone + Send + Sync,
51 {
52 Map::new(self.clone(), mapper)
53 }
54 fn transform<K, V, T>(&self, transformer: T) -> Transform<Self, K, V>
56 where
57 Self: View + Watch,
58 T: 'static + Fn(&Self::Key, &Self::Value) -> Vec<(K, V)> + Sync + Send,
59 K: Serial,
60 V: Serial,
61 {
62 Transform::new(self.clone(), transformer)
63 }
64 fn index<F, I>(&self, indexer: F) -> Index<Self, I>
66 where
67 Self: View + Watch,
68 F: 'static + Fn(&Self::Key, &Self::Value) -> Vec<I> + Sync + Send,
69 I: Serial,
70 {
71 Index::new(self.clone(), indexer)
72 }
73 fn chain<B>(&self, other: &B) -> Chain<Self, B>
75 where
76 Self: View + Sync + Send + Watch,
77 B: View<Key = Self::Key, Value = Self::Value> + Watch + Sync + Send,
78 {
79 Chain::new(self.clone(), other.clone())
80 }
81 fn zip<B>(&self, other: &B) -> Zip<Self, B>
83 where
84 Self: View + Sync + Send + Watch,
85 B: View<Key = Self::Key> + Watch + Sync + Send,
86 {
87 Zip::new(self.clone(), other.clone())
88 }
89 fn unzip<A, B>(&self) -> (Map<Self, A>, Map<Self, B>)
91 where
92 Self: View<Value = (A, B)> + Watch,
93 A: Serial,
94 B: Serial,
95 {
96 let a = self.map(|_, (a, _)| a.clone());
97 let b = self.map(|_, (_, b)| b.clone());
98 (a, b)
99 }
100 fn filter<F>(&self, filter: F) -> Filter<Self>
102 where
103 Self: View + Watch,
104 F: 'static + Fn(&Self::Key, &Self::Value) -> bool + Sync + Send,
105 {
106 Filter::new(self.clone(), filter)
107 }
108 fn filter_map<F, Mapped>(&self, mapper: F) -> FilterMap<Self, Mapped>
110 where
111 Self: View + Watch,
112 F: 'static + Fn(&Self::Key, &Self::Value) -> Option<Mapped> + Sync + Send,
113 Mapped: 'static + Clone + Send + Sync,
114 {
115 FilterMap::new(self.clone(), mapper)
116 }
117 fn filter_reducer<ReduceFn, Merge>(&self, reducer: ReduceFn) -> FilterReducer<Self, Merge>
119 where
120 Self: View + Change,
121 ReduceFn: 'static
122 + Fn(Option<<Self as Change>::Value>, Merge) -> Option<<Self as Change>::Insert>
123 + Sync
124 + Send,
125 {
126 FilterReducer::new(self.clone(), reducer)
127 }
128 fn reducer<ReduceFn, Merge>(&self, reducer: ReduceFn) -> Reducer<Self, Merge>
130 where
131 Self: View + Change,
132 ReduceFn: 'static
133 + Fn(Option<<Self as Change>::Value>, Merge) -> <Self as Change>::Insert
134 + Sync
135 + Send,
136 {
137 Reducer::new(self.clone(), reducer)
138 }
139 fn filter_inserter<InsertFn, Insert>(&self, inserter: InsertFn) -> FilterInserter<Self, Insert>
141 where
142 Self: 'static + Change,
143 InsertFn: 'static + Fn(Insert) -> Option<<Self as Change>::Insert> + Sync + Send,
144 {
145 FilterInserter::new(self.clone(), inserter)
146 }
147 fn inserter<InsertFn, Insert>(&self, inserter: InsertFn) -> Inserter<Self, Insert>
149 where
150 Self: 'static + Change,
151 InsertFn: 'static + Fn(Insert) -> <Self as Change>::Insert + Sync + Send,
152 {
153 Inserter::new(self.clone(), inserter)
154 }
155 fn pipe<O>(&self, other: O)
157 where
158 Self: View + Watch,
159 O: Change<Key = Self::Key, Insert = Self::Value> + Watch + Send + Sync,
160 {
161 let sync = other.sync();
162 sync.push_source(self.sync());
163 spawn_listener(sync, self.watch(), move |event| {
164 let (key, value) = match event {
165 Event::Insert { key, value } => (key, Some(value)),
166 Event::Remove { key } => (key, None),
167 };
168 match value {
169 Some(value) => other.insert_ref(&*key, &*value)?,
170 None => other.remove_ref(&*key)?,
171 };
172 Ok(0)
174 });
175 }
176}
177
178impl<T> Operate for T where Self: Clone + Sized + View + Watch + Sync + Send {}