async_skipdb/serializable/
serializable.rs1use async_txn::{error::WtmError, PwmComparableRange};
2use skipdb_core::rev_range::WriteTransactionRevRange;
3
4use std::{convert::Infallible, future::Future, ops::Bound};
5
6use super::*;
7
8#[cfg(all(test, any(feature = "tokio", feature = "smol", feature = "async-std")))]
9mod tests;
10
11pub struct SerializableTransaction<K, V, S: AsyncSpawner> {
13 pub(super) db: SerializableDb<K, V, S>,
14 pub(super) wtm: AsyncWtm<K, V, BTreeCm<K>, BTreePwm<K, V>, S>,
15}
16
17impl<K, V, S> SerializableTransaction<K, V, S>
18where
19 K: CheapClone + Ord,
20 S: AsyncSpawner,
21{
22 #[inline]
23 pub(super) async fn new(db: SerializableDb<K, V, S>) -> Self {
24 let wtm = db
25 .inner
26 .tm
27 .write_with_blocking_cm_and_pwm((), ())
28 .await
29 .unwrap();
30 Self { db, wtm }
31 }
32}
33
34impl<K, V, S> SerializableTransaction<K, V, S>
35where
36 K: CheapClone + Ord + Send + Sync + 'static,
37 V: Send + Sync + 'static,
38 S: AsyncSpawner,
39{
40 #[inline]
56 pub async fn commit(&mut self) -> Result<(), WtmError<Infallible, Infallible, Infallible>> {
57 let db = self.db.clone();
58 self
59 .wtm
60 .commit(|ents| async move {
61 db.inner.map.apply(ents);
62 Ok(())
63 })
64 .await
65 }
66}
67
68impl<K, V, S> SerializableTransaction<K, V, S>
69where
70 K: CheapClone + Ord + Send + Sync + 'static,
71 V: Send + Sync + 'static,
72 S: AsyncSpawner,
73{
74 #[inline]
90 pub async fn commit_with_task<Fut, E, R>(
91 &mut self,
92 callback: impl FnOnce(Result<(), E>) -> Fut + Send + 'static,
93 ) -> Result<S::JoinHandle<R>, WtmError<Infallible, Infallible, E>>
94 where
95 E: std::error::Error + Send,
96 Fut: Future<Output = R> + Send + 'static,
97 R: Send + 'static,
98 {
99 let db = self.db.clone();
100
101 self
102 .wtm
103 .commit_with_task(
104 move |ents| async move {
105 db.inner.map.apply(ents);
106 Ok(())
107 },
108 callback,
109 )
110 .await
111 }
112}
113
114impl<K, V, S> SerializableTransaction<K, V, S>
115where
116 K: CheapClone + Ord,
117 S: AsyncSpawner,
118{
119 #[inline]
121 pub fn version(&self) -> u64 {
122 self.wtm.version()
123 }
124
125 #[inline]
127 pub fn rollback(&mut self) -> Result<(), TransactionError<Infallible, Infallible>> {
128 self.wtm.rollback_blocking()
129 }
130
131 #[inline]
133 pub fn contains_key(
134 &mut self,
135 key: &K,
136 ) -> Result<bool, TransactionError<Infallible, Infallible>> {
137 let version = self.wtm.version();
138 match self.wtm.contains_key_blocking(key)? {
139 Some(true) => Ok(true),
140 Some(false) => Ok(false),
141 None => Ok(self.db.inner.map.contains_key(key, version)),
142 }
143 }
144
145 #[inline]
147 pub fn get<'a, 'b: 'a>(
148 &'a mut self,
149 key: &'b K,
150 ) -> Result<Option<Ref<'a, K, V>>, TransactionError<Infallible, Infallible>> {
151 let version = self.wtm.version();
152 match self.wtm.get_blocking(key)? {
153 Some(v) => {
154 if v.value().is_some() {
155 Ok(Some(v.into()))
156 } else {
157 Ok(None)
158 }
159 }
160 None => Ok(self.db.inner.map.get(key, version).map(Into::into)),
161 }
162 }
163
164 #[inline]
166 pub fn insert(
167 &mut self,
168 key: K,
169 value: V,
170 ) -> Result<(), TransactionError<Infallible, Infallible>> {
171 self.wtm.insert_blocking(key, value)
172 }
173
174 #[inline]
176 pub fn remove(&mut self, key: K) -> Result<(), TransactionError<Infallible, Infallible>> {
177 self.wtm.remove_blocking(key)
178 }
179
180 #[inline]
182 pub fn iter(
183 &mut self,
184 ) -> Result<TransactionIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>> {
185 let version = self.wtm.version();
186 let (mut marker, pm) = self
187 .wtm
188 .blocking_marker_with_pm()
189 .ok_or(TransactionError::Discard)?;
190
191 let start: Bound<K> = Bound::Unbounded;
192 let end: Bound<K> = Bound::Unbounded;
193 marker.mark_range((start, end));
194 let committed = self.db.inner.map.iter(version);
195 let pendings = pm.iter();
196
197 Ok(TransactionIter::new(pendings, committed, None))
198 }
199
200 #[inline]
202 pub fn iter_rev(
203 &mut self,
204 ) -> Result<WriteTransactionRevIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
205 {
206 let version = self.wtm.version();
207 let (mut marker, pm) = self
208 .wtm
209 .blocking_marker_with_pm()
210 .ok_or(TransactionError::Discard)?;
211
212 let start: Bound<K> = Bound::Unbounded;
213 let end: Bound<K> = Bound::Unbounded;
214 marker.mark_range((start, end));
215 let committed = self.db.inner.map.iter_rev(version);
216 let pendings = pm.iter().rev();
217
218 Ok(WriteTransactionRevIter::new(pendings, committed, None))
219 }
220
221 #[inline]
223 pub fn range<'a, R>(
224 &'a mut self,
225 range: R,
226 ) -> Result<TransactionRange<'a, K, R, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
227 where
228 R: RangeBounds<K> + 'a,
229 {
230 let version = self.wtm.version();
231 let (mut marker, pm) = self
232 .wtm
233 .blocking_marker_with_pm()
234 .ok_or(TransactionError::Discard)?;
235 let start = range.start_bound();
236 let end = range.end_bound();
237 marker.mark_range((start, end));
238 let pendings = pm.range_comparable((start, end));
239 let committed = self.db.inner.map.range(range, version);
240
241 Ok(TransactionRange::new(pendings, committed, None))
242 }
243
244 #[inline]
246 pub fn range_rev<'a, R>(
247 &'a mut self,
248 range: R,
249 ) -> Result<
250 WriteTransactionRevRange<'a, K, R, K, V, BTreeCm<K>>,
251 TransactionError<Infallible, Infallible>,
252 >
253 where
254 R: RangeBounds<K> + 'a,
255 {
256 let version = self.wtm.version();
257 let (mut marker, pm) = self
258 .wtm
259 .blocking_marker_with_pm()
260 .ok_or(TransactionError::Discard)?;
261 let start = range.start_bound();
262 let end = range.end_bound();
263 marker.mark_range((start, end));
264 let pendings = pm.range_comparable((start, end)).rev();
265 let committed = self.db.inner.map.range_rev(range, version);
266
267 Ok(WriteTransactionRevRange::new(
268 pendings,
269 committed,
270 Some(marker),
271 ))
272 }
273}