1use async_trait::async_trait;
2use redb::{RangeIter, ReadableTable, TableDefinition};
3
4use crate::{
5 interface::{Key, KeyValuePair, Val},
6 DBTransaction, Error, SimpleTransaction, CF,
7};
8
9use super::ty::{DBType, TxType};
10
11type TableKey = &'static [u8];
12type TableValue = &'static [u8];
13
14fn filter_with_prefix(
15 iterator: RangeIter<TableKey, TableValue>,
16 prefix: Vec<u8>,
17) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
18 iterator.filter(move |item| -> bool {
19 let (k, _) = *item;
20 k.starts_with(&prefix)
21 })
22}
23
24fn filter_with_suffix(
25 iterator: RangeIter<TableKey, TableValue>,
26 suffix: Vec<u8>,
27) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
28 iterator.filter(move |item| -> bool {
29 let (k, _) = *item;
30 k.ends_with(&suffix)
31 })
32}
33
34fn get_table_name(cf: CF) -> String {
35 String::from_utf8(cf.unwrap()).unwrap()
36}
37
38#[async_trait(?Send)]
39impl SimpleTransaction for DBTransaction<DBType, TxType> {
40 fn closed(&self) -> bool {
41 self.ok
42 }
43
44 async fn count(&mut self, cf: CF) -> Result<usize, Error> {
45 if self.closed() {
46 return Err(Error::TxFinished);
47 }
48
49 let guarded_tx = self.tx.lock().await;
50 let tx = guarded_tx.as_ref().unwrap();
51
52 let name = get_table_name(cf);
53 let def = TableDefinition::<TableKey, TableValue>::new(&name);
54 let table = &tx.open_table(def);
55
56 match table {
57 Ok(t) => Ok(t.len()?),
58 Err(_) => Err(Error::DsNoColumnFamilyFound),
59 }
60 }
61
62 async fn cancel(&mut self) -> Result<(), Error> {
63 if self.ok {
64 return Err(Error::TxFinished);
65 }
66
67 self.ok = true;
69
70 let mut tx = self.tx.lock().await;
71 match tx.take() {
72 Some(tx) => tx.abort()?,
73 None => unreachable!(),
74 }
75
76 Ok(())
77 }
78
79 async fn commit(&mut self) -> Result<(), Error> {
80 if self.closed() {
81 return Err(Error::TxFinished);
82 }
83
84 if !self.writable {
86 return Err(Error::TxReadonly);
87 }
88
89 self.ok = true;
91
92 let mut tx = self.tx.lock().await;
93 match tx.take() {
94 Some(tx) => tx.commit()?,
95 None => unreachable!(),
96 }
97
98 Ok(())
99 }
100
101 async fn exi<K>(&self, cf: CF, key: K) -> Result<bool, Error>
102 where
103 K: Into<Key> + Send,
104 {
105 if self.closed() {
106 return Err(Error::TxFinished);
107 }
108
109 let guarded_tx = self.tx.lock().await;
110 let tx = guarded_tx.as_ref().unwrap();
111
112 let name = get_table_name(cf);
113 let def = TableDefinition::<TableKey, TableValue>::new(&name);
114 let table = &tx.open_table(def);
115
116 let key = key.into();
117 match table {
118 Ok(t) => Ok(t.get(&key)?.is_some()),
119 Err(_) => Err(Error::DsNoColumnFamilyFound),
120 }
121 }
122 async fn get<K>(&self, cf: CF, key: K) -> Result<Option<Val>, Error>
124 where
125 K: Into<Key> + Send,
126 {
127 if self.closed() {
128 return Err(Error::TxFinished);
129 }
130
131 let guarded_tx = self.tx.lock().await;
132 let tx = guarded_tx.as_ref().unwrap();
133
134 let name = get_table_name(cf);
135 let def = TableDefinition::<TableKey, TableValue>::new(&name);
136 let table = &tx.open_table(def).unwrap();
137
138 let key = key.into();
139 let result = table.get(&key).unwrap();
140 Ok(result.map(|v| v.to_vec()))
141 }
142 async fn set<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
144 where
145 K: Into<Key> + Send,
146 V: Into<Key> + Send,
147 {
148 if self.closed() {
149 return Err(Error::TxFinished);
150 }
151
152 if !self.writable {
154 return Err(Error::TxReadonly);
155 }
156
157 let guarded_tx = self.tx.lock().await;
158 let tx = guarded_tx.as_ref().unwrap();
159
160 let name = get_table_name(cf);
161 let def = TableDefinition::<TableKey, TableValue>::new(&name);
162 let (key, val) = (key.into(), val.into());
163
164 let mut table = tx.open_table(def);
165 match table.as_mut() {
166 Ok(t) => t.insert(&key, &val)?,
167 Err(_) => return Err(Error::DsNoColumnFamilyFound),
168 };
169
170 Ok(())
171 }
172
173 async fn put<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
175 where
176 K: Into<Key> + Send,
177 V: Into<Key> + Send,
178 {
179 if self.closed() {
180 return Err(Error::TxFinished);
181 }
182
183 if !self.writable {
185 return Err(Error::TxReadonly);
186 }
187
188 let guarded_tx = self.tx.lock().await;
189 let tx = guarded_tx.as_ref().unwrap();
190
191 let name = get_table_name(cf);
192 let def = TableDefinition::<TableKey, TableValue>::new(&name);
193 let mut table = tx.open_table(def)?;
194
195 let (key, val) = (key.into(), val.into());
196
197 match table.get(&key)? {
198 None => table.insert(&key, &val)?,
199 _ => return Err(Error::TxConditionNotMet),
200 };
201
202 Ok(())
203 }
204
205 async fn del<K>(&mut self, cf: CF, key: K) -> Result<(), Error>
207 where
208 K: Into<Key> + Send,
209 {
210 if self.closed() {
211 return Err(Error::TxFinished);
212 }
213
214 if !self.writable {
216 return Err(Error::TxReadonly);
217 }
218
219 let guarded_tx = self.tx.lock().await;
220 let tx = guarded_tx.as_ref().unwrap();
221
222 let name = get_table_name(cf);
223 let def = TableDefinition::<TableKey, TableValue>::new(&name);
224 let mut table = tx.open_table(def);
225
226 let key = key.into();
227
228 match table.as_mut() {
229 Ok(t) => t.remove(&key)?,
230 Err(_) => return Err(Error::DsNoColumnFamilyFound),
231 };
232
233 Ok(())
234 }
235
236 async fn iterate(&self, cf: CF) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
237 if self.closed() {
238 return Err(Error::TxFinished);
239 }
240
241 let guarded_tx = self.tx.lock().await;
242 let tx = guarded_tx.as_ref().unwrap();
243
244 let name = get_table_name(cf);
245 let def = TableDefinition::<TableKey, TableValue>::new(&name);
246 let table = tx.open_table(def);
247
248 let iterator = match table.as_ref() {
249 Ok(t) => t.iter()?,
250 Err(_) => return Err(Error::DsNoColumnFamilyFound),
251 };
252
253 Ok(iterator
254 .map(|p| {
255 let (k, v) = p;
256 Ok((k.to_vec(), v.to_vec()))
257 })
258 .collect())
259 }
260
261 async fn prefix_iterate<P>(
262 &self,
263 cf: CF,
264 prefix: P,
265 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
266 where
267 P: Into<Key> + Send,
268 {
269 if self.closed() {
270 return Err(Error::TxFinished);
271 }
272
273 let guarded_tx = self.tx.lock().await;
274 let tx = guarded_tx.as_ref().unwrap();
275
276 let name = get_table_name(cf);
277 let def = TableDefinition::<TableKey, TableValue>::new(&name);
278 let table = tx.open_table(def);
279
280 let iterator = table.as_ref().unwrap().iter()?;
281
282 let prefix: Key = prefix.into();
283 let filtered_iterator = filter_with_prefix(iterator, prefix);
284
285 Ok(filtered_iterator
286 .map(|pair| {
287 let (k, v) = pair;
288 Ok((k.to_vec(), v.to_vec()))
289 })
290 .collect())
291 }
292
293 async fn suffix_iterate<S>(
294 &self,
295 cf: CF,
296 suffix: S,
297 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
298 where
299 S: Into<Key> + Send,
300 {
301 if self.closed() {
302 return Err(Error::TxFinished);
303 }
304
305 let guarded_tx = self.tx.lock().await;
306 let tx = guarded_tx.as_ref().unwrap();
307
308 let name = get_table_name(cf);
309 let def = TableDefinition::<TableKey, TableValue>::new(&name);
310 let table = tx.open_table(def);
311
312 let iterator = table.as_ref().unwrap().iter()?;
313 let suffix: Key = suffix.into();
314 let filtered_iterator = filter_with_suffix(iterator, suffix);
315
316 Ok(filtered_iterator
317 .map(|pair| {
318 let (k, v) = pair;
319 Ok((k.to_vec(), v.to_vec()))
320 })
321 .collect())
322 }
323}