surrealdb_core/kvs/
tr.rs

1#[allow(unused_imports)] // not used when non of the storage backends are enabled.
2use super::api::Transaction;
3use super::Key;
4use super::KeyEncode;
5use super::Val;
6use super::Version;
7use crate::cf;
8use crate::cnf::NORMAL_FETCH_SIZE;
9use crate::dbs::node::Timestamp;
10use crate::doc::CursorValue;
11use crate::err::Error;
12use crate::idg::u32::U32;
13use crate::key::debug::Sprintable;
14use crate::kvs::batch::Batch;
15use crate::kvs::clock::SizedClock;
16#[cfg(any(
17	feature = "kv-tikv",
18	feature = "kv-fdb",
19	feature = "kv-indxdb",
20	feature = "kv-surrealcs",
21))]
22use crate::kvs::savepoint::SavePointImpl;
23use crate::kvs::stash::Stash;
24use crate::kvs::KeyDecode as _;
25use crate::sql;
26use crate::sql::thing::Thing;
27use crate::vs::VersionStamp;
28use sql::statements::DefineTableStatement;
29use std::fmt;
30use std::fmt::Debug;
31use std::ops::Range;
32use std::sync::Arc;
33
/// Target name attached to the `trace!` events emitted by this module.
const TARGET: &str = "surrealdb::core::kvs::tr";
35
/// Used to determine the behaviour when a transaction is not closed correctly.
///
/// The chosen level is handed to the backend transaction through
/// `Transactor::check_level`; how each level is acted upon is decided by the
/// backend implementation.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Check {
	/// The default level. Presumably unclosed transactions are ignored — confirm.
	#[default]
	None,
	/// Presumably a warning is logged for an unclosed transaction — confirm.
	Warn,
	/// Presumably an unclosed transaction is reported as an error — confirm.
	Error,
}
44
/// Specifies whether the transaction is read-only or writeable.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TransactionType {
	/// A read-only transaction.
	Read,
	/// A writeable transaction.
	Write,
}

impl From<bool> for TransactionType {
	/// Converts a "writeable" flag: `true` yields [`TransactionType::Write`]
	/// and `false` yields [`TransactionType::Read`].
	fn from(value: bool) -> Self {
		if value {
			TransactionType::Write
		} else {
			TransactionType::Read
		}
	}
}
60
/// Specifies whether the transaction is optimistic or pessimistic.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum LockType {
	/// A pessimistic transaction.
	Pessimistic,
	/// An optimistic transaction.
	Optimistic,
}

impl From<bool> for LockType {
	/// Converts a "pessimistic" flag: `true` yields [`LockType::Pessimistic`]
	/// and `false` yields [`LockType::Optimistic`].
	fn from(value: bool) -> Self {
		if value {
			LockType::Pessimistic
		} else {
			LockType::Optimistic
		}
	}
}
76
/// A set of undoable updates and requests against a dataset.
///
/// Wraps the backend-specific transaction together with the per-transaction
/// state used by the helper methods implemented on this type.
// NOTE(review): the reason `dead_code` is allowed is not visible here; presumably
// some fields go unused under certain feature combinations — confirm.
#[allow(dead_code)]
#[non_exhaustive]
pub struct Transactor {
	/// The backend-specific transaction every datastore operation is forwarded to.
	pub(super) inner: Inner,
	/// Cache of id generators, consulted by `get_idg` and updated by the id sequence helpers.
	pub(super) stash: Stash,
	/// Change feed writer; changes recorded on it are read back by `complete_changes`.
	pub(super) cf: cf::Writer,
	/// Clock queried by `clock()` for the current timestamp.
	pub(super) clock: Arc<SizedClock>,
}
86
/// The backend-specific transaction wrapped by a `Transactor`.
///
/// Each variant is only compiled in when the matching storage feature is enabled.
// NOTE(review): presumably the wrapped transaction types differ considerably in
// size, hence the clippy allowance below — confirm.
#[allow(clippy::large_enum_variant)]
pub(super) enum Inner {
	/// Transaction of the `kv-mem` backend.
	#[cfg(feature = "kv-mem")]
	Mem(super::mem::Transaction),
	/// Transaction of the `kv-rocksdb` backend.
	#[cfg(feature = "kv-rocksdb")]
	RocksDB(super::rocksdb::Transaction),
	/// Transaction of the `kv-indxdb` backend.
	#[cfg(feature = "kv-indxdb")]
	IndxDB(super::indxdb::Transaction),
	/// Transaction of the `kv-tikv` backend.
	#[cfg(feature = "kv-tikv")]
	TiKV(super::tikv::Transaction),
	/// Transaction of the `kv-fdb` backend.
	#[cfg(feature = "kv-fdb")]
	FoundationDB(super::fdb::Transaction),
	/// Transaction of the `kv-surrealkv` backend.
	#[cfg(feature = "kv-surrealkv")]
	SurrealKV(super::surrealkv::Transaction),
	/// Transaction of the `kv-surrealcs` backend.
	#[cfg(feature = "kv-surrealcs")]
	SurrealCS(super::surrealcs::Transaction),
}
104
105impl fmt::Display for Transactor {
106	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107		#![allow(unused_variables)]
108		match &self.inner {
109			#[cfg(feature = "kv-mem")]
110			Inner::Mem(_) => write!(f, "memory"),
111			#[cfg(feature = "kv-rocksdb")]
112			Inner::RocksDB(_) => write!(f, "rocksdb"),
113			#[cfg(feature = "kv-indxdb")]
114			Inner::IndxDB(_) => write!(f, "indxdb"),
115			#[cfg(feature = "kv-tikv")]
116			Inner::TiKV(_) => write!(f, "tikv"),
117			#[cfg(feature = "kv-fdb")]
118			Inner::FoundationDB(_) => write!(f, "fdb"),
119			#[cfg(feature = "kv-surrealkv")]
120			Inner::SurrealKV(_) => write!(f, "surrealkv"),
121			#[cfg(feature = "kv-surrealcs")]
122			Inner::SurrealCS(_) => write!(f, "surrealcs"),
123			#[allow(unreachable_patterns)]
124			_ => unreachable!(),
125		}
126	}
127}
128
/// Dispatches a block of code to whichever backend transaction an `Inner` holds.
///
/// `expand_inner!(target, binding => { body })` matches `target` against every
/// compiled-in `Inner` variant, binds the wrapped transaction to `binding` and
/// evaluates `body` with it. The same `body` is expanded once per variant.
macro_rules! expand_inner {
	( $target:expr, $binding:pat_param => $body:block ) => {
		match $target {
			#[cfg(feature = "kv-mem")]
			Inner::Mem($binding) => $body,
			#[cfg(feature = "kv-rocksdb")]
			Inner::RocksDB($binding) => $body,
			#[cfg(feature = "kv-indxdb")]
			Inner::IndxDB($binding) => $body,
			#[cfg(feature = "kv-tikv")]
			Inner::TiKV($binding) => $body,
			#[cfg(feature = "kv-fdb")]
			Inner::FoundationDB($binding) => $body,
			#[cfg(feature = "kv-surrealkv")]
			Inner::SurrealKV($binding) => $body,
			#[cfg(feature = "kv-surrealcs")]
			Inner::SurrealCS($binding) => $body,
			// Presumably kept so the match still compiles when no backend is enabled
			#[allow(unreachable_patterns)]
			_ => unreachable!(),
		}
	};
}
151
152impl Transactor {
153	// Allow unused_variables when no storage is enabled as none of the values are used then.
154	#![cfg_attr(
155		not(any(
156			feature = "kv-mem",
157			feature = "kv-rocksdb",
158			feature = "kv-indxdb",
159			feature = "kv-tikv",
160			feature = "kv-fdb",
161			feature = "kv-surrealkv",
162		)),
163		allow(unused_variables)
164	)]
165	// --------------------------------------------------
166	// Integral methods
167	// --------------------------------------------------
168
169	pub(crate) fn supports_reverse_scan(&self) -> bool {
170		expand_inner!(&self.inner, v => { v.supports_reverse_scan() })
171	}
172
173	/// Specify how we should handle unclosed transactions.
174	///
175	/// If a transaction is not cancelled or rolled back then
176	/// this can cause issues on some storage engine
177	/// implementations. In tests we can ignore unhandled
178	/// transactions, whilst in development we should panic
179	/// so that any unintended behaviour is detected, and in
180	/// production we should only log a warning.
181	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
182	pub(crate) fn check_level(&mut self, check: Check) {
183		expand_inner!(&mut self.inner, v => { v.check_level(check) })
184	}
185
186	/// Check if transaction is finished.
187	///
188	/// If the transaction has been cancelled or committed,
189	/// then this function will return [`true`], and any further
190	/// calls to functions on this transaction will result
191	/// in a [`Error::TxFinished`] error.
192	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
193	pub async fn closed(&self) -> bool {
194		trace!(target: TARGET, "Closed");
195		expand_inner!(&self.inner, v => { v.closed() })
196	}
197
198	/// Cancel a transaction.
199	///
200	/// This reverses all changes made within the transaction.
201	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
202	pub async fn cancel(&mut self) -> Result<(), Error> {
203		trace!(target: TARGET, "Cancel");
204		expand_inner!(&mut self.inner, v => { v.cancel().await })
205	}
206
207	/// Commit a transaction.
208	///
209	/// This attempts to commit all changes made within the transaction.
210	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
211	pub async fn commit(&mut self) -> Result<(), Error> {
212		trace!(target: TARGET, "Commit");
213		expand_inner!(&mut self.inner, v => { v.commit().await })
214	}
215
216	/// Check if a key exists in the datastore.
217	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
218	pub async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
219	where
220		K: KeyEncode + Debug,
221	{
222		let key = key.encode_owned()?;
223		trace!(target: TARGET, key = key.sprint(), version = version, "Exists");
224		expand_inner!(&mut self.inner, v => { v.exists(key, version).await })
225	}
226
227	/// Fetch a key from the datastore.
228	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
229	pub async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
230	where
231		K: KeyEncode + Debug,
232	{
233		let key = key.encode_owned()?;
234		trace!(target: TARGET, key = key.sprint(), version = version, "Get");
235		expand_inner!(&mut self.inner, v => { v.get(key, version).await })
236	}
237
238	/// Fetch many keys from the datastore.
239	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
240	pub async fn getm<K>(&mut self, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
241	where
242		K: KeyEncode + Debug,
243	{
244		let mut keys_encoded = Vec::new();
245		for k in keys {
246			keys_encoded.push(k.encode_owned()?);
247		}
248		trace!(target: TARGET, keys = keys_encoded.sprint(), "GetM");
249		expand_inner!(&mut self.inner, v => { v.getm(keys_encoded).await })
250	}
251
252	/// Retrieve a specific range of keys from the datastore.
253	///
254	/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
255	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
256	pub async fn getr<K>(
257		&mut self,
258		rng: Range<K>,
259		version: Option<u64>,
260	) -> Result<Vec<(Key, Val)>, Error>
261	where
262		K: KeyEncode + Debug,
263	{
264		let beg: Key = rng.start.encode_owned()?;
265		let end: Key = rng.end.encode_owned()?;
266		let rng = beg.as_slice()..end.as_slice();
267		trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR");
268		expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
269	}
270
271	/// Retrieve a specific prefixed range of keys from the datastore.
272	///
273	/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
274	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
275	pub async fn getp<K>(&mut self, key: K) -> Result<Vec<(Key, Val)>, Error>
276	where
277		K: KeyEncode + Debug,
278	{
279		let key = key.encode_owned()?;
280		trace!(target: TARGET, key = key.sprint(), "GetP");
281		expand_inner!(&mut self.inner, v => { v.getp(key).await })
282	}
283
284	/// Insert or update a key in the datastore.
285	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
286	pub async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
287	where
288		K: KeyEncode + Debug,
289		V: Into<Val> + Debug,
290	{
291		let key = key.encode_owned()?;
292		trace!(target: TARGET, key = key.sprint(), version = version, "Set");
293		expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
294	}
295
296	/// Insert or replace a key in the datastore.
297	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
298	pub async fn replace<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
299	where
300		K: KeyEncode + Debug,
301		V: Into<Val> + Debug,
302	{
303		let key = key.encode_owned()?;
304		trace!(target: TARGET, key = key.sprint(), "Replace");
305		expand_inner!(&mut self.inner, v => { v.replace(key, val).await })
306	}
307
308	/// Insert a key if it doesn't exist in the datastore.
309	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
310	pub async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
311	where
312		K: KeyEncode + Debug,
313		V: Into<Val> + Debug,
314	{
315		let key = key.encode_owned()?;
316		trace!(target: TARGET, key = key.sprint(), version = version, "Put");
317		expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
318	}
319
320	/// Update a key in the datastore if the current value matches a condition.
321	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
322	pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
323	where
324		K: KeyEncode + Debug,
325		V: Into<Val> + Debug,
326	{
327		let key = key.encode_owned()?;
328		trace!(target: TARGET, key = key.sprint(), "PutC");
329		expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await })
330	}
331
332	/// Delete a key from the datastore.
333	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
334	pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
335	where
336		K: KeyEncode + Debug,
337	{
338		let key = key.encode_owned()?;
339		trace!(target: TARGET, key = key.sprint(), "Del");
340		expand_inner!(&mut self.inner, v => { v.del(key).await })
341	}
342
343	/// Delete a key from the datastore if the current value matches a condition.
344	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
345	pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
346	where
347		K: KeyEncode + Debug,
348		V: Into<Val> + Debug,
349	{
350		let key = key.encode_owned()?;
351		trace!(target: TARGET, key = key.sprint(), "DelC");
352		expand_inner!(&mut self.inner, v => { v.delc(key, chk).await })
353	}
354
355	/// Delete a range of keys from the datastore.
356	///
357	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
358	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
359	pub async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
360	where
361		K: KeyEncode + Debug,
362	{
363		let beg: Key = rng.start.encode_owned()?;
364		let end: Key = rng.end.encode_owned()?;
365		let rng = beg.as_slice()..end.as_slice();
366		trace!(target: TARGET, rng = rng.sprint(), "DelR");
367		expand_inner!(&mut self.inner, v => { v.delr(beg..end).await })
368	}
369
370	/// Delete a prefixed range of keys from the datastore.
371	///
372	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
373	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
374	pub async fn delp<K>(&mut self, key: K) -> Result<(), Error>
375	where
376		K: KeyEncode + Debug,
377	{
378		let key = key.encode_owned()?;
379		trace!(target: TARGET, key = key.sprint(), "DelP");
380		expand_inner!(&mut self.inner, v => { v.delp(key).await })
381	}
382
383	/// Delete all versions of a key from the datastore.
384	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
385	pub async fn clr<K>(&mut self, key: K) -> Result<(), Error>
386	where
387		K: KeyEncode + Debug,
388	{
389		let key = key.encode_owned()?;
390		trace!(target: TARGET, key = key.sprint(), "Clr");
391		expand_inner!(&mut self.inner, v => { v.clr(key).await })
392	}
393
394	/// Delete all versions of a key from the datastore if the current value matches a condition.
395	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
396	pub async fn clrc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
397	where
398		K: KeyEncode + Debug,
399		V: Into<Val> + Debug,
400	{
401		let key = key.encode_owned()?;
402		trace!(target: TARGET, key = key.sprint(), "ClrC");
403		expand_inner!(&mut self.inner, v => { v.clrc(key, chk).await })
404	}
405
406	/// Delete all versions of a range of keys from the datastore.
407	///
408	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
409	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
410	pub async fn clrr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
411	where
412		K: KeyEncode + Debug,
413	{
414		let beg: Key = rng.start.encode_owned()?;
415		let end: Key = rng.end.encode_owned()?;
416		let rng = beg.as_slice()..end.as_slice();
417		trace!(target: TARGET, rng = rng.sprint(), "ClrR");
418		expand_inner!(&mut self.inner, v => { v.clrr(beg..end).await })
419	}
420
421	/// Delete all versions of a prefixed range of keys from the datastore.
422	///
423	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
424	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
425	pub async fn clrp<K>(&mut self, key: K) -> Result<(), Error>
426	where
427		K: KeyEncode + Debug,
428	{
429		let key: Key = key.encode_owned()?;
430		trace!(target: TARGET, key = key.sprint(), "ClrP");
431		expand_inner!(&mut self.inner, v => { v.clrp(key).await })
432	}
433
434	/// Retrieve a specific range of keys from the datastore.
435	///
436	/// This function fetches the full range of keys without values, in a single request to the underlying datastore.
437	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
438	pub async fn keys<K>(
439		&mut self,
440		rng: Range<K>,
441		limit: u32,
442		version: Option<u64>,
443	) -> Result<Vec<Key>, Error>
444	where
445		K: KeyEncode + Debug,
446	{
447		let beg: Key = rng.start.encode_owned()?;
448		let end: Key = rng.end.encode_owned()?;
449		let rng = beg.as_slice()..end.as_slice();
450		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keys");
451		if beg > end {
452			return Ok(vec![]);
453		}
454		expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit, version).await })
455	}
456
457	/// Retrieve a specific range of keys from the datastore.
458	///
459	/// This function fetches the full range of keys without values, in a single request to the underlying datastore.
460	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
461	pub async fn keysr<K>(
462		&mut self,
463		rng: Range<K>,
464		limit: u32,
465		version: Option<u64>,
466	) -> Result<Vec<Key>, Error>
467	where
468		K: KeyEncode + Debug,
469	{
470		let beg: Key = rng.start.encode_owned()?;
471		let end: Key = rng.end.encode_owned()?;
472		let rng = beg.as_slice()..end.as_slice();
473		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keysr");
474		if beg > end {
475			return Ok(vec![]);
476		}
477		expand_inner!(&mut self.inner, v => { v.keysr(beg..end, limit, version).await })
478	}
479
480	/// Retrieve a specific range of keys from the datastore.
481	///
482	/// This function fetches the full range of key-value pairs, in a single request to the underlying datastore.
483	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
484	pub async fn scan<K>(
485		&mut self,
486		rng: Range<K>,
487		limit: u32,
488		version: Option<u64>,
489	) -> Result<Vec<(Key, Val)>, Error>
490	where
491		K: KeyEncode + Debug,
492	{
493		let beg: Key = rng.start.encode_owned()?;
494		let end: Key = rng.end.encode_owned()?;
495		let rng = beg.as_slice()..end.as_slice();
496		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan");
497		if beg > end {
498			return Ok(vec![]);
499		}
500		expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await })
501	}
502
503	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
504	pub async fn scanr<K>(
505		&mut self,
506		rng: Range<K>,
507		limit: u32,
508		version: Option<u64>,
509	) -> Result<Vec<(Key, Val)>, Error>
510	where
511		K: Into<Key> + Debug,
512	{
513		let beg: Key = rng.start.into();
514		let end: Key = rng.end.into();
515		let rng = beg.as_slice()..end.as_slice();
516		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scanr");
517		if beg > end {
518			return Ok(vec![]);
519		}
520		expand_inner!(&mut self.inner, v => { v.scanr(beg..end, limit, version).await })
521	}
522
523	/// Retrieve a batched scan over a specific range of keys in the datastore.
524	///
525	/// This function fetches keys, in batches, with multiple requests to the underlying datastore.
526	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
527	pub async fn batch_keys<K>(
528		&mut self,
529		rng: Range<K>,
530		batch: u32,
531		version: Option<u64>,
532	) -> Result<Batch<Key>, Error>
533	where
534		K: KeyEncode + Debug,
535	{
536		let beg: Key = rng.start.encode_owned()?;
537		let end: Key = rng.end.encode_owned()?;
538		let rng = beg.as_slice()..end.as_slice();
539		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
540		expand_inner!(&mut self.inner, v => { v.batch_keys(beg..end, batch, version).await })
541	}
542
543	/// Count the total number of keys within a range in the datastore.
544	///
545	/// This function fetches the total count, in batches, with multiple requests to the underlying datastore.
546	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
547	pub async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
548	where
549		K: KeyEncode + Debug,
550	{
551		let beg: Key = rng.start.encode_owned()?;
552		let end: Key = rng.end.encode_owned()?;
553		let rng = beg.as_slice()..end.as_slice();
554		trace!(target: TARGET, rng = rng.sprint(), "Count");
555		expand_inner!(&mut self.inner, v => { v.count(beg..end).await })
556	}
557
558	/// Retrieve a batched scan over a specific range of keys in the datastore.
559	///
560	/// This function fetches key-value pairs, in batches, with multiple requests to the underlying datastore.
561	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
562	pub async fn batch_keys_vals<K>(
563		&mut self,
564		rng: Range<K>,
565		batch: u32,
566		version: Option<u64>,
567	) -> Result<Batch<(Key, Val)>, Error>
568	where
569		K: KeyEncode + Debug,
570	{
571		let beg: Key = rng.start.encode_owned()?;
572		let end: Key = rng.end.encode_owned()?;
573		let rng = beg.as_slice()..end.as_slice();
574		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
575		expand_inner!(&mut self.inner, v => { v.batch_keys_vals(beg..end, batch, version).await })
576	}
577
578	/// Retrieve a batched scan of all versions over a specific range of keys in the datastore.
579	///
580	/// This function fetches key-value-version pairs, in batches, with multiple requests to the underlying datastore.
581	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
582	pub async fn batch_keys_vals_versions<K>(
583		&mut self,
584		rng: Range<K>,
585		batch: u32,
586	) -> Result<Batch<(Key, Val, Version, bool)>, Error>
587	where
588		K: KeyEncode + Debug,
589	{
590		let beg: Key = rng.start.encode_owned()?;
591		let end: Key = rng.end.encode_owned()?;
592		let rng = beg.as_slice()..end.as_slice();
593		trace!(target: TARGET, rng = rng.sprint(), "BatchVersions");
594		expand_inner!(&mut self.inner, v => { v.batch_keys_vals_versions(beg..end, batch).await })
595	}
596
597	/// Obtain a new change timestamp for a key
598	/// which is replaced with the current timestamp when the transaction is committed.
599	/// NOTE: This should be called when composing the change feed entries for this transaction,
600	/// which should be done immediately before the transaction commit.
601	/// That is to keep other transactions commit delay(pessimistic) or conflict(optimistic) as less as possible.
602	pub async fn get_timestamp<K>(&mut self, key: K) -> Result<VersionStamp, Error>
603	where
604		K: KeyEncode + Debug,
605	{
606		let key = key.encode_owned()?;
607		expand_inner!(&mut self.inner, v => { v.get_timestamp(key).await })
608	}
609
610	/// Insert or update a key in the datastore.
611	pub async fn set_versionstamped<K, V>(
612		&mut self,
613		ts_key: K,
614		prefix: K,
615		suffix: K,
616		val: V,
617	) -> Result<(), Error>
618	where
619		K: KeyEncode + Debug,
620		V: Into<Val> + Debug,
621	{
622		let ts_key = ts_key.encode_owned()?;
623		let prefix = prefix.encode_owned()?;
624		let suffix = suffix.encode_owned()?;
625		expand_inner!(&mut self.inner, v => { v.set_versionstamp(ts_key, prefix, suffix, val).await })
626	}
627
628	// --------------------------------------------------
629	// Additional methods
630	// --------------------------------------------------
631
632	/// Clock retrieves the current timestamp, without guaranteeing
633	/// monotonicity in all implementations.
634	///
635	/// It is used for unreliable ordering of events as well as
636	/// handling of timeouts. Operations that are not guaranteed to be correct.
637	/// But also allows for lexicographical ordering.
638	///
639	/// Public for tests, but not required for usage from a user perspective.
640	pub async fn clock(&self) -> Timestamp {
641		self.clock.now().await
642	}
643
644	// change will record the change in the changefeed if enabled.
645	// To actually persist the record changes into the underlying kvs,
646	// you must call the `complete_changes` function and then commit the transaction.
647	#[allow(clippy::too_many_arguments)]
648	pub(crate) fn record_change(
649		&mut self,
650		ns: &str,
651		db: &str,
652		tb: &str,
653		id: &Thing,
654		previous: CursorValue,
655		current: CursorValue,
656		store_difference: bool,
657	) {
658		self.cf.record_cf_change(ns, db, tb, id.clone(), previous, current, store_difference)
659	}
660
661	// Records the table (re)definition in the changefeed if enabled.
662	pub(crate) fn record_table_change(
663		&mut self,
664		ns: &str,
665		db: &str,
666		tb: &str,
667		dt: &DefineTableStatement,
668	) {
669		self.cf.define_table(ns, db, tb, dt)
670	}
671
672	pub(crate) async fn get_idg(&mut self, key: &Key) -> Result<U32, Error> {
673		Ok(if let Some(v) = self.stash.get(key) {
674			v
675		} else {
676			let val = self.get(key.clone(), None).await?;
677			if let Some(val) = val {
678				U32::new(key.clone(), Some(val)).await?
679			} else {
680				U32::new(key.clone(), None).await?
681			}
682		})
683	}
684
685	/// Gets the next namespace id
686	pub(crate) async fn get_next_ns_id(&mut self) -> Result<u32, Error> {
687		let key = crate::key::root::ni::Ni::default().encode_owned()?;
688		let mut seq = self.get_idg(&key).await?;
689		let nid = seq.get_next_id();
690		self.stash.set(key, seq.clone());
691		let (k, v) = seq.finish().unwrap();
692		self.replace(k, v).await?;
693		Ok(nid)
694	}
695
696	/// Gets the next database id for the given namespace
697	pub(crate) async fn get_next_db_id(&mut self, ns: u32) -> Result<u32, Error> {
698		let key = crate::key::namespace::di::new(ns).encode_owned()?;
699		let mut seq = self.get_idg(&key).await?;
700		let nid = seq.get_next_id();
701		self.stash.set(key, seq.clone());
702		let (k, v) = seq.finish().unwrap();
703		self.replace(k, v).await?;
704		Ok(nid)
705	}
706
707	/// Gets the next table id for the given namespace and database
708	pub(crate) async fn get_next_tb_id(&mut self, ns: u32, db: u32) -> Result<u32, Error> {
709		let key = crate::key::database::ti::new(ns, db).encode_owned()?;
710		let mut seq = self.get_idg(&key).await?;
711		let nid = seq.get_next_id();
712		self.stash.set(key, seq.clone());
713		let (k, v) = seq.finish().unwrap();
714		self.replace(k, v).await?;
715		Ok(nid)
716	}
717
718	/// Removes the given namespace from the sequence.
719	#[allow(unused)]
720	pub(crate) async fn remove_ns_id(&mut self, ns: u32) -> Result<(), Error> {
721		let key = crate::key::root::ni::Ni::default().encode_owned()?;
722		let mut seq = self.get_idg(&key).await?;
723		seq.remove_id(ns);
724		self.stash.set(key, seq.clone());
725		let (k, v) = seq.finish().unwrap();
726		self.replace(k, v).await?;
727		Ok(())
728	}
729
730	/// Removes the given database from the sequence.
731	#[allow(unused)]
732	pub(crate) async fn remove_db_id(&mut self, ns: u32, db: u32) -> Result<(), Error> {
733		let key = crate::key::namespace::di::new(ns).encode_owned()?;
734		let mut seq = self.get_idg(&key).await?;
735		seq.remove_id(db);
736		self.stash.set(key, seq.clone());
737		let (k, v) = seq.finish().unwrap();
738		self.replace(k, v).await?;
739		Ok(())
740	}
741
742	/// Removes the given table from the sequence.
743	#[allow(unused)]
744	pub(crate) async fn remove_tb_id(&mut self, ns: u32, db: u32, tb: u32) -> Result<(), Error> {
745		let key = crate::key::database::ti::new(ns, db).encode_owned()?;
746		let mut seq = self.get_idg(&key).await?;
747		seq.remove_id(tb);
748		self.stash.set(key, seq.clone());
749		let (k, v) = seq.finish().unwrap();
750		self.replace(k, v).await?;
751		Ok(())
752	}
753
754	// complete_changes will complete the changefeed recording for the given namespace and database.
755	//
756	// Under the hood, this function calls the transaction's `set_versionstamped_key` for each change.
757	// Every change must be recorded by calling this struct's `record_change` function beforehand.
758	// If there were no preceding `record_change` function calls for this transaction, this function will do nothing.
759	//
760	// This function should be called only after all the changes have been made to the transaction.
761	// Otherwise, changes are missed in the change feed.
762	//
763	// This function should be called immediately before calling the commit function to guarantee that
764	// the lock, if needed by lock=true, is held only for the duration of the commit, not the entire transaction.
765	//
766	// This function is here because it needs access to mutably borrow the transaction.
767	//
768	// Lastly, you should set lock=true if you want the changefeed to be correctly ordered for
769	// non-FDB backends.
770	pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> {
771		let changes = self.cf.get()?;
772		for (tskey, prefix, suffix, v) in changes {
773			self.set_versionstamped(tskey, prefix, suffix, v).await?
774		}
775		Ok(())
776	}
777
778	// set_timestamp_for_versionstamp correlates the given timestamp with the current versionstamp.
779	// This allows get_versionstamp_from_timestamp to obtain the versionstamp from the timestamp later.
780	pub(crate) async fn set_timestamp_for_versionstamp(
781		&mut self,
782		ts: u64,
783		ns: &str,
784		db: &str,
785	) -> Result<VersionStamp, Error> {
786		// This also works as an advisory lock on the ts keys so that there is
787		// on other concurrent transactions that can write to the ts_key or the keys after it.
788		let key = crate::key::database::vs::new(ns, db);
789		let vst = self.get_timestamp(key).await?;
790		trace!(
791			target: TARGET,
792			"Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
793			ts,
794			vst.into_u64_lossy(),
795			ns,
796			db
797		);
798
799		// Ensure there are no keys after the ts_key
800		// Otherwise we can go back in time!
801		let mut ts_key = crate::key::database::ts::new(ns, db, ts);
802		let begin = ts_key.encode()?;
803		let end = crate::key::database::ts::suffix(ns, db)?;
804		let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
805		let latest_ts_pair = ts_pairs.last();
806		if let Some((k, _)) = latest_ts_pair {
807			trace!(
808				target: TARGET,
809				"There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
810				ts,
811				ns,
812				db,
813				k.sprint()
814			);
815			let k = crate::key::database::ts::Ts::decode(k)?;
816			let latest_ts = k.ts;
817			if latest_ts >= ts {
818				warn!("ts {ts} is less than the latest ts {latest_ts}");
819				ts_key = crate::key::database::ts::new(ns, db, latest_ts + 1);
820			}
821		}
822		self.replace(ts_key, vst.as_bytes()).await?;
823		Ok(vst)
824	}
825
826	pub(crate) async fn get_versionstamp_from_timestamp(
827		&mut self,
828		ts: u64,
829		ns: &str,
830		db: &str,
831	) -> Result<Option<VersionStamp>, Error> {
832		let start = crate::key::database::ts::prefix(ns, db)?;
833		let ts_key = crate::key::database::ts::new(ns, db, ts + 1).encode_owned()?;
834		let end = ts_key.encode_owned()?;
835		let ts = if self.supports_reverse_scan() {
836			self.scanr(start..end, 1, None).await?.pop().map(|x| x.1)
837		} else {
838			// Batch keys to avoid large memory usage when the amount of stored
839			// version stamps get's too big.
840			let mut batch = self.batch_keys(start..end, *NORMAL_FETCH_SIZE, None).await?;
841			let mut last = batch.result.pop();
842			while let Some(next) = batch.next {
843				// Pause and yield execution
844				yield_now!();
845				batch = self.batch_keys(next, *NORMAL_FETCH_SIZE, None).await?;
846				last = batch.result.pop();
847			}
848			if let Some(last) = last {
849				self.get(last, None).await?
850			} else {
851				None
852			}
853		};
854		if let Some(v) = ts {
855			return Ok(Some(VersionStamp::from_slice(&v)?));
856		}
857		Ok(None)
858	}
859
860	pub(crate) async fn new_save_point(&mut self) {
861		expand_inner!(&mut self.inner, v => { v.new_save_point() })
862	}
863
864	pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
865		expand_inner!(&mut self.inner, v => { v.rollback_to_save_point().await })
866	}
867
868	pub(crate) async fn release_last_save_point(&mut self) -> Result<(), Error> {
869		expand_inner!(&mut self.inner, v => { v.release_last_save_point() })
870	}
871}