db/storage/kvs/sled/
tx.rs1use async_trait::async_trait;
2use sled::{IVec, Iter};
3
4use crate::{
5 interface::{Key, KeyValuePair, Val},
6 DBTransaction, Error, SimpleTransaction, TagBucket,
7};
8
9use super::ty::{DBType, TxType};
10
11fn filter_with_prefix(iterator: Iter, prefix: Vec<u8>) -> impl Iterator<Item = (IVec, IVec)> {
12 iterator
13 .filter(move |item| -> bool {
14 if let Ok((k, _)) = item.clone() {
15 return k.starts_with(&prefix);
16 }
17 false
18 })
19 .map(move |item| item.unwrap())
20}
21
22fn filter_with_suffix(iterator: Iter, prefix: Vec<u8>) -> impl Iterator<Item = (IVec, IVec)> {
23 iterator
24 .filter(move |item| -> bool {
25 if let Ok((k, _)) = item.clone() {
26 return k.ends_with(&prefix);
27 };
28 false
29 })
30 .map(move |item| item.unwrap())
31}
32
33#[async_trait(?Send)]
34impl SimpleTransaction for DBTransaction<DBType, TxType> {
35 fn closed(&self) -> bool {
36 self.ok
37 }
38
39 async fn count(&mut self, tags: TagBucket) -> Result<usize, Error> {
40 if self.closed() {
41 return Err(Error::TxFinished);
42 }
43
44 let db = &self._db;
45 let tree_name = tags.get("tree");
46 let result = if let Some(t) = tree_name {
47 let tree = db.open_tree(t).unwrap();
48 tree.len()
49 } else {
50 db.len()
51 };
52 Ok(result)
53 }
54
55 async fn cancel(&mut self) -> Result<(), Error> {
56 if self.ok {
57 return Err(Error::TxFinished);
58 }
59
60 self.ok = true;
61
62 Ok(())
63 }
64
65 async fn commit(&mut self) -> Result<(), Error> {
66 if self.closed() {
67 return Err(Error::TxFinished);
68 }
69
70 if !self.writable {
72 return Err(Error::TxReadonly);
73 }
74
75 self.ok = true;
77
78 Ok(())
79 }
80
81 async fn exi<K>(&self, key: K, tags: TagBucket) -> Result<bool, Error>
82 where
83 K: Into<Key> + Send,
84 {
85 if self.closed() {
86 return Err(Error::TxFinished);
87 }
88
89 let db = &self._db;
90 let tree_name = tags.get("tree");
91 let key = key.into();
92 let k = key.as_slice();
93 let result = if let Some(t) = tree_name {
94 let tree = db.open_tree(t).unwrap();
95 tree.contains_key(k)
96 } else {
97 db.contains_key(k)
98 };
99 Ok(result.unwrap())
100 }
101 async fn get<K>(&self, key: K, tags: TagBucket) -> Result<Option<Val>, Error>
103 where
104 K: Into<Key> + Send,
105 {
106 if self.closed() {
107 return Err(Error::TxFinished);
108 }
109
110 let db = &self._db;
111 let tree_name = tags.get("tree");
112 let key = key.into();
113 let k = key.as_slice();
114 let result = if let Some(t) = tree_name {
115 let tree = db.open_tree(t).unwrap();
116 tree.get(k)
117 } else {
118 db.get(k)
119 };
120 Ok(result.unwrap().map(|v| v.to_vec()))
121 }
122 async fn set<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
124 where
125 K: Into<Key> + Send,
126 V: Into<Key> + Send,
127 {
128 if self.closed() {
129 return Err(Error::TxFinished);
130 }
131
132 if !self.writable {
133 return Err(Error::TxReadonly);
134 }
135
136 let db = &self._db;
137 let tree_name = tags.get("tree");
138 let key = key.into();
139 let k = key.as_slice();
140 let v = val.into();
141 if let Some(t) = tree_name {
142 let tree = db.open_tree(t).unwrap();
143 tree.insert(k, v).unwrap()
144 } else {
145 db.insert(k, v).unwrap()
146 };
147 Ok(())
148 }
149
150 async fn put<K, V>(&mut self, key: K, val: V, tags: TagBucket) -> Result<(), Error>
152 where
153 K: Into<Key> + Send,
154 V: Into<Key> + Send,
155 {
156 if self.closed() {
157 return Err(Error::TxFinished);
158 }
159
160 if !self.writable {
162 return Err(Error::TxReadonly);
163 }
164
165 let db = &self._db;
166 let tree_name = tags.get("tree");
167 let key = key.into();
168 let k = key.as_slice();
169 let v = val.into();
170 let is_exists = if let Some(t) = tree_name.clone() {
171 let tree = db.open_tree(t).unwrap();
172 tree.contains_key(k)
173 } else {
174 db.contains_key(k)
175 }
176 .unwrap();
177
178 match is_exists {
179 false => if let Some(t) = tree_name {
180 let tree = db.open_tree(t).unwrap();
181 tree.insert(k, v)
182 } else {
183 db.insert(k, v)
184 }
185 .unwrap(),
186 _ => return Err(Error::TxConditionNotMet),
187 };
188
189 Ok(())
190 }
191
192 async fn del<K>(&mut self, key: K, tags: TagBucket) -> Result<(), Error>
194 where
195 K: Into<Key> + Send,
196 {
197 if self.closed() {
198 return Err(Error::TxFinished);
199 }
200
201 if !self.writable {
203 return Err(Error::TxReadonly);
204 }
205
206 let db = &self._db;
207 let tree_name = tags.get("tree");
208 let key = key.into();
209 let k = key.as_slice();
210 if let Some(t) = tree_name {
211 let tree = db.open_tree(t).unwrap();
212 tree.remove(k)
213 } else {
214 db.remove(k)
215 }
216 .unwrap();
217
218 Ok(())
219 }
220
221 async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<KeyValuePair, Error>>, Error> {
222 if self.closed() {
223 return Err(Error::TxFinished);
224 }
225
226 let db = &self._db;
227 let tree_name = tags.get("tree");
228 let iter = if let Some(t) = tree_name {
229 let tree = db.open_tree(t).unwrap();
230 tree.iter()
231 } else {
232 db.iter()
233 };
234
235 Ok(iter
236 .map(|p| {
237 let (k, v) = p.unwrap();
238 Ok((k.to_vec(), v.to_vec()))
239 })
240 .collect())
241 }
242
243 async fn prefix_iterate<P>(
244 &self,
245 prefix: P,
246 tags: TagBucket,
247 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
248 where
249 P: Into<Key> + Send,
250 {
251 if self.closed() {
252 return Err(Error::TxFinished);
253 }
254
255 let db = &self._db;
256 let tree_name = tags.get("tree");
257 let iter = if let Some(t) = tree_name {
258 let tree = db.open_tree(t).unwrap();
259 tree.iter()
260 } else {
261 db.iter()
262 };
263
264 let prefix: Key = prefix.into();
265 let filtered_iterator = filter_with_prefix(iter, prefix);
266
267 Ok(filtered_iterator
268 .map(|pair| {
269 let (k, v) = pair;
270 Ok((k.to_vec(), v.to_vec()))
271 })
272 .collect())
273 }
274
275 async fn suffix_iterate<S>(
276 &self,
277 suffix: S,
278 tags: TagBucket,
279 ) -> Result<Vec<Result<KeyValuePair, Error>>, Error>
280 where
281 S: Into<Key> + Send,
282 {
283 if self.closed() {
284 return Err(Error::TxFinished);
285 }
286
287 let db = &self._db;
288 let tree_name = tags.get("tree");
289 let iter = if let Some(t) = tree_name {
290 let tree = db.open_tree(t).unwrap();
291 tree.iter()
292 } else {
293 db.iter()
294 };
295
296 let suffix: Key = suffix.into();
297 let filtered_iterator = filter_with_suffix(iter, suffix);
298
299 Ok(filtered_iterator
300 .map(|pair| {
301 let (k, v) = pair;
302 Ok((k.to_vec(), v.to_vec()))
303 })
304 .collect())
305 }
306}