db/storage/kvs/redb/
tx.rs

1use async_trait::async_trait;
2use redb::{RangeIter, ReadableTable, TableDefinition};
3
4use crate::{
5	interface::{Key, KeyValuePair, Val},
6	DBTransaction, Error, SimpleTransaction, CF,
7};
8
9use super::ty::{DBType, TxType};
10
/// Key type of every redb table definition built in this file: a raw byte slice.
type TableKey = &'static [u8];
/// Value type of every redb table definition built in this file: a raw byte slice.
type TableValue = &'static [u8];
13
14fn filter_with_prefix(
15	iterator: RangeIter<TableKey, TableValue>,
16	prefix: Vec<u8>,
17) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
18	iterator.filter(move |item| -> bool {
19		let (k, _) = *item;
20		k.starts_with(&prefix)
21	})
22}
23
24fn filter_with_suffix(
25	iterator: RangeIter<TableKey, TableValue>,
26	suffix: Vec<u8>,
27) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
28	iterator.filter(move |item| -> bool {
29		let (k, _) = *item;
30		k.ends_with(&suffix)
31	})
32}
33
34fn get_table_name(cf: CF) -> String {
35	String::from_utf8(cf.unwrap()).unwrap()
36}
37
38#[async_trait(?Send)]
39impl SimpleTransaction for DBTransaction<DBType, TxType> {
40	fn closed(&self) -> bool {
41		self.ok
42	}
43
44	async fn count(&mut self, cf: CF) -> Result<usize, Error> {
45		if self.closed() {
46			return Err(Error::TxFinished);
47		}
48
49		let guarded_tx = self.tx.lock().await;
50		let tx = guarded_tx.as_ref().unwrap();
51
52		let name = get_table_name(cf);
53		let def = TableDefinition::<TableKey, TableValue>::new(&name);
54		let table = &tx.open_table(def);
55
56		match table {
57			Ok(t) => Ok(t.len()?),
58			Err(_) => Err(Error::DsNoColumnFamilyFound),
59		}
60	}
61
62	async fn cancel(&mut self) -> Result<(), Error> {
63		if self.ok {
64			return Err(Error::TxFinished);
65		}
66
67		// Mark this transaction as done
68		self.ok = true;
69
70		let mut tx = self.tx.lock().await;
71		match tx.take() {
72			Some(tx) => tx.abort()?,
73			None => unreachable!(),
74		}
75
76		Ok(())
77	}
78
79	async fn commit(&mut self) -> Result<(), Error> {
80		if self.closed() {
81			return Err(Error::TxFinished);
82		}
83
84		// Check to see if transaction is writable
85		if !self.writable {
86			return Err(Error::TxReadonly);
87		}
88
89		// Mark this transaction as done
90		self.ok = true;
91
92		let mut tx = self.tx.lock().await;
93		match tx.take() {
94			Some(tx) => tx.commit()?,
95			None => unreachable!(),
96		}
97
98		Ok(())
99	}
100
101	async fn exi<K>(&self, cf: CF, key: K) -> Result<bool, Error>
102	where
103		K: Into<Key> + Send,
104	{
105		if self.closed() {
106			return Err(Error::TxFinished);
107		}
108
109		let guarded_tx = self.tx.lock().await;
110		let tx = guarded_tx.as_ref().unwrap();
111
112		let name = get_table_name(cf);
113		let def = TableDefinition::<TableKey, TableValue>::new(&name);
114		let table = &tx.open_table(def);
115
116		let key = key.into();
117		match table {
118			Ok(t) => Ok(t.get(&key)?.is_some()),
119			Err(_) => Err(Error::DsNoColumnFamilyFound),
120		}
121	}
122	// Fetch a key from the database [column family]
123	async fn get<K>(&self, cf: CF, key: K) -> Result<Option<Val>, Error>
124	where
125		K: Into<Key> + Send,
126	{
127		if self.closed() {
128			return Err(Error::TxFinished);
129		}
130
131		let guarded_tx = self.tx.lock().await;
132		let tx = guarded_tx.as_ref().unwrap();
133
134		let name = get_table_name(cf);
135		let def = TableDefinition::<TableKey, TableValue>::new(&name);
136		let table = &tx.open_table(def).unwrap();
137
138		let key = key.into();
139		let result = table.get(&key).unwrap();
140		Ok(result.map(|v| v.to_vec()))
141	}
142	// Insert or update a key in the database
143	async fn set<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
144	where
145		K: Into<Key> + Send,
146		V: Into<Key> + Send,
147	{
148		if self.closed() {
149			return Err(Error::TxFinished);
150		}
151
152		// Check to see if transaction is writable
153		if !self.writable {
154			return Err(Error::TxReadonly);
155		}
156
157		let guarded_tx = self.tx.lock().await;
158		let tx = guarded_tx.as_ref().unwrap();
159
160		let name = get_table_name(cf);
161		let def = TableDefinition::<TableKey, TableValue>::new(&name);
162		let (key, val) = (key.into(), val.into());
163
164		let mut table = tx.open_table(def);
165		match table.as_mut() {
166			Ok(t) => t.insert(&key, &val)?,
167			Err(_) => return Err(Error::DsNoColumnFamilyFound),
168		};
169
170		Ok(())
171	}
172
173	// Insert a key if it doesn't exist in the database
174	async fn put<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
175	where
176		K: Into<Key> + Send,
177		V: Into<Key> + Send,
178	{
179		if self.closed() {
180			return Err(Error::TxFinished);
181		}
182
183		// Check to see if transaction is writable
184		if !self.writable {
185			return Err(Error::TxReadonly);
186		}
187
188		let guarded_tx = self.tx.lock().await;
189		let tx = guarded_tx.as_ref().unwrap();
190
191		let name = get_table_name(cf);
192		let def = TableDefinition::<TableKey, TableValue>::new(&name);
193		let mut table = tx.open_table(def)?;
194
195		let (key, val) = (key.into(), val.into());
196
197		match table.get(&key)? {
198			None => table.insert(&key, &val)?,
199			_ => return Err(Error::TxConditionNotMet),
200		};
201
202		Ok(())
203	}
204
205	// Delete a key
206	async fn del<K>(&mut self, cf: CF, key: K) -> Result<(), Error>
207	where
208		K: Into<Key> + Send,
209	{
210		if self.closed() {
211			return Err(Error::TxFinished);
212		}
213
214		// Check to see if transaction is writable
215		if !self.writable {
216			return Err(Error::TxReadonly);
217		}
218
219		let guarded_tx = self.tx.lock().await;
220		let tx = guarded_tx.as_ref().unwrap();
221
222		let name = get_table_name(cf);
223		let def = TableDefinition::<TableKey, TableValue>::new(&name);
224		let mut table = tx.open_table(def);
225
226		let key = key.into();
227
228		match table.as_mut() {
229			Ok(t) => t.remove(&key)?,
230			Err(_) => return Err(Error::DsNoColumnFamilyFound),
231		};
232
233		Ok(())
234	}
235
236	async fn iterate(&self, cf: CF) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
237		if self.closed() {
238			return Err(Error::TxFinished);
239		}
240
241		let guarded_tx = self.tx.lock().await;
242		let tx = guarded_tx.as_ref().unwrap();
243
244		let name = get_table_name(cf);
245		let def = TableDefinition::<TableKey, TableValue>::new(&name);
246		let table = tx.open_table(def);
247
248		let iterator = match table.as_ref() {
249			Ok(t) => t.iter()?,
250			Err(_) => return Err(Error::DsNoColumnFamilyFound),
251		};
252
253		Ok(iterator
254			.map(|p| {
255				let (k, v) = p;
256				Ok((k.to_vec(), v.to_vec()))
257			})
258			.collect())
259	}
260
261	async fn prefix_iterate<P>(
262		&self,
263		cf: CF,
264		prefix: P,
265	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
266	where
267		P: Into<Key> + Send,
268	{
269		if self.closed() {
270			return Err(Error::TxFinished);
271		}
272
273		let guarded_tx = self.tx.lock().await;
274		let tx = guarded_tx.as_ref().unwrap();
275
276		let name = get_table_name(cf);
277		let def = TableDefinition::<TableKey, TableValue>::new(&name);
278		let table = tx.open_table(def);
279
280		let iterator = table.as_ref().unwrap().iter()?;
281
282		let prefix: Key = prefix.into();
283		let filtered_iterator = filter_with_prefix(iterator, prefix);
284
285		Ok(filtered_iterator
286			.map(|pair| {
287				let (k, v) = pair;
288				Ok((k.to_vec(), v.to_vec()))
289			})
290			.collect())
291	}
292
293	async fn suffix_iterate<S>(
294		&self,
295		cf: CF,
296		suffix: S,
297	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
298	where
299		S: Into<Key> + Send,
300	{
301		if self.closed() {
302			return Err(Error::TxFinished);
303		}
304
305		let guarded_tx = self.tx.lock().await;
306		let tx = guarded_tx.as_ref().unwrap();
307
308		let name = get_table_name(cf);
309		let def = TableDefinition::<TableKey, TableValue>::new(&name);
310		let table = tx.open_table(def);
311
312		let iterator = table.as_ref().unwrap().iter()?;
313		let suffix: Key = suffix.into();
314		let filtered_iterator = filter_with_suffix(iterator, suffix);
315
316		Ok(filtered_iterator
317			.map(|pair| {
318				let (k, v) = pair;
319				Ok((k.to_vec(), v.to_vec()))
320			})
321			.collect())
322	}
323}