rs_es/
lib.rs

1/*
2 * Copyright 2015-2019 Ben Ashford
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//! A client for ElasticSearch's REST API
18//!
19//! The `Client` itself is used as the central access point, from which numerous
20//! operations are defined implementing each of the specific ElasticSearch APIs.
21//!
22//! Warning: at the time of writing the majority of such APIs are currently
23//! unimplemented.
24
25#[cfg(test)]
26#[macro_use]
27extern crate doc_comment;
28
29#[cfg(test)]
30doctest!("../README.md");
31
32#[macro_use]
33pub mod util;
34
35#[macro_use]
36pub mod json;
37
38pub mod error;
39pub mod operations;
40pub mod query;
41pub mod units;
42
43use std::time;
44
45use reqwest::{header::CONTENT_TYPE, RequestBuilder, StatusCode, Url};
46
47use serde::{de::DeserializeOwned, ser::Serialize};
48
49use crate::error::EsError;
50
51pub trait EsResponse {
52    fn status_code(&self) -> StatusCode;
53    fn read_response<R>(self) -> Result<R, EsError>
54    where
55        R: DeserializeOwned;
56}
57
58impl EsResponse for reqwest::Response {
59    fn status_code(&self) -> StatusCode {
60        self.status()
61    }
62
63    fn read_response<R>(self) -> Result<R, EsError>
64    where
65        R: DeserializeOwned,
66    {
67        Ok(serde_json::from_reader(self)?)
68    }
69}
70
71// The client
72
73/// Process the result of an HTTP request, returning the status code and the
74/// `Json` result (if the result had a body) or an `EsError` if there were any
75/// errors
76///
77/// This function is exposed to allow extensions to certain operations, it is
78/// not expected to be used by consumers of the library
79fn do_req(resp: reqwest::Response) -> Result<reqwest::Response, EsError> {
80    let mut resp = resp;
81    let status = resp.status();
82    match status {
83        StatusCode::OK | StatusCode::CREATED | StatusCode::NOT_FOUND => Ok(resp),
84        _ => Err(EsError::from(&mut resp)),
85    }
86}
87
88/// The core of the ElasticSearch client, owns a HTTP connection.
89///
90/// Each instance of `Client` is reusable, but only one thread can use each one
91/// at once.  This will be enforced by the borrow-checker as most methods are
92/// defined on `&mut self`.
93///
94/// To create a `Client`, the URL needs to be specified.
95///
96/// Each ElasticSearch API operation is defined as a method on `Client`.  Any
97/// compulsory parameters must be given as arguments to this method.  It returns
98/// an operation builder that can be used to add any optional parameters.
99///
100/// Finally `send` is called to submit the operation:
101///
102/// # Examples
103///
104/// ```
105/// use rs_es::Client;
106///
107/// let mut client = Client::init("http://localhost:9200");
108/// ```
109///
110/// See the specific operations and their builder objects for details.
111#[derive(Debug, Clone)]
112pub struct Client {
113    base_url: Url,
114    http_client: reqwest::Client,
115}
116
117impl Client {
118    fn do_es_op(
119        &self,
120        url: &str,
121        action: impl FnOnce(Url) -> RequestBuilder,
122    ) -> Result<reqwest::Response, EsError> {
123        let url = self.full_url(url);
124        let username = self.base_url.username();
125        let mut method = action(url);
126        if !username.is_empty() {
127            method = method.basic_auth(username, self.base_url.password());
128        }
129        let result = method.header(CONTENT_TYPE, "application/json").send()?;
130        do_req(result)
131    }
132}
133
134/// Create a HTTP function for the given method (GET/PUT/POST/DELETE)
135macro_rules! es_op {
136    ($n:ident,$cn:ident) => {
137        fn $n(&self, url: &str) -> Result<reqwest::Response, EsError> {
138            log::info!("Doing {} on {}", stringify!($n), url);
139            self.do_es_op(url, |url| self.http_client.$cn(url.clone()))
140        }
141    }
142}
143
144/// Create a HTTP function with a request body for the given method
145/// (GET/PUT/POST/DELETE)
146///
147macro_rules! es_body_op {
148    ($n:ident,$cn:ident) => {
149        fn $n<E>(&mut self, url: &str, body: &E) -> Result<reqwest::Response, EsError>
150            where E: Serialize {
151
152            log::info!("Doing {} on {}", stringify!($n), url);
153            let json_string = serde_json::to_string(body)?;
154            log::debug!("With body: {}", &json_string);
155
156            self.do_es_op(url, |url| {
157                self.http_client.$cn(url.clone()).body(json_string)
158            })
159        }
160    }
161}
162
163impl Client {
164    /// Create a new client
165    pub fn init(url_s: &str) -> Result<Client, reqwest::UrlError> {
166        let url = Url::parse(url_s)?;
167
168        Ok(Client {
169            http_client: reqwest::Client::new(),
170            base_url: url,
171        })
172    }
173
174    // TODO - this should be replaced with a builder object, especially if more options are going
175    // to be allowed
176    pub fn init_with_timeout(
177        url_s: &str,
178        timeout: Option<time::Duration>,
179    ) -> Result<Client, reqwest::UrlError> {
180        let url = Url::parse(url_s)?;
181
182        Ok(Client {
183            http_client: reqwest::Client::builder()
184                .timeout(timeout)
185                .build()
186                .expect("Failed to build client"),
187            base_url: url,
188        })
189    }
190
191    /// Take a nearly complete ElasticSearch URL, and stick
192    /// the URL on the front.
193    pub fn full_url(&self, suffix: &str) -> Url {
194        self.base_url.join(suffix).expect("Invalid URL created")
195    }
196
197    es_op!(get_op, get);
198
199    es_op!(post_op, post);
200    es_body_op!(post_body_op, post);
201    es_op!(put_op, put);
202    es_body_op!(put_body_op, put);
203    es_op!(delete_op, delete);
204}
205
206#[cfg(test)]
207pub mod tests {
208    use std::env;
209
210    use serde::{Deserialize, Serialize};
211
212    use super::{error::EsError, Client};
213
214    // test setup
215
216    pub fn make_client() -> Client {
217        let hostname = match env::var("ES_HOST") {
218            Ok(val) => val,
219            Err(_) => "http://localhost:9200".to_owned(),
220        };
221        Client::init(&hostname).unwrap()
222    }
223
224    #[derive(Debug, Serialize, Deserialize)]
225    pub struct TestDocument {
226        pub str_field: String,
227        pub int_field: i64,
228        pub bool_field: bool,
229    }
230
231    #[allow(clippy::new_without_default)]
232    impl TestDocument {
233        pub fn new() -> TestDocument {
234            TestDocument {
235                str_field: "I am a test".to_owned(),
236                int_field: 1,
237                bool_field: true,
238            }
239        }
240
241        pub fn with_str_field(mut self, s: &str) -> TestDocument {
242            self.str_field = s.to_owned();
243            self
244        }
245
246        pub fn with_int_field(mut self, i: i64) -> TestDocument {
247            self.int_field = i;
248            self
249        }
250
251        pub fn with_bool_field(mut self, b: bool) -> TestDocument {
252            self.bool_field = b;
253            self
254        }
255    }
256
257    pub fn setup_test_data(client: &mut Client, index_name: &str) {
258        // TODO - this should use the Bulk API
259        let documents = vec![
260            TestDocument::new()
261                .with_str_field("Document A123")
262                .with_int_field(1),
263            TestDocument::new()
264                .with_str_field("Document B456")
265                .with_int_field(2),
266            TestDocument::new()
267                .with_str_field("Document 1ABC")
268                .with_int_field(3),
269        ];
270        for doc in documents.iter() {
271            client
272                .index(index_name, "test_type")
273                .with_doc(doc)
274                .send()
275                .unwrap();
276        }
277        client.refresh().with_indexes(&[index_name]).send().unwrap();
278    }
279
280    pub fn clean_db(client: &mut Client, test_idx: &str) {
281        match client.delete_index(test_idx) {
282            // Ignore indices which don't exist yet
283            Err(EsError::EsError(ref msg)) if msg == "Unexpected status: 404 Not Found" => {}
284            Ok(_) => {}
285            e => {
286                e.unwrap_or_else(|_| panic!("Failed to clean db for index {:?}", test_idx));
287            }
288        };
289    }
290}