surrealdb_core/kvs/
ds.rs

1use super::export;
2use super::tr::Transactor;
3use super::tx::Transaction;
4use super::version::Version;
5use crate::ctx::MutableContext;
6#[cfg(feature = "jwks")]
7use crate::dbs::capabilities::NetTarget;
8use crate::dbs::capabilities::{
9	ArbitraryQueryTarget, ExperimentalTarget, MethodTarget, RouteTarget,
10};
11use crate::dbs::node::Timestamp;
12use crate::dbs::{
13	Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
14};
15use crate::err::Error;
16#[cfg(feature = "jwks")]
17use crate::iam::jwks::JwksCache;
18use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
19use crate::idx::trees::store::IndexStores;
20use crate::kvs::cache::ds::DatastoreCache;
21use crate::kvs::clock::SizedClock;
22#[allow(unused_imports)]
23use crate::kvs::clock::SystemClock;
24#[cfg(not(target_family = "wasm"))]
25use crate::kvs::index::IndexBuilder;
26use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
27use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
28use crate::syn;
29use crate::syn::parser::{ParserSettings, StatementStream};
30use crate::{cf, cnf};
31use async_channel::{Receiver, Sender};
32use bytes::{Bytes, BytesMut};
33use futures::{Future, Stream};
34use reblessive::TreeStack;
35use std::fmt;
36#[cfg(storage)]
37use std::path::PathBuf;
38use std::pin::pin;
39use std::sync::Arc;
40use std::task::{ready, Poll};
41use std::time::Duration;
42#[cfg(not(target_family = "wasm"))]
43use std::time::{SystemTime, UNIX_EPOCH};
44#[cfg(feature = "jwks")]
45use tokio::sync::RwLock;
46use tracing::instrument;
47use tracing::trace;
48use uuid::Uuid;
49#[cfg(target_family = "wasm")]
50use wasmtimer::std::{SystemTime, UNIX_EPOCH};
51
52const TARGET: &str = "surrealdb::core::kvs::ds";
53
54// If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks
55const LQ_CHANNEL_SIZE: usize = 15_000;
56
57// The role assigned to the initial user created when starting the server with credentials for the first time
58const INITIAL_USER_ROLE: &str = "owner";
59
60/// The underlying datastore instance which stores the dataset.
61#[allow(dead_code)]
62#[non_exhaustive]
63pub struct Datastore {
64	transaction_factory: TransactionFactory,
65	/// The unique id of this datastore, used in notifications.
66	id: Uuid,
67	/// Whether this datastore runs in strict mode by default.
68	strict: bool,
69	/// Whether authentication is enabled on this datastore.
70	auth_enabled: bool,
71	/// The maximum duration timeout for running multiple statements in a query.
72	query_timeout: Option<Duration>,
73	/// The maximum duration timeout for running multiple statements in a transaction.
74	transaction_timeout: Option<Duration>,
75	/// The security and feature capabilities for this datastore.
76	capabilities: Arc<Capabilities>,
77	// Whether this datastore enables live query notifications to subscribers.
78	notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
79	// The index store cache
80	index_stores: IndexStores,
81	// The cross transaction cache
82	cache: Arc<DatastoreCache>,
83	// The index asynchronous builder
84	#[cfg(not(target_family = "wasm"))]
85	index_builder: IndexBuilder,
86	#[cfg(feature = "jwks")]
87	// The JWKS object cache
88	jwks_cache: Arc<RwLock<JwksCache>>,
89	#[cfg(storage)]
90	// The temporary directory
91	temporary_directory: Option<Arc<PathBuf>>,
92}
93
94#[derive(Clone)]
95pub(super) struct TransactionFactory {
96	// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
97	clock: Arc<SizedClock>,
98	// The inner datastore type
99	flavor: Arc<DatastoreFlavor>,
100}
101
102impl TransactionFactory {
103	#[allow(unreachable_code)]
104	pub async fn transaction(
105		&self,
106		write: TransactionType,
107		lock: LockType,
108	) -> Result<Transaction, Error> {
109		// Specify if the transaction is writeable
110		#[allow(unused_variables)]
111		let write = match write {
112			Read => false,
113			Write => true,
114		};
115		// Specify if the transaction is lockable
116		#[allow(unused_variables)]
117		let lock = match lock {
118			Pessimistic => true,
119			Optimistic => false,
120		};
121		// Create a new transaction on the datastore
122		#[allow(unused_variables)]
123		let (inner, local) = match self.flavor.as_ref() {
124			#[cfg(feature = "kv-mem")]
125			DatastoreFlavor::Mem(v) => {
126				let tx = v.transaction(write, lock).await?;
127				(super::tr::Inner::Mem(tx), true)
128			}
129			#[cfg(feature = "kv-rocksdb")]
130			DatastoreFlavor::RocksDB(v) => {
131				let tx = v.transaction(write, lock).await?;
132				(super::tr::Inner::RocksDB(tx), true)
133			}
134			#[cfg(feature = "kv-indxdb")]
135			DatastoreFlavor::IndxDB(v) => {
136				let tx = v.transaction(write, lock).await?;
137				(super::tr::Inner::IndxDB(tx), true)
138			}
139			#[cfg(feature = "kv-tikv")]
140			DatastoreFlavor::TiKV(v) => {
141				let tx = v.transaction(write, lock).await?;
142				(super::tr::Inner::TiKV(tx), false)
143			}
144			#[cfg(feature = "kv-fdb")]
145			DatastoreFlavor::FoundationDB(v) => {
146				let tx = v.transaction(write, lock).await?;
147				(super::tr::Inner::FoundationDB(tx), false)
148			}
149			#[cfg(feature = "kv-surrealkv")]
150			DatastoreFlavor::SurrealKV(v) => {
151				let tx = v.transaction(write, lock).await?;
152				(super::tr::Inner::SurrealKV(tx), true)
153			}
154			#[cfg(feature = "kv-surrealcs")]
155			DatastoreFlavor::SurrealCS(v) => {
156				let tx = v.transaction(write, lock).await?;
157				(super::tr::Inner::SurrealCS(tx), false)
158			}
159			#[allow(unreachable_patterns)]
160			_ => unreachable!(),
161		};
162		Ok(Transaction::new(
163			local,
164			Transactor {
165				inner,
166				stash: super::stash::Stash::default(),
167				cf: cf::Writer::new(),
168				clock: self.clock.clone(),
169			},
170		))
171	}
172}
173
174#[allow(clippy::large_enum_variant)]
175pub(super) enum DatastoreFlavor {
176	#[cfg(feature = "kv-mem")]
177	Mem(super::mem::Datastore),
178	#[cfg(feature = "kv-rocksdb")]
179	RocksDB(super::rocksdb::Datastore),
180	#[cfg(feature = "kv-indxdb")]
181	IndxDB(super::indxdb::Datastore),
182	#[cfg(feature = "kv-tikv")]
183	TiKV(super::tikv::Datastore),
184	#[cfg(feature = "kv-fdb")]
185	FoundationDB(super::fdb::Datastore),
186	#[cfg(feature = "kv-surrealkv")]
187	SurrealKV(super::surrealkv::Datastore),
188	#[cfg(feature = "kv-surrealcs")]
189	SurrealCS(super::surrealcs::Datastore),
190}
191
192impl fmt::Display for Datastore {
193	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194		#![allow(unused_variables)]
195		match self.transaction_factory.flavor.as_ref() {
196			#[cfg(feature = "kv-mem")]
197			DatastoreFlavor::Mem(_) => write!(f, "memory"),
198			#[cfg(feature = "kv-rocksdb")]
199			DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
200			#[cfg(feature = "kv-indxdb")]
201			DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
202			#[cfg(feature = "kv-tikv")]
203			DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
204			#[cfg(feature = "kv-fdb")]
205			DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
206			#[cfg(feature = "kv-surrealkv")]
207			DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
208			#[cfg(feature = "kv-surrealcs")]
209			DatastoreFlavor::SurrealCS(_) => write!(f, "surrealcs"),
210			#[allow(unreachable_patterns)]
211			_ => unreachable!(),
212		}
213	}
214}
215
216impl Datastore {
217	/// Creates a new datastore instance
218	///
219	/// # Examples
220	///
221	/// ```rust,no_run
222	/// # use surrealdb_core::kvs::Datastore;
223	/// # use surrealdb_core::err::Error;
224	/// # #[tokio::main]
225	/// # async fn main() -> Result<(), Error> {
226	/// let ds = Datastore::new("memory").await?;
227	/// # Ok(())
228	/// # }
229	/// ```
230	///
231	/// Or to create a file-backed store:
232	///
233	/// ```rust,no_run
234	/// # use surrealdb_core::kvs::Datastore;
235	/// # use surrealdb_core::err::Error;
236	/// # #[tokio::main]
237	/// # async fn main() -> Result<(), Error> {
238	/// let ds = Datastore::new("surrealkv://temp.skv").await?;
239	/// # Ok(())
240	/// # }
241	/// ```
242	///
243	/// Or to connect to a tikv-backed distributed store:
244	///
245	/// ```rust,no_run
246	/// # use surrealdb_core::kvs::Datastore;
247	/// # use surrealdb_core::err::Error;
248	/// # #[tokio::main]
249	/// # async fn main() -> Result<(), Error> {
250	/// let ds = Datastore::new("tikv://127.0.0.1:2379").await?;
251	/// # Ok(())
252	/// # }
253	/// ```
254	pub async fn new(path: &str) -> Result<Self, Error> {
255		Self::new_with_clock(path, None).await
256	}
257
258	#[allow(unused_variables)]
259	pub async fn new_with_clock(
260		path: &str,
261		clock: Option<Arc<SizedClock>>,
262	) -> Result<Datastore, Error> {
263		// Initiate the desired datastore
264		let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
265			// Initiate an in-memory datastore
266			"memory" => {
267				#[cfg(feature = "kv-mem")]
268				{
269					// Innitialise the storage engine
270					info!(target: TARGET, "Starting kvs store in {}", path);
271					let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
272					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
273					info!(target: TARGET, "Started kvs store in {}", path);
274					Ok((v, c))
275				}
276				#[cfg(not(feature = "kv-mem"))]
277                return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
278			}
279			// Parse and initiate a File datastore
280			s if s.starts_with("file:") => {
281				#[cfg(feature = "kv-rocksdb")]
282				{
283					// Create a new blocking threadpool
284					super::threadpool::initialise();
285					// Innitialise the storage engine
286					info!(target: TARGET, "Starting kvs store at {}", path);
287					warn!("file:// is deprecated, please use surrealkv:// or rocksdb://");
288					let s = s.trim_start_matches("file://");
289					let s = s.trim_start_matches("file:");
290					let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
291					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
292					info!(target: TARGET, "Started kvs store at {}", path);
293					Ok((v, c))
294				}
295				#[cfg(not(feature = "kv-rocksdb"))]
296                return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
297			}
298			// Parse and initiate a RocksDB datastore
299			s if s.starts_with("rocksdb:") => {
300				#[cfg(feature = "kv-rocksdb")]
301				{
302					// Create a new blocking threadpool
303					super::threadpool::initialise();
304					// Innitialise the storage engine
305					info!(target: TARGET, "Starting kvs store at {}", path);
306					let s = s.trim_start_matches("rocksdb://");
307					let s = s.trim_start_matches("rocksdb:");
308					let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
309					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
310					info!(target: TARGET, "Started kvs store at {}", path);
311					Ok((v, c))
312				}
313				#[cfg(not(feature = "kv-rocksdb"))]
314                return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
315			}
316			// Parse and initiate a SurrealKV datastore
317			s if s.starts_with("surrealkv") => {
318				#[cfg(feature = "kv-surrealkv")]
319				{
320					// Create a new blocking threadpool
321					super::threadpool::initialise();
322					// Innitialise the storage engine
323					info!(target: TARGET, "Starting kvs store at {}", s);
324					let (path, enable_versions) =
325						super::surrealkv::Datastore::parse_start_string(s)?;
326					let v = super::surrealkv::Datastore::new(path, enable_versions)
327						.await
328						.map(DatastoreFlavor::SurrealKV);
329					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
330					info!(target: TARGET, "Started kvs store at {} with versions {}", path, if enable_versions { "enabled" } else { "disabled" });
331					Ok((v, c))
332				}
333				#[cfg(not(feature = "kv-surrealkv"))]
334                return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
335			}
336			// Parse and initiate a SurrealCS datastore
337			s if s.starts_with("surrealcs:") => {
338				#[cfg(feature = "kv-surrealcs")]
339				{
340					info!(target: TARGET, "Starting kvs store at {}", path);
341					let s = s.trim_start_matches("surrealcs://");
342					let s = s.trim_start_matches("surrealcs:");
343					let v =
344						super::surrealcs::Datastore::new(s).await.map(DatastoreFlavor::SurrealCS);
345					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
346					info!(target: TARGET, "Started kvs store at {}", path);
347					Ok((v, c))
348				}
349				#[cfg(not(feature = "kv-surrealcs"))]
350				return Err(Error::Ds("Cannot connect to the `surrealcs` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
351			}
352			// Parse and initiate an IndxDB database
353			s if s.starts_with("indxdb:") => {
354				#[cfg(feature = "kv-indxdb")]
355				{
356					info!(target: TARGET, "Starting kvs store at {}", path);
357					let s = s.trim_start_matches("indxdb://");
358					let s = s.trim_start_matches("indxdb:");
359					let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
360					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
361					info!(target: TARGET, "Started kvs store at {}", path);
362					Ok((v, c))
363				}
364				#[cfg(not(feature = "kv-indxdb"))]
365                return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
366			}
367			// Parse and initiate a TiKV datastore
368			s if s.starts_with("tikv:") => {
369				#[cfg(feature = "kv-tikv")]
370				{
371					info!(target: TARGET, "Connecting to kvs store at {}", path);
372					let s = s.trim_start_matches("tikv://");
373					let s = s.trim_start_matches("tikv:");
374					let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
375					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
376					info!(target: TARGET, "Connected to kvs store at {}", path);
377					Ok((v, c))
378				}
379				#[cfg(not(feature = "kv-tikv"))]
380                return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
381			}
382			// Parse and initiate a FoundationDB datastore
383			s if s.starts_with("fdb:") => {
384				#[cfg(feature = "kv-fdb")]
385				{
386					info!(target: TARGET, "Connecting to kvs store at {}", path);
387					let s = s.trim_start_matches("fdb://");
388					let s = s.trim_start_matches("fdb:");
389					let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
390					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
391					info!(target: TARGET, "Connected to kvs store at {}", path);
392					Ok((v, c))
393				}
394				#[cfg(not(feature = "kv-fdb"))]
395                return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
396			}
397			// The datastore path is not valid
398			_ => {
399				info!(target: TARGET, "Unable to load the specified datastore {}", path);
400				Err(Error::Ds("Unable to load the specified datastore".into()))
401			}
402		}?;
403		// Set the properties on the datastore
404		flavor.map(|flavor| {
405			let tf = TransactionFactory {
406				clock,
407				flavor: Arc::new(flavor),
408			};
409			Self {
410				id: Uuid::new_v4(),
411				transaction_factory: tf.clone(),
412				strict: false,
413				auth_enabled: false,
414				query_timeout: None,
415				transaction_timeout: None,
416				notification_channel: None,
417				capabilities: Arc::new(Capabilities::default()),
418				index_stores: IndexStores::default(),
419				#[cfg(not(target_family = "wasm"))]
420				index_builder: IndexBuilder::new(tf),
421				#[cfg(feature = "jwks")]
422				jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
423				#[cfg(storage)]
424				temporary_directory: None,
425				cache: Arc::new(DatastoreCache::new()),
426			}
427		})
428	}
429
430	/// Create a new datastore with the same persistent data (inner), with flushed cache.
431	/// Simulating a server restart
432	#[allow(dead_code)]
433	pub fn restart(self) -> Self {
434		Self {
435			id: self.id,
436			strict: self.strict,
437			auth_enabled: self.auth_enabled,
438			query_timeout: self.query_timeout,
439			transaction_timeout: self.transaction_timeout,
440			capabilities: self.capabilities,
441			notification_channel: self.notification_channel,
442			index_stores: Default::default(),
443			#[cfg(not(target_family = "wasm"))]
444			index_builder: IndexBuilder::new(self.transaction_factory.clone()),
445			#[cfg(feature = "jwks")]
446			jwks_cache: Arc::new(Default::default()),
447			#[cfg(storage)]
448			temporary_directory: self.temporary_directory,
449			transaction_factory: self.transaction_factory,
450			cache: Arc::new(DatastoreCache::new()),
451		}
452	}
453
454	/// Specify whether this Datastore should run in strict mode
455	pub fn with_node_id(mut self, id: Uuid) -> Self {
456		self.id = id;
457		self
458	}
459
460	/// Specify whether this Datastore should run in strict mode
461	pub fn with_strict_mode(mut self, strict: bool) -> Self {
462		self.strict = strict;
463		self
464	}
465
466	/// Specify whether this datastore should enable live query notifications
467	pub fn with_notifications(mut self) -> Self {
468		self.notification_channel = Some(async_channel::bounded(LQ_CHANNEL_SIZE));
469		self
470	}
471
472	/// Set a global query timeout for this Datastore
473	pub fn with_query_timeout(mut self, duration: Option<Duration>) -> Self {
474		self.query_timeout = duration;
475		self
476	}
477
478	/// Set a global transaction timeout for this Datastore
479	pub fn with_transaction_timeout(mut self, duration: Option<Duration>) -> Self {
480		self.transaction_timeout = duration;
481		self
482	}
483
484	/// Set whether authentication is enabled for this Datastore
485	pub fn with_auth_enabled(mut self, enabled: bool) -> Self {
486		self.auth_enabled = enabled;
487		self
488	}
489
490	/// Set specific capabilities for this Datastore
491	pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
492		self.capabilities = Arc::new(caps);
493		self
494	}
495
496	#[cfg(storage)]
497	/// Set a temporary directory for ordering of large result sets
498	pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
499		self.temporary_directory = path.map(Arc::new);
500		self
501	}
502
503	pub fn index_store(&self) -> &IndexStores {
504		&self.index_stores
505	}
506
507	/// Is authentication enabled for this Datastore?
508	pub fn is_auth_enabled(&self) -> bool {
509		self.auth_enabled
510	}
511
512	pub fn id(&self) -> Uuid {
513		self.id
514	}
515
516	/// Does the datastore allow excecuting an RPC method?
517	pub(crate) fn allows_rpc_method(&self, method_target: &MethodTarget) -> bool {
518		self.capabilities.allows_rpc_method(method_target)
519	}
520
521	/// Does the datastore allow requesting an HTTP route?
522	/// This function needs to be public to allow access from the CLI crate.
523	pub fn allows_http_route(&self, route_target: &RouteTarget) -> bool {
524		self.capabilities.allows_http_route(route_target)
525	}
526
527	/// Is the user allowed to query?
528	pub fn allows_query_by_subject(&self, subject: impl Into<ArbitraryQueryTarget>) -> bool {
529		self.capabilities.allows_query(&subject.into())
530	}
531
532	/// Does the datastore allow connections to a network target?
533	#[cfg(feature = "jwks")]
534	pub(crate) fn allows_network_target(&self, net_target: &NetTarget) -> bool {
535		self.capabilities.allows_network_target(net_target)
536	}
537
538	/// Set specific capabilities for this Datastore
539	pub fn get_capabilities(&self) -> &Capabilities {
540		&self.capabilities
541	}
542
543	#[cfg(feature = "jwks")]
544	pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
545		&self.jwks_cache
546	}
547
548	pub(super) async fn clock_now(&self) -> Timestamp {
549		self.transaction_factory.clock.now().await
550	}
551
552	// Used for testing live queries
553	#[allow(dead_code)]
554	pub fn get_cache(&self) -> Arc<DatastoreCache> {
555		self.cache.clone()
556	}
557
558	// Initialise the cluster and run bootstrap utilities
559	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
560	pub async fn check_version(&self) -> Result<Version, Error> {
561		let version = self.get_version().await?;
562		// Check we are running the latest version
563		if !version.is_latest() {
564			return Err(Error::OutdatedStorageVersion);
565		}
566		// Everything ok
567		Ok(version)
568	}
569
570	// Initialise the cluster and run bootstrap utilities
571	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
572	pub async fn get_version(&self) -> Result<Version, Error> {
573		// Start a new writeable transaction
574		let txn = self.transaction(Write, Pessimistic).await?.enclose();
575		// Create the key where the version is stored
576		let key = crate::key::version::new();
577		// Check if a version is already set in storage
578		let val = match catch!(txn, txn.get(key.clone(), None).await) {
579			// There is a version set in the storage
580			Some(v) => {
581				// Attempt to decode the current stored version
582				let val = TryInto::<Version>::try_into(v);
583				// Check for errors, and cancel the transaction
584				match val {
585					// There was en error getting the version
586					Err(err) => {
587						// We didn't write anything, so just rollback
588						catch!(txn, txn.cancel().await);
589						// Return the error
590						return Err(err);
591					}
592					// We could decode the version correctly
593					Ok(val) => {
594						// We didn't write anything, so just rollback
595						catch!(txn, txn.cancel().await);
596						// Return the current version
597						val
598					}
599				}
600			}
601			// There is no version set in the storage
602			None => {
603				// Fetch any keys immediately following the version key
604				let rng = crate::key::version::proceeding();
605				let keys = catch!(txn, txn.keys(rng, 1, None).await);
606				// Check the storage if there are any other keys set
607				let val = if keys.is_empty() {
608					// There are no keys set in storage, so this is a new database
609					Version::latest()
610				} else {
611					// There were keys in storage, so this is an upgrade
612					Version::v1()
613				};
614				// Convert the version to binary
615				let bytes: Vec<u8> = val.into();
616				// Attempt to set the current version in storage
617				catch!(txn, txn.replace(key, bytes).await);
618				// We set the version, so commit the transaction
619				catch!(txn, txn.commit().await);
620				// Return the current version
621				val
622			}
623		};
624		// Everything ok
625		Ok(val)
626	}
627
628	/// Setup the initial cluster access credentials
629	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
630	pub async fn initialise_credentials(&self, user: &str, pass: &str) -> Result<(), Error> {
631		// Start a new writeable transaction
632		let txn = self.transaction(Write, Optimistic).await?.enclose();
633		// Fetch the root users from the storage
634		let users = catch!(txn, txn.all_root_users().await);
635		// Process credentials, depending on existing users
636		if users.is_empty() {
637			// Display information in the logs
638			info!(target: TARGET, "Credentials were provided, and no root users were found. The root user '{user}' will be created");
639			// Create and new root user definition
640			let stm = DefineUserStatement::from((Base::Root, user, pass, INITIAL_USER_ROLE));
641			let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
642			let mut ctx = MutableContext::default();
643			ctx.set_transaction(txn.clone());
644			let ctx = ctx.freeze();
645			catch!(txn, stm.compute(&ctx, &opt, None).await);
646			// We added a user, so commit the transaction
647			txn.commit().await
648		} else {
649			// Display information in the logs
650			warn!(target: TARGET, "Credentials were provided, but existing root users were found. The root user '{user}' will not be created");
651			warn!(target: TARGET, "Consider removing the --user and --pass arguments from the server start command");
652			// We didn't write anything, so just rollback
653			txn.cancel().await
654		}
655	}
656
657	/// Initialise the cluster and run bootstrap utilities
658	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
659	pub async fn bootstrap(&self) -> Result<(), Error> {
660		// Insert this node in the cluster
661		self.insert_node(self.id).await?;
662		// Mark inactive nodes as archived
663		self.expire_nodes().await?;
664		// Remove archived nodes
665		self.remove_nodes().await?;
666		// Everything ok
667		Ok(())
668	}
669
670	/// Run the background task to update node registration information
671	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
672	pub async fn node_membership_update(&self) -> Result<(), Error> {
673		// Output function invocation details to logs
674		trace!(target: TARGET, "Updating node registration information");
675		// Update this node in the cluster
676		self.update_node(self.id).await?;
677		// Everything ok
678		Ok(())
679	}
680
681	/// Run the background task to process and archive inactive nodes
682	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
683	pub async fn node_membership_expire(&self) -> Result<(), Error> {
684		// Output function invocation details to logs
685		trace!(target: TARGET, "Processing and archiving inactive nodes");
686		// Mark expired nodes as archived
687		self.expire_nodes().await?;
688		// Everything ok
689		Ok(())
690	}
691
692	/// Run the background task to process and cleanup archived nodes
693	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
694	pub async fn node_membership_remove(&self) -> Result<(), Error> {
695		// Output function invocation details to logs
696		trace!(target: TARGET, "Processing and cleaning archived nodes");
697		// Cleanup expired nodes data
698		self.remove_nodes().await?;
699		// Everything ok
700		Ok(())
701	}
702
703	/// Run the background task to perform changefeed garbage collection
704	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
705	pub async fn changefeed_process(&self) -> Result<(), Error> {
706		// Output function invocation details to logs
707		trace!(target: TARGET, "Running changefeed garbage collection");
708		// Calculate the current system time
709		let ts = SystemTime::now()
710			.duration_since(UNIX_EPOCH)
711			.map_err(|e| {
712				Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
713			})?
714			.as_secs();
715		// Save timestamps for current versionstamps
716		self.changefeed_versionstamp(ts).await?;
717		// Garbage old changefeed data from all databases
718		self.changefeed_cleanup(ts).await?;
719		// Everything ok
720		Ok(())
721	}
722
723	/// Run the background task to perform changefeed garbage collection
724	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
725	pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> {
726		// Output function invocation details to logs
727		trace!(target: TARGET, "Running changefeed garbage collection");
728		// Save timestamps for current versionstamps
729		self.changefeed_versionstamp(ts).await?;
730		// Garbage old changefeed data from all databases
731		self.changefeed_cleanup(ts).await?;
732		// Everything ok
733		Ok(())
734	}
735
736	/// Performs a database import from SQL
737	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
738	pub async fn startup(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
739		// Output function invocation details to logs
740		trace!(target: TARGET, "Running datastore startup import script");
741		// Check if the session has expired
742		if sess.expired() {
743			return Err(Error::ExpiredSession);
744		}
745		// Execute the SQL import
746		self.execute(sql, sess, None).await
747	}
748
749	/// Run the datastore shutdown tasks, perfoming any necessary cleanup
750	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
751	pub async fn shutdown(&self) -> Result<(), Error> {
752		// Output function invocation details to logs
753		trace!(target: TARGET, "Running datastore shutdown operations");
754		// Delete this datastore from the cluster
755		self.delete_node(self.id).await?;
756		// Run any storag engine shutdown tasks
757		match self.transaction_factory.flavor.as_ref() {
758			#[cfg(feature = "kv-mem")]
759			DatastoreFlavor::Mem(v) => v.shutdown().await,
760			#[cfg(feature = "kv-rocksdb")]
761			DatastoreFlavor::RocksDB(v) => v.shutdown().await,
762			#[cfg(feature = "kv-indxdb")]
763			DatastoreFlavor::IndxDB(v) => v.shutdown().await,
764			#[cfg(feature = "kv-tikv")]
765			DatastoreFlavor::TiKV(v) => v.shutdown().await,
766			#[cfg(feature = "kv-fdb")]
767			DatastoreFlavor::FoundationDB(v) => v.shutdown().await,
768			#[cfg(feature = "kv-surrealkv")]
769			DatastoreFlavor::SurrealKV(v) => v.shutdown().await,
770			#[cfg(feature = "kv-surrealcs")]
771			DatastoreFlavor::SurrealCS(v) => v.shutdown().await,
772			#[allow(unreachable_patterns)]
773			_ => unreachable!(),
774		}
775	}
776
777	/// Create a new transaction on this datastore
778	///
779	/// ```rust,no_run
780	/// use surrealdb_core::kvs::{Datastore, TransactionType::*, LockType::*};
781	/// use surrealdb_core::err::Error;
782	///
783	/// #[tokio::main]
784	/// async fn main() -> Result<(), Error> {
785	///     let ds = Datastore::new("file://database.db").await?;
786	///     let mut tx = ds.transaction(Write, Optimistic).await?;
787	///     tx.cancel().await?;
788	///     Ok(())
789	/// }
790	/// ```
791	#[allow(unreachable_code)]
792	pub async fn transaction(
793		&self,
794		write: TransactionType,
795		lock: LockType,
796	) -> Result<Transaction, Error> {
797		self.transaction_factory.transaction(write, lock).await
798	}
799
800	/// Parse and execute an SQL query
801	///
802	/// ```rust,no_run
803	/// use surrealdb_core::kvs::Datastore;
804	/// use surrealdb_core::err::Error;
805	/// use surrealdb_core::dbs::Session;
806	///
807	/// #[tokio::main]
808	/// async fn main() -> Result<(), Error> {
809	///     let ds = Datastore::new("memory").await?;
810	///     let ses = Session::owner();
811	///     let ast = "USE NS test DB test; SELECT * FROM person;";
812	///     let res = ds.execute(ast, &ses, None).await?;
813	///     Ok(())
814	/// }
815	/// ```
816	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
817	pub async fn execute(
818		&self,
819		txt: &str,
820		sess: &Session,
821		vars: Variables,
822	) -> Result<Vec<Response>, Error> {
823		// Parse the SQL query text
824		let ast = syn::parse_with_capabilities(txt, &self.capabilities)?;
825		// Process the AST
826		self.process(ast, sess, vars).await
827	}
828
829	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
830	pub async fn execute_import<S>(
831		&self,
832		sess: &Session,
833		vars: Variables,
834		query: S,
835	) -> Result<Vec<Response>, Error>
836	where
837		S: Stream<Item = Result<Bytes, Error>>,
838	{
839		// Check if the session has expired
840		if sess.expired() {
841			return Err(Error::ExpiredSession);
842		}
843
844		// Check if anonymous actors can execute queries when auth is enabled
845		// TODO(sgirones): Check this as part of the authorisation layer
846		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
847			actor: "anonymous".to_string(),
848			action: "process".to_string(),
849			resource: "query".to_string(),
850		})?;
851
852		// Create a new query options
853		let opt = self.setup_options(sess);
854
855		// Create a default context
856		let mut ctx = self.setup_ctx()?;
857		// Start an execution context
858		sess.context(&mut ctx);
859		// Store the query variables
860		vars.attach(&mut ctx)?;
861		// Process all statements
862
863		let parser_settings = ParserSettings {
864			references_enabled: ctx
865				.get_capabilities()
866				.allows_experimental(&ExperimentalTarget::RecordReferences),
867			bearer_access_enabled: ctx
868				.get_capabilities()
869				.allows_experimental(&ExperimentalTarget::BearerAccess),
870			define_api_enabled: ctx
871				.get_capabilities()
872				.allows_experimental(&ExperimentalTarget::DefineApi),
873			..Default::default()
874		};
875		let mut statements_stream = StatementStream::new_with_settings(parser_settings);
876		let mut buffer = BytesMut::new();
877		let mut parse_size = 4096;
878		let mut bytes_stream = pin!(query);
879		let mut complete = false;
880		let mut filling = true;
881
882		let stream = futures::stream::poll_fn(move |cx| loop {
883			// fill the buffer to at least parse_size when filling is required.
884			while filling {
885				let bytes = ready!(bytes_stream.as_mut().poll_next(cx));
886				let bytes = match bytes {
887					Some(Err(e)) => return Poll::Ready(Some(Err(e))),
888					Some(Ok(x)) => x,
889					None => {
890						complete = true;
891						filling = false;
892						break;
893					}
894				};
895
896				buffer.extend_from_slice(&bytes);
897				filling = buffer.len() < parse_size
898			}
899
900			// if we finished streaming we can parse with complete so that the parser can be sure
901			// of it's results.
902			if complete {
903				return match statements_stream.parse_complete(&mut buffer) {
904					Err(e) => Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
905					Ok(None) => Poll::Ready(None),
906					Ok(Some(x)) => Poll::Ready(Some(Ok(x))),
907				};
908			}
909
910			// otherwise try to parse a single statement.
911			match statements_stream.parse_partial(&mut buffer) {
912				Err(e) => return Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
913				Ok(Some(x)) => return Poll::Ready(Some(Ok(x))),
914				Ok(None) => {
915					// Couldn't parse a statement for sure.
916					if buffer.len() >= parse_size && parse_size < u32::MAX as usize {
917						// the buffer already contained more or equal to parse_size bytes
918						// this means we are trying to parse a statement of more then buffer size.
919						// so we need to increase the buffer size.
920						parse_size = (parse_size + 1).next_power_of_two();
921					}
922					// start filling the buffer again.
923					filling = true;
924				}
925			}
926		});
927
928		Executor::execute_stream(
929			self,
930			Arc::new(ctx),
931			opt,
932			*cnf::SKIP_IMPORT_SUCCESS_RESULTS,
933			stream,
934		)
935		.await
936	}
937
938	/// Execute a pre-parsed SQL query
939	///
940	/// ```rust,no_run
941	/// use surrealdb_core::kvs::Datastore;
942	/// use surrealdb_core::err::Error;
943	/// use surrealdb_core::dbs::Session;
944	/// use surrealdb_core::sql::parse;
945	///
946	/// #[tokio::main]
947	/// async fn main() -> Result<(), Error> {
948	///     let ds = Datastore::new("memory").await?;
949	///     let ses = Session::owner();
950	///     let ast = parse("USE NS test DB test; SELECT * FROM person;")?;
951	///     let res = ds.process(ast, &ses, None).await?;
952	///     Ok(())
953	/// }
954	/// ```
955	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
956	pub async fn process(
957		&self,
958		ast: Query,
959		sess: &Session,
960		vars: Variables,
961	) -> Result<Vec<Response>, Error> {
962		// Check if the session has expired
963		if sess.expired() {
964			return Err(Error::ExpiredSession);
965		}
966		// Check if anonymous actors can execute queries when auth is enabled
967		// TODO(sgirones): Check this as part of the authorisation layer
968		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
969			actor: "anonymous".to_string(),
970			action: "process".to_string(),
971			resource: "query".to_string(),
972		})?;
973
974		// Create a new query options
975		let opt = self.setup_options(sess);
976
977		// Create a default context
978		let mut ctx = self.setup_ctx()?;
979		// Start an execution context
980		sess.context(&mut ctx);
981		// Store the query variables
982		vars.attach(&mut ctx)?;
983		// Process all statements
984		Executor::execute(self, ctx.freeze(), opt, ast).await
985	}
986
987	/// Ensure a SQL [`Value`] is fully computed
988	///
989	/// ```rust,no_run
990	/// use surrealdb_core::kvs::Datastore;
991	/// use surrealdb_core::err::Error;
992	/// use surrealdb_core::dbs::Session;
993	/// use surrealdb_core::sql::Future;
994	/// use surrealdb_core::sql::Value;
995	///
996	/// #[tokio::main]
997	/// async fn main() -> Result<(), Error> {
998	///     let ds = Datastore::new("memory").await?;
999	///     let ses = Session::owner();
1000	///     let val = Value::Future(Box::new(Future::from(Value::Bool(true))));
1001	///     let res = ds.compute(val, &ses, None).await?;
1002	///     Ok(())
1003	/// }
1004	/// ```
1005	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1006	pub async fn compute(
1007		&self,
1008		val: Value,
1009		sess: &Session,
1010		vars: Variables,
1011	) -> Result<Value, Error> {
1012		// Check if the session has expired
1013		if sess.expired() {
1014			return Err(Error::ExpiredSession);
1015		}
1016		// Check if anonymous actors can compute values when auth is enabled
1017		// TODO(sgirones): Check this as part of the authorisation layer
1018		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
1019			actor: "anonymous".to_string(),
1020			action: "compute".to_string(),
1021			resource: "value".to_string(),
1022		})?;
1023
1024		// Create a new memory stack
1025		let mut stack = TreeStack::new();
1026		// Create a new query options
1027		let opt = self.setup_options(sess);
1028		// Create a default context
1029		let mut ctx = MutableContext::default();
1030		// Set context capabilities
1031		ctx.add_capabilities(self.capabilities.clone());
1032		// Set the global query timeout
1033		if let Some(timeout) = self.query_timeout {
1034			ctx.add_timeout(timeout)?;
1035		}
1036		// Setup the notification channel
1037		if let Some(channel) = &self.notification_channel {
1038			ctx.add_notifications(Some(&channel.0));
1039		}
1040		// Start an execution context
1041		sess.context(&mut ctx);
1042		// Store the query variables
1043		vars.attach(&mut ctx)?;
1044		// Start a new transaction
1045		let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1046		// Store the transaction
1047		ctx.set_transaction(txn.clone());
1048		// Freeze the context
1049		let ctx = ctx.freeze();
1050		// Compute the value
1051		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1052		// Store any data
1053		match (res.is_ok(), val.writeable()) {
1054			// If the compute was successful, then commit if writeable
1055			(true, true) => txn.commit().await?,
1056			// Cancel if the compute was an error, or if readonly
1057			(_, _) => txn.cancel().await?,
1058		};
1059		// Return result
1060		res
1061	}
1062
1063	/// Evaluates a SQL [`Value`] without checking authenticating config
1064	/// This is used in very specific cases, where we do not need to check
1065	/// whether authentication is enabled, or guest access is disabled.
1066	/// For example, this is used when processing a record access SIGNUP or
1067	/// SIGNIN clause, which still needs to work without guest access.
1068	///
1069	/// ```rust,no_run
1070	/// use surrealdb_core::kvs::Datastore;
1071	/// use surrealdb_core::err::Error;
1072	/// use surrealdb_core::dbs::Session;
1073	/// use surrealdb_core::sql::Future;
1074	/// use surrealdb_core::sql::Value;
1075	///
1076	/// #[tokio::main]
1077	/// async fn main() -> Result<(), Error> {
1078	///     let ds = Datastore::new("memory").await?;
1079	///     let ses = Session::owner();
1080	///     let val = Value::Future(Box::new(Future::from(Value::Bool(true))));
1081	///     let res = ds.evaluate(&val, &ses, None).await?;
1082	///     Ok(())
1083	/// }
1084	/// ```
1085	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1086	pub async fn evaluate(
1087		&self,
1088		val: &Value,
1089		sess: &Session,
1090		vars: Variables,
1091	) -> Result<Value, Error> {
1092		// Check if the session has expired
1093		if sess.expired() {
1094			return Err(Error::ExpiredSession);
1095		}
1096		// Create a new memory stack
1097		let mut stack = TreeStack::new();
1098		// Create a new query options
1099		let opt = self.setup_options(sess);
1100		// Create a default context
1101		let mut ctx = MutableContext::default();
1102		// Set context capabilities
1103		ctx.add_capabilities(self.capabilities.clone());
1104		// Set the global query timeout
1105		if let Some(timeout) = self.query_timeout {
1106			ctx.add_timeout(timeout)?;
1107		}
1108		// Setup the notification channel
1109		if let Some(channel) = &self.notification_channel {
1110			ctx.add_notifications(Some(&channel.0));
1111		}
1112		// Start an execution context
1113		sess.context(&mut ctx);
1114		// Store the query variables
1115		vars.attach(&mut ctx)?;
1116		// Start a new transaction
1117		let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1118		// Store the transaction
1119		ctx.set_transaction(txn.clone());
1120		// Freeze the context
1121		let ctx = ctx.freeze();
1122		// Compute the value
1123		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1124		// Store any data
1125		match (res.is_ok(), val.writeable()) {
1126			// If the compute was successful, then commit if writeable
1127			(true, true) => txn.commit().await?,
1128			// Cancel if the compute was an error, or if readonly
1129			(_, _) => txn.cancel().await?,
1130		};
1131		// Return result
1132		res
1133	}
1134
1135	/// Subscribe to live notifications
1136	///
1137	/// ```rust,no_run
1138	/// use surrealdb_core::kvs::Datastore;
1139	/// use surrealdb_core::err::Error;
1140	/// use surrealdb_core::dbs::Session;
1141	///
1142	/// #[tokio::main]
1143	/// async fn main() -> Result<(), Error> {
1144	///     let ds = Datastore::new("memory").await?.with_notifications();
1145	///     let ses = Session::owner();
1146	/// 	if let Some(channel) = ds.notifications() {
1147	///     	while let Ok(v) = channel.recv().await {
1148	///     	    println!("Received notification: {v}");
1149	///     	}
1150	/// 	}
1151	///     Ok(())
1152	/// }
1153	/// ```
1154	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1155	pub fn notifications(&self) -> Option<Receiver<Notification>> {
1156		self.notification_channel.as_ref().map(|v| v.1.clone())
1157	}
1158
1159	/// Performs a database import from SQL
1160	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1161	pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
1162		// Check if the session has expired
1163		if sess.expired() {
1164			return Err(Error::ExpiredSession);
1165		}
1166		// Execute the SQL import
1167		self.execute(sql, sess, None).await
1168	}
1169
1170	/// Performs a database import from SQL
1171	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1172	pub async fn import_stream<S>(&self, sess: &Session, stream: S) -> Result<Vec<Response>, Error>
1173	where
1174		S: Stream<Item = Result<Bytes, Error>>,
1175	{
1176		// Check if the session has expired
1177		if sess.expired() {
1178			return Err(Error::ExpiredSession);
1179		}
1180		// Execute the SQL import
1181		self.execute_import(sess, None, stream).await
1182	}
1183
1184	/// Performs a full database export as SQL
1185	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1186	pub async fn export(
1187		&self,
1188		sess: &Session,
1189		chn: Sender<Vec<u8>>,
1190	) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1191		// Create a default export config
1192		let cfg = super::export::Config::default();
1193		self.export_with_config(sess, chn, cfg).await
1194	}
1195
1196	/// Performs a full database export as SQL
1197	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1198	pub async fn export_with_config(
1199		&self,
1200		sess: &Session,
1201		chn: Sender<Vec<u8>>,
1202		cfg: export::Config,
1203	) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1204		// Check if the session has expired
1205		if sess.expired() {
1206			return Err(Error::ExpiredSession);
1207		}
1208		// Retrieve the provided NS and DB
1209		let (ns, db) = crate::iam::check::check_ns_db(sess)?;
1210		// Create a new readonly transaction
1211		let txn = self.transaction(Read, Optimistic).await?;
1212		// Return an async export job
1213		Ok(async move {
1214			// Process the export
1215			txn.export(&ns, &db, cfg, chn).await?;
1216			// Everything ok
1217			Ok(())
1218		})
1219	}
1220
1221	/// Checks the required permissions level for this session
1222	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self, sess))]
1223	pub fn check(&self, sess: &Session, action: Action, resource: Resource) -> Result<(), Error> {
1224		// Check if the session has expired
1225		if sess.expired() {
1226			return Err(Error::ExpiredSession);
1227		}
1228		// Skip auth for Anonymous users if auth is disabled
1229		let skip_auth = !self.is_auth_enabled() && sess.au.is_anon();
1230		if !skip_auth {
1231			sess.au.is_allowed(action, &resource)?;
1232		}
1233		// All ok
1234		Ok(())
1235	}
1236
1237	pub fn setup_options(&self, sess: &Session) -> Options {
1238		Options::default()
1239			.with_id(self.id)
1240			.with_ns(sess.ns())
1241			.with_db(sess.db())
1242			.with_live(sess.live())
1243			.with_auth(sess.au.clone())
1244			.with_strict(self.strict)
1245			.with_auth_enabled(self.auth_enabled)
1246	}
1247	pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
1248		let mut ctx = MutableContext::from_ds(
1249			self.query_timeout,
1250			self.capabilities.clone(),
1251			self.index_stores.clone(),
1252			self.cache.clone(),
1253			#[cfg(not(target_family = "wasm"))]
1254			self.index_builder.clone(),
1255			#[cfg(storage)]
1256			self.temporary_directory.clone(),
1257		)?;
1258		// Setup the notification channel
1259		if let Some(channel) = &self.notification_channel {
1260			ctx.add_notifications(Some(&channel.0));
1261		}
1262		Ok(ctx)
1263	}
1264
1265	/// check for disallowed anonymous users
1266	pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
1267		if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
1268			Err(IamError::NotAllowed {
1269				actor: "anonymous".to_string(),
1270				action: String::new(),
1271				resource: String::new(),
1272			})
1273		} else {
1274			Ok(())
1275		}
1276	}
1277}
1278
1279#[cfg(test)]
1280mod test {
1281	use super::*;
1282
1283	#[tokio::test]
1284	pub async fn very_deep_query() -> Result<(), Error> {
1285		use crate::kvs::Datastore;
1286		use crate::sql::{Expression, Future, Number, Operator, Value};
1287		use reblessive::{Stack, Stk};
1288
1289		// build query manually to bypass query limits.
1290		let mut stack = Stack::new();
1291		async fn build_query(stk: &mut Stk, depth: usize) -> Value {
1292			if depth == 0 {
1293				Value::Expression(Box::new(Expression::Binary {
1294					l: Value::Number(Number::Int(1)),
1295					o: Operator::Add,
1296					r: Value::Number(Number::Int(1)),
1297				}))
1298			} else {
1299				let q = stk.run(|stk| build_query(stk, depth - 1)).await;
1300				Value::Future(Box::new(Future::from(q)))
1301			}
1302		}
1303		let val = stack.enter(|stk| build_query(stk, 1000)).finish();
1304
1305		let dbs = Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());
1306
1307		let opt = Options::default()
1308			.with_id(dbs.id)
1309			.with_ns(Some("test".into()))
1310			.with_db(Some("test".into()))
1311			.with_live(false)
1312			.with_strict(false)
1313			.with_auth_enabled(false)
1314			.with_max_computation_depth(u32::MAX)
1315			.with_futures(true);
1316
1317		// Create a default context
1318		let mut ctx = MutableContext::default();
1319		// Set context capabilities
1320		ctx.add_capabilities(dbs.capabilities.clone());
1321		// Start a new transaction
1322		let txn = dbs.transaction(val.writeable().into(), Optimistic).await?;
1323		// Store the transaction
1324		ctx.set_transaction(txn.enclose());
1325		// Freeze the context
1326		let ctx = ctx.freeze();
1327		// Compute the value
1328		let mut stack = reblessive::tree::TreeStack::new();
1329		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
1330		assert_eq!(res, Value::Number(Number::Int(2)));
1331		Ok(())
1332	}
1333}