faucet-source-elasticsearch 1.0.1

Elasticsearch source connector for the faucet-stream ecosystem

An Elasticsearch source that reads documents using the search/scroll API with query DSL support, automatic scroll context management, and pluggable authentication.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-elasticsearch = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-elasticsearch"] }

Quick Start

use faucet_source_elasticsearch::{ElasticsearchSource, ElasticsearchSourceConfig};
use faucet_core::Source;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ElasticsearchSourceConfig::new(
        "http://localhost:9200",
        "my_index",
    );

    let source = ElasticsearchSource::new(config);
    let records = source.fetch_all().await?;

    println!("Read {} documents", records.len());
    Ok(())
}

How It Works

When batch_size is greater than 0, the source uses the Elasticsearch scroll API to retrieve large result sets efficiently:

  1. Sends an initial POST /{index}/_search?scroll={timeout}&size={batch_size} with the query
  2. Extracts the document body from each hit (hits.hits[*]._source)
  3. Continues scrolling with POST /_search/scroll using the _scroll_id
  4. Stops when a scroll page returns zero hits or max_pages is reached
  5. Clears the scroll context with DELETE /_search/scroll (best-effort) to free server resources
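
The five steps can be sketched as a small synchronous loop. This is a minimal illustration under stated assumptions, not the crate's implementation: `ScrollTransport`, `ScrollResponse`, `FakeCluster` and `demo_cluster` are hypothetical names, hits are plain strings rather than JSON `_source` values, and the HTTP calls are replaced by an in-memory fake so the control flow (stop on an empty page or at max_pages, then clear the latest scroll id) can run on its own.

```rust
/// One scroll response. A hit is a plain String here instead of a JSON `_source`.
struct ScrollResponse {
    scroll_id: String,
    hits: Vec<String>,
}

/// Hypothetical transport: one method per HTTP call in the steps above.
trait ScrollTransport {
    /// Step 1: POST /{index}/_search?scroll={timeout}&size={batch_size}
    fn initial_search(&mut self) -> ScrollResponse;
    /// Step 3: POST /_search/scroll with a scroll id
    fn next_page(&mut self, scroll_id: &str) -> ScrollResponse;
    /// Step 5: DELETE /_search/scroll (best-effort)
    fn clear(&mut self, scroll_id: &str);
}

/// Steps 2-5: keep each non-empty page, stop on an empty page or at
/// `max_pages`, then clear the most recently received scroll id.
fn fetch_all<T: ScrollTransport>(t: &mut T, max_pages: Option<usize>) -> Vec<Vec<String>> {
    let mut pages: Vec<Vec<String>> = Vec::new();
    let mut resp = t.initial_search();
    let mut scroll_id = resp.scroll_id.clone();
    while !resp.hits.is_empty() {
        pages.push(std::mem::take(&mut resp.hits));
        if max_pages.map_or(false, |m| pages.len() >= m) {
            break;
        }
        resp = t.next_page(&scroll_id);
        // Elasticsearch may return a new scroll id; always keep the latest one.
        scroll_id = resp.scroll_id.clone();
    }
    t.clear(&scroll_id);
    pages
}

/// In-memory fake cluster serving pre-baked pages, with ids "s0", "s1", ...
struct FakeCluster {
    pages: Vec<Vec<String>>,
    cursor: usize,
    cleared: Vec<String>,
}

impl FakeCluster {
    fn respond(&mut self) -> ScrollResponse {
        // Past the last page the fake returns an empty page, like a drained scroll.
        let hits = self.pages.get(self.cursor).cloned().unwrap_or_default();
        let resp = ScrollResponse { scroll_id: format!("s{}", self.cursor), hits };
        self.cursor += 1;
        resp
    }
}

impl ScrollTransport for FakeCluster {
    fn initial_search(&mut self) -> ScrollResponse {
        self.respond()
    }
    fn next_page(&mut self, _scroll_id: &str) -> ScrollResponse {
        self.respond()
    }
    fn clear(&mut self, scroll_id: &str) {
        self.cleared.push(scroll_id.to_string());
    }
}

fn docs(ids: &[&str]) -> Vec<String> {
    ids.iter().map(|s| s.to_string()).collect()
}

/// Three pages holding five documents in total.
fn demo_cluster() -> FakeCluster {
    FakeCluster {
        pages: vec![docs(&["a", "b"]), docs(&["c", "d"]), docs(&["e"])],
        cursor: 0,
        cleared: Vec::new(),
    }
}
```

Tracking the latest id matters because Elasticsearch may hand back a different _scroll_id on each scroll response, and only the most recent one should be used.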

Configuration

ElasticsearchSourceConfig

| Field | Type | Default | Description |
|---|---|---|---|
| base_url | String | (required) | Base URL of the Elasticsearch cluster (e.g. "http://localhost:9200"). A trailing slash is trimmed |
| index | String | (required) | Index name to search |
| query | Value | {"match_all": {}} | Elasticsearch query DSL |
| scroll_timeout | String | "1m" | Scroll context timeout (e.g. "1m", "5m") |
| auth | ElasticsearchAuth | ElasticsearchAuth::None | Authentication method |
| max_pages | Option<usize> | None | Maximum number of scroll pages to fetch. None means no limit |
| batch_size | usize | DEFAULT_BATCH_SIZE (1000) | Docs per emitted StreamPage; also the scroll API size parameter. 0 is the "no batching" sentinel (see Streaming and batching below) |
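
To make the interplay of base_url, index, scroll_timeout and batch_size concrete, here is a hedged sketch of how the initial request URL could be assembled. `initial_search_url` and `NO_BATCHING_SIZE` are hypothetical names; the sketch only mirrors the documented behaviour (trailing-slash trimming, the scroll query string, and the single size=10000 search when batch_size is 0), not the crate's actual code.

```rust
/// Hypothetical: page size used when batch_size == 0, matching
/// Elasticsearch's default index.max_result_window.
const NO_BATCHING_SIZE: usize = 10_000;

/// Hypothetical helper that builds the initial search URL from config fields.
fn initial_search_url(base_url: &str, index: &str, scroll_timeout: &str, batch_size: usize) -> String {
    // Trim trailing slashes so "http://host:9200/" and "http://host:9200" agree.
    let base = base_url.trim_end_matches('/');
    if batch_size == 0 {
        // "No batching" sentinel: one plain search, no scroll context opened.
        format!("{}/{}/_search?size={}", base, index, NO_BATCHING_SIZE)
    } else {
        // Scroll mode: batch_size doubles as the scroll page size.
        format!("{}/{}/_search?scroll={}&size={}", base, index, scroll_timeout, batch_size)
    }
}
```

Note that scroll_timeout is irrelevant in the batch_size = 0 branch, since no scroll context is opened.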

Authentication (ElasticsearchAuth)

| Variant | Fields | Description |
|---|---|---|
| None | (none) | No authentication |
| Basic { username, password } | String, String | HTTP Basic authentication. The password is masked in Debug output |
| Bearer { token } | String | Bearer token sent in the Authorization header. Masked in Debug output |
| ApiKey { key } | String | API key sent as `ApiKey <key>` in the Authorization header. Masked in Debug output |
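
A minimal sketch of how each variant could map to an Authorization header value, with secrets masked in Debug output. `Auth` is a hypothetical local mirror of ElasticsearchAuth, not the crate's type, and the hand-rolled `base64` helper exists only to keep the example dependency-free; real code would normally use a base64 crate.

```rust
use std::fmt;

/// Hypothetical mirror of ElasticsearchAuth.
enum Auth {
    None,
    Basic { username: String, password: String },
    Bearer { token: String },
    ApiKey { key: String },
}

impl Auth {
    /// Value for the `Authorization` header, or `None` when unauthenticated.
    fn header_value(&self) -> Option<String> {
        match self {
            Auth::None => None,
            Auth::Basic { username, password } => {
                Some(format!("Basic {}", base64(format!("{username}:{password}").as_bytes())))
            }
            Auth::Bearer { token } => Some(format!("Bearer {token}")),
            Auth::ApiKey { key } => Some(format!("ApiKey {key}")),
        }
    }
}

/// Debug never prints the secret parts of a variant.
impl fmt::Debug for Auth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => write!(f, "None"),
            Auth::Basic { username, .. } => {
                write!(f, "Basic {{ username: {username:?}, password: \"***\" }}")
            }
            Auth::Bearer { .. } => write!(f, "Bearer {{ token: \"***\" }}"),
            Auth::ApiKey { .. } => write!(f, "ApiKey {{ key: \"***\" }}"),
        }
    }
}

/// Minimal standard base64 (RFC 4648 alphabet, with '=' padding).
fn base64(input: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling the tail.
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        // A chunk of k bytes yields k + 1 significant sextets; the rest are padding.
        for i in 0..4 {
            if i <= chunk.len() {
                out.push(TABLE[((n >> (18 - 6 * i)) & 63) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}
```

With the RFC 7617 sample credentials ("Aladdin", "open sesame"), the Basic variant produces `Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==`, while its Debug output shows only the username.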

Streaming and batching

The source overrides Source::stream_pages so the pipeline writes documents to the sink as each scroll page lands instead of buffering the full result. Client-side memory stays O(batch_size) regardless of the index's total document count.

  • batch_size (default 1000) is passed straight to the scroll API as the size parameter on the initial POST /{index}/_search?scroll={timeout}&size={batch_size}. Each scroll response — initial and follow-up — becomes exactly one StreamPage. The trailing empty scroll page is consumed but not emitted.
  • batch_size = 0 is the "no batching" sentinel: the source skips scroll entirely, issues a single POST /{index}/_search?size=10000, and emits one StreamPage. This is useful for small lookup indices, or for sinks like SQL COPY / BigQuery load jobs that prefer one large request to many small ones. The 10000 cap matches Elasticsearch's default index.max_result_window; indices with a larger max_result_window still receive only 10000 hits, so switch back to scroll (batch_size > 0) if you need more.
  • batch_size = 0 is also honoured by the buffered fetch_all / fetch_with_context path: it maps to the same size=10000 initial search (rather than the literal size=0, which would return zero hits).
  • max_pages, when set, caps the total number of scroll responses emitted. The cap is checked after each page is yielded, so the page that reaches the limit is still delivered before the stream stops.
  • The Elasticsearch search source has no incremental-replication mode today, so every emitted page carries bookmark: None.
  • Scroll-context cleanup is unconditional. On every exit path (clean drain, max_pages truncation, mid-stream HTTP error, or the consumer dropping the stream), the open _scroll_id is sent to DELETE /_search/scroll so the cluster does not leak server-side state.
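
The cleanup guarantee in the last bullet maps naturally onto Rust's Drop. The sketch below is a synchronous Iterator stand-in for the crate's async page stream, assuming cleanup is tied to the stream's lifetime; `PageStream`, `open_stream` and the `cleared` log are hypothetical, and the closure plays the role of the scroll HTTP call. It also shows the page-at-a-time behaviour: the stream itself buffers nothing, and each page is owned by the consumer only while it processes it.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical synchronous stand-in for the async page stream.
struct PageStream<F: FnMut(usize) -> Vec<u32>> {
    fetch_page: F,                     // stands in for the scroll HTTP request
    emitted: usize,
    max_pages: Option<usize>,
    done: bool,
    scroll_id: Option<String>,         // open server-side context, if any
    cleared: Rc<RefCell<Vec<String>>>, // log of DELETE /_search/scroll calls
}

impl<F: FnMut(usize) -> Vec<u32>> Iterator for PageStream<F> {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Vec<u32>> {
        if self.done {
            return None;
        }
        let hits = (self.fetch_page)(self.emitted);
        if hits.is_empty() {
            // Trailing empty scroll page: consumed but not emitted.
            self.done = true;
            return None;
        }
        self.emitted += 1;
        // The cap is checked after counting, so the capped page is still yielded.
        if self.max_pages.map_or(false, |m| self.emitted >= m) {
            self.done = true;
        }
        Some(hits)
    }
}

impl<F: FnMut(usize) -> Vec<u32>> Drop for PageStream<F> {
    fn drop(&mut self) {
        // Runs on every exit path: full drain, max_pages, or an early drop.
        if let Some(id) = self.scroll_id.take() {
            self.cleared.borrow_mut().push(id);
        }
    }
}

/// Builds a stream over `total` pages of two docs each.
fn open_stream(
    total: usize,
    max_pages: Option<usize>,
    cleared: Rc<RefCell<Vec<String>>>,
) -> PageStream<impl FnMut(usize) -> Vec<u32>> {
    PageStream {
        fetch_page: move |n: usize| {
            if n < total { vec![n as u32 * 2, n as u32 * 2 + 1] } else { Vec::new() }
        },
        emitted: 0,
        max_pages,
        done: false,
        scroll_id: Some("ctx-1".to_string()),
        cleared,
    }
}
```

Putting cleanup in Drop rather than at the end of the loop is what covers the "consumer drops the stream" case: no code path has to remember to call it.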

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_source_elasticsearch::ElasticsearchSourceConfig;

// Load from a JSON file...
let config: ElasticsearchSourceConfig = load_json("config.json")?;
// ...or from a .env file, reading variables prefixed with ES_SOURCE_
let config: ElasticsearchSourceConfig = load_env_file(".env", "ES_SOURCE")?;

Example JSON config

{
  "base_url": "https://elasticsearch.example.com:9200",
  "index": "application-logs-2025.03",
  "query": {
    "bool": {
      "must": [
        { "match": { "level": "error" } },
        { "range": { "@timestamp": { "gte": "2025-03-01", "lt": "2025-04-01" } } }
      ]
    }
  },
  "scroll_timeout": "2m",
  "batch_size": 5000,
  "auth": {
    "type": "basic",
    "config": {
      "username": "elastic",
      "password": "changeme"
    }
  },
  "max_pages": 100
}

Example .env file

ES_SOURCE_BASE_URL=http://localhost:9200
ES_SOURCE_INDEX=my_index
ES_SOURCE_SCROLL_TIMEOUT=1m
ES_SOURCE_BATCH_SIZE=1000
ES_SOURCE_MAX_PAGES=50
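
The .env keys appear to be the prefix followed by the upper-cased field name (ES_SOURCE_BASE_URL for base_url). A hedged sketch of that mapping, assuming load_env_file strips the `ES_SOURCE_` prefix and lower-cases the remainder; `env_fields` is a hypothetical helper, and how the real loader handles nested fields such as auth is not covered here.

```rust
use std::collections::HashMap;

/// Hypothetical: collect `PREFIX_FIELD=value` lines into a field map,
/// lower-casing the remainder so ES_SOURCE_BASE_URL becomes `base_url`.
fn env_fields(contents: &str, prefix: &str) -> HashMap<String, String> {
    let wanted = format!("{prefix}_");
    contents
        .lines()
        .map(str::trim)
        // Skip blank lines and comments.
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        // Split on the first '=' only, so values may contain '='.
        .filter_map(|line| line.split_once('='))
        // Keep only keys carrying the prefix; everything else is ignored.
        .filter_map(|(key, value)| {
            key.trim()
                .strip_prefix(wanted.as_str())
                .map(|field| (field.to_ascii_lowercase(), value.trim().to_string()))
        })
        .collect()
}
```

Numeric fields such as batch_size and max_pages would still need parsing (e.g. `fields["batch_size"].parse::<usize>()`) before they can populate the config.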

Config Schema Introspection

use faucet_core::Source;
use faucet_source_elasticsearch::ElasticsearchSource;

let source = ElasticsearchSource::new(config);
let schema = source.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Examples

Match-all query with defaults

use faucet_source_elasticsearch::{ElasticsearchSource, ElasticsearchSourceConfig};
use faucet_core::Source;

let config = ElasticsearchSourceConfig::new("http://localhost:9200", "products");
let source = ElasticsearchSource::new(config);
let products = source.fetch_all().await?;

Filtered query with bearer auth

use faucet_source_elasticsearch::{
    ElasticsearchSource, ElasticsearchSourceConfig, ElasticsearchAuth,
};
use faucet_core::Source;
use serde_json::json;

let config = ElasticsearchSourceConfig::new(
    "https://es.production.example.com:9200",
    "orders-2025",
)
.query(json!({
    "bool": {
        "filter": [
            { "term": { "status": "completed" } },
            { "range": { "amount": { "gte": 100 } } }
        ]
    }
}))
.auth(ElasticsearchAuth::Bearer {
    token: "your-token".into(),
})
.with_batch_size(2000)
.scroll_timeout("5m")
.max_pages(50);

let source = ElasticsearchSource::new(config);
let orders = source.fetch_all().await?;
println!("Found {} matching orders", orders.len());

Using API key authentication

use faucet_source_elasticsearch::{
    ElasticsearchSource, ElasticsearchSourceConfig, ElasticsearchAuth,
};
use faucet_core::Source;
use serde_json::json;

let config = ElasticsearchSourceConfig::new(
    "https://my-cluster.es.cloud.example.com:9243",
    "metrics-*",
)
.query(json!({
    "range": {
        "@timestamp": {
            "gte": "now-1h",
            "lt": "now"
        }
    }
}))
.auth(ElasticsearchAuth::ApiKey {
    key: "base64-encoded-api-key".into(),
})
.with_batch_size(5000);

let source = ElasticsearchSource::new(config);
let metrics = source.fetch_all().await?;

Shared types

ElasticsearchAuth lives in faucet-common-elasticsearch and is shared with faucet-sink-elasticsearch. The source re-exports it for convenience, so faucet_source_elasticsearch::ElasticsearchAuth continues to work unchanged.

License

Licensed under MIT or Apache-2.0.