db/storage/kvs/sled/
tx.rs

1use async_trait::async_trait;
2use sled::{IVec, Iter};
3
4use crate::{
5	interface::{Key, KeyValuePair, Val},
6	DBTransaction, Error, SimpleTransaction, TagBucket,
7};
8
9use super::ty::{DBType, TxType};
10
11fn filter_with_prefix(iterator: Iter, prefix: Vec<u8>) -> impl Iterator<Item = (IVec, IVec)> {
12	iterator
13		.filter(move |item| -> bool {
14			if let Ok((k, _)) = item.clone() {
15				return k.starts_with(&prefix);
16			}
17			false
18		})
19		.map(move |item| item.unwrap())
20}
21
22fn filter_with_suffix(iterator: Iter, prefix: Vec<u8>) -> impl Iterator<Item = (IVec, IVec)> {
23	iterator
24		.filter(move |item| -> bool {
25			if let Ok((k, _)) = item.clone() {
26				return k.ends_with(&prefix);
27			};
28			false
29		})
30		.map(move |item| item.unwrap())
31}
32
33#[async_trait(?Send)]
34impl SimpleTransaction for DBTransaction<DBType, TxType> {
35	fn closed(&self) -> bool {
36		self.ok
37	}
38
39	async fn count(&mut self, tags: TagBucket) -> Result<usize, Error> {
40		if self.closed() {
41			return Err(Error::TxFinished);
42		}
43
44		let db = &self._db;
45		let tree_name = tags.get("tree");
46		let result = if let Some(t) = tree_name {
47			let tree = db.open_tree(t).unwrap();
48			tree.len()
49		} else {
50			db.len()
51		};
52		Ok(result)
53	}
54
55	async fn cancel(&mut self) -> Result<(), Error> {
56		if self.ok {
57			return Err(Error::TxFinished);
58		}
59
60		self.ok = true;
61
62		Ok(())
63	}
64
65	async fn commit(&mut self) -> Result<(), Error> {
66		if self.closed() {
67			return Err(Error::TxFinished);
68		}
69
70		// Check to see if transaction is writable
71		if !self.writable {
72			return Err(Error::TxReadonly);
73		}
74
75		// Mark this transaction as done
76		self.ok = true;
77
78		Ok(())
79	}
80
81	async fn exi<K>(&self, key: K, tags: TagBucket) -> Result<bool, Error>
82	where
83		K: Into<Key> + Send,
84	{
85		if self.closed() {
86			return Err(Error::TxFinished);
87		}
88
89		let db = &self._db;
90		let tree_name = tags.get("tree");
91		let key = key.into();
92		let k = key.as_slice();
93		let result = if let Some(t) = tree_name {
94			let tree = db.open_tree(t).unwrap();
95			tree.contains_key(k)
96		} else {
97			db.contains_key(k)
98		};
99		Ok(result.unwrap())
100	}
101	// Fetch a key from the database [column family]
102	async fn get<K>(&self, key: K, tags: TagBucket) -> Result<Option<Val>, Error>
103	where
104		K: Into<Key> + Send,
105	{
106		if self.closed() {
107			return Err(Error::TxFinished);
108		}
109
110		let db = &self._db;
111		let tree_name = tags.get("tree");
112		let key = key.into();
113		let k = key.as_slice();
114		let result = if let Some(t) = tree_name {
115			let tree = db.open_tree(t).unwrap();
116			tree.get(k)
117		} else {
118			db.get(k)
119		};
120		Ok(result.unwrap().map(|v| v.to_vec()))
121	}
122	// Insert or update a key in the database
123	async fn set<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
124	where
125		K: Into<Key> + Send,
126		V: Into<Key> + Send,
127	{
128		if self.closed() {
129			return Err(Error::TxFinished);
130		}
131
132		if !self.writable {
133			return Err(Error::TxReadonly);
134		}
135
136		let db = &self._db;
137		let tree_name = tags.get("tree");
138		let key = key.into();
139		let k = key.as_slice();
140		let v = val.into();
141		if let Some(t) = tree_name {
142			let tree = db.open_tree(t).unwrap();
143			tree.insert(k, v).unwrap()
144		} else {
145			db.insert(k, v).unwrap()
146		};
147		Ok(())
148	}
149
150	// Insert a key if it doesn't exist in the database
151	async fn put<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
152	where
153		K: Into<Key> + Send,
154		V: Into<Key> + Send,
155	{
156		if self.closed() {
157			return Err(Error::TxFinished);
158		}
159
160		// Check to see if transaction is writable
161		if !self.writable {
162			return Err(Error::TxReadonly);
163		}
164
165		let db = &self._db;
166		let tree_name = tags.get("tree");
167		let key = key.into();
168		let k = key.as_slice();
169		let v = val.into();
170		let is_exists = if let Some(t) = tree_name.clone() {
171			let tree = db.open_tree(t).unwrap();
172			tree.contains_key(k)
173		} else {
174			db.contains_key(k)
175		}
176		.unwrap();
177
178		match is_exists {
179			false => if let Some(t) = tree_name {
180				let tree = db.open_tree(t).unwrap();
181				tree.insert(k, v)
182			} else {
183				db.insert(k, v)
184			}
185			.unwrap(),
186			_ => return Err(Error::TxConditionNotMet),
187		};
188
189		Ok(())
190	}
191
192	// Delete a key
193	async fn del<K>(&mut self, key: K, tags: TagBucket) -> Result<(), Error>
194	where
195		K: Into<Key> + Send,
196	{
197		if self.closed() {
198			return Err(Error::TxFinished);
199		}
200
201		// Check to see if transaction is writable
202		if !self.writable {
203			return Err(Error::TxReadonly);
204		}
205
206		let db = &self._db;
207		let tree_name = tags.get("tree");
208		let key = key.into();
209		let k = key.as_slice();
210		if let Some(t) = tree_name {
211			let tree = db.open_tree(t).unwrap();
212			tree.remove(k)
213		} else {
214			db.remove(k)
215		}
216		.unwrap();
217
218		Ok(())
219	}
220
221	async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
222		if self.closed() {
223			return Err(Error::TxFinished);
224		}
225
226		let db = &self._db;
227		let tree_name = tags.get("tree");
228		let iter = if let Some(t) = tree_name {
229			let tree = db.open_tree(t).unwrap();
230			tree.iter()
231		} else {
232			db.iter()
233		};
234
235		Ok(iter
236			.map(|p| {
237				let (k, v) = p.unwrap();
238				Ok((k.to_vec(), v.to_vec()))
239			})
240			.collect())
241	}
242
243	async fn prefix_iterate<P>(
244		&self,
245		prefix: P,
246		tags: TagBucket,
247	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
248	where
249		P: Into<Key> + Send,
250	{
251		if self.closed() {
252			return Err(Error::TxFinished);
253		}
254
255		let db = &self._db;
256		let tree_name = tags.get("tree");
257		let iter = if let Some(t) = tree_name {
258			let tree = db.open_tree(t).unwrap();
259			tree.iter()
260		} else {
261			db.iter()
262		};
263
264		let prefix: Key = prefix.into();
265		let filtered_iterator = filter_with_prefix(iter, prefix);
266
267		Ok(filtered_iterator
268			.map(|pair| {
269				let (k, v) = pair;
270				Ok((k.to_vec(), v.to_vec()))
271			})
272			.collect())
273	}
274
275	async fn suffix_iterate<S>(
276		&self,
277		suffix: S,
278		tags: TagBucket,
279	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
280	where
281		S: Into<Key> + Send,
282	{
283		if self.closed() {
284			return Err(Error::TxFinished);
285		}
286
287		let db = &self._db;
288		let tree_name = tags.get("tree");
289		let iter = if let Some(t) = tree_name {
290			let tree = db.open_tree(t).unwrap();
291			tree.iter()
292		} else {
293			db.iter()
294		};
295
296		let suffix: Key = suffix.into();
297		let filtered_iterator = filter_with_suffix(iter, suffix);
298
299		Ok(filtered_iterator
300			.map(|pair| {
301				let (k, v) = pair;
302				Ok((k.to_vec(), v.to_vec()))
303			})
304			.collect())
305	}
306}