use reqwest::{Certificate, Client, Identity, RequestBuilder, StatusCode};
use crate::types::{Request, TxnContext, Mutation, Assigned, Operation, Extensions, DgraphError};
use std::error::Error;
use std::fmt;
use std::collections::HashMap;
use serde_json::Value;
#[derive(Debug, Clone)]
pub struct InvalidServerStatus {
status_code: u16,
}
impl fmt::Display for InvalidServerStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Invalid status code. {}", self.status_code)
}
}
impl Error for InvalidServerStatus {
fn description(&self) -> &str {
"Invalid respond code from dGraph Server"
}
fn cause(&self) -> Option<&Error> {
None
}
}
fn server_error(status_code: &StatusCode) -> InvalidServerStatus {
let status_code = status_code.as_u16();
let err = InvalidServerStatus {
status_code
};
err
}
#[derive(Debug, Clone)]
pub struct MutationIsAlter {}
impl fmt::Display for MutationIsAlter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Mutation mu argument in alter")
}
}
impl Error for MutationIsAlter {
fn description(&self) -> &str {
"Mutation mu argument does not contain any set or delete operation"
}
fn cause(&self) -> Option<&Error> {
None
}
}
fn mutation_error() -> MutationIsAlter {
let err = MutationIsAlter {};
err
}
#[derive(Debug, Clone)]
pub struct InvalidAlter {}
impl fmt::Display for InvalidAlter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Invalid op argument in alter")
}
}
impl Error for InvalidAlter {
fn description(&self) -> &str {
"Invalid op argument in alter"
}
fn cause(&self) -> Option<&Error> {
None
}
}
fn alter_error() -> InvalidAlter {
let err = InvalidAlter {};
err
}
#[derive(Debug, Clone)]
pub struct QueryErrors {
errors: Vec<DgraphError>,
}
impl fmt::Display for QueryErrors {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "QUERY_ERRORS. {:?}", self.errors)
}
}
impl Error for QueryErrors {
fn description(&self) -> &str {
"Mutation data contains errors"
}
fn cause(&self) -> Option<&Error> {
None
}
}
fn query_error(errors: Vec<DgraphError>) -> QueryErrors {
let err = QueryErrors { errors };
err
}
#[derive(Debug, Clone)]
pub struct ClientError {}
impl fmt::Display for ClientError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "REQWEST_CLIENT_ERROR")
}
}
impl Error for ClientError {
fn description(&self) -> &str {
"Cannot build reqwest http client"
}
fn cause(&self) -> Option<&Error> {
None
}
}
fn client_error() -> Box<ClientError> {
Box::new(ClientError {})
}
fn response(request: RequestBuilder) -> Result<(Value, Extensions), Box<Error>> {
let mut response = request.send()?;
let status_code = response.status();
if status_code.is_success() {
let resp: serde_json::Value = response.json()?;
match resp.get("errors") {
Some(errors) => {
let errors_value = errors.clone();
let errors: Vec<DgraphError> = serde_json::from_value(errors_value).unwrap();
Err(Box::new(query_error(errors)))
}
None => {
let extensions_value = resp.get("extensions").unwrap().clone();
let extensions: Extensions = serde_json::from_value(extensions_value).unwrap();
;
let data: Value = resp.get("data").unwrap().clone();
Ok((data, extensions))
}
}
} else {
Err(Box::new(server_error(&status_code)))
}
}
fn text_response(request: RequestBuilder) -> Result<String, Box<Error>> {
let mut response = request.send()?;
let status_code = response.status();
if status_code.is_success() {
let resp = response.text()?;
Ok(resp)
} else {
Err(Box::new(server_error(&status_code)))
}
}
fn txn_context_response(request: RequestBuilder) -> Result<TxnContext, Box<Error>> {
let mut response = request.send()?;
let status_code = response.status();
if status_code.is_success() {
let response = response.json()?;
Ok(response)
} else {
Err(Box::new(server_error(&status_code)))
}
}
#[derive(Debug)]
pub struct DgraphClientStub {
addr: String,
client: Client,
}
pub struct TlsConfig {
pub root_crt: Certificate,
pub identity: Identity,
}
impl DgraphClientStub {
fn get_url(&self, path: String) -> String {
let slash = if self.addr.ends_with("/") { "".to_string() } else { "/".to_string() };
let url = format!("{}{}{}", self.addr, slash, path);
url
}
pub fn new(addr: Option<String>, tls_config: Option<TlsConfig>) -> Result<DgraphClientStub, Box<Error>> {
let client_builder =
if let Some(tls_config) = tls_config {
let client_builder = reqwest::ClientBuilder::new();
client_builder.add_root_certificate(tls_config.root_crt).identity(tls_config.identity)
} else {
reqwest::ClientBuilder::new()
};
let client = match client_builder.gzip(true).build() {
Ok(client) => client,
Err(_e) => return Err(client_error()),
};
let addr = match addr {
None => String::from("http://localhost:8080"),
Some(a) => a,
};
Ok(DgraphClientStub {
addr,
client,
})
}
pub fn query(&self, req: Request) -> Result<(Value, Extensions), Box<Error>> {
let path = if let Some(start_ts) = req.start_ts {
format!("query/{}", start_ts)
} else {
String::from("query")
};
let request = self.client.post(&self.get_url(path));
let request = if let Some(vars) = req.vars {
let vars = match serde_json::to_string(&vars) {
Ok(header) => header,
Err(_e) => String::from("{}"),
};
request.header("X-Dgraph-Vars", vars)
} else {
request
};
let request = if let Some(data) = req.query {
request.body(data)
} else {
request
};
response(request)
}
pub fn health(&self) -> Result<String, Box<Error>> {
let request = self.client.get(&self.get_url(String::from("health")));
text_response(request)
}
pub fn abort(&self, ctx: &TxnContext) -> Result<TxnContext, Box<Error>> {
let path = format!("abort/{}", ctx.start_ts.unwrap());
let request = self.client.post(&self.get_url(path));
txn_context_response(request)
}
pub fn commit(&self, ctx: TxnContext) -> Result<TxnContext, Box<Error>> {
let path = format!("commit/{}", ctx.start_ts.unwrap());
let request = self.client.post(&self.get_url(path));
let body = if let Some(keys) = ctx.keys {
keys
} else {
Vec::new()
};
let request = request.json(&body);
txn_context_response(request)
}
pub fn mutate(&self, mu: Mutation) -> Result<Assigned, Box<Error>> {
let mut using_json = false;
let body = if mu.set_json.is_some() || mu.delete_json.is_some() {
using_json = true;
format!("{{{set}{delete}}}",
set = if let Some(set) = mu.set_json { format!("\"set\": {}", set) } else { String::from("") },
delete = if let Some(del) = mu.delete_json { format!("\"delete\": {}", del) } else { String::from("") })
} else if mu.set_n_quads.is_some() || mu.delete_n_quads.is_some() {
let mut body = String::new();
body.push_str("{");
if let Some(set_n_quads) = mu.set_n_quads {
body.push_str("set {");
body.push_str(set_n_quads.as_str());
body.push_str("}");
}
if let Some(delete_n_quads) = mu.delete_n_quads {
body.push_str("delete {");
body.push_str(delete_n_quads.as_str());
body.push_str("}");
}
body.push_str("}");
body
} else {
return Err(Box::new(mutation_error()));
};
let path = if let Some(start_ts) = mu.start_ts {
format!("mutate/{}", start_ts)
} else {
String::from("mutate")
};
let request = self.client.post(&self.get_url(path));
let request = if using_json == true {
request.header("X-Dgraph-MutationType", "json")
} else {
request
};
let request = match mu.commit_now {
Some(commit_now) => if commit_now == true {
request.header("X-Dgraph-CommitNow", "true")
} else {
request
},
None => request,
};
let request = request.body(body);
let mut response = request.send()?;
let status_code = response.status();
if status_code.is_success() {
let response = response.json()?;
Ok(response)
} else {
Err(Box::new(server_error(&status_code)))
}
}
pub fn alter(&self, op: Operation) -> Result<String, Box<Error>> {
let body = if let Some(schema) = op.schema {
schema
} else if let Some(drop_att) = op.drop_att {
let mut body = HashMap::new();
body.insert("drop_attr", drop_att.as_str());
serde_json::to_string(&body)?
} else if let Some(drop_all) = op.drop_all {
if drop_all == true {
let mut body = HashMap::new();
body.insert("drop_all", true);
serde_json::to_string(&body)?
} else {
return Err(Box::new(alter_error()));
}
} else {
return Err(Box::new(alter_error()));
};
let request = self.client.post(&self.get_url(String::from("alter")));
let request = request.body(body);
text_response(request)
}
}