faucet_source_elasticsearch/
config.rs1use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8pub use faucet_common_elasticsearch::ElasticsearchAuth;
9
10#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
12pub struct ElasticsearchSourceConfig {
13 pub base_url: String,
15 pub index: String,
17 pub query: Value,
19 pub scroll_timeout: String,
21 pub auth: AuthSpec<ElasticsearchAuth>,
24 pub max_pages: Option<usize>,
26 #[serde(default = "default_batch_size")]
40 pub batch_size: usize,
41}
42
43fn default_batch_size() -> usize {
44 DEFAULT_BATCH_SIZE
45}
46
47impl ElasticsearchSourceConfig {
48 pub fn new(base_url: impl Into<String>, index: impl Into<String>) -> Self {
50 Self {
51 base_url: base_url.into().trim_end_matches('/').to_string(),
52 index: index.into(),
53 query: json!({"match_all": {}}),
54 scroll_timeout: "1m".to_string(),
55 auth: AuthSpec::Inline(ElasticsearchAuth::None),
56 max_pages: None,
57 batch_size: DEFAULT_BATCH_SIZE,
58 }
59 }
60
61 pub fn query(mut self, q: Value) -> Self {
63 self.query = q;
64 self
65 }
66
67 pub fn scroll_timeout(mut self, t: impl Into<String>) -> Self {
69 self.scroll_timeout = t.into();
70 self
71 }
72
73 pub fn auth(mut self, a: ElasticsearchAuth) -> Self {
75 self.auth = AuthSpec::Inline(a);
76 self
77 }
78
79 pub fn max_pages(mut self, n: usize) -> Self {
81 self.max_pages = Some(n);
82 self
83 }
84
85 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
92 self.batch_size = batch_size;
93 self
94 }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must populate every field with its documented default.
    #[test]
    fn default_config() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "my_index");
        assert_eq!(config.base_url, "http://localhost:9200");
        assert_eq!(config.index, "my_index");
        assert_eq!(config.query, json!({"match_all": {}}));
        assert_eq!(config.scroll_timeout, "1m");
        assert!(config.max_pages.is_none());
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::None)
        ));
    }

    /// Each chained setter must land its value in the matching field.
    #[test]
    fn builder_methods() {
        let config = ElasticsearchSourceConfig::new("http://es:9200/", "idx")
            .query(json!({"term": {"status": "active"}}))
            .scroll_timeout("5m")
            .max_pages(10)
            .auth(ElasticsearchAuth::Bearer {
                token: "tok".into(),
            });
        assert_eq!(config.base_url, "http://es:9200");
        assert_eq!(config.index, "idx");
        assert_eq!(config.query, json!({"term": {"status": "active"}}));
        assert_eq!(config.scroll_timeout, "5m");
        assert_eq!(config.max_pages, Some(10));
        assert!(matches!(
            config.auth,
            faucet_core::AuthSpec::Inline(ElasticsearchAuth::Bearer { .. })
        ));
    }

    /// `trim_end_matches` removes every trailing slash, not just one.
    #[test]
    fn new_strips_all_trailing_slashes() {
        let config = ElasticsearchSourceConfig::new("http://es:9200///", "idx");
        assert_eq!(config.base_url, "http://es:9200");
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    /// Zero is stored untouched and passes the shared validator.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            ElasticsearchSourceConfig::new("http://localhost:9200", "idx").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    /// The setter itself does not validate; the shared validator rejects
    /// oversized values.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = ElasticsearchSourceConfig::new("http://localhost:9200", "idx")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "base_url": "http://localhost:9200",
            "index": "idx",
            "query": {"match_all": {}},
            "scroll_timeout": "1m",
            "auth": {"type": "none"},
            "batch_size": 250
        }"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert!(matches!(config.auth, faucet_core::AuthSpec::Inline(_)));
    }

    /// A payload without `batch_size` must fall back to the serde default.
    #[test]
    fn batch_size_defaults_when_missing_from_json() {
        let json = r#"{
            "base_url": "http://localhost:9200",
            "index": "idx",
            "query": {"match_all": {}},
            "scroll_timeout": "1m",
            "auth": {"type": "none"}
        }"#;
        let config: ElasticsearchSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}