use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
pub use faucet_common_elasticsearch::ElasticsearchAuth;
/// Settings for the Elasticsearch source: which endpoint and index to use,
/// the query to run, how to authenticate, and paging/batching limits.
///
/// The type can be built in code with [`ElasticsearchSourceConfig::new`] plus
/// the chained setters, or deserialized (for example from JSON) via serde.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ElasticsearchSourceConfig {
    /// Base URL of the Elasticsearch endpoint.
    ///
    /// `new` strips every trailing `/`; a value supplied through
    /// deserialization is stored exactly as given.
    pub base_url: String,
    /// Name of the index this configuration targets.
    pub index: String,
    /// Query as a raw JSON value. `new` starts with `{"match_all": {}}`.
    pub query: Value,
    /// Scroll timeout string. `new` starts with `"1m"`.
    // NOTE(review): presumably forwarded as the Elasticsearch scroll
    // keep-alive duration — confirm at the call site.
    pub scroll_timeout: String,
    /// Authentication settings. `new` starts with
    /// `AuthSpec::Inline(ElasticsearchAuth::None)`.
    pub auth: AuthSpec<ElasticsearchAuth>,
    /// Optional page limit; `None` unless explicitly set.
    // NOTE(review): presumably caps how many pages the source fetches —
    // verify against the code that consumes this config.
    pub max_pages: Option<usize>,
    /// Batch size. When the key is absent from the serialized form, serde
    /// fills in `DEFAULT_BATCH_SIZE`.
    ///
    /// This type performs no range check itself. The tests in this file treat
    /// `0` as a "no batching" sentinel and expect
    /// `faucet_core::validate_batch_size` to reject values above
    /// `faucet_core::MAX_BATCH_SIZE`.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
}
/// Serde fallback for `ElasticsearchSourceConfig::batch_size`.
///
/// Referenced by path from the field's `#[serde(default = "...")]` attribute,
/// which needs a function rather than a constant; it simply returns
/// `DEFAULT_BATCH_SIZE`.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
impl ElasticsearchSourceConfig {
    /// Creates a configuration for `index` on the endpoint at `base_url`.
    ///
    /// Every trailing `/` is removed from `base_url`. The remaining fields
    /// start at these values:
    ///
    /// * `query`: `{"match_all": {}}`
    /// * `scroll_timeout`: `"1m"`
    /// * `auth`: `AuthSpec::Inline(ElasticsearchAuth::None)`
    /// * `max_pages`: `None`
    /// * `batch_size`: `DEFAULT_BATCH_SIZE`
    #[must_use]
    pub fn new(base_url: impl Into<String>, index: impl Into<String>) -> Self {
        // Trim in place: truncating the owned string avoids the second
        // allocation that `trim_end_matches(..).to_string()` would perform.
        // The trimmed prefix always ends on a char boundary, so `truncate`
        // cannot panic here.
        let mut base_url = base_url.into();
        let trimmed_len = base_url.trim_end_matches('/').len();
        base_url.truncate(trimmed_len);

        Self {
            base_url,
            index: index.into(),
            query: json!({"match_all": {}}),
            scroll_timeout: "1m".to_string(),
            auth: AuthSpec::Inline(ElasticsearchAuth::None),
            max_pages: None,
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    /// Replaces the query with `q` and returns the updated configuration.
    #[must_use]
    pub fn query(mut self, q: Value) -> Self {
        self.query = q;
        self
    }

    /// Replaces the scroll timeout string with `t` and returns the updated
    /// configuration. The value is stored as given, without validation.
    #[must_use]
    pub fn scroll_timeout(mut self, t: impl Into<String>) -> Self {
        self.scroll_timeout = t.into();
        self
    }

    /// Sets the authentication to `a`, wrapped in `AuthSpec::Inline`, and
    /// returns the updated configuration.
    #[must_use]
    pub fn auth(mut self, a: ElasticsearchAuth) -> Self {
        self.auth = AuthSpec::Inline(a);
        self
    }

    /// Sets the page limit to `Some(n)` and returns the updated configuration.
    #[must_use]
    pub fn max_pages(mut self, n: usize) -> Self {
        self.max_pages = Some(n);
        self
    }

    /// Overrides the batch size and returns the updated configuration.
    ///
    /// The value is stored as given; no range check happens in this method.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
/// Unit tests for `ElasticsearchSourceConfig`: constructor defaults, the
/// chained setters, and serde handling of `batch_size`.
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` fills every field that is not passed in with its documented default.
    #[test]
    fn default_config() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "my_index");
        assert_eq!(config.base_url, "http://localhost:9200");
        assert_eq!(config.index, "my_index");
        assert_eq!(config.query, json!({"match_all": {}}));
        assert_eq!(config.scroll_timeout, "1m");
        assert!(config.max_pages.is_none());
        // The default auth was previously unchecked.
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::None)
        ));
    }

    /// Each setter overrides exactly its own field; `new` strips the trailing slash.
    #[test]
    fn builder_methods() {
        let config = ElasticsearchSourceConfig::new("http://es:9200/", "idx")
            .query(json!({"term": {"status": "active"}}))
            .scroll_timeout("5m")
            .max_pages(10)
            .auth(ElasticsearchAuth::Bearer {
                token: "tok".into(),
            });
        assert_eq!(config.base_url, "http://es:9200");
        assert_eq!(config.index, "idx");
        assert_eq!(config.scroll_timeout, "5m");
        assert_eq!(config.max_pages, Some(10));
        // `.query(..)` and `.auth(..)` were called above but never verified.
        assert_eq!(config.query, json!({"term": {"status": "active"}}));
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::Bearer { .. })
        ));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    /// Zero is stored unchanged and passes `validate_batch_size`.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    /// The setter does not clamp; rejection is left to `validate_batch_size`.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    /// An explicit `batch_size` key is honoured when deserializing.
    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
"base_url": "http://localhost:9200",
"index": "idx",
"query": {"match_all": {}},
"scroll_timeout": "1m",
"auth": {"type": "none"},
"batch_size": 250
}"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert!(matches!(config.auth, faucet_core::AuthSpec::Inline(_)));
    }

    /// A missing `batch_size` key falls back to the serde default.
    #[test]
    fn batch_size_defaults_when_missing_from_json() {
        let json = r#"{
"base_url": "http://localhost:9200",
"index": "idx",
"query": {"match_all": {}},
"scroll_timeout": "1m",
"auth": {"type": "none"}
}"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}