1use async_trait::async_trait;
2use redb::{RangeIter, ReadableTable, TableDefinition};
3
4use crate::{
5 interface::{Key, KeyValuePair, Val},
6 DBTransaction, Error, SimpleTransaction, TagBucket, CF,
7};
8
9use super::ty::{DBType, TxType};
10
11type TableKey = &'static [u8];
12type TableValue = &'static [u8];
13
14fn filter_with_prefix(
15 iterator: RangeIter<TableKey, TableValue>,
16 prefix: Vec<u8>,
17) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
18 iterator.filter(move |item| -> bool {
19 let (k, _) = *item;
20 k.starts_with(&prefix)
21 })
22}
23
24fn filter_with_suffix(
25 iterator: RangeIter<TableKey, TableValue>,
26 suffix: Vec<u8>,
27) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
28 iterator.filter(move |item| -> bool {
29 let (k, _) = *item;
30 k.ends_with(&suffix)
31 })
32}
33
34fn get_table_name(cf: CF) -> String {
35 let default = "default".as_bytes().to_vec();
36 String::from_utf8(cf.unwrap_or(default)).unwrap()
37}
38
39#[async_trait(?Send)]
40impl SimpleTransaction for DBTransaction<DBType, TxType> {
41 fn closed(&self) -> bool {
42 self.ok
43 }
44
45 async fn count(&mut self, tags: TagBucket) -> Result<usize, Error> {
46 if self.closed() {
47 return Err(Error::TxFinished);
48 }
49
50 let guarded_tx = self.tx.lock().await;
51 let tx = guarded_tx.as_ref().unwrap();
52 let cf = tags.get_bytes("column_family");
53 let name = get_table_name(cf);
54 let def = TableDefinition::<TableKey, TableValue>::new(&name);
55 let table = &tx.open_table(def);
56
57 match table {
58 Ok(t) => Ok(t.len()?),
59 Err(_) => Err(Error::DsNoColumnFamilyFound),
60 }
61 }
62
63 async fn cancel(&mut self) -> Result<(), Error> {
64 if self.ok {
65 return Err(Error::TxFinished);
66 }
67
68 self.ok = true;
70
71 let mut tx = self.tx.lock().await;
72 match tx.take() {
73 Some(tx) => tx.abort()?,
74 None => unreachable!(),
75 }
76
77 Ok(())
78 }
79
80 async fn commit(&mut self) -> Result<(), Error> {
81 if self.closed() {
82 return Err(Error::TxFinished);
83 }
84
85 if !self.writable {
87 return Err(Error::TxReadonly);
88 }
89
90 self.ok = true;
92
93 let mut tx = self.tx.lock().await;
94 match tx.take() {
95 Some(tx) => tx.commit()?,
96 None => unreachable!(),
97 }
98
99 Ok(())
100 }
101
102 async fn exi<K>(&self, key: K, tags: TagBucket) -> Result<bool, Error>
103 where
104 K: Into<Key> + Send,
105 {
106 if self.closed() {
107 return Err(Error::TxFinished);
108 }
109
110 let guarded_tx = self.tx.lock().await;
111 let tx = guarded_tx.as_ref().unwrap();
112
113 let cf = tags.get_bytes("column_family");
114 let name = get_table_name(cf);
115 let def = TableDefinition::<TableKey, TableValue>::new(&name);
116 let table = &tx.open_table(def);
117
118 let key = key.into();
119 match table {
120 Ok(t) => Ok(t.get(&key)?.is_some()),
121 Err(_) => Err(Error::DsNoColumnFamilyFound),
122 }
123 }
124 async fn get<K>(&self, key: K, tags: TagBucket) -> Result<Option<Val>, Error>
126 where
127 K: Into<Key> + Send,
128 {
129 if self.closed() {
130 return Err(Error::TxFinished);
131 }
132
133 let guarded_tx = self.tx.lock().await;
134 let tx = guarded_tx.as_ref().unwrap();
135
136 let cf = tags.get_bytes("column_family");
137 let name = get_table_name(cf);
138 let def = TableDefinition::<TableKey, TableValue>::new(&name);
139 let table = &tx.open_table(def).unwrap();
140
141 let key = key.into();
142 let result = table.get(&key).unwrap();
143 Ok(result.map(|v| v.to_vec()))
144 }
145 async fn set<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
147 where
148 K: Into<Key> + Send,
149 V: Into<Key> + Send,
150 {
151 if self.closed() {
152 return Err(Error::TxFinished);
153 }
154
155 if !self.writable {
157 return Err(Error::TxReadonly);
158 }
159
160 let guarded_tx = self.tx.lock().await;
161 let tx = guarded_tx.as_ref().unwrap();
162
163 let cf = tags.get_bytes("column_family");
164 let name = get_table_name(cf);
165 let def = TableDefinition::<TableKey, TableValue>::new(&name);
166 let (key, val) = (key.into(), val.into());
167
168 let mut table = tx.open_table(def);
169 match table.as_mut() {
170 Ok(t) => t.insert(&key, &val)?,
171 Err(_) => return Err(Error::DsNoColumnFamilyFound),
172 };
173
174 Ok(())
175 }
176
177 async fn put<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
179 where
180 K: Into<Key> + Send,
181 V: Into<Key> + Send,
182 {
183 if self.closed() {
184 return Err(Error::TxFinished);
185 }
186
187 if !self.writable {
189 return Err(Error::TxReadonly);
190 }
191
192 let guarded_tx = self.tx.lock().await;
193 let tx = guarded_tx.as_ref().unwrap();
194
195 let cf = tags.get_bytes("column_family");
196 let name = get_table_name(cf);
197 let def = TableDefinition::<TableKey, TableValue>::new(&name);
198 let mut table = tx.open_table(def)?;
199
200 let (key, val) = (key.into(), val.into());
201
202 match table.get(&key)? {
203 None => table.insert(&key, &val)?,
204 _ => return Err(Error::TxConditionNotMet),
205 };
206
207 Ok(())
208 }
209
210 async fn del<K>(&mut self, key: K, tags: TagBucket) -> Result<(), Error>
212 where
213 K: Into<Key> + Send,
214 {
215 if self.closed() {
216 return Err(Error::TxFinished);
217 }
218
219 if !self.writable {
221 return Err(Error::TxReadonly);
222 }
223
224 let guarded_tx = self.tx.lock().await;
225 let tx = guarded_tx.as_ref().unwrap();
226
227 let cf = tags.get_bytes("column_family");
228 let name = get_table_name(cf);
229 let def = TableDefinition::<TableKey, TableValue>::new(&name);
230 let mut table = tx.open_table(def);
231
232 let key = key.into();
233
234 match table.as_mut() {
235 Ok(t) => t.remove(&key)?,
236 Err(_) => return Err(Error::DsNoColumnFamilyFound),
237 };
238
239 Ok(())
240 }
241
242 async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
243 if self.closed() {
244 return Err(Error::TxFinished);
245 }
246
247 let guarded_tx = self.tx.lock().await;
248 let tx = guarded_tx.as_ref().unwrap();
249
250 let cf = tags.get_bytes("column_family");
251 let name = get_table_name(cf);
252 let def = TableDefinition::<TableKey, TableValue>::new(&name);
253 let table = tx.open_table(def);
254
255 let iterator = match table.as_ref() {
256 Ok(t) => t.iter()?,
257 Err(_) => return Err(Error::DsNoColumnFamilyFound),
258 };
259
260 Ok(iterator
261 .map(|p| {
262 let (k, v) = p;
263 Ok((k.to_vec(), v.to_vec()))
264 })
265 .collect())
266 }
267
268 async fn prefix_iterate<P>(
269 &self,
270 prefix: P,
271 tags: TagBucket,
272 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
273 where
274 P: Into<Key> + Send,
275 {
276 if self.closed() {
277 return Err(Error::TxFinished);
278 }
279
280 let guarded_tx = self.tx.lock().await;
281 let tx = guarded_tx.as_ref().unwrap();
282
283 let cf = tags.get_bytes("column_family");
284 let name = get_table_name(cf);
285 let def = TableDefinition::<TableKey, TableValue>::new(&name);
286 let table = tx.open_table(def);
287
288 let iterator = table.as_ref().unwrap().iter()?;
289
290 let prefix: Key = prefix.into();
291 let filtered_iterator = filter_with_prefix(iterator, prefix);
292
293 Ok(filtered_iterator
294 .map(|pair| {
295 let (k, v) = pair;
296 Ok((k.to_vec(), v.to_vec()))
297 })
298 .collect())
299 }
300
301 async fn suffix_iterate<S>(
302 &self,
303 suffix: S,
304 tags: TagBucket,
305 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
306 where
307 S: Into<Key> + Send,
308 {
309 if self.closed() {
310 return Err(Error::TxFinished);
311 }
312
313 let guarded_tx = self.tx.lock().await;
314 let tx = guarded_tx.as_ref().unwrap();
315
316 let cf = tags.get_bytes("column_family");
317 let name = get_table_name(cf);
318 let def = TableDefinition::<TableKey, TableValue>::new(&name);
319 let table = tx.open_table(def);
320
321 let iterator = table.as_ref().unwrap().iter()?;
322 let suffix: Key = suffix.into();
323 let filtered_iterator = filter_with_suffix(iterator, suffix);
324
325 Ok(filtered_iterator
326 .map(|pair| {
327 let (k, v) = pair;
328 Ok((k.to_vec(), v.to_vec()))
329 })
330 .collect())
331 }
332}