1#[allow(unused_imports)] use super::api::Transaction;
3use super::Key;
4use super::KeyEncode;
5use super::Val;
6use super::Version;
7use crate::cf;
8use crate::cnf::NORMAL_FETCH_SIZE;
9use crate::dbs::node::Timestamp;
10use crate::doc::CursorValue;
11use crate::err::Error;
12use crate::idg::u32::U32;
13use crate::key::debug::Sprintable;
14use crate::kvs::batch::Batch;
15use crate::kvs::clock::SizedClock;
16#[cfg(any(
17 feature = "kv-tikv",
18 feature = "kv-fdb",
19 feature = "kv-indxdb",
20 feature = "kv-surrealcs",
21))]
22use crate::kvs::savepoint::SavePointImpl;
23use crate::kvs::stash::Stash;
24use crate::kvs::KeyDecode as _;
25use crate::sql;
26use crate::sql::thing::Thing;
27use crate::vs::VersionStamp;
28use sql::statements::DefineTableStatement;
29use std::fmt;
30use std::fmt::Debug;
31use std::ops::Range;
32use std::sync::Arc;
33
/// Tracing target attached to every `trace!` event emitted by this module.
const TARGET: &str = "surrealdb::core::kvs::tr";
35
/// Check level handed to the backend transaction through
/// `Transactor::check_level`.
///
/// NOTE(review): what exactly is checked is decided by the backend
/// transactions, which are not visible from this file.
#[derive(Default, Debug)]
pub enum Check {
    /// The default level.
    #[default]
    None,
    /// Presumably: report the condition as a warning — confirm against the backends.
    Warn,
    /// Presumably: report the condition as an error — confirm against the backends.
    Error,
}
44
/// Distinguishes read transactions from write transactions.
#[derive(Clone, Copy)]
pub enum TransactionType {
    /// A read transaction.
    Read,
    /// A write transaction.
    Write,
}

impl From<bool> for TransactionType {
    /// Maps `true` to [`TransactionType::Write`] and `false` to
    /// [`TransactionType::Read`].
    fn from(value: bool) -> Self {
        if value {
            Self::Write
        } else {
            Self::Read
        }
    }
}
60
/// Selects between pessimistic and optimistic locking.
#[derive(Clone, Copy)]
pub enum LockType {
    /// Pessimistic locking.
    Pessimistic,
    /// Optimistic locking.
    Optimistic,
}

impl From<bool> for LockType {
    /// Maps `true` to [`LockType::Pessimistic`] and `false` to
    /// [`LockType::Optimistic`].
    fn from(value: bool) -> Self {
        if value {
            Self::Pessimistic
        } else {
            Self::Optimistic
        }
    }
}
76
77#[allow(dead_code)]
79#[non_exhaustive]
80pub struct Transactor {
81 pub(super) inner: Inner,
82 pub(super) stash: Stash,
83 pub(super) cf: cf::Writer,
84 pub(super) clock: Arc<SizedClock>,
85}
86
87#[allow(clippy::large_enum_variant)]
88pub(super) enum Inner {
89 #[cfg(feature = "kv-mem")]
90 Mem(super::mem::Transaction),
91 #[cfg(feature = "kv-rocksdb")]
92 RocksDB(super::rocksdb::Transaction),
93 #[cfg(feature = "kv-indxdb")]
94 IndxDB(super::indxdb::Transaction),
95 #[cfg(feature = "kv-tikv")]
96 TiKV(super::tikv::Transaction),
97 #[cfg(feature = "kv-fdb")]
98 FoundationDB(super::fdb::Transaction),
99 #[cfg(feature = "kv-surrealkv")]
100 SurrealKV(super::surrealkv::Transaction),
101 #[cfg(feature = "kv-surrealcs")]
102 SurrealCS(super::surrealcs::Transaction),
103}
104
105impl fmt::Display for Transactor {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 #![allow(unused_variables)]
108 match &self.inner {
109 #[cfg(feature = "kv-mem")]
110 Inner::Mem(_) => write!(f, "memory"),
111 #[cfg(feature = "kv-rocksdb")]
112 Inner::RocksDB(_) => write!(f, "rocksdb"),
113 #[cfg(feature = "kv-indxdb")]
114 Inner::IndxDB(_) => write!(f, "indxdb"),
115 #[cfg(feature = "kv-tikv")]
116 Inner::TiKV(_) => write!(f, "tikv"),
117 #[cfg(feature = "kv-fdb")]
118 Inner::FoundationDB(_) => write!(f, "fdb"),
119 #[cfg(feature = "kv-surrealkv")]
120 Inner::SurrealKV(_) => write!(f, "surrealkv"),
121 #[cfg(feature = "kv-surrealcs")]
122 Inner::SurrealCS(_) => write!(f, "surrealcs"),
123 #[allow(unreachable_patterns)]
124 _ => unreachable!(),
125 }
126 }
127}
128
/// Runs the same block against whichever backend transaction an `Inner`
/// value holds.
///
/// `$inner` is the expression to match on (typically `&self.inner` or
/// `&mut self.inner`), `$bind` is the pattern the backend transaction is
/// bound to, and `$body` is the block evaluated with that binding. One arm
/// is generated per `kv-*` feature.
macro_rules! expand_inner {
    ( $inner:expr, $bind:pat_param => $body:block ) => {
        match $inner {
            #[cfg(feature = "kv-mem")]
            Inner::Mem($bind) => $body,
            #[cfg(feature = "kv-rocksdb")]
            Inner::RocksDB($bind) => $body,
            #[cfg(feature = "kv-indxdb")]
            Inner::IndxDB($bind) => $body,
            #[cfg(feature = "kv-tikv")]
            Inner::TiKV($bind) => $body,
            #[cfg(feature = "kv-fdb")]
            Inner::FoundationDB($bind) => $body,
            #[cfg(feature = "kv-surrealkv")]
            Inner::SurrealKV($bind) => $body,
            #[cfg(feature = "kv-surrealcs")]
            Inner::SurrealCS($bind) => $body,
            // Keeps the match well-formed whichever backend features are enabled.
            #[allow(unreachable_patterns)]
            _ => unreachable!(),
        }
    };
}
151
152impl Transactor {
153 #![cfg_attr(
155 not(any(
156 feature = "kv-mem",
157 feature = "kv-rocksdb",
158 feature = "kv-indxdb",
159 feature = "kv-tikv",
160 feature = "kv-fdb",
161 feature = "kv-surrealkv",
162 )),
163 allow(unused_variables)
164 )]
165 pub(crate) fn supports_reverse_scan(&self) -> bool {
170 expand_inner!(&self.inner, v => { v.supports_reverse_scan() })
171 }
172
173 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
182 pub(crate) fn check_level(&mut self, check: Check) {
183 expand_inner!(&mut self.inner, v => { v.check_level(check) })
184 }
185
186 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
193 pub async fn closed(&self) -> bool {
194 trace!(target: TARGET, "Closed");
195 expand_inner!(&self.inner, v => { v.closed() })
196 }
197
198 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
202 pub async fn cancel(&mut self) -> Result<(), Error> {
203 trace!(target: TARGET, "Cancel");
204 expand_inner!(&mut self.inner, v => { v.cancel().await })
205 }
206
207 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
211 pub async fn commit(&mut self) -> Result<(), Error> {
212 trace!(target: TARGET, "Commit");
213 expand_inner!(&mut self.inner, v => { v.commit().await })
214 }
215
216 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
218 pub async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
219 where
220 K: KeyEncode + Debug,
221 {
222 let key = key.encode_owned()?;
223 trace!(target: TARGET, key = key.sprint(), version = version, "Exists");
224 expand_inner!(&mut self.inner, v => { v.exists(key, version).await })
225 }
226
227 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
229 pub async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
230 where
231 K: KeyEncode + Debug,
232 {
233 let key = key.encode_owned()?;
234 trace!(target: TARGET, key = key.sprint(), version = version, "Get");
235 expand_inner!(&mut self.inner, v => { v.get(key, version).await })
236 }
237
238 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
240 pub async fn getm<K>(&mut self, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
241 where
242 K: KeyEncode + Debug,
243 {
244 let mut keys_encoded = Vec::new();
245 for k in keys {
246 keys_encoded.push(k.encode_owned()?);
247 }
248 trace!(target: TARGET, keys = keys_encoded.sprint(), "GetM");
249 expand_inner!(&mut self.inner, v => { v.getm(keys_encoded).await })
250 }
251
252 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
256 pub async fn getr<K>(
257 &mut self,
258 rng: Range<K>,
259 version: Option<u64>,
260 ) -> Result<Vec<(Key, Val)>, Error>
261 where
262 K: KeyEncode + Debug,
263 {
264 let beg: Key = rng.start.encode_owned()?;
265 let end: Key = rng.end.encode_owned()?;
266 let rng = beg.as_slice()..end.as_slice();
267 trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR");
268 expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
269 }
270
271 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
275 pub async fn getp<K>(&mut self, key: K) -> Result<Vec<(Key, Val)>, Error>
276 where
277 K: KeyEncode + Debug,
278 {
279 let key = key.encode_owned()?;
280 trace!(target: TARGET, key = key.sprint(), "GetP");
281 expand_inner!(&mut self.inner, v => { v.getp(key).await })
282 }
283
284 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
286 pub async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
287 where
288 K: KeyEncode + Debug,
289 V: Into<Val> + Debug,
290 {
291 let key = key.encode_owned()?;
292 trace!(target: TARGET, key = key.sprint(), version = version, "Set");
293 expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
294 }
295
296 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
298 pub async fn replace<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
299 where
300 K: KeyEncode + Debug,
301 V: Into<Val> + Debug,
302 {
303 let key = key.encode_owned()?;
304 trace!(target: TARGET, key = key.sprint(), "Replace");
305 expand_inner!(&mut self.inner, v => { v.replace(key, val).await })
306 }
307
308 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
310 pub async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
311 where
312 K: KeyEncode + Debug,
313 V: Into<Val> + Debug,
314 {
315 let key = key.encode_owned()?;
316 trace!(target: TARGET, key = key.sprint(), version = version, "Put");
317 expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
318 }
319
320 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
322 pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
323 where
324 K: KeyEncode + Debug,
325 V: Into<Val> + Debug,
326 {
327 let key = key.encode_owned()?;
328 trace!(target: TARGET, key = key.sprint(), "PutC");
329 expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await })
330 }
331
332 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
334 pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
335 where
336 K: KeyEncode + Debug,
337 {
338 let key = key.encode_owned()?;
339 trace!(target: TARGET, key = key.sprint(), "Del");
340 expand_inner!(&mut self.inner, v => { v.del(key).await })
341 }
342
343 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
345 pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
346 where
347 K: KeyEncode + Debug,
348 V: Into<Val> + Debug,
349 {
350 let key = key.encode_owned()?;
351 trace!(target: TARGET, key = key.sprint(), "DelC");
352 expand_inner!(&mut self.inner, v => { v.delc(key, chk).await })
353 }
354
355 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
359 pub async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
360 where
361 K: KeyEncode + Debug,
362 {
363 let beg: Key = rng.start.encode_owned()?;
364 let end: Key = rng.end.encode_owned()?;
365 let rng = beg.as_slice()..end.as_slice();
366 trace!(target: TARGET, rng = rng.sprint(), "DelR");
367 expand_inner!(&mut self.inner, v => { v.delr(beg..end).await })
368 }
369
370 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
374 pub async fn delp<K>(&mut self, key: K) -> Result<(), Error>
375 where
376 K: KeyEncode + Debug,
377 {
378 let key = key.encode_owned()?;
379 trace!(target: TARGET, key = key.sprint(), "DelP");
380 expand_inner!(&mut self.inner, v => { v.delp(key).await })
381 }
382
383 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
385 pub async fn clr<K>(&mut self, key: K) -> Result<(), Error>
386 where
387 K: KeyEncode + Debug,
388 {
389 let key = key.encode_owned()?;
390 trace!(target: TARGET, key = key.sprint(), "Clr");
391 expand_inner!(&mut self.inner, v => { v.clr(key).await })
392 }
393
394 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
396 pub async fn clrc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
397 where
398 K: KeyEncode + Debug,
399 V: Into<Val> + Debug,
400 {
401 let key = key.encode_owned()?;
402 trace!(target: TARGET, key = key.sprint(), "ClrC");
403 expand_inner!(&mut self.inner, v => { v.clrc(key, chk).await })
404 }
405
406 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
410 pub async fn clrr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
411 where
412 K: KeyEncode + Debug,
413 {
414 let beg: Key = rng.start.encode_owned()?;
415 let end: Key = rng.end.encode_owned()?;
416 let rng = beg.as_slice()..end.as_slice();
417 trace!(target: TARGET, rng = rng.sprint(), "ClrR");
418 expand_inner!(&mut self.inner, v => { v.clrr(beg..end).await })
419 }
420
421 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
425 pub async fn clrp<K>(&mut self, key: K) -> Result<(), Error>
426 where
427 K: KeyEncode + Debug,
428 {
429 let key: Key = key.encode_owned()?;
430 trace!(target: TARGET, key = key.sprint(), "ClrP");
431 expand_inner!(&mut self.inner, v => { v.clrp(key).await })
432 }
433
434 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
438 pub async fn keys<K>(
439 &mut self,
440 rng: Range<K>,
441 limit: u32,
442 version: Option<u64>,
443 ) -> Result<Vec<Key>, Error>
444 where
445 K: KeyEncode + Debug,
446 {
447 let beg: Key = rng.start.encode_owned()?;
448 let end: Key = rng.end.encode_owned()?;
449 let rng = beg.as_slice()..end.as_slice();
450 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keys");
451 if beg > end {
452 return Ok(vec![]);
453 }
454 expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit, version).await })
455 }
456
457 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
461 pub async fn keysr<K>(
462 &mut self,
463 rng: Range<K>,
464 limit: u32,
465 version: Option<u64>,
466 ) -> Result<Vec<Key>, Error>
467 where
468 K: KeyEncode + Debug,
469 {
470 let beg: Key = rng.start.encode_owned()?;
471 let end: Key = rng.end.encode_owned()?;
472 let rng = beg.as_slice()..end.as_slice();
473 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keysr");
474 if beg > end {
475 return Ok(vec![]);
476 }
477 expand_inner!(&mut self.inner, v => { v.keysr(beg..end, limit, version).await })
478 }
479
480 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
484 pub async fn scan<K>(
485 &mut self,
486 rng: Range<K>,
487 limit: u32,
488 version: Option<u64>,
489 ) -> Result<Vec<(Key, Val)>, Error>
490 where
491 K: KeyEncode + Debug,
492 {
493 let beg: Key = rng.start.encode_owned()?;
494 let end: Key = rng.end.encode_owned()?;
495 let rng = beg.as_slice()..end.as_slice();
496 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan");
497 if beg > end {
498 return Ok(vec![]);
499 }
500 expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await })
501 }
502
503 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
504 pub async fn scanr<K>(
505 &mut self,
506 rng: Range<K>,
507 limit: u32,
508 version: Option<u64>,
509 ) -> Result<Vec<(Key, Val)>, Error>
510 where
511 K: Into<Key> + Debug,
512 {
513 let beg: Key = rng.start.into();
514 let end: Key = rng.end.into();
515 let rng = beg.as_slice()..end.as_slice();
516 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scanr");
517 if beg > end {
518 return Ok(vec![]);
519 }
520 expand_inner!(&mut self.inner, v => { v.scanr(beg..end, limit, version).await })
521 }
522
523 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
527 pub async fn batch_keys<K>(
528 &mut self,
529 rng: Range<K>,
530 batch: u32,
531 version: Option<u64>,
532 ) -> Result<Batch<Key>, Error>
533 where
534 K: KeyEncode + Debug,
535 {
536 let beg: Key = rng.start.encode_owned()?;
537 let end: Key = rng.end.encode_owned()?;
538 let rng = beg.as_slice()..end.as_slice();
539 trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
540 expand_inner!(&mut self.inner, v => { v.batch_keys(beg..end, batch, version).await })
541 }
542
543 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
547 pub async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
548 where
549 K: KeyEncode + Debug,
550 {
551 let beg: Key = rng.start.encode_owned()?;
552 let end: Key = rng.end.encode_owned()?;
553 let rng = beg.as_slice()..end.as_slice();
554 trace!(target: TARGET, rng = rng.sprint(), "Count");
555 expand_inner!(&mut self.inner, v => { v.count(beg..end).await })
556 }
557
558 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
562 pub async fn batch_keys_vals<K>(
563 &mut self,
564 rng: Range<K>,
565 batch: u32,
566 version: Option<u64>,
567 ) -> Result<Batch<(Key, Val)>, Error>
568 where
569 K: KeyEncode + Debug,
570 {
571 let beg: Key = rng.start.encode_owned()?;
572 let end: Key = rng.end.encode_owned()?;
573 let rng = beg.as_slice()..end.as_slice();
574 trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
575 expand_inner!(&mut self.inner, v => { v.batch_keys_vals(beg..end, batch, version).await })
576 }
577
578 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
582 pub async fn batch_keys_vals_versions<K>(
583 &mut self,
584 rng: Range<K>,
585 batch: u32,
586 ) -> Result<Batch<(Key, Val, Version, bool)>, Error>
587 where
588 K: KeyEncode + Debug,
589 {
590 let beg: Key = rng.start.encode_owned()?;
591 let end: Key = rng.end.encode_owned()?;
592 let rng = beg.as_slice()..end.as_slice();
593 trace!(target: TARGET, rng = rng.sprint(), "BatchVersions");
594 expand_inner!(&mut self.inner, v => { v.batch_keys_vals_versions(beg..end, batch).await })
595 }
596
597 pub async fn get_timestamp<K>(&mut self, key: K) -> Result<VersionStamp, Error>
603 where
604 K: KeyEncode + Debug,
605 {
606 let key = key.encode_owned()?;
607 expand_inner!(&mut self.inner, v => { v.get_timestamp(key).await })
608 }
609
610 pub async fn set_versionstamped<K, V>(
612 &mut self,
613 ts_key: K,
614 prefix: K,
615 suffix: K,
616 val: V,
617 ) -> Result<(), Error>
618 where
619 K: KeyEncode + Debug,
620 V: Into<Val> + Debug,
621 {
622 let ts_key = ts_key.encode_owned()?;
623 let prefix = prefix.encode_owned()?;
624 let suffix = suffix.encode_owned()?;
625 expand_inner!(&mut self.inner, v => { v.set_versionstamp(ts_key, prefix, suffix, val).await })
626 }
627
628 pub async fn clock(&self) -> Timestamp {
641 self.clock.now().await
642 }
643
644 #[allow(clippy::too_many_arguments)]
648 pub(crate) fn record_change(
649 &mut self,
650 ns: &str,
651 db: &str,
652 tb: &str,
653 id: &Thing,
654 previous: CursorValue,
655 current: CursorValue,
656 store_difference: bool,
657 ) {
658 self.cf.record_cf_change(ns, db, tb, id.clone(), previous, current, store_difference)
659 }
660
661 pub(crate) fn record_table_change(
663 &mut self,
664 ns: &str,
665 db: &str,
666 tb: &str,
667 dt: &DefineTableStatement,
668 ) {
669 self.cf.define_table(ns, db, tb, dt)
670 }
671
672 pub(crate) async fn get_idg(&mut self, key: &Key) -> Result<U32, Error> {
673 Ok(if let Some(v) = self.stash.get(key) {
674 v
675 } else {
676 let val = self.get(key.clone(), None).await?;
677 if let Some(val) = val {
678 U32::new(key.clone(), Some(val)).await?
679 } else {
680 U32::new(key.clone(), None).await?
681 }
682 })
683 }
684
685 pub(crate) async fn get_next_ns_id(&mut self) -> Result<u32, Error> {
687 let key = crate::key::root::ni::Ni::default().encode_owned()?;
688 let mut seq = self.get_idg(&key).await?;
689 let nid = seq.get_next_id();
690 self.stash.set(key, seq.clone());
691 let (k, v) = seq.finish().unwrap();
692 self.replace(k, v).await?;
693 Ok(nid)
694 }
695
696 pub(crate) async fn get_next_db_id(&mut self, ns: u32) -> Result<u32, Error> {
698 let key = crate::key::namespace::di::new(ns).encode_owned()?;
699 let mut seq = self.get_idg(&key).await?;
700 let nid = seq.get_next_id();
701 self.stash.set(key, seq.clone());
702 let (k, v) = seq.finish().unwrap();
703 self.replace(k, v).await?;
704 Ok(nid)
705 }
706
707 pub(crate) async fn get_next_tb_id(&mut self, ns: u32, db: u32) -> Result<u32, Error> {
709 let key = crate::key::database::ti::new(ns, db).encode_owned()?;
710 let mut seq = self.get_idg(&key).await?;
711 let nid = seq.get_next_id();
712 self.stash.set(key, seq.clone());
713 let (k, v) = seq.finish().unwrap();
714 self.replace(k, v).await?;
715 Ok(nid)
716 }
717
718 #[allow(unused)]
720 pub(crate) async fn remove_ns_id(&mut self, ns: u32) -> Result<(), Error> {
721 let key = crate::key::root::ni::Ni::default().encode_owned()?;
722 let mut seq = self.get_idg(&key).await?;
723 seq.remove_id(ns);
724 self.stash.set(key, seq.clone());
725 let (k, v) = seq.finish().unwrap();
726 self.replace(k, v).await?;
727 Ok(())
728 }
729
730 #[allow(unused)]
732 pub(crate) async fn remove_db_id(&mut self, ns: u32, db: u32) -> Result<(), Error> {
733 let key = crate::key::namespace::di::new(ns).encode_owned()?;
734 let mut seq = self.get_idg(&key).await?;
735 seq.remove_id(db);
736 self.stash.set(key, seq.clone());
737 let (k, v) = seq.finish().unwrap();
738 self.replace(k, v).await?;
739 Ok(())
740 }
741
742 #[allow(unused)]
744 pub(crate) async fn remove_tb_id(&mut self, ns: u32, db: u32, tb: u32) -> Result<(), Error> {
745 let key = crate::key::database::ti::new(ns, db).encode_owned()?;
746 let mut seq = self.get_idg(&key).await?;
747 seq.remove_id(tb);
748 self.stash.set(key, seq.clone());
749 let (k, v) = seq.finish().unwrap();
750 self.replace(k, v).await?;
751 Ok(())
752 }
753
754 pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> {
771 let changes = self.cf.get()?;
772 for (tskey, prefix, suffix, v) in changes {
773 self.set_versionstamped(tskey, prefix, suffix, v).await?
774 }
775 Ok(())
776 }
777
778 pub(crate) async fn set_timestamp_for_versionstamp(
781 &mut self,
782 ts: u64,
783 ns: &str,
784 db: &str,
785 ) -> Result<VersionStamp, Error> {
786 let key = crate::key::database::vs::new(ns, db);
789 let vst = self.get_timestamp(key).await?;
790 trace!(
791 target: TARGET,
792 "Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
793 ts,
794 vst.into_u64_lossy(),
795 ns,
796 db
797 );
798
799 let mut ts_key = crate::key::database::ts::new(ns, db, ts);
802 let begin = ts_key.encode()?;
803 let end = crate::key::database::ts::suffix(ns, db)?;
804 let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
805 let latest_ts_pair = ts_pairs.last();
806 if let Some((k, _)) = latest_ts_pair {
807 trace!(
808 target: TARGET,
809 "There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
810 ts,
811 ns,
812 db,
813 k.sprint()
814 );
815 let k = crate::key::database::ts::Ts::decode(k)?;
816 let latest_ts = k.ts;
817 if latest_ts >= ts {
818 warn!("ts {ts} is less than the latest ts {latest_ts}");
819 ts_key = crate::key::database::ts::new(ns, db, latest_ts + 1);
820 }
821 }
822 self.replace(ts_key, vst.as_bytes()).await?;
823 Ok(vst)
824 }
825
826 pub(crate) async fn get_versionstamp_from_timestamp(
827 &mut self,
828 ts: u64,
829 ns: &str,
830 db: &str,
831 ) -> Result<Option<VersionStamp>, Error> {
832 let start = crate::key::database::ts::prefix(ns, db)?;
833 let ts_key = crate::key::database::ts::new(ns, db, ts + 1).encode_owned()?;
834 let end = ts_key.encode_owned()?;
835 let ts = if self.supports_reverse_scan() {
836 self.scanr(start..end, 1, None).await?.pop().map(|x| x.1)
837 } else {
838 let mut batch = self.batch_keys(start..end, *NORMAL_FETCH_SIZE, None).await?;
841 let mut last = batch.result.pop();
842 while let Some(next) = batch.next {
843 yield_now!();
845 batch = self.batch_keys(next, *NORMAL_FETCH_SIZE, None).await?;
846 last = batch.result.pop();
847 }
848 if let Some(last) = last {
849 self.get(last, None).await?
850 } else {
851 None
852 }
853 };
854 if let Some(v) = ts {
855 return Ok(Some(VersionStamp::from_slice(&v)?));
856 }
857 Ok(None)
858 }
859
860 pub(crate) async fn new_save_point(&mut self) {
861 expand_inner!(&mut self.inner, v => { v.new_save_point() })
862 }
863
864 pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
865 expand_inner!(&mut self.inner, v => { v.rollback_to_save_point().await })
866 }
867
868 pub(crate) async fn release_last_save_point(&mut self) -> Result<(), Error> {
869 expand_inner!(&mut self.inner, v => { v.release_last_save_point() })
870 }
871}