husky/ops/
filter_inserter.rs1use anyhow::Result;
2
3use delegate::delegate;
4use std::sync::Arc;
5
6use crate::{
7 threads::Synchronizer,
8 traits::{
9 change::Change,
10 view::View,
11 watch::{Event, Watch},
12 },
13 wrappers::database::Db,
14};
15
16type InsertFn<P, M> = dyn Fn(M) -> Option<<P as Change>::Insert> + Send + Sync;
17
18pub struct FilterInserter<Previous, Merge>
39where
40 Previous: Change,
41{
42 inserter: Arc<InsertFn<Previous, Merge>>,
43 from: Previous,
44}
45impl<P: Clone + View + Change, M> Clone for FilterInserter<P, M> {
46 fn clone(&self) -> Self {
47 Self {
48 inserter: Arc::clone(&self.inserter),
49 from: self.from.clone(),
50 }
51 }
52}
53
54impl<P, Insert> FilterInserter<P, Insert>
55where
56 P: Change,
57{
58 pub(crate) fn new<ReduceFn>(from: P, inserter: ReduceFn) -> Self
59 where
60 ReduceFn: 'static + Fn(Insert) -> Option<<P as Change>::Insert> + Send + Sync,
61 P: 'static + Sync + Send,
62 {
63 let inserter = Arc::new(inserter);
64 FilterInserter { from, inserter }
65 }
66}
67
68impl<Previous, Merge> View for FilterInserter<Previous, Merge>
69where
70 Previous: View + Change,
71 Merge: 'static + Clone + Send + Sync,
72{
73 type Key = <Previous as View>::Key;
74 type Value = <Previous as View>::Value;
75 type Iter = Previous::Iter;
76 #[rustfmt::skip]
77 delegate!(
78 to self.from {
79 fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>>;
80 fn iter(&self) -> Self::Iter;
81 fn contains_key_ref(&self, key: &Self::Key) -> Result<bool>;
82 fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
83 where
84 Self::Key: Ord;
85 fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
86 where
87 Self::Key: Ord;
88 fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
89 where
90 Self::Key: Ord;
91 fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
92 where
93 Self::Key: Ord;
94 fn is_empty(&self) -> Option<bool>;
95 fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter>;
96 }
97 );
98}
99impl<Previous, Merge> Change for FilterInserter<Previous, Merge>
100where
101 Previous: Change,
102 Merge: 'static + Clone + Send + Sync,
103{
104 type Key = <Previous as Change>::Key;
105 type Value = <Previous as Change>::Value;
106 type Insert = Merge;
107 fn insert_owned(
108 &self,
109 key: Self::Key,
110 value: Self::Insert,
111 ) -> Result<Option<<Self as Change>::Value>> {
112 let v = (self.inserter)(value);
113 match v {
114 Some(v) => self.from.insert_owned(key, v),
115 None => self.from.remove_owned(key),
116 }
117 }
118 fn fetch_and_update(
119 &self,
120 key: &Self::Key,
121 mut f: impl FnMut(Option<Self::Value>) -> Option<Self::Insert>,
122 ) -> Result<Option<Self::Value>> {
123 self.from.fetch_and_update(key, |v| f(v).and_then(|v| (self.inserter)(v)))
124 }
125 #[rustfmt::skip]
126 delegate! {
127 to self.from {
128 fn clear(&self) -> Result<()>;
129 fn remove_owned(&self, key: <Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
130 fn remove_ref(&self, key: &<Self as Change>::Key) -> Result<Option<<Self as Change>::Value>>;
131 }
132 }
133}
134impl<Previous, Merge> Watch for FilterInserter<Previous, Merge>
135where
136 Previous: Change + Watch,
137 Merge: 'static + Clone + Send + Sync,
138{
139 #[rustfmt::skip]
140 delegate!(
141 to self.from {
142 fn watch(&self) -> bus::BusReader<Event<Self::Key, Self::Value>>;
143 fn db(&self) -> Db;
144 fn sync(&self) -> Arc<Synchronizer>;
145 fn wait(&self);
146 }
147 );
148}