husky/ops/
filter_inserter.rs

1use anyhow::Result;
2
3use delegate::delegate;
4use std::sync::Arc;
5
6use crate::{
7	threads::Synchronizer,
8	traits::{
9		change::Change,
10		view::View,
11		watch::{Event, Watch},
12	},
13	wrappers::database::Db,
14};
15
16type InsertFn<P, M> = dyn Fn(M) -> Option<<P as Change>::Insert> + Send + Sync;
17
18/// A struct that changes the insert type.
19/// You can create an [FilterInserter] from a [Change] struct.
20///
21/// # Examples
22/// ```
23/// # use husky::{Tree, View, Change, Operate};
24/// # let db = husky::open_temp().unwrap();
25/// # let tree: Tree<String, i32> = db.open_tree("tree").unwrap();
26/// let inserter = tree.filter_inserter(|insert: i32| if insert < 10 { Some(insert) } else { None });
27///
28/// tree.insert("key", 9).unwrap();
29///
30/// let result = tree.get("key").unwrap();
31/// assert_eq!(result, Some(9));
32///
33/// inserter.insert("key", 11).unwrap();
34///
35/// let result = tree.get("key").unwrap();
36/// assert_eq!(result, None);
37/// ```
38pub struct FilterInserter<Previous, Merge>
39where
40	Previous: Change,
41{
42	inserter: Arc<InsertFn<Previous, Merge>>,
43	from: Previous,
44}
45impl<P: Clone + View + Change, M> Clone for FilterInserter<P, M> {
46	fn clone(&self) -> Self {
47		Self {
48			inserter: Arc::clone(&self.inserter),
49			from: self.from.clone(),
50		}
51	}
52}
53
54impl<P, Insert> FilterInserter<P, Insert>
55where
56	P: Change,
57{
58	pub(crate) fn new<ReduceFn>(from: P, inserter: ReduceFn) -> Self
59	where
60		ReduceFn: 'static + Fn(Insert) -> Option<<P as Change>::Insert> + Send + Sync,
61		P: 'static + Sync + Send,
62	{
63		let inserter = Arc::new(inserter);
64		FilterInserter { from, inserter }
65	}
66}
67
68impl<Previous, Merge> View for FilterInserter<Previous, Merge>
69where
70	Previous: View + Change,
71	Merge: 'static + Clone + Send + Sync,
72{
73	type Key = <Previous as View>::Key;
74	type Value = <Previous as View>::Value;
75	type Iter = Previous::Iter;
76  #[rustfmt::skip]
77	delegate!(
78    to self.from {
79      fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>>;
80      fn iter(&self) -> Self::Iter;
81      fn contains_key_ref(&self, key: &Self::Key) -> Result<bool>;
82      fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
83      where
84        Self::Key: Ord;
85      fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
86      where
87        Self::Key: Ord;
88      fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
89      where
90        Self::Key: Ord;
91      fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
92      where
93        Self::Key: Ord;
94      fn is_empty(&self) -> Option<bool>;
95      fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter>;
96    }
97  );
98}
99impl<Previous, Merge> Change for FilterInserter<Previous, Merge>
100where
101	Previous: Change,
102	Merge: 'static + Clone + Send + Sync,
103{
104	type Key = <Previous as Change>::Key;
105	type Value = <Previous as Change>::Value;
106	type Insert = Merge;
107	fn insert_owned(
108		&self,
109		key: Self::Key,
110		value: Self::Insert,
111	) -> Result<Option<<Self as Change>::Value>> {
112		let v = (self.inserter)(value);
113		match v {
114			Some(v) => self.from.insert_owned(key, v),
115			None => self.from.remove_owned(key),
116		}
117	}
118  fn fetch_and_update(
119    &self,
120    key: &Self::Key,
121    mut f: impl FnMut(Option<Self::Value>) -> Option<Self::Insert>,
122  ) -> Result<Option<Self::Value>> {
123    self.from.fetch_and_update(key, |v| f(v).and_then(|v| (self.inserter)(v)))
124  }
125  #[rustfmt::skip]
126	delegate! {
127    to self.from {
128      fn clear(&self) -> Result<()>;
129      fn remove_owned(&self, key: <Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
130      fn remove_ref(&self, key: &<Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
131	  }
132	}
133}
134impl<Previous, Merge> Watch for FilterInserter<Previous, Merge>
135where
136	Previous: Change + Watch,
137	Merge: 'static + Clone + Send + Sync,
138{
139	#[rustfmt::skip]
140	delegate!(
141    to self.from {
142      fn watch(&self) -> bus::BusReader<Event<Self::Key, Self::Value>>;
143      fn db(&self) -> Db;
144      fn sync(&self) -> Arc<Synchronizer>;
145      fn wait(&self);
146    }
147  );
148}