dgraph_tonic/txn/
mod.rs

//! Transactions are modeled on the principles of [The Typestate Pattern in Rust](http://cliffle.com/blog/rust-typestate/).
2use std::collections::HashMap;
3use std::hash::Hash;
4use std::marker::{Send, Sync};
5use std::ops::{Deref, DerefMut};
6
7use anyhow::Result;
8use async_trait::async_trait;
9
10use crate::client::ILazyClient;
11use crate::stub::Stub;
12pub use crate::txn::best_effort::TxnBestEffortType;
13pub use crate::txn::default::TxnType;
14pub use crate::txn::mutated::{Mutate, MutationResponse, TxnMutatedType};
15pub use crate::txn::read_only::TxnReadOnlyType;
16use crate::{DgraphError, IDgraphClient};
17use crate::{Request, Response, TxnContext};
18
19pub(crate) mod best_effort;
20pub(crate) mod default;
21pub(crate) mod mutated;
22pub(crate) mod read_only;
23
24///
25/// Transaction state.
26/// Hold txn context and Dgraph client for communication.
27///
28#[derive(Clone, Debug)]
29pub struct TxnState<C: ILazyClient> {
30    stub: Stub<C>,
31    context: TxnContext,
32}
33
34///
35/// Each transaction variant must implement this state trait.
36///
37pub trait IState: Send + Sync + Clone {
38    fn query_request<C: ILazyClient>(
39        &self,
40        state: &TxnState<C>,
41        query: String,
42        vars: HashMap<String, String>,
43    ) -> Request;
44}
45
46///
47/// Type state for Transaction variants
48///
49#[derive(Clone, Debug)]
50pub struct TxnVariant<S: IState, C: ILazyClient> {
51    state: Box<TxnState<C>>,
52    extra: S,
53}
54
55impl<S: IState, C: ILazyClient> Deref for TxnVariant<S, C> {
56    type Target = Box<TxnState<C>>;
57
58    fn deref(&self) -> &Self::Target {
59        &self.state
60    }
61}
62
63impl<S: IState, C: ILazyClient> DerefMut for TxnVariant<S, C> {
64    fn deref_mut(&mut self) -> &mut Self::Target {
65        &mut self.state
66    }
67}
68
69impl<S: IState, C: ILazyClient> TxnVariant<S, C> {
70    ///
71    /// Return cloned txn context
72    ///
73    pub fn get_txn_context(&self) -> TxnContext {
74        self.context.clone()
75    }
76
77    ///
78    /// Return new transaction of same variant with default state
79    ///
80    pub fn clone_and_reset(&mut self) -> Self {
81        let mut result = self.clone();
82        result.context = Default::default();
83        result
84    }
85}
86
87///
88/// All Dgaph transaction types can performe a queries
89///
90#[async_trait]
91pub trait Query: Send + Sync {
92    ///
93    /// You can run a query by calling `txn.query(q)`.
94    ///
95    /// # Arguments
96    ///
97    /// * `query`: GraphQL+- query
98    ///
99    /// # Errors
100    ///
101    /// If transaction is not initialized properly, return `EmptyTxn` error.
102    ///
103    /// gRPC errors can be returned also.
104    ///
105    /// # Example
106    ///
107    /// ```
108    /// use std::collections::HashMap;
109    /// use dgraph_tonic::{Client, Response, Query};
110    /// use serde::Deserialize;
111    /// #[cfg(feature = "acl")]
112    /// use dgraph_tonic::{AclClientType, LazyChannel};
113    ///
114    /// #[cfg(not(feature = "acl"))]
115    /// async fn client() -> Client {
116    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
117    /// }
118    ///
119    /// #[cfg(feature = "acl")]
120    /// async fn client() -> AclClientType<LazyChannel> {
121    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
122    ///     default.login("groot", "password").await.expect("Acl client")
123    /// }
124    ///
125    /// #[derive(Deserialize, Debug)]
126    /// struct Person {
127    ///   uid: String,
128    ///   name: String,
129    /// }
130    ///
131    /// #[derive(Deserialize, Debug)]
132    /// struct Persons {
133    ///   all: Vec<Person>
134    /// }
135    ///
136    /// #[tokio::main]
137    /// async fn main() {
138    ///     let q = r#"query all($a: string) {
139    ///     all(func: eq(name, "Alice")) {
140    ///       uid
141    ///       name
142    ///     }
143    ///   }"#;
144    ///
145    ///   let client = client().await;
146    ///   let mut txn = client.new_read_only_txn();
147    ///   let resp: Response = txn.query(q).await.expect("Query response");
148    ///   let persons: Persons = resp.try_into().expect("Persons");
149    /// }
150    /// ```
151    ///
152    async fn query<Q>(&mut self, query: Q) -> Result<Response>
153    where
154        Q: Into<String> + Send + Sync;
155
156    ///
157    /// You can run a query with rdf response by calling `txn.query_rdf(q)`.
158    ///
159    /// # Arguments
160    ///
161    /// * `query`: GraphQL+- query
162    ///
163    /// # Errors
164    ///
165    /// If transaction is not initialized properly, return `EmptyTxn` error.
166    ///
167    /// gRPC errors can be returned also.
168    ///
169    /// # Example
170    ///
171    /// ```
172    /// use std::collections::HashMap;
173    /// use dgraph_tonic::{Client, Response, Query};
174    /// use serde::Deserialize;
175    /// #[cfg(feature = "acl")]
176    /// use dgraph_tonic::{AclClientType, LazyChannel};
177    ///
178    /// #[cfg(not(feature = "acl"))]
179    /// async fn client() -> Client {
180    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
181    /// }
182    ///
183    /// #[cfg(feature = "acl")]
184    /// async fn client() -> AclClientType<LazyChannel> {
185    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
186    ///     default.login("groot", "password").await.expect("Acl client")
187    /// }
188    ///
189    ///
190    /// #[tokio::main]
191    /// async fn main() {
192    ///     let q = r#"query all($a: string) {
193    ///     all(func: eq(name, "Alice")) {
194    ///       uid
195    ///       name
196    ///     }
197    ///   }"#;
198    ///
199    ///   let client = client().await;
200    ///   let mut txn = client.new_read_only_txn();
201    ///   let resp: Response = txn.query_rdf(q).await.expect("Query response");
202    ///   println!("{}",String::from_utf8(resp.rdf).unwrap());
203    /// }
204    /// ```
205    ///
206    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
207    async fn query_rdf<Q>(&mut self, query: Q) -> Result<Response>
208    where
209        Q: Into<String> + Send + Sync;
210
211    ///
212    /// You can run a query with defined variables by calling `txn.query_with_vars(q, vars)`.
213    ///
214    /// # Arguments
215    ///
216    /// * `query`: GraphQL+- query
217    /// * `vars`: map of variables
218    ///
219    /// # Errors
220    ///
221    /// If transaction is not initialized properly, return `EmptyTxn` error.
222    ///
223    /// gRPC errors can be returned also.
224    ///
225    /// # Example
226    ///
227    /// ```
228    /// use std::collections::HashMap;
229    /// use dgraph_tonic::{Client, Response, Query};
230    /// use serde::Deserialize;
231    /// #[cfg(feature = "acl")]
232    /// use dgraph_tonic::{AclClientType, LazyChannel};
233    ///
234    /// #[cfg(not(feature = "acl"))]
235    /// async fn client() -> Client {
236    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
237    /// }
238    ///
239    /// #[cfg(feature = "acl")]
240    /// async fn client() -> AclClientType<LazyChannel> {
241    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
242    ///     default.login("groot", "password").await.expect("Acl client")
243    /// }
244    ///
245    /// #[derive(Deserialize, Debug)]
246    /// struct Person {
247    ///     uid: String,
248    ///     name: String,
249    /// }
250    ///
251    /// #[derive(Deserialize, Debug)]
252    /// struct Persons {
253    ///     all: Vec<Person>
254    /// }
255    ///
256    /// #[tokio::main]
257    /// async fn main() {
258    ///     let q = r#"query all($a: string) {
259    ///         all(func: eq(name, $a)) {
260    ///         uid
261    ///         name
262    ///         }
263    ///     }"#;
264    ///
265    ///     let mut vars = HashMap::new();
266    ///     vars.insert("$a", "Alice");
267    ///
268    ///     let client = client().await;
269    ///     let mut txn = client.new_read_only_txn();
270    ///     let resp: Response = txn.query_with_vars(q, vars).await.expect("query response");
271    ///     let persons: Persons = resp.try_into().expect("Persons");
272    /// }
273    /// ```
274    async fn query_with_vars<Q, K, V>(&mut self, query: Q, vars: HashMap<K, V>) -> Result<Response>
275    where
276        Q: Into<String> + Send + Sync,
277        K: Into<String> + Send + Sync + Eq + Hash,
278        V: Into<String> + Send + Sync;
279
280    ///
281    /// You can run a query with defined variables and rdf response by calling `txn.query_rdf_with_vars(q, vars)`.
282    ///
283    /// # Arguments
284    ///
285    /// * `query`: GraphQL+- query
286    /// * `vars`: map of variables
287    ///
288    /// # Errors
289    ///
290    /// If transaction is not initialized properly, return `EmptyTxn` error.
291    ///
292    /// gRPC errors can be returned also.
293    ///
294    /// # Example
295    ///
296    /// ```
297    /// use std::collections::HashMap;
298    /// use dgraph_tonic::{Client, Response, Query};
299    /// use serde::Deserialize;
300    /// #[cfg(feature = "acl")]
301    /// use dgraph_tonic::{AclClientType, LazyChannel};
302    ///
303    /// #[cfg(not(feature = "acl"))]
304    /// async fn client() -> Client {
305    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
306    /// }
307    ///
308    /// #[cfg(feature = "acl")]
309    /// async fn client() -> AclClientType<LazyChannel> {
310    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
311    ///     default.login("groot", "password").await.expect("Acl client")
312    /// }
313    ///
314    ///
315    /// #[tokio::main]
316    /// async fn main() {
317    ///     let q = r#"query all($a: string) {
318    ///         all(func: eq(name, $a)) {
319    ///         uid
320    ///         name
321    ///         }
322    ///     }"#;
323    ///
324    ///     let mut vars = HashMap::new();
325    ///     vars.insert("$a", "Alice");
326    ///
327    ///     let client = client().await;
328    ///     let mut txn = client.new_read_only_txn();
329    ///     let resp: Response = txn.query_rdf_with_vars(q, vars).await.expect("query response");
330    ///     println!("{}",String::from_utf8(resp.rdf).unwrap());
331    /// }
332    /// ```
333    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
334    async fn query_rdf_with_vars<Q, K, V>(
335        &mut self,
336        query: Q,
337        vars: HashMap<K, V>,
338    ) -> Result<Response>
339    where
340        Q: Into<String> + Send + Sync,
341        K: Into<String> + Send + Sync + Eq + Hash,
342        V: Into<String> + Send + Sync;
343}
344
345#[async_trait]
346impl<S: IState, C: ILazyClient> Query for TxnVariant<S, C> {
347    async fn query<Q>(&mut self, query: Q) -> Result<Response>
348    where
349        Q: Into<String> + Send + Sync,
350    {
351        self.query_with_vars(query, HashMap::<String, String, _>::with_capacity(0))
352            .await
353    }
354
355    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
356    async fn query_rdf<Q>(&mut self, query: Q) -> Result<Response>
357    where
358        Q: Into<String> + Send + Sync,
359    {
360        self.query_rdf_with_vars(query, HashMap::<String, String, _>::with_capacity(0))
361            .await
362    }
363
364    async fn query_with_vars<Q, K, V>(&mut self, query: Q, vars: HashMap<K, V>) -> Result<Response>
365    where
366        Q: Into<String> + Send + Sync,
367        K: Into<String> + Send + Sync + Eq + Hash,
368        V: Into<String> + Send + Sync,
369    {
370        let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
371            tmp.insert(k.into(), v.into());
372            tmp
373        });
374        let request = self.extra.query_request(&self.state, query.into(), vars);
375        let response = match self.stub.query(request).await {
376            Ok(response) => response,
377            Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
378        };
379        match response.txn.as_ref() {
380            Some(src) => self.context.merge_context(src)?,
381            None => anyhow::bail!(DgraphError::EmptyTxn),
382        };
383        Ok(response)
384    }
385
386    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
387    async fn query_rdf_with_vars<Q, K, V>(
388        &mut self,
389        query: Q,
390        vars: HashMap<K, V>,
391    ) -> Result<Response>
392    where
393        Q: Into<String> + Send + Sync,
394        K: Into<String> + Send + Sync + Eq + Hash,
395        V: Into<String> + Send + Sync,
396    {
397        let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
398            tmp.insert(k.into(), v.into());
399            tmp
400        });
401        let mut request = self.extra.query_request(&self.state, query.into(), vars);
402        request.resp_format = crate::api::request::RespFormat::Rdf as i32;
403        let response = match self.stub.query(request).await {
404            Ok(response) => response,
405            Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
406        };
407        match response.txn.as_ref() {
408            Some(src) => self.context.merge_context(src)?,
409            None => anyhow::bail!(DgraphError::EmptyTxn),
410        };
411        Ok(response)
412    }
413}
414
// Integration tests: every test talks to a Dgraph instance listening on
// `http://127.0.0.1:19080`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use serde_derive::{Deserialize, Serialize};

    use crate::client::Client;
    #[cfg(feature = "acl")]
    use crate::client::{AclClientType, LazyChannel};
    use crate::{Mutate, Mutation};

    use super::*;

    /// Concrete client type returned by `client()` for the enabled features.
    #[cfg(not(feature = "acl"))]
    type TestClient = Client;

    /// Concrete client type returned by `client()` for the enabled features.
    #[cfg(feature = "acl")]
    type TestClient = AclClientType<LazyChannel>;

    #[cfg(not(feature = "acl"))]
    async fn client() -> Client {
        Client::new("http://127.0.0.1:19080").unwrap()
    }

    #[cfg(feature = "acl")]
    async fn client() -> AclClientType<LazyChannel> {
        let default = Client::new("http://127.0.0.1:19080").unwrap();
        default.login("groot", "password").await.unwrap()
    }

    #[derive(Serialize, Deserialize, Default, Debug)]
    struct Person {
        uid: String,
        name: String,
    }

    #[derive(Serialize, Deserialize, Default, Debug)]
    pub struct UidJson {
        pub uids: Vec<Uid>,
    }

    #[derive(Serialize, Deserialize, Default, Debug)]
    pub struct Uid {
        pub uid: String,
    }

    /// Builds a mutation that sets one `Person` as JSON.
    fn person_mutation(uid: &str, name: &str) -> Mutation {
        let person = Person {
            uid: uid.to_string(),
            name: name.to_string(),
        };
        let mut mu = Mutation::new();
        mu.set_set_json(&person).expect("Invalid JSON");
        mu
    }

    /// Builds the mutation used by the upsert tests.
    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
    fn email_mutation() -> Mutation {
        let mut mu = Mutation::new();
        mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
        mu
    }

    /// Sets the schema with the exact index on `name` used by the tests.
    async fn index_name(client: &TestClient) {
        client
            .set_schema("name: string @index(exact) .")
            .await
            .expect("Schema is not updated");
    }

    /// Inserts Alice through a fresh client with mutate-and-commit-now.
    async fn insert_data() {
        let client = client().await;
        let txn = client.new_mutated_txn();
        let response = txn
            .mutate_and_commit_now(person_mutation("_:alice", "Alice"))
            .await;
        assert!(response.is_ok());
    }

    /// Inserts Alice and Mike with two mutations in one transaction and commits it.
    async fn commit_alice_and_mike(client: &TestClient) {
        let mut txn = client.new_mutated_txn();
        //first mutation
        let response = txn.mutate(person_mutation("_:alice", "Alice")).await;
        assert!(response.is_ok());
        //second mutation
        let response = txn.mutate(person_mutation("_:mike", "Mike")).await;
        assert!(response.is_ok());
        //commit
        let commit = txn.commit().await;
        assert!(commit.is_ok());
    }

    /// Returns a client after the `name` index is set and Alice is inserted.
    async fn client_with_indexed_alice() -> TestClient {
        let client = client().await;
        index_name(&client).await;
        insert_data().await;
        client
    }

    /// Asserts the query succeeded and returned at least one uid.
    fn assert_has_uid(response: anyhow::Result<crate::Response>) {
        assert!(response.is_ok());
        let mut json: UidJson = response.unwrap().try_into().unwrap();
        assert!(json.uids.pop().is_some());
    }

    /// Looks Alice up with a plain query on `txn` and checks a uid is returned.
    async fn assert_query_finds_alice<T: Query>(txn: &mut T) {
        let query = r#"{
            uids(func: eq(name, "Alice")) {
                uid
            }
        }"#;
        assert_has_uid(txn.query(query).await);
    }

    /// Looks Alice up with a query variable on `txn` and checks a uid is returned.
    async fn assert_query_with_vars_finds_alice<T: Query>(txn: &mut T) {
        let query = r#"query all($a: string) {
            uids(func: eq(name, $a)) {
              uid
            }
          }"#;
        let mut vars = HashMap::new();
        vars.insert("$a", "Alice");
        assert_has_uid(txn.query_with_vars(query, vars).await);
    }

    #[tokio::test]
    async fn mutate_and_commit_now() {
        insert_data().await;
    }

    #[tokio::test]
    async fn commit() {
        let client = client().await;
        commit_alice_and_mike(&client).await;
    }

    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
    #[tokio::test]
    async fn upsert() {
        let client = client().await;
        index_name(&client).await;
        commit_alice_and_mike(&client).await;
        //upsert all alices with email
        let query = r#"
          query {
              user as var(func: eq(name, "Alice"))
          }"#;
        let mu = email_mutation();
        let mut txn = client.new_mutated_txn();
        assert!(txn.upsert(query, mu).await.is_ok());
        assert!(txn.commit().await.is_ok());
    }

    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
    #[tokio::test]
    async fn upsert_and_commit_now() {
        let client = client().await;
        index_name(&client).await;
        commit_alice_and_mike(&client).await;
        //upsert all alices with email
        let query = r#"
          query {
              user as var(func: eq(name, "Alice"))
          }"#;
        let mu = email_mutation();
        let txn = client.new_mutated_txn();
        let response = txn.upsert_and_commit_now(query, mu).await;
        assert!(response.is_ok())
    }

    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
    #[tokio::test]
    async fn upsert_with_vars() {
        let client = client().await;
        index_name(&client).await;
        commit_alice_and_mike(&client).await;
        //upsert all alices with email
        let query = r#"
          query alices($a: string) {
              user as var(func: eq(name, $a))
          }"#;
        let mu = email_mutation();
        let mut vars = HashMap::new();
        vars.insert("$a", "Alice");
        let mut txn = client.new_mutated_txn();
        assert!(txn.upsert_with_vars(query, vars, vec![mu]).await.is_ok());
        assert!(txn.commit().await.is_ok());
    }

    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
    #[tokio::test]
    async fn upsert_with_vars_and_commit_now() {
        let client = client().await;
        index_name(&client).await;
        commit_alice_and_mike(&client).await;
        //upsert all alices with email
        let query = r#"
          query alices($a: string) {
              user as var(func: eq(name, $a))
          }"#;
        let mu = email_mutation();
        let mut vars = HashMap::new();
        vars.insert("$a", "Alice");
        let txn = client.new_mutated_txn();
        let response = txn
            .upsert_with_vars_and_commit_now(query, vars, vec![mu])
            .await;
        assert!(response.is_ok())
    }

    #[tokio::test]
    async fn query() {
        let client = client_with_indexed_alice().await;
        let mut txn = client.new_read_only_txn();
        assert_query_finds_alice(&mut txn).await;
    }

    #[tokio::test]
    async fn mutated_txn_query() {
        let client = client_with_indexed_alice().await;
        let mut txn = client.new_mutated_txn();
        assert_query_finds_alice(&mut txn).await;
        assert!(txn.discard().await.is_ok());
    }

    #[tokio::test]
    async fn best_effort_txn_query() {
        let client = client_with_indexed_alice().await;
        // NOTE(review): blocking sleep inside an async test; presumably it gives
        // the best-effort read time to observe the insert - confirm before
        // replacing it with an async timer.
        std::thread::sleep(Duration::from_secs(1));
        let mut txn = client.new_best_effort_txn();
        assert_query_finds_alice(&mut txn).await;
    }

    #[tokio::test]
    async fn query_with_vars() {
        let client = client_with_indexed_alice().await;
        let mut txn = client.new_read_only_txn();
        assert_query_with_vars_finds_alice(&mut txn).await;
    }

    #[tokio::test]
    async fn mutated_txn_query_with_vars() {
        let client = client_with_indexed_alice().await;
        let mut txn = client.new_mutated_txn();
        assert_query_with_vars_finds_alice(&mut txn).await;
        assert!(txn.discard().await.is_ok());
    }

    #[tokio::test]
    async fn best_effort_txn_query_with_vars() {
        let client = client_with_indexed_alice().await;
        let mut txn = client.new_best_effort_txn();
        assert_query_with_vars_finds_alice(&mut txn).await;
    }
}