1use anyhow::Result;
2use bus::Bus;
3use delegate::delegate;
4use parking_lot::RwLock;
5use std::sync::Arc;
6
7use crate::{
8 macros::{cloned, unwrap_or_return},
9 threads::{spawn_watcher, Synchronizer},
10 traits::{
11 change::Change,
12 view::View,
13 watch::{Event, Watch, Watcher},
14 },
15 wrappers::database::Db,
16};
17
/// Unsized signature of the predicate a [`Filter`] applies to a key/value pair.
///
/// The predicate returns `true` for entries that should stay visible. It is
/// `Send + Sync` so it can be shared behind an `Arc` with the watcher closure.
type FilterOp<K, V> = dyn Fn(&K, &V) -> bool + Send + Sync;
19
20pub struct Filter<Previous>
35where
36 Previous: View,
37{
38 filter: Arc<FilterOp<Previous::Key, Previous::Value>>,
39 from: Previous,
40 watcher: Watcher<Previous::Key, Previous::Value>,
41 sync: Arc<Synchronizer>,
42}
43impl<P: View> Clone for Filter<P> {
44 fn clone(&self) -> Self {
45 Self {
46 filter: Arc::clone(&self.filter),
47 from: self.from.clone(),
48 watcher: self.watcher.clone(),
49 sync: Arc::clone(&self.sync),
50 }
51 }
52}
53
54impl<P> Filter<P>
55where
56 P: View + Watch,
57{
58 pub(crate) fn new<F>(from: P, filter: F) -> Self
59 where
60 F: 'static + Fn(&P::Key, &P::Value) -> bool + Sync + Send,
61 P: 'static + Sync + Send,
62 {
63 let filter = Arc::new(filter);
64 let sync = Arc::new(Synchronizer::from(vec![from.sync()]));
65 let watcher = Watcher::new(cloned!(sync, from, filter, move || {
66 let bus = Arc::new(RwLock::new(Bus::new(128)));
67 let previous = from.watch();
68 spawn_watcher(
69 sync,
70 previous,
71 Arc::clone(&bus),
72 cloned!(filter, move |event| {
73 let (key, value) = match event {
74 Event::Insert { key, value } => (Arc::clone(&key), Some(value)),
75 Event::Remove { key } => (Arc::clone(&key), None),
76 };
77 let value = match value {
78 Some(value) if filter(&key, &*value) => Some(Arc::clone(&value)),
79 _ => None,
80 };
81 let event = match value {
82 Some(value) => Event::Insert { key, value },
83 _ => Event::Remove { key },
84 };
85 Ok(vec![event])
86 }),
87 );
88 bus
89 }));
90 Filter {
91 from,
92 filter,
93 sync,
94 watcher,
95 }
96 }
97}
98
99impl<Previous> View for Filter<Previous>
100where
101 Previous: View,
102{
103 type Key = Previous::Key;
104 type Value = Previous::Value;
105 type Iter = Box<dyn Iterator<Item = Result<(Self::Key, Self::Value)>>>;
106 fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>> {
107 let v = self.from.get_ref(key)?;
108 let v = unwrap_or_return!(v);
109 let filter = (self.filter)(key, &v);
110 if filter {
111 Ok(Some(v))
112 } else {
113 Ok(None)
114 }
115 }
116 fn iter(&self) -> Self::Iter {
117 let filter = Arc::clone(&self.filter);
118 Box::new(self.from.clone().iter().filter_map(move |r| match r {
119 Ok((k, v)) => {
120 if filter(&k, &v) {
121 Some(Ok((k, v)))
122 } else {
123 None
124 }
125 }
126 Err(e) => Some(Err(e)),
127 }))
128 }
129 fn contains_key_ref(&self, key: &Self::Key) -> Result<bool> {
130 let c = self.from.contains_key_ref(key)?;
131 if !c {
132 return Ok(false);
133 };
134 let v = self.from.get_ref(key)?;
135 let v = if let Some(v) = v { v } else { return Ok(false) };
136 let filter = (self.filter)(key, &v);
137 Ok(filter)
138 }
139 fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
140 where
141 Self::Key: Ord,
142 {
143 let v = self.from.get_lt_ref(key)?;
144 let v = if let Some(v) = v { v } else { return Ok(None) };
145 let filter = (self.filter)(&v.0, &v.1);
146 if filter {
147 Ok(Some(v))
148 } else {
149 Ok(None)
150 }
151 }
152 fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
153 where
154 Self::Key: Ord,
155 {
156 let v = self.from.get_gt_ref(key)?;
157 let v = if let Some(v) = v { v } else { return Ok(None) };
158 let filter = (self.filter)(&v.0, &v.1);
159 if filter {
160 Ok(Some(v))
161 } else {
162 Ok(None)
163 }
164 }
165 fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
166 where
167 Self::Key: Ord,
168 {
169 let v = self.from.first()?;
170 let v = if let Some(v) = v { v } else { return Ok(None) };
171 let filter = (self.filter)(&v.0, &v.1);
172 if filter {
173 Ok(Some(v))
174 } else {
175 Ok(None)
176 }
177 }
178 fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
179 where
180 Self::Key: Ord,
181 {
182 let v = self.from.last()?;
183 let v = if let Some(v) = v { v } else { return Ok(None) };
184 let filter = (self.filter)(&v.0, &v.1);
185 if filter {
186 Ok(Some(v))
187 } else {
188 Ok(None)
189 }
190 }
191 fn is_empty(&self) -> Option<bool> {
192 let e = self.from.is_empty();
193 if e == Some(true) {
194 e
195 } else {
196 None
197 }
198 }
199 fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter> {
200 let filter = Arc::clone(&self.filter);
201 let iter = self.from.range(range)?;
202 Ok(Box::new(iter.filter_map(move |r| match r {
203 Ok((k, v)) => {
204 if filter(&k, &v) {
205 Some(Ok((k, v)))
206 } else {
207 None
208 }
209 }
210 Err(e) => Some(Err(e)),
211 })))
212 }
213}
214impl<Previous> Change for Filter<Previous>
215where
216 Previous: View + Change,
217{
218 type Key = <Previous as Change>::Key;
219 type Value = <Previous as Change>::Value;
220 type Insert = <Previous as Change>::Insert;
221 #[rustfmt::skip]
222 delegate! {
223 to self.from {
224 fn insert_owned(&self, key: Self::Key, value: Self::Insert) -> Result<Option<<Self as Change>::Value>>;
225 fn insert_ref(&self, key: &Self::Key, value: &Self::Insert) -> Result<Option<<Self as Change>::Value>>;
226 fn remove_owned(&self, key: <Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
227 fn remove_ref(&self, key: &<Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
228 fn clear(&self) -> Result<()>;
229 fn fetch_and_update(
230 &self,
231 key: &Self::Key,
232 f: impl FnMut(Option<Self::Value>) -> Option<Self::Insert>,
233 ) -> Result<Option<Self::Value>>;
234 }
235 }
236}
237impl<Previous> Watch for Filter<Previous>
238where
239 Previous: View + Watch,
240{
241 fn watch(&self) -> bus::BusReader<Event<Self::Key, Self::Value>> {
242 self.watcher.new_reader()
243 }
244 fn db(&self) -> Db {
245 self.from.db()
246 }
247 fn sync(&self) -> Arc<Synchronizer> {
248 Arc::clone(&self.sync)
249 }
250 fn wait(&self) {
251 self.from.wait()
252 }
253}