espipe 0.2.0

A minimalist command-line utility to pipe documents from a file or I/O stream into an Elasticsearch cluster.
use elasticsearch::indices::{IndicesDeleteParts, IndicesRefreshParts};
use elasticsearch::{
    http::transport::{SingleNodeConnectionPool, TransportBuilder},
    CountParts, Elasticsearch,
};
use eyre::Result;
use serde_json::Value;
use std::{
    fs,
    path::PathBuf,
    process::Command,
    time::{SystemTime, UNIX_EPOCH},
};
use url::Url;

/// Creates a fresh scratch directory under the system temp dir and returns its path.
///
/// The directory name is `{prefix}-{pid}-{nanos}`, combining the current process
/// id with the nanoseconds elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch or if the
/// directory cannot be created.
fn temp_dir(prefix: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time went backwards");
    let stamp = since_epoch.as_nanos();
    let pid = std::process::id();

    let mut scratch = std::env::temp_dir();
    scratch.push(format!("{prefix}-{pid}-{stamp}"));
    fs::create_dir_all(&scratch).expect("create temp dir");
    scratch
}

/// Builds an index name of the form `espipe-test-{pid}-{nanos}`.
///
/// The process id and the nanoseconds since the Unix epoch are embedded so that
/// separate runs get distinct names.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn test_index_name() -> String {
    let pid = std::process::id();
    let stamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time went backwards")
        .as_nanos();
    format!("espipe-test-{pid}-{stamp}")
}

/// Writes a two-document NDJSON fixture to `dir/filename` and returns the full path.
///
/// `dir` is taken as a borrowed `Path` rather than `&PathBuf`, so any path-like
/// borrow works; existing `&PathBuf` call sites keep compiling through deref
/// coercion.
///
/// # Panics
///
/// Panics if the file cannot be written (for example when `dir` does not exist).
fn write_input_file(dir: &std::path::Path, filename: &str) -> PathBuf {
    let path = dir.join(filename);
    // One JSON document per line, with a trailing newline after the last one.
    let contents = r#"{"message":"hello"}
{"message":"world"}
"#;
    fs::write(&path, contents).expect("write input file");
    path
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cli_ingests_into_elasticsearch_if_available() -> Result<()> {
    let base_url = Url::parse("http://localhost:9200")?;
    let transport =
        TransportBuilder::new(SingleNodeConnectionPool::new(base_url.clone())).build()?;
    let client = Elasticsearch::new(transport);

    if !is_connected(&client).await.unwrap_or(false) {
        eprintln!("Skipping Elasticsearch integration test; local node not available.");
        return Ok(());
    }

    let temp_dir = temp_dir("espipe-es-it");
    let input_path = write_input_file(&temp_dir, "bulk_input.ndjson");
    let index = test_index_name();
    let output_url = format!("{}/{}", base_url.as_str().trim_end_matches('/'), index);

    let status = Command::new(env!("CARGO_BIN_EXE_espipe"))
        .arg(&input_path)
        .arg(&output_url)
        .status()
        .expect("run espipe");

    assert!(status.success(), "espipe exited with failure");

    client
        .indices()
        .refresh(IndicesRefreshParts::Index(&[&index]))
        .send()
        .await?;

    let response = client.count(CountParts::Index(&[&index])).send().await?;
    let body: Value = response.json().await?;
    let count = body.get("count").and_then(Value::as_u64).unwrap_or(0);
    assert_eq!(count, 2);

    client
        .indices()
        .delete(IndicesDeleteParts::Index(&[&index]))
        .send()
        .await?;

    Ok(())
}

/// Reports whether `client` is talking to something that identifies as Elasticsearch.
///
/// Sends an info request and checks that the JSON body carries the tagline
/// `"You Know, for Search"`. A failed request or an unparsable body yields
/// `Ok(false)` rather than an error.
async fn is_connected(client: &Elasticsearch) -> Result<bool> {
    let Ok(response) = client.info().send().await else {
        return Ok(false);
    };
    let Ok(body) = response.json::<Value>().await else {
        return Ok(false);
    };

    let tagline = body.get("tagline").and_then(Value::as_str);
    Ok(tagline == Some("You Know, for Search"))
}