husky/ops/
chain.rs

1use anyhow::Result;
2use bus::{Bus, BusReader};
3use parking_lot::RwLock;
4use std::{cmp::Ordering, hash::Hash, sync::Arc};
5
6use crate::{
7	macros::cloned,
8	threads::{spawn_watcher, Synchronizer},
9	traits::{
10		view::View,
11		watch::{Event, Watch, Watcher},
12	},
13};
14
15/// A struct that chains two trees.
16/// You can create a [Chain] from two [View] structs, as long as they have the same keys and values.
17/// It gives preference to the first tree.
18/// # Examples
19/// ```
20/// # use husky::{Tree, Change, View, Operate};
21/// # let db = husky::open_temp().unwrap();
22/// # let a_tree: Tree<String, String> = db.open_tree("a").unwrap();
23/// # let b_tree: Tree<String, String> = db.open_tree("b").unwrap();
24///
25/// let chain = a_tree.chain(&b_tree);
26///
27/// b_tree.insert("key", "b").unwrap();
28/// let result = chain.get("key").unwrap();
29/// assert_eq!(result, Some("b".to_string()));
30///
31/// a_tree.insert("key", "a").unwrap();
32/// let result = chain.get("key").unwrap();
33/// assert_eq!(result, Some("a".to_string()));
34/// ```
35pub struct Chain<A, B>
36where
37	A: View,
38	B: View<Key = A::Key, Value = A::Value>,
39{
40	a: A,
41	b: B,
42	watcher: Watcher<A::Key, A::Value>,
43	sync: Arc<Synchronizer>,
44}
45impl<A, B> Clone for Chain<A, B>
46where
47	A: View,
48	B: View<Key = A::Key, Value = A::Value>,
49{
50	fn clone(&self) -> Self {
51		Self {
52			a: self.a.clone(),
53			b: self.b.clone(),
54			watcher: self.watcher.clone(),
55			sync: Arc::clone(&self.sync),
56		}
57	}
58}
59
60impl<A, B> Chain<A, B>
61where
62	A: View + Watch + Sync + Send,
63	B: View<Key = <A as View>::Key, Value = <A as View>::Value> + Watch + Sync + Send,
64{
65	pub(crate) fn new(a: A, b: B) -> Self {
66		let sync = Arc::new(Synchronizer::from(vec![a.sync(), b.sync()]));
67		let watcher = Watcher::new(cloned!(sync, a, b, move || {
68			let bus = Arc::new(RwLock::new(Bus::new(128)));
69			let a_reader = a.watch();
70			let b_reader = b.watch();
71			spawn_watcher(
72				Arc::clone(&sync),
73				a_reader,
74				Arc::clone(&bus),
75				cloned!(move |event| {
76					let (key, value) = match event {
77						Event::Insert { key, value } => {
78							(Arc::clone(&key), Some(Arc::clone(&value)))
79						}
80						Event::Remove { key } => {
81							(Arc::clone(&key), b.get_ref(&*key)?.map(Arc::new))
82						}
83					};
84					let event = match value {
85						Some(value) => Event::Insert { key, value },
86						None => Event::Remove { key },
87					};
88					Ok(vec![event])
89				}),
90			);
91			spawn_watcher(
92				sync,
93				b_reader,
94				Arc::clone(&bus),
95				cloned!(move |event| {
96					let (key, value) = match event {
97						Event::Insert { key, value } => {
98							(Arc::clone(&key), Some(Arc::clone(&value)))
99						}
100						Event::Remove { key } => {
101							(Arc::clone(&key), a.get_ref(&*key)?.map(Arc::new))
102						}
103					};
104					let event = match value {
105						Some(value) => Event::Insert { key, value },
106						None => Event::Remove { key },
107					};
108					Ok(vec![event])
109				}),
110			);
111			bus
112		}));
113		Chain {
114			a,
115			b,
116			watcher,
117			sync,
118		}
119	}
120}
121
122impl<A, B> View for Chain<A, B>
123where
124	A: View,
125	B: View<Key = A::Key, Value = A::Value>,
126{
127	type Key = A::Key;
128	type Value = A::Value;
129	type Iter = std::iter::Chain<A::Iter, B::Iter>;
130	fn get_ref(&self, key: &Self::Key) -> Result<Option<Self::Value>> {
131		let a = self.a.get_ref(key)?;
132		if let Some(a) = a {
133			return Ok(Some(a));
134		}
135		let b = self.b.get_ref(key)?;
136		Ok(b)
137	}
138	fn iter(&self) -> Self::Iter {
139		let a = self.a.iter();
140		let b = self.b.iter();
141		a.chain(b)
142	}
143	fn contains_key_ref(&self, key: &Self::Key) -> Result<bool> {
144		Ok(self.a.contains_key_ref(key)? || self.b.contains_key_ref(key)?)
145	}
146	fn get_lt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
147	where
148		Self::Key: Ord,
149	{
150		let a = self.a.get_lt_ref(key)?;
151		let b = self.b.get_lt_ref(key)?;
152		match (a, b) {
153			(None, None) => Ok(None),
154			(Some(a), None) => Ok(Some(a)),
155			(None, Some(b)) => Ok(Some(b)),
156			(Some(a), Some(b)) => match a.0.cmp(&b.0) {
157				Ordering::Less => Ok(Some(b)),
158				Ordering::Equal => Ok(Some(a)),
159				Ordering::Greater => Ok(Some(a)),
160			},
161		}
162	}
163	fn get_gt_ref(&self, key: &Self::Key) -> Result<Option<(Self::Key, Self::Value)>>
164	where
165		Self::Key: Ord,
166	{
167		let a = self.a.get_gt_ref(key)?;
168		let b = self.b.get_gt_ref(key)?;
169		match (a, b) {
170			(None, None) => Ok(None),
171			(Some(a), None) => Ok(Some(a)),
172			(None, Some(b)) => Ok(Some(b)),
173			(Some(a), Some(b)) => match a.0.cmp(&b.0) {
174				Ordering::Less => Ok(Some(a)),
175				Ordering::Equal => Ok(Some(a)),
176				Ordering::Greater => Ok(Some(b)),
177			},
178		}
179	}
180	fn first(&self) -> Result<Option<(Self::Key, Self::Value)>>
181	where
182		Self::Key: Ord,
183	{
184		let a = self.a.first()?;
185		let b = self.b.first()?;
186		match (a, b) {
187			(None, None) => Ok(None),
188			(Some(a), None) => Ok(Some(a)),
189			(None, Some(b)) => Ok(Some(b)),
190			(Some(a), Some(b)) => match a.0.cmp(&b.0) {
191				Ordering::Less => Ok(Some(a)),
192				Ordering::Equal => Ok(Some(a)),
193				Ordering::Greater => Ok(Some(b)),
194			},
195		}
196	}
197	fn last(&self) -> Result<Option<(Self::Key, Self::Value)>>
198	where
199		Self::Key: Ord,
200	{
201		let a = self.a.last()?;
202		let b = self.b.last()?;
203		match (a, b) {
204			(None, None) => Ok(None),
205			(Some(a), None) => Ok(Some(a)),
206			(None, Some(b)) => Ok(Some(b)),
207			(Some(a), Some(b)) => match a.0.cmp(&b.0) {
208				Ordering::Less => Ok(Some(b)),
209				Ordering::Equal => Ok(Some(a)),
210				Ordering::Greater => Ok(Some(a)),
211			},
212		}
213	}
214	fn is_empty(&self) -> Option<bool> {
215		let a = self.a.is_empty();
216		let b = self.b.is_empty();
217		match (a, b) {
218			(Some(a), Some(b)) => Some(a && b),
219			(Some(a), None) => {
220				if a {
221					None
222				} else {
223					Some(a)
224				}
225			}
226			(None, Some(b)) => {
227				if b {
228					None
229				} else {
230					Some(b)
231				}
232			}
233			(None, None) => None,
234		}
235	}
236	fn range(&self, range: impl std::ops::RangeBounds<Self::Key>) -> Result<Self::Iter> {
237		let a = (range.start_bound(), range.end_bound());
238		let b = (range.start_bound(), range.end_bound());
239		let a = self.a.range(a)?;
240		let b = self.b.range(b)?;
241		Ok(a.chain(b))
242	}
243}
244
245impl<A, B> Watch for Chain<A, B>
246where
247	A: View + Watch,
248	B: View<Key = A::Key, Value = A::Value> + Watch,
249	<A as View>::Key: Hash + Eq,
250{
251	fn watch(&self) -> BusReader<Event<Self::Key, Self::Value>> {
252		self.watcher.new_reader()
253	}
254	fn db(&self) -> crate::wrappers::database::Db {
255		self.a.db()
256	}
257	fn sync(&self) -> Arc<Synchronizer> {
258		Arc::clone(&self.sync)
259	}
260	fn wait(&self) {
261		self.a.wait();
262		self.b.wait();
263	}
264}