Elasticsearch document pipe (espipe)
The goal of espipe is to be a minimalist command-line utility to bulk ingest documents from a file or I/O stream into an Elasticsearch cluster. No enrichment, no transformation, no complication.
Have you ever had thousands of sample documents in an .ndjson or .csv file, and you just want to load them all into a local insecure Elasticsearch cluster?
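A sketch, assuming a local cluster on port 9200; `samples.ndjson` and `my-index` are placeholders:

```sh
espipe samples.ndjson http://localhost:9200/my-index
```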
And you're done.
Add a my-cluster host entry with API keys to ~/.espipe/hosts.yml and you can reference the host by name.
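For example, with `my-index` as a placeholder index name:

```sh
espipe samples.ndjson my-cluster:my-index
```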
Description
Being multi-threaded and unthrottled, espipe can fully saturate the CPU of the sending host and can overwhelm the target cluster, so use it with caution. It gracefully handles backpressure and HTTP 429 responses to ensure at-least-once delivery.
Documents are batched into `_bulk` requests of 5,000 documents and sent with the `create` action by default. Use `--action` to switch to `index` or `update` based on your needs. For `--action update`, each source document must include an `_id` field for the update target. Use `--batch-size` and `--max-requests` to tune bulk request size and concurrency at runtime.
Installation
Install the published crate with Cargo:
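Assuming the crate is published under the name `espipe`:

```sh
cargo install espipe
```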
To build from source instead:
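From a checkout of the repository (the resulting binary lands in `target/release/espipe`, assuming the standard Cargo layout):

```sh
cargo build --release
```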
What It Does
espipe reads records from:
- `.ndjson` files
- `.json` files
- `.csv` files
- stdin, as NDJSON
It writes records to:
- Elasticsearch `_bulk`
- a local file
- stdout
When writing to Elasticsearch, espipe batches documents into groups of 5,000 records by default, enables request body gzip compression by default, and sends multiple bulk requests concurrently. Use `--batch-size` to change the number of documents per bulk request and `--max-requests` to change the number of in-flight bulk requests.
CLI Reference
- `<INPUT>`: the source to read records from
- `<OUTPUT>`: the target to write records to
Input And Output
Both positional arguments are parsed as URI-like strings.
Supported input forms
- `-` reads NDJSON from stdin.
- `path/to/file.ndjson` reads NDJSON from a local file.
- `path/to/file.json` reads line-delimited JSON from a local file.
- `path/to/file.csv` reads CSV from a local file.
- `file:///absolute/path/to/file.ndjson` reads NDJSON from a `file://` URI.
- `file:///absolute/path/to/file.json` reads line-delimited JSON from a `file://` URI.
- `file:///absolute/path/to/file.csv` reads CSV from a `file://` URI.
HTTPS input URIs are supported for unauthenticated remote `.csv`, `.ndjson`, and `.json` sources. URLs without a supported file extension can still be accepted when the response `Content-Type` maps to CSV or NDJSON-oriented JSON input.
Supported output forms
- `-` writes raw JSON lines to stdout.
- `path/to/output.ndjson` writes raw JSON lines to a local file, truncating any existing file.
- `file:///absolute/path/to/output.ndjson` writes raw JSON lines to a `file://` URI target.
- `http://host:9200/index-name` sends documents to Elasticsearch using the `_bulk` API.
- `https://host:9200/index-name` sends documents to Elasticsearch over TLS.
- `known-host:index-name` resolves `known-host` from a local hosts file and sends to the named index.
When writing to Elasticsearch, the output path must include an index name.
Remote `.json` inputs are treated as NDJSON. If the downloaded JSON payload does not match the required NDJSON shape, espipe exits with: `JSON payload does not look like required NDJSON input format`.
Data Format Rules
NDJSON input
Each line must be valid line-delimited JSON. For pass-through JSON inputs, espipe expects the first non-whitespace character on each line to be `{`.
CSV input
The first row must be a header row. Each subsequent row is converted into a JSON object using the CSV headers as field names.
CSV values are emitted as JSON strings. espipe does not infer numeric, boolean, or date types from CSV input.
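A sketch of the conversion, writing to stdout with the `-` output; the exact key order in the emitted JSON is illustrative:

```sh
# create a small CSV file with a header row
cat > users.csv <<'EOF'
name,age,active
alice,30,true
bob,25,false
EOF

# convert CSV rows to JSON objects; note that all values stay strings
espipe users.csv -
# {"name":"alice","age":"30","active":"true"}
# {"name":"bob","age":"25","active":"false"}
```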
Bulk actions
espipe supports three Elasticsearch bulk actions:
- `create` sends each document as a `create` operation.
- `index` sends each document as an `index` operation.
- `update` sends each document as an `update` operation with a `{ "doc": ... }` payload.
For `--action update`, every input document must:
- be a JSON object
- include an `_id` field
- have `_id` as a string
The `_id` field is removed from the document body and used as the update target.
Bulk tuning
For Elasticsearch targets:
- `--batch-size` sets the number of documents included in each `_bulk` request.
- `--max-requests` sets the maximum number of concurrent in-flight bulk requests.
The internal channel capacity always matches `--batch-size`.
Output Behavior
Elasticsearch output
For Elasticsearch targets, espipe:
- batches documents into 5,000-document `_bulk` requests by default
- keeps up to 16 bulk requests in flight by default
- enables gzip request body compression by default
- retries `429 Too Many Requests` responses with exponential backoff
- logs bulk-item error counts when Elasticsearch reports partial failures
`400 Bad Request` bulk responses are logged and counted as zero successful documents for that batch.
File and stdout output
For file and stdout targets, espipe writes one raw JSON document per line. It does not emit Elasticsearch bulk action metadata lines for these outputs.
Authentication And Known Hosts
Authentication flags apply only to direct `http://` and `https://` Elasticsearch outputs:
- `--apikey`
- `--username`
- `--password`
- `--insecure`
Known hosts are loaded from:
- `$ESPIPE_HOSTS`, if set
- otherwise `~/.espipe/hosts.yml`
Example:
```yaml
localhost:
  auth: None
  url: http://localhost:9200/

secure-cluster:
  auth: Basic
  url: https://example.com:9200/
  username: elastic
  password: changeme
  insecure: false

ess-cluster:
  auth: ApiKey
  url: https://cluster.example.com/
  apikey: "base64-encoded-api-key"
```
Usage:
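A sketch referencing the `ess-cluster` entry above; `my-index` is a placeholder:

```sh
espipe samples.ndjson ess-cluster:my-index
```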
For known-host outputs, authentication and TLS settings come from the host entry. CLI auth flags are not applied on top of the known-host configuration.
Examples
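The commands below are sketches assembled from the documented flags; file names, hosts, and index names (for example `data.ndjson`, `my-index`) are placeholders.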
Ingest NDJSON into a local Elasticsearch index
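```sh
espipe data.ndjson http://localhost:9200/my-index
```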
Ingest CSV into Elasticsearch
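```sh
espipe users.csv http://localhost:9200/users
```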
Read NDJSON from stdin
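```sh
cat data.ndjson | espipe - http://localhost:9200/my-index
```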
Write normalized output to a file
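```sh
espipe users.csv converted.ndjson
```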
Use Elasticsearch basic authentication
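```sh
espipe data.ndjson https://example.com:9200/my-index --username elastic --password changeme
```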
Use an API key
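```sh
espipe data.ndjson https://cluster.example.com/my-index --apikey "base64-encoded-api-key"
```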
Disable gzip request body compression
Use a smaller bulk size with lower concurrency
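```sh
espipe data.ndjson http://localhost:9200/my-index --batch-size 500 --max-requests 4
```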
Update existing documents by _id
Input:
{"_id":"1","message":"hello"}
{"_id":"2","message":"world"}
Command:
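```sh
espipe updates.ndjson http://localhost:9200/my-index --action update
```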
Error Handling And Exit Behavior
espipe is optimized for straightforward ingestion, not for rich machine-readable error reporting.
Current behavior:
- invalid CLI argument combinations are rejected by `clap`
- invalid authentication combinations fail at startup
- invalid input or output targets fail at startup
- Elasticsearch transport failures during send or close terminate the process
- `429` bulk responses are retried automatically
- bulk item failures are logged, but successful items in the same batch are still counted
One current limitation is that input parsing errors and end-of-input are handled through the same loop boundary. In practice, malformed NDJSON or CSV input may stop ingestion early without a dedicated non-zero parsing exit code.
Performance Notes
espipe is intentionally aggressive enough to saturate a local or small remote cluster.
Current bulk worker settings:
- batch size: 5,000 documents
- channel capacity: 5,000 documents
- max in-flight bulk requests: 16
- Tokio worker threads: 3
This is fast for local ingestion and test data loading, but it can overwhelm smaller clusters or shared environments.
Troubleshooting
Set the `LOG_LEVEL` environment variable, for example `LOG_LEVEL=debug`, to inspect request and ingestion behavior.
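A sketch with placeholder input and index:

```sh
LOG_LEVEL=debug espipe data.ndjson http://localhost:9200/my-index
```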
Useful checks:
- verify the target index name is present in the output URI
- verify CSV files have a header row
- verify NDJSON files contain one complete JSON object per line
- verify `--action update` inputs include string `_id` values
- verify known-host entries live in `~/.espipe/hosts.yml` or `$ESPIPE_HOSTS`
Scope
espipe is a binary crate. It does not publish or support a public Rust library interface.