faucet-source-elasticsearch
An Elasticsearch source that reads documents using the search/scroll API with query DSL support, automatic scroll context management, and pluggable authentication.
Part of the faucet-stream ecosystem.
Installation
[dependencies]
faucet-source-elasticsearch = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["source-elasticsearch"] }
Quick Start
use ;
use Source;
async
How It Works
The source uses the Elasticsearch scroll API for efficient retrieval of large result sets:
- Sends an initial POST /{index}/_search?scroll={timeout}&size={batch_size} with the query
- Extracts _source from each hit in hits.hits[*]._source
- Continues scrolling with POST /_search/scroll using the _scroll_id
- Stops when a scroll page returns zero hits, or max_pages is reached
- Clears the scroll context (best-effort) to free server resources
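
The loop described above can be sketched without any HTTP at all. This is a minimal illustration, not the crate's implementation: ScrollResponse, drain_scroll and fake_backend are hypothetical names, and the fetch closure stands in for the real search and scroll requests.

```rust
/// One scroll response: the `_source` documents it carried plus the scroll id
/// to send with the next request. Hypothetical type, for illustration only.
struct ScrollResponse {
    scroll_id: String,
    docs: Vec<String>,
}

/// Drains a scroll. `fetch(None)` stands in for the initial search and
/// `fetch(Some(id))` for `POST /_search/scroll`. Returns the collected pages
/// and the last scroll id seen, which the caller would then clear.
fn drain_scroll<F>(mut fetch: F, max_pages: Option<usize>) -> (Vec<Vec<String>>, Option<String>)
where
    F: FnMut(Option<&str>) -> ScrollResponse,
{
    let mut pages = Vec::new();
    let mut scroll_id: Option<String> = None;
    loop {
        let resp = fetch(scroll_id.as_deref());
        scroll_id = Some(resp.scroll_id);
        if resp.docs.is_empty() {
            break; // a page with zero hits ends the scroll
        }
        pages.push(resp.docs);
        if max_pages.is_some_and(|cap| pages.len() >= cap) {
            break; // the cap is checked after the page has been kept
        }
    }
    (pages, scroll_id)
}

/// A fake cluster holding `total` documents, served `size` at a time.
fn fake_backend(total: usize, size: usize) -> impl FnMut(Option<&str>) -> ScrollResponse {
    let mut next = 0;
    move |_scroll_id: Option<&str>| {
        let end = (next + size).min(total);
        let docs = (next..end).map(|i| format!("doc-{}", i)).collect();
        next = end;
        ScrollResponse { scroll_id: "scroll-1".to_string(), docs }
    }
}
```

With five documents and a page size of two, drain_scroll(fake_backend(5, 2), None) collects three pages and stops on the fourth, empty response; passing Some(1) as max_pages stops after the first page.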
Configuration
ElasticsearchSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| base_url | String | (required) | Base URL of the Elasticsearch cluster (e.g. "http://localhost:9200"). A trailing slash is trimmed |
| index | String | (required) | Index name to search |
| query | Value | {"match_all": {}} | Elasticsearch query DSL |
| scroll_timeout | String | "1m" | Scroll context timeout (e.g. "1m", "5m") |
| auth | ElasticsearchAuth | ElasticsearchAuth::None | Authentication method |
| max_pages | Option<usize> | None | Maximum number of scroll pages to fetch. None means no limit |
| batch_size | usize | DEFAULT_BATCH_SIZE (1000) | Documents per emitted StreamPage; also the scroll API size parameter. 0 is the "no batching" sentinel (see below) |
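
To make the interaction between base_url, scroll_timeout and batch_size concrete, here is a small sketch of how the initial request URL could be assembled. The function initial_search_url and the constant NO_BATCHING_SIZE are hypothetical names chosen for this example; the crate may build its URLs differently.

```rust
/// Size used when `batch_size` is 0, the "no batching" sentinel.
/// Hypothetical constant name; the value comes from this README.
const NO_BATCHING_SIZE: usize = 10_000;

/// Builds the URL of the initial search request.
/// Trailing slashes on `base_url` are trimmed before joining.
fn initial_search_url(
    base_url: &str,
    index: &str,
    scroll_timeout: &str,
    batch_size: usize,
) -> String {
    let base = base_url.trim_end_matches('/');
    if batch_size == 0 {
        // No scroll parameter: a single plain search request.
        format!("{}/{}/_search?size={}", base, index, NO_BATCHING_SIZE)
    } else {
        format!(
            "{}/{}/_search?scroll={}&size={}",
            base, index, scroll_timeout, batch_size
        )
    }
}
```

For example, initial_search_url("http://localhost:9200/", "my_index", "1m", 1000) yields http://localhost:9200/my_index/_search?scroll=1m&size=1000.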
Authentication (ElasticsearchAuth)
| Variant | Fields | Description |
|---|---|---|
| None | -- | No authentication |
| Basic { username, password } | String, String | HTTP Basic authentication. Password is masked in debug output |
| Bearer { token } | String | Bearer token in the Authorization header. Masked in debug output |
| ApiKey { key } | String | API key sent as ApiKey <key> in the Authorization header. Masked in debug output |
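
The following sketch shows one way the header values and the debug masking in the table could work. Auth is a hypothetical stand-in for ElasticsearchAuth, the "***" placeholder is an assumption, and the small base64 helper exists only to keep the example free of dependencies; none of this is taken from the crate's source.

```rust
use std::fmt;

/// Hypothetical mirror of the variants in the table above; not the crate's type.
enum Auth {
    None,
    Basic { username: String, password: String },
    Bearer { token: String },
    ApiKey { key: String },
}

impl Auth {
    /// Value for the `Authorization` header, if any.
    fn authorization_header(&self) -> Option<String> {
        match self {
            Auth::None => None,
            Auth::Basic { username, password } => {
                let raw = format!("{}:{}", username, password);
                Some(format!("Basic {}", base64(raw.as_bytes())))
            }
            Auth::Bearer { token } => Some(format!("Bearer {}", token)),
            Auth::ApiKey { key } => Some(format!("ApiKey {}", key)),
        }
    }
}

/// Hand-written Debug so secrets never reach logs ("***" is an assumed placeholder).
impl fmt::Debug for Auth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => f.write_str("None"),
            Auth::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
            Auth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
            Auth::ApiKey { .. } => f.debug_struct("ApiKey").field("key", &"***").finish(),
        }
    }
}

/// Minimal standard-alphabet base64 encoder with `=` padding.
fn base64(input: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in input.chunks(3) {
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        out.push(TABLE[(n >> 18) as usize & 63] as char);
        out.push(TABLE[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { TABLE[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { TABLE[n as usize & 63] as char } else { '=' });
    }
    out
}
```

Writing Debug by hand rather than deriving it is what keeps the password, token, or key out of log output while the username stays visible.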
Streaming and batching
The source overrides Source::stream_pages so the pipeline writes documents to the sink as each scroll page lands instead of buffering the full result. Client-side memory stays O(batch_size) regardless of the index's total document count.
- batch_size (default 1000) is passed straight to the scroll API as the size parameter on the initial POST /{index}/_search?scroll={timeout}&size={batch_size}. Each scroll response, initial or follow-up, becomes exactly one StreamPage. The trailing empty scroll page is consumed but not emitted.
- batch_size = 0 is the "no batching" sentinel: the source skips scroll entirely and issues a single POST /{index}/_search?size=10000, then emits one StreamPage. Useful for small lookup indices, or for sinks like SQL COPY / BigQuery load jobs that prefer one large request to many small ones. The 10000 cap matches Elasticsearch's default index.max_result_window; indices with a larger max_result_window still receive only 10000 hits, so switch back to scroll if you need more.
- batch_size = 0 is also honoured by the buffered fetch_all / fetch_with_context path: it maps to the same size=10000 initial search (rather than the literal size=0, which would return zero hits).
- max_pages, when set, caps the total number of scroll responses emitted. The cap applies after the page is yielded.
- The Elasticsearch search source has no incremental-replication mode today, so every emitted page carries bookmark: None.
- Scroll-context cleanup is unconditional. On every exit path (clean drain, max_pages truncation, mid-stream HTTP error, or the consumer dropping the stream) the open _scroll_id is sent to DELETE _search/scroll so the cluster does not leak server-side state.
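
One common way to get cleanup on every exit path in Rust is a guard whose Drop impl performs the cleanup. The sketch below is synchronous and records requests in a log instead of sending them; ScrollGuard, RequestLog and read_pages are hypothetical names, and the crate's async implementation may achieve the same guarantee by other means.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Records the requests a real client would send; stands in for HTTP.
type RequestLog = Rc<RefCell<Vec<String>>>;

/// Owns an open scroll id and clears it when dropped.
struct ScrollGuard {
    scroll_id: String,
    log: RequestLog,
}

impl Drop for ScrollGuard {
    fn drop(&mut self) {
        // Runs on normal return, early return, and unwinding alike.
        self.log
            .borrow_mut()
            .push(format!("DELETE _search/scroll {}", self.scroll_id));
    }
}

/// Reads `pages` pages, failing on page `fail_at` if one is given.
fn read_pages(log: &RequestLog, pages: usize, fail_at: Option<usize>) -> Result<usize, String> {
    let _guard = ScrollGuard {
        scroll_id: "scroll-1".to_string(),
        log: Rc::clone(log),
    };
    for page in 0..pages {
        if fail_at == Some(page) {
            // Early return: the guard is still dropped, so cleanup still happens.
            return Err(format!("HTTP error on page {}", page));
        }
        log.borrow_mut().push(format!("page {}", page));
    }
    Ok(pages)
}
```

Whether read_pages returns Ok or Err, the last entry in the log is the DELETE request, which mirrors the clean-drain and mid-stream-error paths listed above.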
Config Loading
use ;
use ElasticsearchSourceConfig;
let config: ElasticsearchSourceConfig = load_json?;
let config: ElasticsearchSourceConfig = load_env_file?;
Example JSON config
Example .env file
ES_SOURCE_BASE_URL=http://localhost:9200
ES_SOURCE_INDEX=my_index
ES_SOURCE_SCROLL_TIMEOUT=1m
ES_SOURCE_BATCH_SIZE=1000
ES_SOURCE_MAX_PAGES=50
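As an illustration of how the ES_SOURCE_ prefix lines up with the configuration fields, here is a dependency-free sketch that parses text in the format above. EnvConfig and parse_env are hypothetical and cover only the five variables shown; the crate's own loader is the supported way to read this file and may behave differently, for example around quoting or comments.

```rust
use std::collections::HashMap;

/// Hypothetical subset of the source config, limited to the variables shown above.
#[derive(Debug, PartialEq)]
struct EnvConfig {
    base_url: String,
    index: String,
    scroll_timeout: String,
    batch_size: usize,
    max_pages: Option<usize>,
}

/// Parses `KEY=value` lines, keeping only keys that start with `ES_SOURCE_`.
fn parse_env(text: &str) -> Result<EnvConfig, String> {
    let vars: HashMap<&str, &str> = text
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| line.split_once('='))
        .filter_map(|(k, v)| k.trim().strip_prefix("ES_SOURCE_").map(|k| (k, v.trim())))
        .collect();

    // Required string field: error if the variable is absent.
    let required = |key: &str| {
        vars.get(key)
            .map(|v| v.to_string())
            .ok_or_else(|| format!("missing ES_SOURCE_{}", key))
    };
    // Optional numeric field: absent is fine, unparsable is an error.
    let number = |key: &str| -> Result<Option<usize>, String> {
        vars.get(key)
            .map(|v| v.parse::<usize>().map_err(|e| format!("ES_SOURCE_{}: {}", key, e)))
            .transpose()
    };

    Ok(EnvConfig {
        base_url: required("BASE_URL")?,
        index: required("INDEX")?,
        // Defaults follow the configuration table: "1m" and 1000.
        scroll_timeout: vars.get("SCROLL_TIMEOUT").unwrap_or(&"1m").to_string(),
        batch_size: number("BATCH_SIZE")?.unwrap_or(1000),
        max_pages: number("MAX_PAGES")?,
    })
}
```

Feeding the example file above into parse_env gives batch_size 1000 and max_pages Some(50); omitting ES_SOURCE_BASE_URL produces an error rather than a silent default, matching its (required) status in the table.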
Config Schema Introspection
use Source;
let source = new;
let schema = source.config_schema;
println!;
Examples
Match-all query with defaults
use ;
use Source;
let config = new;
let source = new;
let products = source.fetch_all.await?;
Filtered query with bearer auth
use ;
use Source;
use json;
let config = new
.query
.auth
.with_batch_size
.scroll_timeout
.max_pages;
let source = new;
let orders = source.fetch_all.await?;
println!;
Using API key authentication
use ;
use Source;
use json;
let config = new
.query
.auth
.with_batch_size;
let source = new;
let metrics = source.fetch_all.await?;
Shared types
ElasticsearchAuth lives in faucet-common-elasticsearch and is shared with faucet-sink-elasticsearch. The source re-exports it for convenience, so faucet_source_elasticsearch::ElasticsearchAuth continues to work unchanged.
License
Licensed under MIT or Apache-2.0.