espipe 0.4.0

A minimalist command-line utility to pipe documents from a file or I/O stream into an Elasticsearch cluster.
use serde_json::Value;
use std::{
    fs::{self, File},
    io::{BufRead, BufReader, Write},
    path::PathBuf,
    process::Command,
    time::{Instant, SystemTime, UNIX_EPOCH},
};

/// Number of documents written to the generic benchmark fixture and expected back from `_count`.
const BENCH_DOCS: usize = 525_000;
/// Minimum acceptable size of the generic benchmark fixture, in bytes (100 MiB).
const BENCH_BYTES_MIN: u64 = 100 << 20;
/// Number of documents written to the nginx access-log fixture and expected back from `_count`.
const NGINX_BENCH_DOCS: usize = 10_000_000;
/// Minimum acceptable size of the nginx access-log fixture, in bytes (2 GiB).
const NGINX_BENCH_BYTES_MIN: u64 = 2 << 30;

/// Manual benchmark: pipes the generic NDJSON fixture into a fresh index on
/// `localhost:9200` through the `espipe` binary, verifies the resulting document
/// count via `_count`, and prints the wall-clock time of the ingest step.
#[test]
#[ignore = "requires localhost:9200 and is intended for manual benchmarking"]
fn localhost_large_ingest_benchmark() {
    // Prepare (or reuse) the input file and make sure it is large enough to be meaningful.
    let input_path = benchmark_fixture().unwrap();
    let input_bytes = fs::metadata(&input_path).unwrap().len();
    assert!(input_bytes >= BENCH_BYTES_MIN);

    // Each run targets its own index so earlier runs cannot skew the count.
    let index = format!("espipe-bench-{}", unique_id());
    let target_url = format!("http://localhost:9200/{index}");

    // Only the espipe invocation itself is timed.
    let timer = Instant::now();
    let mut ingest = Command::new(env!("CARGO_BIN_EXE_espipe"));
    ingest.arg(&input_path).arg(&target_url).arg("-q");
    let exit = ingest.status().unwrap();
    assert!(exit.success());
    let elapsed = timer.elapsed().as_secs_f64();

    refresh_index(&index);

    // Ask the cluster how many documents ended up in the index.
    let count_url = format!("http://localhost:9200/{index}/_count");
    let mut count_query = Command::new("curl");
    count_query.args(["-sS", count_url.as_str()]);
    let response = count_query.output().unwrap();
    assert!(response.status.success());
    let body: Value = serde_json::from_slice(&response.stdout).unwrap();
    let indexed_docs = body["count"].as_u64().unwrap();
    assert_eq!(indexed_docs, BENCH_DOCS as u64);

    println!(
        "fixture={} bytes={} docs={} elapsed_seconds={:.3}",
        input_path.display(),
        input_bytes,
        BENCH_DOCS,
        elapsed
    );
}

/// Manual benchmark: pipes the synthetic nginx access-log fixture into a fresh index
/// on `localhost:9200` through the `espipe` binary, verifies the resulting document
/// count via `_count`, and prints the wall-clock time of the ingest step.
#[test]
#[ignore = "requires localhost:9200 and is intended for manual benchmarking"]
fn localhost_nginx_access_log_benchmark() {
    // Prepare (or reuse) the input file and make sure it is large enough to be meaningful.
    let input_path = nginx_access_benchmark_fixture().unwrap();
    let input_bytes = fs::metadata(&input_path).unwrap().len();
    assert!(input_bytes >= NGINX_BENCH_BYTES_MIN);

    // Each run targets its own index so earlier runs cannot skew the count.
    let index = format!("espipe-nginx-bench-{}", unique_id());
    let target_url = format!("http://localhost:9200/{index}");

    // Only the espipe invocation itself is timed.
    let timer = Instant::now();
    let mut ingest = Command::new(env!("CARGO_BIN_EXE_espipe"));
    ingest.arg(&input_path).arg(&target_url).arg("-q");
    let exit = ingest.status().unwrap();
    assert!(exit.success());
    let elapsed = timer.elapsed().as_secs_f64();

    refresh_index(&index);

    // Ask the cluster how many documents ended up in the index.
    let count_url = format!("http://localhost:9200/{index}/_count");
    let mut count_query = Command::new("curl");
    count_query.args(["-sS", count_url.as_str()]);
    let response = count_query.output().unwrap();
    assert!(response.status.success());
    let body: Value = serde_json::from_slice(&response.stdout).unwrap();
    let indexed_docs = body["count"].as_u64().unwrap();
    assert_eq!(indexed_docs, NGINX_BENCH_DOCS as u64);

    println!(
        "fixture={} bytes={} docs={} elapsed_seconds={:.3} type=nginx_access",
        input_path.display(),
        input_bytes,
        NGINX_BENCH_DOCS,
        elapsed
    );
}

/// Returns the path of the NDJSON fixture for the generic ingest benchmark,
/// generating the file first when no valid one exists.
///
/// The path comes from the `ESPIPE_BENCH_INPUT` environment variable and falls back
/// to `espipe-bench-525k.ndjson` inside the system temp directory. A file that
/// already satisfies `benchmark_fixture_is_valid` (size and line count) is reused
/// as-is. Otherwise the file is (re)created with `BENCH_DOCS` JSON lines; note that
/// `File::create` truncates whatever currently lives at that path, including a file
/// supplied through the environment variable.
///
/// # Errors
///
/// Propagates any I/O error raised while validating, creating or writing the file.
fn benchmark_fixture() -> std::io::Result<PathBuf> {
    let path = std::env::var("ESPIPE_BENCH_INPUT")
        .map(PathBuf::from)
        .unwrap_or_else(|_| std::env::temp_dir().join("espipe-bench-525k.ndjson"));
    if benchmark_fixture_is_valid(&path, BENCH_BYTES_MIN, BENCH_DOCS)? {
        return Ok(path);
    }

    // Buffer the output: writing each formatted line straight to the `File` costs
    // at least one syscall per document.
    let mut writer = std::io::BufWriter::new(File::create(&path)?);
    // The filler payload is identical for every document, so build it once.
    let padding = "x".repeat(180);
    for i in 1..=BENCH_DOCS {
        writeln!(
            writer,
            "{{\"id\":{},\"group\":{},\"ok\":true,\"msg\":\"{}\",\"meta\":{{\"source\":\"bench\",\"bucket\":{}}}}}",
            i,
            i % 10,
            padding,
            i % 100
        )?;
    }
    // Flush explicitly so a late write error is reported instead of being dropped
    // silently when the buffered writer goes out of scope.
    writer.flush()?;
    Ok(path)
}

/// Returns the path of the NDJSON fixture for the nginx access-log benchmark,
/// generating the file first when no valid one exists.
///
/// The path comes from the `ESPIPE_BENCH_NGINX_INPUT` environment variable and falls
/// back to `espipe-bench-nginx-10m.ndjson` inside the system temp directory. A file
/// that already satisfies `benchmark_fixture_is_valid` (size and line count) is
/// reused as-is. Otherwise the file is (re)created with `NGINX_BENCH_DOCS` lines
/// produced by `nginx_access_log_line`; note that `File::create` truncates whatever
/// currently lives at that path, including a file supplied through the environment
/// variable.
///
/// # Errors
///
/// Propagates any I/O error raised while validating, creating or writing the file.
fn nginx_access_benchmark_fixture() -> std::io::Result<PathBuf> {
    let path = std::env::var("ESPIPE_BENCH_NGINX_INPUT")
        .map(PathBuf::from)
        .unwrap_or_else(|_| std::env::temp_dir().join("espipe-bench-nginx-10m.ndjson"));
    if benchmark_fixture_is_valid(&path, NGINX_BENCH_BYTES_MIN, NGINX_BENCH_DOCS)? {
        return Ok(path);
    }

    // Buffer the output: two unbuffered `write_all` calls per document would mean
    // tens of millions of syscalls for the full fixture.
    let mut writer = std::io::BufWriter::new(File::create(&path)?);
    for i in 0..NGINX_BENCH_DOCS {
        writer.write_all(nginx_access_log_line(i).as_bytes())?;
        writer.write_all(b"\n")?;
    }
    // Flush explicitly so a late write error is reported instead of being dropped
    // silently when the buffered writer goes out of scope.
    writer.flush()?;
    Ok(path)
}

/// Reports whether `path` already holds a reusable benchmark fixture.
///
/// A fixture is valid when it is a regular file of at least `min_bytes` bytes that
/// contains exactly `expected_docs` lines. A final line without a trailing newline
/// still counts as a line.
///
/// Takes `&Path` rather than `&PathBuf`; existing `&PathBuf` call sites keep working
/// through deref coercion.
///
/// # Errors
///
/// Returns any I/O error raised while reading the file's metadata or content; this
/// includes content that is not valid UTF-8.
fn benchmark_fixture_is_valid(
    path: &std::path::Path,
    min_bytes: u64,
    expected_docs: usize,
) -> std::io::Result<bool> {
    // Cheap checks first: a missing/non-regular or undersized file needs no scan.
    if !path.is_file() || fs::metadata(path)?.len() < min_bytes {
        return Ok(false);
    }

    // Count lines with a single reused buffer instead of allocating a fresh
    // `String` per line, which matters for multi-gigabyte fixtures.
    let mut reader = BufReader::new(File::open(path)?);
    let mut line = String::new();
    let mut line_count = 0usize;
    while reader.read_line(&mut line)? != 0 {
        line_count += 1;
        line.clear();
    }
    Ok(line_count == expected_docs)
}

/// Issues `POST http://localhost:9200/{index}/_refresh` through `curl` and panics
/// if the request fails.
///
/// The benchmarks call this right before querying `_count` on the same index.
///
/// # Panics
///
/// Panics when `curl` cannot be spawned, or when it exits unsuccessfully; the
/// panic message includes curl's stderr.
fn refresh_index(index: &str) {
    let refresh_output = Command::new("curl")
        .args([
            "-sS",
            // Without --fail curl exits 0 even on an HTTP 4xx/5xx response, so the
            // assertion below could never detect a refresh rejected by the server.
            "--fail",
            "-XPOST",
            &format!("http://localhost:9200/{index}/_refresh"),
        ])
        .output()
        .unwrap();
    assert!(
        refresh_output.status.success(),
        "refresh failed: {}",
        String::from_utf8_lossy(&refresh_output.stderr)
    );
}

/// Builds the `i`-th synthetic nginx access-log record as a single-line JSON object.
///
/// Every field is derived deterministically from `i` (via modulo / division
/// arithmetic), so the same index always yields the same line. The returned string
/// carries no trailing newline.
fn nginx_access_log_line(i: usize) -> String {
    // Value tables that the record cycles through.
    const METHODS: [&str; 5] = ["GET", "POST", "PUT", "PATCH", "DELETE"];
    const STATUSES: [u16; 10] = [200, 200, 200, 200, 200, 200, 201, 304, 404, 500];
    const USER_AGENTS: [&str; 4] = [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/537.36 Chrome/125.0 Safari/537.36",
        "curl/8.7.1",
        "k6/0.49.0",
        "Datadog-Synthetics/1.0",
    ];

    let method = METHODS[i % METHODS.len()];
    let status = STATUSES[i % STATUSES.len()];
    // Server errors are reported as coming from a failing upstream.
    let upstream_status = if status < 500 { 200 } else { 502 };
    let scheme = match i % 20 {
        0 => "http",
        _ => "https",
    };
    let user_agent = USER_AGENTS[i % USER_AGENTS.len()];

    // Address octets are shared between the client and upstream addresses.
    let low_octet = i % 256;
    let mid_octet = (i / 256) % 256;
    let high_octet = (i / 65_536) % 256;
    let remote_addr = format!("10.{}.{}.{}", high_octet, mid_octet, low_octet);
    let upstream_addr = format!("172.16.{}.{}:8080", mid_octet, low_octet);

    let host = format!("api-{}.example.internal", i % 64);
    let path = format!("/v1/accounts/{}/orders/{}", i % 50_000, i);
    let query = format!("region={}&limit={}", i % 12, 25 + (i % 200));
    // Every third request carries a referer; the rest use the "-" placeholder.
    let referer = match i % 3 {
        0 => format!("https://app.example.internal/dashboard/{}", i % 5000),
        _ => String::from("-"),
    };

    let request_length = 512 + (i % 4096);
    let body_bytes_sent = 2048 + ((i * 37) % 16384);
    let request_time = 0.001 + ((i % 4000) as f64 / 1000.0);
    let upstream_time = 0.001 + ((i % 2500) as f64 / 1000.0);

    // Fixed-width, zero-padded lowercase hex identifiers.
    let trace_id = format!("{:032x}", (i as u128) * 104729 + 17);
    let span_id = format!("{:016x}", (i as u64) * 8191 + 11);
    let ts = 1_710_000_000i64 + i as i64;
    let pod = i % 2048;

    format!(
        "{{\"@timestamp\":{ts},\"service\":\"nginx-gateway\",\"env\":\"bench\",\"host\":\"{host}\",\"scheme\":\"{scheme}\",\"remote_addr\":\"{remote_addr}\",\"request_method\":\"{method}\",\"request_path\":\"{path}\",\"query_string\":\"{query}\",\"status\":{status},\"upstream_status\":{upstream_status},\"request_length\":{request_length},\"body_bytes_sent\":{body_bytes_sent},\"request_time\":{request_time:.3},\"upstream_response_time\":{upstream_time:.3},\"upstream_addr\":\"{upstream_addr}\",\"http_referer\":\"{referer}\",\"http_user_agent\":\"{user_agent}\",\"trace_id\":\"{trace_id}\",\"span_id\":\"{span_id}\",\"geo\":{{\"country\":\"US\",\"region\":\"us-west-2\",\"city\":\"Seattle\"}},\"tls\":{{\"version\":\"TLSv1.3\",\"cipher\":\"TLS_AES_256_GCM_SHA384\"}},\"labels\":{{\"cluster\":\"bench\",\"namespace\":\"edge\",\"pod\":\"nginx-{pod}\"}}}}"
    )
}

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The benchmarks use it as a suffix for index names.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unique_id() -> u128 {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis()
}

/// Checks that the generic benchmark fixture meets the minimum size and contains
/// exactly `BENCH_DOCS` lines.
#[test]
fn generated_fixture_is_large_enough() {
    let path = benchmark_fixture().unwrap();

    let size = fs::metadata(&path).unwrap().len();
    assert!(size >= BENCH_BYTES_MIN);

    // One line per document.
    let reader = BufReader::new(File::open(path).unwrap());
    let lines_seen = reader.lines().count();
    assert_eq!(lines_seen, BENCH_DOCS);
}

/// Checks that a generated nginx access-log line parses as JSON and carries a few
/// of the expected fields.
#[test]
fn generated_nginx_access_line_is_valid_json() {
    let parsed: Value = serde_json::from_str(&nginx_access_log_line(42)).unwrap();

    assert_eq!(parsed["service"], "nginx-gateway");
    assert_eq!(parsed["env"], "bench");

    let request_path = parsed["request_path"].as_str().unwrap();
    assert!(request_path.starts_with("/v1/accounts/"));

    let user_agent = parsed["http_user_agent"].as_str().unwrap();
    assert!(user_agent.len() > 5);

    let status = parsed["status"].as_i64().unwrap();
    assert!(status >= 200);
}