Skip to main content

surrealdb_core/rpc/
protocol.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use surrealdb_types::{HashMap, object};
5use tokio::sync::RwLock;
6use uuid::Uuid;
7
8use crate::catalog::providers::{CatalogProvider, NamespaceProvider, RootProvider};
9use crate::dbs::capabilities::{ExperimentalTarget, MethodTarget};
10use crate::dbs::{QueryResult, QueryType, Session};
11use crate::iam::token::Token;
12use crate::kvs::{Datastore, LockType, TransactionType};
13use crate::rpc::args::extract_args;
14use crate::rpc::{
15	DbResult, Method, bad_lq_config, invalid_params, method_not_allowed, method_not_found,
16	session_exists, session_expired, session_not_found, types_error_from_anyhow,
17};
18use crate::sql::statements::live::LiveFields;
19use crate::sql::{
20	Ast, CreateStatement, Data as SqlData, DeleteStatement, Expr, Fields, Function, FunctionCall,
21	InsertStatement, KillStatement, Literal, LiveStatement, Model, Output, RelateStatement,
22	SelectStatement, TopLevelExpr, UpdateStatement, UpsertStatement,
23};
24use crate::types::{
25	PublicArray, PublicRecordIdKey, PublicUuid, PublicValue, PublicVariables, SurrealValue,
26};
27
28/// utility function converting a `Value::String` into a `Expr::Table`
29fn value_to_table(value: PublicValue) -> Expr {
30	match value {
31		PublicValue::String(s) => Expr::Table(s),
32		x => Expr::from_public_value(x),
33	}
34}
35
36/// returns if the expression returns a singular value when selected.
37///
38/// As this rpc is some what convuluted the singular conditions is not the same
39/// for all cases.
40fn singular(value: &PublicValue) -> bool {
41	match value {
42		PublicValue::Object(_) => true,
43		PublicValue::RecordId(t) => !matches!(t.key, PublicRecordIdKey::Range(_)),
44		_ => false,
45	}
46}
47
48#[expect(async_fn_in_trait)]
49pub trait RpcProtocol {
50	/// The datastore for this RPC interface
51	fn kvs(&self) -> &Datastore;
52	/// The version information for this RPC context
53	fn version_data(&self) -> DbResult;
54
55	// ------------------------------
56	// Sessions
57	// ------------------------------
58
59	/// A pointer to all active sessions
60	fn session_map(&self) -> &HashMap<Option<Uuid>, Arc<RwLock<Session>>>;
61
62	/// Registers a new session with the given ID
63	async fn attach(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
64		let mut session = Session::default().with_rt(Self::LQ_SUPPORT);
65		session.id = session_id;
66		match session_id {
67			Some(id) => {
68				if self.session_map().contains_key(&Some(id)) {
69					return Err(session_exists(id));
70				}
71				self.session_map().insert(Some(id), Arc::new(RwLock::new(session)));
72				Ok(DbResult::Other(PublicValue::None))
73			}
74			None => Err(invalid_params("Expected a session ID")),
75		}
76	}
77
78	/// Detaches a session from the given ID
79	async fn detach(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
80		match session_id {
81			Some(id) => {
82				self.del_session(&id).await;
83				Ok(DbResult::Other(PublicValue::None))
84			}
85			None => Err(invalid_params("Expected a session ID")),
86		}
87	}
88
89	/// The current session for this RPC context
90	fn get_session(
91		&self,
92		id: &Option<Uuid>,
93	) -> Result<Arc<RwLock<Session>>, surrealdb_types::Error> {
94		match self.session_map().get(id) {
95			Some(session) => Ok(session),
96			None => Err(session_not_found(*id)),
97		}
98	}
99
100	/// Mutable access to the current session for this RPC context
101	fn set_session(&self, id: Option<Uuid>, session: Arc<RwLock<Session>>) {
102		self.session_map().insert(id, session);
103	}
104
105	/// Deletes a session
106	async fn del_session(&self, id: &Uuid) {
107		self.session_map().remove(&Some(*id));
108		// Cleanup live queries
109		self.cleanup_lqs(Some(id)).await;
110	}
111
112	/// Lists all non-default sessions
113	async fn sessions(&self) -> Result<DbResult, surrealdb_types::Error> {
114		let array = self
115			.session_map()
116			.to_vec()
117			.into_iter()
118			.filter_map(|(key, _)| key)
119			.map(|x| PublicValue::Uuid(PublicUuid::from(x)))
120			.collect();
121		Ok(DbResult::Other(PublicValue::Array(array)))
122	}
123
124	// ------------------------------
125	// Transactions
126	// ------------------------------
127
128	/// Retrieves a transaction by ID
129	async fn get_tx(
130		&self,
131		_id: Uuid,
132	) -> Result<Arc<crate::kvs::Transaction>, surrealdb_types::Error> {
133		Err(method_not_allowed(Method::Unknown.to_string()))
134	}
135
136	/// Stores a transaction
137	async fn set_tx(
138		&self,
139		_id: Uuid,
140		_tx: Arc<crate::kvs::Transaction>,
141	) -> Result<(), surrealdb_types::Error> {
142		Err(method_not_found(Method::Unknown.to_string()))
143	}
144
145	// ------------------------------
146	// Realtime
147	// ------------------------------
148
149	/// Live queries are disabled by default
150	const LQ_SUPPORT: bool = false;
151
152	/// Handles the execution of a LIVE statement
153	fn handle_live(
154		&self,
155		_lqid: &Uuid,
156		_session_id: Option<Uuid>,
157	) -> impl std::future::Future<Output = ()> + Send {
158		async { unimplemented!("handle_live function must be implemented if LQ_SUPPORT = true") }
159	}
160	/// Handles the execution of a KILL statement
161	fn handle_kill(&self, _lqid: &Uuid) -> impl std::future::Future<Output = ()> + Send {
162		async { unimplemented!("handle_kill function must be implemented if LQ_SUPPORT = true") }
163	}
164
165	/// Handles the cleanup of live queries
166	fn cleanup_lqs(
167		&self,
168		session_id: Option<&Uuid>,
169	) -> impl std::future::Future<Output = ()> + Send;
170
171	/// Handles the cleanup of all live queries
172	fn cleanup_all_lqs(&self) -> impl std::future::Future<Output = ()> + Send;
173
174	// ------------------------------
175	// Method execution
176	// ------------------------------
177
178	/// Executes a method on this RPC implementation
179	async fn execute(
180		&self,
181		txn: Option<Uuid>,
182		session: Option<Uuid>,
183		method: Method,
184		params: PublicArray,
185	) -> Result<DbResult, surrealdb_types::Error> {
186		// Check if capabilities allow executing the requested RPC method
187		if !self.kvs().allows_rpc_method(&MethodTarget {
188			method,
189		}) {
190			warn!("Capabilities denied RPC method call attempt, target: '{method}'");
191			return Err(method_not_allowed(method.to_string()));
192		}
193		// Execute the desired method
194		match method {
195			Method::Ping => Ok(DbResult::Other(PublicValue::None)),
196			Method::Info => self.info(txn, session).await,
197			Method::Use => self.yuse(session, params).await,
198			Method::Signup => self.signup(session, params).await,
199			Method::Signin => self.signin(session, params).await,
200			Method::Authenticate => self.authenticate(session, params).await,
201			Method::Refresh => self.refresh(session, params).await,
202			Method::Invalidate => self.invalidate(session).await,
203			Method::Revoke => self.revoke(params).await,
204			Method::Reset => self.reset(session).await,
205			Method::Kill => self.kill(txn, session, params).await,
206			Method::Live => self.live(txn, session, params).await,
207			Method::Set => self.set(session, params).await,
208			Method::Unset => self.unset(session, params).await,
209			Method::Query => self.query(txn, session, params).await,
210			Method::Version => self.version(txn, params).await,
211			Method::Begin => self.begin(txn, session).await,
212			Method::Commit => self.commit(txn, session, params).await,
213			Method::Cancel => self.cancel(txn, session, params).await,
214			Method::Sessions => self.sessions().await,
215			Method::Attach => self.attach(session).await,
216			Method::Detach => self.detach(session).await,
217			// Deprecated methods
218			Method::Select => self.select(txn, session, params).await,
219			Method::Insert => self.insert(txn, session, params).await,
220			Method::Create => self.create(txn, session, params).await,
221			Method::Upsert => self.upsert(txn, session, params).await,
222			Method::Update => self.update(txn, session, params).await,
223			Method::Merge => self.merge(txn, session, params).await,
224			Method::Patch => self.patch(txn, session, params).await,
225			Method::Delete => self.delete(txn, session, params).await,
226			Method::Relate => self.relate(txn, session, params).await,
227			Method::Run => self.run(txn, session, params).await,
228			Method::InsertRelation => self.insert_relation(txn, session, params).await,
229			_ => Err(method_not_found(method.to_string())),
230		}
231	}
232
233	// ------------------------------
234	// Methods for authentication
235	// ------------------------------
236
237	/// Handles the USE RPC method for switching namespace and database context.
238	///
239	/// This method supports three usage patterns:
240	/// 1. **Explicit selection**: `USE ns "namespace" db "database"` - directly sets ns/db
241	/// 2. **Partial selection**: `USE ns "namespace"` - sets ns while preserving or clearing db
242	/// 3. **Default selection**: `USE` (empty call) - applies defaults from config or token
243	///
244	/// When called with no arguments (pattern 3), the behavior depends on session state:
245	/// - If the session already has ns/db from token authentication (JWT claims), those are
246	///   preserved
247	/// - Otherwise, defaults from the database configuration are applied if available
248	///
249	/// Returns an object with the resulting `namespace` and `database` values, allowing
250	/// clients (especially HTTP) to sync their local state with the server session.
251	async fn yuse(
252		&self,
253		session_id: Option<Uuid>,
254		params: PublicArray,
255	) -> Result<DbResult, surrealdb_types::Error> {
256		let session_lock = self.get_session(&session_id)?;
257
258		// Check permissions with read lock
259		{
260			let session = session_lock.read().await;
261			// Check if the user is allowed to query
262			if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
263				return Err(method_not_allowed(Method::Use.to_string()));
264			}
265		}
266
267		// For both ns+db, string = change, null = unset, none = do nothing
268		// We need to be able to adjust either ns or db without affecting the other
269		// To be able to select a namespace, and then list resources in that namespace,
270		// as an example
271		let (ns, db) = extract_args::<(PublicValue, PublicValue)>(params.into_vec())
272			.ok_or(invalid_params("Expected (ns, db)".to_string()))?;
273		// Get a write lock on the session to modify it
274		let mut session = session_lock.write().await;
275		// Empty USE call: apply defaults only if session doesn't already have ns/db
276		if ns.is_none() && db.is_none() {
277			// Skip applying defaults if ns is already set (e.g., from token authentication)
278			if session.ns.is_none() {
279				// Fetch defaults from database configuration
280				let kvs = self.kvs();
281				let tx = kvs
282					.transaction(TransactionType::Write, LockType::Optimistic)
283					.await
284					.map_err(types_error_from_anyhow)?;
285				let (ns, db) = if let Some(x) = match tx.get_default_config().await {
286					Err(e) => {
287						let _ = tx.cancel().await;
288						return Err(types_error_from_anyhow(e));
289					}
290					Ok(v) => v,
291				} {
292					(x.namespace.clone(), x.database.clone())
293				} else {
294					(None, None)
295				};
296
297				if let Some(ns) = ns {
298					match tx.get_or_add_ns(None, &ns).await {
299						Err(e) => {
300							let _ = tx.cancel().await;
301							return Err(types_error_from_anyhow(e));
302						}
303						Ok(v) => v,
304					};
305
306					if let Some(db) = db {
307						match tx.ensure_ns_db(None, &ns, &db).await {
308							Err(e) => {
309								let _ = tx.cancel().await;
310								return Err(types_error_from_anyhow(e));
311							}
312							Ok(v) => v,
313						};
314						session.db = Some(db);
315					}
316
317					session.ns = Some(ns);
318				}
319
320				if let Err(e) = tx.commit().await {
321					let _ = tx.cancel().await;
322					return Err(types_error_from_anyhow(e));
323				}
324			}
325		} else {
326			// Update the selected namespace
327			match ns {
328				PublicValue::None => (),
329				PublicValue::Null => session.ns = None,
330				PublicValue::String(ns) => {
331					let kvs = self.kvs();
332					let tx = kvs
333						.transaction(TransactionType::Write, LockType::Optimistic)
334						.await
335						.map_err(types_error_from_anyhow)?;
336					run!(tx, tx.get_or_add_ns(None, &ns).await).map_err(types_error_from_anyhow)?;
337					session.ns = Some(ns)
338				}
339				unexpected => {
340					return Err(invalid_params(format!(
341						"Expected ns to be string, got {unexpected:?}"
342					)));
343				}
344			}
345			// Update the selected database
346			match db {
347				PublicValue::None => (),
348				PublicValue::Null => session.db = None,
349				PublicValue::String(db) => {
350					let ns = session.ns.clone().expect("namespace should be set");
351					let tx = self
352						.kvs()
353						.transaction(TransactionType::Write, LockType::Optimistic)
354						.await
355						.map_err(types_error_from_anyhow)?;
356					run!(tx, tx.ensure_ns_db(None, &ns, &db).await)
357						.map_err(types_error_from_anyhow)?;
358					session.db = Some(db)
359				}
360				unexpected => {
361					return Err(invalid_params(format!(
362						"Expected db to be string, got {unexpected:?}"
363					)));
364				}
365			}
366		}
367		// Clear any residual database
368		if session.ns.is_none() && session.db.is_some() {
369			session.db = None;
370		}
371		// Log the session ns/db values for debugging
372		trace!(
373			"USE response: session_id={:?}, ns={:?}, db={:?}",
374			session_id, session.ns, session.db
375		);
376		// Build the return value
377		let value = PublicValue::from_t(object! {
378			namespace: session.ns.clone(),
379			database: session.db.clone(),
380		});
381		// Return the namespace and database
382		Ok(DbResult::Other(value))
383	}
384
385	async fn signup(
386		&self,
387		session_id: Option<Uuid>,
388		params: PublicArray,
389	) -> Result<DbResult, surrealdb_types::Error> {
390		// Process the method arguments
391		let Some(PublicValue::Object(params)) = extract_args(params.into_vec()) else {
392			return Err(invalid_params("Expected (params:object)".to_string()));
393		};
394		// Get a write lock on the session
395		let session_lock = self.get_session(&session_id)?;
396		let mut session = session_lock.write().await;
397		// Attempt signup, mutating the session
398		let out: Result<PublicValue> =
399			crate::iam::signup::signup(self.kvs(), &mut session, params.into())
400				.await
401				.map(SurrealValue::into_value);
402		// Return the signup result
403		out.map(DbResult::Other).map_err(types_error_from_anyhow)
404	}
405
406	async fn signin(
407		&self,
408		session_id: Option<Uuid>,
409		params: PublicArray,
410	) -> Result<DbResult, surrealdb_types::Error> {
411		// Process the method arguments
412		let Some(PublicValue::Object(params)) = extract_args(params.into_vec()) else {
413			return Err(invalid_params("Expected (params:object)".to_string()));
414		};
415		// Get a write lock on the session
416		let session_lock = self.get_session(&session_id)?;
417		let mut session = session_lock.write().await;
418		// Attempt signin, mutating the session
419		let out: Result<PublicValue> =
420			crate::iam::signin::signin(self.kvs(), &mut session, params.into())
421				.await
422				.map(SurrealValue::into_value);
423		// Return the signin result
424		out.map(DbResult::Other).map_err(types_error_from_anyhow)
425	}
426
427	async fn authenticate(
428		&self,
429		session_id: Option<Uuid>,
430		params: PublicArray,
431	) -> Result<DbResult, surrealdb_types::Error> {
432		// Process the method arguments
433		let Some(PublicValue::String(token)) = extract_args(params.into_vec()) else {
434			return Err(invalid_params("Expected (token:string)".to_string()));
435		};
436		// Get a write lock on the session
437		let session_lock = self.get_session(&session_id)?;
438		let mut session = session_lock.write().await;
439		// Log before authentication
440		trace!(
441			"Authenticate RPC: session_id={:?}, before: ns={:?}, db={:?}",
442			session_id, session.ns, session.db
443		);
444		// Attempt authentication, mutating the session
445		let out: Result<PublicValue> =
446			crate::iam::verify::token(self.kvs(), &mut session, token.as_str())
447				.await
448				.map(|_| PublicValue::None);
449		// Log after authentication
450		trace!(
451			"Authenticate RPC: session_id={:?}, after: ns={:?}, db={:?}",
452			session_id, session.ns, session.db
453		);
454		// Return nothing on success
455		out.map(DbResult::Other).map_err(types_error_from_anyhow)
456	}
457
458	/// Refreshes an access token using a refresh token.
459	///
460	/// This RPC method implements the token refresh flow, allowing clients to
461	/// obtain a new access token without re-authenticating. The method:
462	///
463	/// 1. Validates the provided token contains both access and refresh components
464	/// 2. Uses the refresh token to authenticate and create new tokens
465	/// 3. Revokes the old refresh token (single-use security model)
466	/// 4. Updates the session with the new authentication state
467	/// 5. Returns the new token pair to the client
468	///
469	/// # Arguments
470	///
471	/// * `session_id` - Optional session identifier for stateful connections
472	/// * `params` - Array containing the token with both access and refresh components
473	///
474	/// # Returns
475	///
476	/// A new token containing fresh access and refresh tokens.
477	///
478	/// # Errors
479	///
480	/// Returns an error if:
481	/// - The token parameter is missing or invalid
482	/// - The token doesn't contain a refresh component
483	/// - The refresh token is invalid, expired, or already revoked
484	async fn refresh(
485		&self,
486		session_id: Option<Uuid>,
487		params: PublicArray,
488	) -> Result<DbResult, surrealdb_types::Error> {
489		// Process the method arguments
490		let unexpected = || invalid_params("Expected (token:Token)".to_string());
491		let Some(value) = extract_args(params.into_vec()) else {
492			return Err(unexpected());
493		};
494		let Ok(token) = Token::from_value(value) else {
495			return Err(unexpected());
496		};
497		// Get a write lock on the session
498		let session_lock = self.get_session(&session_id)?;
499		let mut session = session_lock.write().await;
500		// Attempt token refresh, which will:
501		// - Validate the refresh token
502		// - Revoke the old refresh token
503		// - Create new access and refresh tokens
504		// - Update the session with the new authentication state
505		let out: Result<PublicValue> =
506			token.refresh(self.kvs(), &mut session).await.map(Token::into_value);
507		// Return the new token pair
508		out.map(DbResult::Other).map_err(types_error_from_anyhow)
509	}
510
511	async fn invalidate(
512		&self,
513		session_id: Option<Uuid>,
514	) -> Result<DbResult, surrealdb_types::Error> {
515		// Get a write lock on the session
516		let session_lock = self.get_session(&session_id)?;
517		let mut session = session_lock.write().await;
518		// Clear the current session
519		crate::iam::clear::clear(&mut session).map_err(types_error_from_anyhow)?;
520		// Return nothing on success
521		Ok(DbResult::Other(PublicValue::None))
522	}
523
524	/// Revokes a refresh token, preventing it from being used to obtain new access tokens.
525	///
526	/// This RPC method explicitly invalidates a refresh token without affecting the
527	/// current session. This is useful for:
528	///
529	/// - Logout operations where you want to prevent future token refreshes
530	/// - Security events requiring immediate token invalidation
531	/// - Explicit token lifecycle management
532	///
533	/// Unlike `invalidate()`, which clears the entire session, `revoke()` only
534	/// invalidates the specific refresh token, allowing other sessions using
535	/// different tokens to remain active.
536	///
537	/// # Arguments
538	///
539	/// * `params` - Array containing the token with the refresh token to revoke
540	///
541	/// # Returns
542	///
543	/// Returns nothing on success.
544	///
545	/// # Errors
546	///
547	/// Returns an error if:
548	/// - The token parameter is missing or invalid
549	/// - The token doesn't contain a refresh component
550	/// - The token doesn't contain valid namespace/database/access information
551	async fn revoke(&self, params: PublicArray) -> Result<DbResult, surrealdb_types::Error> {
552		// Process the method arguments
553		let unexpected = || invalid_params("Expected (token:Token)".to_string());
554		let Some(value) = extract_args(params.into_vec()) else {
555			return Err(unexpected());
556		};
557		let Ok(token) = Token::from_value(value) else {
558			return Err(unexpected());
559		};
560		// Revoke the refresh token by removing the grant record from the database.
561		// This prevents the refresh token from being used to obtain new access tokens.
562		token.revoke_refresh_token(self.kvs()).await.map_err(types_error_from_anyhow)?;
563		// Return nothing on success
564		Ok(DbResult::Other(PublicValue::None))
565	}
566
567	async fn reset(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
568		// Get a write lock on the session
569		let session_lock = self.get_session(&session_id)?;
570		let mut session = session_lock.write().await;
571		// Reset the current session
572		crate::iam::reset::reset(&mut session);
573		// Cleanup live queries
574		self.cleanup_lqs(session_id.as_ref()).await;
575		// Return nothing on success
576		Ok(DbResult::Other(PublicValue::None))
577	}
578
579	// ------------------------------
580	// Methods for identification
581	// ------------------------------
582
583	async fn info(
584		&self,
585		_txn: Option<Uuid>,
586		session_id: Option<Uuid>,
587	) -> Result<DbResult, surrealdb_types::Error> {
588		let session_lock = self.get_session(&session_id)?;
589		let session = session_lock.read().await;
590		let vars = Some(session.variables.clone());
591		let mut res = self.kvs().execute("SELECT * FROM $auth", &session, vars).await?;
592
593		let result = res.remove(0).result?;
594
595		let first = result.first().unwrap_or_default();
596		Ok(DbResult::Other(first))
597	}
598
599	// ------------------------------
600	// Methods for setting variables
601	// ------------------------------
602
603	async fn set(
604		&self,
605		session_id: Option<Uuid>,
606		params: PublicArray,
607	) -> Result<DbResult, surrealdb_types::Error> {
608		let session_lock = self.get_session(&session_id)?;
609
610		// Check permissions with read lock
611		{
612			let session = session_lock.read().await;
613			// Check if the user is allowed to query
614			if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
615				return Err(method_not_allowed(Method::Set.to_string()));
616			}
617		}
618
619		// Process the method arguments
620		let Some((PublicValue::String(key), val)) =
621			extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
622		else {
623			return Err(invalid_params("Expected (key:string, value:Value)".to_string()));
624		};
625
626		// Get a write lock on the session
627		let mut session = session_lock.write().await;
628
629		if session.expired() {
630			return Err(session_expired());
631		}
632
633		match val {
634			None | Some(PublicValue::None) => session.variables.remove(key.as_str()),
635			Some(val) => {
636				crate::rpc::check_protected_param(&key)?;
637				session.variables.insert(key, val)
638			}
639		}
640
641		// Return nothing
642		Ok(DbResult::Other(PublicValue::Null))
643	}
644
645	async fn unset(
646		&self,
647		session_id: Option<Uuid>,
648		params: PublicArray,
649	) -> Result<DbResult, surrealdb_types::Error> {
650		let session_lock = self.get_session(&session_id)?;
651
652		// Check permissions with read lock
653		{
654			let session = session_lock.read().await;
655			// Check if the user is allowed to query
656			if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
657				return Err(method_not_allowed(Method::Unset.to_string()));
658			}
659		}
660
661		// Process the method arguments
662		let Some(PublicValue::String(key)) = extract_args(params.into_vec()) else {
663			return Err(invalid_params("Expected (key)".to_string()));
664		};
665
666		// Get a write lock on the session
667		let mut session = session_lock.write().await;
668		session.variables.remove(key.as_str());
669
670		Ok(DbResult::Other(PublicValue::Null))
671	}
672
673	// ------------------------------
674	// Methods for live queries
675	// ------------------------------
676
677	async fn kill(
678		&self,
679		txn: Option<Uuid>,
680		session_id: Option<Uuid>,
681		params: PublicArray,
682	) -> Result<DbResult, surrealdb_types::Error> {
683		let session_lock = self.get_session(&session_id)?;
684		let session = session_lock.read().await;
685		// Check if the user is allowed to query
686		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
687			return Err(method_not_allowed(Method::Kill.to_string()));
688		}
689		// Process the method arguments
690		let (id,) = extract_args::<(PublicValue,)>(params.into_vec())
691			.ok_or(invalid_params("Expected (id)".to_string()))?;
692
693		// Specify the SQL query string
694		let ast = Ast {
695			expressions: vec![TopLevelExpr::Kill(KillStatement {
696				id: Expr::from_public_value(id),
697			})],
698		};
699		// Specify the query parameters
700		let vars = Some(session.variables.clone());
701		// Execute the query on the database
702		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
703			.await
704			.map_err(types_error_from_anyhow)?;
705		// Extract the first query result
706		Ok(DbResult::Other(res.remove(0).result?))
707	}
708
709	async fn live(
710		&self,
711		txn: Option<Uuid>,
712		session_id: Option<Uuid>,
713		params: PublicArray,
714	) -> Result<DbResult, surrealdb_types::Error> {
715		let session_lock = self.get_session(&session_id)?;
716		let session = session_lock.read().await;
717		// Check if the user is allowed to query
718		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
719			return Err(method_not_allowed(Method::Live.to_string()));
720		}
721		// Process the method arguments
722		let (what, diff) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
723			.ok_or(invalid_params("Expected (what, diff)".to_string()))?;
724
725		// If value is a strand, handle it as if it was a table.
726		let what = match what {
727			PublicValue::String(x) => Expr::Table(x),
728			x => Expr::from_public_value(x),
729		};
730
731		let fields = if diff.unwrap_or_default().is_true() {
732			LiveFields::Diff
733		} else {
734			LiveFields::Select(Fields::all())
735		};
736
737		// Specify the SQL query string
738		let sql = LiveStatement {
739			fields,
740			what,
741			cond: None,
742			fetch: None,
743		};
744		let ast = Ast {
745			expressions: vec![TopLevelExpr::Live(Box::new(sql))],
746		};
747		// Specify the query parameters
748		let vars = Some(session.variables.clone());
749
750		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
751			.await
752			.map_err(types_error_from_anyhow)?;
753
754		// Extract the first query result
755		let first = res.remove(0).result?;
756		Ok(DbResult::Other(first))
757	}
758
759	// ------------------------------
760	// Methods for selecting
761	// ------------------------------
762
763	async fn select(
764		&self,
765		txn: Option<Uuid>,
766		session_id: Option<Uuid>,
767		params: PublicArray,
768	) -> Result<DbResult, surrealdb_types::Error> {
769		let session_lock = self.get_session(&session_id)?;
770		let session = session_lock.read().await;
771		// Check if the user is allowed to query
772		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
773			return Err(method_not_allowed(Method::Select.to_string()));
774		}
775		// Process the method arguments
776		let (what,) = extract_args::<(PublicValue,)>(params.into_vec())
777			.ok_or(invalid_params("Expected (what:Value)".to_string()))?;
778
779		// If the what is a single record with a non range value, make it return only a
780		// single result.
781		let only = match what {
782			PublicValue::RecordId(ref x) => !x.key.is_range(),
783			_ => false,
784		};
785
786		// If value is a string, handle it as if it was a table.
787		let what = match what {
788			PublicValue::String(x) => Expr::Table(x),
789			x => Expr::from_public_value(x),
790		};
791
792		// Specify the SQL query string
793		let sql = SelectStatement {
794			only,
795			fields: Fields::all(),
796			what: vec![what],
797			with: None,
798			cond: None,
799			omit: vec![],
800			split: None,
801			group: None,
802			order: None,
803			limit: None,
804			start: None,
805			fetch: None,
806			version: Expr::Literal(Literal::None),
807			timeout: Expr::Literal(Literal::None),
808			explain: None,
809			tempfiles: false,
810		};
811		let ast = Ast::single_expr(Expr::Select(Box::new(sql)));
812
813		// Specify the query parameters
814		let vars = Some(session.variables.clone());
815		// Execute the query on the database
816		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
817			.await
818			.map_err(types_error_from_anyhow)?;
819		// Extract the first query result
820		let first = res.remove(0).result?;
821		Ok(DbResult::Other(first))
822	}
823
824	// ------------------------------
825	// Methods for inserting
826	// ------------------------------
827
828	async fn insert(
829		&self,
830		txn: Option<Uuid>,
831		session_id: Option<Uuid>,
832		params: PublicArray,
833	) -> Result<DbResult, surrealdb_types::Error> {
834		let session_lock = self.get_session(&session_id)?;
835		let session = session_lock.read().await;
836		// Check if the user is allowed to query
837		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
838			return Err(method_not_allowed(Method::Insert.to_string()));
839		}
840		// Process the method arguments
841		let (what, data) = extract_args::<(PublicValue, PublicValue)>(params.into_vec())
842			.ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
843
844		let into = match what {
845			PublicValue::Null | PublicValue::None => None,
846			PublicValue::Table(x) => Some(Expr::Table(x.into_string())),
847			PublicValue::String(x) => Some(Expr::Table(x)),
848			x => Some(Expr::from_public_value(x)),
849		};
850
851		// Specify the SQL query string
852		let sql = InsertStatement {
853			into,
854			data: SqlData::SingleExpression(Expr::from_public_value(data)),
855			output: Some(Output::After),
856			ignore: false,
857			update: None,
858			timeout: Expr::Literal(Literal::None),
859			relation: false,
860		};
861		let ast = Ast::single_expr(Expr::Insert(Box::new(sql)));
862		// Specify the query parameters
863		let var = Some(session.variables.clone());
864		// Execute the query on the database
865		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
866			.await
867			.map_err(types_error_from_anyhow)?;
868		// Extract the first query result
869		let first = res.remove(0).result?;
870		Ok(DbResult::Other(first))
871	}
872
873	async fn insert_relation(
874		&self,
875		txn: Option<Uuid>,
876		session_id: Option<Uuid>,
877		params: PublicArray,
878	) -> Result<DbResult, surrealdb_types::Error> {
879		let session_lock = self.get_session(&session_id)?;
880		let session = session_lock.read().await;
881		// Check if the user is allowed to query
882		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
883			return Err(method_not_allowed(Method::InsertRelation.to_string()));
884		}
885		// Process the method arguments
886		let (what, data) = extract_args::<(PublicValue, PublicValue)>(params.to_vec())
887			.ok_or(invalid_params("Expected (what, data)".to_string()))?;
888
889		let table_name = match what {
890			PublicValue::Null | PublicValue::None => None,
891			PublicValue::Table(x) => Some(Expr::Table(x.into_string())),
892			PublicValue::String(x) => Some(Expr::Table(x)),
893			x => Some(Expr::from_public_value(x)),
894		};
895
896		let data = SqlData::SingleExpression(Expr::from_public_value(data));
897
898		// Specify the SQL query string
899		let sql = InsertStatement {
900			relation: true,
901			into: table_name,
902			data,
903			output: Some(Output::After),
904			ignore: false,
905			update: None,
906			timeout: Expr::Literal(Literal::None),
907		};
908		let ast = Ast::single_expr(Expr::Insert(Box::new(sql)));
909		// Specify the query parameters
910		let var = Some(session.variables.clone());
911		// Execute the query on the database
912		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
913			.await
914			.map_err(types_error_from_anyhow)?;
915		// Extract the first query result
916		let first = res.remove(0).result?;
917		Ok(DbResult::Other(first))
918	}
919
920	// ------------------------------
921	// Methods for creating
922	// ------------------------------
923
924	async fn create(
925		&self,
926		txn: Option<Uuid>,
927		session_id: Option<Uuid>,
928		params: PublicArray,
929	) -> Result<DbResult, surrealdb_types::Error> {
930		let session_lock = self.get_session(&session_id)?;
931		let session = session_lock.read().await;
932		// Check if the user is allowed to query
933		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
934			return Err(method_not_allowed(Method::Create.to_string()));
935		}
936		// Process the method arguments
937		let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
938			.ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
939
940		let only = match what {
941			PublicValue::String(_) => true,
942			PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
943			_ => false,
944		};
945
946		let data = data
947			.and_then(|x| {
948				if x.is_nullish() {
949					None
950				} else {
951					Some(x)
952				}
953			})
954			.map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
955
956		// Specify the SQL query string
957		let sql = CreateStatement {
958			only,
959			what: vec![value_to_table(what)],
960			data,
961			output: Some(Output::After),
962			timeout: Expr::Literal(Literal::None),
963		};
964		let ast = Ast::single_expr(Expr::Create(Box::new(sql)));
965		// Execute the query on the database
966		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), None)
967			.await
968			.map_err(types_error_from_anyhow)?;
969		// Extract the first query result
970		let first = res.remove(0).result?;
971		Ok(DbResult::Other(first))
972	}
973
974	// ------------------------------
975	// Methods for upserting
976	// ------------------------------
977
978	async fn upsert(
979		&self,
980		txn: Option<Uuid>,
981		session_id: Option<Uuid>,
982		params: PublicArray,
983	) -> Result<DbResult, surrealdb_types::Error> {
984		let session_lock = self.get_session(&session_id)?;
985		let session = session_lock.read().await;
986		// Check if the user is allowed to query
987		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
988			return Err(method_not_allowed(Method::Upsert.to_string()));
989		}
990		// Process the method arguments
991		let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
992			.ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
993
994		let only = match what {
995			PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
996			_ => false,
997		};
998
999		let data = data
1000			.and_then(|x| {
1001				if x.is_nullish() {
1002					None
1003				} else {
1004					Some(x)
1005				}
1006			})
1007			.map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1008
1009		// Specify the SQL query string
1010		let sql = UpsertStatement {
1011			only,
1012			what: vec![value_to_table(what)],
1013			data,
1014			output: Some(Output::After),
1015			with: None,
1016			cond: None,
1017			timeout: Expr::Literal(Literal::None),
1018			explain: None,
1019		};
1020		let ast = Ast::single_expr(Expr::Upsert(Box::new(sql)));
1021		// Specify the query parameters
1022		let var = Some(session.variables.clone());
1023		// Execute the query on the database
1024		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1025			.await
1026			.map_err(types_error_from_anyhow)?;
1027		// Extract the first query result
1028		let first = res.remove(0).result?;
1029		Ok(DbResult::Other(first))
1030	}
1031
1032	// ------------------------------
1033	// Methods for updating
1034	// ------------------------------
1035
1036	async fn update(
1037		&self,
1038		_txn: Option<Uuid>,
1039		session_id: Option<Uuid>,
1040		params: PublicArray,
1041	) -> Result<DbResult, surrealdb_types::Error> {
1042		let session_lock = self.get_session(&session_id)?;
1043		let session = session_lock.read().await;
1044		// Check if the user is allowed to query
1045		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1046			return Err(method_not_allowed(Method::Update.to_string()));
1047		}
1048		// Process the method arguments
1049		let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1050			.ok_or(invalid_params("Expected (what, data)".to_string()))?;
1051
1052		let only = match what {
1053			PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1054			_ => false,
1055		};
1056
1057		let data = data
1058			.and_then(|x| {
1059				if x.is_nullish() {
1060					None
1061				} else {
1062					Some(x)
1063				}
1064			})
1065			.map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1066		// Specify the SQL query string
1067		let sql = UpdateStatement {
1068			only,
1069			what: vec![value_to_table(what)],
1070			data,
1071			output: Some(Output::After),
1072			with: None,
1073			cond: None,
1074			timeout: Expr::Literal(Literal::None),
1075			explain: None,
1076		};
1077		let ast = Ast::single_expr(Expr::Update(Box::new(sql)));
1078		// Specify the query parameters
1079		let var = Some(session.variables.clone());
1080		// Execute the query on the database
1081		let mut res = self.kvs().process(ast, &session, var).await?;
1082		// Extract the first query result
1083		let first = res.remove(0).result?;
1084		Ok(DbResult::Other(first))
1085	}
1086
1087	// ------------------------------
1088	// Methods for merging
1089	// ------------------------------
1090
1091	async fn merge(
1092		&self,
1093		txn: Option<Uuid>,
1094		session_id: Option<Uuid>,
1095		params: PublicArray,
1096	) -> Result<DbResult, surrealdb_types::Error> {
1097		let session_lock = self.get_session(&session_id)?;
1098		let session = session_lock.read().await;
1099		// Check if the user is allowed to query
1100		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1101			return Err(method_not_allowed(Method::Merge.to_string()));
1102		}
1103		// Process the method arguments
1104		let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1105			.ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
1106
1107		let only = match what {
1108			PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1109			_ => false,
1110		};
1111
1112		let data = data
1113			.and_then(|x| {
1114				if x.is_nullish() {
1115					None
1116				} else {
1117					Some(x)
1118				}
1119			})
1120			.map(|x| SqlData::MergeExpression(Expr::from_public_value(x)));
1121		// Specify the SQL query string
1122		let sql = UpdateStatement {
1123			only,
1124			what: vec![value_to_table(what)],
1125			data,
1126			output: Some(Output::After),
1127			..Default::default()
1128		};
1129		let ast = Ast::single_expr(Expr::Update(Box::new(sql)));
1130		// Specify the query parameters
1131		let var = Some(session.variables.clone());
1132		// Execute the query on the database
1133		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1134			.await
1135			.map_err(types_error_from_anyhow)?;
1136		// Extract the first query result
1137		let first = res.remove(0).result?;
1138		Ok(DbResult::Other(first))
1139	}
1140
1141	// ------------------------------
1142	// Methods for patching
1143	// ------------------------------
1144
1145	async fn patch(
1146		&self,
1147		_txn: Option<Uuid>,
1148		session_id: Option<Uuid>,
1149		params: PublicArray,
1150	) -> Result<DbResult, surrealdb_types::Error> {
1151		let session_lock = self.get_session(&session_id)?;
1152		let session = session_lock.read().await;
1153		// Check if the user is allowed to query
1154		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1155			return Err(method_not_allowed(Method::Patch.to_string()));
1156		}
1157		// Process the method arguments
1158		let (what, data, diff) =
1159			extract_args::<(PublicValue, Option<PublicValue>, Option<PublicValue>)>(
1160				params.into_vec(),
1161			)
1162			.ok_or(invalid_params("Expected (what:Value, data:Value, diff:Value)".to_string()))?;
1163
1164		// Process the method arguments
1165		let only = match what {
1166			PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1167			_ => false,
1168		};
1169
1170		let data = data
1171			.and_then(|x| {
1172				if x.is_nullish() {
1173					None
1174				} else {
1175					Some(x)
1176				}
1177			})
1178			.map(|x| SqlData::PatchExpression(Expr::from_public_value(x)));
1179
1180		let diff = matches!(diff, Some(PublicValue::Bool(true)));
1181
1182		// Specify the SQL query string
1183		let expr = Expr::Update(Box::new(UpdateStatement {
1184			only,
1185			what: vec![value_to_table(what)],
1186			data,
1187			output: if diff {
1188				Some(Output::Diff)
1189			} else {
1190				Some(Output::After)
1191			},
1192			with: None,
1193			cond: None,
1194			timeout: Expr::Literal(Literal::None),
1195			explain: None,
1196		}));
1197		// Specify the query parameters
1198		let var = Some(session.variables.clone());
1199		// Execute the query on the database
1200		let mut res = self.kvs().process(Ast::single_expr(expr), &session, var).await?;
1201		// Extract the first query result
1202		let first = res.remove(0).result?;
1203		Ok(DbResult::Other(first))
1204	}
1205
1206	// ------------------------------
1207	// Methods for relating
1208	// ------------------------------
1209
1210	async fn relate(
1211		&self,
1212		_txn: Option<Uuid>,
1213		session_id: Option<Uuid>,
1214		params: PublicArray,
1215	) -> Result<DbResult, surrealdb_types::Error> {
1216		let session_lock = self.get_session(&session_id)?;
1217		let session = session_lock.read().await;
1218		// Check if the user is allowed to query
1219		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1220			return Err(method_not_allowed(Method::Relate.to_string()));
1221		}
1222		// Process the method arguments
1223		let (from, kind, with, data) =
1224			extract_args::<(PublicValue, PublicValue, PublicValue, Option<PublicValue>)>(
1225				params.to_vec(),
1226			)
1227			.ok_or(invalid_params(
1228				"Expected (from:Value, kind:Value, with:Value, data:Value)".to_string(),
1229			))?;
1230
1231		// Returns if selecting on this value returns a single result.
1232		let only = singular(&from) && singular(&with);
1233
1234		let data = data
1235			.and_then(|x| {
1236				if x.is_nullish() {
1237					None
1238				} else {
1239					Some(x)
1240				}
1241			})
1242			.map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1243
1244		// Specify the SQL query string
1245		let expr = Expr::Relate(Box::new(RelateStatement {
1246			only,
1247			from: Expr::from_public_value(from),
1248			through: value_to_table(kind),
1249			to: Expr::from_public_value(with),
1250			data,
1251			output: Some(Output::After),
1252			timeout: Expr::Literal(Literal::None),
1253		}));
1254		// Specify the query parameters
1255		let var = Some(session.variables.clone());
1256		// Execute the query on the database
1257		let mut res = self.kvs().process(Ast::single_expr(expr), &session, var).await?;
1258		// Extract the first query result
1259		let first = res.remove(0).result?;
1260		Ok(DbResult::Other(first))
1261	}
1262
1263	// ------------------------------
1264	// Methods for deleting
1265	// ------------------------------
1266
1267	async fn delete(
1268		&self,
1269		txn: Option<Uuid>,
1270		session_id: Option<Uuid>,
1271		params: PublicArray,
1272	) -> Result<DbResult, surrealdb_types::Error> {
1273		let session_lock = self.get_session(&session_id)?;
1274		let session = session_lock.read().await;
1275		// Check if the user is allowed to query
1276		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1277			return Err(method_not_allowed(Method::Delete.to_string()));
1278		}
1279		// Process the method arguments
1280		let (what,) = extract_args::<(PublicValue,)>(params.into_vec())
1281			.ok_or(invalid_params("Expected (what:Value)".to_string()))?;
1282		// Specify the SQL query string
1283		let sql = Expr::Delete(Box::new(DeleteStatement {
1284			only: singular(&what),
1285			what: vec![value_to_table(what)],
1286			output: Some(Output::Before),
1287			with: None,
1288			cond: None,
1289			timeout: Expr::Literal(Literal::None),
1290			explain: None,
1291		}));
1292		let ast = Ast::single_expr(sql);
1293		// Specify the query parameters
1294		let var = Some(session.variables.clone());
1295		// Execute the query on the database
1296		let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1297			.await
1298			.map_err(types_error_from_anyhow)?;
1299		// Extract the first query result
1300		let first = res.remove(0).result?;
1301		Ok(DbResult::Other(first))
1302	}
1303
1304	// ------------------------------
1305	// Methods for getting info
1306	// ------------------------------
1307
1308	async fn version(
1309		&self,
1310		_txn: Option<Uuid>,
1311		params: PublicArray,
1312	) -> Result<DbResult, surrealdb_types::Error> {
1313		match params.len() {
1314			0 => Ok(self.version_data()),
1315			_ => Err(invalid_params("Expected 0 arguments".to_string())),
1316		}
1317	}
1318
1319	// ------------------------------
1320	// Methods for querying
1321	// ------------------------------
1322
1323	async fn query(
1324		&self,
1325		txn: Option<Uuid>,
1326		session_id: Option<Uuid>,
1327		params: PublicArray,
1328	) -> Result<DbResult, surrealdb_types::Error> {
1329		let session_lock = self.get_session(&session_id)?;
1330		let session = session_lock.read().await;
1331		// Check if the user is allowed to query
1332		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1333			return Err(method_not_allowed(Method::Query.to_string()));
1334		}
1335		// Process the method arguments
1336		let (query, vars) =
1337			extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1338				.ok_or(invalid_params("Expected (query:string, vars:object)".to_string()))?;
1339
1340		let PublicValue::String(query) = query else {
1341			return Err(invalid_params("Expected query to be string".to_string()));
1342		};
1343
1344		// Specify the query variables
1345		let vars = match vars {
1346			Some(PublicValue::Object(v)) => {
1347				let mut merged = session.variables.clone();
1348				merged.extend(v.into());
1349				Some(merged)
1350			}
1351			None | Some(PublicValue::None | PublicValue::Null) => Some(session.variables.clone()),
1352			unexpected => {
1353				return Err(invalid_params(format!(
1354					"Expected vars to be object, got {unexpected:?}"
1355				)));
1356			}
1357		};
1358
1359		Ok(DbResult::Query(
1360			run_query(self, txn, session_id, QueryForm::Text(&query), vars)
1361				.await
1362				.map_err(types_error_from_anyhow)?,
1363		))
1364	}
1365
1366	// ------------------------------
1367	// Methods for running functions
1368	// ------------------------------
1369
1370	async fn run(
1371		&self,
1372		_txn: Option<Uuid>,
1373		session_id: Option<Uuid>,
1374		params: PublicArray,
1375	) -> Result<DbResult, surrealdb_types::Error> {
1376		let session_lock = self.get_session(&session_id)?;
1377		let session = session_lock.read().await;
1378		// Check if the user is allowed to query
1379		if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1380			return Err(method_not_allowed(Method::Run.to_string()));
1381		}
1382		// Process the method arguments
1383		let (name, version, args) = extract_args::<(
1384			PublicValue,
1385			Option<PublicValue>,
1386			Option<PublicValue>,
1387		)>(params.into_vec())
1388		.ok_or(invalid_params("Expected (name:string, version:string, args:array)".to_string()))?;
1389		// Parse the function name argument
1390		let name = match name {
1391			PublicValue::String(v) => v,
1392			unexpected => {
1393				return Err(invalid_params(format!(
1394					"Expected name to be string, got {unexpected:?}"
1395				)));
1396			}
1397		};
1398		// Parse any function version argument
1399		let version = match version {
1400			Some(PublicValue::String(v)) => Some(v),
1401			None | Some(PublicValue::None | PublicValue::Null) => None,
1402			unexpected => {
1403				return Err(invalid_params(format!(
1404					"Expected version to be string, got {unexpected:?}"
1405				)));
1406			}
1407		};
1408		// Parse the function arguments if specified
1409		let args = match args {
1410			Some(PublicValue::Array(args)) => {
1411				args.into_iter().map(Expr::from_public_value).collect::<Vec<Expr>>()
1412			}
1413			None | Some(PublicValue::None | PublicValue::Null) => vec![],
1414			unexpected => {
1415				return Err(invalid_params(format!(
1416					"Expected args to be array, got {unexpected:?}"
1417				)));
1418			}
1419		};
1420
1421		let segments = name.split("::").collect::<Vec<&str>>();
1422		let name = match segments.first() {
1423			Some(&"fn") => Function::Custom(segments[1..].join("::")),
1424			Some(&"mod") => {
1425				if !self
1426					.kvs()
1427					.get_capabilities()
1428					.allows_experimental(&ExperimentalTarget::Surrealism)
1429				{
1430					return Err(invalid_params(
1431						"Experimental capability `surrealism` is not enabled".to_string(),
1432					));
1433				}
1434
1435				let Some(name) = segments.get(1).map(|x| (*x).to_string()) else {
1436					return Err(invalid_params("Expected module name".to_string()));
1437				};
1438
1439				let sub = segments.get(2).map(|x| (*x).to_string());
1440
1441				Function::Module(name, sub)
1442			}
1443			Some(&"silo") => {
1444				if !self
1445					.kvs()
1446					.get_capabilities()
1447					.allows_experimental(&ExperimentalTarget::Surrealism)
1448				{
1449					return Err(invalid_params(
1450						"Experimental capability `surrealism` is not enabled".to_string(),
1451					));
1452				}
1453
1454				let Some(org) = segments.get(1).map(|x| (*x).to_string()) else {
1455					return Err(invalid_params("Expected silo organisation name".to_string()));
1456				};
1457
1458				let Some(pkg) = segments.get(2).map(|x| (*x).to_string()) else {
1459					return Err(invalid_params("Expected silo package name".to_string()));
1460				};
1461
1462				let Some(version) = version else {
1463					return Err(invalid_params("Expected silo version".to_string()));
1464				};
1465				let mut split = version.split('.');
1466				let major = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1467					invalid_params("Expected major version (u32) in version string".to_string())
1468				})?;
1469				let minor = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1470					invalid_params("Expected minor version (u32) in version string".to_string())
1471				})?;
1472				let patch = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1473					invalid_params("Expected patch version (u32) in version string".to_string())
1474				})?;
1475
1476				let sub = segments.get(3).map(|x| (*x).to_string());
1477
1478				Function::Silo {
1479					org,
1480					pkg,
1481					major,
1482					minor,
1483					patch,
1484					sub,
1485				}
1486			}
1487			Some(&"ml") => {
1488				let name = segments[1..].join("::");
1489				Function::Model(Model {
1490					name,
1491					version: version.ok_or(invalid_params(
1492						"Expected version to be set for model function".to_string(),
1493					))?,
1494				})
1495			}
1496			_ => Function::Normal(name),
1497		};
1498
1499		let expr = Expr::FunctionCall(Box::new(FunctionCall {
1500			receiver: name,
1501			arguments: args,
1502		}));
1503		let ast = Ast::single_expr(expr);
1504
1505		// Specify the query parameters
1506		let var = Some(session.variables.clone());
1507		// Execute the function on the database
1508		let mut res = run_query(self, None, session_id, QueryForm::Parsed(ast), var)
1509			.await
1510			.map_err(types_error_from_anyhow)?;
1511		// Extract the first query result
1512		let first = res.remove(0).result?;
1513		Ok(DbResult::Other(first))
1514	}
1515
1516	// ------------------------------
1517	// Methods for transactions
1518	// ------------------------------
1519
1520	/// Begin a new transaction
1521	async fn begin(
1522		&self,
1523		_txn: Option<Uuid>,
1524		_session_id: Option<Uuid>,
1525	) -> Result<DbResult, surrealdb_types::Error> {
1526		Err(method_not_allowed(Method::Begin.to_string()))
1527	}
1528
1529	/// Commit a transaction
1530	async fn commit(
1531		&self,
1532		_txn: Option<Uuid>,
1533		_session_id: Option<Uuid>,
1534		_params: PublicArray,
1535	) -> Result<DbResult, surrealdb_types::Error> {
1536		Err(method_not_allowed(Method::Commit.to_string()))
1537	}
1538
1539	/// Cancel a transaction
1540	async fn cancel(
1541		&self,
1542		_txn: Option<Uuid>,
1543		_session_id: Option<Uuid>,
1544		_params: PublicArray,
1545	) -> Result<DbResult, surrealdb_types::Error> {
1546		Err(method_not_allowed(Method::Cancel.to_string()))
1547	}
1548}
1549
1550enum QueryForm<'a> {
1551	Text(&'a str),
1552	Parsed(Ast),
1553}
1554
1555async fn run_query<T>(
1556	this: &T,
1557	txn: Option<Uuid>,
1558	session_id: Option<Uuid>,
1559	query: QueryForm<'_>,
1560	vars: Option<PublicVariables>,
1561) -> Result<Vec<QueryResult>>
1562where
1563	T: RpcProtocol + ?Sized,
1564{
1565	let session_lock = this.get_session(&session_id).map_err(anyhow::Error::from)?;
1566	let session = session_lock.read().await;
1567	if !T::LQ_SUPPORT && session.rt {
1568		return Err(bad_lq_config().into());
1569	}
1570
1571	// If a transaction UUID is provided, retrieve it and execute with it
1572	let res = if let Some(txn_id) = txn {
1573		// Retrieve the transaction - fail if not found
1574		let tx = this.get_tx(txn_id).await.map_err(anyhow::Error::from)?;
1575		// Execute with the existing transaction by passing it through context
1576		match query {
1577			QueryForm::Text(query) => {
1578				this.kvs().execute_with_transaction(query, &session, vars, tx).await?
1579			}
1580			QueryForm::Parsed(ast) => {
1581				this.kvs().process_with_transaction(ast, &session, vars, tx).await?
1582			}
1583		}
1584	} else {
1585		// No transaction - execute normally
1586		match query {
1587			QueryForm::Text(query) => this.kvs().execute(query, &session, vars).await?,
1588			QueryForm::Parsed(ast) => this.kvs().process(ast, &session, vars).await?,
1589		}
1590	};
1591
1592	// Post-process hooks for web layer
1593	for response in &res {
1594		match &response.query_type {
1595			QueryType::Live => {
1596				if let Ok(PublicValue::Uuid(lqid)) = &response.result {
1597					this.handle_live(lqid, session_id).await;
1598				}
1599			}
1600			QueryType::Kill => {
1601				if let Ok(PublicValue::Uuid(lqid)) = &response.result {
1602					this.handle_kill(lqid).await;
1603				}
1604			}
1605			_ => {}
1606		}
1607	}
1608	// Return the result to the client
1609	Ok(res)
1610}