rs-es 0.3.0

Client for the ElasticSearch REST API
/*
 * Copyright 2015-2016 Ben Ashford
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#[macro_use]
pub mod util;

#[macro_use]
pub mod json;

pub mod error;
pub mod operations;
pub mod query;
pub mod units;

use hyper::client;
use hyper::status::StatusCode;
use hyper::header::{Headers, Authorization, Basic};

use serde::ser::Serialize;
use serde::de::Deserialize;

use error::EsError;

pub trait EsResponse {
    fn status_code<'a>(&'a self) -> &'a StatusCode;
    fn read_response<R>(mut self) -> Result<R, EsError> where R: Deserialize;
}

impl EsResponse for client::response::Response {
    fn status_code<'a>(&'a self) -> &'a StatusCode {
        &self.status
    }

    fn read_response<R>(self) -> Result<R, EsError>
        where R: Deserialize {

        Ok(try!(serde_json::from_reader(self)))
    }
}

// The client

/// Process the result of an HTTP request, returning the status code and the
/// `Json` result (if the result had a body) or an `EsError` if there were any
/// errors
///
/// This function is exposed to allow extensions to certain operations, it is
/// not expected to be used by consumers of the library
pub fn do_req(resp: client::response::Response) -> Result<client::response::Response, EsError> {
    let mut resp = resp;
    let status = resp.status;
    match status {
        StatusCode::Ok |
        StatusCode::Created |
        StatusCode::NotFound => Ok(resp),
        _                    => Err(EsError::from(&mut resp))
    }
}

/// The core of the ElasticSearch client, owns a HTTP connection.
///
/// Each instance of `Client` is reusable, but only one thread can use each one
/// at once.  This will be enforced by the borrow-checker as most methods are
/// defined on `&mut self`.
///
/// To create a `Client`, the hostname and port need to be specified.
///
/// Each ElasticSearch API operation is defined as a method on `Client`.  Any
/// compulsory parameters must be given as arguments to this method.  It returns
/// an operation builder that can be used to add any optional parameters.
///
/// Finally `send` is called to submit the operation:
///
/// # Examples
///
/// ```
/// use rs_es::Client;
///
/// let mut client = Client::new("localhost", 9200);
/// ```
///
/// See the specific operations and their builder objects for details.
pub struct Client {
    base_url:    String,
    http_client: hyper::Client,
    headers:     Headers
}

/// Create a HTTP function for the given method (GET/PUT/POST/DELETE)
macro_rules! es_op {
    ($n:ident,$cn:ident) => {
        fn $n(&mut self, url: &str) -> Result<client::response::Response, EsError> {
            info!("Doing {} on {}", stringify!($n), url);
            let url = self.full_url(url);
            let result = try!(self.http_client
                              .$cn(&url)
                              .headers(self.headers.clone())
                              .send());
            do_req(result)
        }
    }
}

/// Create a HTTP function with a request body for the given method
/// (GET/PUT/POST/DELETE)
///
macro_rules! es_body_op {
    ($n:ident,$cn:ident) => {
        fn $n<E>(&mut self, url: &str, body: &E) -> Result<client::response::Response, EsError>
            where E: Serialize {

            info!("Doing {} on {}", stringify!($n), url);
            let json_string = try!(serde_json::to_string(body));
            let url = self.full_url(url);
            let result = try!(self.http_client
                              .$cn(&url)
                              .headers(self.headers.clone())
                              .body(&json_string)
                              .send());

            do_req(result)
        }
    }
}

impl Client {
    /// Create a new client
    pub fn new(host: &str, port: u32) -> Client {
        Client {
            base_url:    format!("http://{}:{}", host, port),
            http_client: hyper::Client::new(),
            headers:     Self::auth_from_host(host)
        }
    }

    /// Add headers for the basic authentication to every request
    /// when given host's format is `USER:PASS@HOST`.
    fn auth_from_host(host: &str) -> Headers {
        let mut headers = Headers::new();

        let tokens = host.split('@').collect::<Vec<&str>>();
        if tokens.len() == 2 {
            let auth = tokens[0].split(':').collect::<Vec<&str>>();

            headers.set(
               Authorization(
                   Basic {
                       username: auth[0].to_owned(),
                       password: Some(auth[1].to_owned())
                   }
               )
            );
        }

        headers
    }

    /// Take a nearly complete ElasticSearch URL, and stick the host/port part
    /// on the front.
    pub fn full_url(&self, suffix: &str) -> String {
        format!("{}/{}", self.base_url, suffix)
    }

    es_op!(get_op, get);

    es_op!(post_op, post);
    es_body_op!(post_body_op, post);
    es_op!(put_op, put);
    es_body_op!(put_body_op, put);
    es_op!(delete_op, delete);
}

#[cfg(test)]
pub mod tests {
    extern crate env_logger;
    pub extern crate regex;

    use std::env;

    use serde_json::Value;

    use super::Client;
    use super::operations::bulk::Action;
    use super::operations::search::ScanResult;

    use super::query::Query;

    use super::units::Duration;

    // test setup

    pub fn make_client() -> Client {
        let hostname = match env::var("ES_HOST") {
            Ok(val) => val,
            Err(_)  => "localhost".to_owned()
        };
        Client::new(&hostname, 9200)
    }

    #[derive(Debug, Serialize, Deserialize)]
    pub struct TestDocument {
        pub str_field:  String,
        pub int_field:  i64,
        pub bool_field: bool
    }

    impl TestDocument {
        pub fn new() -> TestDocument {
            TestDocument {
                str_field: "I am a test".to_owned(),
                int_field: 1,
                bool_field: true
            }
        }

        pub fn with_str_field(mut self, s: &str) -> TestDocument {
            self.str_field = s.to_owned();
            self
        }

        pub fn with_int_field(mut self, i: i64) -> TestDocument {
            self.int_field = i;
            self
        }

        pub fn with_bool_field(mut self, b: bool) -> TestDocument {
            self.bool_field = b;
            self
        }
    }

    pub fn clean_db(mut client: &mut Client,
                    test_idx: &str) {
        let scroll = Duration::minutes(1);
        let mut scan:ScanResult<Value> = match client.search_query()
            .with_indexes(&[test_idx])
            .with_query(&Query::build_match_all().build())
            .scan(&scroll) {
                Ok(scan) => scan,
                Err(e) => {
                    warn!("Scan error: {:?}", e);
                    return // Ignore not-found errors
                }
            };

        loop {
            let page = scan.scroll(&mut client, &scroll).unwrap();
            let mut hits = page.hits.hits;
            if hits.len() == 0 {
                break;
            }
            let actions: Vec<Action<()>> = hits.drain(..)
                .map(|hit| {
                    Action::delete(hit.id)
                        .with_index(test_idx)
                        .with_doc_type(hit.doc_type)
                })
                .collect();
            client.bulk(&actions).send().unwrap();
        }

        scan.close(&mut client).unwrap();
    }
}