surrealdb/api/engine/local/
mod.rs

1//! Embedded database instance
2//!
3//! `SurrealDB` itself can be embedded in this library, allowing you to query it
4//! using the same crate and API that you would use when connecting to it
5//! remotely via WebSockets or HTTP. All storage engines are supported but you
6//! have to activate their feature flags first.
7//!
8//! **NB**: Some storage engines like `TiKV` and `RocksDB` depend on non-Rust
9//! libraries so you need to install those libraries before you can build this
10//! crate when you activate their feature flags. Please refer to [these instructions](https://github.com/surrealdb/surrealdb/blob/main/doc/BUILDING.md)
11//! for more details on how to install them. If you are on Linux and you use
12//! [the Nix package manager](https://github.com/surrealdb/surrealdb/tree/main/pkg/nix#installing-nix)
13//! you can just run
14//!
15//! ```bash
16//! nix develop github:surrealdb/surrealdb
17//! ```
18//!
19//! which will drop you into a shell with all the dependencies available. One
20//! tip you may find useful is to only enable the in-memory engine (`kv-mem`)
21//! during development. Besides letting you not worry about those dependencies
22//! on your dev machine, it allows you to keep compile times low
23//! during development while allowing you to test your code fully.
24//!
25//! When running SurrealDB as an embedded database within Rust, using the
26//! correct release profile and memory allocator can greatly improve the
27//! performance of the database core engine. In addition using an optimised
28//! asynchronous runtime configuration can help speed up concurrent queries and
29//! increase database throughput.
30//!
31//! In your project’s Cargo.toml file, ensure that the release profile uses the
32//! following configuration:
33//!
34//! ```toml
35//! [profile.release]
36//! lto = true
37//! strip = true
38//! opt-level = 3
39//! panic = 'abort'
40//! codegen-units = 1
41//! ```
42//!
43//! In your project’s Cargo.toml file, ensure that the allocator feature is
44//! among those enabled on the surrealdb dependency:
45//!
46//! ```toml
47//! [dependencies]
48//! surrealdb = { version = "2", features = ["allocator", "storage-rocksdb"] }
49//! ```
50//!
51//! When running SurrealDB within your Rust code, ensure that the asynchronous
52//! runtime is configured correctly, making use of multiple threads, an
53//! increased stack size, and an optimised number of threads:
54//!
55//! ```toml
56//! [dependencies]
57//! tokio = { version = "1", features = ["sync", "rt-multi-thread"] }
58//! ```
59//!
60//! ```no_run
61//! tokio::runtime::Builder::new_multi_thread()
62//!     .enable_all()
63//!     .thread_stack_size(10 * 1024 * 1024) // 10MiB
64//!     .build()
65//!     .unwrap()
66//!     .block_on(async {
67//!         // Your application code
68//!     })
69//! ```
70//!
71//! # Example
72//!
73//! ```no_compile
74//! use std::borrow::Cow;
75//! use serde::{Serialize, Deserialize};
76//! use serde_json::json;
77//! use surrealdb::{Error, Surreal};
78//! use surrealdb::opt::auth::Root;
79//! use surrealdb::engine::local::RocksDb;
80//!
81//! #[derive(Serialize, Deserialize)]
82//! struct Person {
83//!     title: String,
84//!     name: Name,
85//!     marketing: bool,
86//! }
87//!
88//! // Pro tip: Replace String with Cow<'static, str> to
89//! // avoid unnecessary heap allocations when inserting
90//!
91//! #[derive(Serialize, Deserialize)]
92//! struct Name {
93//!     first: Cow<'static, str>,
94//!     last: Cow<'static, str>,
95//! }
96//!
97//! #[tokio::main]
98//! async fn main() -> Result<(), Error> {
99//!     let db = Surreal::new::<RocksDb>("path/to/database/folder").await?;
100//!
101//!     // Select a specific namespace / database
102//!     db.use_ns("namespace").use_db("database").await?;
103//!
104//!     // Create a new person with a random ID
105//!     let created: Option<Person> = db.create("person")
106//!         .content(Person {
107//!             title: "Founder & CEO".into(),
108//!             name: Name {
109//!                 first: "Tobie".into(),
110//!                 last: "Morgan Hitchcock".into(),
111//!             },
112//!             marketing: true,
113//!         })
114//!         .await?;
115//!
116//!     // Create a new person with a specific ID
117//!     let created: Option<Person> = db.create(("person", "jaime"))
118//!         .content(Person {
119//!             title: "Founder & COO".into(),
120//!             name: Name {
121//!                 first: "Jaime".into(),
122//!                 last: "Morgan Hitchcock".into(),
123//!             },
124//!             marketing: false,
125//!         })
126//!         .await?;
127//!
128//!     // Update a person record with a specific ID
129//!     let updated: Option<Person> = db.update(("person", "jaime"))
130//!         .merge(json!({"marketing": true}))
131//!         .await?;
132//!
133//!     // Select all people records
134//!     let people: Vec<Person> = db.select("person").await?;
135//!
136//!     // Perform a custom advanced query
137//!     let query = r#"
138//!         SELECT marketing, count()
139//!         FROM type::table($table)
140//!         GROUP BY marketing
141//!     "#;
142//!
143//!     let groups = db.query(query)
144//!         .bind(("table", "person"))
145//!         .await?;
146//!
147//!     Ok(())
148//! }
149//! ```
150
151use std::collections::HashMap;
152use std::marker::PhantomData;
153use std::mem;
154#[cfg(not(target_family = "wasm"))]
155use std::pin::pin;
156use std::sync::Arc;
157#[cfg(not(target_family = "wasm"))]
158use std::task::{Poll, ready};
159#[cfg(not(target_family = "wasm"))]
160use std::{future::Future, path::PathBuf};
161
162#[cfg(not(target_family = "wasm"))]
163use anyhow::bail;
164use async_channel::Sender;
165#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
166use futures::StreamExt;
167#[cfg(not(target_family = "wasm"))]
168use futures::stream::poll_fn;
169use indexmap::IndexMap;
170use surrealdb_core::dbs::{Notification, Response, Session, Variables};
171#[cfg(not(target_family = "wasm"))]
172use surrealdb_core::err::Error as CoreError;
173#[cfg(feature = "ml")]
174use surrealdb_core::expr::Model;
175use surrealdb_core::expr::statements::DeleteStatement;
176use surrealdb_core::expr::{
177	CreateStatement, Data, Expr, Fields, Function, Ident, InsertStatement, KillStatement, Literal,
178	LogicalPlan, Output, SelectStatement, TopLevelExpr, UpdateStatement, UpsertStatement,
179};
180use surrealdb_core::iam;
181#[cfg(not(target_family = "wasm"))]
182use surrealdb_core::kvs::export::Config as DbExportConfig;
183use surrealdb_core::kvs::{Datastore, LockType, TransactionType};
184use surrealdb_core::val::{self, Strand};
185#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
186use surrealdb_core::{
187	expr::statements::{DefineModelStatement, DefineStatement},
188	iam::{Action, ResourceKind, check::check_ns_db},
189	ml::storage::surml_file::SurMlFile,
190};
191use tokio::sync::RwLock;
192#[cfg(not(target_family = "wasm"))]
193use tokio::{
194	fs::OpenOptions,
195	io::{self, AsyncReadExt, AsyncWriteExt},
196};
197#[cfg(not(target_family = "wasm"))]
198use tokio_util::bytes::BytesMut;
199use uuid::Uuid;
200
201use super::resource_to_exprs;
202use crate::Result;
203#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
204use crate::api::conn::MlExportConfig;
205use crate::api::conn::{Command, DbResponse, RequestData};
206use crate::api::err::Error;
207use crate::api::{Connect, Response as QueryResponse, Surreal};
208use crate::method::Stats;
209use crate::opt::IntoEndpoint;
210
211#[cfg(not(target_family = "wasm"))]
212pub(crate) mod native;
213#[cfg(target_family = "wasm")]
214pub(crate) mod wasm;
215
216type LiveQueryMap = HashMap<Uuid, Sender<Notification>>;
217
218/// In-memory database
219///
220/// # Examples
221///
222/// Instantiating a global instance
223///
224/// ```
225/// use std::sync::LazyLock;
226/// use surrealdb::{Result, Surreal};
227/// use surrealdb::engine::local::Db;
228/// use surrealdb::engine::local::Mem;
229///
230/// static DB: LazyLock<Surreal<Db>> = LazyLock::new(Surreal::init);
231///
232/// #[tokio::main]
233/// async fn main() -> Result<()> {
234///     DB.connect::<Mem>(()).await?;
235///
236///     Ok(())
237/// }
238/// ```
239///
240/// Instantiating an in-memory instance
241///
242/// ```
243/// use surrealdb::Surreal;
244/// use surrealdb::engine::local::Mem;
245///
246/// # #[tokio::main]
247/// # async fn main() -> surrealdb::Result<()> {
248/// let db = Surreal::new::<Mem>(()).await?;
249/// # Ok(())
250/// # }
251/// ```
252///
253/// Instantiating an in-memory strict instance
254///
255/// ```
256/// use surrealdb::opt::Config;
257/// use surrealdb::Surreal;
258/// use surrealdb::engine::local::Mem;
259///
260/// # #[tokio::main]
261/// # async fn main() -> surrealdb::Result<()> {
262/// let config = Config::default().strict();
263/// let db = Surreal::new::<Mem>(config).await?;
264/// # Ok(())
265/// # }
266/// ```
267#[cfg(feature = "kv-mem")]
268#[cfg_attr(docsrs, doc(cfg(feature = "kv-mem")))]
269#[derive(Debug)]
270pub struct Mem;
271
272/// RocksDB database
273///
274/// # Examples
275///
276/// Instantiating a RocksDB-backed instance
277///
278/// ```no_run
279/// # #[tokio::main]
280/// # async fn main() -> surrealdb::Result<()> {
281/// use surrealdb::Surreal;
282/// use surrealdb::engine::local::RocksDb;
283///
284/// let db = Surreal::new::<RocksDb>("path/to/database-folder").await?;
285/// # Ok(())
286/// # }
287/// ```
288///
289/// Instantiating a RocksDB-backed strict instance
290///
291/// ```no_run
292/// # #[tokio::main]
293/// # async fn main() -> surrealdb::Result<()> {
294/// use surrealdb::opt::Config;
295/// use surrealdb::Surreal;
296/// use surrealdb::engine::local::RocksDb;
297///
298/// let config = Config::default().strict();
299/// let db = Surreal::new::<RocksDb>(("path/to/database-folder", config)).await?;
300/// # Ok(())
301/// # }
302/// ```
303#[cfg(feature = "kv-rocksdb")]
304#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
305#[derive(Debug)]
306pub struct RocksDb;
307
308/// IndxDB database
309///
310/// # Examples
311///
312/// Instantiating a IndxDB-backed instance
313///
314/// ```no_run
315/// # #[tokio::main]
316/// # async fn main() -> surrealdb::Result<()> {
317/// use surrealdb::Surreal;
318/// use surrealdb::engine::local::IndxDb;
319///
320/// let db = Surreal::new::<IndxDb>("DatabaseName").await?;
321/// # Ok(())
322/// # }
323/// ```
324///
325/// Instantiating an IndxDB-backed strict instance
326///
327/// ```no_run
328/// # #[tokio::main]
329/// # async fn main() -> surrealdb::Result<()> {
330/// use surrealdb::opt::Config;
331/// use surrealdb::Surreal;
332/// use surrealdb::engine::local::IndxDb;
333///
334/// let config = Config::default().strict();
335/// let db = Surreal::new::<IndxDb>(("DatabaseName", config)).await?;
336/// # Ok(())
337/// # }
338/// ```
339#[cfg(feature = "kv-indxdb")]
340#[cfg_attr(docsrs, doc(cfg(feature = "kv-indxdb")))]
341#[derive(Debug)]
342pub struct IndxDb;
343
344/// TiKV database
345///
346/// # Examples
347///
348/// Instantiating a TiKV instance
349///
350/// ```no_run
351/// # #[tokio::main]
352/// # async fn main() -> surrealdb::Result<()> {
353/// use surrealdb::Surreal;
354/// use surrealdb::engine::local::TiKv;
355///
356/// let db = Surreal::new::<TiKv>("localhost:2379").await?;
357/// # Ok(())
358/// # }
359/// ```
360///
361/// Instantiating a TiKV strict instance
362///
363/// ```no_run
364/// # #[tokio::main]
365/// # async fn main() -> surrealdb::Result<()> {
366/// use surrealdb::opt::Config;
367/// use surrealdb::Surreal;
368/// use surrealdb::engine::local::TiKv;
369///
370/// let config = Config::default().strict();
371/// let db = Surreal::new::<TiKv>(("localhost:2379", config)).await?;
372/// # Ok(())
373/// # }
374/// ```
375#[cfg(feature = "kv-tikv")]
376#[cfg_attr(docsrs, doc(cfg(feature = "kv-tikv")))]
377#[derive(Debug)]
378pub struct TiKv;
379
380/// FoundationDB database
381///
382/// # Examples
383///
384/// Instantiating a FoundationDB-backed instance
385///
386/// ```no_run
387/// # #[tokio::main]
388/// # async fn main() -> surrealdb::Result<()> {
389/// use surrealdb::Surreal;
390/// use surrealdb::engine::local::FDb;
391///
392/// let db = Surreal::new::<FDb>("path/to/fdb.cluster").await?;
393/// # Ok(())
394/// # }
395/// ```
396///
397/// Instantiating a FoundationDB-backed strict instance
398///
399/// ```no_run
400/// # #[tokio::main]
401/// # async fn main() -> surrealdb::Result<()> {
402/// use surrealdb::opt::Config;
403/// use surrealdb::Surreal;
404/// use surrealdb::engine::local::FDb;
405///
406/// let config = Config::default().strict();
407/// let db = Surreal::new::<FDb>(("path/to/fdb.cluster", config)).await?;
408/// # Ok(())
409/// # }
410/// ```
411#[cfg(kv_fdb)]
412#[cfg_attr(docsrs, doc(cfg(feature = "kv-fdb-7_3")))]
413#[derive(Debug)]
414pub struct FDb;
415
416/// SurrealKV database
417///
418/// # Examples
419///
420/// Instantiating a SurrealKV-backed instance
421///
422/// ```no_run
423/// # #[tokio::main]
424/// # async fn main() -> surrealdb::Result<()> {
425/// use surrealdb::Surreal;
426/// use surrealdb::engine::local::SurrealKv;
427///
428/// let db = Surreal::new::<SurrealKv>("path/to/database-folder").await?;
429/// # Ok(())
430/// # }
431/// ```
432///
433/// Instantiating a SurrealKV-backed strict instance
434///
435/// ```no_run
436/// # #[tokio::main]
437/// # async fn main() -> surrealdb::Result<()> {
438/// use surrealdb::opt::Config;
439/// use surrealdb::Surreal;
440/// use surrealdb::engine::local::SurrealKv;
441///
442/// let config = Config::default().strict();
443/// let db = Surreal::new::<SurrealKv>(("path/to/database-folder", config)).await?;
444/// # Ok(())
445/// # }
446/// ```
447#[cfg(feature = "kv-surrealkv")]
448#[cfg_attr(docsrs, doc(cfg(feature = "kv-surrealkv")))]
449#[derive(Debug)]
450pub struct SurrealKv;
451
452/// An embedded database
453#[derive(Debug, Clone)]
454pub struct Db(());
455
456impl Surreal<Db> {
457	/// Connects to a specific database endpoint, saving the connection on the
458	/// static client
459	pub fn connect<P>(&self, address: impl IntoEndpoint<P, Client = Db>) -> Connect<Db, ()> {
460		Connect {
461			surreal: self.inner.clone().into(),
462			address: address.into_endpoint(),
463			capacity: 0,
464			response_type: PhantomData,
465		}
466	}
467}
468
469fn process(responses: Vec<Response>) -> QueryResponse {
470	let mut map = IndexMap::<usize, (Stats, Result<val::Value>)>::with_capacity(responses.len());
471	for (index, response) in responses.into_iter().enumerate() {
472		let stats = Stats {
473			execution_time: Some(response.time),
474		};
475		match response.result {
476			Ok(value) => {
477				// Deserializing from a core value should always work.
478				map.insert(index, (stats, Ok(value)));
479			}
480			Err(error) => {
481				map.insert(index, (stats, Err(error)));
482			}
483		};
484	}
485	QueryResponse {
486		results: map,
487		..QueryResponse::new()
488	}
489}
490
491async fn take(one: bool, responses: Vec<Response>) -> Result<val::Value> {
492	if let Some((_stats, result)) = process(responses).results.swap_remove(&0) {
493		let value = result?;
494		match one {
495			true => match value {
496				val::Value::Array(mut array) => {
497					if let [ref mut value] = array[..] {
498						return Ok(mem::replace(value, val::Value::None));
499					}
500				}
501				val::Value::None | val::Value::Null => {}
502				value => return Ok(value),
503			},
504			false => return Ok(value),
505		}
506	}
507	match one {
508		true => Ok(val::Value::None),
509		false => Ok(val::Value::Array(Default::default())),
510	}
511}
512
513#[cfg(not(target_family = "wasm"))]
514async fn export_file(
515	kvs: &Datastore,
516	sess: &Session,
517	chn: async_channel::Sender<Vec<u8>>,
518	config: Option<DbExportConfig>,
519) -> Result<()> {
520	let res = match config {
521		Some(config) => kvs.export_with_config(sess, chn, config).await?.await,
522		None => kvs.export(sess, chn).await?.await,
523	};
524
525	if let Err(error) = res {
526		if let Some(surrealdb_core::err::Error::Channel(message)) = error.downcast_ref() {
527			// This is not really an error. Just logging it for improved visibility.
528			trace!("{message}");
529			return Ok(());
530		}
531
532		return Err(error);
533	}
534	Ok(())
535}
536
537#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
538async fn export_ml(
539	kvs: &Datastore,
540	sess: &Session,
541	chn: async_channel::Sender<Vec<u8>>,
542	MlExportConfig {
543		name,
544		version,
545	}: MlExportConfig,
546) -> Result<()> {
547	let (nsv, dbv) = check_ns_db(sess)?;
548	// Check the permissions level
549	kvs.check(sess, Action::View, ResourceKind::Model.on_db(&nsv, &dbv))?;
550
551	// Ensure a NS and DB are set
552	let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
553	let Some(db) = tx.get_db_by_name(&nsv, &dbv).await? else {
554		anyhow::bail!("Database not found".to_string());
555	};
556	tx.cancel().await?;
557
558	// Attempt to get the model definition
559	let Some(model) = tx.get_db_model(db.namespace_id, db.database_id, &name, &version).await?
560	else {
561		// Attempt to get the model definition
562		anyhow::bail!("Model not found".to_string());
563	};
564	// Export the file data in to the store
565	let mut data = surrealdb_core::obs::stream(model.hash.clone()).await?;
566	// Process all stream values
567	while let Some(Ok(bytes)) = data.next().await {
568		if chn.send(bytes.to_vec()).await.is_err() {
569			break;
570		}
571	}
572	Ok(())
573}
574
575#[cfg(not(target_family = "wasm"))]
576async fn copy<'a, R, W>(path: PathBuf, reader: &'a mut R, writer: &'a mut W) -> Result<()>
577where
578	R: tokio::io::AsyncRead + Unpin + ?Sized,
579	W: tokio::io::AsyncWrite + Unpin + ?Sized,
580{
581	io::copy(reader, writer)
582		.await
583		.map(|_| ())
584		.map_err(|error| crate::error::Api::FileRead {
585			path,
586			error,
587		})
588		.map_err(anyhow::Error::new)
589}
590
591async fn kill_live_query(
592	kvs: &Datastore,
593	id: Uuid,
594	session: &Session,
595	vars: Variables,
596) -> Result<val::Value> {
597	let kill_plan = KillStatement {
598		id: Expr::Literal(Literal::Uuid(id.into())),
599	};
600	let plan = LogicalPlan {
601		expressions: vec![TopLevelExpr::Kill(kill_plan)],
602	};
603
604	let response = kvs.process_plan(plan, session, Some(vars)).await?;
605	take(true, response).await
606}
607
608async fn router(
609	RequestData {
610		command,
611		..
612	}: RequestData,
613	kvs: &Arc<Datastore>,
614	session: &Arc<RwLock<Session>>,
615	vars: &Arc<RwLock<Variables>>,
616	live_queries: &Arc<RwLock<LiveQueryMap>>,
617) -> Result<DbResponse> {
618	match command {
619		Command::Use {
620			namespace,
621			database,
622		} => {
623			if let Some(ns) = namespace {
624				let tx = kvs.transaction(TransactionType::Write, LockType::Optimistic).await?;
625				tx.get_or_add_ns(&ns, kvs.is_strict_mode()).await?;
626				tx.commit().await?;
627				session.write().await.ns = Some(ns);
628			}
629			if let Some(db) = database {
630				let ns = session.read().await.ns.clone().unwrap();
631				let tx = kvs.transaction(TransactionType::Write, LockType::Optimistic).await?;
632				tx.ensure_ns_db(&ns, &db, kvs.is_strict_mode()).await?;
633				tx.commit().await?;
634				session.write().await.db = Some(db);
635			}
636			Ok(DbResponse::Other(val::Value::None))
637		}
638		Command::Signup {
639			credentials,
640		} => {
641			let response =
642				iam::signup::signup(kvs, &mut *session.write().await, credentials).await?.token;
643			// TODO: Null byte validity
644			let response = response
645				.map(|x| unsafe { Strand::new_unchecked(x) })
646				.map(From::from)
647				.unwrap_or(val::Value::None);
648			Ok(DbResponse::Other(response))
649		}
650		Command::Signin {
651			credentials,
652		} => {
653			let response =
654				iam::signin::signin(kvs, &mut *session.write().await, credentials).await?.token;
655			Ok(DbResponse::Other(response.into()))
656		}
657		Command::Authenticate {
658			token,
659		} => {
660			iam::verify::token(kvs, &mut *session.write().await, &token).await?;
661			Ok(DbResponse::Other(val::Value::None))
662		}
663		Command::Invalidate => {
664			iam::clear::clear(&mut *session.write().await)?;
665			Ok(DbResponse::Other(val::Value::None))
666		}
667		Command::Create {
668			txn: _,
669			what,
670			data,
671		} => {
672			let create_plan = CreateStatement {
673				only: false,
674				what: resource_to_exprs(what),
675				data: data.map(|x| Data::ContentExpression(x.into_literal())),
676				output: Some(Output::After),
677				timeout: None,
678				parallel: false,
679				version: None,
680			};
681			let plan = LogicalPlan {
682				expressions: vec![TopLevelExpr::Expr(Expr::Create(Box::new(create_plan)))],
683			};
684
685			let response = kvs
686				.process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
687				.await?;
688			let value = take(true, response).await?;
689			Ok(DbResponse::Other(value))
690		}
691		Command::Upsert {
692			txn: _,
693			what,
694			data,
695		} => {
696			let one = what.is_single_recordid();
697			let upsert_plan = UpsertStatement {
698				only: false,
699				what: resource_to_exprs(what),
700				data: data.map(|x| Data::ContentExpression(x.into_literal())),
701				with: None,
702				cond: None,
703				output: Some(Output::After),
704				timeout: None,
705				parallel: false,
706				explain: None,
707			};
708			let plan = LogicalPlan {
709				expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
710			};
711			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
712			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
713			let value = take(one, response).await?;
714			Ok(DbResponse::Other(value))
715		}
716		Command::Update {
717			txn: _,
718			what,
719			data,
720		} => {
721			let one = what.is_single_recordid();
722			let update_plan = UpdateStatement {
723				only: false,
724				what: resource_to_exprs(what),
725				data: data.map(|x| Data::ContentExpression(x.into_literal())),
726				with: None,
727				cond: None,
728				output: Some(Output::After),
729				timeout: None,
730				parallel: false,
731				explain: None,
732			};
733			let plan = LogicalPlan {
734				expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
735			};
736			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
737			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
738			let value = take(one, response).await?;
739			Ok(DbResponse::Other(value))
740		}
741		Command::Insert {
742			txn: _,
743			what,
744			data,
745		} => {
746			let one = !data.is_array();
747
748			let insert_plan = InsertStatement {
749				into: what.map(|w| Expr::Table(unsafe { Ident::new_unchecked(w) })),
750				data: Data::SingleExpression(data.into_literal()),
751				ignore: false,
752				update: None,
753				output: Some(Output::After),
754				timeout: None,
755				parallel: false,
756				relation: false,
757				version: None,
758			};
759			let plan = LogicalPlan {
760				expressions: vec![TopLevelExpr::Expr(Expr::Insert(Box::new(insert_plan)))],
761			};
762			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
763			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
764			let value = take(one, response).await?;
765			Ok(DbResponse::Other(value))
766		}
767		Command::InsertRelation {
768			txn: _,
769			what,
770			data,
771		} => {
772			let one = !data.is_array();
773
774			let insert_plan = InsertStatement {
775				into: what.map(|w| Expr::Table(unsafe { Ident::new_unchecked(w) })),
776				data: Data::SingleExpression(data.into_literal()),
777				output: Some(Output::After),
778				relation: true,
779				ignore: false,
780				update: None,
781				timeout: None,
782				parallel: false,
783				version: None,
784			};
785			let plan = LogicalPlan {
786				expressions: vec![TopLevelExpr::Expr(Expr::Insert(Box::new(insert_plan)))],
787			};
788
789			let response = kvs
790				.process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
791				.await?;
792			let value = take(one, response).await?;
793			Ok(DbResponse::Other(value))
794		}
795		Command::Patch {
796			txn: _,
797			what,
798			data,
799			upsert,
800		} => {
801			let plan = if upsert {
802				let upsert_plan = UpsertStatement {
803					only: false,
804					what: resource_to_exprs(what),
805					data: data.map(|x| Data::PatchExpression(x.into_literal())),
806					with: None,
807					cond: None,
808					output: Some(Output::After),
809					timeout: None,
810					parallel: false,
811					explain: None,
812				};
813				LogicalPlan {
814					expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
815				}
816			} else {
817				let update_plan = UpdateStatement {
818					only: false,
819					what: resource_to_exprs(what),
820					data: data.map(|x| Data::PatchExpression(x.into_literal())),
821					with: None,
822					cond: None,
823					output: Some(Output::After),
824					timeout: None,
825					parallel: false,
826					explain: None,
827				};
828				LogicalPlan {
829					expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
830				}
831			};
832			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
833			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
834			let response = process(response);
835			Ok(DbResponse::Query(response))
836		}
837		Command::Merge {
838			txn: _,
839			what,
840			data,
841			upsert,
842		} => {
843			let plan = if upsert {
844				let upsert_plan = UpsertStatement {
845					only: false,
846					what: resource_to_exprs(what),
847					data: data.map(|x| Data::MergeExpression(x.into_literal())),
848					with: None,
849					cond: None,
850					output: Some(Output::After),
851					timeout: None,
852					parallel: false,
853					explain: None,
854				};
855				LogicalPlan {
856					expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
857				}
858			} else {
859				let update_plan = UpdateStatement {
860					only: false,
861					what: resource_to_exprs(what),
862					data: data.map(|x| Data::MergeExpression(x.into_literal())),
863					with: None,
864					cond: None,
865					output: Some(Output::After),
866					timeout: None,
867					parallel: false,
868					explain: None,
869				};
870				LogicalPlan {
871					expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
872				}
873			};
874			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
875			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
876			let response = process(response);
877			Ok(DbResponse::Query(response))
878		}
879		Command::Select {
880			txn: _,
881			what,
882		} => {
883			let one = what.is_single_recordid();
884
885			let select_plan = SelectStatement {
886				expr: Fields::all(),
887				what: resource_to_exprs(what),
888				omit: None,
889				only: false,
890				with: None,
891				cond: None,
892				split: None,
893				group: None,
894				order: None,
895				limit: None,
896				start: None,
897				fetch: None,
898				version: None,
899				timeout: None,
900				parallel: false,
901				explain: None,
902				tempfiles: false,
903			};
904
905			let plan = LogicalPlan {
906				expressions: vec![TopLevelExpr::Expr(Expr::Select(Box::new(select_plan)))],
907			};
908			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
909			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
910			let value = take(one, response).await?;
911			Ok(DbResponse::Other(value))
912		}
913		Command::Delete {
914			txn: _,
915			what,
916		} => {
917			let one = what.is_single_recordid();
918			let delete_plan = DeleteStatement {
919				only: false,
920				what: resource_to_exprs(what),
921				with: None,
922				cond: None,
923				output: Some(Output::Before),
924				timeout: None,
925				parallel: false,
926				explain: None,
927			};
928			let plan = LogicalPlan {
929				expressions: vec![TopLevelExpr::Expr(Expr::Delete(Box::new(delete_plan)))],
930			};
931			let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
932			let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
933			let value = take(one, response).await?;
934			Ok(DbResponse::Other(value))
935		}
936		Command::Query {
937			txn: _,
938			query,
939			variables,
940		} => {
941			let mut vars = vars.read().await.clone();
942			vars.merge(variables);
943			let response =
944				kvs.execute(&query.to_string(), &*session.read().await, Some(vars)).await?;
945			let response = process(response);
946			Ok(DbResponse::Query(response))
947		}
948		Command::RawQuery {
949			txn: _,
950			query,
951			variables,
952		} => {
953			let mut vars = vars.read().await.clone();
954			vars.merge(variables);
955			let response = kvs.execute(query.as_ref(), &*session.read().await, Some(vars)).await?;
956			let response = process(response);
957			Ok(DbResponse::Query(response))
958		}
959
960		#[cfg(target_family = "wasm")]
961		Command::ExportFile {
962			..
963		}
964		| Command::ExportBytes {
965			..
966		}
967		| Command::ImportFile {
968			..
969		} => Err(crate::api::Error::BackupsNotSupported.into()),
970
971		#[cfg(any(target_family = "wasm", not(feature = "ml")))]
972		Command::ExportMl {
973			..
974		}
975		| Command::ExportBytesMl {
976			..
977		}
978		| Command::ImportMl {
979			..
980		} => Err(crate::api::Error::BackupsNotSupported.into()),
981
982		#[cfg(not(target_family = "wasm"))]
983		Command::ExportFile {
984			path: file,
985			config,
986		} => {
987			let (tx, rx) = crate::channel::bounded(1);
988			let (mut writer, mut reader) = io::duplex(10_240);
989
990			// Write to channel.
991			let session = session.read().await.clone();
992			let export = export_file(kvs, &session, tx, config);
993
994			// Read from channel and write to pipe.
995			let bridge = async move {
996				while let Ok(value) = rx.recv().await {
997					if writer.write_all(&value).await.is_err() {
998						// Broken pipe. Let either side's error be propagated.
999						break;
1000					}
1001				}
1002				Ok(())
1003			};
1004
1005			// Output to stdout or file.
1006			let mut output = match OpenOptions::new()
1007				.write(true)
1008				.create(true)
1009				.truncate(true)
1010				.open(&file)
1011				.await
1012			{
1013				Ok(path) => path,
1014				Err(error) => {
1015					return Err(Error::FileOpen {
1016						path: file,
1017						error,
1018					}
1019					.into());
1020				}
1021			};
1022
1023			// Copy from pipe to output.
1024			let copy = copy(file, &mut reader, &mut output);
1025
1026			tokio::try_join!(export, bridge, copy)?;
1027			Ok(DbResponse::Other(val::Value::None))
1028		}
1029
1030		#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1031		Command::ExportMl {
1032			path,
1033			config,
1034		} => {
1035			let (tx, rx) = crate::channel::bounded(1);
1036			let (mut writer, mut reader) = io::duplex(10_240);
1037
1038			// Write to channel.
1039			let session = session.read().await;
1040			let export = export_ml(kvs, &session, tx, config);
1041
1042			// Read from channel and write to pipe.
1043			let bridge = async move {
1044				while let Ok(value) = rx.recv().await {
1045					if writer.write_all(&value).await.is_err() {
1046						// Broken pipe. Let either side's error be propagated.
1047						break;
1048					}
1049				}
1050				Ok(())
1051			};
1052
1053			// Output to stdout or file.
1054			let mut output = match OpenOptions::new()
1055				.write(true)
1056				.create(true)
1057				.truncate(true)
1058				.open(&path)
1059				.await
1060			{
1061				Ok(path) => path,
1062				Err(error) => {
1063					return Err(Error::FileOpen {
1064						path,
1065						error,
1066					}
1067					.into());
1068				}
1069			};
1070
1071			// Copy from pipe to output.
1072			let copy = copy(path, &mut reader, &mut output);
1073
1074			tokio::try_join!(export, bridge, copy)?;
1075			Ok(DbResponse::Other(val::Value::None))
1076		}
1077
1078		#[cfg(not(target_family = "wasm"))]
1079		Command::ExportBytes {
1080			bytes,
1081			config,
1082		} => {
1083			let (tx, rx) = crate::channel::bounded(1);
1084
1085			let kvs = kvs.clone();
1086			let session = session.read().await.clone();
1087			tokio::spawn(async move {
1088				let export = async {
1089					if let Err(error) = export_file(&kvs, &session, tx, config).await {
1090						let _ = bytes.send(Err(error)).await;
1091					}
1092				};
1093
1094				let bridge = async {
1095					while let Ok(b) = rx.recv().await {
1096						if bytes.send(Ok(b)).await.is_err() {
1097							break;
1098						}
1099					}
1100				};
1101
1102				tokio::join!(export, bridge);
1103			});
1104
1105			Ok(DbResponse::Other(val::Value::None))
1106		}
1107		#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1108		Command::ExportBytesMl {
1109			bytes,
1110			config,
1111		} => {
1112			let (tx, rx) = crate::channel::bounded(1);
1113
1114			let kvs = kvs.clone();
1115			let session = session.clone();
1116			tokio::spawn(async move {
1117				let export = async {
1118					if let Err(error) = export_ml(&kvs, &*session.read().await, tx, config).await {
1119						let _ = bytes.send(Err(error)).await;
1120					}
1121				};
1122
1123				let bridge = async {
1124					while let Ok(b) = rx.recv().await {
1125						if bytes.send(Ok(b)).await.is_err() {
1126							break;
1127						}
1128					}
1129				};
1130
1131				tokio::join!(export, bridge);
1132			});
1133
1134			Ok(DbResponse::Other(val::Value::None))
1135		}
1136		#[cfg(not(target_family = "wasm"))]
1137		Command::ImportFile {
1138			path,
1139		} => {
1140			let file = match OpenOptions::new().read(true).open(&path).await {
1141				Ok(path) => path,
1142				Err(error) => {
1143					bail!(Error::FileOpen {
1144						path,
1145						error,
1146					});
1147				}
1148			};
1149
1150			let mut file = pin!(file);
1151			let mut buffer = BytesMut::with_capacity(4096);
1152
1153			let stream = poll_fn(|ctx| {
1154				// Doing it this way optimizes allocation.
1155				// It is highly likely that the buffer we return from this stream will be
1156				// dropped between calls to this function.
1157				// If this is the case than instead of allocating new memory the call to reserve
1158				// will instead reclaim the existing used memory.
1159				if buffer.capacity() == 0 {
1160					buffer.reserve(4096);
1161				}
1162
1163				let future = pin!(file.read_buf(&mut buffer));
1164				match ready!(future.poll(ctx)) {
1165					Ok(0) => Poll::Ready(None),
1166					Ok(_) => Poll::Ready(Some(Ok(buffer.split().freeze()))),
1167					Err(e) => {
1168						let error = anyhow::Error::new(CoreError::QueryStream(e.to_string()));
1169						Poll::Ready(Some(Err(error)))
1170					}
1171				}
1172			});
1173
1174			let responses = kvs
1175				.execute_import(&*session.read().await, Some(vars.read().await.clone()), stream)
1176				.await?;
1177
1178			for response in responses {
1179				response.result?;
1180			}
1181
1182			Ok(DbResponse::Other(val::Value::None))
1183		}
1184		#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1185		Command::ImportMl {
1186			path,
1187		} => {
1188			let mut file = match OpenOptions::new().read(true).open(&path).await {
1189				Ok(path) => path,
1190				Err(error) => {
1191					return Err(Error::FileOpen {
1192						path,
1193						error,
1194					}
1195					.into());
1196				}
1197			};
1198
1199			// Ensure a NS and DB are set
1200			let (nsv, dbv) = check_ns_db(&*session.read().await)?;
1201			// Check the permissions level
1202			kvs.check(&*session.read().await, Action::Edit, ResourceKind::Model.on_db(&nsv, &dbv))?;
1203			// Create a new buffer
1204			let mut buffer = Vec::new();
1205			// Load all the uploaded file chunks
1206			if let Err(error) = file.read_to_end(&mut buffer).await {
1207				return Err(Error::FileRead {
1208					path,
1209					error,
1210				}
1211				.into());
1212			}
1213			// Check that the SurrealML file is valid
1214			let file = match SurMlFile::from_bytes(buffer) {
1215				Ok(file) => file,
1216				Err(error) => {
1217					return Err(Error::FileRead {
1218						path,
1219						error: io::Error::new(
1220							io::ErrorKind::InvalidData,
1221							error.message.to_string(),
1222						),
1223					}
1224					.into());
1225				}
1226			};
1227			// Convert the file back in to raw bytes
1228			let data = file.to_bytes();
1229			// Calculate the hash of the model file
1230			let hash = surrealdb_core::obs::hash(&data);
1231			// Insert the file data in to the store
1232			surrealdb_core::obs::put(&hash, data).await?;
1233			// Insert the model in to the database
1234			let model = DefineModelStatement {
1235				name: Ident::new(file.header.name.to_string()).unwrap(),
1236				version: file.header.version.to_string(),
1237				comment: Some(file.header.description.to_string().into()),
1238				hash,
1239				..Default::default()
1240			};
1241			// TODO: Null byte validity
1242			let q = DefineStatement::Model(model);
1243			let q = LogicalPlan {
1244				expressions: vec![TopLevelExpr::Expr(Expr::Define(Box::new(q)))],
1245			};
1246			let responses = kvs
1247				.process_plan(q, &*session.read().await, Some(vars.read().await.clone()))
1248				.await?;
1249
1250			for response in responses {
1251				response.result?;
1252			}
1253
1254			Ok(DbResponse::Other(val::Value::None))
1255		}
1256		Command::Health => Ok(DbResponse::Other(val::Value::None)),
1257		Command::Version => {
1258			Ok(DbResponse::Other(val::Value::from(surrealdb_core::env::VERSION.to_string())))
1259		}
1260		Command::Set {
1261			key,
1262			value,
1263		} => {
1264			surrealdb_core::rpc::check_protected_param(&key)?;
1265			// Need to compute because certain keys might not be allowed to be set and those
1266			// should be rejected by an error.
1267			match value {
1268				val::Value::None => vars.write().await.remove(&key),
1269				v => vars.write().await.insert(key, v),
1270			};
1271
1272			Ok(DbResponse::Other(val::Value::None))
1273		}
1274		Command::Unset {
1275			key,
1276		} => {
1277			vars.write().await.remove(&key);
1278			Ok(DbResponse::Other(val::Value::None))
1279		}
1280		Command::SubscribeLive {
1281			uuid,
1282			notification_sender,
1283		} => {
1284			live_queries.write().await.insert(uuid, notification_sender);
1285			Ok(DbResponse::Other(val::Value::None))
1286		}
1287		Command::Kill {
1288			uuid,
1289		} => {
1290			live_queries.write().await.remove(&uuid);
1291			let value =
1292				kill_live_query(kvs, uuid, &*session.read().await, vars.read().await.clone())
1293					.await?;
1294			Ok(DbResponse::Other(value))
1295		}
1296
1297		Command::Run {
1298			name,
1299			version: _version,
1300			args,
1301		} => {
1302			let func = match name.strip_prefix("fn::") {
1303				Some(name) => Function::Custom(name.to_owned()),
1304				None => match name.strip_prefix("ml::") {
1305					#[cfg(feature = "ml")]
1306					Some(name) => Function::Model(Model {
1307						name: name.to_owned(),
1308						version: _version
1309							.ok_or(Error::Query("ML functions must have a version".to_string()))?,
1310					}),
1311					#[cfg(not(feature = "ml"))]
1312					Some(_) => {
1313						return Err(Error::Query(format!(
1314							"tried to call an ML function `{name}` but the `ml` feature is not enabled"
1315						))
1316						.into());
1317					}
1318					None => Function::Normal(name),
1319				},
1320			};
1321
1322			let args = args.into_iter().map(|x| x.into_literal()).collect();
1323
1324			let plan = Expr::FunctionCall(Box::new(surrealdb_core::expr::FunctionCall {
1325				receiver: func,
1326				arguments: args,
1327			}));
1328
1329			let plan = LogicalPlan {
1330				expressions: vec![TopLevelExpr::Expr(plan)],
1331			};
1332
1333			let response = kvs
1334				.process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
1335				.await?;
1336			let value = take(true, response).await?;
1337
1338			Ok(DbResponse::Other(value))
1339		}
1340	}
1341}