surrealdb/api/conn/
cmd.rs

1use std::borrow::Cow;
2use std::io::Read;
3use std::path::PathBuf;
4
5use async_channel::Sender;
6use bincode::Options;
7use revision::Revisioned;
8use serde::Serialize;
9use serde::ser::SerializeMap as _;
10use uuid::Uuid;
11
12use super::MlExportConfig;
13use crate::Result;
14use crate::core::dbs::Notification;
15use crate::core::expr::LogicalPlan;
16use crate::core::kvs::export::Config as DbExportConfig;
17#[cfg(any(feature = "protocol-ws", feature = "protocol-http"))]
18use crate::core::val::Table as CoreTable;
19#[allow(unused_imports)]
20use crate::core::val::{Array as CoreArray, Object as CoreObject, Value as CoreValue};
21use crate::opt::Resource;
22
23#[derive(Debug, Clone)]
24#[allow(dead_code)]
25pub(crate) enum Command {
26	Use {
27		namespace: Option<String>,
28		database: Option<String>,
29	},
30	Signup {
31		credentials: CoreObject,
32	},
33	Signin {
34		credentials: CoreObject,
35	},
36	Authenticate {
37		token: String,
38	},
39	Invalidate,
40	Create {
41		txn: Option<Uuid>,
42		what: Resource,
43		data: Option<CoreValue>,
44	},
45	Upsert {
46		txn: Option<Uuid>,
47		what: Resource,
48		data: Option<CoreValue>,
49	},
50	Update {
51		txn: Option<Uuid>,
52		what: Resource,
53		data: Option<CoreValue>,
54	},
55	Insert {
56		txn: Option<Uuid>,
57		// inserts can only be on a table.
58		what: Option<String>,
59		data: CoreValue,
60	},
61	InsertRelation {
62		txn: Option<Uuid>,
63		what: Option<String>,
64		data: CoreValue,
65	},
66	Patch {
67		txn: Option<Uuid>,
68		what: Resource,
69		data: Option<CoreValue>,
70		upsert: bool,
71	},
72	Merge {
73		txn: Option<Uuid>,
74		what: Resource,
75		data: Option<CoreValue>,
76		upsert: bool,
77	},
78	Select {
79		txn: Option<Uuid>,
80		what: Resource,
81	},
82	Delete {
83		txn: Option<Uuid>,
84		what: Resource,
85	},
86	Query {
87		txn: Option<Uuid>,
88		query: LogicalPlan,
89		variables: CoreObject,
90	},
91	RawQuery {
92		txn: Option<Uuid>,
93		query: Cow<'static, str>,
94		variables: CoreObject,
95	},
96	ExportFile {
97		path: PathBuf,
98		config: Option<DbExportConfig>,
99	},
100	ExportMl {
101		path: PathBuf,
102		config: MlExportConfig,
103	},
104	ExportBytes {
105		bytes: Sender<Result<Vec<u8>>>,
106		config: Option<DbExportConfig>,
107	},
108	ExportBytesMl {
109		bytes: Sender<Result<Vec<u8>>>,
110		config: MlExportConfig,
111	},
112	ImportFile {
113		path: PathBuf,
114	},
115	ImportMl {
116		path: PathBuf,
117	},
118	Health,
119	Version,
120	Set {
121		key: String,
122		value: CoreValue,
123	},
124	Unset {
125		key: String,
126	},
127	SubscribeLive {
128		uuid: Uuid,
129		notification_sender: Sender<Notification>,
130	},
131	Kill {
132		uuid: Uuid,
133	},
134	Run {
135		name: String,
136		version: Option<String>,
137		args: CoreArray,
138	},
139}
140
141impl Command {
142	#[cfg(any(feature = "protocol-ws", feature = "protocol-http"))]
143	pub(crate) fn into_router_request(self, id: Option<i64>) -> Option<RouterRequest> {
144		use crate::core::expr::{Data, Output, UpdateStatement, UpsertStatement};
145		use crate::core::val::{self, Strand};
146		use crate::engine::resource_to_exprs;
147
148		let res = match self {
149			Command::Use {
150				namespace,
151				database,
152			} => {
153				// TODO: Null byte validity
154				let namespace = namespace
155					.map(|n| unsafe { Strand::new_unchecked(n) }.into())
156					.unwrap_or(CoreValue::None);
157				let database = database
158					.map(|d| unsafe { Strand::new_unchecked(d) }.into())
159					.unwrap_or(CoreValue::None);
160				RouterRequest {
161					id,
162					method: "use",
163					params: Some(vec![namespace, database].into()),
164					transaction: None,
165				}
166			}
167			Command::Signup {
168				credentials,
169			} => RouterRequest {
170				id,
171				method: "signup",
172				params: Some(vec![CoreValue::from(credentials)].into()),
173				transaction: None,
174			},
175			Command::Signin {
176				credentials,
177			} => RouterRequest {
178				id,
179				method: "signin",
180				params: Some(vec![CoreValue::from(credentials)].into()),
181				transaction: None,
182			},
183			Command::Authenticate {
184				token,
185			} => RouterRequest {
186				id,
187				method: "authenticate",
188				params: Some(vec![CoreValue::from(token)].into()),
189				transaction: None,
190			},
191			Command::Invalidate => RouterRequest {
192				id,
193				method: "invalidate",
194				params: None,
195				transaction: None,
196			},
197			Command::Create {
198				txn,
199				what,
200				data,
201			} => {
202				let mut params = vec![what.into_core_value()];
203				if let Some(data) = data {
204					params.push(data);
205				}
206
207				RouterRequest {
208					id,
209					method: "create",
210					params: Some(params.into()),
211					transaction: txn,
212				}
213			}
214			Command::Upsert {
215				txn,
216				what,
217				data,
218				..
219			} => {
220				let mut params = vec![what.into_core_value()];
221				if let Some(data) = data {
222					params.push(data);
223				}
224
225				RouterRequest {
226					id,
227					method: "upsert",
228					params: Some(params.into()),
229					transaction: txn,
230				}
231			}
232			Command::Update {
233				txn,
234				what,
235				data,
236				..
237			} => {
238				let mut params = vec![what.into_core_value()];
239
240				if let Some(data) = data {
241					params.push(data);
242				}
243
244				RouterRequest {
245					id,
246					method: "update",
247					params: Some(params.into()),
248					transaction: txn,
249				}
250			}
251			Command::Insert {
252				txn,
253				what,
254				data,
255			} => {
256				let table = match what {
257					Some(w) => {
258						// TODO: Null byte validity
259						let table = unsafe { CoreTable::new_unchecked(w) };
260						CoreValue::from(table)
261					}
262					None => CoreValue::None,
263				};
264
265				let params = vec![table, data];
266
267				RouterRequest {
268					id,
269					method: "insert",
270					params: Some(params.into()),
271					transaction: txn,
272				}
273			}
274			Command::InsertRelation {
275				txn,
276				what,
277				data,
278			} => {
279				let table = match what {
280					Some(w) => {
281						// TODO: Null byte validity
282						let table = unsafe { CoreTable::new_unchecked(w) };
283						CoreValue::from(table)
284					}
285					None => CoreValue::None,
286				};
287				let params = vec![table, data];
288
289				RouterRequest {
290					id,
291					method: "insert_relation",
292					params: Some(params.into()),
293					transaction: txn,
294				}
295			}
296			Command::Patch {
297				txn,
298				what,
299				data,
300				upsert,
301				..
302			} => {
303				let query = if upsert {
304					let expr = UpsertStatement {
305						only: false,
306						what: resource_to_exprs(what),
307						with: None,
308						data: data.map(|x| Data::PatchExpression(x.into_literal())),
309						cond: None,
310						output: Some(Output::After),
311						timeout: None,
312						parallel: false,
313						explain: None,
314					};
315					expr.to_string()
316				} else {
317					let expr = UpdateStatement {
318						only: false,
319						what: resource_to_exprs(what),
320						with: None,
321						data: data.map(|x| Data::PatchExpression(x.into_literal())),
322						cond: None,
323						output: Some(Output::After),
324						timeout: None,
325						parallel: false,
326						explain: None,
327					};
328					expr.to_string()
329				};
330				//TODO: Null byte validity
331				let query = unsafe { Strand::new_unchecked(query) };
332
333				let variables = val::Object::default();
334				let params: Vec<CoreValue> = vec![query.into(), variables.into()];
335
336				RouterRequest {
337					id,
338					method: "query",
339					params: Some(params.into()),
340					transaction: txn,
341				}
342			}
343			Command::Merge {
344				txn,
345				what,
346				data,
347				upsert,
348				..
349			} => {
350				let query = if upsert {
351					let expr = UpsertStatement {
352						only: false,
353						what: resource_to_exprs(what),
354						with: None,
355						data: data.map(|x| Data::MergeExpression(x.into_literal())),
356						cond: None,
357						output: Some(Output::After),
358						timeout: None,
359						parallel: false,
360						explain: None,
361					};
362					expr.to_string()
363				} else {
364					let expr = UpdateStatement {
365						only: false,
366						what: resource_to_exprs(what),
367						with: None,
368						data: data.map(|x| Data::MergeExpression(x.into_literal())),
369						cond: None,
370						output: Some(Output::After),
371						timeout: None,
372						parallel: false,
373						explain: None,
374					};
375					expr.to_string()
376				};
377				//TODO: Null byte validity
378				let query = unsafe { Strand::new_unchecked(query) };
379
380				let variables = val::Object::default();
381				let params: Vec<CoreValue> = vec![query.into(), variables.into()];
382
383				RouterRequest {
384					id,
385					method: "query",
386					params: Some(params.into()),
387					transaction: txn,
388				}
389			}
390			Command::Select {
391				txn,
392				what,
393				..
394			} => RouterRequest {
395				id,
396				method: "select",
397				params: Some(CoreValue::Array(vec![what.into_core_value()].into())),
398				transaction: txn,
399			},
400			Command::Delete {
401				txn,
402				what,
403				..
404			} => RouterRequest {
405				id,
406				method: "delete",
407				params: Some(CoreValue::Array(vec![what.into_core_value()].into())),
408				transaction: txn,
409			},
410			Command::Query {
411				txn,
412				query,
413				variables,
414			} => {
415				// TODO: Null byte validity
416				let query = unsafe { Strand::new_unchecked(query.to_string()) };
417				let params: Vec<CoreValue> = vec![query.into(), variables.into()];
418				RouterRequest {
419					id,
420					method: "query",
421					params: Some(params.into()),
422					transaction: txn,
423				}
424			}
425			Command::RawQuery {
426				txn,
427				query,
428				variables,
429			} => {
430				let params: Vec<CoreValue> = vec![query.into_owned().into(), variables.into()];
431				RouterRequest {
432					id,
433					method: "query",
434					params: Some(params.into()),
435					transaction: txn,
436				}
437			}
438			Command::ExportFile {
439				..
440			}
441			| Command::ExportBytes {
442				..
443			}
444			| Command::ImportFile {
445				..
446			}
447			| Command::ExportBytesMl {
448				..
449			}
450			| Command::ExportMl {
451				..
452			}
453			| Command::ImportMl {
454				..
455			} => return None,
456			Command::Health => RouterRequest {
457				id,
458				method: "ping",
459				params: None,
460				transaction: None,
461			},
462			Command::Version => RouterRequest {
463				id,
464				method: "version",
465				params: None,
466				transaction: None,
467			},
468			Command::Set {
469				key,
470				value,
471			} => RouterRequest {
472				id,
473				method: "let",
474				params: Some(CoreValue::from(vec![CoreValue::from(key), value])),
475				transaction: None,
476			},
477			Command::Unset {
478				key,
479			} => RouterRequest {
480				id,
481				method: "unset",
482				params: Some(CoreValue::from(vec![CoreValue::from(key)])),
483				transaction: None,
484			},
485			Command::SubscribeLive {
486				..
487			} => return None,
488			Command::Kill {
489				uuid,
490			} => RouterRequest {
491				id,
492				method: "kill",
493				params: Some(CoreValue::from(vec![CoreValue::from(val::Uuid(uuid))])),
494				transaction: None,
495			},
496			Command::Run {
497				name,
498				version,
499				args,
500			} => {
501				// TODO: Null byte validity
502				let version = version
503					.map(|x| unsafe { Strand::new_unchecked(x) }.into())
504					.unwrap_or(CoreValue::None);
505				RouterRequest {
506					id,
507					method: "run",
508					params: Some(
509						vec![CoreValue::from(name), version, CoreValue::Array(args)].into(),
510					),
511					transaction: None,
512				}
513			}
514		};
515		Some(res)
516	}
517
518	#[cfg(feature = "protocol-http")]
519	pub(crate) fn needs_flatten(&self) -> bool {
520		match self {
521			Command::Upsert {
522				what,
523				..
524			}
525			| Command::Update {
526				what,
527				..
528			}
529			| Command::Patch {
530				what,
531				..
532			}
533			| Command::Merge {
534				what,
535				..
536			}
537			| Command::Select {
538				what,
539				..
540			}
541			| Command::Delete {
542				what,
543				..
544			} => matches!(what, Resource::RecordId(_)),
545			Command::Insert {
546				data,
547				..
548			} => !data.is_array(),
549			_ => false,
550		}
551	}
552}
553
554/// A struct which will be serialized as a map to behave like the previously
555/// used BTreeMap.
556///
557/// This struct serializes as if it is a crate::core::expr::Value::Object.
558#[derive(Debug)]
559pub(crate) struct RouterRequest {
560	id: Option<i64>,
561	method: &'static str,
562	params: Option<CoreValue>,
563	#[allow(dead_code)]
564	transaction: Option<Uuid>,
565}
566
567impl Serialize for RouterRequest {
568	fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
569	where
570		S: serde::Serializer,
571	{
572		struct InnerRequest<'a>(&'a RouterRequest);
573		struct InnerNumberVariant(i64);
574		struct InnerNumber(i64);
575		struct InnerMethod(&'static str);
576		struct InnerTransaction<'a>(&'a Uuid);
577		struct InnerUuid<'a>(&'a Uuid);
578		struct InnerStrand(&'static str);
579		struct InnerObject<'a>(&'a RouterRequest);
580
581		impl Serialize for InnerNumberVariant {
582			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
583			where
584				S: serde::Serializer,
585			{
586				serializer.serialize_newtype_variant("Value", 3, "Number", &InnerNumber(self.0))
587			}
588		}
589
590		impl Serialize for InnerNumber {
591			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
592			where
593				S: serde::Serializer,
594			{
595				serializer.serialize_newtype_variant("Number", 0, "Int", &self.0)
596			}
597		}
598
599		impl Serialize for InnerMethod {
600			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
601			where
602				S: serde::Serializer,
603			{
604				serializer.serialize_newtype_variant("Value", 4, "Strand", &InnerStrand(self.0))
605			}
606		}
607
608		impl Serialize for InnerTransaction<'_> {
609			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
610			where
611				S: serde::Serializer,
612			{
613				serializer.serialize_newtype_variant("Value", 7, "Uuid", &InnerUuid(self.0))
614			}
615		}
616
617		impl Serialize for InnerUuid<'_> {
618			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
619			where
620				S: serde::Serializer,
621			{
622				serializer.serialize_newtype_struct("$surrealdb::private::sql::Uuid", self.0)
623			}
624		}
625		impl Serialize for InnerStrand {
626			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
627			where
628				S: serde::Serializer,
629			{
630				serializer.serialize_newtype_struct("$surrealdb::private::sql::Strand", self.0)
631			}
632		}
633
634		impl Serialize for InnerRequest<'_> {
635			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
636			where
637				S: serde::Serializer,
638			{
639				let size = 1 + self.0.id.is_some() as usize + self.0.params.is_some() as usize;
640				let mut map = serializer.serialize_map(Some(size))?;
641				if let Some(id) = self.0.id.as_ref() {
642					map.serialize_entry("id", &InnerNumberVariant(*id))?;
643				}
644				map.serialize_entry("method", &InnerMethod(self.0.method))?;
645				if let Some(params) = self.0.params.as_ref() {
646					map.serialize_entry("params", params)?;
647				}
648				if let Some(txn) = self.0.transaction.as_ref() {
649					map.serialize_entry("transaction", &InnerTransaction(txn))?;
650				}
651				map.end()
652			}
653		}
654
655		impl Serialize for InnerObject<'_> {
656			fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
657			where
658				S: serde::Serializer,
659			{
660				serializer.serialize_newtype_struct("Object", &InnerRequest(self.0))
661			}
662		}
663
664		serializer.serialize_newtype_variant(
665			"$surrealdb::private::sql::Value",
666			9,
667			"Object",
668			&InnerObject(self),
669		)
670	}
671}
672
673impl Revisioned for RouterRequest {
674	fn revision() -> u16 {
675		1
676	}
677
678	fn serialize_revisioned<W: std::io::Write>(
679		&self,
680		w: &mut W,
681	) -> std::result::Result<(), revision::Error> {
682		// version
683		Revisioned::serialize_revisioned(&1u32, w)?;
684		// object variant
685		Revisioned::serialize_revisioned(&9u32, w)?;
686		// object wrapper version
687		Revisioned::serialize_revisioned(&1u32, w)?;
688
689		let size = 1
690			+ self.id.is_some() as usize
691			+ self.params.is_some() as usize
692			+ self.transaction.is_some() as usize;
693		size.serialize_revisioned(w)?;
694
695		let serializer = bincode::options()
696			.with_no_limit()
697			.with_little_endian()
698			.with_varint_encoding()
699			.reject_trailing_bytes();
700
701		if let Some(x) = self.id.as_ref() {
702			serializer
703				.serialize_into(&mut *w, "id")
704				.map_err(|err| revision::Error::Serialize(err.to_string()))?;
705
706			// the Value version
707			1u16.serialize_revisioned(w)?;
708
709			// the Value::Number variant
710			3u16.serialize_revisioned(w)?;
711
712			// the Number version
713			1u16.serialize_revisioned(w)?;
714
715			// the Number::Int variant
716			0u16.serialize_revisioned(w)?;
717
718			x.serialize_revisioned(w)?;
719		}
720
721		serializer
722			.serialize_into(&mut *w, "method")
723			.map_err(|err| revision::Error::Serialize(err.to_string()))?;
724
725		// the Value version
726		1u16.serialize_revisioned(w)?;
727
728		// the Value::Strand variant
729		4u16.serialize_revisioned(w)?;
730
731		// the Strand version
732		1u16.serialize_revisioned(w)?;
733
734		serializer
735			.serialize_into(&mut *w, self.method)
736			.map_err(|e| revision::Error::Serialize(format!("{:?}", e)))?;
737
738		if let Some(x) = self.params.as_ref() {
739			serializer
740				.serialize_into(&mut *w, "params")
741				.map_err(|err| revision::Error::Serialize(err.to_string()))?;
742
743			x.serialize_revisioned(w)?;
744		}
745
746		if let Some(x) = self.transaction.as_ref() {
747			serializer
748				.serialize_into(&mut *w, "transaction")
749				.map_err(|err| revision::Error::Serialize(err.to_string()))?;
750
751			// the Value version
752			1u16.serialize_revisioned(w)?;
753
754			// the Value::Uuid variant
755			7u16.serialize_revisioned(w)?;
756
757			// the Uuid version
758			1u16.serialize_revisioned(w)?;
759
760			x.serialize_revisioned(w)?;
761		}
762
763		Ok(())
764	}
765
766	fn deserialize_revisioned<R: Read>(_: &mut R) -> std::result::Result<Self, revision::Error>
767	where
768		Self: Sized,
769	{
770		panic!("deliberately unimplemented");
771	}
772}
773
#[cfg(test)]
mod test {
	use std::io::Cursor;

	use revision::Revisioned;
	use uuid::Uuid;

	use super::RouterRequest;
	use crate::core::val::{Number, Value};

	/// Encodes `req` with `s`, decodes the result with `d`, and checks that
	/// the decoded object carries the same `id`, `method` and `params` as the
	/// request.
	fn assert_converts<S, D, I>(req: &RouterRequest, s: S, d: D)
	where
		S: FnOnce(&RouterRequest) -> I,
		D: FnOnce(I) -> Value,
	{
		let obj = match d(s(req)) {
			Value::Object(obj) => obj,
			_ => panic!("not an object"),
		};

		// Anything other than an integer number counts as "no id".
		let decoded_id = match obj.get("id") {
			Some(Value::Number(Number::Int(x))) => Some(*x),
			_ => None,
		};
		assert_eq!(decoded_id, req.id);

		match obj.get("method") {
			Some(Value::Strand(x)) => assert_eq!(x.as_str(), req.method),
			_ => panic!("invalid method field: {}", obj),
		}

		assert_eq!(obj.get("params"), req.params.as_ref());
	}

	/// Round-trips one request through the bincode RPC format and through the
	/// revisioned encoding.
	#[test]
	fn router_request_value_conversion() {
		let request = RouterRequest {
			id: Some(1234),
			method: "request",
			params: Some(vec![Value::from(1234i64), Value::from("request")].into()),
			transaction: Some(Uuid::new_v4()),
		};

		println!("test convert bincode");

		let encode_bincode =
			|req: &RouterRequest| crate::core::rpc::format::bincode::encode(req).unwrap();
		let decode_bincode =
			|bytes: Vec<u8>| crate::core::rpc::format::bincode::decode(&bytes).unwrap();
		assert_converts(&request, encode_bincode, decode_bincode);

		println!("test convert revisioned");

		let encode_revisioned = |req: &RouterRequest| {
			let mut buf = Vec::new();
			req.serialize_revisioned(&mut Cursor::new(&mut buf)).unwrap();
			buf
		};
		let decode_revisioned =
			|bytes: Vec<u8>| Value::deserialize_revisioned(&mut Cursor::new(bytes)).unwrap();
		assert_converts(&request, encode_revisioned, decode_revisioned);

		println!("done");
	}
}