1use crate::client::DgraphClient;
2use crate::types::{TxnContext, Request, Extensions, Mutation, Assigned, DgraphError};
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6use serde_json::Value;
7use std::time::SystemTime;
8
/// Error returned when an operation is attempted on a transaction that has
/// already been committed or discarded.
///
/// Its `Display` output is the stable code `ERR_FINISHED`.
#[derive(Debug, Clone)]
pub struct TxnFinished {}

impl fmt::Display for TxnFinished {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ERR_FINISHED")
    }
}

impl Error for TxnFinished {
    // `description` is kept so existing callers still receive the
    // human-readable text. The former `cause` override is gone: it only
    // repeated the trait default (`None`) and was spelled with a bare trait
    // object, which newer editions reject.
    fn description(&self) -> &str {
        "Transaction has already been committed or discarded"
    }
}

/// Builds the error reported when a finished transaction is used again.
fn finished_error() -> TxnFinished {
    TxnFinished {}
}
33
/// Error returned when the start timestamp reported by the server differs
/// from the one already recorded for the transaction.
///
/// Its `Display` output is the stable code `ERR_START_TXN_MISMATCH`.
#[derive(Debug, Clone)]
pub struct StartTxnMismatch {}

impl fmt::Display for StartTxnMismatch {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ERR_START_TXN_MISMATCH")
    }
}

impl Error for StartTxnMismatch {
    // `description` is kept so existing callers still receive the
    // human-readable text. The former `cause` override is gone: it only
    // repeated the trait default (`None`) and was spelled with a bare trait
    // object, which newer editions reject.
    fn description(&self) -> &str {
        "StartTs mismatch"
    }
}

/// Builds the error reported when start timestamps disagree.
fn start_txn_mismatch_error() -> StartTxnMismatch {
    StartTxnMismatch {}
}
58
59#[derive(Debug, Clone)]
60pub struct MutationErrors {
61 errors: Vec<DgraphError>,
62}
63
64impl fmt::Display for MutationErrors {
65 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66 write!(f, "MUTATION_ERRORS. {:?}", self.errors)
67 }
68}
69
70impl Error for MutationErrors {
71 fn description(&self) -> &str {
72 "Mutation data contains errors"
73 }
74
75 fn cause(&self) -> Option<&Error> {
76 None
78 }
79}
80
81fn mutation_error(errors: Vec<DgraphError>) -> MutationErrors {
82 let err = MutationErrors { errors };
83 err
84}
85
/// Returns the sorted union of `a` and `b` with duplicate strings removed.
fn merge_vec(a: Vec<String>, b: Vec<String>) -> Vec<String> {
    let mut merged: Vec<String> = a.into_iter().chain(b).collect();
    // Sorting first makes equal strings adjacent, which `dedup` relies on.
    merged.sort_unstable();
    merged.dedup();
    merged
}
92
93fn merge_context(ctx: TxnContext, extensions: Extensions) -> Result<TxnContext, Box<Error>> {
94 match extensions.txn {
95 Some(src) => {
96 let start_ts = if ctx.start_ts.unwrap() == 0 {
97 src.start_ts
98 } else if ctx.start_ts.unwrap() != src.start_ts.unwrap() {
99 return Err(Box::new(start_txn_mismatch_error()));
100 } else {
101 ctx.start_ts
102 };
103 let keys = if let Some(keys) = src.keys {
104 let actual_keys = ctx.keys.unwrap();
105 Some(merge_vec(keys, actual_keys))
106 } else {
107 None
108 };
109 let preds = if let Some(preds) = src.preds {
110 let actual_preds = ctx.preds.unwrap();
111 Some(merge_vec(preds, actual_preds))
112 } else {
113 None
114 };
115 let ctx = TxnContext {
116 start_ts,
117 keys,
118 preds,
119 aborted: Some(false),
120 };
121 Ok(ctx)
122 }
123 None => Ok(ctx)
124 }
125}
126
127pub struct Txn<'a> {
128 dc: &'a DgraphClient,
129 ctx: TxnContext,
130 finished: Option<bool>,
131 mutated: Option<bool>,
132}
133
134impl<'a> Txn<'a> {
135 pub fn new(dc: &DgraphClient) -> Txn {
136 let ctx = TxnContext {
137 start_ts: Some(0),
138 keys: Some(Vec::new()),
139 preds: Some(Vec::new()),
140 aborted: None,
141 };
142 Txn {
143 dc,
144 ctx,
145 finished: Some(false),
146 mutated: Some(false),
147 }
148 }
149
150 pub fn query(&mut self, q: String) -> Result<Value, Box<Error>> {
151 self.query_with_vars(q, None)
152 }
153
154 pub fn query_with_vars(&mut self, q: String, vars: Option<HashMap<String, String>>)
155 -> Result<Value, Box<Error>> {
156 let finished = self.finished.unwrap();
157 if finished == true {
158 self.dc.debug(format!("Query request (ERR_FINISHED):\nquery = {}\nvars = {:?}",
159 q, vars));
160 return Err(Box::new(finished_error()));
161 }
162 let query = q.clone();
163 let req = Request {
164 query: Some(q),
165 start_ts: self.ctx.start_ts,
166 vars,
167 };
168 self.dc.debug(format!("Query request:\n{}\nvars:{}",
169 query,
170 if let Some(vars) = &req.vars {
171 serde_json::to_string(vars)?
172 } else {
173 String::from("")
174 }));
175 let client = self.dc.any_client();
176 let start_time = SystemTime::now();
177 match client.query(req) {
178 Ok((value, mut extensions)) => {
179 let ctx = self.ctx.clone();
180 let latency = &mut extensions.server_latency;
181 latency.network_ns = Some(start_time.elapsed()?.subsec_nanos());
182 let value_json = serde_json::to_string(&value)?;
183 let debug_msg = format!("Query response:\n{}\nQuery {:?}", value_json, latency);
184 match merge_context(ctx, extensions) {
185 Ok(context) => {
186 self.ctx = context;
187 self.dc.debug(debug_msg);
188 Ok(value)
189 }
190 Err(e) => Err(e),
191 }
192 }
193 Err(e) => Err(e),
194 }
195 }
196
197 pub fn mutation(&mut self, mut mu: Mutation) -> Result<Assigned, Box<Error>> {
198 if self.finished.unwrap() == true {
199 self.dc.debug(format!("Mutate request (ERR_FINISHED):\nmutation = {:?}", mu));
200 return Err(Box::new(finished_error()));
201 }
202 self.mutated = Some(true);
203 mu.start_ts = self.ctx.start_ts;
204 self.dc.debug(format!("Mutate request:\n{:?}", mu));
205 if let Some(commit_now) = mu.commit_now {
206 if commit_now == true {
207 self.finished = Some(true);
208 }
209 };
210 let client = self.dc.any_client();
211 let start_time = SystemTime::now();
212 match client.mutate(mu) {
213 Ok(assigned) => {
214 let result = assigned.clone();
215 if let Some(mut extensions) = assigned.extensions {
216 let ctx = self.ctx.clone();
217 let latency = &mut extensions.server_latency;
218 latency.network_ns = Some(start_time.elapsed()?.subsec_nanos());
219 let debug_msg = format!("Mutate response:\n{:?}\nMutate {:?}",
220 result, latency);
221 match merge_context(ctx, extensions) {
222 Ok(context) => {
223 self.ctx = context;
224 self.dc.debug(debug_msg);
225 Ok(result)
226 }
227 Err(e) => Err(e),
228 }
229 } else if let Some(errors) = assigned.errors {
230 Err(Box::new(mutation_error(errors)))
231 } else {
232 Ok(result)
233 }
234 }
235 Err(e) => {
236 match self.discard() {
237 Ok(_) => Err(e),
238 Err(_) => Err(e),
239 }
240 }
241 }
242 }
243
244 pub fn commit(&mut self) -> Result<(), Box<Error>> {
245 if self.finished.unwrap() == true {
246 return Err(Box::new(finished_error()));
247 }
248 self.finished = Some(true);
249 let client = self.dc.any_client();
250 match client.commit(self.ctx.clone()) {
251 Ok(_ctx) => {
252 Ok(())
253 }
254 Err(e) => Err(e),
255 }
256 }
257
258 pub fn discard(&mut self) -> Result<(), Box<Error>> {
259 if self.finished.unwrap() == true {
260 return Ok(());
261 }
262 self.finished = Some(true);
263 if self.mutated.unwrap() == false {
264 return Ok(());
265 }
266 let ctx = &mut self.ctx;
267 ctx.aborted = Some(true);
268 let client = self.dc.any_client();
269 match client.abort(ctx) {
270 Ok(_ctx) => {
271 Ok(())
272 }
273 Err(e) => Err(e),
274 }
275 }
276}