dgraph_tonic/txn/mutated.rs
1use std::collections::hash_map::RandomState;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use anyhow::Result;
7use async_trait::async_trait;
8
9use crate::client::ILazyClient;
10use crate::errors::DgraphError;
11use crate::txn::default::Base;
12use crate::txn::{IState, Query, TxnState, TxnType, TxnVariant};
13#[cfg(feature = "dgraph-1-0")]
14use crate::Assigned;
15use crate::IDgraphClient;
16#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
17use crate::Response;
18use crate::{Mutation, Request};
19
20///
21/// In Dgraph v1.0.x is mutation response represented as Assigned object
22///
23#[cfg(feature = "dgraph-1-0")]
24pub type MutationResponse = Assigned;
25///
26/// In Dgraph v1.1.x is mutation response represented as Response object
27///
28#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
29pub type MutationResponse = Response;
30
31///
32/// Inner state for transaction which can modify data in DB.
33///
34#[derive(Clone, Debug)]
35pub struct Mutated<C: ILazyClient> {
36 base: Base<C>,
37 mutated: bool,
38}
39
40///
41/// Upsert mutation can be defined with one or more mutations
42///
43#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
44pub struct UpsertMutation {
45 mu: Vec<Mutation>,
46}
47
48#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
49impl From<Vec<Mutation>> for UpsertMutation {
50 fn from(mu: Vec<Mutation>) -> Self {
51 Self { mu }
52 }
53}
54
55#[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
56impl From<Mutation> for UpsertMutation {
57 fn from(mu: Mutation) -> Self {
58 Self { mu: vec![mu] }
59 }
60}
61
62#[async_trait]
63impl<C: ILazyClient> IState for Mutated<C> {
64 ///
65 /// Do same query like default transaction
66 ///
67 fn query_request<S: ILazyClient>(
68 &self,
69 state: &TxnState<S>,
70 query: String,
71 vars: HashMap<String, String, RandomState>,
72 ) -> Request {
73 self.base.query_request(state, query, vars)
74 }
75}
76
77///
78/// Transaction variant with mutations support.
79///
80pub type TxnMutatedType<C> = TxnVariant<Mutated<C>, C>;
81
82impl<C: ILazyClient> TxnType<C> {
83 ///
84 /// Create new transaction for mutation operations.
85 ///
86 pub fn mutated(self) -> TxnMutatedType<C> {
87 TxnVariant {
88 state: self.state,
89 extra: Mutated {
90 base: self.extra,
91 mutated: false,
92 },
93 }
94 }
95}
96
97///
98/// Allowed mutation operation in Dgraph
99///
100#[async_trait]
101pub trait Mutate: Query {
102 ///
103 /// Discard transaction
104 ///
105 /// # Errors
106 ///
107 /// Return gRPC error.
108 ///
109 async fn discard(mut self) -> Result<()>;
110
111 ///
112 /// Commit transaction
113 ///
114 /// # Errors
115 ///
116 /// Return gRPC error.
117 ///
118 async fn commit(self) -> Result<()>;
119
120 ///
121 /// Adding or removing data in Dgraph is called a mutation.
122 ///
123 /// # Arguments
124 ///
125 /// * `mu`: required mutations
126 ///
127 /// # Errors
128 ///
129 /// * `GrpcError`: there is error in communication or server does not accept mutation
130 /// * `MissingTxnContext`: there is error in txn setup
131 ///
132 /// # Example
133 ///
134 /// ```
135 /// use dgraph_tonic::{Client, Mutation, Mutate};
136 /// use serde::Serialize;
137 /// #[cfg(feature = "acl")]
138 /// use dgraph_tonic::{AclClientType, LazyChannel};
139 ///
140 /// #[cfg(not(feature = "acl"))]
141 /// async fn client() -> Client {
142 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
143 /// }
144 ///
145 /// #[cfg(feature = "acl")]
146 /// async fn client() -> AclClientType<LazyChannel> {
147 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
148 /// default.login("groot", "password").await.expect("Acl client")
149 /// }
150 ///
151 ///#[derive(Serialize)]
152 /// struct Person {
153 /// uid: String,
154 /// name: String,
155 /// }
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let p = Person {
159 /// uid: "_:alice".into(),
160 /// name: "Alice".into(),
161 /// };
162 ///
163 /// let mut mu = Mutation::new();
164 /// mu.set_set_json(&p).expect("JSON");
165 ///
166 /// let client = client().await;
167 /// let mut txn = client.new_mutated_txn();
168 /// let result = txn.mutate(mu).await.expect("failed to create data");
169 /// txn.commit().await.expect("Txn is not committed");
170 /// }
171 /// ```
172 ///
173 async fn mutate(&mut self, mu: Mutation) -> Result<MutationResponse>;
174
175 ///
176 /// Adding or removing data in Dgraph is called a mutation.
177 ///
178 /// Sometimes, you only want to commit a mutation, without querying anything further.
179 /// In such cases, you can use this function to indicate that the mutation must be immediately
180 /// committed.
181 ///
182 /// # Arguments
183 ///
184 /// * `mu`: required mutations
185 ///
186 /// # Errors
187 ///
188 /// * `GrpcError`: there is error in communication or server does not accept mutation
189 /// * `MissingTxnContext`: there is error in txn setup
190 ///
191 /// # Example
192 ///
193 /// ```
194 /// use dgraph_tonic::{Client, Mutation, Mutate};
195 /// use serde::Serialize;
196 /// #[cfg(feature = "acl")]
197 /// use dgraph_tonic::{AclClientType, LazyChannel};
198 ///
199 /// #[cfg(not(feature = "acl"))]
200 /// async fn client() -> Client {
201 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
202 /// }
203 ///
204 /// #[cfg(feature = "acl")]
205 /// async fn client() -> AclClientType<LazyChannel> {
206 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
207 /// default.login("groot", "password").await.expect("Acl client")
208 /// }
209 ///
210 ///#[derive(Serialize)]
211 /// struct Person {
212 /// uid: String,
213 /// name: String,
214 /// }
215 ///
216 /// #[tokio::main]
217 /// async fn main() {
218 /// let p = Person {
219 /// uid: "_:alice".into(),
220 /// name: "Alice".into(),
221 /// };
222 ///
223 /// let mut mu = Mutation::new();
224 /// mu.set_set_json(&p).expect("JSON");
225 ///
226 /// let client = client().await;
227 /// let txn = client.new_mutated_txn();
228 /// let result = txn.mutate_and_commit_now(mu).await.expect("failed to create data");
229 /// }
230 /// ```
231 ///
232 async fn mutate_and_commit_now(mut self, mu: Mutation) -> Result<MutationResponse>;
233
234 ///
235 /// This function allows you to run upserts consisting of one query and one or more mutations.
236 ///
237 ///
238 /// # Arguments
239 ///
240 /// * `q`: Dgraph query
241 /// * `mu`: required mutations
242 ///
243 /// # Errors
244 ///
245 /// * `GrpcError`: there is error in communication or server does not accept mutation
246 /// * `MissingTxnContext`: there is error in txn setup
247 ///
248 /// # Example
249 ///
250 /// Upsert with one mutation
251 /// ```
252 /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
253 /// #[cfg(feature = "acl")]
254 /// use dgraph_tonic::{AclClientType, LazyChannel};
255 ///
256 /// #[cfg(not(feature = "acl"))]
257 /// async fn client() -> Client {
258 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
259 /// }
260 ///
261 /// #[cfg(feature = "acl")]
262 /// async fn client() -> AclClientType<LazyChannel> {
263 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
264 /// default.login("groot", "password").await.expect("Acl client")
265 /// }
266 ///
267 /// #[tokio::main]
268 /// async fn main() {
269 /// let q = r#"
270 /// query {
271 /// user as var(func: eq(email, "wrong_email@dgraph.io"))
272 /// }"#;
273 ///
274 /// let mut mu = Mutation::new();
275 /// mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
276 ///
277 /// let client = client().await;
278 /// let op = Operation {
279 /// schema: "email: string @index(exact) .".into(),
280 /// ..Default::default()
281 /// };
282 /// client.alter(op).await.expect("Schema is not updated");
283 /// let mut txn = client.new_mutated_txn();
284 /// // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
285 /// let response = txn.upsert(q, mu).await.expect("failed to upsert data");
286 /// txn.commit().await.expect("Txn is not committed");
287 /// }
288 /// ```
289 ///
290 /// Upsert with more mutations
291 /// ```
292 /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
293 /// use std::collections::HashMap;
294 /// #[cfg(feature = "acl")]
295 /// use dgraph_tonic::{AclClientType, LazyChannel};
296 ///
297 /// #[cfg(not(feature = "acl"))]
298 /// async fn client() -> Client {
299 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
300 /// }
301 ///
302 /// #[cfg(feature = "acl")]
303 /// async fn client() -> AclClientType<LazyChannel> {
304 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
305 /// default.login("groot", "password").await.expect("Acl client")
306 /// }
307 ///
308 /// #[tokio::main]
309 /// async fn main() {
310 /// let q = r#"
311 /// query {
312 /// user as var(func: eq(email, "wrong_email@dgraph.io"))
313 /// }"#;
314 ///
315 /// let mut mu_1 = Mutation::new();
316 /// mu_1.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
317 /// mu_1.set_cond("@if(eq(len(user), 1))");
318 ///
319 /// let mut mu_2 = Mutation::new();
320 /// mu_2.set_set_nquads(r#"uid(user) <email> "another_email@dgraph.io" ."#);
321 /// mu_2.set_cond("@if(eq(len(user), 2))");
322 ///
323 /// let client = client().await;
324 /// let op = Operation {
325 /// schema: "email: string @index(exact) .".into(),
326 /// ..Default::default()
327 /// };
328 /// client.alter(op).await.expect("Schema is not updated");
329 /// let mut txn = client.new_mutated_txn();
330 /// // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
331 /// let response = txn.upsert(q, vec![mu_1, mu_2]).await.expect("failed to upsert data");
332 /// txn.commit().await.expect("Txn is not committed");
333 /// }
334 /// ```
335 ///
336 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
337 async fn upsert<Q, M>(&mut self, query: Q, mu: M) -> Result<MutationResponse>
338 where
339 Q: Into<String> + Send + Sync,
340 M: Into<UpsertMutation> + Send + Sync;
341
342 ///
343 /// This function allows you to run upserts consisting of one query and one or more mutations.
344 ///
345 /// Transaction is committed.
346 ///
347 ///
348 /// # Arguments
349 ///
350 /// * `q`: Dgraph query
351 /// * `mu`: required mutations
352 ///
353 /// # Errors
354 ///
355 /// * `GrpcError`: there is error in communication or server does not accept mutation
356 /// * `MissingTxnContext`: there is error in txn setup
357 ///
358 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
359 async fn upsert_and_commit_now<Q, M>(mut self, query: Q, mu: M) -> Result<MutationResponse>
360 where
361 Q: Into<String> + Send + Sync,
362 M: Into<UpsertMutation> + Send + Sync;
363
364 ///
365 /// This function allows you to run upserts with query variables consisting of one query and one
366 /// ore more mutations.
367 ///
368 ///
369 /// # Arguments
370 ///
371 /// * `q`: Dgraph query
372 /// * `mu`: required mutations
373 /// * `vars`: query variables
374 ///
375 /// # Errors
376 ///
377 /// * `GrpcError`: there is error in communication or server does not accept mutation
378 /// * `MissingTxnContext`: there is error in txn setup
379 ///
380 /// # Example
381 ///
382 /// Upsert with only one mutation
383 /// ```
384 /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
385 /// use std::collections::HashMap;
386 /// #[cfg(feature = "acl")]
387 /// use dgraph_tonic::{AclClientType, LazyChannel};
388 ///
389 /// #[cfg(not(feature = "acl"))]
390 /// async fn client() -> Client {
391 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
392 /// }
393 ///
394 /// #[cfg(feature = "acl")]
395 /// async fn client() -> AclClientType<LazyChannel> {
396 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
397 /// default.login("groot", "password").await.expect("Acl client")
398 /// }
399 ///
400 /// #[tokio::main]
401 /// async fn main() {
402 /// let q = r#"
403 /// query users($email: string) {
404 /// user as var(func: eq(email, $email))
405 /// }"#;
406 /// let mut vars = HashMap::new();
407 /// vars.insert("$email", "wrong_email@dgraph.io");
408 ///
409 /// let mut mu = Mutation::new();
410 /// mu.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
411 ///
412 /// let client = client().await;
413 /// let op = Operation {
414 /// schema: "email: string @index(exact) .".into(),
415 /// ..Default::default()
416 /// };
417 /// client.alter(op).await.expect("Schema is not updated");
418 /// let mut txn = client.new_mutated_txn();
419 /// // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
420 /// let response = txn.upsert_with_vars(q, vars, mu).await.expect("failed to upsert data");
421 /// txn.commit().await.expect("Txn is not committed");
422 /// }
423 /// ```
424 ///
425 /// Upsert with more mutations
426 /// ```
427 /// use dgraph_tonic::{Client, Mutation, Operation, Mutate};
428 /// use std::collections::HashMap;
429 /// #[cfg(feature = "acl")]
430 /// use dgraph_tonic::{AclClientType, LazyChannel};
431 ///
432 /// #[cfg(not(feature = "acl"))]
433 /// async fn client() -> Client {
434 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
435 /// }
436 ///
437 /// #[cfg(feature = "acl")]
438 /// async fn client() -> AclClientType<LazyChannel> {
439 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
440 /// default.login("groot", "password").await.expect("Acl client")
441 /// }
442 ///
443 /// #[tokio::main]
444 /// async fn main() {
445 /// let q = r#"
446 /// query users($email: string) {
447 /// user as var(func: eq(email, $email))
448 /// }"#;
449 /// let mut vars = HashMap::new();
450 /// vars.insert("$email","wrong_email@dgraph.io");
451 ///
452 /// let mut mu_1 = Mutation::new();
453 /// mu_1.set_set_nquads(r#"uid(user) <email> "correct_email@dgraph.io" ."#);
454 /// mu_1.set_cond("@if(eq(len(user), 1))");
455 ///
456 /// let mut mu_2 = Mutation::new();
457 /// mu_2.set_set_nquads(r#"uid(user) <email> "another_email@dgraph.io" ."#);
458 /// mu_2.set_cond("@if(eq(len(user), 2))");
459 ///
460 /// let client = client().await;
461 /// let op = Operation {
462 /// schema: "email: string @index(exact) .".into(),
463 /// ..Default::default()
464 /// };
465 /// client.alter(op).await.expect("Schema is not updated");
466 /// let mut txn = client.new_mutated_txn();
467 /// // Upsert: If wrong_email found, update the existing data or else perform a new mutation.
468 /// let response = txn.upsert_with_vars(q, vars, vec![mu_1, mu_2]).await.expect("failed to upsert data");
469 /// txn.commit().await.expect("Txn is not committed");
470 /// }
471 /// ```
472 ///
473 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
474 async fn upsert_with_vars<Q, K, V, M>(
475 &mut self,
476 query: Q,
477 vars: HashMap<K, V>,
478 mu: M,
479 ) -> Result<MutationResponse>
480 where
481 Q: Into<String> + Send + Sync,
482 K: Into<String> + Send + Sync + Eq + Hash,
483 V: Into<String> + Send + Sync,
484 M: Into<UpsertMutation> + Send + Sync;
485
486 ///
487 /// This function allows you to run upserts with query variables consisting of one query and one
488 /// ore more mutations.
489 ///
490 /// Transaction is committed.
491 ///
492 ///
493 /// # Arguments
494 ///
495 /// * `q`: Dgraph query
496 /// * `mu`: required mutations
497 /// * `vars`: query variables
498 ///
499 /// # Errors
500 ///
501 /// * `GrpcError`: there is error in communication or server does not accept mutation
502 /// * `MissingTxnContext`: there is error in txn setup
503 ///
504 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
505 async fn upsert_with_vars_and_commit_now<Q, K, V, M>(
506 mut self,
507 query: Q,
508 vars: HashMap<K, V>,
509 mu: M,
510 ) -> Result<MutationResponse>
511 where
512 Q: Into<String> + Send + Sync,
513 K: Into<String> + Send + Sync + Eq + Hash,
514 V: Into<String> + Send + Sync,
515 M: Into<UpsertMutation> + Send + Sync;
516}
517
518#[async_trait]
519impl<C: ILazyClient> Mutate for TxnMutatedType<C> {
520 async fn discard(mut self) -> Result<()> {
521 self.context.aborted = true;
522 self.commit_or_abort().await
523 }
524
525 async fn commit(self) -> Result<()> {
526 self.commit_or_abort().await
527 }
528
529 async fn mutate(&mut self, mu: Mutation) -> Result<MutationResponse> {
530 self.do_mutation("", HashMap::<String, String>::with_capacity(0), mu, false)
531 .await
532 }
533
534 async fn mutate_and_commit_now(mut self, mu: Mutation) -> Result<MutationResponse> {
535 self.do_mutation("", HashMap::<String, String>::with_capacity(0), mu, true)
536 .await
537 }
538
539 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
540 async fn upsert<Q, M>(&mut self, query: Q, mu: M) -> Result<MutationResponse>
541 where
542 Q: Into<String> + Send + Sync,
543 M: Into<UpsertMutation> + Send + Sync,
544 {
545 self.do_mutation(
546 query,
547 HashMap::<String, String>::with_capacity(0),
548 mu,
549 false,
550 )
551 .await
552 }
553
554 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
555 async fn upsert_and_commit_now<Q, M>(mut self, query: Q, mu: M) -> Result<MutationResponse>
556 where
557 Q: Into<String> + Send + Sync,
558 M: Into<UpsertMutation> + Send + Sync,
559 {
560 self.do_mutation(query, HashMap::<String, String>::with_capacity(0), mu, true)
561 .await
562 }
563
564 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
565 async fn upsert_with_vars<Q, K, V, M>(
566 &mut self,
567 query: Q,
568 vars: HashMap<K, V>,
569 mu: M,
570 ) -> Result<MutationResponse>
571 where
572 Q: Into<String> + Send + Sync,
573 K: Into<String> + Send + Sync + Eq + Hash,
574 V: Into<String> + Send + Sync,
575 M: Into<UpsertMutation> + Send + Sync,
576 {
577 self.do_mutation(query, vars, mu, false).await
578 }
579
580 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
581 async fn upsert_with_vars_and_commit_now<Q, K, V, M>(
582 mut self,
583 query: Q,
584 vars: HashMap<K, V>,
585 mu: M,
586 ) -> Result<MutationResponse>
587 where
588 Q: Into<String> + Send + Sync,
589 K: Into<String> + Send + Sync + Eq + Hash,
590 V: Into<String> + Send + Sync,
591 M: Into<UpsertMutation> + Send + Sync,
592 {
593 self.do_mutation(query, vars, mu, true).await
594 }
595}
596
597impl<C: ILazyClient> TxnMutatedType<C> {
598 #[cfg(feature = "dgraph-1-0")]
599 async fn do_mutation<Q, K, V>(
600 &mut self,
601 _query: Q,
602 _vars: HashMap<K, V>,
603 mut mu: Mutation,
604 commit_now: bool,
605 ) -> Result<MutationResponse>
606 where
607 Q: Into<String> + Send + Sync,
608 K: Into<String> + Send + Sync + Eq + Hash,
609 V: Into<String> + Send + Sync,
610 {
611 self.extra.mutated = true;
612 mu.commit_now = commit_now;
613 mu.start_ts = self.context.start_ts;
614 let assigned = match self.stub.mutate(mu).await {
615 Ok(assigned) => assigned,
616 Err(err) => {
617 anyhow::bail!(DgraphError::GrpcError(err));
618 }
619 };
620 match assigned.context.as_ref() {
621 Some(src) => self.context.merge_context(src)?,
622 None => anyhow::bail!(DgraphError::MissingTxnContext),
623 }
624 Ok(assigned)
625 }
626
627 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
628 async fn do_mutation<Q, K, V, M>(
629 &mut self,
630 query: Q,
631 vars: HashMap<K, V>,
632 mu: M,
633 commit_now: bool,
634 ) -> Result<MutationResponse>
635 where
636 Q: Into<String> + Send + Sync,
637 K: Into<String> + Send + Sync + Eq + Hash,
638 V: Into<String> + Send + Sync,
639 M: Into<UpsertMutation>,
640 {
641 self.extra.mutated = true;
642 let vars = vars.into_iter().fold(HashMap::new(), |mut tmp, (k, v)| {
643 tmp.insert(k.into(), v.into());
644 tmp
645 });
646 let mu: UpsertMutation = mu.into();
647 let request = Request {
648 query: query.into(),
649 vars,
650 start_ts: self.context.start_ts,
651 commit_now,
652 mutations: mu.mu,
653 ..Default::default()
654 };
655 let response = match self.stub.do_request(request).await {
656 Ok(response) => response,
657 Err(err) => {
658 anyhow::bail!(DgraphError::GrpcError(err));
659 }
660 };
661 match response.txn.as_ref() {
662 Some(txn) => self.context.merge_context(txn)?,
663 None => anyhow::bail!(DgraphError::MissingTxnContext),
664 }
665 Ok(response)
666 }
667
668 async fn commit_or_abort(self) -> Result<()> {
669 let extra = self.extra;
670 let state = *self.state;
671 if !extra.mutated {
672 return Ok(());
673 };
674 let mut client = state.stub;
675 let txn = state.context;
676 match client.commit_or_abort(txn).await {
677 Ok(_txn_context) => Ok(()),
678 Err(err) => anyhow::bail!(DgraphError::GrpcError(err)),
679 }
680 }
681}