db/storage/kvs/redb/
tx.rs

1use async_trait::async_trait;
2use redb::{RangeIter, ReadableTable, TableDefinition};
3
4use crate::{
5	interface::{Key, KeyValuePair, Val},
6	DBTransaction, Error, SimpleTransaction, TagBucket, CF,
7};
8
9use super::ty::{DBType, TxType};
10
// Key type parameter used for the redb `TableDefinition`s and `RangeIter`s in this file.
type TableKey = &'static [u8];
// Value type parameter used for the redb `TableDefinition`s and `RangeIter`s in this file.
type TableValue = &'static [u8];
13
14fn filter_with_prefix(
15	iterator: RangeIter<TableKey, TableValue>,
16	prefix: Vec<u8>,
17) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
18	iterator.filter(move |item| -> bool {
19		let (k, _) = *item;
20		k.starts_with(&prefix)
21	})
22}
23
24fn filter_with_suffix(
25	iterator: RangeIter<TableKey, TableValue>,
26	suffix: Vec<u8>,
27) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
28	iterator.filter(move |item| -> bool {
29		let (k, _) = *item;
30		k.ends_with(&suffix)
31	})
32}
33
34fn get_table_name(cf: CF) -> String {
35	let default = "default".as_bytes().to_vec();
36	String::from_utf8(cf.unwrap_or(default)).unwrap()
37}
38
39#[async_trait(?Send)]
40impl SimpleTransaction for DBTransaction<DBType, TxType> {
41	fn closed(&self) -> bool {
42		self.ok
43	}
44
45	async fn count(&mut self, tags: TagBucket) -> Result<usize, Error> {
46		if self.closed() {
47			return Err(Error::TxFinished);
48		}
49
50		let guarded_tx = self.tx.lock().await;
51		let tx = guarded_tx.as_ref().unwrap();
52		let cf = tags.get_bytes("column_family");
53		let name = get_table_name(cf);
54		let def = TableDefinition::<TableKey, TableValue>::new(&name);
55		let table = &tx.open_table(def);
56
57		match table {
58			Ok(t) => Ok(t.len()?),
59			Err(_) => Err(Error::DsNoColumnFamilyFound),
60		}
61	}
62
63	async fn cancel(&mut self) -> Result<(), Error> {
64		if self.ok {
65			return Err(Error::TxFinished);
66		}
67
68		// Mark this transaction as done
69		self.ok = true;
70
71		let mut tx = self.tx.lock().await;
72		match tx.take() {
73			Some(tx) => tx.abort()?,
74			None => unreachable!(),
75		}
76
77		Ok(())
78	}
79
80	async fn commit(&mut self) -> Result<(), Error> {
81		if self.closed() {
82			return Err(Error::TxFinished);
83		}
84
85		// Check to see if transaction is writable
86		if !self.writable {
87			return Err(Error::TxReadonly);
88		}
89
90		// Mark this transaction as done
91		self.ok = true;
92
93		let mut tx = self.tx.lock().await;
94		match tx.take() {
95			Some(tx) => tx.commit()?,
96			None => unreachable!(),
97		}
98
99		Ok(())
100	}
101
102	async fn exi<K>(&self, key: K, tags: TagBucket) -> Result<bool, Error>
103	where
104		K: Into<Key> + Send,
105	{
106		if self.closed() {
107			return Err(Error::TxFinished);
108		}
109
110		let guarded_tx = self.tx.lock().await;
111		let tx = guarded_tx.as_ref().unwrap();
112
113		let cf = tags.get_bytes("column_family");
114		let name = get_table_name(cf);
115		let def = TableDefinition::<TableKey, TableValue>::new(&name);
116		let table = &tx.open_table(def);
117
118		let key = key.into();
119		match table {
120			Ok(t) => Ok(t.get(&key)?.is_some()),
121			Err(_) => Err(Error::DsNoColumnFamilyFound),
122		}
123	}
124	// Fetch a key from the database [column family]
125	async fn get<K>(&self, key: K, tags: TagBucket) -> Result<Option<Val>, Error>
126	where
127		K: Into<Key> + Send,
128	{
129		if self.closed() {
130			return Err(Error::TxFinished);
131		}
132
133		let guarded_tx = self.tx.lock().await;
134		let tx = guarded_tx.as_ref().unwrap();
135
136		let cf = tags.get_bytes("column_family");
137		let name = get_table_name(cf);
138		let def = TableDefinition::<TableKey, TableValue>::new(&name);
139		let table = &tx.open_table(def).unwrap();
140
141		let key = key.into();
142		let result = table.get(&key).unwrap();
143		Ok(result.map(|v| v.to_vec()))
144	}
145	// Insert or update a key in the database
146	async fn set<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
147	where
148		K: Into<Key> + Send,
149		V: Into<Key> + Send,
150	{
151		if self.closed() {
152			return Err(Error::TxFinished);
153		}
154
155		// Check to see if transaction is writable
156		if !self.writable {
157			return Err(Error::TxReadonly);
158		}
159
160		let guarded_tx = self.tx.lock().await;
161		let tx = guarded_tx.as_ref().unwrap();
162
163		let cf = tags.get_bytes("column_family");
164		let name = get_table_name(cf);
165		let def = TableDefinition::<TableKey, TableValue>::new(&name);
166		let (key, val) = (key.into(), val.into());
167
168		let mut table = tx.open_table(def);
169		match table.as_mut() {
170			Ok(t) => t.insert(&key, &val)?,
171			Err(_) => return Err(Error::DsNoColumnFamilyFound),
172		};
173
174		Ok(())
175	}
176
177	// Insert a key if it doesn't exist in the database
178	async fn put<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
179	where
180		K: Into<Key> + Send,
181		V: Into<Key> + Send,
182	{
183		if self.closed() {
184			return Err(Error::TxFinished);
185		}
186
187		// Check to see if transaction is writable
188		if !self.writable {
189			return Err(Error::TxReadonly);
190		}
191
192		let guarded_tx = self.tx.lock().await;
193		let tx = guarded_tx.as_ref().unwrap();
194
195		let cf = tags.get_bytes("column_family");
196		let name = get_table_name(cf);
197		let def = TableDefinition::<TableKey, TableValue>::new(&name);
198		let mut table = tx.open_table(def)?;
199
200		let (key, val) = (key.into(), val.into());
201
202		match table.get(&key)? {
203			None => table.insert(&key, &val)?,
204			_ => return Err(Error::TxConditionNotMet),
205		};
206
207		Ok(())
208	}
209
210	// Delete a key
211	async fn del<K>(&mut self, key: K, tags: TagBucket) -> Result<(), Error>
212	where
213		K: Into<Key> + Send,
214	{
215		if self.closed() {
216			return Err(Error::TxFinished);
217		}
218
219		// Check to see if transaction is writable
220		if !self.writable {
221			return Err(Error::TxReadonly);
222		}
223
224		let guarded_tx = self.tx.lock().await;
225		let tx = guarded_tx.as_ref().unwrap();
226
227		let cf = tags.get_bytes("column_family");
228		let name = get_table_name(cf);
229		let def = TableDefinition::<TableKey, TableValue>::new(&name);
230		let mut table = tx.open_table(def);
231
232		let key = key.into();
233
234		match table.as_mut() {
235			Ok(t) => t.remove(&key)?,
236			Err(_) => return Err(Error::DsNoColumnFamilyFound),
237		};
238
239		Ok(())
240	}
241
242	async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
243		if self.closed() {
244			return Err(Error::TxFinished);
245		}
246
247		let guarded_tx = self.tx.lock().await;
248		let tx = guarded_tx.as_ref().unwrap();
249
250		let cf = tags.get_bytes("column_family");
251		let name = get_table_name(cf);
252		let def = TableDefinition::<TableKey, TableValue>::new(&name);
253		let table = tx.open_table(def);
254
255		let iterator = match table.as_ref() {
256			Ok(t) => t.iter()?,
257			Err(_) => return Err(Error::DsNoColumnFamilyFound),
258		};
259
260		Ok(iterator
261			.map(|p| {
262				let (k, v) = p;
263				Ok((k.to_vec(), v.to_vec()))
264			})
265			.collect())
266	}
267
268	async fn prefix_iterate<P>(
269		&self,
270		prefix: P,
271		tags: TagBucket,
272	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
273	where
274		P: Into<Key> + Send,
275	{
276		if self.closed() {
277			return Err(Error::TxFinished);
278		}
279
280		let guarded_tx = self.tx.lock().await;
281		let tx = guarded_tx.as_ref().unwrap();
282
283		let cf = tags.get_bytes("column_family");
284		let name = get_table_name(cf);
285		let def = TableDefinition::<TableKey, TableValue>::new(&name);
286		let table = tx.open_table(def);
287
288		let iterator = table.as_ref().unwrap().iter()?;
289
290		let prefix: Key = prefix.into();
291		let filtered_iterator = filter_with_prefix(iterator, prefix);
292
293		Ok(filtered_iterator
294			.map(|pair| {
295				let (k, v) = pair;
296				Ok((k.to_vec(), v.to_vec()))
297			})
298			.collect())
299	}
300
301	async fn suffix_iterate<S>(
302		&self,
303		suffix: S,
304		tags: TagBucket,
305	) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
306	where
307		S: Into<Key> + Send,
308	{
309		if self.closed() {
310			return Err(Error::TxFinished);
311		}
312
313		let guarded_tx = self.tx.lock().await;
314		let tx = guarded_tx.as_ref().unwrap();
315
316		let cf = tags.get_bytes("column_family");
317		let name = get_table_name(cf);
318		let def = TableDefinition::<TableKey, TableValue>::new(&name);
319		let table = tx.open_table(def);
320
321		let iterator = table.as_ref().unwrap().iter()?;
322		let suffix: Key = suffix.into();
323		let filtered_iterator = filter_with_suffix(iterator, suffix);
324
325		Ok(filtered_iterator
326			.map(|pair| {
327				let (k, v) = pair;
328				Ok((k.to_vec(), v.to_vec()))
329			})
330			.collect())
331	}
332}