1use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use reqwest::header::HeaderMap;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Clone, Serialize, Deserialize, JsonSchema)]
11#[serde(tag = "type", content = "config", rename_all = "snake_case")]
12pub enum HttpSinkAuth {
13 None,
15 Bearer {
17 token: String,
19 },
20 Basic {
22 username: String,
24 password: String,
26 },
27 Custom {
29 headers: HashMap<String, String>,
31 },
32}
33
34impl std::fmt::Debug for HttpSinkAuth {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::None => write!(f, "None"),
38 Self::Bearer { .. } => f.debug_tuple("Bearer").field(&"***").finish(),
39 Self::Basic { username, .. } => f
40 .debug_struct("Basic")
41 .field("username", username)
42 .field("password", &"***")
43 .finish(),
44 Self::Custom { .. } => f.debug_tuple("Custom").field(&"***").finish(),
45 }
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
51#[serde(tag = "type")]
52pub enum HttpBatchMode {
53 Individual,
55 Array,
57}
58
59#[derive(Clone, Serialize, Deserialize, JsonSchema)]
61pub struct HttpSinkConfig {
62 pub url: String,
64 #[serde(with = "crate::serde_helpers::http_method")]
66 #[schemars(with = "String")]
67 pub method: reqwest::Method,
68 #[serde(skip, default)]
70 pub headers: HeaderMap,
71 pub auth: AuthSpec<HttpSinkAuth>,
74 pub batch_mode: HttpBatchMode,
76 pub max_retries: usize,
78 pub concurrency: usize,
80 #[serde(default = "default_batch_size")]
102 pub batch_size: usize,
103}
104
105fn default_batch_size() -> usize {
106 DEFAULT_BATCH_SIZE
107}
108
109impl std::fmt::Debug for HttpSinkConfig {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("HttpSinkConfig")
112 .field("url", &self.url)
113 .field("method", &self.method)
114 .field("headers", &self.headers)
115 .field("auth", &self.auth)
116 .field("batch_mode", &self.batch_mode)
117 .field("max_retries", &self.max_retries)
118 .field("concurrency", &self.concurrency)
119 .field("batch_size", &self.batch_size)
120 .finish()
121 }
122}
123
124impl HttpSinkConfig {
125 pub fn new(url: impl Into<String>) -> Self {
127 Self {
128 url: url.into(),
129 method: reqwest::Method::POST,
130 headers: HeaderMap::new(),
131 auth: AuthSpec::Inline(HttpSinkAuth::None),
132 batch_mode: HttpBatchMode::Individual,
133 max_retries: 0,
134 concurrency: 10,
135 batch_size: DEFAULT_BATCH_SIZE,
136 }
137 }
138
139 pub fn method(mut self, method: reqwest::Method) -> Self {
141 self.method = method;
142 self
143 }
144
145 pub fn headers(mut self, headers: HeaderMap) -> Self {
147 self.headers = headers;
148 self
149 }
150
151 pub fn auth(mut self, auth: HttpSinkAuth) -> Self {
153 self.auth = AuthSpec::Inline(auth);
154 self
155 }
156
157 pub fn batch_mode(mut self, mode: HttpBatchMode) -> Self {
159 self.batch_mode = mode;
160 self
161 }
162
163 pub fn max_retries(mut self, retries: usize) -> Self {
165 self.max_retries = retries;
166 self
167 }
168
169 pub fn concurrency(mut self, concurrency: usize) -> Self {
171 self.concurrency = concurrency;
172 self
173 }
174
175 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
186 self.batch_size = batch_size;
187 self
188 }
189}
190
/// Unit tests for the sink configuration: constructor defaults, chained
/// setters, batch-size handling (in code and via JSON), and redaction of
/// secrets in `Debug` output.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest");
        assert_eq!(config.url, "https://api.example.com/ingest");
        assert_eq!(config.method, reqwest::Method::POST);
        assert!(config.headers.is_empty());
        assert_eq!(config.max_retries, 0);
        assert_eq!(config.concurrency, 10);
        assert!(matches!(config.auth, AuthSpec::Inline(HttpSinkAuth::None)));
        assert!(matches!(config.batch_mode, HttpBatchMode::Individual));
    }

    #[test]
    fn builder_methods() {
        let mut headers = HeaderMap::new();
        headers.insert(
            reqwest::header::CONTENT_TYPE,
            reqwest::header::HeaderValue::from_static("application/json"),
        );

        let config = HttpSinkConfig::new("https://api.example.com/ingest")
            .method(reqwest::Method::PUT)
            .headers(headers)
            .auth(HttpSinkAuth::Bearer {
                token: "token123".into(),
            })
            .batch_mode(HttpBatchMode::Array)
            .max_retries(3)
            .concurrency(4);
        assert_eq!(config.method, reqwest::Method::PUT);
        assert!(config.headers.contains_key(reqwest::header::CONTENT_TYPE));
        // `ref` keeps the match from moving the token out of `config`.
        assert!(matches!(
            config.auth,
            AuthSpec::Inline(HttpSinkAuth::Bearer { ref token }) if token == "token123"
        ));
        assert!(matches!(config.batch_mode, HttpBatchMode::Array));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "url": "https://api.example.com/ingest",
            "method": "POST",
            "auth": {"type": "none"},
            "batch_mode": {"type": "Array"},
            "max_retries": 0,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let json = r#"{
            "url": "https://api.example.com/ingest",
            "method": "POST",
            "auth": {"type": "none"},
            "batch_mode": {"type": "Array"},
            "max_retries": 0,
            "concurrency": 10
        }"#;
        let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn auth_debug_masks_secrets() {
        let bearer = HttpSinkAuth::Bearer {
            token: "secret-token".into(),
        };
        let debug = format!("{bearer:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-token"));

        let basic = HttpSinkAuth::Basic {
            username: "user".into(),
            password: "secret-pass".into(),
        };
        let debug = format!("{basic:?}");
        assert!(debug.contains("user"));
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-pass"));

        // A non-empty map is required here: with no headers there is nothing
        // that could leak, so the masking would go unverified.
        let custom = HttpSinkAuth::Custom {
            headers: HashMap::from([("x-api-key".to_string(), "secret-key".to_string())]),
        };
        let debug = format!("{custom:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-key"));
    }
}