1use std::collections::HashMap;
152use std::marker::PhantomData;
153use std::mem;
154#[cfg(not(target_family = "wasm"))]
155use std::pin::pin;
156use std::sync::Arc;
157#[cfg(not(target_family = "wasm"))]
158use std::task::{Poll, ready};
159#[cfg(not(target_family = "wasm"))]
160use std::{future::Future, path::PathBuf};
161
162#[cfg(not(target_family = "wasm"))]
163use anyhow::bail;
164use async_channel::Sender;
165#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
166use futures::StreamExt;
167#[cfg(not(target_family = "wasm"))]
168use futures::stream::poll_fn;
169use indexmap::IndexMap;
170use surrealdb_core::dbs::{Notification, Response, Session, Variables};
171#[cfg(not(target_family = "wasm"))]
172use surrealdb_core::err::Error as CoreError;
173#[cfg(feature = "ml")]
174use surrealdb_core::expr::Model;
175use surrealdb_core::expr::statements::DeleteStatement;
176use surrealdb_core::expr::{
177 CreateStatement, Data, Expr, Fields, Function, Ident, InsertStatement, KillStatement, Literal,
178 LogicalPlan, Output, SelectStatement, TopLevelExpr, UpdateStatement, UpsertStatement,
179};
180use surrealdb_core::iam;
181#[cfg(not(target_family = "wasm"))]
182use surrealdb_core::kvs::export::Config as DbExportConfig;
183use surrealdb_core::kvs::{Datastore, LockType, TransactionType};
184use surrealdb_core::val::{self, Strand};
185#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
186use surrealdb_core::{
187 expr::statements::{DefineModelStatement, DefineStatement},
188 iam::{Action, ResourceKind, check::check_ns_db},
189 ml::storage::surml_file::SurMlFile,
190};
191use tokio::sync::RwLock;
192#[cfg(not(target_family = "wasm"))]
193use tokio::{
194 fs::OpenOptions,
195 io::{self, AsyncReadExt, AsyncWriteExt},
196};
197#[cfg(not(target_family = "wasm"))]
198use tokio_util::bytes::BytesMut;
199use uuid::Uuid;
200
201use super::resource_to_exprs;
202use crate::Result;
203#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
204use crate::api::conn::MlExportConfig;
205use crate::api::conn::{Command, DbResponse, RequestData};
206use crate::api::err::Error;
207use crate::api::{Connect, Response as QueryResponse, Surreal};
208use crate::method::Stats;
209use crate::opt::IntoEndpoint;
210
211#[cfg(not(target_family = "wasm"))]
212pub(crate) mod native;
213#[cfg(target_family = "wasm")]
214pub(crate) mod wasm;
215
216type LiveQueryMap = HashMap<Uuid, Sender<Notification>>;
217
218#[cfg(feature = "kv-mem")]
268#[cfg_attr(docsrs, doc(cfg(feature = "kv-mem")))]
269#[derive(Debug)]
270pub struct Mem;
271
272#[cfg(feature = "kv-rocksdb")]
304#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
305#[derive(Debug)]
306pub struct RocksDb;
307
308#[cfg(feature = "kv-indxdb")]
340#[cfg_attr(docsrs, doc(cfg(feature = "kv-indxdb")))]
341#[derive(Debug)]
342pub struct IndxDb;
343
344#[cfg(feature = "kv-tikv")]
376#[cfg_attr(docsrs, doc(cfg(feature = "kv-tikv")))]
377#[derive(Debug)]
378pub struct TiKv;
379
380#[cfg(kv_fdb)]
412#[cfg_attr(docsrs, doc(cfg(feature = "kv-fdb-7_3")))]
413#[derive(Debug)]
414pub struct FDb;
415
416#[cfg(feature = "kv-surrealkv")]
448#[cfg_attr(docsrs, doc(cfg(feature = "kv-surrealkv")))]
449#[derive(Debug)]
450pub struct SurrealKv;
451
452#[derive(Debug, Clone)]
454pub struct Db(());
455
456impl Surreal<Db> {
457 pub fn connect<P>(&self, address: impl IntoEndpoint<P, Client = Db>) -> Connect<Db, ()> {
460 Connect {
461 surreal: self.inner.clone().into(),
462 address: address.into_endpoint(),
463 capacity: 0,
464 response_type: PhantomData,
465 }
466 }
467}
468
469fn process(responses: Vec<Response>) -> QueryResponse {
470 let mut map = IndexMap::<usize, (Stats, Result<val::Value>)>::with_capacity(responses.len());
471 for (index, response) in responses.into_iter().enumerate() {
472 let stats = Stats {
473 execution_time: Some(response.time),
474 };
475 match response.result {
476 Ok(value) => {
477 map.insert(index, (stats, Ok(value)));
479 }
480 Err(error) => {
481 map.insert(index, (stats, Err(error)));
482 }
483 };
484 }
485 QueryResponse {
486 results: map,
487 ..QueryResponse::new()
488 }
489}
490
491async fn take(one: bool, responses: Vec<Response>) -> Result<val::Value> {
492 if let Some((_stats, result)) = process(responses).results.swap_remove(&0) {
493 let value = result?;
494 match one {
495 true => match value {
496 val::Value::Array(mut array) => {
497 if let [ref mut value] = array[..] {
498 return Ok(mem::replace(value, val::Value::None));
499 }
500 }
501 val::Value::None | val::Value::Null => {}
502 value => return Ok(value),
503 },
504 false => return Ok(value),
505 }
506 }
507 match one {
508 true => Ok(val::Value::None),
509 false => Ok(val::Value::Array(Default::default())),
510 }
511}
512
513#[cfg(not(target_family = "wasm"))]
514async fn export_file(
515 kvs: &Datastore,
516 sess: &Session,
517 chn: async_channel::Sender<Vec<u8>>,
518 config: Option<DbExportConfig>,
519) -> Result<()> {
520 let res = match config {
521 Some(config) => kvs.export_with_config(sess, chn, config).await?.await,
522 None => kvs.export(sess, chn).await?.await,
523 };
524
525 if let Err(error) = res {
526 if let Some(surrealdb_core::err::Error::Channel(message)) = error.downcast_ref() {
527 trace!("{message}");
529 return Ok(());
530 }
531
532 return Err(error);
533 }
534 Ok(())
535}
536
537#[cfg(all(not(target_family = "wasm"), feature = "ml"))]
538async fn export_ml(
539 kvs: &Datastore,
540 sess: &Session,
541 chn: async_channel::Sender<Vec<u8>>,
542 MlExportConfig {
543 name,
544 version,
545 }: MlExportConfig,
546) -> Result<()> {
547 let (nsv, dbv) = check_ns_db(sess)?;
548 kvs.check(sess, Action::View, ResourceKind::Model.on_db(&nsv, &dbv))?;
550
551 let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
553 let Some(db) = tx.get_db_by_name(&nsv, &dbv).await? else {
554 anyhow::bail!("Database not found".to_string());
555 };
556 tx.cancel().await?;
557
558 let Some(model) = tx.get_db_model(db.namespace_id, db.database_id, &name, &version).await?
560 else {
561 anyhow::bail!("Model not found".to_string());
563 };
564 let mut data = surrealdb_core::obs::stream(model.hash.clone()).await?;
566 while let Some(Ok(bytes)) = data.next().await {
568 if chn.send(bytes.to_vec()).await.is_err() {
569 break;
570 }
571 }
572 Ok(())
573}
574
575#[cfg(not(target_family = "wasm"))]
576async fn copy<'a, R, W>(path: PathBuf, reader: &'a mut R, writer: &'a mut W) -> Result<()>
577where
578 R: tokio::io::AsyncRead + Unpin + ?Sized,
579 W: tokio::io::AsyncWrite + Unpin + ?Sized,
580{
581 io::copy(reader, writer)
582 .await
583 .map(|_| ())
584 .map_err(|error| crate::error::Api::FileRead {
585 path,
586 error,
587 })
588 .map_err(anyhow::Error::new)
589}
590
591async fn kill_live_query(
592 kvs: &Datastore,
593 id: Uuid,
594 session: &Session,
595 vars: Variables,
596) -> Result<val::Value> {
597 let kill_plan = KillStatement {
598 id: Expr::Literal(Literal::Uuid(id.into())),
599 };
600 let plan = LogicalPlan {
601 expressions: vec![TopLevelExpr::Kill(kill_plan)],
602 };
603
604 let response = kvs.process_plan(plan, session, Some(vars)).await?;
605 take(true, response).await
606}
607
608async fn router(
609 RequestData {
610 command,
611 ..
612 }: RequestData,
613 kvs: &Arc<Datastore>,
614 session: &Arc<RwLock<Session>>,
615 vars: &Arc<RwLock<Variables>>,
616 live_queries: &Arc<RwLock<LiveQueryMap>>,
617) -> Result<DbResponse> {
618 match command {
619 Command::Use {
620 namespace,
621 database,
622 } => {
623 if let Some(ns) = namespace {
624 let tx = kvs.transaction(TransactionType::Write, LockType::Optimistic).await?;
625 tx.get_or_add_ns(&ns, kvs.is_strict_mode()).await?;
626 tx.commit().await?;
627 session.write().await.ns = Some(ns);
628 }
629 if let Some(db) = database {
630 let ns = session.read().await.ns.clone().unwrap();
631 let tx = kvs.transaction(TransactionType::Write, LockType::Optimistic).await?;
632 tx.ensure_ns_db(&ns, &db, kvs.is_strict_mode()).await?;
633 tx.commit().await?;
634 session.write().await.db = Some(db);
635 }
636 Ok(DbResponse::Other(val::Value::None))
637 }
638 Command::Signup {
639 credentials,
640 } => {
641 let response =
642 iam::signup::signup(kvs, &mut *session.write().await, credentials).await?.token;
643 let response = response
645 .map(|x| unsafe { Strand::new_unchecked(x) })
646 .map(From::from)
647 .unwrap_or(val::Value::None);
648 Ok(DbResponse::Other(response))
649 }
650 Command::Signin {
651 credentials,
652 } => {
653 let response =
654 iam::signin::signin(kvs, &mut *session.write().await, credentials).await?.token;
655 Ok(DbResponse::Other(response.into()))
656 }
657 Command::Authenticate {
658 token,
659 } => {
660 iam::verify::token(kvs, &mut *session.write().await, &token).await?;
661 Ok(DbResponse::Other(val::Value::None))
662 }
663 Command::Invalidate => {
664 iam::clear::clear(&mut *session.write().await)?;
665 Ok(DbResponse::Other(val::Value::None))
666 }
667 Command::Create {
668 txn: _,
669 what,
670 data,
671 } => {
672 let create_plan = CreateStatement {
673 only: false,
674 what: resource_to_exprs(what),
675 data: data.map(|x| Data::ContentExpression(x.into_literal())),
676 output: Some(Output::After),
677 timeout: None,
678 parallel: false,
679 version: None,
680 };
681 let plan = LogicalPlan {
682 expressions: vec![TopLevelExpr::Expr(Expr::Create(Box::new(create_plan)))],
683 };
684
685 let response = kvs
686 .process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
687 .await?;
688 let value = take(true, response).await?;
689 Ok(DbResponse::Other(value))
690 }
691 Command::Upsert {
692 txn: _,
693 what,
694 data,
695 } => {
696 let one = what.is_single_recordid();
697 let upsert_plan = UpsertStatement {
698 only: false,
699 what: resource_to_exprs(what),
700 data: data.map(|x| Data::ContentExpression(x.into_literal())),
701 with: None,
702 cond: None,
703 output: Some(Output::After),
704 timeout: None,
705 parallel: false,
706 explain: None,
707 };
708 let plan = LogicalPlan {
709 expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
710 };
711 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
712 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
713 let value = take(one, response).await?;
714 Ok(DbResponse::Other(value))
715 }
716 Command::Update {
717 txn: _,
718 what,
719 data,
720 } => {
721 let one = what.is_single_recordid();
722 let update_plan = UpdateStatement {
723 only: false,
724 what: resource_to_exprs(what),
725 data: data.map(|x| Data::ContentExpression(x.into_literal())),
726 with: None,
727 cond: None,
728 output: Some(Output::After),
729 timeout: None,
730 parallel: false,
731 explain: None,
732 };
733 let plan = LogicalPlan {
734 expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
735 };
736 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
737 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
738 let value = take(one, response).await?;
739 Ok(DbResponse::Other(value))
740 }
741 Command::Insert {
742 txn: _,
743 what,
744 data,
745 } => {
746 let one = !data.is_array();
747
748 let insert_plan = InsertStatement {
749 into: what.map(|w| Expr::Table(unsafe { Ident::new_unchecked(w) })),
750 data: Data::SingleExpression(data.into_literal()),
751 ignore: false,
752 update: None,
753 output: Some(Output::After),
754 timeout: None,
755 parallel: false,
756 relation: false,
757 version: None,
758 };
759 let plan = LogicalPlan {
760 expressions: vec![TopLevelExpr::Expr(Expr::Insert(Box::new(insert_plan)))],
761 };
762 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
763 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
764 let value = take(one, response).await?;
765 Ok(DbResponse::Other(value))
766 }
767 Command::InsertRelation {
768 txn: _,
769 what,
770 data,
771 } => {
772 let one = !data.is_array();
773
774 let insert_plan = InsertStatement {
775 into: what.map(|w| Expr::Table(unsafe { Ident::new_unchecked(w) })),
776 data: Data::SingleExpression(data.into_literal()),
777 output: Some(Output::After),
778 relation: true,
779 ignore: false,
780 update: None,
781 timeout: None,
782 parallel: false,
783 version: None,
784 };
785 let plan = LogicalPlan {
786 expressions: vec![TopLevelExpr::Expr(Expr::Insert(Box::new(insert_plan)))],
787 };
788
789 let response = kvs
790 .process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
791 .await?;
792 let value = take(one, response).await?;
793 Ok(DbResponse::Other(value))
794 }
795 Command::Patch {
796 txn: _,
797 what,
798 data,
799 upsert,
800 } => {
801 let plan = if upsert {
802 let upsert_plan = UpsertStatement {
803 only: false,
804 what: resource_to_exprs(what),
805 data: data.map(|x| Data::PatchExpression(x.into_literal())),
806 with: None,
807 cond: None,
808 output: Some(Output::After),
809 timeout: None,
810 parallel: false,
811 explain: None,
812 };
813 LogicalPlan {
814 expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
815 }
816 } else {
817 let update_plan = UpdateStatement {
818 only: false,
819 what: resource_to_exprs(what),
820 data: data.map(|x| Data::PatchExpression(x.into_literal())),
821 with: None,
822 cond: None,
823 output: Some(Output::After),
824 timeout: None,
825 parallel: false,
826 explain: None,
827 };
828 LogicalPlan {
829 expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
830 }
831 };
832 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
833 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
834 let response = process(response);
835 Ok(DbResponse::Query(response))
836 }
837 Command::Merge {
838 txn: _,
839 what,
840 data,
841 upsert,
842 } => {
843 let plan = if upsert {
844 let upsert_plan = UpsertStatement {
845 only: false,
846 what: resource_to_exprs(what),
847 data: data.map(|x| Data::MergeExpression(x.into_literal())),
848 with: None,
849 cond: None,
850 output: Some(Output::After),
851 timeout: None,
852 parallel: false,
853 explain: None,
854 };
855 LogicalPlan {
856 expressions: vec![TopLevelExpr::Expr(Expr::Upsert(Box::new(upsert_plan)))],
857 }
858 } else {
859 let update_plan = UpdateStatement {
860 only: false,
861 what: resource_to_exprs(what),
862 data: data.map(|x| Data::MergeExpression(x.into_literal())),
863 with: None,
864 cond: None,
865 output: Some(Output::After),
866 timeout: None,
867 parallel: false,
868 explain: None,
869 };
870 LogicalPlan {
871 expressions: vec![TopLevelExpr::Expr(Expr::Update(Box::new(update_plan)))],
872 }
873 };
874 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
875 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
876 let response = process(response);
877 Ok(DbResponse::Query(response))
878 }
879 Command::Select {
880 txn: _,
881 what,
882 } => {
883 let one = what.is_single_recordid();
884
885 let select_plan = SelectStatement {
886 expr: Fields::all(),
887 what: resource_to_exprs(what),
888 omit: None,
889 only: false,
890 with: None,
891 cond: None,
892 split: None,
893 group: None,
894 order: None,
895 limit: None,
896 start: None,
897 fetch: None,
898 version: None,
899 timeout: None,
900 parallel: false,
901 explain: None,
902 tempfiles: false,
903 };
904
905 let plan = LogicalPlan {
906 expressions: vec![TopLevelExpr::Expr(Expr::Select(Box::new(select_plan)))],
907 };
908 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
909 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
910 let value = take(one, response).await?;
911 Ok(DbResponse::Other(value))
912 }
913 Command::Delete {
914 txn: _,
915 what,
916 } => {
917 let one = what.is_single_recordid();
918 let delete_plan = DeleteStatement {
919 only: false,
920 what: resource_to_exprs(what),
921 with: None,
922 cond: None,
923 output: Some(Output::Before),
924 timeout: None,
925 parallel: false,
926 explain: None,
927 };
928 let plan = LogicalPlan {
929 expressions: vec![TopLevelExpr::Expr(Expr::Delete(Box::new(delete_plan)))],
930 };
931 let vars = vars.read().await.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
932 let response = kvs.process_plan(plan, &*session.read().await, Some(vars)).await?;
933 let value = take(one, response).await?;
934 Ok(DbResponse::Other(value))
935 }
936 Command::Query {
937 txn: _,
938 query,
939 variables,
940 } => {
941 let mut vars = vars.read().await.clone();
942 vars.merge(variables);
943 let response =
944 kvs.execute(&query.to_string(), &*session.read().await, Some(vars)).await?;
945 let response = process(response);
946 Ok(DbResponse::Query(response))
947 }
948 Command::RawQuery {
949 txn: _,
950 query,
951 variables,
952 } => {
953 let mut vars = vars.read().await.clone();
954 vars.merge(variables);
955 let response = kvs.execute(query.as_ref(), &*session.read().await, Some(vars)).await?;
956 let response = process(response);
957 Ok(DbResponse::Query(response))
958 }
959
960 #[cfg(target_family = "wasm")]
961 Command::ExportFile {
962 ..
963 }
964 | Command::ExportBytes {
965 ..
966 }
967 | Command::ImportFile {
968 ..
969 } => Err(crate::api::Error::BackupsNotSupported.into()),
970
971 #[cfg(any(target_family = "wasm", not(feature = "ml")))]
972 Command::ExportMl {
973 ..
974 }
975 | Command::ExportBytesMl {
976 ..
977 }
978 | Command::ImportMl {
979 ..
980 } => Err(crate::api::Error::BackupsNotSupported.into()),
981
982 #[cfg(not(target_family = "wasm"))]
983 Command::ExportFile {
984 path: file,
985 config,
986 } => {
987 let (tx, rx) = crate::channel::bounded(1);
988 let (mut writer, mut reader) = io::duplex(10_240);
989
990 let session = session.read().await.clone();
992 let export = export_file(kvs, &session, tx, config);
993
994 let bridge = async move {
996 while let Ok(value) = rx.recv().await {
997 if writer.write_all(&value).await.is_err() {
998 break;
1000 }
1001 }
1002 Ok(())
1003 };
1004
1005 let mut output = match OpenOptions::new()
1007 .write(true)
1008 .create(true)
1009 .truncate(true)
1010 .open(&file)
1011 .await
1012 {
1013 Ok(path) => path,
1014 Err(error) => {
1015 return Err(Error::FileOpen {
1016 path: file,
1017 error,
1018 }
1019 .into());
1020 }
1021 };
1022
1023 let copy = copy(file, &mut reader, &mut output);
1025
1026 tokio::try_join!(export, bridge, copy)?;
1027 Ok(DbResponse::Other(val::Value::None))
1028 }
1029
1030 #[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1031 Command::ExportMl {
1032 path,
1033 config,
1034 } => {
1035 let (tx, rx) = crate::channel::bounded(1);
1036 let (mut writer, mut reader) = io::duplex(10_240);
1037
1038 let session = session.read().await;
1040 let export = export_ml(kvs, &session, tx, config);
1041
1042 let bridge = async move {
1044 while let Ok(value) = rx.recv().await {
1045 if writer.write_all(&value).await.is_err() {
1046 break;
1048 }
1049 }
1050 Ok(())
1051 };
1052
1053 let mut output = match OpenOptions::new()
1055 .write(true)
1056 .create(true)
1057 .truncate(true)
1058 .open(&path)
1059 .await
1060 {
1061 Ok(path) => path,
1062 Err(error) => {
1063 return Err(Error::FileOpen {
1064 path,
1065 error,
1066 }
1067 .into());
1068 }
1069 };
1070
1071 let copy = copy(path, &mut reader, &mut output);
1073
1074 tokio::try_join!(export, bridge, copy)?;
1075 Ok(DbResponse::Other(val::Value::None))
1076 }
1077
1078 #[cfg(not(target_family = "wasm"))]
1079 Command::ExportBytes {
1080 bytes,
1081 config,
1082 } => {
1083 let (tx, rx) = crate::channel::bounded(1);
1084
1085 let kvs = kvs.clone();
1086 let session = session.read().await.clone();
1087 tokio::spawn(async move {
1088 let export = async {
1089 if let Err(error) = export_file(&kvs, &session, tx, config).await {
1090 let _ = bytes.send(Err(error)).await;
1091 }
1092 };
1093
1094 let bridge = async {
1095 while let Ok(b) = rx.recv().await {
1096 if bytes.send(Ok(b)).await.is_err() {
1097 break;
1098 }
1099 }
1100 };
1101
1102 tokio::join!(export, bridge);
1103 });
1104
1105 Ok(DbResponse::Other(val::Value::None))
1106 }
1107 #[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1108 Command::ExportBytesMl {
1109 bytes,
1110 config,
1111 } => {
1112 let (tx, rx) = crate::channel::bounded(1);
1113
1114 let kvs = kvs.clone();
1115 let session = session.clone();
1116 tokio::spawn(async move {
1117 let export = async {
1118 if let Err(error) = export_ml(&kvs, &*session.read().await, tx, config).await {
1119 let _ = bytes.send(Err(error)).await;
1120 }
1121 };
1122
1123 let bridge = async {
1124 while let Ok(b) = rx.recv().await {
1125 if bytes.send(Ok(b)).await.is_err() {
1126 break;
1127 }
1128 }
1129 };
1130
1131 tokio::join!(export, bridge);
1132 });
1133
1134 Ok(DbResponse::Other(val::Value::None))
1135 }
1136 #[cfg(not(target_family = "wasm"))]
1137 Command::ImportFile {
1138 path,
1139 } => {
1140 let file = match OpenOptions::new().read(true).open(&path).await {
1141 Ok(path) => path,
1142 Err(error) => {
1143 bail!(Error::FileOpen {
1144 path,
1145 error,
1146 });
1147 }
1148 };
1149
1150 let mut file = pin!(file);
1151 let mut buffer = BytesMut::with_capacity(4096);
1152
1153 let stream = poll_fn(|ctx| {
1154 if buffer.capacity() == 0 {
1160 buffer.reserve(4096);
1161 }
1162
1163 let future = pin!(file.read_buf(&mut buffer));
1164 match ready!(future.poll(ctx)) {
1165 Ok(0) => Poll::Ready(None),
1166 Ok(_) => Poll::Ready(Some(Ok(buffer.split().freeze()))),
1167 Err(e) => {
1168 let error = anyhow::Error::new(CoreError::QueryStream(e.to_string()));
1169 Poll::Ready(Some(Err(error)))
1170 }
1171 }
1172 });
1173
1174 let responses = kvs
1175 .execute_import(&*session.read().await, Some(vars.read().await.clone()), stream)
1176 .await?;
1177
1178 for response in responses {
1179 response.result?;
1180 }
1181
1182 Ok(DbResponse::Other(val::Value::None))
1183 }
1184 #[cfg(all(not(target_family = "wasm"), feature = "ml"))]
1185 Command::ImportMl {
1186 path,
1187 } => {
1188 let mut file = match OpenOptions::new().read(true).open(&path).await {
1189 Ok(path) => path,
1190 Err(error) => {
1191 return Err(Error::FileOpen {
1192 path,
1193 error,
1194 }
1195 .into());
1196 }
1197 };
1198
1199 let (nsv, dbv) = check_ns_db(&*session.read().await)?;
1201 kvs.check(&*session.read().await, Action::Edit, ResourceKind::Model.on_db(&nsv, &dbv))?;
1203 let mut buffer = Vec::new();
1205 if let Err(error) = file.read_to_end(&mut buffer).await {
1207 return Err(Error::FileRead {
1208 path,
1209 error,
1210 }
1211 .into());
1212 }
1213 let file = match SurMlFile::from_bytes(buffer) {
1215 Ok(file) => file,
1216 Err(error) => {
1217 return Err(Error::FileRead {
1218 path,
1219 error: io::Error::new(
1220 io::ErrorKind::InvalidData,
1221 error.message.to_string(),
1222 ),
1223 }
1224 .into());
1225 }
1226 };
1227 let data = file.to_bytes();
1229 let hash = surrealdb_core::obs::hash(&data);
1231 surrealdb_core::obs::put(&hash, data).await?;
1233 let model = DefineModelStatement {
1235 name: Ident::new(file.header.name.to_string()).unwrap(),
1236 version: file.header.version.to_string(),
1237 comment: Some(file.header.description.to_string().into()),
1238 hash,
1239 ..Default::default()
1240 };
1241 let q = DefineStatement::Model(model);
1243 let q = LogicalPlan {
1244 expressions: vec![TopLevelExpr::Expr(Expr::Define(Box::new(q)))],
1245 };
1246 let responses = kvs
1247 .process_plan(q, &*session.read().await, Some(vars.read().await.clone()))
1248 .await?;
1249
1250 for response in responses {
1251 response.result?;
1252 }
1253
1254 Ok(DbResponse::Other(val::Value::None))
1255 }
1256 Command::Health => Ok(DbResponse::Other(val::Value::None)),
1257 Command::Version => {
1258 Ok(DbResponse::Other(val::Value::from(surrealdb_core::env::VERSION.to_string())))
1259 }
1260 Command::Set {
1261 key,
1262 value,
1263 } => {
1264 surrealdb_core::rpc::check_protected_param(&key)?;
1265 match value {
1268 val::Value::None => vars.write().await.remove(&key),
1269 v => vars.write().await.insert(key, v),
1270 };
1271
1272 Ok(DbResponse::Other(val::Value::None))
1273 }
1274 Command::Unset {
1275 key,
1276 } => {
1277 vars.write().await.remove(&key);
1278 Ok(DbResponse::Other(val::Value::None))
1279 }
1280 Command::SubscribeLive {
1281 uuid,
1282 notification_sender,
1283 } => {
1284 live_queries.write().await.insert(uuid, notification_sender);
1285 Ok(DbResponse::Other(val::Value::None))
1286 }
1287 Command::Kill {
1288 uuid,
1289 } => {
1290 live_queries.write().await.remove(&uuid);
1291 let value =
1292 kill_live_query(kvs, uuid, &*session.read().await, vars.read().await.clone())
1293 .await?;
1294 Ok(DbResponse::Other(value))
1295 }
1296
1297 Command::Run {
1298 name,
1299 version: _version,
1300 args,
1301 } => {
1302 let func = match name.strip_prefix("fn::") {
1303 Some(name) => Function::Custom(name.to_owned()),
1304 None => match name.strip_prefix("ml::") {
1305 #[cfg(feature = "ml")]
1306 Some(name) => Function::Model(Model {
1307 name: name.to_owned(),
1308 version: _version
1309 .ok_or(Error::Query("ML functions must have a version".to_string()))?,
1310 }),
1311 #[cfg(not(feature = "ml"))]
1312 Some(_) => {
1313 return Err(Error::Query(format!(
1314 "tried to call an ML function `{name}` but the `ml` feature is not enabled"
1315 ))
1316 .into());
1317 }
1318 None => Function::Normal(name),
1319 },
1320 };
1321
1322 let args = args.into_iter().map(|x| x.into_literal()).collect();
1323
1324 let plan = Expr::FunctionCall(Box::new(surrealdb_core::expr::FunctionCall {
1325 receiver: func,
1326 arguments: args,
1327 }));
1328
1329 let plan = LogicalPlan {
1330 expressions: vec![TopLevelExpr::Expr(plan)],
1331 };
1332
1333 let response = kvs
1334 .process_plan(plan, &*session.read().await, Some(vars.read().await.clone()))
1335 .await?;
1336 let value = take(true, response).await?;
1337
1338 Ok(DbResponse::Other(value))
1339 }
1340 }
1341}