Skip to main content

surrealdb_core/kvs/
tr.rs

1use std::fmt;
2use std::fmt::Debug;
3use std::ops::Range;
4
5use futures::stream::Stream;
6
7use super::api::{ScanLimit, Transactable};
8use super::batch::Batch;
9use super::scanner::{Direction, Scanner};
10use super::{IntoBytes, Key, Result, Val};
11use crate::kvs::timestamp::{BoxTimeStamp, BoxTimeStampImpl};
12
/// Specifies whether the transaction is read-only or writeable.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TransactionType {
	/// A read-only transaction.
	Read,
	/// A writeable transaction.
	Write,
}
19
/// Specifies whether the transaction is optimistic or pessimistic.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum LockType {
	/// Pessimistic locking, produced by `LockType::from(true)`.
	Pessimistic,
	/// Optimistic locking, produced by `LockType::from(false)`.
	Optimistic,
}

impl From<bool> for LockType {
	/// Converts a boolean flag into a lock type: `true` selects
	/// [`LockType::Pessimistic`] and `false` selects [`LockType::Optimistic`].
	fn from(value: bool) -> Self {
		if value {
			LockType::Pessimistic
		} else {
			LockType::Optimistic
		}
	}
}
35
/// A set of undoable updates and requests against a dataset.
///
/// This type wraps a boxed [`Transactable`] implementation, which is
/// stored in the `inner` field.
pub struct Transactor {
	// The underlying transaction, held as a boxed trait object
	// and visible only within the parent module
	pub(super) inner: Box<dyn Transactable>,
}
41
42impl fmt::Display for Transactor {
43	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44		write!(f, "{}", self.kind())
45	}
46}
47
48impl Drop for Transactor {
49	fn drop(&mut self) {
50		if !self.closed() && self.writeable() {
51			// Warn when running in test mode
52			#[cfg(test)]
53			warn!("A transaction was dropped without being committed or cancelled");
54			// Panic when running in normal mode
55			#[cfg(not(test))]
56			error!("A transaction was dropped without being committed or cancelled");
57		}
58	}
59}
60
61impl Transactor {
62	/// Get the underlying datastore kind.
63	pub(super) fn kind(&self) -> &'static str {
64		self.inner.kind()
65	}
66
67	/// Check if transaction is finished.
68	///
69	/// If the transaction has been cancelled or committed,
70	/// then this function will return [`true`], and any further
71	/// calls to functions on this transaction will result
72	/// in a [`crate::kvs::Error::TransactionFinished`] error.
73	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
74	pub fn closed(&self) -> bool {
75		self.inner.closed()
76	}
77
78	/// Check if transaction is writeable.
79	///
80	/// If the transaction has been marked as a writeable
81	/// transaction, then this function will return [`true`].
82	/// This fuction can be used to check whether a transaction
83	/// allows data to be modified, and if not then the function
84	/// will return a [`crate::kvs::Error::TransactionReadonly`] error when
85	/// attempting to modify any data within the transaction.
86	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
87	pub fn writeable(&self) -> bool {
88		self.inner.writeable()
89	}
90
91	/// Cancel a transaction.
92	///
93	/// This reverses all changes made within the transaction.
94	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
95	pub async fn cancel(&self) -> Result<()> {
96		self.inner.cancel().await
97	}
98
99	/// Commit a transaction.
100	///
101	/// This attempts to commit all changes made within the transaction.
102	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
103	pub async fn commit(&self) -> Result<()> {
104		self.inner.commit().await
105	}
106
107	/// Check if a key exists in the datastore.
108	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
109	pub async fn exists<K>(&self, key: K, version: Option<u64>) -> Result<bool>
110	where
111		K: IntoBytes + Debug,
112	{
113		let key = key.into_vec();
114		self.inner.exists(key, version).await
115	}
116
117	/// Fetch a key from the datastore.
118	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
119	pub async fn get<K>(&self, key: K, version: Option<u64>) -> Result<Option<Val>>
120	where
121		K: IntoBytes + Debug,
122	{
123		let key = key.into_vec();
124		self.inner.get(key, version).await
125	}
126
127	/// Fetch many keys from the datastore.
128	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
129	pub async fn getm<K>(&self, keys: Vec<K>, version: Option<u64>) -> Result<Vec<Option<Val>>>
130	where
131		K: IntoBytes + Debug,
132	{
133		let keys = keys.into_iter().map(IntoBytes::into_vec).collect();
134		self.inner.getm(keys, version).await
135	}
136
137	/// Retrieve a specific prefixed range of keys from the datastore.
138	///
139	/// This function fetches all matching key-value pairs from the underlying
140	/// datastore in grouped batches.
141	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
142	pub async fn getp<K>(&self, key: K) -> Result<Vec<(Key, Val)>>
143	where
144		K: IntoBytes + Debug,
145	{
146		let key = key.into_vec();
147		self.inner.getp(key).await
148	}
149
150	/// Retrieve a specific range of keys from the datastore.
151	///
152	/// This function fetches all matching key-value pairs from the underlying
153	/// datastore in grouped batches.
154	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
155	pub async fn getr<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<Vec<(Key, Val)>>
156	where
157		K: IntoBytes + Debug,
158	{
159		let beg = rng.start.into_vec();
160		let end = rng.end.into_vec();
161		self.inner.getr(beg..end, version).await
162	}
163
164	/// Insert or update a key in the datastore.
165	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
166	pub async fn set<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<()>
167	where
168		K: IntoBytes + Debug,
169		V: IntoBytes + Debug,
170	{
171		let key = key.into_vec();
172		let val = val.into_vec();
173		self.inner.set(key, val, version).await
174	}
175
176	/// Insert or replace a key in the datastore.
177	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
178	pub async fn replace<K, V>(&self, key: K, val: V) -> Result<()>
179	where
180		K: IntoBytes + Debug,
181		V: IntoBytes + Debug,
182	{
183		let key = key.into_vec();
184		let val = val.into_vec();
185		self.inner.replace(key, val).await
186	}
187
188	/// Insert a key if it doesn't exist in the datastore.
189	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
190	pub async fn put<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<()>
191	where
192		K: IntoBytes + Debug,
193		V: IntoBytes + Debug,
194	{
195		let key = key.into_vec();
196		let val = val.into_vec();
197		self.inner.put(key, val, version).await
198	}
199
200	/// Update a key in the datastore if the current value matches a condition.
201	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
202	pub async fn putc<K, V>(&self, key: K, val: V, chk: Option<V>) -> Result<()>
203	where
204		K: IntoBytes + Debug,
205		V: IntoBytes + Debug,
206	{
207		let key = key.into_vec();
208		let val = val.into_vec();
209		let chk = chk.map(|v| v.into_vec());
210		self.inner.putc(key, val, chk).await
211	}
212
213	/// Delete a key from the datastore.
214	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
215	pub async fn del<K>(&self, key: K) -> Result<()>
216	where
217		K: IntoBytes + Debug,
218	{
219		let key = key.into_vec();
220		self.inner.del(key).await
221	}
222
223	/// Delete a key from the datastore if the current value matches a
224	/// condition.
225	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
226	pub async fn delc<K, V>(&self, key: K, chk: Option<V>) -> Result<()>
227	where
228		K: IntoBytes + Debug,
229		V: IntoBytes + Debug,
230	{
231		let key = key.into_vec();
232		let chk = chk.map(|v| v.into_vec());
233		self.inner.delc(key, chk).await
234	}
235
236	/// Delete a prefixed range of keys from the datastore.
237	///
238	/// This function deletes all matching key-value pairs from the underlying
239	/// datastore in grouped batches.
240	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
241	pub async fn delp<K>(&self, key: K) -> Result<()>
242	where
243		K: IntoBytes + Debug,
244	{
245		let key = key.into_vec();
246		self.inner.delp(key).await
247	}
248
249	/// Delete a range of keys from the datastore.
250	///
251	/// This function deletes all matching key-value pairs from the underlying
252	/// datastore in grouped batches.
253	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
254	pub async fn delr<K>(&self, rng: Range<K>) -> Result<()>
255	where
256		K: IntoBytes + Debug,
257	{
258		let beg = rng.start.into_vec();
259		let end = rng.end.into_vec();
260		self.inner.delr(beg..end).await
261	}
262
263	/// Delete all versions of a key from the datastore.
264	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
265	pub async fn clr<K>(&self, key: K) -> Result<()>
266	where
267		K: IntoBytes + Debug,
268	{
269		let key = key.into_vec();
270		self.inner.clr(key).await
271	}
272
273	/// Delete all versions of a key from the datastore if the current value
274	/// matches a condition.
275	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
276	pub async fn clrc<K, V>(&self, key: K, chk: Option<V>) -> Result<()>
277	where
278		K: IntoBytes + Debug,
279		V: IntoBytes + Debug,
280	{
281		let key = key.into_vec();
282		let chk = chk.map(|v| v.into_vec());
283		self.inner.clrc(key, chk).await
284	}
285
286	/// Delete all versions of a prefixed range of keys from the datastore.
287	///
288	/// This function deletes all matching key-value pairs from the underlying
289	/// datastore in grouped batches.
290	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
291	pub async fn clrp<K>(&self, key: K) -> Result<()>
292	where
293		K: IntoBytes + Debug,
294	{
295		let key = key.into_vec();
296		self.inner.clrp(key).await
297	}
298
299	/// Delete all versions of a range of keys from the datastore.
300	///
301	/// This function deletes all matching key-value pairs from the underlying
302	/// datastore in grouped batches.
303	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
304	pub async fn clrr<K>(&self, rng: Range<K>) -> Result<()>
305	where
306		K: IntoBytes + Debug,
307	{
308		let beg = rng.start.into_vec();
309		let end = rng.end.into_vec();
310		self.inner.clrr(beg..end).await
311	}
312
313	// --------------------------------------------------
314	// Range functions
315	// --------------------------------------------------
316
317	/// Retrieve a specific range of keys from the datastore.
318	///
319	/// This function fetches the full range of keys without values, in a single
320	/// request to the underlying datastore.
321	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
322	pub async fn keys<K>(
323		&self,
324		rng: Range<K>,
325		limit: ScanLimit,
326		skip: u32,
327		version: Option<u64>,
328	) -> Result<Vec<Key>>
329	where
330		K: IntoBytes + Debug,
331	{
332		let beg = rng.start.into_vec();
333		let end = rng.end.into_vec();
334		if beg > end {
335			return Ok(vec![]);
336		}
337		self.inner.keys(beg..end, limit, skip, version).await
338	}
339
340	/// Retrieve a specific range of keys from the datastore.
341	///
342	/// This function fetches the full range of keys without values, in a single
343	/// request to the underlying datastore.
344	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
345	pub async fn keysr<K>(
346		&self,
347		rng: Range<K>,
348		limit: ScanLimit,
349		skip: u32,
350		version: Option<u64>,
351	) -> Result<Vec<Key>>
352	where
353		K: IntoBytes + Debug,
354	{
355		let beg = rng.start.into_vec();
356		let end = rng.end.into_vec();
357		if beg > end {
358			return Ok(vec![]);
359		}
360		self.inner.keysr(beg..end, limit, skip, version).await
361	}
362
363	/// Retrieve a specific range of key-value pairs from the datastore.
364	///
365	/// This function fetches the full range of key-value pairs, in a single
366	/// request to the underlying datastore.
367	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
368	pub async fn scan<K>(
369		&self,
370		rng: Range<K>,
371		limit: ScanLimit,
372		skip: u32,
373		version: Option<u64>,
374	) -> Result<Vec<(Key, Val)>>
375	where
376		K: IntoBytes + Debug,
377	{
378		let beg = rng.start.into_vec();
379		let end = rng.end.into_vec();
380		if beg > end {
381			return Ok(vec![]);
382		}
383		self.inner.scan(beg..end, limit, skip, version).await
384	}
385
386	/// Retrieve a specific range of key-value pairs from the datastore.
387	///
388	/// This function fetches the full range of key-value pairs, in a single
389	/// request to the underlying datastore.
390	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
391	pub async fn scanr<K>(
392		&self,
393		rng: Range<K>,
394		limit: ScanLimit,
395		skip: u32,
396		version: Option<u64>,
397	) -> Result<Vec<(Key, Val)>>
398	where
399		K: IntoBytes + Debug,
400	{
401		let beg = rng.start.into_vec();
402		let end = rng.end.into_vec();
403		if beg > end {
404			return Ok(vec![]);
405		}
406		self.inner.scanr(beg..end, limit, skip, version).await
407	}
408
409	/// Count the total number of keys within a range in the datastore.
410	///
411	/// This function fetches the total count, in batches, with multiple
412	/// requests to the underlying datastore.
413	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
414	pub async fn count<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<usize>
415	where
416		K: IntoBytes + Debug,
417	{
418		let beg = rng.start.into_vec();
419		let end = rng.end.into_vec();
420		self.inner.count(beg..end, version).await
421	}
422
423	// --------------------------------------------------
424	// Batch functions
425	// --------------------------------------------------
426
427	/// Retrieve a batched scan over a specific range of keys in the datastore.
428	///
429	/// This function fetches keys, in batches, with multiple requests to the
430	/// underlying datastore.
431	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
432	pub async fn batch_keys<K>(
433		&self,
434		rng: Range<K>,
435		batch: u32,
436		version: Option<u64>,
437	) -> Result<Batch<Key>>
438	where
439		K: IntoBytes + Debug,
440	{
441		let beg = rng.start.into_vec();
442		let end = rng.end.into_vec();
443		self.inner.batch_keys(beg..end, batch, version).await
444	}
445
446	/// Retrieve a batched scan over a specific range of keys in the datastore.
447	///
448	/// This function fetches key-value pairs, in batches, with multiple
449	/// requests to the underlying datastore.
450	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
451	pub async fn batch_keys_vals<K>(
452		&self,
453		rng: Range<K>,
454		batch: u32,
455		version: Option<u64>,
456	) -> Result<Batch<(Key, Val)>>
457	where
458		K: IntoBytes + Debug,
459	{
460		let beg = rng.start.into_vec();
461		let end = rng.end.into_vec();
462		self.inner.batch_keys_vals(beg..end, batch, version).await
463	}
464
465	// --------------------------------------------------
466	// Stream functions
467	// --------------------------------------------------
468
469	/// Retrieve a stream of key batches over a specific range in the datastore.
470	///
471	/// This function returns a stream that yields batches of keys. The scanner:
472	/// - Fetches an initial batch of up to 500 items
473	/// - Fetches subsequent batches of up to 16 MiB (local) or 4 MiB (remote)
474	/// - Prefetches the next batch while the current batch is being processed
475	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
476	pub fn stream_keys<K>(
477		&self,
478		rng: Range<K>,
479		version: Option<u64>,
480		limit: Option<usize>,
481		skip: u32,
482		dir: Direction,
483	) -> impl Stream<Item = Result<Vec<Key>>> + '_
484	where
485		K: IntoBytes + Debug,
486	{
487		let beg = rng.start.into_vec();
488		let end = rng.end.into_vec();
489		let mut scanner = Scanner::<Key>::new(self, beg..end, limit, dir);
490		// Set the version
491		if let Some(v) = version {
492			scanner = scanner.version(v);
493		}
494		// Set the skip
495		if skip > 0 {
496			scanner = scanner.skip(skip);
497		}
498		// Return the stream
499		scanner
500	}
501
502	/// Retrieve a stream of key-value batches over a specific range in the datastore.
503	///
504	/// This function returns a stream that yields batches of key-value pairs. The scanner:
505	/// - Fetches an initial batch of up to 500 items (or 1000 when `prefetch` is enabled)
506	/// - Fetches subsequent batches of up to 16 MiB (local) or 4 MiB (remote)
507	/// - When `prefetch` is true, prefetches the next batch while the current batch is being
508	///   processed, and uses a larger initial batch size (500 items)
509	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
510	pub fn stream_keys_vals<K>(
511		&self,
512		rng: Range<K>,
513		version: Option<u64>,
514		limit: Option<usize>,
515		skip: u32,
516		dir: Direction,
517		prefetch: bool,
518	) -> impl Stream<Item = Result<Vec<(Key, Val)>>> + '_
519	where
520		K: IntoBytes + Debug,
521	{
522		let beg = rng.start.into_vec();
523		let end = rng.end.into_vec();
524		let mut scanner = Scanner::<(Key, Val)>::new(self, beg..end, limit, dir);
525		// Set the version
526		if let Some(v) = version {
527			scanner = scanner.version(v);
528		}
529		// Set the skip
530		if skip > 0 {
531			scanner = scanner.skip(skip);
532		}
533		// Enable prefetching and larger initial batch for full scans.
534		// The scanner default is already NORMAL_FETCH_SIZE (500); when
535		// prefetching is active we double it to amortise the overlap cost.
536		if prefetch {
537			scanner = scanner
538				.prefetch(true)
539				.initial_batch_size(ScanLimit::Count(*crate::cnf::NORMAL_FETCH_SIZE * 2));
540		}
541		// Return the stream
542		scanner
543	}
544
545	// --------------------------------------------------
546	// Savepoint functions
547	// --------------------------------------------------
548
549	/// Set a new save point on the transaction.
550	pub async fn new_save_point(&self) -> Result<()> {
551		self.inner.new_save_point().await
552	}
553
554	/// Release the last save point.
555	pub async fn release_last_save_point(&self) -> Result<()> {
556		self.inner.release_last_save_point().await
557	}
558
559	/// Rollback to the last save point.
560	pub async fn rollback_to_save_point(&self) -> Result<()> {
561		self.inner.rollback_to_save_point().await
562	}
563
564	// --------------------------------------------------
565	// Timestamp functions
566	// --------------------------------------------------
567
568	/// Get the current monotonic timestamp
569	pub async fn timestamp(&self) -> Result<BoxTimeStamp> {
570		self.inner.timestamp().await
571	}
572
573	/// Returns the implementation of timestamp that this transaction uses.
574	pub fn timestamp_impl(&self) -> BoxTimeStampImpl {
575		self.inner.timestamp_impl()
576	}
577}