1use std::collections::HashMap;
3use std::hash::Hash;
4use std::marker::{Send, Sync};
5use std::ops::{Deref, DerefMut};
6
7use anyhow::Result;
8use async_trait::async_trait;
9
10use crate::client::ILazyClient;
11use crate::stub::Stub;
12pub use crate::txn::best_effort::TxnBestEffortType;
13pub use crate::txn::default::TxnType;
14pub use crate::txn::mutated::{Mutate, MutationResponse, TxnMutatedType};
15pub use crate::txn::read_only::TxnReadOnlyType;
16use crate::{DgraphError, IDgraphClient};
17use crate::{Request, Response, TxnContext};
18
19pub(crate) mod best_effort;
20pub(crate) mod default;
21pub(crate) mod mutated;
22pub(crate) mod read_only;
23
24#[derive(Clone, Debug)]
29pub struct TxnState<C: ILazyClient> {
30 stub: Stub<C>,
31 context: TxnContext,
32}
33
34pub trait IState: Send + Sync + Clone {
38 fn query_request<C: ILazyClient>(
39 &self,
40 state: &TxnState<C>,
41 query: String,
42 vars: HashMap<String, String>,
43 ) -> Request;
44}
45
46#[derive(Clone, Debug)]
50pub struct TxnVariant<S: IState, C: ILazyClient> {
51 state: Box<TxnState<C>>,
52 extra: S,
53}
54
55impl<S: IState, C: ILazyClient> Deref for TxnVariant<S, C> {
56 type Target = Box<TxnState<C>>;
57
58 fn deref(&self) -> &Self::Target {
59 &self.state
60 }
61}
62
63impl<S: IState, C: ILazyClient> DerefMut for TxnVariant<S, C> {
64 fn deref_mut(&mut self) -> &mut Self::Target {
65 &mut self.state
66 }
67}
68
69impl<S: IState, C: ILazyClient> TxnVariant<S, C> {
70 pub fn get_txn_context(&self) -> TxnContext {
74 self.context.clone()
75 }
76
77 pub fn clone_and_reset(&mut self) -> Self {
81 let mut result = self.clone();
82 result.context = Default::default();
83 result
84 }
85}
86
87#[async_trait]
91pub trait Query: Send + Sync {
92 async fn query<Q>(&mut self, query: Q) -> Result<Response>
153 where
154 Q: Into<String> + Send + Sync;
155
156 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
207 async fn query_rdf<Q>(&mut self, query: Q) -> Result<Response>
208 where
209 Q: Into<String> + Send + Sync;
210
211 async fn query_with_vars<Q, K, V>(&mut self, query: Q, vars: HashMap<K, V>) -> Result<Response>
275 where
276 Q: Into<String> + Send + Sync,
277 K: Into<String> + Send + Sync + Eq + Hash,
278 V: Into<String> + Send + Sync;
279
280 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
334 async fn query_rdf_with_vars<Q, K, V>(
335 &mut self,
336 query: Q,
337 vars: HashMap<K, V>,
338 ) -> Result<Response>
339 where
340 Q: Into<String> + Send + Sync,
341 K: Into<String> + Send + Sync + Eq + Hash,
342 V: Into<String> + Send + Sync;
343}
344
345#[async_trait]
346impl<S: IState, C: ILazyClient> Query for TxnVariant<S, C> {
347 async fn query<Q>(&mut self, query: Q) -> Result<Response>
348 where
349 Q: Into<String> + Send + Sync,
350 {
351 self.query_with_vars(query, HashMap::<String, String, _>::with_capacity(0))
352 .await
353 }
354
355 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
356 async fn query_rdf<Q>(&mut self, query: Q) -> Result<Response>
357 where
358 Q: Into<String> + Send + Sync,
359 {
360 self.query_rdf_with_vars(query, HashMap::<String, String, _>::with_capacity(0))
361 .await
362 }
363
364 async fn query_with_vars<Q, K, V>(&mut self, query: Q, vars: HashMap<K, V>) -> Result<Response>
365 where
366 Q: Into<String> + Send + Sync,
367 K: Into<String> + Send + Sync + Eq + Hash,
368 V: Into<String> + Send + Sync,
369 {
370 let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
371 tmp.insert(k.into(), v.into());
372 tmp
373 });
374 let request = self.extra.query_request(&self.state, query.into(), vars);
375 let response = match self.stub.query(request).await {
376 Ok(response) => response,
377 Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
378 };
379 match response.txn.as_ref() {
380 Some(src) => self.context.merge_context(src)?,
381 None => anyhow::bail!(DgraphError::EmptyTxn),
382 };
383 Ok(response)
384 }
385
386 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
387 async fn query_rdf_with_vars<Q, K, V>(
388 &mut self,
389 query: Q,
390 vars: HashMap<K, V>,
391 ) -> Result<Response>
392 where
393 Q: Into<String> + Send + Sync,
394 K: Into<String> + Send + Sync + Eq + Hash,
395 V: Into<String> + Send + Sync,
396 {
397 let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
398 tmp.insert(k.into(), v.into());
399 tmp
400 });
401 let mut request = self.extra.query_request(&self.state, query.into(), vars);
402 request.resp_format = crate::api::request::RespFormat::Rdf as i32;
403 let response = match self.stub.query(request).await {
404 Ok(response) => response,
405 Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
406 };
407 match response.txn.as_ref() {
408 Some(src) => self.context.merge_context(src)?,
409 None => anyhow::bail!(DgraphError::EmptyTxn),
410 };
411 Ok(response)
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use std::collections::HashMap;
418 use std::time::Duration;
419
420 use serde_derive::{Deserialize, Serialize};
421
422 use crate::client::Client;
423 #[cfg(feature = "acl")]
424 use crate::client::{AclClientType, LazyChannel};
425 use crate::{Mutate, Mutation};
426
427 use super::*;
428
429 #[cfg(not(feature = "acl"))]
430 async fn client() -> Client {
431 Client::new("http://127.0.0.1:19080").unwrap()
432 }
433
434 #[cfg(feature = "acl")]
435 async fn client() -> AclClientType<LazyChannel> {
436 let default = Client::new("http://127.0.0.1:19080").unwrap();
437 default.login("groot", "password").await.unwrap()
438 }
439
440 #[derive(Serialize, Deserialize, Default, Debug)]
441 struct Person {
442 uid: String,
443 name: String,
444 }
445
446 #[derive(Serialize, Deserialize, Default, Debug)]
447 pub struct UidJson {
448 pub uids: Vec<Uid>,
449 }
450
451 #[derive(Serialize, Deserialize, Default, Debug)]
452 pub struct Uid {
453 pub uid: String,
454 }
455
456 async fn insert_data() {
457 let client = client().await;
458 let txn = client.new_mutated_txn();
459 let p = Person {
460 uid: "_:alice".to_string(),
461 name: "Alice".to_string(),
462 };
463 let mut mu = Mutation::new();
464 mu.set_set_json(&p).expect("Invalid JSON");
465 let response = txn.mutate_and_commit_now(mu).await;
466 assert!(response.is_ok());
467 }
468
469 #[tokio::test]
470 async fn mutate_and_commit_now() {
471 let client = client().await;
472 let txn = client.new_mutated_txn();
473 let p = Person {
474 uid: "_:alice".to_string(),
475 name: "Alice".to_string(),
476 };
477 let mut mu = Mutation::new();
478 mu.set_set_json(&p).expect("Invalid JSON");
479 let response = txn.mutate_and_commit_now(mu).await;
480 assert!(response.is_ok());
481 }
482
483 #[tokio::test]
484 async fn commit() {
485 let client = client().await;
486 let mut txn = client.new_mutated_txn();
487 let p = Person {
489 uid: "_:alice".to_string(),
490 name: "Alice".to_string(),
491 };
492 let mut mu = Mutation::new();
493 mu.set_set_json(&p).expect("Invalid JSON");
494 let response = txn.mutate(mu).await;
495 assert!(response.is_ok());
496 let p = Person {
498 uid: "_:mike".to_string(),
499 name: "Mike".to_string(),
500 };
501 let mut mu = Mutation::new();
502 mu.set_set_json(&p).expect("Invalid JSON");
503 let response = txn.mutate(mu).await;
504 assert!(response.is_ok());
505 let commit = txn.commit().await;
507 assert!(commit.is_ok())
508 }
509
510 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
511 #[tokio::test]
512 async fn upsert() {
513 let client = client().await;
514 client
515 .set_schema("name: string @index(exact) .")
516 .await
517 .expect("Schema is not updated");
518 let mut txn = client.new_mutated_txn();
519 let p = Person {
521 uid: "_:alice".to_string(),
522 name: "Alice".to_string(),
523 };
524 let mut mu = Mutation::new();
525 mu.set_set_json(&p).expect("Invalid JSON");
526 let response = txn.mutate(mu).await;
527 assert!(response.is_ok());
528 let p = Person {
530 uid: "_:mike".to_string(),
531 name: "Mike".to_string(),
532 };
533 let mut mu = Mutation::new();
534 mu.set_set_json(&p).expect("Invalid JSON");
535 let response = txn.mutate(mu).await;
536 assert!(response.is_ok());
537 let commit = txn.commit().await;
539 assert!(commit.is_ok());
540 let query = r#"
542 query {
543 user as var(func: eq(name, "Alice"))
544 }"#;
545 let mut mu = Mutation::new();
546 mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
547 let mut txn = client.new_mutated_txn();
548 assert!(txn.upsert(query, mu).await.is_ok());
549 assert!(txn.commit().await.is_ok());
550 }
551
552 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
553 #[tokio::test]
554 async fn upsert_and_commit_now() {
555 let client = client().await;
556 client
557 .set_schema("name: string @index(exact) .")
558 .await
559 .expect("Schema is not updated");
560 let mut txn = client.new_mutated_txn();
561 let p = Person {
563 uid: "_:alice".to_string(),
564 name: "Alice".to_string(),
565 };
566 let mut mu = Mutation::new();
567 mu.set_set_json(&p).expect("Invalid JSON");
568 let response = txn.mutate(mu).await;
569 assert!(response.is_ok());
570 let p = Person {
572 uid: "_:mike".to_string(),
573 name: "Mike".to_string(),
574 };
575 let mut mu = Mutation::new();
576 mu.set_set_json(&p).expect("Invalid JSON");
577 let response = txn.mutate(mu).await;
578 assert!(response.is_ok());
579 let commit = txn.commit().await;
581 assert!(commit.is_ok());
582 let query = r#"
584 query {
585 user as var(func: eq(name, "Alice"))
586 }"#;
587 let mut mu = Mutation::new();
588 mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
589 let txn = client.new_mutated_txn();
590 let response = txn.upsert_and_commit_now(query, mu).await;
591 assert!(response.is_ok())
592 }
593
594 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
595 #[tokio::test]
596 async fn upsert_with_vars() {
597 let client = client().await;
598 client
599 .set_schema("name: string @index(exact) .")
600 .await
601 .expect("Schema is not updated");
602 let mut txn = client.new_mutated_txn();
603 let p = Person {
605 uid: "_:alice".to_string(),
606 name: "Alice".to_string(),
607 };
608 let mut mu = Mutation::new();
609 mu.set_set_json(&p).expect("Invalid JSON");
610 let response = txn.mutate(mu).await;
611 assert!(response.is_ok());
612 let p = Person {
614 uid: "_:mike".to_string(),
615 name: "Mike".to_string(),
616 };
617 let mut mu = Mutation::new();
618 mu.set_set_json(&p).expect("Invalid JSON");
619 let response = txn.mutate(mu).await;
620 assert!(response.is_ok());
621 let commit = txn.commit().await;
623 assert!(commit.is_ok());
624 let query = r#"
626 query alices($a: string) {
627 user as var(func: eq(name, $a))
628 }"#;
629 let mut mu = Mutation::new();
630 mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
631 let mut vars = HashMap::new();
632 vars.insert("$a", "Alice");
633 let mut txn = client.new_mutated_txn();
634 assert!(txn.upsert_with_vars(query, vars, vec![mu]).await.is_ok());
635 assert!(txn.commit().await.is_ok());
636 }
637
638 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
639 #[tokio::test]
640 async fn upsert_with_vars_and_commit_now() {
641 let client = client().await;
642 client
643 .set_schema("name: string @index(exact) .")
644 .await
645 .expect("Schema is not updated");
646 let mut txn = client.new_mutated_txn();
647 let p = Person {
649 uid: "_:alice".to_string(),
650 name: "Alice".to_string(),
651 };
652 let mut mu = Mutation::new();
653 mu.set_set_json(&p).expect("Invalid JSON");
654 let response = txn.mutate(mu).await;
655 assert!(response.is_ok());
656 let p = Person {
658 uid: "_:mike".to_string(),
659 name: "Mike".to_string(),
660 };
661 let mut mu = Mutation::new();
662 mu.set_set_json(&p).expect("Invalid JSON");
663 let response = txn.mutate(mu).await;
664 assert!(response.is_ok());
665 let commit = txn.commit().await;
667 assert!(commit.is_ok());
668 let query = r#"
670 query alices($a: string) {
671 user as var(func: eq(name, $a))
672 }"#;
673 let mut mu = Mutation::new();
674 mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
675 let mut vars = HashMap::new();
676 vars.insert("$a", "Alice");
677 let txn = client.new_mutated_txn();
678 let response = txn
679 .upsert_with_vars_and_commit_now(query, vars, vec![mu])
680 .await;
681 assert!(response.is_ok())
682 }
683
684 #[tokio::test]
685 async fn query() {
686 let client = client().await;
687 client
688 .set_schema("name: string @index(exact) .")
689 .await
690 .expect("Schema is not updated");
691 insert_data().await;
692 let mut txn = client.new_read_only_txn();
693 let query = r#"{
694 uids(func: eq(name, "Alice")) {
695 uid
696 }
697 }"#;
698 let response = txn.query(query).await;
699 assert!(response.is_ok());
700 let mut json: UidJson = response.unwrap().try_into().unwrap();
701 assert!(json.uids.pop().is_some());
702 }
703
704 #[tokio::test]
705 async fn mutated_txn_query() {
706 let client = client().await;
707 client
708 .set_schema("name: string @index(exact) .")
709 .await
710 .expect("Schema is not updated");
711 insert_data().await;
712 let mut txn = client.new_mutated_txn();
713 let query = r#"{
714 uids(func: eq(name, "Alice")) {
715 uid
716 }
717 }"#;
718 let response = txn.query(query).await;
719 assert!(response.is_ok());
720 let mut json: UidJson = response.unwrap().try_into().unwrap();
721 assert!(json.uids.pop().is_some());
722 assert!(txn.discard().await.is_ok());
723 }
724
725 #[tokio::test]
726 async fn best_effort_txn_query() {
727 let client = client().await;
728 client
729 .set_schema("name: string @index(exact) .")
730 .await
731 .expect("Schema is not updated");
732 insert_data().await;
733 std::thread::sleep(Duration::from_secs(1));
734 let mut txn = client.new_best_effort_txn();
735 let query = r#"{
736 uids(func: eq(name, "Alice")) {
737 uid
738 }
739 }"#;
740 let response = txn.query(query).await;
741 assert!(response.is_ok());
742 let mut json: UidJson = response.unwrap().try_into().unwrap();
743 assert!(json.uids.pop().is_some());
744 }
745
746 #[tokio::test]
747 async fn query_with_vars() {
748 let client = client().await;
749 client
750 .set_schema("name: string @index(exact) .")
751 .await
752 .expect("Schema is not updated");
753 insert_data().await;
754 let mut txn = client.new_read_only_txn();
755 let query = r#"query all($a: string) {
756 uids(func: eq(name, $a)) {
757 uid
758 }
759 }"#;
760 let mut vars = HashMap::new();
761 vars.insert("$a", "Alice");
762 let response = txn.query_with_vars(query, vars).await;
763 assert!(response.is_ok());
764 let mut json: UidJson = response.unwrap().try_into().unwrap();
765 assert!(json.uids.pop().is_some());
766 }
767
768 #[tokio::test]
769 async fn mutated_txn_query_with_vars() {
770 let client = client().await;
771 client
772 .set_schema("name: string @index(exact) .")
773 .await
774 .expect("Schema is not updated");
775 insert_data().await;
776 let mut txn = client.new_mutated_txn();
777 let query = r#"query all($a: string) {
778 uids(func: eq(name, $a)) {
779 uid
780 }
781 }"#;
782 let mut vars = HashMap::new();
783 vars.insert("$a", "Alice");
784 let response = txn.query_with_vars(query, vars).await;
785 assert!(response.is_ok());
786 let mut json: UidJson = response.unwrap().try_into().unwrap();
787 assert!(json.uids.pop().is_some());
788 assert!(txn.discard().await.is_ok());
789 }
790
791 #[tokio::test]
792 async fn best_effort_txn_query_with_vars() {
793 let client = client().await;
794 client
795 .set_schema("name: string @index(exact) .")
796 .await
797 .expect("Schema is not updated");
798 insert_data().await;
799 let mut txn = client.new_best_effort_txn();
800 let query = r#"query all($a: string) {
801 uids(func: eq(name, $a)) {
802 uid
803 }
804 }"#;
805 let mut vars = HashMap::new();
806 vars.insert("$a", "Alice");
807 let response = txn.query_with_vars(query, vars).await;
808 assert!(response.is_ok());
809 let mut json: UidJson = response.unwrap().try_into().unwrap();
810 assert!(json.uids.pop().is_some());
811 }
812}