#![crate_type = "lib"]
#![crate_name = "rs_es"]
#[macro_use]
extern crate log;
extern crate hyper;
extern crate rustc_serialize;
#[macro_use]
pub mod util;
pub mod error;
pub mod operations;
pub mod query;
pub mod units;
use hyper::status::StatusCode;
use hyper::header::{Headers, Authorization, Basic};
use rustc_serialize::Encodable;
use rustc_serialize::json::{self, Json};
use error::EsError;
use operations::bulk::{BulkOperation, Action};
use operations::delete::{DeleteOperation, DeleteByQueryOperation};
use operations::get::GetOperation;
use operations::index::IndexOperation;
use operations::search::{SearchURIOperation, SearchQueryOperation};
use operations::RefreshOperation;
use operations::analyze::AnalyzeOperation;
pub fn do_req(resp: &mut hyper::client::response::Response)
-> Result<(StatusCode, Option<Json>), EsError> {
info!("Response: {:?}", resp);
match resp.status {
StatusCode::Ok |
StatusCode::Created |
StatusCode::NotFound => match Json::from_reader(resp) {
Ok(json) => Ok((resp.status, Some(json))),
Err(e) => Err(EsError::from(e))
},
_ => Err(EsError::from(resp))
}
}
pub struct Client {
base_url: String,
http_client: hyper::Client,
headers: Headers
}
macro_rules! es_op {
($n:ident,$cn:ident) => {
fn $n(&mut self, url: &str)
-> Result<(StatusCode, Option<Json>), EsError> {
info!("Doing {} on {}", stringify!($n), url);
let url = self.full_url(url);
let mut result = try!(self.http_client
.$cn(&url)
.headers(self.headers.clone())
.send());
do_req(&mut result)
}
}
}
macro_rules! es_body_op {
($n:ident,$cn:ident) => {
fn $n<E>(&mut self, url: &str, body: &E)
-> Result<(StatusCode, Option<Json>), EsError>
where E: Encodable {
info!("Doing {} on {}", stringify!($n), url);
let json_string = try!(json::encode(body));
info!("Body: {}", json_string);
let url = self.full_url(url);
let mut result = try!(self.http_client
.$cn(&url)
.headers(self.headers.clone())
.body(&json_string)
.send());
do_req(&mut result)
}
}
}
impl Client {
pub fn new(host: &str, port: u32) -> Client {
Client {
base_url: format!("http://{}:{}", host, port),
http_client: hyper::Client::new(),
headers: Self::auth_from_host(host)
}
}
fn auth_from_host(host: &str) -> Headers {
let mut headers = Headers::new();
let tokens = host.split('@').collect::<Vec<&str>>();
if tokens.len() == 2 {
let auth = tokens[0].split(':').collect::<Vec<&str>>();
headers.set(
Authorization(
Basic {
username: auth[0].to_owned(),
password: Some(auth[1].to_owned())
}
)
);
}
headers
}
pub fn full_url(&self, suffix: &str) -> String {
format!("{}/{}", self.base_url, suffix)
}
es_op!(get_op, get);
es_op!(post_op, post);
es_body_op!(post_body_op, post);
es_op!(put_op, put);
es_body_op!(put_body_op, put);
es_op!(delete_op, delete);
es_body_op!(delete_body_op, delete);
pub fn version(&mut self) -> Result<String, EsError> {
let (_, result) = try!(self.get_op("/"));
let json = result.expect("No Json payload");
match json.find_path(&["version", "number"]) {
Some(version) => match version.as_string() {
Some(string) => Ok(string.to_owned()),
None => Err(EsError::EsError(format!("Cannot find version number in: {:?}",
json)))
},
None => Err(EsError::EsError(format!("Cannot find version number in {:?}",
json)))
}
}
pub fn refresh<'a>(&'a mut self) -> RefreshOperation {
RefreshOperation::new(self)
}
pub fn index<'a, 'b, E: Encodable>(&'a mut self, index: &'b str, doc_type: &'b str)
-> IndexOperation<'a, 'b, E> {
IndexOperation::new(self, index, doc_type)
}
pub fn get<'a>(&'a mut self,
index: &'a str,
id: &'a str) -> GetOperation {
GetOperation::new(self, index, id)
}
pub fn delete<'a>(&'a mut self,
index: &'a str,
doc_type: &'a str,
id: &'a str) -> DeleteOperation {
DeleteOperation::new(self, index, doc_type, id)
}
pub fn delete_by_query<'a>(&'a mut self) -> DeleteByQueryOperation {
DeleteByQueryOperation::new(self)
}
pub fn bulk<'a, 'b>(&'a mut self, actions: &'b [Action]) -> BulkOperation<'a, 'b> {
BulkOperation::new(self, actions)
}
pub fn analyze<'a>(&'a mut self,
body: &'a str) -> AnalyzeOperation {
AnalyzeOperation::new(self, body)
}
pub fn search_uri<'a>(&'a mut self) -> SearchURIOperation {
SearchURIOperation::new(self)
}
pub fn search_query<'a>(&'a mut self) -> SearchQueryOperation {
SearchQueryOperation::new(self)
}
}
#[cfg(test)]
pub mod tests {
extern crate env_logger;
extern crate regex;
use std::collections::BTreeMap;
use std::env;
use rustc_serialize::json::{Json, ToJson};
use super::Client;
use super::operations::bulk::Action;
use super::operations::index::OpType;
use super::query::{Filter, Query};
use self::regex::Regex;
pub fn make_client() -> Client {
let hostname = match env::var("ES_HOST") {
Ok(val) => val,
Err(_) => "localhost".to_owned()
};
Client::new(&hostname, 9200)
}
#[derive(Debug, RustcDecodable, RustcEncodable)]
pub struct TestDocument {
pub str_field: String,
pub int_field: i64,
pub bool_field: bool
}
impl TestDocument {
pub fn new() -> TestDocument {
TestDocument {
str_field: "I am a test".to_owned(),
int_field: 1,
bool_field: true
}
}
pub fn with_str_field(mut self, s: &str) -> TestDocument {
self.str_field = s.to_owned();
self
}
pub fn with_int_field(mut self, i: i64) -> TestDocument {
self.int_field = i;
self
}
pub fn with_bool_field(mut self, b: bool) -> TestDocument {
self.bool_field = b;
self
}
}
impl ToJson for TestDocument {
fn to_json(&self) -> Json {
let mut d = BTreeMap::new();
d.insert("str_field".to_owned(), self.str_field.to_json());
d.insert("int_field".to_owned(), self.int_field.to_json());
d.insert("bool_field".to_owned(), self.bool_field.to_json());
Json::Object(d)
}
}
pub fn clean_db(client: &mut Client,
test_idx: &str) {
client.delete_by_query()
.with_indexes(&[test_idx])
.with_query(&Query::build_match_all().build())
.send()
.unwrap();
}
#[test]
fn it_works() {
env_logger::init().unwrap();
let mut client = make_client();
let result = client.version().unwrap();
let expected_regex = Regex::new(r"^\d\.\d\.\d$").unwrap();
assert_eq!(expected_regex.is_match(&result), true);
}
#[test]
fn test_indexing() {
let index_name = "test_indexing";
let mut client = make_client();
clean_db(&mut client, index_name);
{
let result_wrapped = client
.index(index_name, "test_type")
.with_doc(&TestDocument::new().with_int_field(1))
.with_ttl(927500)
.send();
info!("TEST RESULT: {:?}", result_wrapped);
let result = result_wrapped.unwrap();
assert_eq!(result.created, true);
assert_eq!(result.index, index_name);
assert_eq!(result.doc_type, "test_type");
assert!(result.id.len() > 0);
assert_eq!(result.version, 1);
}
{
let delete_result = client.delete(index_name, "test_type", "TEST_INDEXING_2").send();
info!("DELETE RESULT: {:?}", delete_result);
let result_wrapped = client
.index(index_name, "test_type")
.with_doc(&TestDocument::new().with_int_field(2))
.with_id("TEST_INDEXING_2")
.with_op_type(OpType::Create)
.send();
let result = result_wrapped.unwrap();
assert_eq!(result.created, true);
assert_eq!(result.index, index_name);
assert_eq!(result.doc_type, "test_type");
assert_eq!(result.id, "TEST_INDEXING_2");
assert!(result.version >= 1);
}
}
#[test]
fn test_get() {
let index_name = "test_get";
let mut client = make_client();
clean_db(&mut client, index_name);
{
let doc = TestDocument::new().with_int_field(3)
.with_bool_field(false);
client
.index(index_name, "test_type")
.with_id("TEST_GETTING")
.with_doc(&doc)
.send().unwrap();
}
{
let mut getter = client.get(index_name, "TEST_GETTING");
let result_wrapped = getter
.with_doc_type("test_type")
.send();
info!("RESULT: {:?}", result_wrapped);
let result = result_wrapped.unwrap();
assert_eq!(result.id, "TEST_GETTING");
let source:TestDocument = result.source().unwrap();
assert_eq!(source.str_field, "I am a test");
assert_eq!(source.int_field, 3);
assert_eq!(source.bool_field, false);
}
}
#[test]
fn test_delete_by_query() {
let index_name = "test_delete_by_query";
let mut client = make_client();
clean_db(&mut client, index_name);
let td1 = TestDocument::new().with_str_field("TEST DOC 1").with_int_field(100);
let td2 = TestDocument::new().with_str_field("TEST DOC 2").with_int_field(200);
client
.index(index_name, "test_type")
.with_id("ABC123")
.with_doc(&td1)
.send().unwrap();
client
.index(index_name, "test_type")
.with_id("ABC124")
.with_doc(&td2)
.send().unwrap();
let delete_result = client
.delete_by_query()
.with_indexes(&[index_name])
.with_doc_types(&["test_type"])
.with_query(&Query::build_match("int_field", 200)
.with_lenient(false)
.build())
.send().unwrap();
assert!(delete_result.unwrap().successful());
let doc1 = client.get(index_name, "ABC123").with_doc_type("test_type").send().unwrap();
let doc2 = client.get(index_name, "ABC124").with_doc_type("test_type").send().unwrap();
assert!(doc1.found);
assert!(!doc2.found);
}
fn setup_search_test_data(client: &mut Client, index_name: &str) {
let documents = vec![
TestDocument::new().with_str_field("Document A123").with_int_field(1),
TestDocument::new().with_str_field("Document B456").with_int_field(2),
TestDocument::new().with_str_field("Document 1ABC").with_int_field(3)
];
for ref doc in documents {
client.index(index_name, "test_type")
.with_doc(doc)
.send()
.unwrap();
}
client.refresh().with_indexes(&[index_name]).send().unwrap();
}
#[test]
fn test_search_uri() {
let index_name = "test_search_uri";
let mut client = make_client();
clean_db(&mut client, index_name);
setup_search_test_data(&mut client, index_name);
let all_results = client.search_uri().with_indexes(&[index_name]).send().unwrap();
assert_eq!(3, all_results.hits.total);
let doc_a = client
.search_uri()
.with_indexes(&[index_name])
.with_query("A123")
.send()
.unwrap();
assert_eq!(1, doc_a.hits.total);
let doc_1 = client
.search_uri()
.with_indexes(&[index_name])
.with_query("str_field:1ABC")
.send()
.unwrap();
assert_eq!(1, doc_1.hits.total);
let limited_fields = client
.search_uri()
.with_indexes(&[index_name])
.with_query("str_field:B456")
.with_fields(&["int_field"])
.send()
.unwrap();
assert_eq!(1, limited_fields.hits.total);
}
#[test]
fn test_search_body() {
let index_name = "test_search_body";
let mut client = make_client();
clean_db(&mut client, index_name);
setup_search_test_data(&mut client, index_name);
let all_results = client
.search_query()
.with_indexes(&[index_name])
.with_query(&Query::build_match_all().build())
.send().unwrap();
assert_eq!(3, all_results.hits.total);
let within_range = client
.search_query()
.with_indexes(&[index_name])
.with_query(&Query::build_filtered(Filter::build_range("int_field")
.with_gte(2)
.with_lte(3)
.build())
.build())
.send().unwrap();
assert_eq!(2, within_range.hits.total);
}
#[test]
fn test_bulk() {
let index_name = "test_bulk";
let mut client = make_client();
clean_db(&mut client, index_name);
let actions:Vec<Action> = (1..10).map(|i| {
let doc = TestDocument::new().with_str_field("bulk_doc").with_int_field(i);
Action::index(doc)
}).collect();
let result = client.bulk(&actions)
.with_index(index_name)
.with_doc_type("bulk_type")
.send()
.unwrap();
assert_eq!(false, result.errors);
assert_eq!(9, result.items.len());
}
}