Skip to main content

faucet_source_elasticsearch/
config.rs

1//! Elasticsearch source configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8pub use faucet_common_elasticsearch::ElasticsearchAuth;
9
10/// Configuration for the Elasticsearch search source.
11#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
12pub struct ElasticsearchSourceConfig {
13    /// Base URL of the Elasticsearch cluster (e.g. `"http://localhost:9200"`).
14    pub base_url: String,
15    /// Index name to search.
16    pub index: String,
17    /// Elasticsearch query DSL. Defaults to `{"match_all": {}}`.
18    pub query: Value,
19    /// Scroll context timeout (e.g. `"1m"`). Defaults to `"1m"`.
20    pub scroll_timeout: String,
21    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
22    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
23    pub auth: AuthSpec<ElasticsearchAuth>,
24    /// Maximum number of scroll pages to fetch. `None` means no limit.
25    pub max_pages: Option<usize>,
26    /// Records per emitted [`StreamPage`](faucet_core::StreamPage), which is
27    /// also the `size` parameter passed to the Elasticsearch scroll API
28    /// (`POST /{index}/_search?scroll={timeout}&size={batch_size}`). Each
29    /// scroll response becomes exactly one `StreamPage`. Defaults to
30    /// [`DEFAULT_BATCH_SIZE`].
31    ///
32    /// `batch_size = 0` is the "no batching" sentinel: the source issues a
33    /// single non-scroll `_search` request with `size = 10_000` (the default
34    /// `index.max_result_window`) and emits one `StreamPage`. Use it for
35    /// small indices or for sinks (e.g. SQL `COPY`, BigQuery load jobs) that
36    /// prefer one large request to many small ones. Indices that have raised
37    /// their `max_result_window` will still cap at 10_000 — raise this knob
38    /// or switch back to scroll if you need more.
39    #[serde(default = "default_batch_size")]
40    pub batch_size: usize,
41}
42
43fn default_batch_size() -> usize {
44    DEFAULT_BATCH_SIZE
45}
46
47impl ElasticsearchSourceConfig {
48    /// Create a new config with the required fields and sensible defaults.
49    pub fn new(base_url: impl Into<String>, index: impl Into<String>) -> Self {
50        Self {
51            base_url: base_url.into().trim_end_matches('/').to_string(),
52            index: index.into(),
53            query: json!({"match_all": {}}),
54            scroll_timeout: "1m".to_string(),
55            auth: AuthSpec::Inline(ElasticsearchAuth::None),
56            max_pages: None,
57            batch_size: DEFAULT_BATCH_SIZE,
58        }
59    }
60
61    /// Set the Elasticsearch query DSL.
62    pub fn query(mut self, q: Value) -> Self {
63        self.query = q;
64        self
65    }
66
67    /// Set the scroll context timeout (e.g. `"5m"`).
68    pub fn scroll_timeout(mut self, t: impl Into<String>) -> Self {
69        self.scroll_timeout = t.into();
70        self
71    }
72
73    /// Set the authentication method.
74    pub fn auth(mut self, a: ElasticsearchAuth) -> Self {
75        self.auth = AuthSpec::Inline(a);
76        self
77    }
78
79    /// Set the maximum number of scroll pages to fetch.
80    pub fn max_pages(mut self, n: usize) -> Self {
81        self.max_pages = Some(n);
82        self
83    }
84
85    /// Set the per-page document count for both the scroll API's `size`
86    /// parameter and the emitted [`StreamPage`](faucet_core::StreamPage)
87    /// size.
88    ///
89    /// Pass `0` to opt out of scroll entirely — the source will issue a
90    /// single `_search` with `size = 10_000` and emit one page.
91    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
92        self.batch_size = batch_size;
93        self
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` fills every optional field with its documented default.
    #[test]
    fn default_config() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "my_index");
        assert_eq!(config.base_url, "http://localhost:9200");
        assert_eq!(config.index, "my_index");
        assert_eq!(config.query, json!({"match_all": {}}));
        assert_eq!(config.scroll_timeout, "1m");
        assert!(config.max_pages.is_none());
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::None)
        ));
    }

    /// `new` uses `trim_end_matches`, so repeated trailing slashes all go.
    #[test]
    fn new_strips_all_trailing_slashes() {
        let config = ElasticsearchSourceConfig::new("http://es:9200///", "idx");
        assert_eq!(config.base_url, "http://es:9200");
    }

    /// Every builder method actually updates its field.
    #[test]
    fn builder_methods() {
        let config = ElasticsearchSourceConfig::new("http://es:9200/", "idx")
            .query(json!({"term": {"status": "active"}}))
            .scroll_timeout("5m")
            .max_pages(10)
            .auth(ElasticsearchAuth::Bearer {
                token: "tok".into(),
            });
        assert_eq!(config.base_url, "http://es:9200");
        assert_eq!(config.query, json!({"term": {"status": "active"}}));
        assert_eq!(config.scroll_timeout, "5m");
        assert_eq!(config.max_pages, Some(10));
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::Bearer { .. })
        ));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "base_url": "http://localhost:9200",
            "index": "idx",
            "query": {"match_all": {}},
            "scroll_timeout": "1m",
            "auth": {"type": "none"},
            "batch_size": 250
        }"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert!(matches!(config.auth, faucet_core::AuthSpec::Inline(_)));
    }

    #[test]
    fn batch_size_defaults_when_missing_from_json() {
        let json = r#"{
            "base_url": "http://localhost:9200",
            "index": "idx",
            "query": {"match_all": {}},
            "scroll_timeout": "1m",
            "auth": {"type": "none"}
        }"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        // `max_pages` is an `Option`, so omitting it yields `None`.
        assert!(config.max_pages.is_none());
    }
}