dgraph_rs_http/
client_stub.rs1use reqwest::{Certificate, Client, Identity, RequestBuilder, StatusCode};
2use crate::types::{Request, TxnContext, Mutation, Assigned, Operation, Extensions, DgraphError};
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6use serde_json::Value;
7
/// Error value carrying the numeric HTTP status code of a response that was
/// not accepted as successful.
#[derive(Clone, Debug)]
pub struct InvalidServerStatus {
    /// Raw HTTP status code (for example `500`).
    status_code: u16,
}
12
13impl fmt::Display for InvalidServerStatus {
14 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
15 write!(f, "Invalid status code. {}", self.status_code)
16 }
17}
18
19impl Error for InvalidServerStatus {
20 fn description(&self) -> &str {
21 "Invalid respond code from dGraph Server"
22 }
23
24 fn cause(&self) -> Option<&Error> {
25 None
27 }
28}
29
30fn server_error(status_code: &StatusCode) -> InvalidServerStatus {
31 let status_code = status_code.as_u16();
32 let err = InvalidServerStatus {
33 status_code
34 };
35 err
36}
37
/// Error value used by `DgraphClientStub::mutate` when the mutation carries
/// neither JSON nor N-Quads set/delete data.
#[derive(Clone, Debug)]
pub struct MutationIsAlter {}
40
41impl fmt::Display for MutationIsAlter {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 write!(f, "Mutation mu argument in alter")
44 }
45}
46
47impl Error for MutationIsAlter {
48 fn description(&self) -> &str {
49 "Mutation mu argument does not contain any set or delete operation"
50 }
51
52 fn cause(&self) -> Option<&Error> {
53 None
55 }
56}
57
58fn mutation_error() -> MutationIsAlter {
59 let err = MutationIsAlter {};
60 err
61}
62
/// Error value used by `DgraphClientStub::alter` when the operation does not
/// describe a schema change, an attribute drop, or a confirmed drop-all.
#[derive(Clone, Debug)]
pub struct InvalidAlter {}
65
66impl fmt::Display for InvalidAlter {
67 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
68 write!(f, "Invalid op argument in alter")
69 }
70}
71
72impl Error for InvalidAlter {
73 fn description(&self) -> &str {
74 "Invalid op argument in alter"
75 }
76
77 fn cause(&self) -> Option<&Error> {
78 None
80 }
81}
82
83fn alter_error() -> InvalidAlter {
84 let err = InvalidAlter {};
85 err
86}
87
88#[derive(Debug, Clone)]
89pub struct QueryErrors {
90 errors: Vec<DgraphError>,
91}
92
93impl fmt::Display for QueryErrors {
94 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
95 write!(f, "QUERY_ERRORS. {:?}", self.errors)
96 }
97}
98
99impl Error for QueryErrors {
100 fn description(&self) -> &str {
101 "Mutation data contains errors"
102 }
103
104 fn cause(&self) -> Option<&Error> {
105 None
107 }
108}
109
110fn query_error(errors: Vec<DgraphError>) -> QueryErrors {
111 let err = QueryErrors { errors };
112 err
113}
114
/// Error value used by `DgraphClientStub::new` when the underlying reqwest
/// HTTP client cannot be built.
#[derive(Clone, Debug)]
pub struct ClientError {}
117
118impl fmt::Display for ClientError {
119 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120 write!(f, "REQWEST_CLIENT_ERROR")
121 }
122}
123
124impl Error for ClientError {
125 fn description(&self) -> &str {
126 "Cannot build reqwest http client"
127 }
128
129 fn cause(&self) -> Option<&Error> {
130 None
132 }
133}
134
135fn client_error() -> Box<ClientError> {
136 Box::new(ClientError {})
137}
138
139fn response(request: RequestBuilder) -> Result<(Value, Extensions), Box<Error>> {
140 let mut response = request.send()?;
141 let status_code = response.status();
142 if status_code.is_success() {
143 let resp: serde_json::Value = response.json()?;
144 match resp.get("errors") {
145 Some(errors) => {
146 let errors_value = errors.clone();
147 let errors: Vec<DgraphError> = serde_json::from_value(errors_value).unwrap();
148 Err(Box::new(query_error(errors)))
149 }
150 None => {
151 let extensions_value = resp.get("extensions").unwrap().clone();
152 let extensions: Extensions = serde_json::from_value(extensions_value).unwrap();
153 ;
154 let data: Value = resp.get("data").unwrap().clone();
155 Ok((data, extensions))
156 }
157 }
158 } else {
159 Err(Box::new(server_error(&status_code)))
160 }
161}
162
163fn text_response(request: RequestBuilder) -> Result<String, Box<Error>> {
164 let mut response = request.send()?;
165 let status_code = response.status();
166 if status_code.is_success() {
167 let resp = response.text()?;
168 Ok(resp)
169 } else {
170 Err(Box::new(server_error(&status_code)))
171 }
172}
173
174fn txn_context_response(request: RequestBuilder) -> Result<TxnContext, Box<Error>> {
175 let mut response = request.send()?;
176 let status_code = response.status();
177 if status_code.is_success() {
178 let response = response.json()?;
179 Ok(response)
180 } else {
181 Err(Box::new(server_error(&status_code)))
182 }
183}
184
185#[derive(Debug)]
186pub struct DgraphClientStub {
187 addr: String,
188 client: Client,
189}
190
191pub struct TlsConfig {
192 pub root_crt: Certificate,
193 pub identity: Identity,
194}
195
196impl DgraphClientStub {
197 fn get_url(&self, path: String) -> String {
198 let slash = if self.addr.ends_with("/") { "".to_string() } else { "/".to_string() };
199 let url = format!("{}{}{}", self.addr, slash, path);
200 url
201 }
202
203 pub fn new(addr: Option<String>, tls_config: Option<TlsConfig>) -> Result<DgraphClientStub, Box<Error>> {
205 let client_builder =
206 if let Some(tls_config) = tls_config {
207 let client_builder = reqwest::ClientBuilder::new();
208 client_builder.add_root_certificate(tls_config.root_crt).identity(tls_config.identity)
209 } else {
210 reqwest::ClientBuilder::new()
211 };
212 let client = match client_builder.gzip(true).build() {
213 Ok(client) => client,
214 Err(_e) => return Err(client_error()),
215 };
216 let addr = match addr {
217 None => String::from("http://localhost:8080"),
218 Some(a) => a,
219 };
220 Ok(DgraphClientStub {
221 addr,
222 client,
223 })
224 }
225
226 pub fn query(&self, req: Request) -> Result<(Value, Extensions), Box<Error>> {
228 let path = if let Some(start_ts) = req.start_ts {
229 format!("query/{}", start_ts)
230 } else {
231 String::from("query")
232 };
233 let request = self.client.post(&self.get_url(path));
234 let request = if let Some(vars) = req.vars {
235 let vars = match serde_json::to_string(&vars) {
236 Ok(header) => header,
237 Err(_e) => String::from("{}"),
238 };
239 request.header("X-Dgraph-Vars", vars)
240 } else {
241 request
242 };
243 let request = if let Some(data) = req.query {
244 request.body(data)
245 } else {
246 request
247 };
248 response(request)
249 }
250
251 pub fn health(&self) -> Result<String, Box<Error>> {
253 let request = self.client.get(&self.get_url(String::from("health")));
254 text_response(request)
255 }
256
257 pub fn abort(&self, ctx: &TxnContext) -> Result<TxnContext, Box<Error>> {
259 let path = format!("abort/{}", ctx.start_ts.unwrap());
260 let request = self.client.post(&self.get_url(path));
261 txn_context_response(request)
262 }
263
264 pub fn commit(&self, ctx: TxnContext) -> Result<TxnContext, Box<Error>> {
266 let path = format!("commit/{}", ctx.start_ts.unwrap());
267 let request = self.client.post(&self.get_url(path));
268 let body = if let Some(keys) = ctx.keys {
269 keys
270 } else {
271 Vec::new()
272 };
273 let request = request.json(&body);
274 txn_context_response(request)
275 }
276
277 pub fn mutate(&self, mu: Mutation) -> Result<Assigned, Box<Error>> {
279 let mut using_json = false;
280 let body = if mu.set_json.is_some() || mu.delete_json.is_some() {
281 using_json = true;
282 format!("{{{set}{delete}}}",
283 set = if let Some(set) = mu.set_json { format!("\"set\": {}", set) } else { String::from("") },
284 delete = if let Some(del) = mu.delete_json { format!("\"delete\": {}", del) } else { String::from("") })
285 } else if mu.set_n_quads.is_some() || mu.delete_n_quads.is_some() {
286 let mut body = String::new();
287 body.push_str("{");
288 if let Some(set_n_quads) = mu.set_n_quads {
289 body.push_str("set {");
290 body.push_str(set_n_quads.as_str());
291 body.push_str("}");
292 }
293 if let Some(delete_n_quads) = mu.delete_n_quads {
294 body.push_str("delete {");
295 body.push_str(delete_n_quads.as_str());
296 body.push_str("}");
297 }
298 body.push_str("}");
299 body
300 } else {
301 return Err(Box::new(mutation_error()));
302 };
303 let path = if let Some(start_ts) = mu.start_ts {
304 format!("mutate/{}", start_ts)
305 } else {
306 String::from("mutate")
307 };
308 let request = self.client.post(&self.get_url(path));
309 let request = if using_json == true {
310 request.header("X-Dgraph-MutationType", "json")
311 } else {
312 request
313 };
314 let request = match mu.commit_now {
315 Some(commit_now) => if commit_now == true {
316 request.header("X-Dgraph-CommitNow", "true")
317 } else {
318 request
319 },
320 None => request,
321 };
322 let request = request.body(body);
323 let mut response = request.send()?;
324 let status_code = response.status();
325 if status_code.is_success() {
326 let response = response.json()?;
327 Ok(response)
328 } else {
329 Err(Box::new(server_error(&status_code)))
330 }
331 }
332
333 pub fn alter(&self, op: Operation) -> Result<String, Box<Error>> {
335 let body = if let Some(schema) = op.schema {
336 schema
337 } else if let Some(drop_att) = op.drop_att {
338 let mut body = HashMap::new();
339 body.insert("drop_attr", drop_att.as_str());
340 serde_json::to_string(&body)?
341 } else if let Some(drop_all) = op.drop_all {
342 if drop_all == true {
343 let mut body = HashMap::new();
344 body.insert("drop_all", true);
345 serde_json::to_string(&body)?
346 } else {
347 return Err(Box::new(alter_error()));
348 }
349 } else {
350 return Err(Box::new(alter_error()));
351 };
352 let request = self.client.post(&self.get_url(String::from("alter")));
353 let request = request.body(body);
354 text_response(request)
355 }
356}