1use std::fmt;
2use std::fmt::Debug;
3use std::ops::Range;
4
5use futures::stream::Stream;
6
7use super::api::{ScanLimit, Transactable};
8use super::batch::Batch;
9use super::scanner::{Direction, Scanner};
10use super::{IntoBytes, Key, Result, Val};
11use crate::kvs::timestamp::{BoxTimeStamp, BoxTimeStampImpl};
12
13#[derive(Copy, Clone, Eq, PartialEq)]
15pub enum TransactionType {
16 Read,
17 Write,
18}
19
20#[derive(Copy, Clone)]
22pub enum LockType {
23 Pessimistic,
24 Optimistic,
25}
26
27impl From<bool> for LockType {
28 fn from(value: bool) -> Self {
29 match value {
30 true => LockType::Pessimistic,
31 false => LockType::Optimistic,
32 }
33 }
34}
35
36pub struct Transactor {
38 pub(super) inner: Box<dyn Transactable>,
40}
41
42impl fmt::Display for Transactor {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "{}", self.kind())
45 }
46}
47
48impl Drop for Transactor {
49 fn drop(&mut self) {
50 if !self.closed() && self.writeable() {
51 #[cfg(test)]
53 warn!("A transaction was dropped without being committed or cancelled");
54 #[cfg(not(test))]
56 error!("A transaction was dropped without being committed or cancelled");
57 }
58 }
59}
60
61impl Transactor {
62 pub(super) fn kind(&self) -> &'static str {
64 self.inner.kind()
65 }
66
67 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
74 pub fn closed(&self) -> bool {
75 self.inner.closed()
76 }
77
78 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
87 pub fn writeable(&self) -> bool {
88 self.inner.writeable()
89 }
90
91 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
95 pub async fn cancel(&self) -> Result<()> {
96 self.inner.cancel().await
97 }
98
99 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
103 pub async fn commit(&self) -> Result<()> {
104 self.inner.commit().await
105 }
106
107 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
109 pub async fn exists<K>(&self, key: K, version: Option<u64>) -> Result<bool>
110 where
111 K: IntoBytes + Debug,
112 {
113 let key = key.into_vec();
114 self.inner.exists(key, version).await
115 }
116
117 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
119 pub async fn get<K>(&self, key: K, version: Option<u64>) -> Result<Option<Val>>
120 where
121 K: IntoBytes + Debug,
122 {
123 let key = key.into_vec();
124 self.inner.get(key, version).await
125 }
126
127 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
129 pub async fn getm<K>(&self, keys: Vec<K>, version: Option<u64>) -> Result<Vec<Option<Val>>>
130 where
131 K: IntoBytes + Debug,
132 {
133 let keys = keys.into_iter().map(IntoBytes::into_vec).collect();
134 self.inner.getm(keys, version).await
135 }
136
137 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
142 pub async fn getp<K>(&self, key: K) -> Result<Vec<(Key, Val)>>
143 where
144 K: IntoBytes + Debug,
145 {
146 let key = key.into_vec();
147 self.inner.getp(key).await
148 }
149
150 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
155 pub async fn getr<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<Vec<(Key, Val)>>
156 where
157 K: IntoBytes + Debug,
158 {
159 let beg = rng.start.into_vec();
160 let end = rng.end.into_vec();
161 self.inner.getr(beg..end, version).await
162 }
163
164 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
166 pub async fn set<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<()>
167 where
168 K: IntoBytes + Debug,
169 V: IntoBytes + Debug,
170 {
171 let key = key.into_vec();
172 let val = val.into_vec();
173 self.inner.set(key, val, version).await
174 }
175
176 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
178 pub async fn replace<K, V>(&self, key: K, val: V) -> Result<()>
179 where
180 K: IntoBytes + Debug,
181 V: IntoBytes + Debug,
182 {
183 let key = key.into_vec();
184 let val = val.into_vec();
185 self.inner.replace(key, val).await
186 }
187
188 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
190 pub async fn put<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<()>
191 where
192 K: IntoBytes + Debug,
193 V: IntoBytes + Debug,
194 {
195 let key = key.into_vec();
196 let val = val.into_vec();
197 self.inner.put(key, val, version).await
198 }
199
200 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
202 pub async fn putc<K, V>(&self, key: K, val: V, chk: Option<V>) -> Result<()>
203 where
204 K: IntoBytes + Debug,
205 V: IntoBytes + Debug,
206 {
207 let key = key.into_vec();
208 let val = val.into_vec();
209 let chk = chk.map(|v| v.into_vec());
210 self.inner.putc(key, val, chk).await
211 }
212
213 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
215 pub async fn del<K>(&self, key: K) -> Result<()>
216 where
217 K: IntoBytes + Debug,
218 {
219 let key = key.into_vec();
220 self.inner.del(key).await
221 }
222
223 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
226 pub async fn delc<K, V>(&self, key: K, chk: Option<V>) -> Result<()>
227 where
228 K: IntoBytes + Debug,
229 V: IntoBytes + Debug,
230 {
231 let key = key.into_vec();
232 let chk = chk.map(|v| v.into_vec());
233 self.inner.delc(key, chk).await
234 }
235
236 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
241 pub async fn delp<K>(&self, key: K) -> Result<()>
242 where
243 K: IntoBytes + Debug,
244 {
245 let key = key.into_vec();
246 self.inner.delp(key).await
247 }
248
249 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
254 pub async fn delr<K>(&self, rng: Range<K>) -> Result<()>
255 where
256 K: IntoBytes + Debug,
257 {
258 let beg = rng.start.into_vec();
259 let end = rng.end.into_vec();
260 self.inner.delr(beg..end).await
261 }
262
263 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
265 pub async fn clr<K>(&self, key: K) -> Result<()>
266 where
267 K: IntoBytes + Debug,
268 {
269 let key = key.into_vec();
270 self.inner.clr(key).await
271 }
272
273 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
276 pub async fn clrc<K, V>(&self, key: K, chk: Option<V>) -> Result<()>
277 where
278 K: IntoBytes + Debug,
279 V: IntoBytes + Debug,
280 {
281 let key = key.into_vec();
282 let chk = chk.map(|v| v.into_vec());
283 self.inner.clrc(key, chk).await
284 }
285
286 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
291 pub async fn clrp<K>(&self, key: K) -> Result<()>
292 where
293 K: IntoBytes + Debug,
294 {
295 let key = key.into_vec();
296 self.inner.clrp(key).await
297 }
298
299 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
304 pub async fn clrr<K>(&self, rng: Range<K>) -> Result<()>
305 where
306 K: IntoBytes + Debug,
307 {
308 let beg = rng.start.into_vec();
309 let end = rng.end.into_vec();
310 self.inner.clrr(beg..end).await
311 }
312
313 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
322 pub async fn keys<K>(
323 &self,
324 rng: Range<K>,
325 limit: ScanLimit,
326 skip: u32,
327 version: Option<u64>,
328 ) -> Result<Vec<Key>>
329 where
330 K: IntoBytes + Debug,
331 {
332 let beg = rng.start.into_vec();
333 let end = rng.end.into_vec();
334 if beg > end {
335 return Ok(vec![]);
336 }
337 self.inner.keys(beg..end, limit, skip, version).await
338 }
339
340 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
345 pub async fn keysr<K>(
346 &self,
347 rng: Range<K>,
348 limit: ScanLimit,
349 skip: u32,
350 version: Option<u64>,
351 ) -> Result<Vec<Key>>
352 where
353 K: IntoBytes + Debug,
354 {
355 let beg = rng.start.into_vec();
356 let end = rng.end.into_vec();
357 if beg > end {
358 return Ok(vec![]);
359 }
360 self.inner.keysr(beg..end, limit, skip, version).await
361 }
362
363 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
368 pub async fn scan<K>(
369 &self,
370 rng: Range<K>,
371 limit: ScanLimit,
372 skip: u32,
373 version: Option<u64>,
374 ) -> Result<Vec<(Key, Val)>>
375 where
376 K: IntoBytes + Debug,
377 {
378 let beg = rng.start.into_vec();
379 let end = rng.end.into_vec();
380 if beg > end {
381 return Ok(vec![]);
382 }
383 self.inner.scan(beg..end, limit, skip, version).await
384 }
385
386 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
391 pub async fn scanr<K>(
392 &self,
393 rng: Range<K>,
394 limit: ScanLimit,
395 skip: u32,
396 version: Option<u64>,
397 ) -> Result<Vec<(Key, Val)>>
398 where
399 K: IntoBytes + Debug,
400 {
401 let beg = rng.start.into_vec();
402 let end = rng.end.into_vec();
403 if beg > end {
404 return Ok(vec![]);
405 }
406 self.inner.scanr(beg..end, limit, skip, version).await
407 }
408
409 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
414 pub async fn count<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<usize>
415 where
416 K: IntoBytes + Debug,
417 {
418 let beg = rng.start.into_vec();
419 let end = rng.end.into_vec();
420 self.inner.count(beg..end, version).await
421 }
422
423 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
432 pub async fn batch_keys<K>(
433 &self,
434 rng: Range<K>,
435 batch: u32,
436 version: Option<u64>,
437 ) -> Result<Batch<Key>>
438 where
439 K: IntoBytes + Debug,
440 {
441 let beg = rng.start.into_vec();
442 let end = rng.end.into_vec();
443 self.inner.batch_keys(beg..end, batch, version).await
444 }
445
446 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
451 pub async fn batch_keys_vals<K>(
452 &self,
453 rng: Range<K>,
454 batch: u32,
455 version: Option<u64>,
456 ) -> Result<Batch<(Key, Val)>>
457 where
458 K: IntoBytes + Debug,
459 {
460 let beg = rng.start.into_vec();
461 let end = rng.end.into_vec();
462 self.inner.batch_keys_vals(beg..end, batch, version).await
463 }
464
465 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
476 pub fn stream_keys<K>(
477 &self,
478 rng: Range<K>,
479 version: Option<u64>,
480 limit: Option<usize>,
481 skip: u32,
482 dir: Direction,
483 ) -> impl Stream<Item = Result<Vec<Key>>> + '_
484 where
485 K: IntoBytes + Debug,
486 {
487 let beg = rng.start.into_vec();
488 let end = rng.end.into_vec();
489 let mut scanner = Scanner::<Key>::new(self, beg..end, limit, dir);
490 if let Some(v) = version {
492 scanner = scanner.version(v);
493 }
494 if skip > 0 {
496 scanner = scanner.skip(skip);
497 }
498 scanner
500 }
501
502 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
510 pub fn stream_keys_vals<K>(
511 &self,
512 rng: Range<K>,
513 version: Option<u64>,
514 limit: Option<usize>,
515 skip: u32,
516 dir: Direction,
517 prefetch: bool,
518 ) -> impl Stream<Item = Result<Vec<(Key, Val)>>> + '_
519 where
520 K: IntoBytes + Debug,
521 {
522 let beg = rng.start.into_vec();
523 let end = rng.end.into_vec();
524 let mut scanner = Scanner::<(Key, Val)>::new(self, beg..end, limit, dir);
525 if let Some(v) = version {
527 scanner = scanner.version(v);
528 }
529 if skip > 0 {
531 scanner = scanner.skip(skip);
532 }
533 if prefetch {
537 scanner = scanner
538 .prefetch(true)
539 .initial_batch_size(ScanLimit::Count(*crate::cnf::NORMAL_FETCH_SIZE * 2));
540 }
541 scanner
543 }
544
545 pub async fn new_save_point(&self) -> Result<()> {
551 self.inner.new_save_point().await
552 }
553
554 pub async fn release_last_save_point(&self) -> Result<()> {
556 self.inner.release_last_save_point().await
557 }
558
559 pub async fn rollback_to_save_point(&self) -> Result<()> {
561 self.inner.rollback_to_save_point().await
562 }
563
564 pub async fn timestamp(&self) -> Result<BoxTimeStamp> {
570 self.inner.timestamp().await
571 }
572
573 pub fn timestamp_impl(&self) -> BoxTimeStampImpl {
575 self.inner.timestamp_impl()
576 }
577}