1use super::export;
2use super::tr::Transactor;
3use super::tx::Transaction;
4use super::version::Version;
5use crate::ctx::MutableContext;
6#[cfg(feature = "jwks")]
7use crate::dbs::capabilities::NetTarget;
8use crate::dbs::capabilities::{
9 ArbitraryQueryTarget, ExperimentalTarget, MethodTarget, RouteTarget,
10};
11use crate::dbs::node::Timestamp;
12use crate::dbs::{
13 Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
14};
15use crate::err::Error;
16#[cfg(feature = "jwks")]
17use crate::iam::jwks::JwksCache;
18use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
19use crate::idx::trees::store::IndexStores;
20use crate::kvs::cache::ds::DatastoreCache;
21use crate::kvs::clock::SizedClock;
22#[allow(unused_imports)]
23use crate::kvs::clock::SystemClock;
24#[cfg(not(target_family = "wasm"))]
25use crate::kvs::index::IndexBuilder;
26use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
27use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
28use crate::syn;
29use crate::syn::parser::{ParserSettings, StatementStream};
30use crate::{cf, cnf};
31use async_channel::{Receiver, Sender};
32use bytes::{Bytes, BytesMut};
33use futures::{Future, Stream};
34use reblessive::TreeStack;
35use std::fmt;
36#[cfg(storage)]
37use std::path::PathBuf;
38use std::pin::pin;
39use std::sync::Arc;
40use std::task::{ready, Poll};
41use std::time::Duration;
42#[cfg(not(target_family = "wasm"))]
43use std::time::{SystemTime, UNIX_EPOCH};
44#[cfg(feature = "jwks")]
45use tokio::sync::RwLock;
46use tracing::instrument;
47use tracing::trace;
48use uuid::Uuid;
49#[cfg(target_family = "wasm")]
50use wasmtimer::std::{SystemTime, UNIX_EPOCH};
51
52const TARGET: &str = "surrealdb::core::kvs::ds";
53
54const LQ_CHANNEL_SIZE: usize = 15_000;
56
57const INITIAL_USER_ROLE: &str = "owner";
59
60#[allow(dead_code)]
62#[non_exhaustive]
63pub struct Datastore {
64 transaction_factory: TransactionFactory,
65 id: Uuid,
67 strict: bool,
69 auth_enabled: bool,
71 query_timeout: Option<Duration>,
73 transaction_timeout: Option<Duration>,
75 capabilities: Arc<Capabilities>,
77 notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
79 index_stores: IndexStores,
81 cache: Arc<DatastoreCache>,
83 #[cfg(not(target_family = "wasm"))]
85 index_builder: IndexBuilder,
86 #[cfg(feature = "jwks")]
87 jwks_cache: Arc<RwLock<JwksCache>>,
89 #[cfg(storage)]
90 temporary_directory: Option<Arc<PathBuf>>,
92}
93
94#[derive(Clone)]
95pub(super) struct TransactionFactory {
96 clock: Arc<SizedClock>,
98 flavor: Arc<DatastoreFlavor>,
100}
101
102impl TransactionFactory {
103 #[allow(unreachable_code)]
104 pub async fn transaction(
105 &self,
106 write: TransactionType,
107 lock: LockType,
108 ) -> Result<Transaction, Error> {
109 #[allow(unused_variables)]
111 let write = match write {
112 Read => false,
113 Write => true,
114 };
115 #[allow(unused_variables)]
117 let lock = match lock {
118 Pessimistic => true,
119 Optimistic => false,
120 };
121 #[allow(unused_variables)]
123 let (inner, local) = match self.flavor.as_ref() {
124 #[cfg(feature = "kv-mem")]
125 DatastoreFlavor::Mem(v) => {
126 let tx = v.transaction(write, lock).await?;
127 (super::tr::Inner::Mem(tx), true)
128 }
129 #[cfg(feature = "kv-rocksdb")]
130 DatastoreFlavor::RocksDB(v) => {
131 let tx = v.transaction(write, lock).await?;
132 (super::tr::Inner::RocksDB(tx), true)
133 }
134 #[cfg(feature = "kv-indxdb")]
135 DatastoreFlavor::IndxDB(v) => {
136 let tx = v.transaction(write, lock).await?;
137 (super::tr::Inner::IndxDB(tx), true)
138 }
139 #[cfg(feature = "kv-tikv")]
140 DatastoreFlavor::TiKV(v) => {
141 let tx = v.transaction(write, lock).await?;
142 (super::tr::Inner::TiKV(tx), false)
143 }
144 #[cfg(feature = "kv-fdb")]
145 DatastoreFlavor::FoundationDB(v) => {
146 let tx = v.transaction(write, lock).await?;
147 (super::tr::Inner::FoundationDB(tx), false)
148 }
149 #[cfg(feature = "kv-surrealkv")]
150 DatastoreFlavor::SurrealKV(v) => {
151 let tx = v.transaction(write, lock).await?;
152 (super::tr::Inner::SurrealKV(tx), true)
153 }
154 #[cfg(feature = "kv-surrealcs")]
155 DatastoreFlavor::SurrealCS(v) => {
156 let tx = v.transaction(write, lock).await?;
157 (super::tr::Inner::SurrealCS(tx), false)
158 }
159 #[allow(unreachable_patterns)]
160 _ => unreachable!(),
161 };
162 Ok(Transaction::new(
163 local,
164 Transactor {
165 inner,
166 stash: super::stash::Stash::default(),
167 cf: cf::Writer::new(),
168 clock: self.clock.clone(),
169 },
170 ))
171 }
172}
173
174#[allow(clippy::large_enum_variant)]
175pub(super) enum DatastoreFlavor {
176 #[cfg(feature = "kv-mem")]
177 Mem(super::mem::Datastore),
178 #[cfg(feature = "kv-rocksdb")]
179 RocksDB(super::rocksdb::Datastore),
180 #[cfg(feature = "kv-indxdb")]
181 IndxDB(super::indxdb::Datastore),
182 #[cfg(feature = "kv-tikv")]
183 TiKV(super::tikv::Datastore),
184 #[cfg(feature = "kv-fdb")]
185 FoundationDB(super::fdb::Datastore),
186 #[cfg(feature = "kv-surrealkv")]
187 SurrealKV(super::surrealkv::Datastore),
188 #[cfg(feature = "kv-surrealcs")]
189 SurrealCS(super::surrealcs::Datastore),
190}
191
192impl fmt::Display for Datastore {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 #![allow(unused_variables)]
195 match self.transaction_factory.flavor.as_ref() {
196 #[cfg(feature = "kv-mem")]
197 DatastoreFlavor::Mem(_) => write!(f, "memory"),
198 #[cfg(feature = "kv-rocksdb")]
199 DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
200 #[cfg(feature = "kv-indxdb")]
201 DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
202 #[cfg(feature = "kv-tikv")]
203 DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
204 #[cfg(feature = "kv-fdb")]
205 DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
206 #[cfg(feature = "kv-surrealkv")]
207 DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
208 #[cfg(feature = "kv-surrealcs")]
209 DatastoreFlavor::SurrealCS(_) => write!(f, "surrealcs"),
210 #[allow(unreachable_patterns)]
211 _ => unreachable!(),
212 }
213 }
214}
215
216impl Datastore {
217 pub async fn new(path: &str) -> Result<Self, Error> {
255 Self::new_with_clock(path, None).await
256 }
257
258 #[allow(unused_variables)]
259 pub async fn new_with_clock(
260 path: &str,
261 clock: Option<Arc<SizedClock>>,
262 ) -> Result<Datastore, Error> {
263 let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
265 "memory" => {
267 #[cfg(feature = "kv-mem")]
268 {
269 info!(target: TARGET, "Starting kvs store in {}", path);
271 let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
272 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
273 info!(target: TARGET, "Started kvs store in {}", path);
274 Ok((v, c))
275 }
276 #[cfg(not(feature = "kv-mem"))]
277 return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
278 }
279 s if s.starts_with("file:") => {
281 #[cfg(feature = "kv-rocksdb")]
282 {
283 super::threadpool::initialise();
285 info!(target: TARGET, "Starting kvs store at {}", path);
287 warn!("file:// is deprecated, please use surrealkv:// or rocksdb://");
288 let s = s.trim_start_matches("file://");
289 let s = s.trim_start_matches("file:");
290 let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
291 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
292 info!(target: TARGET, "Started kvs store at {}", path);
293 Ok((v, c))
294 }
295 #[cfg(not(feature = "kv-rocksdb"))]
296 return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
297 }
298 s if s.starts_with("rocksdb:") => {
300 #[cfg(feature = "kv-rocksdb")]
301 {
302 super::threadpool::initialise();
304 info!(target: TARGET, "Starting kvs store at {}", path);
306 let s = s.trim_start_matches("rocksdb://");
307 let s = s.trim_start_matches("rocksdb:");
308 let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
309 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
310 info!(target: TARGET, "Started kvs store at {}", path);
311 Ok((v, c))
312 }
313 #[cfg(not(feature = "kv-rocksdb"))]
314 return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
315 }
316 s if s.starts_with("surrealkv") => {
318 #[cfg(feature = "kv-surrealkv")]
319 {
320 super::threadpool::initialise();
322 info!(target: TARGET, "Starting kvs store at {}", s);
324 let (path, enable_versions) =
325 super::surrealkv::Datastore::parse_start_string(s)?;
326 let v = super::surrealkv::Datastore::new(path, enable_versions)
327 .await
328 .map(DatastoreFlavor::SurrealKV);
329 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
330 info!(target: TARGET, "Started kvs store at {} with versions {}", path, if enable_versions { "enabled" } else { "disabled" });
331 Ok((v, c))
332 }
333 #[cfg(not(feature = "kv-surrealkv"))]
334 return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
335 }
336 s if s.starts_with("surrealcs:") => {
338 #[cfg(feature = "kv-surrealcs")]
339 {
340 info!(target: TARGET, "Starting kvs store at {}", path);
341 let s = s.trim_start_matches("surrealcs://");
342 let s = s.trim_start_matches("surrealcs:");
343 let v =
344 super::surrealcs::Datastore::new(s).await.map(DatastoreFlavor::SurrealCS);
345 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
346 info!(target: TARGET, "Started kvs store at {}", path);
347 Ok((v, c))
348 }
349 #[cfg(not(feature = "kv-surrealcs"))]
350 return Err(Error::Ds("Cannot connect to the `surrealcs` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
351 }
352 s if s.starts_with("indxdb:") => {
354 #[cfg(feature = "kv-indxdb")]
355 {
356 info!(target: TARGET, "Starting kvs store at {}", path);
357 let s = s.trim_start_matches("indxdb://");
358 let s = s.trim_start_matches("indxdb:");
359 let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
360 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
361 info!(target: TARGET, "Started kvs store at {}", path);
362 Ok((v, c))
363 }
364 #[cfg(not(feature = "kv-indxdb"))]
365 return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
366 }
367 s if s.starts_with("tikv:") => {
369 #[cfg(feature = "kv-tikv")]
370 {
371 info!(target: TARGET, "Connecting to kvs store at {}", path);
372 let s = s.trim_start_matches("tikv://");
373 let s = s.trim_start_matches("tikv:");
374 let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
375 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
376 info!(target: TARGET, "Connected to kvs store at {}", path);
377 Ok((v, c))
378 }
379 #[cfg(not(feature = "kv-tikv"))]
380 return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
381 }
382 s if s.starts_with("fdb:") => {
384 #[cfg(feature = "kv-fdb")]
385 {
386 info!(target: TARGET, "Connecting to kvs store at {}", path);
387 let s = s.trim_start_matches("fdb://");
388 let s = s.trim_start_matches("fdb:");
389 let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
390 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
391 info!(target: TARGET, "Connected to kvs store at {}", path);
392 Ok((v, c))
393 }
394 #[cfg(not(feature = "kv-fdb"))]
395 return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
396 }
397 _ => {
399 info!(target: TARGET, "Unable to load the specified datastore {}", path);
400 Err(Error::Ds("Unable to load the specified datastore".into()))
401 }
402 }?;
403 flavor.map(|flavor| {
405 let tf = TransactionFactory {
406 clock,
407 flavor: Arc::new(flavor),
408 };
409 Self {
410 id: Uuid::new_v4(),
411 transaction_factory: tf.clone(),
412 strict: false,
413 auth_enabled: false,
414 query_timeout: None,
415 transaction_timeout: None,
416 notification_channel: None,
417 capabilities: Arc::new(Capabilities::default()),
418 index_stores: IndexStores::default(),
419 #[cfg(not(target_family = "wasm"))]
420 index_builder: IndexBuilder::new(tf),
421 #[cfg(feature = "jwks")]
422 jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
423 #[cfg(storage)]
424 temporary_directory: None,
425 cache: Arc::new(DatastoreCache::new()),
426 }
427 })
428 }
429
430 #[allow(dead_code)]
433 pub fn restart(self) -> Self {
434 Self {
435 id: self.id,
436 strict: self.strict,
437 auth_enabled: self.auth_enabled,
438 query_timeout: self.query_timeout,
439 transaction_timeout: self.transaction_timeout,
440 capabilities: self.capabilities,
441 notification_channel: self.notification_channel,
442 index_stores: Default::default(),
443 #[cfg(not(target_family = "wasm"))]
444 index_builder: IndexBuilder::new(self.transaction_factory.clone()),
445 #[cfg(feature = "jwks")]
446 jwks_cache: Arc::new(Default::default()),
447 #[cfg(storage)]
448 temporary_directory: self.temporary_directory,
449 transaction_factory: self.transaction_factory,
450 cache: Arc::new(DatastoreCache::new()),
451 }
452 }
453
454 pub fn with_node_id(mut self, id: Uuid) -> Self {
456 self.id = id;
457 self
458 }
459
460 pub fn with_strict_mode(mut self, strict: bool) -> Self {
462 self.strict = strict;
463 self
464 }
465
466 pub fn with_notifications(mut self) -> Self {
468 self.notification_channel = Some(async_channel::bounded(LQ_CHANNEL_SIZE));
469 self
470 }
471
472 pub fn with_query_timeout(mut self, duration: Option<Duration>) -> Self {
474 self.query_timeout = duration;
475 self
476 }
477
478 pub fn with_transaction_timeout(mut self, duration: Option<Duration>) -> Self {
480 self.transaction_timeout = duration;
481 self
482 }
483
484 pub fn with_auth_enabled(mut self, enabled: bool) -> Self {
486 self.auth_enabled = enabled;
487 self
488 }
489
490 pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
492 self.capabilities = Arc::new(caps);
493 self
494 }
495
496 #[cfg(storage)]
497 pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
499 self.temporary_directory = path.map(Arc::new);
500 self
501 }
502
503 pub fn index_store(&self) -> &IndexStores {
504 &self.index_stores
505 }
506
507 pub fn is_auth_enabled(&self) -> bool {
509 self.auth_enabled
510 }
511
512 pub fn id(&self) -> Uuid {
513 self.id
514 }
515
516 pub(crate) fn allows_rpc_method(&self, method_target: &MethodTarget) -> bool {
518 self.capabilities.allows_rpc_method(method_target)
519 }
520
521 pub fn allows_http_route(&self, route_target: &RouteTarget) -> bool {
524 self.capabilities.allows_http_route(route_target)
525 }
526
527 pub fn allows_query_by_subject(&self, subject: impl Into<ArbitraryQueryTarget>) -> bool {
529 self.capabilities.allows_query(&subject.into())
530 }
531
532 #[cfg(feature = "jwks")]
534 pub(crate) fn allows_network_target(&self, net_target: &NetTarget) -> bool {
535 self.capabilities.allows_network_target(net_target)
536 }
537
538 pub fn get_capabilities(&self) -> &Capabilities {
540 &self.capabilities
541 }
542
543 #[cfg(feature = "jwks")]
544 pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
545 &self.jwks_cache
546 }
547
548 pub(super) async fn clock_now(&self) -> Timestamp {
549 self.transaction_factory.clock.now().await
550 }
551
552 #[allow(dead_code)]
554 pub fn get_cache(&self) -> Arc<DatastoreCache> {
555 self.cache.clone()
556 }
557
558 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
560 pub async fn check_version(&self) -> Result<Version, Error> {
561 let version = self.get_version().await?;
562 if !version.is_latest() {
564 return Err(Error::OutdatedStorageVersion);
565 }
566 Ok(version)
568 }
569
570 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
572 pub async fn get_version(&self) -> Result<Version, Error> {
573 let txn = self.transaction(Write, Pessimistic).await?.enclose();
575 let key = crate::key::version::new();
577 let val = match catch!(txn, txn.get(key.clone(), None).await) {
579 Some(v) => {
581 let val = TryInto::<Version>::try_into(v);
583 match val {
585 Err(err) => {
587 catch!(txn, txn.cancel().await);
589 return Err(err);
591 }
592 Ok(val) => {
594 catch!(txn, txn.cancel().await);
596 val
598 }
599 }
600 }
601 None => {
603 let rng = crate::key::version::proceeding();
605 let keys = catch!(txn, txn.keys(rng, 1, None).await);
606 let val = if keys.is_empty() {
608 Version::latest()
610 } else {
611 Version::v1()
613 };
614 let bytes: Vec<u8> = val.into();
616 catch!(txn, txn.replace(key, bytes).await);
618 catch!(txn, txn.commit().await);
620 val
622 }
623 };
624 Ok(val)
626 }
627
628 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
630 pub async fn initialise_credentials(&self, user: &str, pass: &str) -> Result<(), Error> {
631 let txn = self.transaction(Write, Optimistic).await?.enclose();
633 let users = catch!(txn, txn.all_root_users().await);
635 if users.is_empty() {
637 info!(target: TARGET, "Credentials were provided, and no root users were found. The root user '{user}' will be created");
639 let stm = DefineUserStatement::from((Base::Root, user, pass, INITIAL_USER_ROLE));
641 let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
642 let mut ctx = MutableContext::default();
643 ctx.set_transaction(txn.clone());
644 let ctx = ctx.freeze();
645 catch!(txn, stm.compute(&ctx, &opt, None).await);
646 txn.commit().await
648 } else {
649 warn!(target: TARGET, "Credentials were provided, but existing root users were found. The root user '{user}' will not be created");
651 warn!(target: TARGET, "Consider removing the --user and --pass arguments from the server start command");
652 txn.cancel().await
654 }
655 }
656
657 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
659 pub async fn bootstrap(&self) -> Result<(), Error> {
660 self.insert_node(self.id).await?;
662 self.expire_nodes().await?;
664 self.remove_nodes().await?;
666 Ok(())
668 }
669
670 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
672 pub async fn node_membership_update(&self) -> Result<(), Error> {
673 trace!(target: TARGET, "Updating node registration information");
675 self.update_node(self.id).await?;
677 Ok(())
679 }
680
681 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
683 pub async fn node_membership_expire(&self) -> Result<(), Error> {
684 trace!(target: TARGET, "Processing and archiving inactive nodes");
686 self.expire_nodes().await?;
688 Ok(())
690 }
691
692 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
694 pub async fn node_membership_remove(&self) -> Result<(), Error> {
695 trace!(target: TARGET, "Processing and cleaning archived nodes");
697 self.remove_nodes().await?;
699 Ok(())
701 }
702
703 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
705 pub async fn changefeed_process(&self) -> Result<(), Error> {
706 trace!(target: TARGET, "Running changefeed garbage collection");
708 let ts = SystemTime::now()
710 .duration_since(UNIX_EPOCH)
711 .map_err(|e| {
712 Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
713 })?
714 .as_secs();
715 self.changefeed_versionstamp(ts).await?;
717 self.changefeed_cleanup(ts).await?;
719 Ok(())
721 }
722
723 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
725 pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> {
726 trace!(target: TARGET, "Running changefeed garbage collection");
728 self.changefeed_versionstamp(ts).await?;
730 self.changefeed_cleanup(ts).await?;
732 Ok(())
734 }
735
736 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
738 pub async fn startup(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
739 trace!(target: TARGET, "Running datastore startup import script");
741 if sess.expired() {
743 return Err(Error::ExpiredSession);
744 }
745 self.execute(sql, sess, None).await
747 }
748
749 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
751 pub async fn shutdown(&self) -> Result<(), Error> {
752 trace!(target: TARGET, "Running datastore shutdown operations");
754 self.delete_node(self.id).await?;
756 match self.transaction_factory.flavor.as_ref() {
758 #[cfg(feature = "kv-mem")]
759 DatastoreFlavor::Mem(v) => v.shutdown().await,
760 #[cfg(feature = "kv-rocksdb")]
761 DatastoreFlavor::RocksDB(v) => v.shutdown().await,
762 #[cfg(feature = "kv-indxdb")]
763 DatastoreFlavor::IndxDB(v) => v.shutdown().await,
764 #[cfg(feature = "kv-tikv")]
765 DatastoreFlavor::TiKV(v) => v.shutdown().await,
766 #[cfg(feature = "kv-fdb")]
767 DatastoreFlavor::FoundationDB(v) => v.shutdown().await,
768 #[cfg(feature = "kv-surrealkv")]
769 DatastoreFlavor::SurrealKV(v) => v.shutdown().await,
770 #[cfg(feature = "kv-surrealcs")]
771 DatastoreFlavor::SurrealCS(v) => v.shutdown().await,
772 #[allow(unreachable_patterns)]
773 _ => unreachable!(),
774 }
775 }
776
777 #[allow(unreachable_code)]
792 pub async fn transaction(
793 &self,
794 write: TransactionType,
795 lock: LockType,
796 ) -> Result<Transaction, Error> {
797 self.transaction_factory.transaction(write, lock).await
798 }
799
800 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
817 pub async fn execute(
818 &self,
819 txt: &str,
820 sess: &Session,
821 vars: Variables,
822 ) -> Result<Vec<Response>, Error> {
823 let ast = syn::parse_with_capabilities(txt, &self.capabilities)?;
825 self.process(ast, sess, vars).await
827 }
828
829 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
830 pub async fn execute_import<S>(
831 &self,
832 sess: &Session,
833 vars: Variables,
834 query: S,
835 ) -> Result<Vec<Response>, Error>
836 where
837 S: Stream<Item = Result<Bytes, Error>>,
838 {
839 if sess.expired() {
841 return Err(Error::ExpiredSession);
842 }
843
844 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
847 actor: "anonymous".to_string(),
848 action: "process".to_string(),
849 resource: "query".to_string(),
850 })?;
851
852 let opt = self.setup_options(sess);
854
855 let mut ctx = self.setup_ctx()?;
857 sess.context(&mut ctx);
859 vars.attach(&mut ctx)?;
861 let parser_settings = ParserSettings {
864 references_enabled: ctx
865 .get_capabilities()
866 .allows_experimental(&ExperimentalTarget::RecordReferences),
867 bearer_access_enabled: ctx
868 .get_capabilities()
869 .allows_experimental(&ExperimentalTarget::BearerAccess),
870 define_api_enabled: ctx
871 .get_capabilities()
872 .allows_experimental(&ExperimentalTarget::DefineApi),
873 ..Default::default()
874 };
875 let mut statements_stream = StatementStream::new_with_settings(parser_settings);
876 let mut buffer = BytesMut::new();
877 let mut parse_size = 4096;
878 let mut bytes_stream = pin!(query);
879 let mut complete = false;
880 let mut filling = true;
881
882 let stream = futures::stream::poll_fn(move |cx| loop {
883 while filling {
885 let bytes = ready!(bytes_stream.as_mut().poll_next(cx));
886 let bytes = match bytes {
887 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
888 Some(Ok(x)) => x,
889 None => {
890 complete = true;
891 filling = false;
892 break;
893 }
894 };
895
896 buffer.extend_from_slice(&bytes);
897 filling = buffer.len() < parse_size
898 }
899
900 if complete {
903 return match statements_stream.parse_complete(&mut buffer) {
904 Err(e) => Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
905 Ok(None) => Poll::Ready(None),
906 Ok(Some(x)) => Poll::Ready(Some(Ok(x))),
907 };
908 }
909
910 match statements_stream.parse_partial(&mut buffer) {
912 Err(e) => return Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
913 Ok(Some(x)) => return Poll::Ready(Some(Ok(x))),
914 Ok(None) => {
915 if buffer.len() >= parse_size && parse_size < u32::MAX as usize {
917 parse_size = (parse_size + 1).next_power_of_two();
921 }
922 filling = true;
924 }
925 }
926 });
927
928 Executor::execute_stream(
929 self,
930 Arc::new(ctx),
931 opt,
932 *cnf::SKIP_IMPORT_SUCCESS_RESULTS,
933 stream,
934 )
935 .await
936 }
937
938 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
956 pub async fn process(
957 &self,
958 ast: Query,
959 sess: &Session,
960 vars: Variables,
961 ) -> Result<Vec<Response>, Error> {
962 if sess.expired() {
964 return Err(Error::ExpiredSession);
965 }
966 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
969 actor: "anonymous".to_string(),
970 action: "process".to_string(),
971 resource: "query".to_string(),
972 })?;
973
974 let opt = self.setup_options(sess);
976
977 let mut ctx = self.setup_ctx()?;
979 sess.context(&mut ctx);
981 vars.attach(&mut ctx)?;
983 Executor::execute(self, ctx.freeze(), opt, ast).await
985 }
986
987 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1006 pub async fn compute(
1007 &self,
1008 val: Value,
1009 sess: &Session,
1010 vars: Variables,
1011 ) -> Result<Value, Error> {
1012 if sess.expired() {
1014 return Err(Error::ExpiredSession);
1015 }
1016 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
1019 actor: "anonymous".to_string(),
1020 action: "compute".to_string(),
1021 resource: "value".to_string(),
1022 })?;
1023
1024 let mut stack = TreeStack::new();
1026 let opt = self.setup_options(sess);
1028 let mut ctx = MutableContext::default();
1030 ctx.add_capabilities(self.capabilities.clone());
1032 if let Some(timeout) = self.query_timeout {
1034 ctx.add_timeout(timeout)?;
1035 }
1036 if let Some(channel) = &self.notification_channel {
1038 ctx.add_notifications(Some(&channel.0));
1039 }
1040 sess.context(&mut ctx);
1042 vars.attach(&mut ctx)?;
1044 let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1046 ctx.set_transaction(txn.clone());
1048 let ctx = ctx.freeze();
1050 let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1052 match (res.is_ok(), val.writeable()) {
1054 (true, true) => txn.commit().await?,
1056 (_, _) => txn.cancel().await?,
1058 };
1059 res
1061 }
1062
1063 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1086 pub async fn evaluate(
1087 &self,
1088 val: &Value,
1089 sess: &Session,
1090 vars: Variables,
1091 ) -> Result<Value, Error> {
1092 if sess.expired() {
1094 return Err(Error::ExpiredSession);
1095 }
1096 let mut stack = TreeStack::new();
1098 let opt = self.setup_options(sess);
1100 let mut ctx = MutableContext::default();
1102 ctx.add_capabilities(self.capabilities.clone());
1104 if let Some(timeout) = self.query_timeout {
1106 ctx.add_timeout(timeout)?;
1107 }
1108 if let Some(channel) = &self.notification_channel {
1110 ctx.add_notifications(Some(&channel.0));
1111 }
1112 sess.context(&mut ctx);
1114 vars.attach(&mut ctx)?;
1116 let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1118 ctx.set_transaction(txn.clone());
1120 let ctx = ctx.freeze();
1122 let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1124 match (res.is_ok(), val.writeable()) {
1126 (true, true) => txn.commit().await?,
1128 (_, _) => txn.cancel().await?,
1130 };
1131 res
1133 }
1134
1135 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1155 pub fn notifications(&self) -> Option<Receiver<Notification>> {
1156 self.notification_channel.as_ref().map(|v| v.1.clone())
1157 }
1158
1159 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1161 pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
1162 if sess.expired() {
1164 return Err(Error::ExpiredSession);
1165 }
1166 self.execute(sql, sess, None).await
1168 }
1169
1170 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1172 pub async fn import_stream<S>(&self, sess: &Session, stream: S) -> Result<Vec<Response>, Error>
1173 where
1174 S: Stream<Item = Result<Bytes, Error>>,
1175 {
1176 if sess.expired() {
1178 return Err(Error::ExpiredSession);
1179 }
1180 self.execute_import(sess, None, stream).await
1182 }
1183
1184 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1186 pub async fn export(
1187 &self,
1188 sess: &Session,
1189 chn: Sender<Vec<u8>>,
1190 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1191 let cfg = super::export::Config::default();
1193 self.export_with_config(sess, chn, cfg).await
1194 }
1195
1196 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1198 pub async fn export_with_config(
1199 &self,
1200 sess: &Session,
1201 chn: Sender<Vec<u8>>,
1202 cfg: export::Config,
1203 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1204 if sess.expired() {
1206 return Err(Error::ExpiredSession);
1207 }
1208 let (ns, db) = crate::iam::check::check_ns_db(sess)?;
1210 let txn = self.transaction(Read, Optimistic).await?;
1212 Ok(async move {
1214 txn.export(&ns, &db, cfg, chn).await?;
1216 Ok(())
1218 })
1219 }
1220
1221 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self, sess))]
1223 pub fn check(&self, sess: &Session, action: Action, resource: Resource) -> Result<(), Error> {
1224 if sess.expired() {
1226 return Err(Error::ExpiredSession);
1227 }
1228 let skip_auth = !self.is_auth_enabled() && sess.au.is_anon();
1230 if !skip_auth {
1231 sess.au.is_allowed(action, &resource)?;
1232 }
1233 Ok(())
1235 }
1236
1237 pub fn setup_options(&self, sess: &Session) -> Options {
1238 Options::default()
1239 .with_id(self.id)
1240 .with_ns(sess.ns())
1241 .with_db(sess.db())
1242 .with_live(sess.live())
1243 .with_auth(sess.au.clone())
1244 .with_strict(self.strict)
1245 .with_auth_enabled(self.auth_enabled)
1246 }
1247 pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
1248 let mut ctx = MutableContext::from_ds(
1249 self.query_timeout,
1250 self.capabilities.clone(),
1251 self.index_stores.clone(),
1252 self.cache.clone(),
1253 #[cfg(not(target_family = "wasm"))]
1254 self.index_builder.clone(),
1255 #[cfg(storage)]
1256 self.temporary_directory.clone(),
1257 )?;
1258 if let Some(channel) = &self.notification_channel {
1260 ctx.add_notifications(Some(&channel.0));
1261 }
1262 Ok(ctx)
1263 }
1264
1265 pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
1267 if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
1268 Err(IamError::NotAllowed {
1269 actor: "anonymous".to_string(),
1270 action: String::new(),
1271 resource: String::new(),
1272 })
1273 } else {
1274 Ok(())
1275 }
1276 }
1277}
1278
1279#[cfg(test)]
1280mod test {
1281 use super::*;
1282
1283 #[tokio::test]
1284 pub async fn very_deep_query() -> Result<(), Error> {
1285 use crate::kvs::Datastore;
1286 use crate::sql::{Expression, Future, Number, Operator, Value};
1287 use reblessive::{Stack, Stk};
1288
1289 let mut stack = Stack::new();
1291 async fn build_query(stk: &mut Stk, depth: usize) -> Value {
1292 if depth == 0 {
1293 Value::Expression(Box::new(Expression::Binary {
1294 l: Value::Number(Number::Int(1)),
1295 o: Operator::Add,
1296 r: Value::Number(Number::Int(1)),
1297 }))
1298 } else {
1299 let q = stk.run(|stk| build_query(stk, depth - 1)).await;
1300 Value::Future(Box::new(Future::from(q)))
1301 }
1302 }
1303 let val = stack.enter(|stk| build_query(stk, 1000)).finish();
1304
1305 let dbs = Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());
1306
1307 let opt = Options::default()
1308 .with_id(dbs.id)
1309 .with_ns(Some("test".into()))
1310 .with_db(Some("test".into()))
1311 .with_live(false)
1312 .with_strict(false)
1313 .with_auth_enabled(false)
1314 .with_max_computation_depth(u32::MAX)
1315 .with_futures(true);
1316
1317 let mut ctx = MutableContext::default();
1319 ctx.add_capabilities(dbs.capabilities.clone());
1321 let txn = dbs.transaction(val.writeable().into(), Optimistic).await?;
1323 ctx.set_transaction(txn.enclose());
1325 let ctx = ctx.freeze();
1327 let mut stack = reblessive::tree::TreeStack::new();
1329 let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
1330 assert_eq!(res, Value::Number(Number::Int(2)));
1331 Ok(())
1332 }
1333}