1use crate::config::{HttpBatchMode, HttpSinkAuth, HttpSinkConfig};
4use async_trait::async_trait;
5use faucet_core::util::{DEFAULT_ERROR_BODY_MAX_LEN, check_http_response};
6use faucet_core::{AuthSpec, Credential, FaucetError, SharedAuthProvider};
7use futures::stream::{FuturesUnordered, StreamExt};
8use serde_json::Value;
9use std::collections::HashMap;
10
11fn credential_to_auth(cred: Credential) -> HttpSinkAuth {
14 match cred {
15 Credential::Bearer(token) => HttpSinkAuth::Bearer { token },
16 Credential::Token(token) => HttpSinkAuth::Custom {
17 headers: HashMap::from([("Authorization".to_string(), token)]),
18 },
19 Credential::Basic { username, password } => HttpSinkAuth::Basic { username, password },
20 Credential::Header { name, value } => HttpSinkAuth::Custom {
21 headers: HashMap::from([(name, value)]),
22 },
23 }
24}
25
26pub struct HttpSink {
28 config: HttpSinkConfig,
29 client: reqwest::Client,
30 auth_provider: Option<SharedAuthProvider>,
33}
34
35impl HttpSink {
36 pub fn new(config: HttpSinkConfig) -> Self {
38 Self {
39 config,
40 client: reqwest::Client::new(),
41 auth_provider: None,
42 }
43 }
44
45 pub fn with_auth_provider(mut self, provider: SharedAuthProvider) -> Self {
52 self.auth_provider = Some(provider);
53 self
54 }
55
56 async fn resolve_auth(&self) -> Result<HttpSinkAuth, FaucetError> {
60 if let Some(provider) = &self.auth_provider {
61 Ok(credential_to_auth(provider.credential().await?))
62 } else {
63 match &self.config.auth {
64 AuthSpec::Inline(a) => Ok(a.clone()),
65 AuthSpec::Reference(r) => Err(FaucetError::Auth(format!(
66 "auth references provider '{}' but no provider was supplied; \
67 set one via the CLI `auth:` catalog or `with_auth_provider`",
68 r.name
69 ))),
70 }
71 }
72 }
73
74 fn apply_auth(
76 &self,
77 mut req: reqwest::RequestBuilder,
78 auth: &HttpSinkAuth,
79 ) -> Result<reqwest::RequestBuilder, FaucetError> {
80 match auth {
81 HttpSinkAuth::None => {}
82 HttpSinkAuth::Bearer { token } => {
83 req = req.bearer_auth(token);
84 }
85 HttpSinkAuth::Basic { username, password } => {
86 req = req.basic_auth(username, Some(password));
87 }
88 HttpSinkAuth::Custom { headers } => {
89 let mut hm = reqwest::header::HeaderMap::new();
90 for (name, value) in headers {
91 let n =
92 reqwest::header::HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
93 FaucetError::Auth(format!("invalid custom header name {name:?}: {e}"))
94 })?;
95 let v = reqwest::header::HeaderValue::from_str(value).map_err(|e| {
96 FaucetError::Auth(format!("invalid custom header value for {name:?}: {e}"))
97 })?;
98 hm.insert(n, v);
99 }
100 req = req.headers(hm);
101 }
102 }
103 Ok(req)
104 }
105
106 fn build_request_with_auth(
108 &self,
109 body: &Value,
110 auth: &HttpSinkAuth,
111 ) -> Result<reqwest::RequestBuilder, FaucetError> {
112 let req = self
113 .client
114 .request(self.config.method.clone(), &self.config.url)
115 .headers(self.config.headers.clone())
116 .json(body);
117 self.apply_auth(req, auth)
118 }
119
120 async fn send_with_retry(&self, body: &Value, auth: &HttpSinkAuth) -> Result<(), FaucetError> {
122 let mut last_error = None;
123
124 for attempt in 0..=self.config.max_retries {
125 let req = self.build_request_with_auth(body, auth)?;
126
127 match req.send().await {
128 Ok(resp) => match check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await {
129 Ok(_) => return Ok(()),
130 Err(e) => {
131 if attempt < self.config.max_retries && e.is_retriable() {
132 tracing::warn!(
133 attempt = attempt + 1,
134 max_retries = self.config.max_retries,
135 error = %e,
136 "retrying request"
137 );
138 last_error = Some(e);
139 continue;
140 }
141 return Err(e);
142 }
143 },
144 Err(e) => {
145 let faucet_err = FaucetError::Http(e);
146 if attempt < self.config.max_retries && faucet_err.is_retriable() {
147 tracing::warn!(
148 attempt = attempt + 1,
149 max_retries = self.config.max_retries,
150 error = %faucet_err,
151 "retrying request"
152 );
153 last_error = Some(faucet_err);
154 continue;
155 }
156 return Err(faucet_err);
157 }
158 }
159 }
160
161 Err(last_error.unwrap_or_else(|| FaucetError::Sink("max retries exhausted".into())))
162 }
163}
164
165#[async_trait]
166impl faucet_core::Sink for HttpSink {
167 fn config_schema(&self) -> serde_json::Value {
168 serde_json::to_value(faucet_core::schema_for!(HttpSinkConfig))
169 .expect("schema serialization")
170 }
171
172 async fn check(
180 &self,
181 ctx: &faucet_core::check::CheckContext,
182 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
183 use faucet_core::check::{CheckReport, Probe};
184
185 let auth = match self.resolve_auth().await {
189 Ok(a) => a,
190 Err(e) => {
191 return Ok(CheckReport::single(Probe::fail_hint(
192 "network",
193 std::time::Duration::ZERO,
194 e.to_string(),
195 "check the configured auth / that a shared auth provider is wired up",
196 )));
197 }
198 };
199
200 let started = std::time::Instant::now();
201 let hint = "check the url / DNS / TLS / that the host is reachable";
202
203 let req = self
204 .client
205 .head(&self.config.url)
206 .headers(self.config.headers.clone());
207 let req = match self.apply_auth(req, &auth) {
208 Ok(r) => r,
209 Err(e) => {
210 return Ok(CheckReport::single(Probe::fail_hint(
211 "network",
212 started.elapsed(),
213 e.to_string(),
214 hint,
215 )));
216 }
217 };
218
219 let probe = match tokio::time::timeout(ctx.timeout, req.send()).await {
220 Ok(Ok(_)) => Probe::pass("network", started.elapsed()),
222 Ok(Err(e)) => Probe::fail_hint("network", started.elapsed(), e.to_string(), hint),
224 Err(_) => Probe::fail_hint("network", started.elapsed(), "timed out", hint),
225 };
226 Ok(CheckReport::single(probe))
227 }
228
229 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
230 if records.is_empty() {
231 return Ok(0);
232 }
233
234 let auth = self.resolve_auth().await?;
236
237 match &self.config.batch_mode {
238 HttpBatchMode::Individual => {
239 let concurrency = self.config.concurrency.max(1);
249 let mut in_flight = FuturesUnordered::new();
250 let mut iter = records.iter();
251 for record in iter.by_ref().take(concurrency) {
252 in_flight.push(self.send_with_retry(record, &auth));
253 }
254 while let Some(result) = in_flight.next().await {
255 result?;
256 if let Some(record) = iter.next() {
257 in_flight.push(self.send_with_retry(record, &auth));
258 }
259 }
260
261 tracing::debug!(records = records.len(), "HTTP individual batch written");
262 Ok(records.len())
263 }
264 HttpBatchMode::Array => {
265 let effective_chunk = if self.config.batch_size == 0 {
270 records.len()
271 } else {
272 self.config.batch_size
273 };
274
275 let mut total = 0;
276 for chunk in records.chunks(effective_chunk) {
277 let array = Value::Array(chunk.to_vec());
278 self.send_with_retry(&array, &auth).await?;
279 total += chunk.len();
280 }
281 tracing::debug!(
282 records = total,
283 batch_size = self.config.batch_size,
284 "HTTP array batch written"
285 );
286 Ok(total)
287 }
288 }
289 }
290
291 async fn write_batch_partial(
308 &self,
309 records: &[Value],
310 ) -> Result<Vec<faucet_core::RowOutcome>, FaucetError> {
311 if records.is_empty() {
312 return Ok(Vec::new());
313 }
314
315 let auth = self.resolve_auth().await?;
316
317 match &self.config.batch_mode {
318 HttpBatchMode::Individual => {
319 let concurrency = self.config.concurrency.max(1);
320 let auth = &auth;
321 let pending: Vec<_> =
328 records
329 .iter()
330 .enumerate()
331 .map(|(idx, record)| async move {
332 (idx, self.send_with_retry(record, auth).await)
333 })
334 .collect();
335 let mut indexed: Vec<(usize, faucet_core::RowOutcome)> =
336 futures::stream::iter(pending)
337 .buffer_unordered(concurrency)
338 .collect()
339 .await;
340 indexed.sort_by_key(|(idx, _)| *idx);
341 tracing::debug!(
342 records = records.len(),
343 "HTTP individual partial batch written"
344 );
345 Ok(indexed.into_iter().map(|(_, outcome)| outcome).collect())
346 }
347 HttpBatchMode::Array => {
348 self.write_batch(records).await?;
349 Ok(records.iter().map(|_| Ok(())).collect())
350 }
351 }
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::HttpSinkConfig;

    /// Endpoint shared by every test; no request is ever actually sent.
    const ENDPOINT: &str = "https://api.example.com/ingest";

    /// Builds the request `sink` would send for a small JSON body with `auth`.
    fn request_for(sink: &HttpSink, auth: &HttpSinkAuth) -> reqwest::Request {
        sink.build_request_with_auth(&serde_json::json!({"test": true}), auth)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Returns the value of the request's `Authorization` header.
    fn authorization(req: &reqwest::Request) -> &str {
        req.headers()
            .get("authorization")
            .unwrap()
            .to_str()
            .unwrap()
    }

    #[test]
    fn creates_sink() {
        let _sink = HttpSink::new(HttpSinkConfig::new(ENDPOINT));
    }

    #[test]
    fn build_request_applies_bearer_auth() {
        let auth = HttpSinkAuth::Bearer {
            token: "my-token".into(),
        };
        let sink = HttpSink::new(HttpSinkConfig::new(ENDPOINT).auth(auth.clone()));

        let req = request_for(&sink, &auth);
        let auth_header = authorization(&req);

        assert!(auth_header.starts_with("Bearer "));
        assert!(auth_header.contains("my-token"));
    }

    #[test]
    fn build_request_applies_basic_auth() {
        let auth = HttpSinkAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        let sink = HttpSink::new(HttpSinkConfig::new(ENDPOINT).auth(auth.clone()));

        let req = request_for(&sink, &auth);

        assert!(authorization(&req).starts_with("Basic "));
    }

    #[test]
    fn build_request_uses_configured_method() {
        let sink = HttpSink::new(HttpSinkConfig::new(ENDPOINT).method(reqwest::Method::PUT));

        let req = request_for(&sink, &HttpSinkAuth::None);

        assert_eq!(req.method(), reqwest::Method::PUT);
    }
}