db/storage/kvs/rocksdb/
tx.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use rocksdb::{BoundColumnFamily, DBAccess, DBIteratorWithThreadMode, IteratorMode};
5
6use super::ty::{DBType, TxType};
7use crate::{
8	err::Error,
9	interface::{
10		kv::{Key, Val},
11		KeyValuePair,
12	},
13	model::{DBTransaction, SimpleTransaction},
14	CF,
15};
16
17fn take_with_prefix<T: DBAccess>(
18	iterator: DBIteratorWithThreadMode<T>,
19	prefix: Vec<u8>,
20) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>> + '_ {
21	iterator.take_while(move |item| -> bool {
22		if let Ok((ref k, _)) = *item {
23			k.starts_with(&prefix)
24		} else {
25			true
26		}
27	})
28}
29
30fn take_with_suffix<T: DBAccess>(
31	iterator: DBIteratorWithThreadMode<T>,
32	suffix: Vec<u8>,
33) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>> + '_ {
34	iterator.take_while(move |item| -> bool {
35		if let Ok((ref k, _)) = *item {
36			k.ends_with(&suffix)
37		} else {
38			true
39		}
40	})
41}
42
43impl DBTransaction<DBType, TxType> {
44	fn get_column_family(&self, cf: CF) -> Result<Arc<BoundColumnFamily>, Error> {
45		if cf.is_none() {
46			return Err(Error::DsColumnFamilyIsNotValid);
47		}
48		let cf_name = String::from_utf8(cf.unwrap()).unwrap();
49		let bounded_cf = self._db.cf_handle(&cf_name);
50
51		match bounded_cf {
52			Some(cf) => Ok(cf),
53			_ => Err(Error::DsNoColumnFamilyFound),
54		}
55	}
56}
57
58#[async_trait(?Send)]
59impl SimpleTransaction for DBTransaction<DBType, TxType> {
60	fn closed(&self) -> bool {
61		self.ok
62	}
63
64	async fn count(&mut self, cf: CF) -> Result<usize, Error> {
65		if self.closed() {
66			return Err(Error::TxFinished);
67		}
68
69		let guarded_tx = self.tx.lock().await;
70		let tx = guarded_tx.as_ref().unwrap();
71		let cf = &self.get_column_family(cf).unwrap();
72		Ok(tx.iterator_cf(cf, IteratorMode::Start).count())
73	}
74
75	async fn cancel(&mut self) -> Result<(), Error> {
76		if self.ok {
77			return Err(Error::TxFinished);
78		}
79
80		// Mark this transaction as done
81		self.ok = true;
82
83		let mut tx = self.tx.lock().await;
84		match tx.take() {
85			Some(tx) => tx.rollback()?,
86			None => unreachable!(),
87		}
88
89		Ok(())
90	}
91
92	async fn commit(&mut self) -> Result<(), Error> {
93		if self.closed() {
94			return Err(Error::TxFinished);
95		}
96
97		// Check to see if transaction is writable
98		if !self.writable {
99			return Err(Error::TxReadonly);
100		}
101
102		// Mark this transaction as done
103		self.ok = true;
104
105		let mut tx = self.tx.lock().await;
106		match tx.take() {
107			Some(tx) => tx.commit()?,
108			None => unreachable!(),
109		}
110
111		Ok(())
112	}
113
114	async fn exi<K>(&self, cf: CF, key: K) -> Result<bool, Error>
115	where
116		K: Into<Key> + Send,
117	{
118		if self.closed() {
119			return Err(Error::TxFinished);
120		}
121
122		let tx = self.tx.lock().await;
123		let cf = &self.get_column_family(cf).unwrap();
124		let result = tx.as_ref().unwrap().get_cf(cf, key.into()).unwrap().is_some();
125
126		Ok(result)
127	}
128	// Fetch a key from the database [column family]
129	async fn get<K>(&self, cf: CF, key: K) -> Result<Option<Val>, Error>
130	where
131		K: Into<Key> + Send,
132	{
133		if self.closed() {
134			return Err(Error::TxFinished);
135		}
136
137		let guarded_tx = self.tx.lock().await;
138		let tx = guarded_tx.as_ref().unwrap();
139		let cf = &self.get_column_family(cf).unwrap();
140		Ok(tx.get_cf(cf, key.into()).unwrap())
141	}
142
143	async fn multi_get<K>(&self, cf: CF, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
144	where
145		K: Into<Key> + Send + AsRef<[u8]>,
146	{
147		if self.closed() {
148			return Err(Error::TxFinished);
149		}
150
151		let guarded_tx = self.tx.lock().await;
152		let tx = guarded_tx.as_ref().unwrap();
153		let mut values = vec![];
154		let cf = &self.get_column_family(cf).unwrap();
155		for key in keys.iter() {
156			let value = tx.get_cf(cf, key).unwrap();
157			values.push(value);
158		}
159		Ok(values)
160	}
161	// Insert or update a key in the database
162	async fn set<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
163	where
164		K: Into<Key> + Send,
165		V: Into<Key> + Send,
166	{
167		if self.closed() {
168			return Err(Error::TxFinished);
169		}
170
171		// Check to see if transaction is writable
172		if !self.writable {
173			return Err(Error::TxReadonly);
174		}
175
176		// Set the key
177		let guarded_tx = self.tx.lock().await;
178		let tx = guarded_tx.as_ref().unwrap();
179		let cf = &self.get_column_family(cf).unwrap();
180		tx.put_cf(cf, key.into(), val.into())?;
181		Ok(())
182	}
183
184	// Insert a key if it doesn't exist in the database
185	async fn put<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
186	where
187		K: Into<Key> + Send,
188		V: Into<Key> + Send,
189	{
190		if self.closed() {
191			return Err(Error::TxFinished);
192		}
193
194		// Check to see if transaction is writable
195		if !self.writable {
196			return Err(Error::TxReadonly);
197		}
198
199		// Future tx
200		let guarded_tx = self.tx.lock().await;
201		let tx = guarded_tx.as_ref().unwrap();
202		let (key, val) = (key.into(), val.into());
203
204		let cf = &self.get_column_family(cf).unwrap();
205		match tx.get_cf(cf, &key)? {
206			None => tx.put_cf(cf, key, val)?,
207			_ => return Err(Error::TxConditionNotMet),
208		};
209		Ok(())
210	}
211
212	// Delete a key
213	async fn del<K>(&mut self, cf: CF, key: K) -> Result<(), Error>
214	where
215		K: Into<Key> + Send,
216	{
217		if self.closed() {
218			return Err(Error::TxFinished);
219		}
220
221		// Check to see if transaction is writable
222		if !self.writable {
223			return Err(Error::TxReadonly);
224		}
225
226		let key = key.into();
227		let guarded_tx = self.tx.lock().await;
228		let tx = guarded_tx.as_ref().unwrap();
229
230		let cf = &self.get_column_family(cf).unwrap();
231		match tx.get_cf(cf, &key)? {
232			Some(_v) => tx.delete_cf(cf, key)?,
233			None => return Err(Error::TxnKeyNotFound),
234		};
235
236		Ok(())
237	}
238
239	// Iterate key value elements with handler
240	async fn iterate(&self, cf: CF) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
241		if self.closed() {
242			return Err(Error::TxFinished);
243		}
244
245		let guarded_tx = self.tx.lock().await;
246		let tx = guarded_tx.as_ref().unwrap();
247
248		let cf = &self.get_column_family(cf).unwrap();
249
250		let iterator = tx.iterator_cf(cf, IteratorMode::Start);
251
252		Ok(iterator
253			.map(|pair| {
254				let (k, v) = pair.unwrap();
255				Ok((k.to_vec(), v.to_vec()))
256			})
257			.collect())
258	}
259
260	async fn suffix_iterate<S>(
261		&self,
262		cf: CF,
263		suffix: S,
264	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
265	where
266		S: Into<Key> + Send,
267	{
268		if self.closed() {
269			return Err(Error::TxFinished);
270		}
271
272		let guarded_tx = self.tx.lock().await;
273		let tx = guarded_tx.as_ref().unwrap();
274		let cf = &self.get_column_family(cf).unwrap();
275		let suffix: Key = suffix.into();
276
277		let iterator = tx.iterator_cf(cf, IteratorMode::Start);
278		let taken_iterator = take_with_suffix(iterator, suffix);
279
280		Ok(taken_iterator
281			.map(|pair| {
282				let (k, v) = pair.unwrap();
283				Ok((k.to_vec(), v.to_vec()))
284			})
285			.collect())
286	}
287
288	// Iterate key value elements with handler
289	async fn prefix_iterate<P>(
290		&self,
291		cf: CF,
292		prefix: P,
293	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
294	where
295		P: Into<Key> + Send,
296	{
297		if self.closed() {
298			return Err(Error::TxFinished);
299		}
300
301		let guarded_tx = self.tx.lock().await;
302		let tx = guarded_tx.as_ref().unwrap();
303		let cf = &self.get_column_family(cf).unwrap();
304		let prefix: Key = prefix.into();
305		let iterator = tx.prefix_iterator_cf(cf, &prefix);
306		let taken_iterator = take_with_prefix(iterator, prefix);
307
308		Ok(taken_iterator
309			.map(|v| {
310				let (k, v) = v.unwrap();
311				Ok((k.to_vec(), v.to_vec()))
312			})
313			.collect())
314	}
315}