espipe 0.2.0

A minimalist command-line utility to pipe documents from a file or I/O stream into an Elasticsearch cluster.
mod action;
mod elasticsearch;
mod file;

extern crate elasticsearch as elasticsearch_client;
use crate::client::{Auth, ElasticsearchBuilder, KnownHost};
pub use action::BulkAction;
pub use elasticsearch::ElasticsearchOutputConfig;
use elasticsearch::ElasticsearchOutput;
use elasticsearch_client::Elasticsearch;
use eyre::Result;
use file::FileOutput;
use fluent_uri::UriRef;
use serde_json::value::RawValue;
use std::path::PathBuf;
use url::Url;

#[derive(Debug)]
pub enum Output {
    Elasticsearch(ElasticsearchOutput),
    File(FileOutput),
    Stdout,
}

impl Output {
    pub fn try_new(
        insecure: bool,
        auth: Auth,
        uri: UriRef<String>,
        action: BulkAction,
        request_body_compression: bool,
        elasticsearch_config: ElasticsearchOutputConfig,
    ) -> Result<Self> {
        log::trace!("{uri:?}");
        match uri.scheme() {
            Some(scheme) if ["http", "https"].contains(&scheme.as_str()) => {
                let url = Url::parse(uri.as_str())?;
                let mut client_url = url.clone();
                client_url.set_path("");
                let client = ElasticsearchBuilder::new(client_url)
                    .insecure(insecure)
                    .auth(auth)
                    .request_body_compression(request_body_compression)
                    .build()?;
                let output = ElasticsearchOutput::try_new(client, url, action, elasticsearch_config)?;
                Ok(Output::Elasticsearch(output))
            }
            Some(scheme) if scheme.as_str() == "file" => {
                let path = PathBuf::from(uri.path().as_str());
                let output = FileOutput::try_from(path)?;
                Ok(Output::File(output))
            }
            Some(scheme) => {
                let known_host = KnownHost::try_from(scheme.as_str())?;
                let url = known_host.get_url().join(uri.path().as_str())?;
                let client = Elasticsearch::try_from(known_host)?;
                let output = ElasticsearchOutput::try_new(client, url, action, elasticsearch_config)?;
                Ok(Output::Elasticsearch(output))
            }
            None => match uri.path().as_str() {
                "-" => Ok(Output::Stdout),
                _ => {
                    let path = PathBuf::from(uri.path().as_str());
                    let output = FileOutput::try_from(path)?;
                    Ok(Output::File(output))
                }
            },
        }
    }

    pub async fn send(&mut self, value: Box<RawValue>) -> Result<usize> {
        match self {
            Output::Elasticsearch(output) => Ok(output.send(value).await?),
            Output::File(output) => Ok(output.send(value).await?),
            Output::Stdout => {
                println!("{}", value.get());
                Ok(1)
            }
        }
    }

    pub async fn close(self) -> Result<usize> {
        match self {
            Output::Elasticsearch(output) => Ok(output.close().await?),
            Output::File(output) => Ok(output.close().await?),
            Output::Stdout => Ok(0),
        }
    }
}

impl std::fmt::Display for Output {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Output::Elasticsearch(output) => write!(f, "{output}"),
            Output::File(output) => write!(f, "{output}"),
            Output::Stdout => write!(f, "stdout"),
        }
    }
}

trait Sender {
    async fn send(&mut self, value: Box<RawValue>) -> Result<usize>;
    async fn close(self) -> Result<usize>;
}