use std::fmt::Debug;
use elasticsearch::Elasticsearch;
use elasticsearch::DeleteParts;
use elasticsearch::GetParts;
use elasticsearch::IndexParts;
use elasticsearch::SearchParts;
use elasticsearch::UpdateParts;
use handlebars::Handlebars;
use serde_json::json;
use serde_json::Value;
use crate::core::http_clients::HttpClient;
use crate::commons::datetime::timestamp_millis;
use crate::commons::json_value_to_generic;
use crate::commons::json_value_to_generic2;
use crate::core::error2::Error;
use crate::core::error2::Result;
/// Name of the document field that `save` and `save2` populate with
/// `timestamp_millis()` when the document being indexed does not already contain it.
const TIMESTAMP_FIELD_NAME: &str = "@timestamp";
/// Bundle of the handles the methods of this type use to talk to Elasticsearch.
#[derive(Clone)]
pub struct EsClient {
/// Server address string; `build_mappings` splits it on `,` and uses the first entry.
pub server: String,
/// Typed Elasticsearch client used by the search/get/index/update/delete methods.
pub client: Elasticsearch,
/// HTTP client used by `build_mappings` to PUT index templates.
pub request: HttpClient,
/// Handlebars registry.
/// NOTE(review): not referenced by any method in this section — presumably used
/// for templated queries elsewhere; confirm.
pub handlebars: Handlebars<'static>,
}
impl EsClient {
pub async fn build_mappings(&self, _mappings_location: &str) {
let mappings_location =
std::path::PathBuf::from(crate::commons::service_home()).join(_mappings_location);
if mappings_location.exists() {
let mappings_files =
crate::commons::list_files(mappings_location.clone(), "yaml").unwrap();
for file in mappings_files {
let map_data: serde_json::Map<String, serde_json::Value> =
match serde_any::from_file(&file) {
Ok(map) => map,
Err(e) => {
log::error!(
"Elasticsearch-Read-Mappings-YAML-Error: path={}, error={:?}",
&file.to_str().unwrap(),
e
);
serde_json::Map::new()
}
};
let mapping_name = if let Some(name) = map_data.get("mapping_name") {
json_value_to_generic2::<String>(name).unwrap()
} else {
String::default()
};
if mapping_name.is_empty() {
log::warn!(
"Elasticsearch-mapping-file: mapping_name={}, path={}",
"N/a",
file.to_str().unwrap()
);
continue;
}
let mapping_schema = if let Some(schema) = map_data.get("mapping_schema") {
json_value_to_generic2::<String>(schema).unwrap()
} else {
String::default()
};
if mapping_schema.is_empty() {
log::warn!(
"Elasticsearch-mapping-file: mapping_name={}, schema={}, path={}",
mapping_name,
"N/a",
file.to_str().unwrap()
);
continue;
}
let servers: Vec<&str> = self.server.split(',').collect();
let url = format!("{}/_template/{}", servers[0], mapping_name);
match self
.request
.put_with_json_text(&url, mapping_schema.clone())
.await
{
Ok(_) => {
log::info!(
"Elasticsearch-mapping-Created: mapping_name={}, path={}/{}",
mapping_name,
_mappings_location,
file.file_name().unwrap().to_str().unwrap()
);
}
Err(e) => {
log::error!(
"Elasticsearch-mapping-Failed: server={}, mapping_name={}, error={}",
servers[0],
mapping_name,
e
);
}
}
}
}
}
pub async fn search(&self, index_name: &str, query: &str) -> Result<Value> {
let query = serde_json::from_str::<serde_json::Value>(query)?;
match self
.client
.search(SearchParts::Index(&[index_name]))
.body(query)
.send()
.await
{
Ok(res) => Ok(res.json::<Value>().await.unwrap()),
Err(err) => {
log::error!("EsClient-search-failed: error={:?}", err);
Err(Error::UnexpectedError(anyhow::anyhow!(err)))
}
}
}
pub async fn select<T>(&self, index_name: &str, query: &str) -> Result<Vec<T>>
where
T: serde::de::DeserializeOwned,
{
let query = serde_json::from_str::<serde_json::Value>(query)?;
let res = self
.client
.search(SearchParts::Index(&[index_name]))
.body(&query)
.send()
.await
.map_err(|e| anyhow::anyhow!(e))?;
let select_value = res.json::<Value>().await.map_err(|e| anyhow::anyhow!(e))?;
let mut list: Vec<T> = vec![];
if let Some(error) = Self::error(&select_value) {
log::error!(
"EsClient-select-fail: index_name={}, error={}",
index_name,
error
);
return Err(Error::UnexpectedError(anyhow::anyhow!(error)));
}
for hobby in Self::hits(&select_value) {
if let Some(_id) = hobby.get("_id") {
if let Some(s) = hobby.get("_source") {
list.push(Self::item::<T>(s, _id)?);
}
}
}
log::debug!(
"EsClient-select-success: index_name={}, size={}, query={}",
index_name,
list.len(),
query
);
Ok(list)
}
pub async fn select_after<T>(
&self,
index_name: &str,
query: &str,
) -> Result<(Vec<T>, Option<Vec<Value>>)>
where
T: serde::de::DeserializeOwned,
{
let mut query = serde_json::from_str::<serde_json::Value>(query)?;
let mut total_size = 0;
let limit_size = 10000;
if let Some(size) = query.get_mut("size") {
if let Some(size_int) = size.as_i64() {
total_size = size_int as usize;
if total_size > limit_size {
*size = Value::from(limit_size);
}
}
}
let mut return_list: Vec<T> = vec![];
let mut last_sort_value: Option<Vec<Value>> = None;
if query.get("sort").is_none() {
return Err(Error::run_time("select_after must contain sort"));
}
while return_list.len() < total_size {
let curr_size = return_list.len();
if let Some(last_sort_value) = last_sort_value.clone() {
if let Value::Object(map) = &mut query {
map.remove("from");
map.insert("search_after".to_string(), Value::from(last_sort_value));
}
}
if let Value::Object(map) = &mut query {
let new_size = if total_size - curr_size > limit_size {
Value::from(limit_size)
} else {
Value::from(total_size - curr_size)
};
map.insert("size".to_string(), new_size);
}
let res = self
.client
.search(SearchParts::Index(&[index_name]))
.body(&query)
.send()
.await
.map_err(|e| anyhow::anyhow!(e))?;
let select_value = res.json::<Value>().await.map_err(|e| anyhow::anyhow!(e))?;
let mut list: Vec<T> = vec![];
if let Some(error) = Self::error(&select_value) {
log::error!(
"EsClient-select-fail: index_name={}, error={}",
index_name,
error
);
return Err(Error::UnexpectedError(anyhow::anyhow!(error)));
}
let iter = Self::hits(&select_value);
let last_item: Option<Value> = iter.last().cloned();
for hobby in iter {
if let Some(_id) = hobby.get("_id") {
if let Some(s) = hobby.get("_source") {
list.push(Self::item::<T>(s, _id)?);
}
}
}
if let Some(last_item) = last_item {
let last_sort = last_item.get("sort").cloned();
if let Some(last_sort) = last_sort {
last_sort_value = last_sort.as_array().cloned();
}
}
if list.is_empty() {
break;
}
let list_size = list.len();
return_list.extend(list);
log::debug!(
"EsClient-select-success: index_name={}, size={}/{}, query={}",
index_name,
list_size,
return_list.len(),
query
);
}
Ok((return_list, last_sort_value))
}
pub fn hits(select_value: &Value) -> Vec<Value> {
if let Some(select_value) = select_value.get("hits") {
if let Some(select_value) = select_value.get("hits") {
return select_value.as_array().unwrap().to_vec();
}
}
vec![]
}
pub fn error(select_value: &Value) -> Option<String> {
if let Some(error_value) = select_value.get("error") {
if let Some(caused_by) = error_value.get("caused_by") {
let l = caused_by.get("reason").unwrap();
return Some(l.as_str().unwrap().to_string());
}
if let Some(reason) = error_value.get("reason") {
if let Some(root_cause) = error_value.get("root_cause") {
let root_cause = root_cause.as_array().unwrap().to_vec();
if !root_cause.is_empty() {
if let Some(root_cause) = root_cause[0].get("reason") {
return Some(root_cause.as_str().unwrap().to_string());
} else {
return Some(reason.as_str().unwrap().to_string());
}
} else {
return Some(reason.as_str().unwrap().to_string());
}
} else {
return Some(reason.as_str().unwrap().to_string());
}
}
}
None
}
pub fn item<T>(s: &Value, _id: &Value) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
let mut json_map = s.as_object().unwrap().clone();
json_map.insert("id".to_string(), _id.clone());
let item = match json_value_to_generic::<T>(Value::Object(json_map)) {
Ok(o) => o,
Err(err) => {
return Err(Error::UnexpectedError(anyhow::anyhow!(err)));
}
};
Ok(item)
}
pub async fn query_single<T>(&self, index_name: &str, id: &str) -> Result<Option<T>>
where
T: serde::de::DeserializeOwned, {
match self
.client
.get(GetParts::IndexId(index_name, id))
.send()
.await
{
Ok(res) => match res.json::<Value>().await {
Ok(query_result) => {
if let Some(result_value) = query_result.get("found") {
if result_value.as_bool().unwrap() {
let json_value = query_result.get("_source").cloned().unwrap();
let t: T = json_value_to_generic(json_value)?;
Ok(Some(t))
} else {
Ok(None)
}
} else {
log::error!(
"EsClient-query_single-fail: id={}, result_value={:?}",
id,
query_result
);
Ok(None)
}
}
Err(e) => {
log::error!("EsClient-query_single-fail: id={}, error={:?}", id, e);
Err(Error::UnexpectedError(anyhow::anyhow!(e)))
}
},
Err(err) => {
log::error!("EsClient-query_single-failed: id={}, error={:?}", id, err);
Err(Error::UnexpectedError(anyhow::anyhow!(err)))
}
}
}
pub async fn save<T>(&self, index_name: &str, data: &T) -> Result<String>
where
T: serde::Serialize,
{
let mut json_object: Value = serde_json::to_value(data).unwrap();
match &json_object {
Value::Object(map) => {
if !map.contains_key(TIMESTAMP_FIELD_NAME) {
let json_map = json_object.as_object_mut().unwrap();
json_map.insert(TIMESTAMP_FIELD_NAME.to_string(), json!(timestamp_millis()));
}
}
_ => return Err(Error::UnexpectedError(anyhow::anyhow!("created failed"))),
}
let idx = IndexParts::Index(index_name);
match self.client.index(idx).body(json_object).send().await {
Ok(res) => match res.json::<Value>().await {
Ok(created_result) => {
if let Some(result_value) = created_result.get("result") {
if result_value.as_str().unwrap() == "created" {
let id = created_result.get("_id").unwrap();
Ok(id.as_str().unwrap().into())
} else {
log::error!(
"EsClient-create_index-fail: created_result={:?}",
created_result
);
Err(Error::UnexpectedError(anyhow::anyhow!("created failed")))
}
} else {
log::error!(
"EsClient-create_index-fail: created_result={:?}",
created_result
);
Err(Error::UnexpectedError(anyhow::anyhow!("created failed")))
}
}
Err(e) => {
log::error!("EsClient-create_index-fail: error={:?}", e);
Err(Error::UnexpectedError(anyhow::anyhow!(e)))
}
},
Err(err) => {
log::error!("EsClient-create_index-failed: error={:?}", err);
Err(Error::UnexpectedError(anyhow::anyhow!(err)))
}
}
}
/// Indexes the JSON text `data` as a new document in `index_name`.
///
/// This is the lenient counterpart of `save`: every failure is logged and
/// reported as `Ok(None)`; the function never returns `Err` and never panics on
/// malformed input or unexpected responses. On success the generated document
/// id is returned as `Ok(Some(id))`.
///
/// `data` must parse to a JSON object. If the object has no
/// `TIMESTAMP_FIELD_NAME` field, one is added with the value of
/// `timestamp_millis()`.
pub async fn save2(&self, index_name: &str, data: &str) -> Result<Option<String>> {
    let mut json_object: Value = match crate::commons::parse_json_string(data) {
        Ok(value) => value,
        Err(e) => {
            log::error!(
                "EsClient-create_index-fail: index_name={}, error={:?}",
                index_name,
                e
            );
            return Ok(None);
        }
    };
    match json_object.as_object_mut() {
        Some(json_map) => {
            if !json_map.contains_key(TIMESTAMP_FIELD_NAME) {
                json_map.insert(TIMESTAMP_FIELD_NAME.to_string(), json!(timestamp_millis()));
            }
        }
        None => {
            log::error!(
                "EsClient-create_index-fail: index_name={}, error={}",
                index_name,
                "not a map"
            );
            return Ok(None);
        }
    }

    let res = match self
        .client
        .index(IndexParts::Index(index_name))
        .body(json_object)
        .send()
        .await
    {
        Ok(res) => res,
        Err(err) => {
            log::error!(
                "EsClient-create_index-failed: index_name={}, error={:?}",
                index_name,
                err
            );
            return Ok(None);
        }
    };
    let created_result = match res.json::<Value>().await {
        Ok(value) => value,
        Err(e) => {
            log::error!(
                "EsClient-create_index-fail: index_name={}, error={:?}",
                index_name,
                e
            );
            return Ok(None);
        }
    };

    match created_result.get("result") {
        Some(result_value) if result_value.as_str() == Some("created") => {
            match created_result.get("_id").and_then(Value::as_str) {
                Some(id) => Ok(Some(id.to_string())),
                None => {
                    log::error!(
                        "EsClient-create_index-fail: index_name={}, data={}, error_message={:?}",
                        index_name,
                        data,
                        created_result
                    );
                    Ok(None)
                }
            }
        }
        Some(result_value) => {
            log::error!(
                "EsClient-create_index-fail: index_name={}, data={}, result_value={:?}",
                index_name,
                data,
                result_value
            );
            Ok(None)
        }
        None => {
            log::error!(
                "EsClient-create_index-fail: index_name={}, data={}, error_message={:?}",
                index_name,
                data,
                created_result
            );
            Ok(None)
        }
    }
}
pub async fn update<T>(&self, index_name: &str, id: &str, data: &T) -> Result<bool>
where
T: serde::Serialize + Debug,
{
let json_object = json!({
"doc": data
});
match self
.client
.update(UpdateParts::IndexId(index_name, id))
.body(&json_object)
.send()
.await
{
Ok(res) => match res.json::<Value>().await {
Ok(update_result) => {
if let Some(result_value) = update_result.get("result") {
if result_value == "updated" || result_value == "noop" {
Ok(true)
} else {
log::error!(
"EsClient-update_index-fail: id={}, update_result={:?}",
id,
update_result
);
Err(Error::UnexpectedError(anyhow::anyhow!("update failed")))
}
} else {
log::error!(
"EsClient-update_index-fail: id={}, update_result={:?}",
id,
update_result
);
Err(Error::UnexpectedError(anyhow::anyhow!("update failed")))
}
}
Err(e) => {
log::error!("EsClient-update_index-fail: id={}, uerror={:?}", id, e);
Err(Error::UnexpectedError(anyhow::anyhow!(e)))
}
},
Err(err) => {
log::error!("EsClient-update-failed: id={}, error={:?}", id, err);
Err(Error::UnexpectedError(anyhow::anyhow!(err)))
}
}
}
pub async fn delete(&self, index_name: &str, id: &str) -> Result<bool> {
match self
.client
.delete(DeleteParts::IndexId(index_name, id))
.send()
.await
{
Ok(res) => match res.json::<Value>().await {
Ok(delete_result) => {
if let Some(result_value) = delete_result.get("result") {
if result_value.as_str().unwrap() == "deleted"
|| result_value.as_str().unwrap() == "not_found"
{
Ok(true)
} else {
log::error!(
"EsClient-delete_index-fail: id={}, result_value={:?}",
id,
result_value
);
Ok(false)
}
} else {
log::error!(
"EsClient-delete_index-fail: id={}, result_value={:?}",
id,
delete_result
);
Ok(false)
}
}
Err(e) => {
log::error!("EsClient-delete_index-fail: id={}, error={:?}", id, e);
Err(Error::UnexpectedError(anyhow::anyhow!(e)))
}
},
Err(err) => {
log::error!("EsClient-delete_index-failed: id={}, error={:?}", id, err);
Err(Error::UnexpectedError(anyhow::anyhow!(err)))
}
}
}
}