1use anyhow::Result;
2use bus::{Bus, BusReader};
3use parking_lot::RwLock;
4use std::{cmp::Ordering, hash::Hash, sync::Arc};
5
6use crate::{
7 macros::cloned,
8 threads::{spawn_watcher, Synchronizer},
9 traits::{
10 view::View,
11 watch::{Event, Watch, Watcher},
12 },
13};
14
15pub struct Chain<A, B>
36where
37 A: View,
38 B: View<Key = A::Key, Value = A::Value>,
39{
40 a: A,
41 b: B,
42 watcher: Watcher<A::Key, A::Value>,
43 sync: Arc<Synchronizer>,
44}
45impl<A, B> Clone for Chain<A, B>
46where
47 A: View,
48 B: View<Key = A::Key, Value = A::Value>,
49{
50 fn clone(&self) -> Self {
51 Self {
52 a: self.a.clone(),
53 b: self.b.clone(),
54 watcher: self.watcher.clone(),
55 sync: Arc::clone(&self.sync),
56 }
57 }
58}
59
60impl<A, B> Chain<A, B>
61where
62 A: View + Watch + Sync + Send,
63 B: View<Key = <A as View>::Key, Value = <A as View>::Value> + Watch + Sync + Send,
64{
65 pub(crate) fn new(a: A, b: B) -> Self {
66 let sync = Arc::new(Synchronizer::from(vec![a.sync(), b.sync()]));
67 let watcher = Watcher::new(cloned!(sync, a, b, move || {
68 let bus = Arc::new(RwLock::new(Bus::new(128)));
69 let a_reader = a.watch();
70 let b_reader = b.watch();
71 spawn_watcher(
72 Arc::clone(&sync),
73 a_reader,
74 Arc::clone(&bus),
75 cloned!(move |event| {
76 let (key, value) = match event {
77 Event::Insert { key, value } => {
78 (Arc::clone(&key), Some(Arc::clone(&value)))
79 }
80 Event::Remove { key } => {
81 (Arc::clone(&key), b.get_ref(&*key)?.map(Arc::new))
82 }
83 };
84 let event = match value {
85 Some(value) => Event::Insert { key, value },
86 None => Event::Remove { key },
87 };
88 Ok(vec![event])
89 }),
90 );
91 spawn_watcher(
92 sync,
93 b_reader,
94 Arc::clone(&bus),
95 cloned!(move |event| {
96 let (key, value) = match event {
97 Event::Insert { key, value } => {
98 (Arc::clone(&key), Some(Arc::clone(&value)))
99 }
100 Event::Remove { key } => {
101 (Arc::clone(&key), a.get_ref(&*key)?.map(Arc::new))
102 }
103 };
104 let event = match value {
105 Some(value) => Event::Insert { key, value },
106 None => Event::Remove { key },
107 };
108 Ok(vec![event])
109 }),
110 );
111 bus
112 }));
113 Chain {
114 a,
115 b,
116 watcher,
117 sync,
118 }
119 }
120}
121
122impl<A, B> View for Chain<A, B>
123where
124 A: View,
125 B: View<Key = A::Key, Value = A::Value>,
126{
127 type Key = A::Key;
128 type Value = A::Value;
129 type Iter = std::iter::Chain<A::Iter, B::Iter>;
130 fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>> {
131 let a = self.a.get_ref(key)?;
132 if let Some(a) = a {
133 return Ok(Some(a));
134 }
135 let b = self.b.get_ref(key)?;
136 Ok(b)
137 }
138 fn iter(&self) -> Self::Iter {
139 let a = self.a.iter();
140 let b = self.b.iter();
141 a.chain(b)
142 }
143 fn contains_key_ref(&self, key: &Self::Key) -> Result<bool> {
144 Ok(self.a.contains_key_ref(key)? || self.b.contains_key_ref(key)?)
145 }
146 fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
147 where
148 Self::Key: Ord,
149 {
150 let a = self.a.get_lt_ref(key)?;
151 let b = self.b.get_lt_ref(key)?;
152 match (a, b) {
153 (None, None) => Ok(None),
154 (Some(a), None) => Ok(Some(a)),
155 (None, Some(b)) => Ok(Some(b)),
156 (Some(a), Some(b)) => match a.0.cmp(&b.0) {
157 Ordering::Less => Ok(Some(b)),
158 Ordering::Equal => Ok(Some(a)),
159 Ordering::Greater => Ok(Some(a)),
160 },
161 }
162 }
163 fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
164 where
165 Self::Key: Ord,
166 {
167 let a = self.a.get_gt_ref(key)?;
168 let b = self.b.get_gt_ref(key)?;
169 match (a, b) {
170 (None, None) => Ok(None),
171 (Some(a), None) => Ok(Some(a)),
172 (None, Some(b)) => Ok(Some(b)),
173 (Some(a), Some(b)) => match a.0.cmp(&b.0) {
174 Ordering::Less => Ok(Some(a)),
175 Ordering::Equal => Ok(Some(a)),
176 Ordering::Greater => Ok(Some(b)),
177 },
178 }
179 }
180 fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
181 where
182 Self::Key: Ord,
183 {
184 let a = self.a.first()?;
185 let b = self.b.first()?;
186 match (a, b) {
187 (None, None) => Ok(None),
188 (Some(a), None) => Ok(Some(a)),
189 (None, Some(b)) => Ok(Some(b)),
190 (Some(a), Some(b)) => match a.0.cmp(&b.0) {
191 Ordering::Less => Ok(Some(a)),
192 Ordering::Equal => Ok(Some(a)),
193 Ordering::Greater => Ok(Some(b)),
194 },
195 }
196 }
197 fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
198 where
199 Self::Key: Ord,
200 {
201 let a = self.a.last()?;
202 let b = self.b.last()?;
203 match (a, b) {
204 (None, None) => Ok(None),
205 (Some(a), None) => Ok(Some(a)),
206 (None, Some(b)) => Ok(Some(b)),
207 (Some(a), Some(b)) => match a.0.cmp(&b.0) {
208 Ordering::Less => Ok(Some(b)),
209 Ordering::Equal => Ok(Some(a)),
210 Ordering::Greater => Ok(Some(a)),
211 },
212 }
213 }
214 fn is_empty(&self) -> Option<bool> {
215 let a = self.a.is_empty();
216 let b = self.b.is_empty();
217 match (a, b) {
218 (Some(a), Some(b)) => Some(a && b),
219 (Some(a), None) => {
220 if a {
221 None
222 } else {
223 Some(a)
224 }
225 }
226 (None, Some(b)) => {
227 if b {
228 None
229 } else {
230 Some(b)
231 }
232 }
233 (None, None) => None,
234 }
235 }
236 fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter> {
237 let a = (range.start_bound(), range.end_bound());
238 let b = (range.start_bound(), range.end_bound());
239 let a = self.a.range(a)?;
240 let b = self.b.range(b)?;
241 Ok(a.chain(b))
242 }
243}
244
245impl<A, B> Watch for Chain<A, B>
246where
247 A: View + Watch,
248 B: View<Key = A::Key, Value = A::Value> + Watch,
249 <A as View>::Key: Hash + Eq,
250{
251 fn watch(&self) -> BusReader<Event<Self::Key, Self::Value>> {
252 self.watcher.new_reader()
253 }
254 fn db(&self) -> crate::wrappers::database::Db {
255 self.a.db()
256 }
257 fn sync(&self) -> Arc<Synchronizer> {
258 Arc::clone(&self.sync)
259 }
260 fn wait(&self) {
261 self.a.wait();
262 self.b.wait();
263 }
264}