husky/ops/
filter.rs

1use anyhow::Result;
2use bus::Bus;
3use delegate::delegate;
4use parking_lot::RwLock;
5use std::sync::Arc;
6
7use crate::{
8	macros::{cloned, unwrap_or_return},
9	threads::{spawn_watcher, Synchronizer},
10	traits::{
11		change::Change,
12		view::View,
13		watch::{Event, Watch, Watcher},
14	},
15	wrappers::database::Db,
16};
17
18type FilterOp<K, V> = dyn Fn(&K, &V) -> bool + Send + Sync;
19
20/// A struct that filters values.
21/// You can create a [Filter] from a [View] struct.
22/// # Examples
23/// ```
24/// # use husky::{Tree, View, Change, Operate};
25/// # let db = husky::open_temp().unwrap();
26/// # let tree: Tree<String, u32> = db.open_tree("tree").unwrap();
27/// let filter = tree.filter(|_, v| *v > 2);
28///
29/// tree.insert("key", 2u32).unwrap();
30///
31/// let result = filter.get("key").unwrap();
32/// assert_eq!(result, None);
33/// ```
34pub struct Filter<Previous>
35where
36	Previous: View,
37{
38	filter: Arc<FilterOp<Previous::Key, Previous::Value>>,
39	from: Previous,
40	watcher: Watcher<Previous::Key, Previous::Value>,
41	sync: Arc<Synchronizer>,
42}
43impl<P: View> Clone for Filter<P> {
44	fn clone(&self) -> Self {
45		Self {
46			filter: Arc::clone(&self.filter),
47			from: self.from.clone(),
48			watcher: self.watcher.clone(),
49			sync: Arc::clone(&self.sync),
50		}
51	}
52}
53
54impl<P> Filter<P>
55where
56	P: View + Watch,
57{
58	pub(crate) fn new<F>(from: P, filter: F) -> Self
59	where
60		F: 'static + Fn(&P::Key, &P::Value) -> bool + Sync + Send,
61		P: 'static + Sync + Send,
62	{
63		let filter = Arc::new(filter);
64		let sync = Arc::new(Synchronizer::from(vec![from.sync()]));
65		let watcher = Watcher::new(cloned!(sync, from, filter, move || {
66			let bus = Arc::new(RwLock::new(Bus::new(128)));
67			let previous = from.watch();
68			spawn_watcher(
69				sync,
70				previous,
71				Arc::clone(&bus),
72				cloned!(filter, move |event| {
73					let (key, value) = match event {
74						Event::Insert { key, value } => (Arc::clone(&key), Some(value)),
75						Event::Remove { key } => (Arc::clone(&key), None),
76					};
77					let value = match value {
78						Some(value) if filter(&key, &*value) => Some(Arc::clone(&value)),
79						_ => None,
80					};
81					let event = match value {
82						Some(value) => Event::Insert { key, value },
83						_ => Event::Remove { key },
84					};
85					Ok(vec![event])
86				}),
87			);
88			bus
89		}));
90		Filter {
91			from,
92			filter,
93			sync,
94			watcher,
95		}
96	}
97}
98
99impl<Previous> View for Filter<Previous>
100where
101	Previous: View,
102{
103	type Key = Previous::Key;
104	type Value = Previous::Value;
105	type Iter = Box<dyn Iterator<Item = Result<(Self::Key, Self::Value)>>>;
106	fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>> {
107		let v = self.from.get_ref(key)?;
108		let v = unwrap_or_return!(v);
109		let filter = (self.filter)(key, &v);
110		if filter {
111			Ok(Some(v))
112		} else {
113			Ok(None)
114		}
115	}
116	fn iter(&self) -> Self::Iter {
117		let filter = Arc::clone(&self.filter);
118		Box::new(self.from.clone().iter().filter_map(move |r| match r {
119			Ok((k, v)) => {
120				if filter(&k, &v) {
121					Some(Ok((k, v)))
122				} else {
123					None
124				}
125			}
126			Err(e) => Some(Err(e)),
127		}))
128	}
129	fn contains_key_ref(&self, key: &Self::Key) -> Result<bool> {
130		let c = self.from.contains_key_ref(key)?;
131		if !c {
132			return Ok(false);
133		};
134		let v = self.from.get_ref(key)?;
135		let v = if let Some(v) = v { v } else { return Ok(false) };
136		let filter = (self.filter)(key, &v);
137		Ok(filter)
138	}
139	fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
140	where
141		Self::Key: Ord,
142	{
143		let v = self.from.get_lt_ref(key)?;
144		let v = if let Some(v) = v { v } else { return Ok(None) };
145		let filter = (self.filter)(&v.0, &v.1);
146		if filter {
147			Ok(Some(v))
148		} else {
149			Ok(None)
150		}
151	}
152	fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
153	where
154		Self::Key: Ord,
155	{
156		let v = self.from.get_gt_ref(key)?;
157		let v = if let Some(v) = v { v } else { return Ok(None) };
158		let filter = (self.filter)(&v.0, &v.1);
159		if filter {
160			Ok(Some(v))
161		} else {
162			Ok(None)
163		}
164	}
165	fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
166	where
167		Self::Key: Ord,
168	{
169		let v = self.from.first()?;
170		let v = if let Some(v) = v { v } else { return Ok(None) };
171		let filter = (self.filter)(&v.0, &v.1);
172		if filter {
173			Ok(Some(v))
174		} else {
175			Ok(None)
176		}
177	}
178	fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
179	where
180		Self::Key: Ord,
181	{
182		let v = self.from.last()?;
183		let v = if let Some(v) = v { v } else { return Ok(None) };
184		let filter = (self.filter)(&v.0, &v.1);
185		if filter {
186			Ok(Some(v))
187		} else {
188			Ok(None)
189		}
190	}
191	fn is_empty(&self) -> Option<bool> {
192		let e = self.from.is_empty();
193		if e == Some(true) {
194			e
195		} else {
196			None
197		}
198	}
199	fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter> {
200		let filter = Arc::clone(&self.filter);
201		let iter = self.from.range(range)?;
202		Ok(Box::new(iter.filter_map(move |r| match r {
203			Ok((k, v)) => {
204				if filter(&k, &v) {
205					Some(Ok((k, v)))
206				} else {
207					None
208				}
209			}
210			Err(e) => Some(Err(e)),
211		})))
212	}
213}
214impl<Previous> Change for Filter<Previous>
215where
216	Previous: View + Change,
217{
218	type Key = <Previous as Change>::Key;
219	type Value = <Previous as Change>::Value;
220	type Insert = <Previous as Change>::Insert;
221  #[rustfmt::skip]
222	delegate! {
223	  to self.from {
224      fn insert_owned(&self, key: Self::Key, value: Self::Insert) -> Result<Option<<Self as Change>::Value>>;
225      fn insert_ref(&self, key: &Self::Key, value: &Self::Insert) -> Result<Option<<Self as Change>::Value>>;
226      fn remove_owned(&self, key: <Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
227      fn remove_ref(&self, key: &<Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
228      fn clear(&self) -> Result<()>;
229      fn fetch_and_update(
230        &self,
231        key: &Self::Key,
232        f: impl FnMut(Option<Self::Value>) -> Option<Self::Insert>,
233      ) -> Result<Option<Self::Value>>;
234	  }
235	}
236}
237impl<Previous> Watch for Filter<Previous>
238where
239	Previous: View + Watch,
240{
241	fn watch(&self) -> bus::BusReader<Event<Self::Key, Self::Value>> {
242		self.watcher.new_reader()
243	}
244	fn db(&self) -> Db {
245		self.from.db()
246	}
247	fn sync(&self) -> Arc<Synchronizer> {
248		Arc::clone(&self.sync)
249	}
250	fn wait(&self) {
251		self.from.wait()
252	}
253}