dgraph_rs_http/
client_stub.rs

1use reqwest::{Certificate, Client, Identity, RequestBuilder, StatusCode};
2use crate::types::{Request, TxnContext, Mutation, Assigned, Operation, Extensions, DgraphError};
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6use serde_json::Value;
7
/// Error reported when the dGraph server answers with a non-success HTTP status.
#[derive(Debug, Clone)]
pub struct InvalidServerStatus {
    /// Numeric HTTP status code taken from the server response.
    status_code: u16,
}

impl fmt::Display for InvalidServerStatus {
    /// Writes the fixed message followed by the numeric status code.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Invalid status code. {}", self.status_code)
    }
}

impl Error for InvalidServerStatus {
    // `description` is deprecated in favour of `Display`, but the override is
    // kept so callers that still read it receive the same text as before.
    fn description(&self) -> &str {
        "Invalid respond code from dGraph Server"
    }

    // No `cause`/`source` override: this error has no underlying cause, and the
    // trait defaults already return `None`.
}
29
30fn server_error(status_code: &StatusCode) -> InvalidServerStatus {
31    let status_code = status_code.as_u16();
32    let err = InvalidServerStatus {
33        status_code
34    };
35    err
36}
37
/// Error reported by `mutate` when the mutation carries neither JSON nor
/// N-Quads set/delete data.
#[derive(Debug, Clone)]
pub struct MutationIsAlter {}

impl fmt::Display for MutationIsAlter {
    /// Writes the same explanation that `description` returns.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The previous text ("... in alter") described the wrong operation:
        // this error is produced by `mutate`, not by `alter`.
        write!(f, "Mutation mu argument does not contain any set or delete operation")
    }
}

impl Error for MutationIsAlter {
    // `description` is deprecated in favour of `Display`, but the override is
    // kept so callers that still read it receive the same text as before.
    fn description(&self) -> &str {
        "Mutation mu argument does not contain any set or delete operation"
    }

    // No `cause`/`source` override: this error has no underlying cause, and the
    // trait defaults already return `None`.
}
57
58fn mutation_error() -> MutationIsAlter {
59    let err = MutationIsAlter {};
60    err
61}
62
/// Error reported by `alter` when the operation names no schema, no attribute
/// to drop, and does not request `drop_all`.
#[derive(Debug, Clone)]
pub struct InvalidAlter {}

impl fmt::Display for InvalidAlter {
    /// Writes the fixed "invalid op argument" message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Invalid op argument in alter")
    }
}

impl Error for InvalidAlter {
    // `description` is deprecated in favour of `Display`, but the override is
    // kept so callers that still read it receive the same text as before.
    fn description(&self) -> &str {
        "Invalid op argument in alter"
    }

    // No `cause`/`source` override: this error has no underlying cause, and the
    // trait defaults already return `None`.
}
82
83fn alter_error() -> InvalidAlter {
84    let err = InvalidAlter {};
85    err
86}
87
88#[derive(Debug, Clone)]
89pub struct QueryErrors {
90    errors: Vec<DgraphError>,
91}
92
93impl fmt::Display for QueryErrors {
94    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
95        write!(f, "QUERY_ERRORS. {:?}", self.errors)
96    }
97}
98
99impl Error for QueryErrors {
100    fn description(&self) -> &str {
101        "Mutation data contains errors"
102    }
103
104    fn cause(&self) -> Option<&Error> {
105        // Generic error, underlying cause isn't tracked.
106        None
107    }
108}
109
110fn query_error(errors: Vec<DgraphError>) -> QueryErrors {
111    let err = QueryErrors { errors };
112    err
113}
114
/// Error reported when the `reqwest` HTTP client cannot be built.
#[derive(Debug, Clone)]
pub struct ClientError {}

impl fmt::Display for ClientError {
    /// Writes the fixed `REQWEST_CLIENT_ERROR` tag.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "REQWEST_CLIENT_ERROR")
    }
}

impl Error for ClientError {
    // `description` is deprecated in favour of `Display`, but the override is
    // kept so callers that still read it receive the same text as before.
    fn description(&self) -> &str {
        "Cannot build reqwest http client"
    }

    // No `cause`/`source` override: the builder failure is not stored in this
    // type, and the trait defaults already return `None`.
}
134
135fn client_error() -> Box<ClientError> {
136    Box::new(ClientError {})
137}
138
139fn response(request: RequestBuilder) -> Result<(Value, Extensions), Box<Error>> {
140    let mut response = request.send()?;
141    let status_code = response.status();
142    if status_code.is_success() {
143        let resp: serde_json::Value = response.json()?;
144        match resp.get("errors") {
145            Some(errors) => {
146                let errors_value = errors.clone();
147                let errors: Vec<DgraphError> = serde_json::from_value(errors_value).unwrap();
148                Err(Box::new(query_error(errors)))
149            }
150            None => {
151                let extensions_value = resp.get("extensions").unwrap().clone();
152                let extensions: Extensions = serde_json::from_value(extensions_value).unwrap();
153                ;
154                let data: Value = resp.get("data").unwrap().clone();
155                Ok((data, extensions))
156            }
157        }
158    } else {
159        Err(Box::new(server_error(&status_code)))
160    }
161}
162
163fn text_response(request: RequestBuilder) -> Result<String, Box<Error>> {
164    let mut response = request.send()?;
165    let status_code = response.status();
166    if status_code.is_success() {
167        let resp = response.text()?;
168        Ok(resp)
169    } else {
170        Err(Box::new(server_error(&status_code)))
171    }
172}
173
174fn txn_context_response(request: RequestBuilder) -> Result<TxnContext, Box<Error>> {
175    let mut response = request.send()?;
176    let status_code = response.status();
177    if status_code.is_success() {
178        let response = response.json()?;
179        Ok(response)
180    } else {
181        Err(Box::new(server_error(&status_code)))
182    }
183}
184
185#[derive(Debug)]
186pub struct DgraphClientStub {
187    addr: String,
188    client: Client,
189}
190
191pub struct TlsConfig {
192    pub root_crt: Certificate,
193    pub identity: Identity,
194}
195
196impl DgraphClientStub {
197    fn get_url(&self, path: String) -> String {
198        let slash = if self.addr.ends_with("/") { "".to_string() } else { "/".to_string() };
199        let url = format!("{}{}{}", self.addr, slash, path);
200        url
201    }
202
203    /// Construct new DgraphClientStub
204    pub fn new(addr: Option<String>, tls_config: Option<TlsConfig>) -> Result<DgraphClientStub, Box<Error>> {
205        let client_builder =
206            if let Some(tls_config) = tls_config {
207                let client_builder = reqwest::ClientBuilder::new();
208                client_builder.add_root_certificate(tls_config.root_crt).identity(tls_config.identity)
209            } else {
210                reqwest::ClientBuilder::new()
211            };
212        let client = match client_builder.gzip(true).build() {
213            Ok(client) => client,
214            Err(_e) => return Err(client_error()),
215        };
216        let addr = match addr {
217            None => String::from("http://localhost:8080"),
218            Some(a) => a,
219        };
220        Ok(DgraphClientStub {
221            addr,
222            client,
223        })
224    }
225
226    /// query dGraph
227    pub fn query(&self, req: Request) -> Result<(Value, Extensions), Box<Error>> {
228        let path = if let Some(start_ts) = req.start_ts {
229            format!("query/{}", start_ts)
230        } else {
231            String::from("query")
232        };
233        let request = self.client.post(&self.get_url(path));
234        let request = if let Some(vars) = req.vars {
235            let vars = match serde_json::to_string(&vars) {
236                Ok(header) => header,
237                Err(_e) => String::from("{}"),
238            };
239            request.header("X-Dgraph-Vars", vars)
240        } else {
241            request
242        };
243        let request = if let Some(data) = req.query {
244            request.body(data)
245        } else {
246            request
247        };
248        response(request)
249    }
250
251    /// check health of dGraph server
252    pub fn health(&self) -> Result<String, Box<Error>> {
253        let request = self.client.get(&self.get_url(String::from("health")));
254        text_response(request)
255    }
256
257    /// abort transaction
258    pub fn abort(&self, ctx: &TxnContext) -> Result<TxnContext, Box<Error>> {
259        let path = format!("abort/{}", ctx.start_ts.unwrap());
260        let request = self.client.post(&self.get_url(path));
261        txn_context_response(request)
262    }
263
264    /// commit transaction
265    pub fn commit(&self, ctx: TxnContext) -> Result<TxnContext, Box<Error>> {
266        let path = format!("commit/{}", ctx.start_ts.unwrap());
267        let request = self.client.post(&self.get_url(path));
268        let body = if let Some(keys) = ctx.keys {
269            keys
270        } else {
271            Vec::new()
272        };
273        let request = request.json(&body);
274        txn_context_response(request)
275    }
276
277    /// make mutation on dGraph data
278    pub fn mutate(&self, mu: Mutation) -> Result<Assigned, Box<Error>> {
279        let mut using_json = false;
280        let body = if mu.set_json.is_some() || mu.delete_json.is_some() {
281            using_json = true;
282            format!("{{{set}{delete}}}",
283                    set = if let Some(set) = mu.set_json { format!("\"set\": {}", set) } else { String::from("") },
284                    delete = if let Some(del) = mu.delete_json { format!("\"delete\": {}", del) } else { String::from("") })
285        } else if mu.set_n_quads.is_some() || mu.delete_n_quads.is_some() {
286            let mut body = String::new();
287            body.push_str("{");
288            if let Some(set_n_quads) = mu.set_n_quads {
289                body.push_str("set {");
290                body.push_str(set_n_quads.as_str());
291                body.push_str("}");
292            }
293            if let Some(delete_n_quads) = mu.delete_n_quads {
294                body.push_str("delete {");
295                body.push_str(delete_n_quads.as_str());
296                body.push_str("}");
297            }
298            body.push_str("}");
299            body
300        } else {
301            return Err(Box::new(mutation_error()));
302        };
303        let path = if let Some(start_ts) = mu.start_ts {
304            format!("mutate/{}", start_ts)
305        } else {
306            String::from("mutate")
307        };
308        let request = self.client.post(&self.get_url(path));
309        let request = if using_json == true {
310            request.header("X-Dgraph-MutationType", "json")
311        } else {
312            request
313        };
314        let request = match mu.commit_now {
315            Some(commit_now) => if commit_now == true {
316                request.header("X-Dgraph-CommitNow", "true")
317            } else {
318                request
319            },
320            None => request,
321        };
322        let request = request.body(body);
323        let mut response = request.send()?;
324        let status_code = response.status();
325        if status_code.is_success() {
326            let response = response.json()?;
327            Ok(response)
328        } else {
329            Err(Box::new(server_error(&status_code)))
330        }
331    }
332
333    /// alter dGraph schema
334    pub fn alter(&self, op: Operation) -> Result<String, Box<Error>> {
335        let body = if let Some(schema) = op.schema {
336            schema
337        } else if let Some(drop_att) = op.drop_att {
338            let mut body = HashMap::new();
339            body.insert("drop_attr", drop_att.as_str());
340            serde_json::to_string(&body)?
341        } else if let Some(drop_all) = op.drop_all {
342            if drop_all == true {
343                let mut body = HashMap::new();
344                body.insert("drop_all", true);
345                serde_json::to_string(&body)?
346            } else {
347                return Err(Box::new(alter_error()));
348            }
349        } else {
350            return Err(Box::new(alter_error()));
351        };
352        let request = self.client.post(&self.get_url(String::from("alter")));
353        let request = request.body(body);
354        text_response(request)
355    }
356}