Skip to main content

faucet_sink_http/
config.rs

1//! HTTP sink configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use reqwest::header::HeaderMap;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// Authentication method for the HTTP sink.
10#[derive(Clone, Serialize, Deserialize, JsonSchema)]
11#[serde(tag = "type", content = "config", rename_all = "snake_case")]
12pub enum HttpSinkAuth {
13    /// No authentication.
14    None,
15    /// Bearer token in the Authorization header.
16    Bearer {
17        /// The token value (sent as `Authorization: Bearer <token>`).
18        token: String,
19    },
20    /// HTTP Basic authentication.
21    Basic {
22        /// Username.
23        username: String,
24        /// Password.
25        password: String,
26    },
27    /// Custom headers for authentication (e.g. API keys).
28    Custom {
29        /// Header name → value map applied to every request.
30        headers: HashMap<String, String>,
31    },
32}
33
34impl std::fmt::Debug for HttpSinkAuth {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Self::None => write!(f, "None"),
38            Self::Bearer { .. } => f.debug_tuple("Bearer").field(&"***").finish(),
39            Self::Basic { username, .. } => f
40                .debug_struct("Basic")
41                .field("username", username)
42                .field("password", &"***")
43                .finish(),
44            Self::Custom { .. } => f.debug_tuple("Custom").field(&"***").finish(),
45        }
46    }
47}
48
49/// How records are sent in HTTP requests.
50#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
51#[serde(tag = "type")]
52pub enum HttpBatchMode {
53    /// Send one HTTP request per record.
54    Individual,
55    /// Send all records as a JSON array in a single request.
56    Array,
57}
58
59/// Configuration for the HTTP sink connector.
60#[derive(Clone, Serialize, Deserialize, JsonSchema)]
61pub struct HttpSinkConfig {
62    /// Target endpoint URL.
63    pub url: String,
64    /// HTTP method (default: POST).
65    #[serde(with = "crate::serde_helpers::http_method")]
66    #[schemars(with = "String")]
67    pub method: reqwest::Method,
68    /// Additional request headers.
69    #[serde(skip, default)]
70    pub headers: HeaderMap,
71    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
72    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
73    pub auth: AuthSpec<HttpSinkAuth>,
74    /// How to batch records in requests.
75    pub batch_mode: HttpBatchMode,
76    /// Number of retries on transient failures (default: 0).
77    pub max_retries: usize,
78    /// Maximum number of concurrent requests in Individual mode (default: 10).
79    pub concurrency: usize,
80    /// Maximum number of records sent per outbound HTTP request. Defaults to
81    /// [`DEFAULT_BATCH_SIZE`] (1000).
82    ///
83    /// Interpretation depends on [`batch_mode`](Self::batch_mode):
84    ///
85    /// - In [`HttpBatchMode::Array`] mode, `write_batch` re-chunks the
86    ///   incoming slice into `batch_size`-row chunks and issues one POST
87    ///   request per chunk, with each request body a JSON array of up to
88    ///   `batch_size` records. `batch_size = 0` is the **"no batching"
89    ///   sentinel**: the entire upstream `StreamPage` is forwarded as a
90    ///   single JSON array — useful when the source already chunks to a
91    ///   size the destination endpoint accepts.
92    /// - In [`HttpBatchMode::Individual`] mode the sink already sends one
93    ///   request per record, so `batch_size` has no effect on wire framing;
94    ///   the field is accepted for config-shape parity with other sinks
95    ///   and validated via [`faucet_core::validate_batch_size`] at load
96    ///   time.
97    ///
98    /// Recommended value for HTTP POST endpoints that accept arrays:
99    /// match the destination's documented batch limit (commonly 100–1000
100    /// records per request).
101    #[serde(default = "default_batch_size")]
102    pub batch_size: usize,
103}
104
105fn default_batch_size() -> usize {
106    DEFAULT_BATCH_SIZE
107}
108
109impl std::fmt::Debug for HttpSinkConfig {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("HttpSinkConfig")
112            .field("url", &self.url)
113            .field("method", &self.method)
114            .field("headers", &self.headers)
115            .field("auth", &self.auth)
116            .field("batch_mode", &self.batch_mode)
117            .field("max_retries", &self.max_retries)
118            .field("concurrency", &self.concurrency)
119            .field("batch_size", &self.batch_size)
120            .finish()
121    }
122}
123
124impl HttpSinkConfig {
125    /// Create a new config with the given URL and sensible defaults.
126    pub fn new(url: impl Into<String>) -> Self {
127        Self {
128            url: url.into(),
129            method: reqwest::Method::POST,
130            headers: HeaderMap::new(),
131            auth: AuthSpec::Inline(HttpSinkAuth::None),
132            batch_mode: HttpBatchMode::Individual,
133            max_retries: 0,
134            concurrency: 10,
135            batch_size: DEFAULT_BATCH_SIZE,
136        }
137    }
138
139    /// Set the HTTP method.
140    pub fn method(mut self, method: reqwest::Method) -> Self {
141        self.method = method;
142        self
143    }
144
145    /// Set additional request headers.
146    pub fn headers(mut self, headers: HeaderMap) -> Self {
147        self.headers = headers;
148        self
149    }
150
151    /// Set the authentication method (inline).
152    pub fn auth(mut self, auth: HttpSinkAuth) -> Self {
153        self.auth = AuthSpec::Inline(auth);
154        self
155    }
156
157    /// Set the batch mode.
158    pub fn batch_mode(mut self, mode: HttpBatchMode) -> Self {
159        self.batch_mode = mode;
160        self
161    }
162
163    /// Set the maximum number of retries.
164    pub fn max_retries(mut self, retries: usize) -> Self {
165        self.max_retries = retries;
166        self
167    }
168
169    /// Set the maximum number of concurrent requests in Individual mode.
170    pub fn concurrency(mut self, concurrency: usize) -> Self {
171        self.concurrency = concurrency;
172        self
173    }
174
175    /// Set the maximum number of records sent per outbound HTTP request.
176    ///
177    /// In [`HttpBatchMode::Array`] mode this controls how many records are
178    /// packed into each POST body. In [`HttpBatchMode::Individual`] mode it
179    /// has no effect on wire framing (one request per record either way)
180    /// and is accepted only for parity.
181    ///
182    /// Pass `0` to opt out of re-chunking in Array mode — the entire
183    /// records slice handed to `write_batch` is sent as a single JSON
184    /// array, preserving upstream `StreamPage` framing.
185    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
186        self.batch_size = batch_size;
187        self
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must apply every documented starting value.
    #[test]
    fn default_config() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest");
        assert_eq!(config.url, "https://api.example.com/ingest");
        assert_eq!(config.method, reqwest::Method::POST);
        assert!(config.headers.is_empty());
        assert_eq!(config.max_retries, 0);
        assert_eq!(config.concurrency, 10);
        assert!(matches!(config.auth, AuthSpec::Inline(HttpSinkAuth::None)));
        assert!(matches!(config.batch_mode, HttpBatchMode::Individual));
    }

    /// Each builder must change its own field, and the value set must be the
    /// value stored.
    #[test]
    fn builder_methods() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest")
            .method(reqwest::Method::PUT)
            .auth(HttpSinkAuth::Bearer {
                token: "token123".into(),
            })
            .batch_mode(HttpBatchMode::Array)
            .max_retries(3)
            .concurrency(4);
        assert_eq!(config.method, reqwest::Method::PUT);
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.concurrency, 4);
        assert!(matches!(config.batch_mode, HttpBatchMode::Array));
        // The auth builder wraps the value inline and keeps the token intact.
        assert!(matches!(
            &config.auth,
            AuthSpec::Inline(HttpSinkAuth::Bearer { token }) if token == "token123"
        ));
    }

    /// The `headers` builder stores the map it is given.
    #[test]
    fn headers_builder_sets_header_map() {
        let mut headers = HeaderMap::new();
        headers.insert(
            reqwest::header::ACCEPT,
            reqwest::header::HeaderValue::from_static("application/json"),
        );
        let config = HttpSinkConfig::new("https://api.example.com/ingest").headers(headers);
        assert_eq!(config.headers.len(), 1);
        assert!(config.headers.contains_key(reqwest::header::ACCEPT));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = HttpSinkConfig::new("https://api.example.com/ingest")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "url": "https://api.example.com/ingest",
            "method": "POST",
            "auth": {"type": "none"},
            "batch_mode": {"type": "Array"},
            "max_retries": 0,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let json = r#"{
            "url": "https://api.example.com/ingest",
            "method": "POST",
            "auth": {"type": "none"},
            "batch_mode": {"type": "Array"},
            "max_retries": 0,
            "concurrency": 10
        }"#;
        let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// No secret-bearing field may appear in `Debug` output.
    #[test]
    fn auth_debug_masks_secrets() {
        assert_eq!(format!("{:?}", HttpSinkAuth::None), "None");

        let bearer = HttpSinkAuth::Bearer {
            token: "secret-token".into(),
        };
        let debug = format!("{bearer:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-token"));

        let basic = HttpSinkAuth::Basic {
            username: "user".into(),
            password: "secret-pass".into(),
        };
        let debug = format!("{basic:?}");
        assert!(debug.contains("user"));
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-pass"));

        // Use a non-empty map so the check actually proves the header value
        // is withheld rather than passing vacuously.
        let custom = HttpSinkAuth::Custom {
            headers: HashMap::from([("X-Api-Key".to_string(), "secret-key".to_string())]),
        };
        let debug = format!("{custom:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret-key"));
    }
}