1use std::sync::Arc;
2
3use async_trait::async_trait;
4use rocksdb::{BoundColumnFamily, DBAccess, DBIteratorWithThreadMode, IteratorMode};
5
6use super::ty::{DBType, TxType};
7use crate::{
8 err::Error,
9 interface::{
10 kv::{Key, Val},
11 KeyValuePair,
12 },
13 model::{DBTransaction, SimpleTransaction},
14 CF,
15};
16
17fn take_with_prefix<T: DBAccess>(
18 iterator: DBIteratorWithThreadMode<T>,
19 prefix: Vec<u8>,
20) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>> + '_ {
21 iterator.take_while(move |item| -> bool {
22 if let Ok((ref k, _)) = *item {
23 k.starts_with(&prefix)
24 } else {
25 true
26 }
27 })
28}
29
30fn take_with_suffix<T: DBAccess>(
31 iterator: DBIteratorWithThreadMode<T>,
32 suffix: Vec<u8>,
33) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>> + '_ {
34 iterator.take_while(move |item| -> bool {
35 if let Ok((ref k, _)) = *item {
36 k.ends_with(&suffix)
37 } else {
38 true
39 }
40 })
41}
42
43impl DBTransaction<DBType, TxType> {
44 fn get_column_family(&self, cf: CF) -> Result<Arc<BoundColumnFamily>, Error> {
45 if cf.is_none() {
46 return Err(Error::DsColumnFamilyIsNotValid);
47 }
48 let cf_name = String::from_utf8(cf.unwrap()).unwrap();
49 let bounded_cf = self._db.cf_handle(&cf_name);
50
51 match bounded_cf {
52 Some(cf) => Ok(cf),
53 _ => Err(Error::DsNoColumnFamilyFound),
54 }
55 }
56}
57
58#[async_trait(?Send)]
59impl SimpleTransaction for DBTransaction<DBType, TxType> {
60 fn closed(&self) -> bool {
61 self.ok
62 }
63
64 async fn count(&mut self, cf: CF) -> Result<usize, Error> {
65 if self.closed() {
66 return Err(Error::TxFinished);
67 }
68
69 let guarded_tx = self.tx.lock().await;
70 let tx = guarded_tx.as_ref().unwrap();
71 let cf = &self.get_column_family(cf).unwrap();
72 Ok(tx.iterator_cf(cf, IteratorMode::Start).count())
73 }
74
75 async fn cancel(&mut self) -> Result<(), Error> {
76 if self.ok {
77 return Err(Error::TxFinished);
78 }
79
80 self.ok = true;
82
83 let mut tx = self.tx.lock().await;
84 match tx.take() {
85 Some(tx) => tx.rollback()?,
86 None => unreachable!(),
87 }
88
89 Ok(())
90 }
91
92 async fn commit(&mut self) -> Result<(), Error> {
93 if self.closed() {
94 return Err(Error::TxFinished);
95 }
96
97 if !self.writable {
99 return Err(Error::TxReadonly);
100 }
101
102 self.ok = true;
104
105 let mut tx = self.tx.lock().await;
106 match tx.take() {
107 Some(tx) => tx.commit()?,
108 None => unreachable!(),
109 }
110
111 Ok(())
112 }
113
114 async fn exi<K>(&self, cf: CF, key: K) -> Result<bool, Error>
115 where
116 K: Into<Key> + Send,
117 {
118 if self.closed() {
119 return Err(Error::TxFinished);
120 }
121
122 let tx = self.tx.lock().await;
123 let cf = &self.get_column_family(cf).unwrap();
124 let result = tx.as_ref().unwrap().get_cf(cf, key.into()).unwrap().is_some();
125
126 Ok(result)
127 }
128 async fn get<K>(&self, cf: CF, key: K) -> Result<Option<Val>, Error>
130 where
131 K: Into<Key> + Send,
132 {
133 if self.closed() {
134 return Err(Error::TxFinished);
135 }
136
137 let guarded_tx = self.tx.lock().await;
138 let tx = guarded_tx.as_ref().unwrap();
139 let cf = &self.get_column_family(cf).unwrap();
140 Ok(tx.get_cf(cf, key.into()).unwrap())
141 }
142
143 async fn multi_get<K>(&self, cf: CF, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
144 where
145 K: Into<Key> + Send + AsRef<[u8]>,
146 {
147 if self.closed() {
148 return Err(Error::TxFinished);
149 }
150
151 let guarded_tx = self.tx.lock().await;
152 let tx = guarded_tx.as_ref().unwrap();
153 let mut values = vec![];
154 let cf = &self.get_column_family(cf).unwrap();
155 for key in keys.iter() {
156 let value = tx.get_cf(cf, key).unwrap();
157 values.push(value);
158 }
159 Ok(values)
160 }
161 async fn set<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
163 where
164 K: Into<Key> + Send,
165 V: Into<Key> + Send,
166 {
167 if self.closed() {
168 return Err(Error::TxFinished);
169 }
170
171 if !self.writable {
173 return Err(Error::TxReadonly);
174 }
175
176 let guarded_tx = self.tx.lock().await;
178 let tx = guarded_tx.as_ref().unwrap();
179 let cf = &self.get_column_family(cf).unwrap();
180 tx.put_cf(cf, key.into(), val.into())?;
181 Ok(())
182 }
183
184 async fn put<K, V>(&mut self, cf: CF, key: K, val: V) -> Result<(), Error>
186 where
187 K: Into<Key> + Send,
188 V: Into<Key> + Send,
189 {
190 if self.closed() {
191 return Err(Error::TxFinished);
192 }
193
194 if !self.writable {
196 return Err(Error::TxReadonly);
197 }
198
199 let guarded_tx = self.tx.lock().await;
201 let tx = guarded_tx.as_ref().unwrap();
202 let (key, val) = (key.into(), val.into());
203
204 let cf = &self.get_column_family(cf).unwrap();
205 match tx.get_cf(cf, &key)? {
206 None => tx.put_cf(cf, key, val)?,
207 _ => return Err(Error::TxConditionNotMet),
208 };
209 Ok(())
210 }
211
212 async fn del<K>(&mut self, cf: CF, key: K) -> Result<(), Error>
214 where
215 K: Into<Key> + Send,
216 {
217 if self.closed() {
218 return Err(Error::TxFinished);
219 }
220
221 if !self.writable {
223 return Err(Error::TxReadonly);
224 }
225
226 let key = key.into();
227 let guarded_tx = self.tx.lock().await;
228 let tx = guarded_tx.as_ref().unwrap();
229
230 let cf = &self.get_column_family(cf).unwrap();
231 match tx.get_cf(cf, &key)? {
232 Some(_v) => tx.delete_cf(cf, key)?,
233 None => return Err(Error::TxnKeyNotFound),
234 };
235
236 Ok(())
237 }
238
239 async fn iterate(&self, cf: CF) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
241 if self.closed() {
242 return Err(Error::TxFinished);
243 }
244
245 let guarded_tx = self.tx.lock().await;
246 let tx = guarded_tx.as_ref().unwrap();
247
248 let cf = &self.get_column_family(cf).unwrap();
249
250 let iterator = tx.iterator_cf(cf, IteratorMode::Start);
251
252 Ok(iterator
253 .map(|pair| {
254 let (k, v) = pair.unwrap();
255 Ok((k.to_vec(), v.to_vec()))
256 })
257 .collect())
258 }
259
260 async fn suffix_iterate<S>(
261 &self,
262 cf: CF,
263 suffix: S,
264 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
265 where
266 S: Into<Key> + Send,
267 {
268 if self.closed() {
269 return Err(Error::TxFinished);
270 }
271
272 let guarded_tx = self.tx.lock().await;
273 let tx = guarded_tx.as_ref().unwrap();
274 let cf = &self.get_column_family(cf).unwrap();
275 let suffix: Key = suffix.into();
276
277 let iterator = tx.iterator_cf(cf, IteratorMode::Start);
278 let taken_iterator = take_with_suffix(iterator, suffix);
279
280 Ok(taken_iterator
281 .map(|pair| {
282 let (k, v) = pair.unwrap();
283 Ok((k.to_vec(), v.to_vec()))
284 })
285 .collect())
286 }
287
288 async fn prefix_iterate<P>(
290 &self,
291 cf: CF,
292 prefix: P,
293 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
294 where
295 P: Into<Key> + Send,
296 {
297 if self.closed() {
298 return Err(Error::TxFinished);
299 }
300
301 let guarded_tx = self.tx.lock().await;
302 let tx = guarded_tx.as_ref().unwrap();
303 let cf = &self.get_column_family(cf).unwrap();
304 let prefix: Key = prefix.into();
305 let iterator = tx.prefix_iterator_cf(cf, &prefix);
306 let taken_iterator = take_with_prefix(iterator, prefix);
307
308 Ok(taken_iterator
309 .map(|v| {
310 let (k, v) = v.unwrap();
311 Ok((k.to_vec(), v.to_vec()))
312 })
313 .collect())
314 }
315}