use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_json::json;
/// Thin HTTP client for a server that exposes a `/_bulk` endpoint.
///
/// NOTE(review): the endpoint name and response shape look like Elasticsearch's
/// bulk API, but nothing in this file names the server — confirm.
pub struct Client {
/// Base URL of the server. `bulk` appends `/_bulk` to this value verbatim.
pub url: String,
}
impl Client {
pub fn new(url: &str) -> Self {
Client {
url: url.to_string(),
}
}
pub async fn bulk(&self, bulk_requests: &str) -> Result<BulkResponse> {
let client = reqwest::Client::new();
let res = client
.post(&format!("{}/_bulk", &self.url))
.header("Content-type", "application/x-ndjson")
.body(bulk_requests.to_string())
.send()
.await?;
let status = res.status();
if reqwest::StatusCode::INTERNAL_SERVER_ERROR == status {
return Err(anyhow!("internal server error {}", res.text().await?,));
}
let text = res.text().await?;
let bulk_response: BulkResponse = serde_json::from_str(&text)?;
Ok(bulk_response)
}
}
/// Top-level JSON body of a bulk reply, as deserialized by `Client::bulk`.
///
/// Field names map one-to-one onto JSON keys; no serde renames are applied.
#[derive(Serialize, Deserialize, Debug)]
pub struct BulkResponse {
// Private: only reachable through (de)serialization and `Debug` output.
// Presumably the processing time reported by the server — TODO confirm unit.
took: i64,
// Private as well. Presumably set by the server when at least one item
// failed — TODO confirm; callers currently have to inspect `items` instead.
errors: bool,
/// Per-action results contained in the reply.
pub items: Vec<BulkResponseItem>,
}
/// One element of `BulkResponse::items`.
///
/// No serde representation attribute is set, so serde's default externally
/// tagged form applies: each element is a single-key JSON object whose key is
/// the variant name. The variants are therefore spelled in lowercase so that
/// they match the `index` / `delete` keys on the wire.
#[derive(Serialize, Deserialize, Debug)]
pub enum BulkResponseItem {
// Lowercase on purpose (see the type-level comment); the lint is silenced
// rather than renaming, because the variant name doubles as the JSON key.
#[allow(non_camel_case_types)]
index(IndexResponseItem),
// Lowercase on purpose, same reason as `index`.
#[allow(non_camel_case_types)]
delete(DeleteResponseItem),
}
/// Result of a single `index` action inside a bulk reply.
///
/// Field names map one-to-one onto JSON keys; no serde renames are applied.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexResponseItem {
// The three identifiers below are required: deserialization fails when any
// of these keys is missing from the item.
_index: String,
_type: String,
_id: String,
// `Option` fields deserialize to `None` when the key is missing or `null`.
_version: Option<i64>,
result: Option<String>,
created: Option<bool>,
/// Status reported for this individual item.
/// Presumably an HTTP-style status code — TODO confirm.
pub status: i64,
/// Failure details for this item; `None` when the key is missing or `null`.
pub error: Option<BulkItemError>,
}
/// Error object attached to a bulk item (see the `error` field of
/// `IndexResponseItem` and `DeleteResponseItem`).
#[derive(Serialize, Deserialize, Debug)]
pub struct BulkItemError {
/// Error category. Written as a raw identifier because `type` is a Rust
/// keyword; serde still maps it to the JSON key `type`.
pub r#type: String,
/// Explanation supplied alongside the error type.
pub reason: String,
}
/// Result of a single `delete` action inside a bulk reply.
///
/// Only the identifiers and the optional error are modelled; any other keys in
/// the item are ignored on deserialization (serde's default for structs).
#[derive(Serialize, Deserialize, Debug)]
pub struct DeleteResponseItem {
// Required keys: deserialization fails when any of them is missing.
_index: String,
_type: String,
_id: String,
/// Failure details for this item; `None` when the key is missing or `null`.
pub error: Option<BulkItemError>,
}
/// A single bulk action: either an index or a delete.
///
/// NOTE(review): this has the same variants as `Bulkable`, is not referenced
/// anywhere in the visible part of this file, and (unlike `Bulkable`) does not
/// derive `Debug` — confirm whether it is still needed.
pub enum BulkItem {
/// Index (create or overwrite) a document.
Index(IndexBulkItem),
/// Delete a document.
Delete(DeleteBulkItem),
}
/// Data for one bulk `index` action; rendered by `IndexBulkItem::to_request`.
#[derive(Debug)]
pub struct IndexBulkItem {
/// Sent as `_id` in the action line.
pub id: String,
/// Sent as `_type` in the action line (raw identifier: `type` is a keyword).
pub r#type: String,
/// Sent as `_parent` in the action line; `None` is emitted as JSON `null`.
pub parent: Option<String>,
/// Document source, written verbatim on the line after the action line.
/// Presumably single-line JSON — an embedded newline would split the
/// NDJSON record; TODO confirm callers guarantee this.
pub body: String,
/// Sent as `_version`, together with `_version_type` set to `"external"`.
pub version: i64,
}
impl IndexBulkItem {
    /// Renders this item as the two NDJSON lines of a bulk `index` action.
    ///
    /// The first line is the action metadata targeting `index`; the second is
    /// `self.body` copied verbatim. Both lines end with `\n`.
    ///
    /// `self.body` is not validated or re-encoded here, so it is presumably
    /// expected to be single-line JSON already — verify against callers.
    fn to_request(&self, index: &str) -> String {
        let action = json!({
            "index": {
                "_index": index,
                "_id": &self.id,
                "_type": &self.r#type,
                "_parent": &self.parent,
                "_version_type": "external",
                "_version": &self.version,
            }
        });
        // `Display` on a JSON value yields the same compact text as `to_string()`.
        format!("{}\n{}\n", action, self.body)
    }
}
/// Data for one bulk `delete` action; rendered by `DeleteBulkItem::to_request`.
#[derive(Debug)]
pub struct DeleteBulkItem {
/// Sent as `_id` in the action line.
pub id: String,
/// Sent as `_type` in the action line (raw identifier: `type` is a keyword).
pub r#type: String,
/// Sent as `_parent` in the action line; `None` is emitted as JSON `null`.
pub parent: Option<String>,
}
impl DeleteBulkItem {
    /// Renders this item as the single NDJSON line of a bulk `delete` action.
    ///
    /// The line holds the action metadata targeting `index` and ends with
    /// `\n`; a delete action has no document line after it.
    fn to_request(&self, index: &str) -> String {
        let action = json!({
            "delete": {
                "_index": index,
                "_id": &self.id,
                "_type": &self.r#type,
                "_parent": &self.parent,
            }
        });
        // `Display` on a JSON value yields the same compact text as `to_string()`.
        format!("{}\n", action)
    }
}
/// An action that `BulkRequestGenerator` can render into a bulk payload.
#[derive(Debug)]
pub enum Bulkable {
/// Remove a document.
Delete(DeleteBulkItem),
/// Index (create or overwrite) a document.
Index(IndexBulkItem),
}
/// Collects bulk actions for one index and renders them into a single payload.
pub struct BulkRequestGenerator {
/// Index name passed to every item's `to_request` when generating.
pub index: String,
/// Queued actions, rendered by `generate` in this order.
pub items: Vec<Bulkable>,
}
impl BulkRequestGenerator {
    /// Creates an empty generator whose actions all target `index`.
    pub fn new(index: &str) -> Self {
        Self {
            index: index.to_string(),
            items: Vec::new(),
        }
    }

    /// Queues `item` after any previously added items.
    ///
    /// Returns `self` so calls can be chained.
    pub fn add_item(&mut self, item: Bulkable) -> &mut Self {
        // The original wrote `&self.items.push(item);`, borrowing the `()`
        // result and triggering rustc's unused-borrow warning.
        self.items.push(item);
        self
    }

    /// Renders every queued item, in insertion order, into one payload string.
    ///
    /// Each item contributes exactly what its `to_request` returns for
    /// `self.index`; an empty generator yields an empty string.
    pub fn generate(&self) -> String {
        self.items
            .iter()
            .map(|item| match item {
                Bulkable::Delete(delete) => delete.to_request(&self.index),
                Bulkable::Index(index) => index.to_request(&self.index),
            })
            .collect()
    }
}