espipe 0.4.0

A minimalist command-line utility to pipe documents from a file or I/O stream into an Elasticsearch cluster.

Elasticsearch document pipe (espipe)

The goal of espipe is to be a minimalist command-line utility to bulk ingest documents from a file or I/O stream into an Elasticsearch cluster. No enrichment, no transformation, no complication.

Have you ever had thousands of sample documents in an .ndjson or .csv file, and you just want to load them all into a local insecure Elasticsearch cluster?

espipe docs.ndjson http://localhost:9200/new_index

And you're done.

Add a my-cluster host entry with API keys to ~/.espipe/hosts.yml and you can reference the host by name:

espipe docs.ndjson my-cluster:/new_index

Description

espipe is multi-threaded and unthrottled: it can fully saturate the CPU of the sending host and may overwhelm the target cluster, so use it with caution. It handles backpressure and HTTP 429 responses gracefully to ensure at-least-once delivery.

Documents are batched into _bulk requests of 5,000 documents and sent with the create action by default. Use --action to switch to index or update based on your needs. For --action=update, each source document must include an _id field for the update target. Use --batch-size and --max-requests to tune bulk request size and concurrency at runtime.

Installation

Install the published crate with Cargo:

cargo install espipe

To build from source instead:

git clone https://github.com/VimCommando/espipe
cd espipe
cargo install --path .

What It Does

espipe reads records from:

  • .ndjson files
  • .ndjson.gz files
  • .json files
  • .csv files
  • .csv.gz files
  • stdin as NDJSON

It writes records to:

  • Elasticsearch _bulk
  • a local .ndjson or .ndjson.gz file
  • stdout

When writing to Elasticsearch, espipe batches documents into groups of 5,000 records by default, enables request body gzip compression by default, and sends multiple bulk requests concurrently. Use --batch-size to change the number of documents per bulk request and --max-requests to change the number of in-flight bulk requests. File gzip compression is selected purely by suffix (.csv.gz and .ndjson.gz for input, .ndjson.gz for output) and is separate from Elasticsearch request body compression.
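
To make request body compression concrete, here is a minimal Python sketch of gzipping an NDJSON payload and labelling it with the standard HTTP Content-Encoding header. It is an illustration only, not espipe's Rust implementation; the function name compress_body and the sample payload are hypothetical.

```python
import gzip

def compress_body(ndjson: str) -> tuple[bytes, dict[str, str]]:
    """Gzip an NDJSON payload and return it with matching HTTP headers."""
    body = gzip.compress(ndjson.encode("utf-8"))
    headers = {
        "Content-Type": "application/x-ndjson",
        "Content-Encoding": "gzip",  # tells the server the body is gzipped
    }
    return body, headers

# Hypothetical payload: 1,000 repeated action/document line pairs.
payload = '{"create":{}}\n{"message":"hello"}\n' * 1000
body, headers = compress_body(payload)
print(len(payload), "->", len(body), "bytes")
```

Repetitive JSON compresses well, which is why compression is a sensible default; --uncompressed trades that bandwidth saving for lower CPU use on the sender.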

CLI Reference

Usage: espipe [OPTIONS] <INPUT> <OUTPUT>

Arguments:
  <INPUT>   The input URI to read docs from
  <OUTPUT>  The output URI to send docs to

Options:
  -k, --insecure                     Ignore certificate validation
  -a, --apikey <APIKEY>              Apikey to authenticate via http header
  -u, --username <USERNAME>          Username for basic authentication
  -p, --password <PASSWORD>          Password for basic authentication
  -q, --quiet                        Quiet mode, don't print runtime summary
  -z, --uncompressed                 Disable request body gzip compression
      --action <ACTION>              Bulk action for Elasticsearch outputs [default: create] [possible values: create, index, update]
      --batch-size <BATCH_SIZE>      Documents per Elasticsearch bulk request [default: 5000]
      --max-requests <MAX_REQUESTS>  Maximum concurrent Elasticsearch bulk requests [default: 16]
  -h, --help                         Print help

Input And Output

Both positional arguments are parsed as URI-like strings.

Supported input forms

  • - (a single dash) Reads NDJSON from stdin.
  • path/to/file.ndjson Reads NDJSON from a local file.
  • path/to/file.ndjson.gz Reads gzip-compressed NDJSON from a local file.
  • path/to/file.json Reads line-delimited JSON from a local file.
  • path/to/file.csv Reads CSV from a local file.
  • path/to/file.csv.gz Reads gzip-compressed CSV from a local file.
  • file:///absolute/path/to/file.ndjson Reads NDJSON from a file:// URI.
  • file:///absolute/path/to/file.ndjson.gz Reads gzip-compressed NDJSON from a file:// URI.
  • file:///absolute/path/to/file.json Reads line-delimited JSON from a file:// URI.
  • file:///absolute/path/to/file.csv Reads CSV from a file:// URI.
  • file:///absolute/path/to/file.csv.gz Reads gzip-compressed CSV from a file:// URI.

HTTPS input URIs are supported for unauthenticated remote .csv, .ndjson, and .json sources. URLs without a supported file extension can still be accepted when the response Content-Type maps to CSV or to JSON that can be read as NDJSON.

Supported output forms

  • - (a single dash) Writes raw JSON lines to stdout.
  • path/to/output.ndjson Writes raw JSON lines to a local file, truncating any existing file.
  • path/to/output.ndjson.gz Writes gzip-compressed raw JSON lines to a local file, truncating any existing file.
  • file:///absolute/path/to/output.ndjson Writes raw JSON lines to a file:// URI target.
  • file:///absolute/path/to/output.ndjson.gz Writes gzip-compressed raw JSON lines to a file:// URI target.
  • http://host:9200/index-name Sends documents to Elasticsearch using the _bulk API.
  • https://host:9200/index-name Sends documents to Elasticsearch over TLS.
  • known-host:index-name Resolves known-host from a local hosts file and sends to the named index.

When writing to Elasticsearch, the output path must include an index name.

Remote .json inputs are treated as NDJSON. If the downloaded JSON payload does not match the required NDJSON shape, espipe exits with: JSON payload does not look like required NDJSON input format.

Data Format Rules

NDJSON input

Each line must contain one complete, valid JSON document. For pass-through JSON inputs, espipe expects the first non-whitespace character on each line to be {.
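
The rule above can be approximated before ingestion with a short Python check. This is a sketch under assumptions, not espipe's parser: the helper name check_ndjson_line is hypothetical, and it rejects blank lines, which may differ from what espipe does.

```python
import json

def check_ndjson_line(line: str) -> bool:
    """Return True if the line holds exactly one JSON object."""
    stripped = line.lstrip()
    # First non-whitespace character must open an object.
    if not stripped.startswith("{"):
        return False
    try:
        return isinstance(json.loads(stripped), dict)
    except json.JSONDecodeError:
        return False

print(check_ndjson_line('{"message": "hello"}'))  # True
print(check_ndjson_line('[1, 2, 3]'))             # False
```

Running such a check over a file first is one way to catch malformed lines that might otherwise cut an ingestion short.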

CSV input

The first row must be a header row. Each subsequent row is converted into a JSON object using the CSV headers as field names.

CSV values are emitted as JSON strings. espipe does not infer numeric, boolean, or date types from CSV input.
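
As an illustration of the header-to-field mapping, the following Python sketch converts CSV text into JSON lines using only the standard library. It mirrors the documented behavior (every value stays a string) but is not espipe's code; csv_to_ndjson and the sample data are hypothetical.

```python
import csv
import io
import json

def csv_to_ndjson(text: str) -> list[str]:
    """Turn CSV text with a header row into one JSON object per data row."""
    reader = csv.DictReader(io.StringIO(text))
    # DictReader keeps every value as a string, so no types are inferred.
    return [json.dumps(row) for row in reader]

sample = "name,age,active\nada,36,true\n"
for line in csv_to_ndjson(sample):
    print(line)  # {"name": "ada", "age": "36", "active": "true"}
```

Note that age and active arrive as the strings "36" and "true"; if numeric or boolean fields are needed, an explicit index mapping or a conversion step outside espipe has to provide them.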

Bulk actions

espipe supports three Elasticsearch bulk actions:

  • create Sends each document as a create operation.
  • index Sends each document as an index operation.
  • update Sends each document as an update operation with a { "doc": ... } payload.

For --action update, every input document must:

  • be a JSON object
  • include an _id field
  • have _id as a string

The _id field is removed from the document body and used as the update target.
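
The sketch below shows, in Python, what the two NDJSON lines per document look like in the Elasticsearch _bulk format for each action. It is an assumption-laden illustration rather than espipe's implementation: the function name bulk_lines is hypothetical, and it assumes the index name travels in the request URL instead of in the action metadata.

```python
import json

def bulk_lines(doc: dict, action: str = "create") -> list[str]:
    """Return the action line and the source line for one document."""
    if action in ("create", "index"):
        return [json.dumps({action: {}}), json.dumps(doc)]
    if action == "update":
        _id = doc.get("_id")
        if not isinstance(_id, str):
            raise ValueError("update requires a string _id field")
        # _id moves into the action metadata and is dropped from the body.
        body = {k: v for k, v in doc.items() if k != "_id"}
        return [json.dumps({"update": {"_id": _id}}),
                json.dumps({"doc": body})]
    raise ValueError(f"unsupported action: {action}")

for line in bulk_lines({"_id": "1", "message": "hello"}, "update"):
    print(line)
# {"update": {"_id": "1"}}
# {"doc": {"message": "hello"}}
```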

Bulk tuning

For Elasticsearch targets:

  • --batch-size Sets the number of documents included in each _bulk request.
  • --max-requests Sets the maximum number of concurrent in-flight bulk requests.

The internal channel capacity always matches --batch-size.
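
To illustrate what --batch-size controls, here is a minimal Python sketch that splits a document stream into fixed-size batches, with the last batch holding the remainder. The generator name batches is hypothetical and the code says nothing about how espipe schedules its concurrent requests.

```python
from itertools import islice
from typing import Iterable, Iterator

def batches(docs: Iterable[dict], batch_size: int = 5000) -> Iterator[list[dict]]:
    """Yield lists of at most batch_size documents from any iterable."""
    it = iter(docs)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

docs = ({"n": i} for i in range(12_000))
print([len(b) for b in batches(docs)])  # [5000, 5000, 2000]
```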

Output Behavior

Elasticsearch output

For Elasticsearch targets, espipe:

  • batches documents into 5,000-document _bulk requests by default
  • keeps up to 16 bulk requests in flight by default
  • enables gzip request body compression by default
  • retries 429 Too Many Requests responses with exponential backoff
  • logs bulk-item error counts when Elasticsearch reports partial failures

400 Bad Request bulk responses are logged and counted as zero successful documents for that batch.
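
The retry behavior for 429 responses can be pictured with the Python sketch below. The attempt limit, base delay, and doubling factor are assumptions chosen for illustration; this README does not state the values espipe uses, and send_with_retry is a hypothetical name.

```python
import time
from typing import Callable

def send_with_retry(send: Callable[[], int],
                    max_attempts: int = 5,      # assumed limit
                    base_delay: float = 0.5,    # assumed first delay, seconds
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Call send() until it returns a status other than 429."""
    status = 429
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            break
        if attempt < max_attempts - 1:
            # Delay doubles after each rejected attempt: 0.5, 1, 2, ...
            sleep(base_delay * 2 ** attempt)
    return status

# Simulated server: rejects twice, then accepts.
responses = iter([429, 429, 200])
print(send_with_retry(lambda: next(responses), sleep=lambda s: None))  # 200
```

Resending a whole batch after a 429 is what makes delivery at-least-once rather than exactly-once.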

File and stdout output

For file and stdout targets, espipe writes one raw JSON document per line. It does not emit Elasticsearch bulk action metadata lines for these outputs.
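
A rough Python equivalent of the file output path is sketched here: one JSON document per line, gzip chosen by the .gz suffix, and any existing file truncated. The helper name write_ndjson is hypothetical and the code is not taken from espipe.

```python
import gzip
import json
from typing import Iterable

def write_ndjson(path: str, docs: Iterable[dict]) -> None:
    """Write one JSON document per line; gzip when the path ends in .gz."""
    opener = gzip.open if path.endswith(".gz") else open
    # Mode "wt" truncates an existing file before writing.
    with opener(path, "wt", encoding="utf-8") as out:
        for doc in docs:
            out.write(json.dumps(doc) + "\n")
```

Calling write_ndjson("output.ndjson.gz", docs) would produce a compressed file with no bulk action lines in it, only the documents themselves.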

Authentication And Known Hosts

The following authentication and TLS flags apply only to direct http:// and https:// Elasticsearch outputs:

  • --apikey
  • --username
  • --password
  • --insecure

Known hosts are loaded from:

  • $ESPIPE_HOSTS, if set
  • otherwise ~/.espipe/hosts.yml

Example:

localhost:
  auth: None
  url: http://localhost:9200/

secure-cluster:
  auth: Basic
  url: https://example.com:9200/
  username: elastic
  password: changeme
  insecure: false

ess-cluster:
  auth: ApiKey
  url: https://cluster.example.com/
  apikey: "base64-encoded-api-key"

Usage:

espipe docs.ndjson localhost:my-index
espipe docs.ndjson secure-cluster:my-index
espipe docs.ndjson ess-cluster:my-index

For known-host outputs, authentication and TLS settings come from the host entry. CLI auth flags are not applied on top of the known-host configuration.
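
The lookup order for the hosts file and the shape of a known-host target can be sketched in Python as follows. Both helper names are hypothetical, and two details are assumptions: an empty ESPIPE_HOSTS is treated as unset, and the host:index and host:/index spellings that both appear in this README are treated alike.

```python
import os
from pathlib import Path

def hosts_file() -> Path:
    """Prefer $ESPIPE_HOSTS, otherwise fall back to ~/.espipe/hosts.yml."""
    override = os.environ.get("ESPIPE_HOSTS")
    if override:
        return Path(override)
    return Path.home() / ".espipe" / "hosts.yml"

def split_known_host(target: str) -> tuple[str, str]:
    """Split a non-URL target such as 'localhost:my-index'."""
    host, _, index = target.partition(":")
    index = index.lstrip("/")  # assumed: a leading slash is optional
    if not host or not index:
        raise ValueError(f"expected known-host:index-name, got {target!r}")
    return host, index

print(split_known_host("secure-cluster:my-index"))  # ('secure-cluster', 'my-index')
```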

Examples

Ingest NDJSON into a local Elasticsearch index

espipe docs.ndjson http://localhost:9200/my-index

Ingest CSV into Elasticsearch

espipe users.csv http://localhost:9200/users

Read NDJSON from stdin

cat docs.ndjson | espipe - http://localhost:9200/my-index

Write normalized output to a file

espipe users.csv output.ndjson

Read and write gzip-compressed files

espipe users.csv.gz output.ndjson.gz
espipe docs.ndjson.gz http://localhost:9200/my-index

Use Elasticsearch basic authentication

espipe docs.ndjson https://example.com:9200/my-index \
  --username elastic \
  --password changeme

Use an API key

espipe docs.ndjson https://example.com:9200/my-index \
  --apikey "base64-encoded-api-key"

Disable gzip request body compression

espipe docs.ndjson http://localhost:9200/my-index --uncompressed

Use a smaller bulk size with lower concurrency

espipe docs.ndjson http://localhost:9200/my-index \
  --batch-size 1000 \
  --max-requests 4

Update existing documents by _id

Input:

{"_id":"1","message":"hello"}
{"_id":"2","message":"world"}

Command:

espipe docs.ndjson http://localhost:9200/my-index --action update

Error Handling And Exit Behavior

espipe is optimized for straightforward ingestion, not for rich machine-readable error reporting.

Current behavior:

  • invalid CLI argument combinations are rejected by clap
  • invalid authentication combinations fail at startup
  • invalid input or output targets fail at startup
  • Elasticsearch transport failures during send or close terminate the process
  • 429 bulk responses are retried automatically
  • bulk item failures are logged, but successful items in the same batch are still counted

One current limitation is that input parsing errors and end-of-input end the read loop in the same way. In practice, malformed NDJSON or CSV input may stop ingestion early without a dedicated non-zero parsing exit code.

Performance Notes

espipe is intentionally aggressive enough to saturate a local or small remote cluster.

Current bulk worker settings:

  • batch size: 5,000 documents
  • channel capacity: 5,000 documents
  • max in-flight bulk requests: 16
  • Tokio worker threads: 3

This is fast for local ingestion and test data loading, but it can overwhelm smaller clusters or shared environments.
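
A quick back-of-the-envelope calculation in Python shows how the settings above bound the number of documents that can be in flight at once. The function name is hypothetical, and the figure ignores documents waiting in the channel.

```python
def in_flight_documents(batch_size: int = 5000, max_requests: int = 16) -> int:
    """Upper bound on documents inside concurrent bulk requests."""
    return batch_size * max_requests

print(in_flight_documents())         # 80000 with the defaults
print(in_flight_documents(1000, 4))  # 4000 with --batch-size 1000 --max-requests 4
```

Lowering both flags, as in the second call, cuts the peak load on the target cluster by a factor of twenty, which is a reasonable starting point for shared environments.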

Troubleshooting

Set LOG_LEVEL to inspect request and ingestion behavior:

LOG_LEVEL=debug espipe docs.ndjson http://localhost:9200/my-index

Useful checks:

  • verify the target index name is present in the output URI
  • verify CSV files have a header row
  • verify NDJSON files contain one complete JSON object per line
  • verify --action update inputs include string _id values
  • verify known-host entries live in ~/.espipe/hosts.yml or $ESPIPE_HOSTS

Scope

espipe is a binary crate. It does not publish or support a public Rust library interface.