rlink-connector-elasticsearch 0.3.0

High-performance Stream Processing Framework
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use elasticsearch::http::headers::HeaderMap;
use elasticsearch::http::request::JsonBody;
use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use elasticsearch::http::Url;
use elasticsearch::{BulkParts, Elasticsearch};
use log::error;
use rlink::api::checkpoint::CheckpointFunction;
use rlink::api::element::Record;
use rlink::api::function::{Context, NamedFunction, OutputFormat};
use rlink::channel::utils::handover::Handover;
use rlink::utils::thread::{async_runtime, async_sleep, async_spawn};
use rlink::{api, utils};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;

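/// One unit of work for the writer: the target index, the mapping type, and
/// the JSON document to send.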
pub struct ElasticsearchModel {
    pub index: String,
    pub es_type: &'static str,
    pub body: Value,
}

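/// The action/metadata line that precedes each document in a bulk request,
/// serialized as `{"index":{"_index":"...","_type":"..."}}`.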
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Index {
    index: HashMap<String, String>,
}

impl Index {
    pub fn new() -> Self {
        Index {
            index: HashMap::new(),
        }
    }

    pub fn set_index(&mut self, index_value: String) {
        self.index.insert("_index".to_string(), index_value);
    }

    pub fn set_type(&mut self, type_value: String) {
        self.index.insert("_type".to_string(), type_value);
    }

    pub fn to_json(&self) -> Result<Value, serde_json::Error> {
        serde_json::to_value(self)
    }
}

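/// Converts an rlink `Record` into an `ElasticsearchModel`; intended to be
/// implemented by the application for its own record schema.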
pub trait ElasticsearchConverter: Debug + Send + Sync {
    fn to_json(&self, record: &mut Record) -> ElasticsearchModel;
}
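
// A minimal converter sketch (an illustrative assumption, not part of this
// crate): a real implementation would decode the Record's payload according
// to its schema; here the document body is a fixed JSON value.
#[derive(Debug)]
pub struct StaticConverter {
    pub index: String,
}

impl ElasticsearchConverter for StaticConverter {
    fn to_json(&self, _record: &mut Record) -> ElasticsearchModel {
        ElasticsearchModel {
            index: self.index.clone(),
            es_type: "_doc",
            body: serde_json::json!({ "source": "rlink" }),
        }
    }
}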

#[derive(Debug, NamedFunction)]
pub struct ElasticsearchOutputFormat {
    address: String,
    headers: HashMap<String, String>,

    builder: Arc<Box<dyn ElasticsearchConverter>>,
    handover: Option<Handover>,
}

impl ElasticsearchOutputFormat {
    pub fn new(
        address: &str,
        headers: HashMap<String, String>,
        builder: Box<dyn ElasticsearchConverter>,
    ) -> Self {
        ElasticsearchOutputFormat {
            address: address.to_string(),
            headers,
            builder: Arc::new(builder),
            handover: None,
        }
    }
}
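
// A wiring sketch using the StaticConverter example above; the endpoint and
// index name are illustrative assumptions, not connector defaults.
#[allow(dead_code)]
fn example_sink() -> ElasticsearchOutputFormat {
    ElasticsearchOutputFormat::new(
        "http://127.0.0.1:9200",
        HashMap::new(),
        Box::new(StaticConverter {
            index: "events".to_string(),
        }),
    )
}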

impl OutputFormat for ElasticsearchOutputFormat {
    fn open(&mut self, context: &Context) -> api::Result<()> {
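        // The handover queue decouples record production on the task thread
        // from the async writer.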
        self.handover = Some(Handover::new(self.name(), context.task_id.to_tags(), 10000));

        // A dedicated writer thread drains the handover queue and flushes to
        // Elasticsearch on its own async runtime.
        let mut write_thread = ElasticsearchWriteThread::new(
            self.address.as_str(),
            self.headers.clone(),
            self.handover.as_ref().unwrap().clone(),
            3000,
        )
        .expect("build elasticsearch connection error");

        let converter = self.builder.clone();
        utils::thread::spawn("elastic-sink-block", move || {
            async_runtime("es_sink").block_on(async {
                write_thread.run(converter, 5).await;
            });
        });

        Ok(())
    }

    fn write_record(&mut self, record: Record) {
        self.handover.as_ref().unwrap().produce(record).unwrap();
    }

    fn close(&mut self) -> api::Result<()> {
        Ok(())
    }
}

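// No checkpoint state to snapshot; the trait's default no-op hooks apply.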
impl CheckpointFunction for ElasticsearchOutputFormat {}

#[derive(Clone)]
pub struct ElasticsearchWriteThread {
    client: Elasticsearch,
    batch_size: usize,
    handover: Handover,
}

impl ElasticsearchWriteThread {
    pub fn new(
        address: &str,
        headers: HashMap<String, String>,
        handover: Handover,
        batch_size: usize,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let mut header_map = HeaderMap::new();
        if let Some(val) = headers.get("stoken") {
            header_map.insert("stoken", val.as_str().parse().unwrap());
        }

        let url = Url::parse(address)?;
        let conn_pool = SingleNodeConnectionPool::new(url);
        let transport = TransportBuilder::new(conn_pool)
            .headers(header_map)
            .build()?;
        let client = Elasticsearch::new(transport);

        Ok(ElasticsearchWriteThread {
            client,
            batch_size,
            handover,
        })
    }

    pub async fn run(
        &mut self,
        converter: Arc<Box<dyn ElasticsearchConverter>>,
        parallelism: usize,
    ) {
        // Each spawned task shares the same handover queue and client,
        // polling and flushing batches concurrently.
        let mut join_handles = Vec::new();
        for _ in 0..parallelism {
            let mut self_clone = self.clone();
            let converter = converter.clone();

            let handle = async_spawn(async move {
                self_clone.run0(converter).await;
            });

            join_handles.push(handle);
        }

        for handle in join_handles {
            handle.await.unwrap();
        }
    }

    pub async fn run0(&mut self, converter: Arc<Box<dyn ElasticsearchConverter>>) {
        loop {
            match self.batch_send(&converter).await {
                Ok(len) => {
                    if len == 0 {
                        async_sleep(Duration::from_secs(1)).await;
                    }
                }
                Err(e) => {
                    error!("write elasticsearch error. {}", e);
                    async_sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    async fn batch_send(
        &self,
        converter: &Box<dyn ElasticsearchConverter>,
    ) -> Result<usize, Box<dyn std::error::Error + Send>> {
        let mut bulk_bodies = Vec::with_capacity(self.batch_size);
        for _ in 0..self.batch_size {
            match self.handover.try_poll_next() {
                Ok(mut record) => {
                    let ElasticsearchModel {
                        index,
                        es_type,
                        body,
                    } = converter.to_json(record.borrow_mut());

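                    // Bulk format: each document body is preceded by an
                    // action/metadata line naming its target index and type.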
                    let mut index_model = Index::new();
                    index_model.set_index(index);
                    index_model.set_type(es_type.to_string());
                    bulk_bodies.push(JsonBody::new(index_model.to_json().unwrap()));

                    bulk_bodies.push(JsonBody::new(body));
                }
                Err(_e) => {
                    break;
                }
            }
        }

        let len = bulk_bodies.len();
        self.flush(bulk_bodies).await.map_err(|e| {
            let err = std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e));
            let source: Box<dyn std::error::Error + Send> = Box::new(err);
            source
        })?;

        Ok(len)
    }

    async fn flush(
        &self,
        body_bulk: Vec<JsonBody<Value>>,
    ) -> Result<bool, Box<dyn std::error::Error>> {
        if body_bulk.is_empty() {
            return Ok(true);
        }
        let response = self
            .client
            .bulk(BulkParts::None)
            .body(body_bulk)
            .send()
            .await?;
        let response_body = response.json::<Value>().await?;
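        // The bulk API flags per-item failures with a top-level "errors"
        // boolean in the response body.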
        let errors = response_body["errors"]
            .as_bool()
            .ok_or(anyhow!("no errors field in es response"))?;

        if errors {
            let err = std::io::Error::new(
                std::io::ErrorKind::Other,
                "elasticsearch bulk response reported item errors",
            );
            let source: Box<dyn std::error::Error + Send> = Box::new(err);
            Err(source)
        } else {
            Ok(true)
        }
    }
}

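/// Adapter exposing a boxed `Send` error as a typed `std::error::Error`
/// whose `source()` returns the inner error (via `thiserror`).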
#[derive(Error, Debug)]
#[error("boxed source")]
pub struct BoxedSource {
    #[source]
    source: Box<dyn std::error::Error + Send + 'static>,
}