1use std::borrow::Cow;
2use std::io::Read;
3use std::path::PathBuf;
4
5use async_channel::Sender;
6use bincode::Options;
7use revision::Revisioned;
8use serde::Serialize;
9use serde::ser::SerializeMap as _;
10use uuid::Uuid;
11
12use super::MlExportConfig;
13use crate::Result;
14use crate::core::dbs::Notification;
15use crate::core::expr::LogicalPlan;
16use crate::core::kvs::export::Config as DbExportConfig;
17#[cfg(any(feature = "protocol-ws", feature = "protocol-http"))]
18use crate::core::val::Table as CoreTable;
19#[allow(unused_imports)]
20use crate::core::val::{Array as CoreArray, Object as CoreObject, Value as CoreValue};
21use crate::opt::Resource;
22
23#[derive(Debug, Clone)]
24#[allow(dead_code)]
25pub(crate) enum Command {
26 Use {
27 namespace: Option<String>,
28 database: Option<String>,
29 },
30 Signup {
31 credentials: CoreObject,
32 },
33 Signin {
34 credentials: CoreObject,
35 },
36 Authenticate {
37 token: String,
38 },
39 Invalidate,
40 Create {
41 txn: Option<Uuid>,
42 what: Resource,
43 data: Option<CoreValue>,
44 },
45 Upsert {
46 txn: Option<Uuid>,
47 what: Resource,
48 data: Option<CoreValue>,
49 },
50 Update {
51 txn: Option<Uuid>,
52 what: Resource,
53 data: Option<CoreValue>,
54 },
55 Insert {
56 txn: Option<Uuid>,
57 what: Option<String>,
59 data: CoreValue,
60 },
61 InsertRelation {
62 txn: Option<Uuid>,
63 what: Option<String>,
64 data: CoreValue,
65 },
66 Patch {
67 txn: Option<Uuid>,
68 what: Resource,
69 data: Option<CoreValue>,
70 upsert: bool,
71 },
72 Merge {
73 txn: Option<Uuid>,
74 what: Resource,
75 data: Option<CoreValue>,
76 upsert: bool,
77 },
78 Select {
79 txn: Option<Uuid>,
80 what: Resource,
81 },
82 Delete {
83 txn: Option<Uuid>,
84 what: Resource,
85 },
86 Query {
87 txn: Option<Uuid>,
88 query: LogicalPlan,
89 variables: CoreObject,
90 },
91 RawQuery {
92 txn: Option<Uuid>,
93 query: Cow<'static, str>,
94 variables: CoreObject,
95 },
96 ExportFile {
97 path: PathBuf,
98 config: Option<DbExportConfig>,
99 },
100 ExportMl {
101 path: PathBuf,
102 config: MlExportConfig,
103 },
104 ExportBytes {
105 bytes: Sender<Result<Vec<u8>>>,
106 config: Option<DbExportConfig>,
107 },
108 ExportBytesMl {
109 bytes: Sender<Result<Vec<u8>>>,
110 config: MlExportConfig,
111 },
112 ImportFile {
113 path: PathBuf,
114 },
115 ImportMl {
116 path: PathBuf,
117 },
118 Health,
119 Version,
120 Set {
121 key: String,
122 value: CoreValue,
123 },
124 Unset {
125 key: String,
126 },
127 SubscribeLive {
128 uuid: Uuid,
129 notification_sender: Sender<Notification>,
130 },
131 Kill {
132 uuid: Uuid,
133 },
134 Run {
135 name: String,
136 version: Option<String>,
137 args: CoreArray,
138 },
139}
140
141impl Command {
142 #[cfg(any(feature = "protocol-ws", feature = "protocol-http"))]
143 pub(crate) fn into_router_request(self, id: Option<i64>) -> Option<RouterRequest> {
144 use crate::core::expr::{Data, Output, UpdateStatement, UpsertStatement};
145 use crate::core::val::{self, Strand};
146 use crate::engine::resource_to_exprs;
147
148 let res = match self {
149 Command::Use {
150 namespace,
151 database,
152 } => {
153 let namespace = namespace
155 .map(|n| unsafe { Strand::new_unchecked(n) }.into())
156 .unwrap_or(CoreValue::None);
157 let database = database
158 .map(|d| unsafe { Strand::new_unchecked(d) }.into())
159 .unwrap_or(CoreValue::None);
160 RouterRequest {
161 id,
162 method: "use",
163 params: Some(vec![namespace, database].into()),
164 transaction: None,
165 }
166 }
167 Command::Signup {
168 credentials,
169 } => RouterRequest {
170 id,
171 method: "signup",
172 params: Some(vec![CoreValue::from(credentials)].into()),
173 transaction: None,
174 },
175 Command::Signin {
176 credentials,
177 } => RouterRequest {
178 id,
179 method: "signin",
180 params: Some(vec![CoreValue::from(credentials)].into()),
181 transaction: None,
182 },
183 Command::Authenticate {
184 token,
185 } => RouterRequest {
186 id,
187 method: "authenticate",
188 params: Some(vec![CoreValue::from(token)].into()),
189 transaction: None,
190 },
191 Command::Invalidate => RouterRequest {
192 id,
193 method: "invalidate",
194 params: None,
195 transaction: None,
196 },
197 Command::Create {
198 txn,
199 what,
200 data,
201 } => {
202 let mut params = vec![what.into_core_value()];
203 if let Some(data) = data {
204 params.push(data);
205 }
206
207 RouterRequest {
208 id,
209 method: "create",
210 params: Some(params.into()),
211 transaction: txn,
212 }
213 }
214 Command::Upsert {
215 txn,
216 what,
217 data,
218 ..
219 } => {
220 let mut params = vec![what.into_core_value()];
221 if let Some(data) = data {
222 params.push(data);
223 }
224
225 RouterRequest {
226 id,
227 method: "upsert",
228 params: Some(params.into()),
229 transaction: txn,
230 }
231 }
232 Command::Update {
233 txn,
234 what,
235 data,
236 ..
237 } => {
238 let mut params = vec![what.into_core_value()];
239
240 if let Some(data) = data {
241 params.push(data);
242 }
243
244 RouterRequest {
245 id,
246 method: "update",
247 params: Some(params.into()),
248 transaction: txn,
249 }
250 }
251 Command::Insert {
252 txn,
253 what,
254 data,
255 } => {
256 let table = match what {
257 Some(w) => {
258 let table = unsafe { CoreTable::new_unchecked(w) };
260 CoreValue::from(table)
261 }
262 None => CoreValue::None,
263 };
264
265 let params = vec![table, data];
266
267 RouterRequest {
268 id,
269 method: "insert",
270 params: Some(params.into()),
271 transaction: txn,
272 }
273 }
274 Command::InsertRelation {
275 txn,
276 what,
277 data,
278 } => {
279 let table = match what {
280 Some(w) => {
281 let table = unsafe { CoreTable::new_unchecked(w) };
283 CoreValue::from(table)
284 }
285 None => CoreValue::None,
286 };
287 let params = vec![table, data];
288
289 RouterRequest {
290 id,
291 method: "insert_relation",
292 params: Some(params.into()),
293 transaction: txn,
294 }
295 }
296 Command::Patch {
297 txn,
298 what,
299 data,
300 upsert,
301 ..
302 } => {
303 let query = if upsert {
304 let expr = UpsertStatement {
305 only: false,
306 what: resource_to_exprs(what),
307 with: None,
308 data: data.map(|x| Data::PatchExpression(x.into_literal())),
309 cond: None,
310 output: Some(Output::After),
311 timeout: None,
312 parallel: false,
313 explain: None,
314 };
315 expr.to_string()
316 } else {
317 let expr = UpdateStatement {
318 only: false,
319 what: resource_to_exprs(what),
320 with: None,
321 data: data.map(|x| Data::PatchExpression(x.into_literal())),
322 cond: None,
323 output: Some(Output::After),
324 timeout: None,
325 parallel: false,
326 explain: None,
327 };
328 expr.to_string()
329 };
330 let query = unsafe { Strand::new_unchecked(query) };
332
333 let variables = val::Object::default();
334 let params: Vec<CoreValue> = vec![query.into(), variables.into()];
335
336 RouterRequest {
337 id,
338 method: "query",
339 params: Some(params.into()),
340 transaction: txn,
341 }
342 }
343 Command::Merge {
344 txn,
345 what,
346 data,
347 upsert,
348 ..
349 } => {
350 let query = if upsert {
351 let expr = UpsertStatement {
352 only: false,
353 what: resource_to_exprs(what),
354 with: None,
355 data: data.map(|x| Data::MergeExpression(x.into_literal())),
356 cond: None,
357 output: Some(Output::After),
358 timeout: None,
359 parallel: false,
360 explain: None,
361 };
362 expr.to_string()
363 } else {
364 let expr = UpdateStatement {
365 only: false,
366 what: resource_to_exprs(what),
367 with: None,
368 data: data.map(|x| Data::MergeExpression(x.into_literal())),
369 cond: None,
370 output: Some(Output::After),
371 timeout: None,
372 parallel: false,
373 explain: None,
374 };
375 expr.to_string()
376 };
377 let query = unsafe { Strand::new_unchecked(query) };
379
380 let variables = val::Object::default();
381 let params: Vec<CoreValue> = vec![query.into(), variables.into()];
382
383 RouterRequest {
384 id,
385 method: "query",
386 params: Some(params.into()),
387 transaction: txn,
388 }
389 }
390 Command::Select {
391 txn,
392 what,
393 ..
394 } => RouterRequest {
395 id,
396 method: "select",
397 params: Some(CoreValue::Array(vec![what.into_core_value()].into())),
398 transaction: txn,
399 },
400 Command::Delete {
401 txn,
402 what,
403 ..
404 } => RouterRequest {
405 id,
406 method: "delete",
407 params: Some(CoreValue::Array(vec![what.into_core_value()].into())),
408 transaction: txn,
409 },
410 Command::Query {
411 txn,
412 query,
413 variables,
414 } => {
415 let query = unsafe { Strand::new_unchecked(query.to_string()) };
417 let params: Vec<CoreValue> = vec![query.into(), variables.into()];
418 RouterRequest {
419 id,
420 method: "query",
421 params: Some(params.into()),
422 transaction: txn,
423 }
424 }
425 Command::RawQuery {
426 txn,
427 query,
428 variables,
429 } => {
430 let params: Vec<CoreValue> = vec![query.into_owned().into(), variables.into()];
431 RouterRequest {
432 id,
433 method: "query",
434 params: Some(params.into()),
435 transaction: txn,
436 }
437 }
438 Command::ExportFile {
439 ..
440 }
441 | Command::ExportBytes {
442 ..
443 }
444 | Command::ImportFile {
445 ..
446 }
447 | Command::ExportBytesMl {
448 ..
449 }
450 | Command::ExportMl {
451 ..
452 }
453 | Command::ImportMl {
454 ..
455 } => return None,
456 Command::Health => RouterRequest {
457 id,
458 method: "ping",
459 params: None,
460 transaction: None,
461 },
462 Command::Version => RouterRequest {
463 id,
464 method: "version",
465 params: None,
466 transaction: None,
467 },
468 Command::Set {
469 key,
470 value,
471 } => RouterRequest {
472 id,
473 method: "let",
474 params: Some(CoreValue::from(vec![CoreValue::from(key), value])),
475 transaction: None,
476 },
477 Command::Unset {
478 key,
479 } => RouterRequest {
480 id,
481 method: "unset",
482 params: Some(CoreValue::from(vec![CoreValue::from(key)])),
483 transaction: None,
484 },
485 Command::SubscribeLive {
486 ..
487 } => return None,
488 Command::Kill {
489 uuid,
490 } => RouterRequest {
491 id,
492 method: "kill",
493 params: Some(CoreValue::from(vec![CoreValue::from(val::Uuid(uuid))])),
494 transaction: None,
495 },
496 Command::Run {
497 name,
498 version,
499 args,
500 } => {
501 let version = version
503 .map(|x| unsafe { Strand::new_unchecked(x) }.into())
504 .unwrap_or(CoreValue::None);
505 RouterRequest {
506 id,
507 method: "run",
508 params: Some(
509 vec![CoreValue::from(name), version, CoreValue::Array(args)].into(),
510 ),
511 transaction: None,
512 }
513 }
514 };
515 Some(res)
516 }
517
518 #[cfg(feature = "protocol-http")]
519 pub(crate) fn needs_flatten(&self) -> bool {
520 match self {
521 Command::Upsert {
522 what,
523 ..
524 }
525 | Command::Update {
526 what,
527 ..
528 }
529 | Command::Patch {
530 what,
531 ..
532 }
533 | Command::Merge {
534 what,
535 ..
536 }
537 | Command::Select {
538 what,
539 ..
540 }
541 | Command::Delete {
542 what,
543 ..
544 } => matches!(what, Resource::RecordId(_)),
545 Command::Insert {
546 data,
547 ..
548 } => !data.is_array(),
549 _ => false,
550 }
551 }
552}
553
554#[derive(Debug)]
559pub(crate) struct RouterRequest {
560 id: Option<i64>,
561 method: &'static str,
562 params: Option<CoreValue>,
563 #[allow(dead_code)]
564 transaction: Option<Uuid>,
565}
566
567impl Serialize for RouterRequest {
568 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
569 where
570 S: serde::Serializer,
571 {
572 struct InnerRequest<'a>(&'a RouterRequest);
573 struct InnerNumberVariant(i64);
574 struct InnerNumber(i64);
575 struct InnerMethod(&'static str);
576 struct InnerTransaction<'a>(&'a Uuid);
577 struct InnerUuid<'a>(&'a Uuid);
578 struct InnerStrand(&'static str);
579 struct InnerObject<'a>(&'a RouterRequest);
580
581 impl Serialize for InnerNumberVariant {
582 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
583 where
584 S: serde::Serializer,
585 {
586 serializer.serialize_newtype_variant("Value", 3, "Number", &InnerNumber(self.0))
587 }
588 }
589
590 impl Serialize for InnerNumber {
591 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
592 where
593 S: serde::Serializer,
594 {
595 serializer.serialize_newtype_variant("Number", 0, "Int", &self.0)
596 }
597 }
598
599 impl Serialize for InnerMethod {
600 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
601 where
602 S: serde::Serializer,
603 {
604 serializer.serialize_newtype_variant("Value", 4, "Strand", &InnerStrand(self.0))
605 }
606 }
607
608 impl Serialize for InnerTransaction<'_> {
609 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
610 where
611 S: serde::Serializer,
612 {
613 serializer.serialize_newtype_variant("Value", 7, "Uuid", &InnerUuid(self.0))
614 }
615 }
616
617 impl Serialize for InnerUuid<'_> {
618 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
619 where
620 S: serde::Serializer,
621 {
622 serializer.serialize_newtype_struct("$surrealdb::private::sql::Uuid", self.0)
623 }
624 }
625 impl Serialize for InnerStrand {
626 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
627 where
628 S: serde::Serializer,
629 {
630 serializer.serialize_newtype_struct("$surrealdb::private::sql::Strand", self.0)
631 }
632 }
633
634 impl Serialize for InnerRequest<'_> {
635 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
636 where
637 S: serde::Serializer,
638 {
639 let size = 1 + self.0.id.is_some() as usize + self.0.params.is_some() as usize;
640 let mut map = serializer.serialize_map(Some(size))?;
641 if let Some(id) = self.0.id.as_ref() {
642 map.serialize_entry("id", &InnerNumberVariant(*id))?;
643 }
644 map.serialize_entry("method", &InnerMethod(self.0.method))?;
645 if let Some(params) = self.0.params.as_ref() {
646 map.serialize_entry("params", params)?;
647 }
648 if let Some(txn) = self.0.transaction.as_ref() {
649 map.serialize_entry("transaction", &InnerTransaction(txn))?;
650 }
651 map.end()
652 }
653 }
654
655 impl Serialize for InnerObject<'_> {
656 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
657 where
658 S: serde::Serializer,
659 {
660 serializer.serialize_newtype_struct("Object", &InnerRequest(self.0))
661 }
662 }
663
664 serializer.serialize_newtype_variant(
665 "$surrealdb::private::sql::Value",
666 9,
667 "Object",
668 &InnerObject(self),
669 )
670 }
671}
672
673impl Revisioned for RouterRequest {
674 fn revision() -> u16 {
675 1
676 }
677
678 fn serialize_revisioned<W: std::io::Write>(
679 &self,
680 w: &mut W,
681 ) -> std::result::Result<(), revision::Error> {
682 Revisioned::serialize_revisioned(&1u32, w)?;
684 Revisioned::serialize_revisioned(&9u32, w)?;
686 Revisioned::serialize_revisioned(&1u32, w)?;
688
689 let size = 1
690 + self.id.is_some() as usize
691 + self.params.is_some() as usize
692 + self.transaction.is_some() as usize;
693 size.serialize_revisioned(w)?;
694
695 let serializer = bincode::options()
696 .with_no_limit()
697 .with_little_endian()
698 .with_varint_encoding()
699 .reject_trailing_bytes();
700
701 if let Some(x) = self.id.as_ref() {
702 serializer
703 .serialize_into(&mut *w, "id")
704 .map_err(|err| revision::Error::Serialize(err.to_string()))?;
705
706 1u16.serialize_revisioned(w)?;
708
709 3u16.serialize_revisioned(w)?;
711
712 1u16.serialize_revisioned(w)?;
714
715 0u16.serialize_revisioned(w)?;
717
718 x.serialize_revisioned(w)?;
719 }
720
721 serializer
722 .serialize_into(&mut *w, "method")
723 .map_err(|err| revision::Error::Serialize(err.to_string()))?;
724
725 1u16.serialize_revisioned(w)?;
727
728 4u16.serialize_revisioned(w)?;
730
731 1u16.serialize_revisioned(w)?;
733
734 serializer
735 .serialize_into(&mut *w, self.method)
736 .map_err(|e| revision::Error::Serialize(format!("{:?}", e)))?;
737
738 if let Some(x) = self.params.as_ref() {
739 serializer
740 .serialize_into(&mut *w, "params")
741 .map_err(|err| revision::Error::Serialize(err.to_string()))?;
742
743 x.serialize_revisioned(w)?;
744 }
745
746 if let Some(x) = self.transaction.as_ref() {
747 serializer
748 .serialize_into(&mut *w, "transaction")
749 .map_err(|err| revision::Error::Serialize(err.to_string()))?;
750
751 1u16.serialize_revisioned(w)?;
753
754 7u16.serialize_revisioned(w)?;
756
757 1u16.serialize_revisioned(w)?;
759
760 x.serialize_revisioned(w)?;
761 }
762
763 Ok(())
764 }
765
766 fn deserialize_revisioned<R: Read>(_: &mut R) -> std::result::Result<Self, revision::Error>
767 where
768 Self: Sized,
769 {
770 panic!("deliberately unimplemented");
771 }
772}
773
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use revision::Revisioned;
    use uuid::Uuid;

    use super::RouterRequest;
    use crate::core::val::{Number, Value};

    /// Encodes `req` with `s`, decodes the result with `d`, and checks that
    /// the decoded object carries the same `id`, `method` and `params`.
    fn assert_converts<S, D, I>(req: &RouterRequest, s: S, d: D)
    where
        S: FnOnce(&RouterRequest) -> I,
        D: FnOnce(I) -> Value,
    {
        let obj = match d(s(req)) {
            Value::Object(obj) => obj,
            _ => panic!("not an object"),
        };

        // Anything other than an integer number counts as "no id".
        let decoded_id = match obj.get("id") {
            Some(Value::Number(Number::Int(id))) => Some(*id),
            _ => None,
        };
        assert_eq!(decoded_id, req.id);

        let method = match obj.get("method") {
            Some(Value::Strand(strand)) => strand,
            _ => panic!("invalid method field: {}", obj),
        };
        assert_eq!(method.as_str(), req.method);

        assert_eq!(obj.get("params").cloned(), req.params);
    }

    /// Both wire encodings of a request must decode to an equivalent object.
    #[test]
    fn router_request_value_conversion() {
        let params: Vec<Value> = vec![Value::from(1234i64), Value::from("request")];
        let request = RouterRequest {
            id: Some(1234),
            method: "request",
            params: Some(params.into()),
            transaction: Some(Uuid::new_v4()),
        };

        println!("test convert bincode");

        assert_converts(
            &request,
            |req| crate::core::rpc::format::bincode::encode(req).unwrap(),
            |bytes| crate::core::rpc::format::bincode::decode(&bytes).unwrap(),
        );

        println!("test convert revisioned");

        assert_converts(
            &request,
            |req| {
                let mut encoded = Vec::new();
                req.serialize_revisioned(&mut Cursor::new(&mut encoded)).unwrap();
                encoded
            },
            |bytes| Value::deserialize_revisioned(&mut Cursor::new(bytes)).unwrap(),
        );

        println!("done");
    }
}