1use std::sync::Arc;
2
3use anyhow::Result;
4use surrealdb_types::{HashMap, object};
5use tokio::sync::RwLock;
6use uuid::Uuid;
7
8use crate::catalog::providers::{CatalogProvider, NamespaceProvider, RootProvider};
9use crate::dbs::capabilities::{ExperimentalTarget, MethodTarget};
10use crate::dbs::{QueryResult, QueryType, Session};
11use crate::iam::token::Token;
12use crate::kvs::{Datastore, LockType, TransactionType};
13use crate::rpc::args::extract_args;
14use crate::rpc::{
15 DbResult, Method, bad_lq_config, invalid_params, method_not_allowed, method_not_found,
16 session_exists, session_expired, session_not_found, types_error_from_anyhow,
17};
18use crate::sql::statements::live::LiveFields;
19use crate::sql::{
20 Ast, CreateStatement, Data as SqlData, DeleteStatement, Expr, Fields, Function, FunctionCall,
21 InsertStatement, KillStatement, Literal, LiveStatement, Model, Output, RelateStatement,
22 SelectStatement, TopLevelExpr, UpdateStatement, UpsertStatement,
23};
24use crate::types::{
25 PublicArray, PublicRecordIdKey, PublicUuid, PublicValue, PublicVariables, SurrealValue,
26};
27
28fn value_to_table(value: PublicValue) -> Expr {
30 match value {
31 PublicValue::String(s) => Expr::Table(s),
32 x => Expr::from_public_value(x),
33 }
34}
35
36fn singular(value: &PublicValue) -> bool {
41 match value {
42 PublicValue::Object(_) => true,
43 PublicValue::RecordId(t) => !matches!(t.key, PublicRecordIdKey::Range(_)),
44 _ => false,
45 }
46}
47
48#[expect(async_fn_in_trait)]
49pub trait RpcProtocol {
    /// Returns the datastore that the RPC methods of this trait operate on.
    fn kvs(&self) -> &Datastore;
    /// Returns the response that the `version` RPC method sends back.
    fn version_data(&self) -> DbResult;
54
    /// Returns the map of sessions, keyed by optional session ID.
    ///
    /// Note that this is `surrealdb_types::HashMap`, not the standard library
    /// map: the methods of this trait insert into and remove from it through
    /// the shared reference returned here.
    fn session_map(&self) -> &HashMap<Option<Uuid>, Arc<RwLock<Session>>>;
61
62 async fn attach(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
64 let mut session = Session::default().with_rt(Self::LQ_SUPPORT);
65 session.id = session_id;
66 match session_id {
67 Some(id) => {
68 if self.session_map().contains_key(&Some(id)) {
69 return Err(session_exists(id));
70 }
71 self.session_map().insert(Some(id), Arc::new(RwLock::new(session)));
72 Ok(DbResult::Other(PublicValue::None))
73 }
74 None => Err(invalid_params("Expected a session ID")),
75 }
76 }
77
78 async fn detach(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
80 match session_id {
81 Some(id) => {
82 self.del_session(&id).await;
83 Ok(DbResult::Other(PublicValue::None))
84 }
85 None => Err(invalid_params("Expected a session ID")),
86 }
87 }
88
89 fn get_session(
91 &self,
92 id: &Option<Uuid>,
93 ) -> Result<Arc<RwLock<Session>>, surrealdb_types::Error> {
94 match self.session_map().get(id) {
95 Some(session) => Ok(session),
96 None => Err(session_not_found(*id)),
97 }
98 }
99
100 fn set_session(&self, id: Option<Uuid>, session: Arc<RwLock<Session>>) {
102 self.session_map().insert(id, session);
103 }
104
105 async fn del_session(&self, id: &Uuid) {
107 self.session_map().remove(&Some(*id));
108 self.cleanup_lqs(Some(id)).await;
110 }
111
112 async fn sessions(&self) -> Result<DbResult, surrealdb_types::Error> {
114 let array = self
115 .session_map()
116 .to_vec()
117 .into_iter()
118 .filter_map(|(key, _)| key)
119 .map(|x| PublicValue::Uuid(PublicUuid::from(x)))
120 .collect();
121 Ok(DbResult::Other(PublicValue::Array(array)))
122 }
123
124 async fn get_tx(
130 &self,
131 _id: Uuid,
132 ) -> Result<Arc<crate::kvs::Transaction>, surrealdb_types::Error> {
133 Err(method_not_allowed(Method::Unknown.to_string()))
134 }
135
136 async fn set_tx(
138 &self,
139 _id: Uuid,
140 _tx: Arc<crate::kvs::Transaction>,
141 ) -> Result<(), surrealdb_types::Error> {
142 Err(method_not_found(Method::Unknown.to_string()))
143 }
144
    /// Whether this implementation supports live queries.
    ///
    /// The value is passed to `Session::with_rt` when `attach` creates a
    /// session. The default `handle_live` and `handle_kill` panic, so an
    /// implementation that sets this to `true` must override both.
    const LQ_SUPPORT: bool = false;
151
152 fn handle_live(
154 &self,
155 _lqid: &Uuid,
156 _session_id: Option<Uuid>,
157 ) -> impl std::future::Future<Output = ()> + Send {
158 async { unimplemented!("handle_live function must be implemented if LQ_SUPPORT = true") }
159 }
160 fn handle_kill(&self, _lqid: &Uuid) -> impl std::future::Future<Output = ()> + Send {
162 async { unimplemented!("handle_kill function must be implemented if LQ_SUPPORT = true") }
163 }
164
    /// Cleans up the live queries associated with the given session.
    ///
    /// Called by `del_session` after a session is removed and by `reset`
    /// after a session is reset. What "clean up" entails is up to the
    /// implementation (presumably killing the session's live queries).
    fn cleanup_lqs(
        &self,
        session_id: Option<&Uuid>,
    ) -> impl std::future::Future<Output = ()> + Send;
170
    /// Cleans up live queries without being scoped to a single session.
    ///
    /// NOTE(review): presumably covers every session handled by this
    /// implementation; confirm against the implementors.
    fn cleanup_all_lqs(&self) -> impl std::future::Future<Output = ()> + Send;
173
174 async fn execute(
180 &self,
181 txn: Option<Uuid>,
182 session: Option<Uuid>,
183 method: Method,
184 params: PublicArray,
185 ) -> Result<DbResult, surrealdb_types::Error> {
186 if !self.kvs().allows_rpc_method(&MethodTarget {
188 method,
189 }) {
190 warn!("Capabilities denied RPC method call attempt, target: '{method}'");
191 return Err(method_not_allowed(method.to_string()));
192 }
193 match method {
195 Method::Ping => Ok(DbResult::Other(PublicValue::None)),
196 Method::Info => self.info(txn, session).await,
197 Method::Use => self.yuse(session, params).await,
198 Method::Signup => self.signup(session, params).await,
199 Method::Signin => self.signin(session, params).await,
200 Method::Authenticate => self.authenticate(session, params).await,
201 Method::Refresh => self.refresh(session, params).await,
202 Method::Invalidate => self.invalidate(session).await,
203 Method::Revoke => self.revoke(params).await,
204 Method::Reset => self.reset(session).await,
205 Method::Kill => self.kill(txn, session, params).await,
206 Method::Live => self.live(txn, session, params).await,
207 Method::Set => self.set(session, params).await,
208 Method::Unset => self.unset(session, params).await,
209 Method::Query => self.query(txn, session, params).await,
210 Method::Version => self.version(txn, params).await,
211 Method::Begin => self.begin(txn, session).await,
212 Method::Commit => self.commit(txn, session, params).await,
213 Method::Cancel => self.cancel(txn, session, params).await,
214 Method::Sessions => self.sessions().await,
215 Method::Attach => self.attach(session).await,
216 Method::Detach => self.detach(session).await,
217 Method::Select => self.select(txn, session, params).await,
219 Method::Insert => self.insert(txn, session, params).await,
220 Method::Create => self.create(txn, session, params).await,
221 Method::Upsert => self.upsert(txn, session, params).await,
222 Method::Update => self.update(txn, session, params).await,
223 Method::Merge => self.merge(txn, session, params).await,
224 Method::Patch => self.patch(txn, session, params).await,
225 Method::Delete => self.delete(txn, session, params).await,
226 Method::Relate => self.relate(txn, session, params).await,
227 Method::Run => self.run(txn, session, params).await,
228 Method::InsertRelation => self.insert_relation(txn, session, params).await,
229 _ => Err(method_not_found(method.to_string())),
230 }
231 }
232
233 async fn yuse(
252 &self,
253 session_id: Option<Uuid>,
254 params: PublicArray,
255 ) -> Result<DbResult, surrealdb_types::Error> {
256 let session_lock = self.get_session(&session_id)?;
257
258 {
260 let session = session_lock.read().await;
261 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
263 return Err(method_not_allowed(Method::Use.to_string()));
264 }
265 }
266
267 let (ns, db) = extract_args::<(PublicValue, PublicValue)>(params.into_vec())
272 .ok_or(invalid_params("Expected (ns, db)".to_string()))?;
273 let mut session = session_lock.write().await;
275 if ns.is_none() && db.is_none() {
277 if session.ns.is_none() {
279 let kvs = self.kvs();
281 let tx = kvs
282 .transaction(TransactionType::Write, LockType::Optimistic)
283 .await
284 .map_err(types_error_from_anyhow)?;
285 let (ns, db) = if let Some(x) = match tx.get_default_config().await {
286 Err(e) => {
287 let _ = tx.cancel().await;
288 return Err(types_error_from_anyhow(e));
289 }
290 Ok(v) => v,
291 } {
292 (x.namespace.clone(), x.database.clone())
293 } else {
294 (None, None)
295 };
296
297 if let Some(ns) = ns {
298 match tx.get_or_add_ns(None, &ns).await {
299 Err(e) => {
300 let _ = tx.cancel().await;
301 return Err(types_error_from_anyhow(e));
302 }
303 Ok(v) => v,
304 };
305
306 if let Some(db) = db {
307 match tx.ensure_ns_db(None, &ns, &db).await {
308 Err(e) => {
309 let _ = tx.cancel().await;
310 return Err(types_error_from_anyhow(e));
311 }
312 Ok(v) => v,
313 };
314 session.db = Some(db);
315 }
316
317 session.ns = Some(ns);
318 }
319
320 if let Err(e) = tx.commit().await {
321 let _ = tx.cancel().await;
322 return Err(types_error_from_anyhow(e));
323 }
324 }
325 } else {
326 match ns {
328 PublicValue::None => (),
329 PublicValue::Null => session.ns = None,
330 PublicValue::String(ns) => {
331 let kvs = self.kvs();
332 let tx = kvs
333 .transaction(TransactionType::Write, LockType::Optimistic)
334 .await
335 .map_err(types_error_from_anyhow)?;
336 run!(tx, tx.get_or_add_ns(None, &ns).await).map_err(types_error_from_anyhow)?;
337 session.ns = Some(ns)
338 }
339 unexpected => {
340 return Err(invalid_params(format!(
341 "Expected ns to be string, got {unexpected:?}"
342 )));
343 }
344 }
345 match db {
347 PublicValue::None => (),
348 PublicValue::Null => session.db = None,
349 PublicValue::String(db) => {
350 let ns = session.ns.clone().expect("namespace should be set");
351 let tx = self
352 .kvs()
353 .transaction(TransactionType::Write, LockType::Optimistic)
354 .await
355 .map_err(types_error_from_anyhow)?;
356 run!(tx, tx.ensure_ns_db(None, &ns, &db).await)
357 .map_err(types_error_from_anyhow)?;
358 session.db = Some(db)
359 }
360 unexpected => {
361 return Err(invalid_params(format!(
362 "Expected db to be string, got {unexpected:?}"
363 )));
364 }
365 }
366 }
367 if session.ns.is_none() && session.db.is_some() {
369 session.db = None;
370 }
371 trace!(
373 "USE response: session_id={:?}, ns={:?}, db={:?}",
374 session_id, session.ns, session.db
375 );
376 let value = PublicValue::from_t(object! {
378 namespace: session.ns.clone(),
379 database: session.db.clone(),
380 });
381 Ok(DbResult::Other(value))
383 }
384
385 async fn signup(
386 &self,
387 session_id: Option<Uuid>,
388 params: PublicArray,
389 ) -> Result<DbResult, surrealdb_types::Error> {
390 let Some(PublicValue::Object(params)) = extract_args(params.into_vec()) else {
392 return Err(invalid_params("Expected (params:object)".to_string()));
393 };
394 let session_lock = self.get_session(&session_id)?;
396 let mut session = session_lock.write().await;
397 let out: Result<PublicValue> =
399 crate::iam::signup::signup(self.kvs(), &mut session, params.into())
400 .await
401 .map(SurrealValue::into_value);
402 out.map(DbResult::Other).map_err(types_error_from_anyhow)
404 }
405
406 async fn signin(
407 &self,
408 session_id: Option<Uuid>,
409 params: PublicArray,
410 ) -> Result<DbResult, surrealdb_types::Error> {
411 let Some(PublicValue::Object(params)) = extract_args(params.into_vec()) else {
413 return Err(invalid_params("Expected (params:object)".to_string()));
414 };
415 let session_lock = self.get_session(&session_id)?;
417 let mut session = session_lock.write().await;
418 let out: Result<PublicValue> =
420 crate::iam::signin::signin(self.kvs(), &mut session, params.into())
421 .await
422 .map(SurrealValue::into_value);
423 out.map(DbResult::Other).map_err(types_error_from_anyhow)
425 }
426
427 async fn authenticate(
428 &self,
429 session_id: Option<Uuid>,
430 params: PublicArray,
431 ) -> Result<DbResult, surrealdb_types::Error> {
432 let Some(PublicValue::String(token)) = extract_args(params.into_vec()) else {
434 return Err(invalid_params("Expected (token:string)".to_string()));
435 };
436 let session_lock = self.get_session(&session_id)?;
438 let mut session = session_lock.write().await;
439 trace!(
441 "Authenticate RPC: session_id={:?}, before: ns={:?}, db={:?}",
442 session_id, session.ns, session.db
443 );
444 let out: Result<PublicValue> =
446 crate::iam::verify::token(self.kvs(), &mut session, token.as_str())
447 .await
448 .map(|_| PublicValue::None);
449 trace!(
451 "Authenticate RPC: session_id={:?}, after: ns={:?}, db={:?}",
452 session_id, session.ns, session.db
453 );
454 out.map(DbResult::Other).map_err(types_error_from_anyhow)
456 }
457
458 async fn refresh(
485 &self,
486 session_id: Option<Uuid>,
487 params: PublicArray,
488 ) -> Result<DbResult, surrealdb_types::Error> {
489 let unexpected = || invalid_params("Expected (token:Token)".to_string());
491 let Some(value) = extract_args(params.into_vec()) else {
492 return Err(unexpected());
493 };
494 let Ok(token) = Token::from_value(value) else {
495 return Err(unexpected());
496 };
497 let session_lock = self.get_session(&session_id)?;
499 let mut session = session_lock.write().await;
500 let out: Result<PublicValue> =
506 token.refresh(self.kvs(), &mut session).await.map(Token::into_value);
507 out.map(DbResult::Other).map_err(types_error_from_anyhow)
509 }
510
511 async fn invalidate(
512 &self,
513 session_id: Option<Uuid>,
514 ) -> Result<DbResult, surrealdb_types::Error> {
515 let session_lock = self.get_session(&session_id)?;
517 let mut session = session_lock.write().await;
518 crate::iam::clear::clear(&mut session).map_err(types_error_from_anyhow)?;
520 Ok(DbResult::Other(PublicValue::None))
522 }
523
524 async fn revoke(&self, params: PublicArray) -> Result<DbResult, surrealdb_types::Error> {
552 let unexpected = || invalid_params("Expected (token:Token)".to_string());
554 let Some(value) = extract_args(params.into_vec()) else {
555 return Err(unexpected());
556 };
557 let Ok(token) = Token::from_value(value) else {
558 return Err(unexpected());
559 };
560 token.revoke_refresh_token(self.kvs()).await.map_err(types_error_from_anyhow)?;
563 Ok(DbResult::Other(PublicValue::None))
565 }
566
567 async fn reset(&self, session_id: Option<Uuid>) -> Result<DbResult, surrealdb_types::Error> {
568 let session_lock = self.get_session(&session_id)?;
570 let mut session = session_lock.write().await;
571 crate::iam::reset::reset(&mut session);
573 self.cleanup_lqs(session_id.as_ref()).await;
575 Ok(DbResult::Other(PublicValue::None))
577 }
578
579 async fn info(
584 &self,
585 _txn: Option<Uuid>,
586 session_id: Option<Uuid>,
587 ) -> Result<DbResult, surrealdb_types::Error> {
588 let session_lock = self.get_session(&session_id)?;
589 let session = session_lock.read().await;
590 let vars = Some(session.variables.clone());
591 let mut res = self.kvs().execute("SELECT * FROM $auth", &session, vars).await?;
592
593 let result = res.remove(0).result?;
594
595 let first = result.first().unwrap_or_default();
596 Ok(DbResult::Other(first))
597 }
598
599 async fn set(
604 &self,
605 session_id: Option<Uuid>,
606 params: PublicArray,
607 ) -> Result<DbResult, surrealdb_types::Error> {
608 let session_lock = self.get_session(&session_id)?;
609
610 {
612 let session = session_lock.read().await;
613 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
615 return Err(method_not_allowed(Method::Set.to_string()));
616 }
617 }
618
619 let Some((PublicValue::String(key), val)) =
621 extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
622 else {
623 return Err(invalid_params("Expected (key:string, value:Value)".to_string()));
624 };
625
626 let mut session = session_lock.write().await;
628
629 if session.expired() {
630 return Err(session_expired());
631 }
632
633 match val {
634 None | Some(PublicValue::None) => session.variables.remove(key.as_str()),
635 Some(val) => {
636 crate::rpc::check_protected_param(&key)?;
637 session.variables.insert(key, val)
638 }
639 }
640
641 Ok(DbResult::Other(PublicValue::Null))
643 }
644
645 async fn unset(
646 &self,
647 session_id: Option<Uuid>,
648 params: PublicArray,
649 ) -> Result<DbResult, surrealdb_types::Error> {
650 let session_lock = self.get_session(&session_id)?;
651
652 {
654 let session = session_lock.read().await;
655 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
657 return Err(method_not_allowed(Method::Unset.to_string()));
658 }
659 }
660
661 let Some(PublicValue::String(key)) = extract_args(params.into_vec()) else {
663 return Err(invalid_params("Expected (key)".to_string()));
664 };
665
666 let mut session = session_lock.write().await;
668 session.variables.remove(key.as_str());
669
670 Ok(DbResult::Other(PublicValue::Null))
671 }
672
673 async fn kill(
678 &self,
679 txn: Option<Uuid>,
680 session_id: Option<Uuid>,
681 params: PublicArray,
682 ) -> Result<DbResult, surrealdb_types::Error> {
683 let session_lock = self.get_session(&session_id)?;
684 let session = session_lock.read().await;
685 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
687 return Err(method_not_allowed(Method::Kill.to_string()));
688 }
689 let (id,) = extract_args::<(PublicValue,)>(params.into_vec())
691 .ok_or(invalid_params("Expected (id)".to_string()))?;
692
693 let ast = Ast {
695 expressions: vec![TopLevelExpr::Kill(KillStatement {
696 id: Expr::from_public_value(id),
697 })],
698 };
699 let vars = Some(session.variables.clone());
701 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
703 .await
704 .map_err(types_error_from_anyhow)?;
705 Ok(DbResult::Other(res.remove(0).result?))
707 }
708
709 async fn live(
710 &self,
711 txn: Option<Uuid>,
712 session_id: Option<Uuid>,
713 params: PublicArray,
714 ) -> Result<DbResult, surrealdb_types::Error> {
715 let session_lock = self.get_session(&session_id)?;
716 let session = session_lock.read().await;
717 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
719 return Err(method_not_allowed(Method::Live.to_string()));
720 }
721 let (what, diff) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
723 .ok_or(invalid_params("Expected (what, diff)".to_string()))?;
724
725 let what = match what {
727 PublicValue::String(x) => Expr::Table(x),
728 x => Expr::from_public_value(x),
729 };
730
731 let fields = if diff.unwrap_or_default().is_true() {
732 LiveFields::Diff
733 } else {
734 LiveFields::Select(Fields::all())
735 };
736
737 let sql = LiveStatement {
739 fields,
740 what,
741 cond: None,
742 fetch: None,
743 };
744 let ast = Ast {
745 expressions: vec![TopLevelExpr::Live(Box::new(sql))],
746 };
747 let vars = Some(session.variables.clone());
749
750 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
751 .await
752 .map_err(types_error_from_anyhow)?;
753
754 let first = res.remove(0).result?;
756 Ok(DbResult::Other(first))
757 }
758
759 async fn select(
764 &self,
765 txn: Option<Uuid>,
766 session_id: Option<Uuid>,
767 params: PublicArray,
768 ) -> Result<DbResult, surrealdb_types::Error> {
769 let session_lock = self.get_session(&session_id)?;
770 let session = session_lock.read().await;
771 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
773 return Err(method_not_allowed(Method::Select.to_string()));
774 }
775 let (what,) = extract_args::<(PublicValue,)>(params.into_vec())
777 .ok_or(invalid_params("Expected (what:Value)".to_string()))?;
778
779 let only = match what {
782 PublicValue::RecordId(ref x) => !x.key.is_range(),
783 _ => false,
784 };
785
786 let what = match what {
788 PublicValue::String(x) => Expr::Table(x),
789 x => Expr::from_public_value(x),
790 };
791
792 let sql = SelectStatement {
794 only,
795 fields: Fields::all(),
796 what: vec![what],
797 with: None,
798 cond: None,
799 omit: vec![],
800 split: None,
801 group: None,
802 order: None,
803 limit: None,
804 start: None,
805 fetch: None,
806 version: Expr::Literal(Literal::None),
807 timeout: Expr::Literal(Literal::None),
808 explain: None,
809 tempfiles: false,
810 };
811 let ast = Ast::single_expr(Expr::Select(Box::new(sql)));
812
813 let vars = Some(session.variables.clone());
815 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), vars)
817 .await
818 .map_err(types_error_from_anyhow)?;
819 let first = res.remove(0).result?;
821 Ok(DbResult::Other(first))
822 }
823
824 async fn insert(
829 &self,
830 txn: Option<Uuid>,
831 session_id: Option<Uuid>,
832 params: PublicArray,
833 ) -> Result<DbResult, surrealdb_types::Error> {
834 let session_lock = self.get_session(&session_id)?;
835 let session = session_lock.read().await;
836 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
838 return Err(method_not_allowed(Method::Insert.to_string()));
839 }
840 let (what, data) = extract_args::<(PublicValue, PublicValue)>(params.into_vec())
842 .ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
843
844 let into = match what {
845 PublicValue::Null | PublicValue::None => None,
846 PublicValue::Table(x) => Some(Expr::Table(x.into_string())),
847 PublicValue::String(x) => Some(Expr::Table(x)),
848 x => Some(Expr::from_public_value(x)),
849 };
850
851 let sql = InsertStatement {
853 into,
854 data: SqlData::SingleExpression(Expr::from_public_value(data)),
855 output: Some(Output::After),
856 ignore: false,
857 update: None,
858 timeout: Expr::Literal(Literal::None),
859 relation: false,
860 };
861 let ast = Ast::single_expr(Expr::Insert(Box::new(sql)));
862 let var = Some(session.variables.clone());
864 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
866 .await
867 .map_err(types_error_from_anyhow)?;
868 let first = res.remove(0).result?;
870 Ok(DbResult::Other(first))
871 }
872
873 async fn insert_relation(
874 &self,
875 txn: Option<Uuid>,
876 session_id: Option<Uuid>,
877 params: PublicArray,
878 ) -> Result<DbResult, surrealdb_types::Error> {
879 let session_lock = self.get_session(&session_id)?;
880 let session = session_lock.read().await;
881 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
883 return Err(method_not_allowed(Method::InsertRelation.to_string()));
884 }
885 let (what, data) = extract_args::<(PublicValue, PublicValue)>(params.to_vec())
887 .ok_or(invalid_params("Expected (what, data)".to_string()))?;
888
889 let table_name = match what {
890 PublicValue::Null | PublicValue::None => None,
891 PublicValue::Table(x) => Some(Expr::Table(x.into_string())),
892 PublicValue::String(x) => Some(Expr::Table(x)),
893 x => Some(Expr::from_public_value(x)),
894 };
895
896 let data = SqlData::SingleExpression(Expr::from_public_value(data));
897
898 let sql = InsertStatement {
900 relation: true,
901 into: table_name,
902 data,
903 output: Some(Output::After),
904 ignore: false,
905 update: None,
906 timeout: Expr::Literal(Literal::None),
907 };
908 let ast = Ast::single_expr(Expr::Insert(Box::new(sql)));
909 let var = Some(session.variables.clone());
911 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
913 .await
914 .map_err(types_error_from_anyhow)?;
915 let first = res.remove(0).result?;
917 Ok(DbResult::Other(first))
918 }
919
920 async fn create(
925 &self,
926 txn: Option<Uuid>,
927 session_id: Option<Uuid>,
928 params: PublicArray,
929 ) -> Result<DbResult, surrealdb_types::Error> {
930 let session_lock = self.get_session(&session_id)?;
931 let session = session_lock.read().await;
932 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
934 return Err(method_not_allowed(Method::Create.to_string()));
935 }
936 let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
938 .ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
939
940 let only = match what {
941 PublicValue::String(_) => true,
942 PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
943 _ => false,
944 };
945
946 let data = data
947 .and_then(|x| {
948 if x.is_nullish() {
949 None
950 } else {
951 Some(x)
952 }
953 })
954 .map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
955
956 let sql = CreateStatement {
958 only,
959 what: vec![value_to_table(what)],
960 data,
961 output: Some(Output::After),
962 timeout: Expr::Literal(Literal::None),
963 };
964 let ast = Ast::single_expr(Expr::Create(Box::new(sql)));
965 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), None)
967 .await
968 .map_err(types_error_from_anyhow)?;
969 let first = res.remove(0).result?;
971 Ok(DbResult::Other(first))
972 }
973
974 async fn upsert(
979 &self,
980 txn: Option<Uuid>,
981 session_id: Option<Uuid>,
982 params: PublicArray,
983 ) -> Result<DbResult, surrealdb_types::Error> {
984 let session_lock = self.get_session(&session_id)?;
985 let session = session_lock.read().await;
986 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
988 return Err(method_not_allowed(Method::Upsert.to_string()));
989 }
990 let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
992 .ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
993
994 let only = match what {
995 PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
996 _ => false,
997 };
998
999 let data = data
1000 .and_then(|x| {
1001 if x.is_nullish() {
1002 None
1003 } else {
1004 Some(x)
1005 }
1006 })
1007 .map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1008
1009 let sql = UpsertStatement {
1011 only,
1012 what: vec![value_to_table(what)],
1013 data,
1014 output: Some(Output::After),
1015 with: None,
1016 cond: None,
1017 timeout: Expr::Literal(Literal::None),
1018 explain: None,
1019 };
1020 let ast = Ast::single_expr(Expr::Upsert(Box::new(sql)));
1021 let var = Some(session.variables.clone());
1023 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1025 .await
1026 .map_err(types_error_from_anyhow)?;
1027 let first = res.remove(0).result?;
1029 Ok(DbResult::Other(first))
1030 }
1031
1032 async fn update(
1037 &self,
1038 _txn: Option<Uuid>,
1039 session_id: Option<Uuid>,
1040 params: PublicArray,
1041 ) -> Result<DbResult, surrealdb_types::Error> {
1042 let session_lock = self.get_session(&session_id)?;
1043 let session = session_lock.read().await;
1044 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1046 return Err(method_not_allowed(Method::Update.to_string()));
1047 }
1048 let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1050 .ok_or(invalid_params("Expected (what, data)".to_string()))?;
1051
1052 let only = match what {
1053 PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1054 _ => false,
1055 };
1056
1057 let data = data
1058 .and_then(|x| {
1059 if x.is_nullish() {
1060 None
1061 } else {
1062 Some(x)
1063 }
1064 })
1065 .map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1066 let sql = UpdateStatement {
1068 only,
1069 what: vec![value_to_table(what)],
1070 data,
1071 output: Some(Output::After),
1072 with: None,
1073 cond: None,
1074 timeout: Expr::Literal(Literal::None),
1075 explain: None,
1076 };
1077 let ast = Ast::single_expr(Expr::Update(Box::new(sql)));
1078 let var = Some(session.variables.clone());
1080 let mut res = self.kvs().process(ast, &session, var).await?;
1082 let first = res.remove(0).result?;
1084 Ok(DbResult::Other(first))
1085 }
1086
1087 async fn merge(
1092 &self,
1093 txn: Option<Uuid>,
1094 session_id: Option<Uuid>,
1095 params: PublicArray,
1096 ) -> Result<DbResult, surrealdb_types::Error> {
1097 let session_lock = self.get_session(&session_id)?;
1098 let session = session_lock.read().await;
1099 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1101 return Err(method_not_allowed(Method::Merge.to_string()));
1102 }
1103 let (what, data) = extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1105 .ok_or(invalid_params("Expected (what:Value, data:Value)".to_string()))?;
1106
1107 let only = match what {
1108 PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1109 _ => false,
1110 };
1111
1112 let data = data
1113 .and_then(|x| {
1114 if x.is_nullish() {
1115 None
1116 } else {
1117 Some(x)
1118 }
1119 })
1120 .map(|x| SqlData::MergeExpression(Expr::from_public_value(x)));
1121 let sql = UpdateStatement {
1123 only,
1124 what: vec![value_to_table(what)],
1125 data,
1126 output: Some(Output::After),
1127 ..Default::default()
1128 };
1129 let ast = Ast::single_expr(Expr::Update(Box::new(sql)));
1130 let var = Some(session.variables.clone());
1132 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1134 .await
1135 .map_err(types_error_from_anyhow)?;
1136 let first = res.remove(0).result?;
1138 Ok(DbResult::Other(first))
1139 }
1140
1141 async fn patch(
1146 &self,
1147 _txn: Option<Uuid>,
1148 session_id: Option<Uuid>,
1149 params: PublicArray,
1150 ) -> Result<DbResult, surrealdb_types::Error> {
1151 let session_lock = self.get_session(&session_id)?;
1152 let session = session_lock.read().await;
1153 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1155 return Err(method_not_allowed(Method::Patch.to_string()));
1156 }
1157 let (what, data, diff) =
1159 extract_args::<(PublicValue, Option<PublicValue>, Option<PublicValue>)>(
1160 params.into_vec(),
1161 )
1162 .ok_or(invalid_params("Expected (what:Value, data:Value, diff:Value)".to_string()))?;
1163
1164 let only = match what {
1166 PublicValue::RecordId(ref x) => !matches!(x.key, PublicRecordIdKey::Range(_)),
1167 _ => false,
1168 };
1169
1170 let data = data
1171 .and_then(|x| {
1172 if x.is_nullish() {
1173 None
1174 } else {
1175 Some(x)
1176 }
1177 })
1178 .map(|x| SqlData::PatchExpression(Expr::from_public_value(x)));
1179
1180 let diff = matches!(diff, Some(PublicValue::Bool(true)));
1181
1182 let expr = Expr::Update(Box::new(UpdateStatement {
1184 only,
1185 what: vec![value_to_table(what)],
1186 data,
1187 output: if diff {
1188 Some(Output::Diff)
1189 } else {
1190 Some(Output::After)
1191 },
1192 with: None,
1193 cond: None,
1194 timeout: Expr::Literal(Literal::None),
1195 explain: None,
1196 }));
1197 let var = Some(session.variables.clone());
1199 let mut res = self.kvs().process(Ast::single_expr(expr), &session, var).await?;
1201 let first = res.remove(0).result?;
1203 Ok(DbResult::Other(first))
1204 }
1205
1206 async fn relate(
1211 &self,
1212 _txn: Option<Uuid>,
1213 session_id: Option<Uuid>,
1214 params: PublicArray,
1215 ) -> Result<DbResult, surrealdb_types::Error> {
1216 let session_lock = self.get_session(&session_id)?;
1217 let session = session_lock.read().await;
1218 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1220 return Err(method_not_allowed(Method::Relate.to_string()));
1221 }
1222 let (from, kind, with, data) =
1224 extract_args::<(PublicValue, PublicValue, PublicValue, Option<PublicValue>)>(
1225 params.to_vec(),
1226 )
1227 .ok_or(invalid_params(
1228 "Expected (from:Value, kind:Value, with:Value, data:Value)".to_string(),
1229 ))?;
1230
1231 let only = singular(&from) && singular(&with);
1233
1234 let data = data
1235 .and_then(|x| {
1236 if x.is_nullish() {
1237 None
1238 } else {
1239 Some(x)
1240 }
1241 })
1242 .map(|x| SqlData::ContentExpression(Expr::from_public_value(x)));
1243
1244 let expr = Expr::Relate(Box::new(RelateStatement {
1246 only,
1247 from: Expr::from_public_value(from),
1248 through: value_to_table(kind),
1249 to: Expr::from_public_value(with),
1250 data,
1251 output: Some(Output::After),
1252 timeout: Expr::Literal(Literal::None),
1253 }));
1254 let var = Some(session.variables.clone());
1256 let mut res = self.kvs().process(Ast::single_expr(expr), &session, var).await?;
1258 let first = res.remove(0).result?;
1260 Ok(DbResult::Other(first))
1261 }
1262
1263 async fn delete(
1268 &self,
1269 txn: Option<Uuid>,
1270 session_id: Option<Uuid>,
1271 params: PublicArray,
1272 ) -> Result<DbResult, surrealdb_types::Error> {
1273 let session_lock = self.get_session(&session_id)?;
1274 let session = session_lock.read().await;
1275 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1277 return Err(method_not_allowed(Method::Delete.to_string()));
1278 }
1279 let (what,) = extract_args::<(PublicValue,)>(params.into_vec())
1281 .ok_or(invalid_params("Expected (what:Value)".to_string()))?;
1282 let sql = Expr::Delete(Box::new(DeleteStatement {
1284 only: singular(&what),
1285 what: vec![value_to_table(what)],
1286 output: Some(Output::Before),
1287 with: None,
1288 cond: None,
1289 timeout: Expr::Literal(Literal::None),
1290 explain: None,
1291 }));
1292 let ast = Ast::single_expr(sql);
1293 let var = Some(session.variables.clone());
1295 let mut res = run_query(self, txn, session_id, QueryForm::Parsed(ast), var)
1297 .await
1298 .map_err(types_error_from_anyhow)?;
1299 let first = res.remove(0).result?;
1301 Ok(DbResult::Other(first))
1302 }
1303
1304 async fn version(
1309 &self,
1310 _txn: Option<Uuid>,
1311 params: PublicArray,
1312 ) -> Result<DbResult, surrealdb_types::Error> {
1313 match params.len() {
1314 0 => Ok(self.version_data()),
1315 _ => Err(invalid_params("Expected 0 arguments".to_string())),
1316 }
1317 }
1318
1319 async fn query(
1324 &self,
1325 txn: Option<Uuid>,
1326 session_id: Option<Uuid>,
1327 params: PublicArray,
1328 ) -> Result<DbResult, surrealdb_types::Error> {
1329 let session_lock = self.get_session(&session_id)?;
1330 let session = session_lock.read().await;
1331 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1333 return Err(method_not_allowed(Method::Query.to_string()));
1334 }
1335 let (query, vars) =
1337 extract_args::<(PublicValue, Option<PublicValue>)>(params.into_vec())
1338 .ok_or(invalid_params("Expected (query:string, vars:object)".to_string()))?;
1339
1340 let PublicValue::String(query) = query else {
1341 return Err(invalid_params("Expected query to be string".to_string()));
1342 };
1343
1344 let vars = match vars {
1346 Some(PublicValue::Object(v)) => {
1347 let mut merged = session.variables.clone();
1348 merged.extend(v.into());
1349 Some(merged)
1350 }
1351 None | Some(PublicValue::None | PublicValue::Null) => Some(session.variables.clone()),
1352 unexpected => {
1353 return Err(invalid_params(format!(
1354 "Expected vars to be object, got {unexpected:?}"
1355 )));
1356 }
1357 };
1358
1359 Ok(DbResult::Query(
1360 run_query(self, txn, session_id, QueryForm::Text(&query), vars)
1361 .await
1362 .map_err(types_error_from_anyhow)?,
1363 ))
1364 }
1365
1366 async fn run(
1371 &self,
1372 _txn: Option<Uuid>,
1373 session_id: Option<Uuid>,
1374 params: PublicArray,
1375 ) -> Result<DbResult, surrealdb_types::Error> {
1376 let session_lock = self.get_session(&session_id)?;
1377 let session = session_lock.read().await;
1378 if !self.kvs().allows_query_by_subject(session.au.as_ref()) {
1380 return Err(method_not_allowed(Method::Run.to_string()));
1381 }
1382 let (name, version, args) = extract_args::<(
1384 PublicValue,
1385 Option<PublicValue>,
1386 Option<PublicValue>,
1387 )>(params.into_vec())
1388 .ok_or(invalid_params("Expected (name:string, version:string, args:array)".to_string()))?;
1389 let name = match name {
1391 PublicValue::String(v) => v,
1392 unexpected => {
1393 return Err(invalid_params(format!(
1394 "Expected name to be string, got {unexpected:?}"
1395 )));
1396 }
1397 };
1398 let version = match version {
1400 Some(PublicValue::String(v)) => Some(v),
1401 None | Some(PublicValue::None | PublicValue::Null) => None,
1402 unexpected => {
1403 return Err(invalid_params(format!(
1404 "Expected version to be string, got {unexpected:?}"
1405 )));
1406 }
1407 };
1408 let args = match args {
1410 Some(PublicValue::Array(args)) => {
1411 args.into_iter().map(Expr::from_public_value).collect::<Vec<Expr>>()
1412 }
1413 None | Some(PublicValue::None | PublicValue::Null) => vec![],
1414 unexpected => {
1415 return Err(invalid_params(format!(
1416 "Expected args to be array, got {unexpected:?}"
1417 )));
1418 }
1419 };
1420
1421 let segments = name.split("::").collect::<Vec<&str>>();
1422 let name = match segments.first() {
1423 Some(&"fn") => Function::Custom(segments[1..].join("::")),
1424 Some(&"mod") => {
1425 if !self
1426 .kvs()
1427 .get_capabilities()
1428 .allows_experimental(&ExperimentalTarget::Surrealism)
1429 {
1430 return Err(invalid_params(
1431 "Experimental capability `surrealism` is not enabled".to_string(),
1432 ));
1433 }
1434
1435 let Some(name) = segments.get(1).map(|x| (*x).to_string()) else {
1436 return Err(invalid_params("Expected module name".to_string()));
1437 };
1438
1439 let sub = segments.get(2).map(|x| (*x).to_string());
1440
1441 Function::Module(name, sub)
1442 }
1443 Some(&"silo") => {
1444 if !self
1445 .kvs()
1446 .get_capabilities()
1447 .allows_experimental(&ExperimentalTarget::Surrealism)
1448 {
1449 return Err(invalid_params(
1450 "Experimental capability `surrealism` is not enabled".to_string(),
1451 ));
1452 }
1453
1454 let Some(org) = segments.get(1).map(|x| (*x).to_string()) else {
1455 return Err(invalid_params("Expected silo organisation name".to_string()));
1456 };
1457
1458 let Some(pkg) = segments.get(2).map(|x| (*x).to_string()) else {
1459 return Err(invalid_params("Expected silo package name".to_string()));
1460 };
1461
1462 let Some(version) = version else {
1463 return Err(invalid_params("Expected silo version".to_string()));
1464 };
1465 let mut split = version.split('.');
1466 let major = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1467 invalid_params("Expected major version (u32) in version string".to_string())
1468 })?;
1469 let minor = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1470 invalid_params("Expected minor version (u32) in version string".to_string())
1471 })?;
1472 let patch = split.next().and_then(|s| s.parse::<u32>().ok()).ok_or_else(|| {
1473 invalid_params("Expected patch version (u32) in version string".to_string())
1474 })?;
1475
1476 let sub = segments.get(3).map(|x| (*x).to_string());
1477
1478 Function::Silo {
1479 org,
1480 pkg,
1481 major,
1482 minor,
1483 patch,
1484 sub,
1485 }
1486 }
1487 Some(&"ml") => {
1488 let name = segments[1..].join("::");
1489 Function::Model(Model {
1490 name,
1491 version: version.ok_or(invalid_params(
1492 "Expected version to be set for model function".to_string(),
1493 ))?,
1494 })
1495 }
1496 _ => Function::Normal(name),
1497 };
1498
1499 let expr = Expr::FunctionCall(Box::new(FunctionCall {
1500 receiver: name,
1501 arguments: args,
1502 }));
1503 let ast = Ast::single_expr(expr);
1504
1505 let var = Some(session.variables.clone());
1507 let mut res = run_query(self, None, session_id, QueryForm::Parsed(ast), var)
1509 .await
1510 .map_err(types_error_from_anyhow)?;
1511 let first = res.remove(0).result?;
1513 Ok(DbResult::Other(first))
1514 }
1515
1516 async fn begin(
1522 &self,
1523 _txn: Option<Uuid>,
1524 _session_id: Option<Uuid>,
1525 ) -> Result<DbResult, surrealdb_types::Error> {
1526 Err(method_not_allowed(Method::Begin.to_string()))
1527 }
1528
1529 async fn commit(
1531 &self,
1532 _txn: Option<Uuid>,
1533 _session_id: Option<Uuid>,
1534 _params: PublicArray,
1535 ) -> Result<DbResult, surrealdb_types::Error> {
1536 Err(method_not_allowed(Method::Commit.to_string()))
1537 }
1538
1539 async fn cancel(
1541 &self,
1542 _txn: Option<Uuid>,
1543 _session_id: Option<Uuid>,
1544 _params: PublicArray,
1545 ) -> Result<DbResult, surrealdb_types::Error> {
1546 Err(method_not_allowed(Method::Cancel.to_string()))
1547 }
1548}
1549
1550enum QueryForm<'a> {
1551 Text(&'a str),
1552 Parsed(Ast),
1553}
1554
1555async fn run_query<T>(
1556 this: &T,
1557 txn: Option<Uuid>,
1558 session_id: Option<Uuid>,
1559 query: QueryForm<'_>,
1560 vars: Option<PublicVariables>,
1561) -> Result<Vec<QueryResult>>
1562where
1563 T: RpcProtocol + ?Sized,
1564{
1565 let session_lock = this.get_session(&session_id).map_err(anyhow::Error::from)?;
1566 let session = session_lock.read().await;
1567 if !T::LQ_SUPPORT && session.rt {
1568 return Err(bad_lq_config().into());
1569 }
1570
1571 let res = if let Some(txn_id) = txn {
1573 let tx = this.get_tx(txn_id).await.map_err(anyhow::Error::from)?;
1575 match query {
1577 QueryForm::Text(query) => {
1578 this.kvs().execute_with_transaction(query, &session, vars, tx).await?
1579 }
1580 QueryForm::Parsed(ast) => {
1581 this.kvs().process_with_transaction(ast, &session, vars, tx).await?
1582 }
1583 }
1584 } else {
1585 match query {
1587 QueryForm::Text(query) => this.kvs().execute(query, &session, vars).await?,
1588 QueryForm::Parsed(ast) => this.kvs().process(ast, &session, vars).await?,
1589 }
1590 };
1591
1592 for response in &res {
1594 match &response.query_type {
1595 QueryType::Live => {
1596 if let Ok(PublicValue::Uuid(lqid)) = &response.result {
1597 this.handle_live(lqid, session_id).await;
1598 }
1599 }
1600 QueryType::Kill => {
1601 if let Ok(PublicValue::Uuid(lqid)) = &response.result {
1602 this.handle_kill(lqid).await;
1603 }
1604 }
1605 _ => {}
1606 }
1607 }
1608 Ok(res)
1610}