use std::fs::File;
use std::ops::Deref;
use cloneable_file::CloneableFile;
use http::StatusCode;
use mockall_double::double;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use url;
use url::Url;
#[double]
use http_client::HttpClient;
use crate::cursor::Cursor;
use crate::error::RSolrError;
use crate::solr_response::SolrResponse;
pub mod error;
pub mod solr_response;
pub mod query;
pub mod cursor;
mod facet_fields;
mod http_client;
/// Body attached to the request that `Client::run` sends.
///
/// The variant also decides which `HttpClient` method `run` calls.
#[derive(Clone, Debug)]
pub enum Payload {
/// JSON value; sent through `HttpClient::post_json` with `Some(body)`.
JsonBody(Value),
/// File handle; sent through `HttpClient::post_file_reader`.
CsvBody(CloneableFile),
/// No body, but the request still goes through `HttpClient::post_json` (with `None`).
Empty,
/// No body; the request goes through `HttpClient::get`.
None
}
/// Namespace for the request handler paths used by the convenience methods of `Client`.
#[non_exhaustive]
pub struct RequestHandlers;
impl RequestHandlers {
/// Handler used by `Client::select`.
pub const QUERY: &'static str = "select";
/// Handler used by `Client::upload_json`.
pub const UPLOAD_JSON: &'static str = "update/json/docs";
/// Handler used by `Client::upload_csv`.
pub const UPLOAD_CSV: &'static str = "update/csv";
/// Handler used by `Client::delete`.
pub const DELETE: &'static str = "update";
}
/// Builder-style Solr client: the chained methods assemble the request URL and payload,
/// `run` sends the request and `get_response` reads back the stored result.
#[derive(Clone, Debug)]
pub struct Client<'a> {
/// Handler path segment most recently chosen through `request_handler` (empty after `new`).
request_handler: &'a str,
/// Request URL; the path is rebuilt by `request_handler`, query pairs are appended by the builders.
url: Url,
/// Body to send on the next `run`.
payload: Payload,
/// Collection name placed in the URL path after the `solr` segment.
collection: &'a str,
/// JSON body of the last response with status 200, if it could be parsed.
response: Option<Value>
}
impl<'a> Client<'a> {
pub fn new(base_url: &str, collection: &'a str) -> Self {
let url = Url::parse(base_url).unwrap();
Client { request_handler: "", url, payload: Payload::None, collection, response: None }
}
/// Appends `key=value` to the query string of the request URL and returns the client
/// for further chaining. Existing pairs with the same key are kept.
pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
    {
        // The serializer writes back into the URL when it goes out of scope.
        let mut pairs = self.url.query_pairs_mut();
        pairs.append_pair(key, value);
    }
    self
}
/// Requests faceting on `field`: makes sure `facet=on` is present and appends a
/// `facet_field` query pair.
///
/// NOTE(review): the Solr reference guide spells this parameter `facet.field`; the key
/// sent here is `facet_field` and the unit tests pin that spelling. Confirm which one the
/// target Solr expects before changing it.
pub fn facet_field(&mut self, field: &str) -> &mut Self {
    self.switch_on_facet();
    self.add_query_param("facet_field", field)
}
/// Requests a facet count for `query`: makes sure `facet=on` is present and appends a
/// `facet_query` query pair.
///
/// NOTE(review): the Solr reference guide spells this parameter `facet.query`; the key
/// sent here is `facet_query` and the unit tests pin that spelling. Confirm which one the
/// target Solr expects before changing it.
pub fn facet_query(&mut self, query: &str) -> &mut Self {
    self.switch_on_facet();
    self.add_query_param("facet_query", query)
}
/// Selects the request handler and rebuilds the URL path as
/// `/solr/<collection>/<handler>`.
///
/// The payload is reset to [`Payload::None`]; the query string is left untouched.
/// `handler` is pushed as a single path segment, so a `/` inside it ends up
/// percent-encoded (the tests expect `update%2Fjson%2Fdocs`).
///
/// # Panics
///
/// Panics if the URL does not support path segments.
pub fn request_handler(&mut self, handler: &'a str) -> &mut Self {
    self.request_handler = handler;
    self.payload = Payload::None;
    let collection = self.collection;
    {
        let mut segments = self.url.path_segments_mut().unwrap();
        segments.clear();
        segments.push("solr");
        segments.push(collection);
        segments.push(handler);
    }
    self
}
/// Appends `commit=true` to the query string.
pub fn auto_commit(&mut self) -> &mut Self {
    const COMMIT_PARAM: &str = "commit";
    self.add_query_param(COMMIT_PARAM, "true")
}
/// Appends the `start` parameter with the decimal representation of `start`.
pub fn start(&mut self, start: u32) -> &mut Self {
    let offset = start.to_string();
    self.add_query_param("start", offset.as_str())
}
pub fn update_cursor_mark(&mut self, cursor_mark: &str) -> &mut Self {
let url = self.url.clone();
let query = url.query().expect("Query part is required.");
let regex = Regex::new(r"(cursorMark=)(\w|\*)").unwrap();
let replace = format!("${{1}}{}", cursor_mark);
let updated = regex.replace(query, replace.as_str());
self.url.set_query(Some(updated.deref()));
self
}
/// Replaces the complete request URL (path and query included) with `url`.
///
/// # Panics
///
/// Panics with "Url parse failed." if `url` cannot be parsed.
pub fn url(&mut self, url: &str) -> &mut Self {
    let parsed = Url::parse(url).expect("Url parse failed.");
    self.url = parsed;
    self
}
/// Appends the `sort` parameter with the given sort clause.
pub fn sort(&mut self, sort: &str) -> &mut Self {
    const SORT_PARAM: &str = "sort";
    self.add_query_param(SORT_PARAM, sort)
}
/// Appends `cursorMark=*` to the query string.
///
/// `run` returns a `Cursor` when the query string contains `cursorMark`.
pub fn cursor(&mut self) -> &mut Self {
    const CURSOR_PARAM: &str = "cursorMark";
    const INITIAL_MARK: &str = "*";
    self.add_query_param(CURSOR_PARAM, INITIAL_MARK)
}
/// Appends the `rows` parameter with the decimal representation of `rows`.
pub fn rows(&mut self, rows: u32) -> &mut Self {
    let page_size = rows.to_string();
    self.add_query_param("rows", page_size.as_str())
}
/// Appends the `q` parameter with the given query string.
pub fn query(&mut self, query: &str) -> &mut Self {
    const QUERY_PARAM: &str = "q";
    self.add_query_param(QUERY_PARAM, query)
}
/// Appends the `df` parameter with the given field name.
pub fn default_field(&mut self, default_field: &str) -> &mut Self {
    const DEFAULT_FIELD_PARAM: &str = "df";
    self.add_query_param(DEFAULT_FIELD_PARAM, default_field)
}
pub fn url_str(&self) -> &str {
self.url.as_str()
}
/// Serializes `document` to a JSON value and stores it as the payload of the next request.
///
/// # Panics
///
/// Panics if `document` cannot be converted to a `serde_json::Value`.
pub fn set_json_document<P : Clone + Serialize>(&mut self, document: P) -> &mut Self {
    let body = serde_json::to_value::<P>(document).unwrap();
    self.payload(Payload::JsonBody(body))
}
/// Sets the payload to [`Payload::Empty`], i.e. a body-less request that `run` still
/// sends through `post_json`.
pub fn set_empty_payload(&mut self) -> &mut Self {
    self.payload = Payload::Empty;
    self
}
/// Sets the payload to [`Payload::None`], so that `run` issues a GET request.
pub fn clear_payload(&mut self) -> &mut Self {
    self.payload = Payload::None;
    self
}
/// Sends the assembled request and stores the parsed response.
///
/// The HTTP method follows the payload: JSON and empty payloads are posted through
/// `post_json`, a CSV payload through `post_file_reader`, and no payload results in a GET.
///
/// On status 200 the body is parsed as JSON and kept for [`Client::get_response`] (a body
/// that is not valid JSON is dropped silently), and the query string is cleared. If the
/// query string contained `cursorMark`, a `Cursor` built from a clone of this client and
/// the `nextCursorMark` of the response is returned; otherwise `Ok(None)`.
///
/// # Errors
///
/// * `RSolrError::Network` when the HTTP client reports an error.
/// * `RSolrError::NotFound` on status 404.
/// * `RSolrError::Serialization` when a cursor was requested but the response cannot be
///   deserialized or carries no `nextCursorMark`.
/// * `RSolrError::Syntax` on any other status with a JSON body; the message is
///   `error.msg` when present, otherwise the raw body text.
/// * `RSolrError::Other` on any other status whose body is not JSON.
pub fn run(&mut self) -> Result<Option<Cursor>, RSolrError> {
    let http_result = match &self.payload {
        Payload::JsonBody(body) => HttpClient::new().post_json(self.url_str(), Some(body)),
        Payload::Empty => HttpClient::new().post_json(self.url_str(), None),
        Payload::None => HttpClient::new().get(self.url_str()),
        Payload::CsvBody(file) => HttpClient::new().post_file_reader(self.url_str(), file.to_owned()),
    };
    let http_response = http_result.map_err(|e| RSolrError::Network { source: e })?;
    match http_response.status() {
        StatusCode::OK => {
            self.response = http_response.json::<Value>().ok();
            let wants_cursor = self.url.query().unwrap_or("no url").contains("cursorMark");
            let outcome = if wants_cursor {
                // A missing or malformed cursor mark is reported as an error instead of panicking.
                self.get_response::<Value>()
                    .and_then(|response| {
                        response.nextCursorMark.ok_or_else(|| {
                            RSolrError::Serialization(
                                "nextCursorMark is missing from the response".to_owned(),
                            )
                        })
                    })
                    // The cursor must capture the client before the query string is cleared.
                    .map(|cursor_mark| Some(Cursor::new(self.clone(), cursor_mark)))
            } else {
                Ok(None)
            };
            self.url.query_pairs_mut().clear();
            outcome
        }
        StatusCode::NOT_FOUND => Err(RSolrError::NotFound),
        other_status => {
            // NOTE(review): this still panics when the body cannot be read; map it to an
            // error once the error type accepted by `RSolrError::Network` is confirmed.
            let body_text = http_response.text().unwrap();
            match serde_json::from_str::<Value>(&body_text) {
                Ok(parsed) => {
                    // Fall back to the raw body when the JSON has no `error.msg` string.
                    let message = parsed["error"]["msg"]
                        .as_str()
                        .unwrap_or(body_text.as_str())
                        .to_owned();
                    Err(RSolrError::Syntax(message))
                }
                Err(e) => Err(RSolrError::Other { source: Box::new(e), status: other_status, body_text }),
            }
        }
    }
}
/// Deserializes the response stored by the last successful `run` into a `SolrResponse<T>`.
///
/// Returns `SolrResponse::default()` when no response is stored.
///
/// # Errors
///
/// Returns `RSolrError::Serialization` with the serde error text when the stored JSON does
/// not fit `SolrResponse<T>`.
pub fn get_response<T: for<'de> Deserialize<'de> + Clone + Default>(&self) -> Result<SolrResponse<T>, RSolrError>{
    match self.response.as_ref() {
        None => Ok(SolrResponse::default()),
        Some(raw) => serde_json::from_value(raw.clone())
            .map_err(|e| RSolrError::Serialization(e.to_string())),
    }
}
/// Prepares a query against the `select` handler with `query` as the `q` parameter.
pub fn select(&mut self, query: &str) -> &mut Self {
    self.request_handler(RequestHandlers::QUERY);
    self.query(query)
}
/// Deprecated alias that forwards to [`Client::upload_json`].
#[deprecated(since = "0.3.2", note = "Use upload_json instead.")]
pub fn create<P: Serialize + Clone>(&mut self, document: P) -> &mut Self {
    self.upload_json::<P>(document)
}
/// Prepares a JSON upload: selects the `update/json/docs` handler and stores `document`
/// as the JSON payload.
///
/// # Panics
///
/// Panics if `document` cannot be serialized (see [`Client::set_json_document`]).
pub fn upload_json<P: Serialize + Clone>(&mut self, document: P) -> &mut Self {
    self.request_handler(RequestHandlers::UPLOAD_JSON);
    self.set_json_document::<P>(document)
}
/// Prepares a CSV upload: selects the `update/csv` handler and stores `file` as the payload.
pub fn upload_csv(&mut self, file: File) -> &mut Self {
    self.request_handler(RequestHandlers::UPLOAD_CSV);
    self.set_csv_file(file)
}
/// Wraps `file` in a `CloneableFile` and stores it as the payload of the next request.
pub fn set_csv_file(&mut self, file: File) -> &mut Self {
    self.payload(Payload::CsvBody(CloneableFile::from(file)))
}
/// Prepares a delete-by-query: selects the `update` handler and stores
/// `{"delete": {"query": <query>}}` as the JSON payload.
pub fn delete(&mut self, query: &str) -> &mut Self {
    let body = json!({ "delete": { "query": query } });
    self.request_handler(RequestHandlers::DELETE);
    self.set_json_document(body)
}
/// Prepares a commit: selects the `update` handler, appends `commit=true` and sets an
/// empty payload so that `run` posts without a body.
pub fn commit(&mut self) -> &mut Self {
    self.request_handler("update");
    self.auto_commit();
    self.set_empty_payload()
}
/// Appends `defType=dismax` to the query string.
pub fn dismax(&mut self) -> &mut Self {
    const DEF_TYPE_PARAM: &str = "defType";
    self.add_query_param(DEF_TYPE_PARAM, "dismax")
}
/// Appends `defType=edismax` to the query string.
pub fn edismax(&mut self) -> &mut Self {
    const DEF_TYPE_PARAM: &str = "defType";
    self.add_query_param(DEF_TYPE_PARAM, "edismax")
}
/// Appends `facet=on` to the query string unless that exact pair is already present.
fn switch_on_facet(&mut self) {
    let already_on = self
        .url
        .query_pairs()
        .any(|(key, value)| key == "facet" && value == "on");
    if !already_on {
        self.url.query_pairs_mut().append_pair("facet", "on");
    }
}
/// Stores `payload` for the next request, discarding the previous one.
fn payload(&mut self, payload: Payload) -> &mut Self {
    let _previous = std::mem::replace(&mut self.payload, payload);
    self
}
}
#[cfg(test)]
mod tests {
use std::sync::{Mutex, MutexGuard};
use mockall::lazy_static;
use mockall::predicate::eq;
use serde_json::json;
use super::*;
// Mutex shared by all tests in this module. Every test that installs an
// `HttpClient::new_context()` expectation takes it through `get_lock` first and holds the
// guard until the test ends, so those tests do not run concurrently.
lazy_static! {
static ref MTX: Mutex<()> = Mutex::new(());
}
/// Locks `m` and returns the guard, also when the mutex was poisoned by a test that
/// panicked while holding it.
fn get_lock(m: &'static Mutex<()>) -> MutexGuard<'static, ()> {
    m.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
/// Builds a mocked `HttpClient` whose `get` expects exactly `url` and answers with a
/// response carrying `status_code` and `body`.
fn setup_get_mock(url: &'static str, status_code: u16, body: &'static str) -> HttpClient {
let mut mock = HttpClient::default();
mock.expect_get()
.with(eq(url))
// A fresh response is built on every call of the expectation.
.returning(move |_| Ok(
reqwest::blocking::Response::from(http::response::Builder::new()
.status(status_code)
.body(body)
.unwrap()))
);
mock
}
/// The handler path and the `q` parameter end up in the URL.
#[test]
fn build_a_url_from_parameters() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").query("*:*");
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?q=*%3A*"
    );
}
/// `auto_commit` adds `commit=true` to the URL.
#[test]
fn build_a_url_from_parameters_set_autocommit() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").auto_commit();
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?commit=true"
    );
}
/// `start` and `rows` are appended in call order.
#[test]
fn build_a_url_with_start_and_rows() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").start(135545).rows(12);
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?start=135545&rows=12"
    );
}
/// `default_field` adds the `df` parameter.
#[test]
fn build_a_url_with_default_field() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").default_field("defaultfield");
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?df=defaultfield"
    );
}
/// Setting a facet field switches faceting on and adds the field.
#[test]
fn url_built_with_facet_if_facet_fields_set() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").facet_field("facetfield");
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?facet=on&facet_field=facetfield"
    );
}
/// Setting a facet query switches faceting on and adds the query.
#[test]
fn url_built_with_facet_if_facet_query_set() {
    let mut client = Client::new("http://host:8983", "collection");
    client.request_handler("request_handler").facet_query("facet");
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?facet=on&facet_query=facet"
    );
}
/// With both a facet field and a facet query, `facet=on` appears only once.
#[test]
fn url_built_with_facet_correctly_if_both_set() {
    let mut client = Client::new("http://host:8983", "collection");
    client
        .request_handler("request_handler")
        .facet_field("facetfield")
        .facet_query("facet");
    assert_eq!(
        client.url_str(),
        "http://host:8983/solr/collection/request_handler?facet=on&facet_field=facetfield&facet_query=facet"
    );
}
/// `run` issues a GET to the assembled select URL and the parsed documents are available
/// through `get_response`.
#[test]
fn run_formats_url_and_result() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_get()
.with(eq("http://localhost:8983/solr/default/select?q=*%3A*"))
.returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]}}"#)
.unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut command = Client::new(host, collection);
let result = command
.request_handler("select")
.query("*:*")
.run();
assert!(result.is_ok());
assert_eq!(command.get_response::<Value>().unwrap().response.unwrap().docs[0]["success"], true);
}
/// The `facet_fields` section of a response is exposed through `facet_counts`.
#[test]
fn run_handles_facet_fields() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let body = r#"{
"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]},
"facet_counts": {
"facet_queries": {},
"facet_fields": {
"exists": [
"term1", 23423, "term2", 993939
]
},
"facet_ranges":{},
"facet_intervals":{},
"facet_heatmaps":{}
}
}"#;
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
setup_get_mock("http://localhost:8983/solr/default/select?q=*%3A*&facet=on&facet_field=exists", 200, body)
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.request_handler("select")
.query("*:*")
.facet_field("exists")
.run();
assert!(result.is_ok());
let facets = client.get_response::<Value>().unwrap().facet_counts.unwrap();
assert_eq!(facets.facet_fields.fields, serde_json::from_str::<Value>(r#"{"exists":["term1", 23423,"term2",993939]}"#).unwrap());
}
/// Facet queries are exposed through `facet_queries`; the ranges, intervals and heatmaps
/// sections are reachable through the `raw` map of the facet counts.
#[test]
fn run_handles_facet_query_and_returns_unimplemented_facets_in_raw() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let body = r#"{
"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]},
"facet_counts": {
"facet_queries": {
"anything: *": 324534
},
"facet_fields": {},
"facet_ranges":"interesting ranges",
"facet_intervals":"interesting intervals",
"facet_heatmaps":"interesting heatmaps"
}
}"#;
let ctx = HttpClient::new_context();
ctx.expect().returning(||
setup_get_mock("http://localhost:8983/solr/default/select?q=*%3A*&facet=on&facet_query=anything%3A+*", 200, body)
);
let collection = "default";
let host = "http://localhost:8983";
let mut command = Client::new(host, collection);
let result = command
.request_handler("select")
.query("*:*")
.facet_query("anything: *")
.run();
assert!(result.is_ok());
let facets = command.get_response::<Value>().unwrap().facet_counts.unwrap();
assert_eq!(facets.facet_queries, serde_json::from_str::<Value>(r#"{"anything: *": 324534 }"#).unwrap());
assert_eq!(facets.raw.get("facet_ranges").unwrap(), "interesting ranges");
assert_eq!(facets.raw.get("facet_intervals").unwrap(), "interesting intervals");
assert_eq!(facets.raw.get("facet_heatmaps").unwrap(), "interesting heatmaps");
}
/// Top-level response fields other than `response` are reachable through the `raw` map.
#[test]
fn run_deserializes_remaining_fields_into_raw() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
let body = r#"{"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]},"anything":"other fields"}"#;
ctx.expect().returning(||
setup_get_mock("http://localhost:8983/solr/default/select?q=*%3A*", 200, body)
);
let mut client = Client::new("http://localhost:8983", "default");
let result = client
.select("*:*")
.run();
assert!(result.is_ok());
assert_eq!(client.get_response::<Value>().unwrap().raw.get("anything").unwrap(),"other fields");
}
/// With a JSON payload, `run` posts the document to the assembled URL.
#[test]
fn run_calls_post_with_url_and_body() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_post_json()
// The handler is a single path segment, hence the percent-encoded slashes.
.withf(| url, body | url == "http://localhost:8983/solr/default/update%2Fjson%2Fdocs?commit=true" && *body == Some(&json!({ "this is": "a document"})) )
.returning(|_, _| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]}}"#)
.unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut command = Client::new(host, collection);
let result = command
.request_handler("update/json/docs")
.auto_commit()
.set_json_document(json!({ "this is": "a document"}))
.run();
assert!(result.is_ok());
assert_eq!(command.get_response::<Value>().unwrap().response.unwrap().docs[0]["success"], true);
}
/// A status 500 answer to a select, with a JSON body carrying `error.msg`, is reported as
/// `RSolrError::Syntax` with that message.
#[test]
fn select_responds_rsolr_error_with_other_problem_if_dunno() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_get()
.returning(|_| Ok(reqwest::blocking::Response::from(
http::response::Builder::new().status(500).body(r#"{"error": {"code": 500, "msg": "okapi"}}"#).unwrap())));
mock
});
let collection = "default";
let base_url = "http://localhost:8983";
let mut client = Client::new(base_url, collection);
let result = client
.select("bad: query")
.run();
assert!(result.is_err());
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Syntax(..) ));
assert_eq!(format!("{:?}", error), "Syntax(\"okapi\")");
}
/// A status 500 answer to a select, with a body that is not JSON, is reported as
/// `RSolrError::Other` carrying the status code and the raw body text.
#[test]
fn select_responds_rsolr_error_with_raw_text_body_and_status_code_if_no_standard_message() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_get().returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new().status(500).body(r#"some unparseable thing"#).unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.select("bad: query")
.run();
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Other {status: StatusCode::INTERNAL_SERVER_ERROR, ..} ));
assert!(format!("{:?}", error).contains("some unparseable thing"));
}
/// A status 500 answer to a JSON upload, with a JSON body carrying `error.msg`, is
/// reported as `RSolrError::Syntax` with that message.
#[test]
fn create_responds_rsolr_error_with_other_problem_if_dunno() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_post_json().returning(|_, _| Ok(reqwest::blocking::Response::from(http::response::Builder::new().status(500).body(r#"{"error": {"code": 500, "msg": "okapi"}}"#).unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.auto_commit()
.upload_json(json!({"anything": "anything"}))
.run();
assert!(result.is_err());
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Syntax(..) ));
assert_eq!(format!("{:?}", error), "Syntax(\"okapi\")");
}
/// A status 500 answer to a JSON upload, with a body that is not JSON, is reported as
/// `RSolrError::Other` carrying the status code and the raw body text.
#[test]
fn create_responds_rsolr_error_with_raw_text_body_and_status_code_if_no_standard_message() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_post_json().returning(|_, _| Ok(reqwest::blocking::Response::from(http::response::Builder::new().status(500).body(r#"some unparseable thing"#).unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.auto_commit()
.upload_json(json!({"anything": "anything"}))
.run();
assert!(result.is_err());
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Other {status: StatusCode::INTERNAL_SERVER_ERROR, ..} ));
assert!(format!("{:?}", error).contains("some unparseable thing"));
}
/// A status 500 answer to a delete, with a JSON body carrying `error.msg`, is reported as
/// `RSolrError::Syntax` with that message.
#[test]
fn delete_responds_rsolr_error_with_other_problem_if_dunno() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_post_json().returning(|_, _| Ok(reqwest::blocking::Response::from(http::response::Builder::new().status(500).body(r#"{"error": {"code": 500, "msg": "okapi"}}"#).unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.auto_commit()
.delete("*:*")
.run();
assert!(result.is_err());
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Syntax(..) ));
assert_eq!(format!("{:?}", error), "Syntax(\"okapi\")");
}
/// A status 500 answer to a delete, with a body that is not JSON, is reported as
/// `RSolrError::Other` carrying the status code and the raw body text.
#[test]
fn delete_responds_rsolr_error_with_raw_text_body_and_status_code_if_no_standard_message() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_post_json().returning(|_, _| Ok(reqwest::blocking::Response::from(http::response::Builder::new().status(500).body(r#"some unparseable thing"#).unwrap())));
mock
});
let collection = "default";
let host = "http://localhost:8983";
let mut client = Client::new(host, collection);
let result = client
.delete("*:*")
.run();
assert!(result.is_err());
let error = result.err().expect("No Error");
assert!(matches!(error, RSolrError::Other {status: StatusCode::INTERNAL_SERVER_ERROR, ..} ));
assert!(format!("{:?}", error).contains("some unparseable thing"));
}
/// When the request carries `cursorMark` and the response carries `nextCursorMark`,
/// `run` returns `Some` cursor.
#[test]
fn run_responds_cursor_if_cursor_set() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
mock.expect_get()
.returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 1,"numFoundExact": true,"start": 0,"docs": [{"success": true }]}, "nextCursorMark": "cursormark"}"#)
.unwrap())));
mock
});
let mut client = Client::new("http://localhost:8983", "default");
let result = client
.select("*:*")
.sort("field asc")
.cursor()
.run();
assert!(result.expect("Ok expected").is_some());
}
/// `Cursor::next` requests one page per call, each time with the cursor mark returned by
/// the previous page, and yields `None` for the page whose `docs` array is empty.
#[test]
fn next_returns_the_next_response() {
// Hold the shared lock while the mocked constructor context is installed.
let _m = get_lock(&MTX);
let ctx = HttpClient::new_context();
ctx.expect().returning(|| {
let mut mock = HttpClient::default();
// One expectation per cursor mark; each answers with the mark of the following page.
mock.expect_get()
.with(eq("http://solr.url/solr/dummy/select?q=*%3A*&rows=1&cursorMark=first_cursor_mark&sort=unique+asc"))
.returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 2,"numFoundExact": true,"start": 0,"docs": [{"success": true }]}, "nextCursorMark": "second_cursor_mark"}"#)
.unwrap())));
mock.expect_get()
.with(eq("http://solr.url/solr/dummy/select?q=*%3A*&rows=1&cursorMark=second_cursor_mark&sort=unique+asc"))
.returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 2,"numFoundExact": true,"start": 0,"docs": [{"success2": true }]}, "nextCursorMark": "third_cursor_mark"}"#)
.unwrap())));
// Last page: no documents and the cursor mark repeats.
mock.expect_get()
.with(eq("http://solr.url/solr/dummy/select?q=*%3A*&rows=1&cursorMark=third_cursor_mark&sort=unique+asc"))
.returning(|_| Ok(reqwest::blocking::Response::from(http::response::Builder::new()
.status(200)
.body(r#"{"response": {"numFound": 2,"numFoundExact": true,"start": 0,"docs": []}, "nextCursorMark": "third_cursor_mark"}"#)
.unwrap())));
mock
});
let mut client = Client::new("http://solr.url", "dummy");
client
.select("*:*")
.rows(1)
.cursor()
.sort("unique asc");
let mut cursor = Cursor::new(client, "first_cursor_mark".to_owned());
let result = cursor.next::<Value>();
assert_eq!(result.expect("Ok expected").expect("Response expected").response.expect("solr response expected").docs[0].get("success").unwrap(), true);
let result2 = cursor.next::<Value>();
assert_eq!(result2.expect("Ok expected").expect("Response expected").response.expect("solr response expected").docs[0].get("success2").unwrap(), true);
let result3 = cursor.next::<Value>();
assert!(result3.expect("Ok expected").is_none());
}
}