dgraph_tonic/txn/
mutated.rs

1use std::collections::hash_map::RandomState;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use anyhow::Result;
7use async_trait::async_trait;
8
9use crate::client::ILazyClient;
10use crate::errors::DgraphError;
11use crate::txn::default::Base;
12use crate::txn::{IState, Query, TxnState, TxnType, TxnVariant};
13#[cfg(feature = "dgraph-1-0")]
14use crate::Assigned;
15use crate::IDgraphClient;
16#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
17use crate::Response;
18use crate::{Mutation, Request};
19
20///
21/// In Dgraph v1.0.x is mutation response represented as Assigned object
22///
23#[cfg(feature = "dgraph-1-0")]
24pub type MutationResponse = Assigned;
25///
26/// In Dgraph v1.1.x is mutation response represented as Response object
27///
28#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
29pub type MutationResponse = Response;
30
31///
32/// Inner state for transaction which can modify data in DB.
33///
34#[derive(Clone, Debug)]
35pub struct Mutated<C: ILazyClient> {
36    base: Base<C>,
37    mutated: bool,
38}
39
40///
41/// Upsert mutation can be defined with one or more mutations
42///
43#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
44pub struct UpsertMutation {
45    mu: Vec<Mutation>,
46}
47
48#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
49impl From<Vec<Mutation>> for UpsertMutation {
50    fn from(mu: Vec<Mutation>) -> Self {
51        Self { mu }
52    }
53}
54
55#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
56impl From<Mutation> for UpsertMutation {
57    fn from(mu: Mutation) -> Self {
58        Self { mu: vec![mu] }
59    }
60}
61
62#[async_trait]
63impl<C: ILazyClient> IState for Mutated<C> {
64    ///
65    /// Do same query like default transaction
66    ///
67    fn query_request<S: ILazyClient>(
68        &self,
69        state: &TxnState<S>,
70        query: String,
71        vars: HashMap<String, String, RandomState>,
72    ) -> Request {
73        self.base.query_request(state, query, vars)
74    }
75}
76
77///
78/// Transaction variant with mutations support.
79///
80pub type TxnMutatedType<C> = TxnVariant<Mutated<C>, C>;
81
82impl<C: ILazyClient> TxnType<C> {
83    ///
84    /// Create new transaction for mutation operations.
85    ///
86    pub fn mutated(self) -> TxnMutatedType<C> {
87        TxnVariant {
88            state: self.state,
89            extra: Mutated {
90                base: self.extra,
91                mutated: false,
92            },
93        }
94    }
95}
96
97///
98/// Allowed mutation operation in Dgraph
99///
100#[async_trait]
101pub trait Mutate: Query {
102    ///
103    /// Discard transaction
104    ///
105    /// # Errors
106    ///
107    /// Return gRPC error.
108    ///
109    async fn discard(mut self) -> Result<()>;
110
111    ///
112    /// Commit transaction
113    ///
114    /// # Errors
115    ///
116    /// Return gRPC error.
117    ///
118    async fn commit(self) -> Result<()>;
119
120    ///
121    /// Adding or removing data in Dgraph is called a mutation.
122    ///
123    /// # Arguments
124    ///
125    /// * `mu`: required mutations
126    ///
127    /// # Errors
128    ///
129    /// * `GrpcError`: there is error in communication or server does not accept mutation
130    /// * `MissingTxnContext`: there is error in txn setup
131    ///
132    /// # Example
133    ///
134    /// ```
135    /// use dgraph_tonic::{Client, Mutation, Mutate};
136    /// use serde::Serialize;
137    /// #[cfg(feature = "acl")]
138    /// use dgraph_tonic::{AclClientType, LazyChannel};
139    ///
140    /// #[cfg(not(feature = "acl"))]
141    /// async fn client() -> Client {
142    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
143    /// }
144    ///
145    /// #[cfg(feature = "acl")]
146    /// async fn client() -> AclClientType<LazyChannel> {
147    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
148    ///     default.login("groot", "password").await.expect("Acl client")
149    /// }
150    ///
151    ///#[derive(Serialize)]
152    /// struct Person {
153    ///   uid: String,
154    ///   name: String,
155    /// }
156    /// #[tokio::main]
157    /// async fn main() {
158    ///    let p = Person {
159    ///        uid:  "_:alice".into(),
160    ///        name: "Alice".into(),
161    ///    };
162    ///
163    ///    let mut mu = Mutation::new();
164    ///    mu.set_set_json(&p).expect("JSON");
165    ///
166    ///    let client = client().await;
167    ///    let mut txn = client.new_mutated_txn();
168    ///    let result = txn.mutate(mu).await.expect("failed to create data");
169    ///    txn.commit().await.expect("Txn is not committed");
170    /// }
171    /// ```
172    ///
173    async fn mutate(&mut self, mu: Mutation) -> Result<MutationResponse>;
174
175    ///
176    /// Adding or removing data in Dgraph is called a mutation.
177    ///
178    /// Sometimes, you only want to commit a mutation, without querying anything further.
179    /// In such cases, you can use this function to indicate that the mutation must be immediately
180    /// committed.
181    ///
182    /// # Arguments
183    ///
184    /// * `mu`: required mutations
185    ///
186    /// # Errors
187    ///
188    /// * `GrpcError`: there is error in communication or server does not accept mutation
189    /// * `MissingTxnContext`: there is error in txn setup
190    ///
191    /// # Example
192    ///
193    /// ```
194    /// use dgraph_tonic::{Client, Mutation, Mutate};
195    /// use serde::Serialize;
196    /// #[cfg(feature = "acl")]
197    /// use dgraph_tonic::{AclClientType, LazyChannel};
198    ///
199    /// #[cfg(not(feature = "acl"))]
200    /// async fn client() -> Client {
201    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
202    /// }
203    ///
204    /// #[cfg(feature = "acl")]
205    /// async fn client() -> AclClientType<LazyChannel> {
206    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
207    ///     default.login("groot", "password").await.expect("Acl client")
208    /// }
209    ///
210    ///#[derive(Serialize)]
211    /// struct Person {
212    ///   uid: String,
213    ///   name: String,
214    /// }
215    ///
216    /// #[tokio::main]
217    /// async fn main() {
218    ///    let p = Person {
219    ///        uid:  "_:alice".into(),
220    ///        name: "Alice".into(),
221    ///    };
222    ///
223    ///    let mut mu = Mutation::new();
224    ///    mu.set_set_json(&p).expect("JSON");
225    ///
226    ///    let client = client().await;
227    ///    let txn = client.new_mutated_txn();
228    ///    let result = txn.mutate_and_commit_now(mu).await.expect("failed to create data");
229    /// }
230    /// ```
231    ///
232    async fn mutate_and_commit_now(mut self, mu: Mutation) -> Result<MutationResponse>;
233
234    ///
235    /// This function allows you to run upserts consisting of one query and one or more mutations.
236    ///
237    ///
238    /// # Arguments
239    ///
240    /// * `q`: Dgraph query
241    /// * `mu`: required mutations
242    ///
243    /// # Errors
244    ///
245    /// * `GrpcError`: there is error in communication or server does not accept mutation
246    /// * `MissingTxnContext`: there is error in txn setup
247    ///
248    /// # Example
249    ///
250    /// Upsert with one mutation
251    /// ```
252    /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
253    /// #[cfg(feature = "acl")]
254    /// use dgraph_tonic::{AclClientType, LazyChannel};
255    ///
256    /// #[cfg(not(feature = "acl"))]
257    /// async fn client() -> Client {
258    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
259    /// }
260    ///
261    /// #[cfg(feature = "acl")]
262    /// async fn client() -> AclClientType<LazyChannel> {
263    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
264    ///     default.login("groot", "password").await.expect("Acl client")
265    /// }
266    ///
267    /// #[tokio::main]
268    /// async fn main() {
269    ///     let q = r#"
270    ///         query {
271    ///             user as var(func: eq(email, "wrong_email@dgraph.io"))
272    ///         }"#;
273    ///
274    ///     let mut mu = Mutation::new();
275    ///     mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
276    ///
277    ///     let client = client().await;
278    ///     let op = Operation {
279    ///         schema: "email: string @index(exact) .".into(),
280    ///         ..Default::default()
281    ///     };
282    ///     client.alter(op).await.expect("Schema is not updated");
283    ///     let mut txn = client.new_mutated_txn();
284    ///     // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
285    ///     let response = txn.upsert(q, mu).await.expect("failed to upsert data");
286    ///     txn.commit().await.expect("Txn is not committed");
287    /// }
288    /// ```
289    ///
290    /// Upsert with more mutations
291    /// ```
292    /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
293    /// use std::collections::HashMap;
294    /// #[cfg(feature = "acl")]
295    /// use dgraph_tonic::{AclClientType, LazyChannel};
296    ///
297    /// #[cfg(not(feature = "acl"))]
298    /// async fn client() -> Client {
299    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
300    /// }
301    ///
302    /// #[cfg(feature = "acl")]
303    /// async fn client() -> AclClientType<LazyChannel> {
304    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
305    ///     default.login("groot", "password").await.expect("Acl client")
306    /// }
307    ///
308    /// #[tokio::main]
309    /// async fn main() {
310    ///     let q = r#"
311    ///         query {
312    ///             user as var(func: eq(email, "wrong_email@dgraph.io"))
313    ///         }"#;
314    ///
315    ///     let mut mu_1 = Mutation::new();
316    ///     mu_1.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
317    ///     mu_1.set_cond("@if(eq(len(user), 1))");
318    ///
319    ///     let mut mu_2 = Mutation::new();
320    ///     mu_2.set_set_nquads(r#"uid(user) <email> "another_email@dgraph.io" ."#);
321    ///     mu_2.set_cond("@if(eq(len(user), 2))");
322    ///
323    ///     let client = client().await;
324    ///     let op = Operation {
325    ///         schema: "email: string @index(exact) .".into(),
326    ///         ..Default::default()
327    ///     };
328    ///     client.alter(op).await.expect("Schema is not updated");
329    ///     let mut txn = client.new_mutated_txn();
330    ///     // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
331    ///     let response = txn.upsert(q, vec![mu_1, mu_2]).await.expect("failed to upsert data");
332    ///     txn.commit().await.expect("Txn is not committed");
333    /// }
334    /// ```
335    ///
336    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
337    async fn upsert<Q, M>(&mut self, query: Q, mu: M) -> Result<MutationResponse>
338    where
339        Q: Into<String> + Send + Sync,
340        M: Into<UpsertMutation> + Send + Sync;
341
342    ///
343    /// This function allows you to run upserts consisting of one query and one or more mutations.
344    ///
345    /// Transaction is committed.
346    ///
347    ///
348    /// # Arguments
349    ///
350    /// * `q`: Dgraph query
351    /// * `mu`: required mutations
352    ///
353    /// # Errors
354    ///
355    /// * `GrpcError`: there is error in communication or server does not accept mutation
356    /// * `MissingTxnContext`: there is error in txn setup
357    ///
358    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
359    async fn upsert_and_commit_now<Q, M>(mut self, query: Q, mu: M) -> Result<MutationResponse>
360    where
361        Q: Into<String> + Send + Sync,
362        M: Into<UpsertMutation> + Send + Sync;
363
364    ///
365    /// This function allows you to run upserts with query variables consisting of one query and one
366    /// ore more mutations.
367    ///
368    ///
369    /// # Arguments
370    ///
371    /// * `q`: Dgraph query
372    /// * `mu`: required mutations
373    /// * `vars`: query variables
374    ///
375    /// # Errors
376    ///
377    /// * `GrpcError`: there is error in communication or server does not accept mutation
378    /// * `MissingTxnContext`: there is error in txn setup
379    ///
380    /// # Example
381    ///
382    /// Upsert with only one mutation
383    /// ```
384    /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
385    /// use std::collections::HashMap;
386    /// #[cfg(feature = "acl")]
387    /// use dgraph_tonic::{AclClientType, LazyChannel};
388    ///
389    /// #[cfg(not(feature = "acl"))]
390    /// async fn client() -> Client {
391    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
392    /// }
393    ///
394    /// #[cfg(feature = "acl")]
395    /// async fn client() -> AclClientType<LazyChannel> {
396    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
397    ///     default.login("groot", "password").await.expect("Acl client")
398    /// }
399    ///
400    /// #[tokio::main]
401    /// async fn main() {
402    ///     let q = r#"
403    ///         query users($email: string) {
404    ///             user as var(func: eq(email, $email))
405    ///         }"#;
406    ///     let mut vars = HashMap::new();
407    ///     vars.insert("$email", "wrong_email@dgraph.io");
408    ///
409    ///     let mut mu = Mutation::new();
410    ///     mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
411    ///
412    ///     let client = client().await;
413    ///     let op = Operation {
414    ///         schema: "email: string @index(exact) .".into(),
415    ///         ..Default::default()
416    ///     };
417    ///     client.alter(op).await.expect("Schema is not updated");
418    ///     let mut txn = client.new_mutated_txn();
419    ///     // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
420    ///     let response = txn.upsert_with_vars(q, vars, mu).await.expect("failed to upsert data");
421    ///     txn.commit().await.expect("Txn is not committed");
422    /// }
423    /// ```
424    ///
425    /// Upsert with more mutations
426    /// ```
427    /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
428    /// use std::collections::HashMap;
429    /// #[cfg(feature = "acl")]
430    /// use dgraph_tonic::{AclClientType, LazyChannel};
431    ///
432    /// #[cfg(not(feature = "acl"))]
433    /// async fn client() -> Client {
434    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
435    /// }
436    ///
437    /// #[cfg(feature = "acl")]
438    /// async fn client() -> AclClientType<LazyChannel> {
439    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
440    ///     default.login("groot", "password").await.expect("Acl client")
441    /// }
442    ///
443    /// #[tokio::main]
444    /// async fn main() {
445    ///     let q = r#"
446    ///         query users($email: string) {
447    ///             user as var(func: eq(email, $email))
448    ///         }"#;
449    ///     let mut vars = HashMap::new();
450    ///     vars.insert("$email","wrong_email@dgraph.io");
451    ///
452    ///     let mut mu_1 = Mutation::new();
453    ///     mu_1.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
454    ///     mu_1.set_cond("@if(eq(len(user), 1))");
455    ///
456    ///     let mut mu_2 = Mutation::new();
457    ///     mu_2.set_set_nquads(r#"uid(user) <email> "another_email@dgraph.io" ."#);
458    ///     mu_2.set_cond("@if(eq(len(user), 2))");
459    ///
460    ///     let client = client().await;
461    ///     let op = Operation {
462    ///         schema: "email: string @index(exact) .".into(),
463    ///         ..Default::default()
464    ///     };
465    ///     client.alter(op).await.expect("Schema is not updated");
466    ///     let mut txn = client.new_mutated_txn();
467    ///     // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
468    ///     let response = txn.upsert_with_vars(q, vars, vec![mu_1, mu_2]).await.expect("failed to upsert data");
469    ///     txn.commit().await.expect("Txn is not committed");
470    /// }
471    /// ```
472    ///
473    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
474    async fn upsert_with_vars<Q, K, V, M>(
475        &mut self,
476        query: Q,
477        vars: HashMap<K, V>,
478        mu: M,
479    ) -> Result<MutationResponse>
480    where
481        Q: Into<String> + Send + Sync,
482        K: Into<String> + Send + Sync + Eq + Hash,
483        V: Into<String> + Send + Sync,
484        M: Into<UpsertMutation> + Send + Sync;
485
486    ///
487    /// This function allows you to run upserts with query variables consisting of one query and one
488    /// ore more mutations.
489    ///
490    /// Transaction is committed.
491    ///
492    ///
493    /// # Arguments
494    ///
495    /// * `q`: Dgraph query
496    /// * `mu`: required mutations
497    /// * `vars`: query variables
498    ///
499    /// # Errors
500    ///
501    /// * `GrpcError`: there is error in communication or server does not accept mutation
502    /// * `MissingTxnContext`: there is error in txn setup
503    ///
504    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
505    async fn upsert_with_vars_and_commit_now<Q, K, V, M>(
506        mut self,
507        query: Q,
508        vars: HashMap<K, V>,
509        mu: M,
510    ) -> Result<MutationResponse>
511    where
512        Q: Into<String> + Send + Sync,
513        K: Into<String> + Send + Sync + Eq + Hash,
514        V: Into<String> + Send + Sync,
515        M: Into<UpsertMutation> + Send + Sync;
516}
517
518#[async_trait]
519impl<C: ILazyClient> Mutate for TxnMutatedType<C> {
520    async fn discard(mut self) -> Result<()> {
521        self.context.aborted = true;
522        self.commit_or_abort().await
523    }
524
525    async fn commit(self) -> Result<()> {
526        self.commit_or_abort().await
527    }
528
529    async fn mutate(&mut self, mu: Mutation) -> Result<MutationResponse> {
530        self.do_mutation("", HashMap::<String, String>::with_capacity(0), mu, false)
531            .await
532    }
533
534    async fn mutate_and_commit_now(mut self, mu: Mutation) -> Result<MutationResponse> {
535        self.do_mutation("", HashMap::<String, String>::with_capacity(0), mu, true)
536            .await
537    }
538
539    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
540    async fn upsert<Q, M>(&mut self, query: Q, mu: M) -> Result<MutationResponse>
541    where
542        Q: Into<String> + Send + Sync,
543        M: Into<UpsertMutation> + Send + Sync,
544    {
545        self.do_mutation(
546            query,
547            HashMap::<String, String>::with_capacity(0),
548            mu,
549            false,
550        )
551        .await
552    }
553
554    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
555    async fn upsert_and_commit_now<Q, M>(mut self, query: Q, mu: M) -> Result<MutationResponse>
556    where
557        Q: Into<String> + Send + Sync,
558        M: Into<UpsertMutation> + Send + Sync,
559    {
560        self.do_mutation(query, HashMap::<String, String>::with_capacity(0), mu, true)
561            .await
562    }
563
564    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
565    async fn upsert_with_vars<Q, K, V, M>(
566        &mut self,
567        query: Q,
568        vars: HashMap<K, V>,
569        mu: M,
570    ) -> Result<MutationResponse>
571    where
572        Q: Into<String> + Send + Sync,
573        K: Into<String> + Send + Sync + Eq + Hash,
574        V: Into<String> + Send + Sync,
575        M: Into<UpsertMutation> + Send + Sync,
576    {
577        self.do_mutation(query, vars, mu, false).await
578    }
579
580    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
581    async fn upsert_with_vars_and_commit_now<Q, K, V, M>(
582        mut self,
583        query: Q,
584        vars: HashMap<K, V>,
585        mu: M,
586    ) -> Result<MutationResponse>
587    where
588        Q: Into<String> + Send + Sync,
589        K: Into<String> + Send + Sync + Eq + Hash,
590        V: Into<String> + Send + Sync,
591        M: Into<UpsertMutation> + Send + Sync,
592    {
593        self.do_mutation(query, vars, mu, true).await
594    }
595}
596
597impl<C: ILazyClient> TxnMutatedType<C> {
598    #[cfg(feature = "dgraph-1-0")]
599    async fn do_mutation<Q, K, V>(
600        &mut self,
601        _query: Q,
602        _vars: HashMap<K, V>,
603        mut mu: Mutation,
604        commit_now: bool,
605    ) -> Result<MutationResponse>
606    where
607        Q: Into<String> + Send + Sync,
608        K: Into<String> + Send + Sync + Eq + Hash,
609        V: Into<String> + Send + Sync,
610    {
611        self.extra.mutated = true;
612        mu.commit_now = commit_now;
613        mu.start_ts = self.context.start_ts;
614        let assigned = match self.stub.mutate(mu).await {
615            Ok(assigned) => assigned,
616            Err(err) => {
617                anyhow::bail!(DgraphError::GrpcError(err));
618            }
619        };
620        match assigned.context.as_ref() {
621            Some(src) => self.context.merge_context(src)?,
622            None => anyhow::bail!(DgraphError::MissingTxnContext),
623        }
624        Ok(assigned)
625    }
626
627    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
628    async fn do_mutation<Q, K, V, M>(
629        &mut self,
630        query: Q,
631        vars: HashMap<K, V>,
632        mu: M,
633        commit_now: bool,
634    ) -> Result<MutationResponse>
635    where
636        Q: Into<String> + Send + Sync,
637        K: Into<String> + Send + Sync + Eq + Hash,
638        V: Into<String> + Send + Sync,
639        M: Into<UpsertMutation>,
640    {
641        self.extra.mutated = true;
642        let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
643            tmp.insert(k.into(), v.into());
644            tmp
645        });
646        let mu: UpsertMutation = mu.into();
647        let request = Request {
648            query: query.into(),
649            vars,
650            start_ts: self.context.start_ts,
651            commit_now,
652            mutations: mu.mu,
653            ..Default::default()
654        };
655        let response = match self.stub.do_request(request).await {
656            Ok(response) => response,
657            Err(err) => {
658                anyhow::bail!(DgraphError::GrpcError(err));
659            }
660        };
661        match response.txn.as_ref() {
662            Some(txn) => self.context.merge_context(txn)?,
663            None => anyhow::bail!(DgraphError::MissingTxnContext),
664        }
665        Ok(response)
666    }
667
668    async fn commit_or_abort(self) -> Result<()> {
669        let extra = self.extra;
670        let state = *self.state;
671        if !extra.mutated {
672            return Ok(());
673        };
674        let mut client = state.stub;
675        let txn = state.context;
676        match client.commit_or_abort(txn).await {
677            Ok(_txn_context) => Ok(()),
678            Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
679        }
680    }
681}