dgraph_rs_http/
txn.rs

1use crate::client::DgraphClient;
2use crate::types::{TxnContext, Request, Extensions, Mutation, Assigned, DgraphError};
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6use serde_json::Value;
7use std::time::SystemTime;
8
9#[derive(Debug, Clone)]
10pub struct TxnFinished {}
11
12impl fmt::Display for TxnFinished {
13    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
14        write!(f, "ERR_FINISHED")
15    }
16}
17
18impl Error for TxnFinished {
19    fn description(&self) -> &str {
20        "Transaction has already been committed or discarded"
21    }
22
23    fn cause(&self) -> Option<&Error> {
24        // Generic error, underlying cause isn't tracked.
25        None
26    }
27}
28
29fn finished_error() -> TxnFinished {
30    let err = TxnFinished {};
31    err
32}
33
34#[derive(Debug, Clone)]
35pub struct StartTxnMismatch {}
36
37impl fmt::Display for StartTxnMismatch {
38    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39        write!(f, "ERR_START_TXN_MISMATCH")
40    }
41}
42
43impl Error for StartTxnMismatch {
44    fn description(&self) -> &str {
45        "StartTs mismatch"
46    }
47
48    fn cause(&self) -> Option<&Error> {
49        // Generic error, underlying cause isn't tracked.
50        None
51    }
52}
53
54fn start_txn_mismatch_error() -> StartTxnMismatch {
55    let err = StartTxnMismatch {};
56    err
57}
58
59#[derive(Debug, Clone)]
60pub struct MutationErrors {
61    errors: Vec<DgraphError>,
62}
63
64impl fmt::Display for MutationErrors {
65    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66        write!(f, "MUTATION_ERRORS. {:?}", self.errors)
67    }
68}
69
70impl Error for MutationErrors {
71    fn description(&self) -> &str {
72        "Mutation data contains errors"
73    }
74
75    fn cause(&self) -> Option<&Error> {
76        // Generic error, underlying cause isn't tracked.
77        None
78    }
79}
80
81fn mutation_error(errors: Vec<DgraphError>) -> MutationErrors {
82    let err = MutationErrors { errors };
83    err
84}
85
86fn merge_vec(mut a: Vec<String>, b: Vec<String>) -> Vec<String> {
87    a.extend(b);
88    a.sort_unstable();
89    a.dedup();
90    a
91}
92
93fn merge_context(ctx: TxnContext, extensions: Extensions) -> Result<TxnContext, Box<Error>> {
94    match extensions.txn {
95        Some(src) => {
96            let start_ts = if ctx.start_ts.unwrap() == 0 {
97                src.start_ts
98            } else if ctx.start_ts.unwrap() != src.start_ts.unwrap() {
99                return Err(Box::new(start_txn_mismatch_error()));
100            } else {
101                ctx.start_ts
102            };
103            let keys = if let Some(keys) = src.keys {
104                let actual_keys = ctx.keys.unwrap();
105                Some(merge_vec(keys, actual_keys))
106            } else {
107                None
108            };
109            let preds = if let Some(preds) = src.preds {
110                let actual_preds = ctx.preds.unwrap();
111                Some(merge_vec(preds, actual_preds))
112            } else {
113                None
114            };
115            let ctx = TxnContext {
116                start_ts,
117                keys,
118                preds,
119                aborted: Some(false),
120            };
121            Ok(ctx)
122        }
123        None => Ok(ctx)
124    }
125}
126
127pub struct Txn<'a> {
128    dc: &'a DgraphClient,
129    ctx: TxnContext,
130    finished: Option<bool>,
131    mutated: Option<bool>,
132}
133
134impl<'a> Txn<'a> {
135    pub fn new(dc: &DgraphClient) -> Txn {
136        let ctx = TxnContext {
137            start_ts: Some(0),
138            keys: Some(Vec::new()),
139            preds: Some(Vec::new()),
140            aborted: None,
141        };
142        Txn {
143            dc,
144            ctx,
145            finished: Some(false),
146            mutated: Some(false),
147        }
148    }
149
150    pub fn query(&mut self, q: String) -> Result<Value, Box<Error>> {
151        self.query_with_vars(q, None)
152    }
153
154    pub fn query_with_vars(&mut self, q: String, vars: Option<HashMap<String, String>>)
155                           -> Result<Value, Box<Error>> {
156        let finished = self.finished.unwrap();
157        if finished == true {
158            self.dc.debug(format!("Query request (ERR_FINISHED):\nquery = {}\nvars = {:?}",
159                                  q, vars));
160            return Err(Box::new(finished_error()));
161        }
162        let query = q.clone();
163        let req = Request {
164            query: Some(q),
165            start_ts: self.ctx.start_ts,
166            vars,
167        };
168        self.dc.debug(format!("Query request:\n{}\nvars:{}",
169                              query,
170                              if let Some(vars) = &req.vars {
171                                  serde_json::to_string(vars)?
172                              } else {
173                                  String::from("")
174                              }));
175        let client = self.dc.any_client();
176        let start_time = SystemTime::now();
177        match client.query(req) {
178            Ok((value, mut extensions)) => {
179                let ctx = self.ctx.clone();
180                let latency = &mut extensions.server_latency;
181                latency.network_ns = Some(start_time.elapsed()?.subsec_nanos());
182                let value_json = serde_json::to_string(&value)?;
183                let debug_msg = format!("Query response:\n{}\nQuery {:?}", value_json, latency);
184                match merge_context(ctx, extensions) {
185                    Ok(context) => {
186                        self.ctx = context;
187                        self.dc.debug(debug_msg);
188                        Ok(value)
189                    }
190                    Err(e) => Err(e),
191                }
192            }
193            Err(e) => Err(e),
194        }
195    }
196
197    pub fn mutation(&mut self, mut mu: Mutation) -> Result<Assigned, Box<Error>> {
198        if self.finished.unwrap() == true {
199            self.dc.debug(format!("Mutate request (ERR_FINISHED):\nmutation = {:?}", mu));
200            return Err(Box::new(finished_error()));
201        }
202        self.mutated = Some(true);
203        mu.start_ts = self.ctx.start_ts;
204        self.dc.debug(format!("Mutate request:\n{:?}", mu));
205        if let Some(commit_now) = mu.commit_now {
206            if commit_now == true {
207                self.finished = Some(true);
208            }
209        };
210        let client = self.dc.any_client();
211        let start_time = SystemTime::now();
212        match client.mutate(mu) {
213            Ok(assigned) => {
214                let result = assigned.clone();
215                if let Some(mut extensions) = assigned.extensions {
216                    let ctx = self.ctx.clone();
217                    let latency = &mut extensions.server_latency;
218                    latency.network_ns = Some(start_time.elapsed()?.subsec_nanos());
219                    let debug_msg = format!("Mutate response:\n{:?}\nMutate {:?}",
220                                            result, latency);
221                    match merge_context(ctx, extensions) {
222                        Ok(context) => {
223                            self.ctx = context;
224                            self.dc.debug(debug_msg);
225                            Ok(result)
226                        }
227                        Err(e) => Err(e),
228                    }
229                } else if let Some(errors) = assigned.errors {
230                    Err(Box::new(mutation_error(errors)))
231                } else {
232                    Ok(result)
233                }
234            }
235            Err(e) => {
236                match self.discard() {
237                    Ok(_) => Err(e),
238                    Err(_) => Err(e),
239                }
240            }
241        }
242    }
243
244    pub fn commit(&mut self) -> Result<(), Box<Error>> {
245        if self.finished.unwrap() == true {
246            return Err(Box::new(finished_error()));
247        }
248        self.finished = Some(true);
249        let client = self.dc.any_client();
250        match client.commit(self.ctx.clone()) {
251            Ok(_ctx) => {
252                Ok(())
253            }
254            Err(e) => Err(e),
255        }
256    }
257
258    pub fn discard(&mut self) -> Result<(), Box<Error>> {
259        if self.finished.unwrap() == true {
260            return Ok(());
261        }
262        self.finished = Some(true);
263        if self.mutated.unwrap() == false {
264            return Ok(());
265        }
266        let ctx = &mut self.ctx;
267        ctx.aborted = Some(true);
268        let client = self.dc.any_client();
269        match client.abort(ctx) {
270            Ok(_ctx) => {
271                Ok(())
272            }
273            Err(e) => Err(e),
274        }
275    }
276}