extern crate serde;
extern crate hyper;
extern crate serde_json;
extern crate rand;
use dbcluster::DBCluster;
use self::serde_json::Value;
use self::serde::ser::Serialize;
use error::CrateDBError;
use rowiterator::RowIterator;
use std::collections::HashMap;
use std::convert::Into;
use backend::{Backend, BackendResult};
use dbcluster::{Loadbalancing, EndpointType};
/// A serializable struct with no fields.
///
/// NOTE(review): presumably intended as the `S` type argument for callers that
/// have no statement parameters to pass (e.g. `None::<Box<Nothing>>`) — confirm
/// against the call sites.
#[derive(Serialize)]
pub struct Nothing {}
/// Module-private abstraction over "send one SQL statement to the backend".
trait Executor {
    /// Sends `sql` to the backend, optionally together with `params`.
    ///
    /// * `sql` - the statement text; anything convertible into a `String`.
    /// * `bulk` - selects between a bulk request and a regular request.
    /// * `params` - optional, serializable statement arguments.
    ///
    /// Returns the backend's result classification paired with a string that
    /// callers in this file treat as the response body.
    fn execute<SQL: Into<String>, S: Serialize>(&self,
                                                sql: SQL,
                                                bulk: bool,
                                                params: Option<Box<S>>)
                                                -> (BackendResult, String);
}
/// Public entry points for running SQL statements.
pub trait QueryRunner {
    /// Runs a single statement, optionally with serializable `params`.
    ///
    /// On success yields a tuple of the reported duration and an iterator
    /// over the returned rows; on failure yields a `CrateDBError`.
    fn query<SQL: Into<String>, S: Serialize>(&self,
                                              sql: SQL,
                                              params: Option<Box<S>>)
                                              -> Result<(f64, RowIterator), CrateDBError>;

    /// Runs a statement as a bulk operation with the serializable `params`.
    ///
    /// On success yields a tuple of the reported duration and one `i64`
    /// row count per bulk result; on failure yields a `CrateDBError`.
    fn bulk_query<SQL: Into<String>, S: Serialize>(&self,
                                                   sql: SQL,
                                                   params: Box<S>)
                                                   -> Result<(f64, Vec<i64>), CrateDBError>;
}
impl<T: Backend + Sized> Executor for DBCluster<T> {
fn execute<SQL, S>(&self,
sql: SQL,
bulk: bool,
params: Option<Box<S>>)
-> (BackendResult, String)
where SQL: Into<String>,
S: Serialize
{
let url = self.get_endpoint(EndpointType::SQL);
let json_query = if bulk {
json!({
"stmt": sql.into(),
"bulk_args": serde_json::to_value(params.unwrap()).unwrap()
})
.to_string()
} else if let Some(p) = params {
json!({
"stmt": sql.into(),
"args": serde_json::to_value(p).unwrap()
})
.to_string()
} else {
json!({
"stmt": sql.into()
})
.to_string()
};
match self.backend.execute(url, json_query) {
Ok(r) => r,
Err(e) => (BackendResult::Error, e.description),
}
}
}
/// Converts an error payload into a `CrateDBError`.
///
/// Reads the string at `/error/message` and the integer at `/error/code`
/// (the code is rendered as a decimal string).
///
/// The payload is not trusted to have that shape: if the message is missing
/// or not a string, the whole payload is embedded in a generic message so no
/// information is lost; if the code is missing or not an integer, the
/// placeholder `"0"` is used. This function never panics.
fn extract_error(data: &Value) -> CrateDBError {
    let message = match data.pointer("/error/message").and_then(|v| v.as_str()) {
        Some(text) => text.to_owned(),
        None => format!("Unexpected error response: {}", data),
    };
    let code = data.pointer("/error/code")
        .and_then(|v| v.as_i64())
        .map(|c| c.to_string())
        .unwrap_or_else(|| "0".to_owned());
    CrateDBError::new(message, code)
}
impl<T: Backend + Sized> QueryRunner for DBCluster<T> {
fn query<SQL, S>(&self,
sql: SQL,
params: Option<Box<S>>)
-> Result<(f64, RowIterator), CrateDBError>
where SQL: Into<String>,
S: Serialize
{
let (result, body) = self.execute(sql, false, params);
if let Ok(raw) = serde_json::from_str(&body) {
let data: Value = raw;
match result {
BackendResult::NotFound |
BackendResult::NotAuthorized |
BackendResult::Timeout |
BackendResult::Error => Err(extract_error(&data)),
BackendResult::Ok => {
if let Some(cols) = data.pointer("/cols")
.and_then(|v| v.as_array())
.and_then(|cols_raw| {
let mut cols = HashMap::with_capacity(cols_raw.len());
for (i, c) in cols_raw.iter().enumerate() {
let _ = match *c {
Value::String(ref name) => cols.insert(name.to_owned(), i),
_ => None,
};
}
Some(cols)
}) {
let rows = data.pointer("/rows").unwrap().as_array().unwrap();
let duration = data.pointer("/duration").unwrap().as_f64().unwrap();
Ok((duration, RowIterator::new(rows.clone(), cols)))
} else {
Err(CrateDBError::new("Invalid JSON returned", "401"))
}
}
}
} else {
Err(CrateDBError::new(format!("{}: {}", "Invalid JSON was returned", body),
format!("{}", result as u8)))
}
}
fn bulk_query<SQL, S>(&self, sql: SQL, params: Box<S>) -> Result<(f64, Vec<i64>), CrateDBError>
where SQL: Into<String>,
S: Serialize
{
let (result, body) = self.execute(sql, true, Some(params));
if let Ok(raw) = serde_json::from_str(&body) {
let data: Value = raw;
return match data.pointer("/cols") {
Some(_) => {
let bulk_results = data.pointer("/results").unwrap().as_array().unwrap();
let rowcounts = bulk_results
.into_iter()
.map(|v| v.pointer("/rowcount").unwrap().as_i64().unwrap())
.collect();
let duration = data.pointer("/duration").unwrap().as_f64().unwrap();
Ok((duration, rowcounts))
}
None => Err(extract_error(&data)),
};
}
Err(CrateDBError::new(format!("{}: {}", "Invalid JSON was returned", body),
format!("{}", result as u8)))
}
}