Skip to main content

faucet_sink_http/
sink.rs

1//! HTTP sink executor.
2
3use crate::config::{HttpBatchMode, HttpSinkAuth, HttpSinkConfig};
4use async_trait::async_trait;
5use faucet_core::util::{DEFAULT_ERROR_BODY_MAX_LEN, check_http_response};
6use faucet_core::{AuthSpec, Credential, FaucetError, SharedAuthProvider};
7use futures::stream::{FuturesUnordered, StreamExt};
8use serde_json::Value;
9use std::collections::HashMap;
10
11/// Map a [`Credential`] from a shared provider onto the [`HttpSinkAuth`]
12/// representation so the existing header-application path can be reused.
13fn credential_to_auth(cred: Credential) -> HttpSinkAuth {
14    match cred {
15        Credential::Bearer(token) => HttpSinkAuth::Bearer { token },
16        Credential::Token(token) => HttpSinkAuth::Custom {
17            headers: HashMap::from([("Authorization".to_string(), token)]),
18        },
19        Credential::Basic { username, password } => HttpSinkAuth::Basic { username, password },
20        Credential::Header { name, value } => HttpSinkAuth::Custom {
21            headers: HashMap::from([(name, value)]),
22        },
23    }
24}
25
26/// An HTTP sink that sends records to an HTTP endpoint.
27pub struct HttpSink {
28    config: HttpSinkConfig,
29    client: reqwest::Client,
30    /// Optional shared auth provider. When set, it takes precedence over inline
31    /// auth. Set via [`HttpSink::with_auth_provider`].
32    auth_provider: Option<SharedAuthProvider>,
33}
34
35impl HttpSink {
36    /// Create a new HTTP sink from the given configuration.
37    pub fn new(config: HttpSinkConfig) -> Self {
38        Self {
39            config,
40            client: reqwest::Client::new(),
41            auth_provider: None,
42        }
43    }
44
45    /// Attach a shared [`AuthProvider`](faucet_core::AuthProvider). When set,
46    /// the provider supplies the credential for every request (taking
47    /// precedence over inline auth), so several sinks can share one token with
48    /// single-flight refresh. Used by the CLI to resolve `auth: { ref }`, and
49    /// by library callers who construct one provider and inject it into many
50    /// sinks.
51    pub fn with_auth_provider(mut self, provider: SharedAuthProvider) -> Self {
52        self.auth_provider = Some(provider);
53        self
54    }
55
56    /// Resolve the effective auth for the current batch. The provider (if any)
57    /// takes precedence; otherwise inline auth is used. A bare
58    /// `AuthSpec::Reference` with no provider is an error.
59    async fn resolve_auth(&self) -> Result<HttpSinkAuth, FaucetError> {
60        if let Some(provider) = &self.auth_provider {
61            Ok(credential_to_auth(provider.credential().await?))
62        } else {
63            match &self.config.auth {
64                AuthSpec::Inline(a) => Ok(a.clone()),
65                AuthSpec::Reference(r) => Err(FaucetError::Auth(format!(
66                    "auth references provider '{}' but no provider was supplied; \
67                     set one via the CLI `auth:` catalog or `with_auth_provider`",
68                    r.name
69                ))),
70            }
71        }
72    }
73
74    /// Build an HTTP request with auth and headers applied.
75    fn apply_auth(
76        &self,
77        mut req: reqwest::RequestBuilder,
78        auth: &HttpSinkAuth,
79    ) -> Result<reqwest::RequestBuilder, FaucetError> {
80        match auth {
81            HttpSinkAuth::None => {}
82            HttpSinkAuth::Bearer { token } => {
83                req = req.bearer_auth(token);
84            }
85            HttpSinkAuth::Basic { username, password } => {
86                req = req.basic_auth(username, Some(password));
87            }
88            HttpSinkAuth::Custom { headers } => {
89                let mut hm = reqwest::header::HeaderMap::new();
90                for (name, value) in headers {
91                    let n =
92                        reqwest::header::HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
93                            FaucetError::Auth(format!("invalid custom header name {name:?}: {e}"))
94                        })?;
95                    let v = reqwest::header::HeaderValue::from_str(value).map_err(|e| {
96                        FaucetError::Auth(format!("invalid custom header value for {name:?}: {e}"))
97                    })?;
98                    hm.insert(n, v);
99                }
100                req = req.headers(hm);
101            }
102        }
103        Ok(req)
104    }
105
106    /// Build an HTTP request with the given pre-resolved auth and body.
107    fn build_request_with_auth(
108        &self,
109        body: &Value,
110        auth: &HttpSinkAuth,
111    ) -> Result<reqwest::RequestBuilder, FaucetError> {
112        let req = self
113            .client
114            .request(self.config.method.clone(), &self.config.url)
115            .headers(self.config.headers.clone())
116            .json(body);
117        self.apply_auth(req, auth)
118    }
119
120    /// Send a single request with retry logic, using the pre-resolved `auth`.
121    async fn send_with_retry(&self, body: &Value, auth: &HttpSinkAuth) -> Result<(), FaucetError> {
122        let mut last_error = None;
123
124        for attempt in 0..=self.config.max_retries {
125            let req = self.build_request_with_auth(body, auth)?;
126
127            match req.send().await {
128                Ok(resp) => match check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await {
129                    Ok(_) => return Ok(()),
130                    Err(e) => {
131                        if attempt < self.config.max_retries && e.is_retriable() {
132                            tracing::warn!(
133                                attempt = attempt + 1,
134                                max_retries = self.config.max_retries,
135                                error = %e,
136                                "retrying request"
137                            );
138                            last_error = Some(e);
139                            continue;
140                        }
141                        return Err(e);
142                    }
143                },
144                Err(e) => {
145                    let faucet_err = FaucetError::Http(e);
146                    if attempt < self.config.max_retries && faucet_err.is_retriable() {
147                        tracing::warn!(
148                            attempt = attempt + 1,
149                            max_retries = self.config.max_retries,
150                            error = %faucet_err,
151                            "retrying request"
152                        );
153                        last_error = Some(faucet_err);
154                        continue;
155                    }
156                    return Err(faucet_err);
157                }
158            }
159        }
160
161        Err(last_error.unwrap_or_else(|| FaucetError::Sink("max retries exhausted".into())))
162    }
163}
164
165#[async_trait]
166impl faucet_core::Sink for HttpSink {
167    fn config_schema(&self) -> serde_json::Value {
168        serde_json::to_value(faucet_core::schema_for!(HttpSinkConfig))
169            .expect("schema serialization")
170    }
171
172    /// Non-mutating preflight probe (probe name `"network"`).
173    ///
174    /// Issues a lightweight `HEAD` request to the configured endpoint over the
175    /// existing reqwest client. We only care that the host is reachable — that
176    /// DNS, TCP, TLS and the server all work — so **any** HTTP response (2xx,
177    /// 4xx including `405 Method Not Allowed`, or 5xx) counts as a pass. Only a
178    /// transport/connection error (no response at all) is a failure.
179    async fn check(
180        &self,
181        ctx: &faucet_core::check::CheckContext,
182    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
183        use faucet_core::check::{CheckReport, Probe};
184
185        // Resolve auth so authenticated endpoints don't reject the connection
186        // before we learn the host is reachable. An unresolvable auth ref is a
187        // configuration failure surfaced on this probe.
188        let auth = match self.resolve_auth().await {
189            Ok(a) => a,
190            Err(e) => {
191                return Ok(CheckReport::single(Probe::fail_hint(
192                    "network",
193                    std::time::Duration::ZERO,
194                    e.to_string(),
195                    "check the configured auth / that a shared auth provider is wired up",
196                )));
197            }
198        };
199
200        let started = std::time::Instant::now();
201        let hint = "check the url / DNS / TLS / that the host is reachable";
202
203        let req = self
204            .client
205            .head(&self.config.url)
206            .headers(self.config.headers.clone());
207        let req = match self.apply_auth(req, &auth) {
208            Ok(r) => r,
209            Err(e) => {
210                return Ok(CheckReport::single(Probe::fail_hint(
211                    "network",
212                    started.elapsed(),
213                    e.to_string(),
214                    hint,
215                )));
216            }
217        };
218
219        let probe = match tokio::time::timeout(ctx.timeout, req.send()).await {
220            // Any HTTP response means DNS + TCP + TLS + the host all work.
221            Ok(Ok(_)) => Probe::pass("network", started.elapsed()),
222            // Transport/connection error: no response received.
223            Ok(Err(e)) => Probe::fail_hint("network", started.elapsed(), e.to_string(), hint),
224            Err(_) => Probe::fail_hint("network", started.elapsed(), "timed out", hint),
225        };
226        Ok(CheckReport::single(probe))
227    }
228
229    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
230        if records.is_empty() {
231            return Ok(0);
232        }
233
234        // Resolve auth once per batch (provider-first, then inline).
235        let auth = self.resolve_auth().await?;
236
237        match &self.config.batch_mode {
238            HttpBatchMode::Individual => {
239                // Run `send_with_retry` for every record with at most
240                // `concurrency` in-flight at once. We drive a
241                // `FuturesUnordered` directly, refilling it as each future
242                // completes, instead of acquiring permits up-front the way
243                // the previous semaphore-based code did — that approach
244                // deadlocked because permits were acquired sequentially in
245                // a loop before any future actually ran, so after
246                // `concurrency` iterations the next `acquire_owned().await`
247                // would block forever (closes #59).
248                let concurrency = self.config.concurrency.max(1);
249                let mut in_flight = FuturesUnordered::new();
250                let mut iter = records.iter();
251                for record in iter.by_ref().take(concurrency) {
252                    in_flight.push(self.send_with_retry(record, &auth));
253                }
254                while let Some(result) = in_flight.next().await {
255                    result?;
256                    if let Some(record) = iter.next() {
257                        in_flight.push(self.send_with_retry(record, &auth));
258                    }
259                }
260
261                tracing::debug!(records = records.len(), "HTTP individual batch written");
262                Ok(records.len())
263            }
264            HttpBatchMode::Array => {
265                // `batch_size = 0` is the "no batching" sentinel: forward
266                // whatever upstream handed us as a single JSON-array POST,
267                // preserving `StreamPage` framing. Otherwise re-chunk into
268                // `batch_size` slices and issue one POST per chunk.
269                let effective_chunk = if self.config.batch_size == 0 {
270                    records.len()
271                } else {
272                    self.config.batch_size
273                };
274
275                let mut total = 0;
276                for chunk in records.chunks(effective_chunk) {
277                    let array = Value::Array(chunk.to_vec());
278                    self.send_with_retry(&array, &auth).await?;
279                    total += chunk.len();
280                }
281                tracing::debug!(
282                    records = total,
283                    batch_size = self.config.batch_size,
284                    "HTTP array batch written"
285                );
286                Ok(total)
287            }
288        }
289    }
290
291    /// Report per-row outcomes so the DLQ router dead-letters only the records
292    /// that genuinely failed.
293    ///
294    /// In **Individual** mode every record is an independent POST, so each
295    /// record's success/failure is attributable: we attempt *all* of them
296    /// (unlike `write_batch`, whose `?` short-circuits on the first failure)
297    /// and return one `Ok`/`Err` per record. Without this
298    /// override the default impl would surface the first error as an outer
299    /// `Err`, and under `on_batch_error: dlq_all` the pipeline would route the
300    /// *entire* batch to the DLQ — duplicating the already-delivered rows
301    /// against a non-idempotent endpoint (#146 M14).
302    ///
303    /// In **Array** mode a single array POST cannot attribute a failure to
304    /// specific rows, so it stays all-or-nothing (matches the default trait
305    /// impl): the whole batch surfaces as an outer `Err` and the router's
306    /// `on_batch_error` policy decides whether to abort or dead-letter it.
307    async fn write_batch_partial(
308        &self,
309        records: &[Value],
310    ) -> Result<Vec<faucet_core::RowOutcome>, FaucetError> {
311        if records.is_empty() {
312            return Ok(Vec::new());
313        }
314
315        let auth = self.resolve_auth().await?;
316
317        match &self.config.batch_mode {
318            HttpBatchMode::Individual => {
319                let concurrency = self.config.concurrency.max(1);
320                let auth = &auth;
321                // Attempt every record (failures don't short-circuit the
322                // siblings) with at most `concurrency` POSTs in flight. Tag each
323                // outcome with its index so we can restore record order after
324                // the unordered completion. The per-record futures are built
325                // eagerly (lazy, not yet polled) so `buffer_unordered` drives a
326                // single concrete future type.
327                let pending: Vec<_> =
328                    records
329                        .iter()
330                        .enumerate()
331                        .map(|(idx, record)| async move {
332                            (idx, self.send_with_retry(record, auth).await)
333                        })
334                        .collect();
335                let mut indexed: Vec<(usize, faucet_core::RowOutcome)> =
336                    futures::stream::iter(pending)
337                        .buffer_unordered(concurrency)
338                        .collect()
339                        .await;
340                indexed.sort_by_key(|(idx, _)| *idx);
341                tracing::debug!(
342                    records = records.len(),
343                    "HTTP individual partial batch written"
344                );
345                Ok(indexed.into_iter().map(|(_, outcome)| outcome).collect())
346            }
347            HttpBatchMode::Array => {
348                self.write_batch(records).await?;
349                Ok(records.iter().map(|_| Ok(())).collect())
350            }
351        }
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::HttpSinkConfig;

    /// Build (but never send) the request the sink would issue for a fixed
    /// JSON body with `auth` applied.
    fn built_request(sink: &HttpSink, auth: &HttpSinkAuth) -> reqwest::Request {
        let body = serde_json::json!({"test": true});
        sink.build_request_with_auth(&body, auth)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Extract the `Authorization` header of a built request as text.
    fn authorization_of(req: &reqwest::Request) -> &str {
        req.headers()
            .get("authorization")
            .unwrap()
            .to_str()
            .unwrap()
    }

    #[test]
    fn creates_sink() {
        let _sink = HttpSink::new(HttpSinkConfig::new("https://api.example.com/ingest"));
    }

    #[test]
    fn build_request_applies_bearer_auth() {
        let auth = HttpSinkAuth::Bearer {
            token: "my-token".into(),
        };
        let sink =
            HttpSink::new(HttpSinkConfig::new("https://api.example.com/ingest").auth(auth.clone()));

        let req = built_request(&sink, &auth);
        let header = authorization_of(&req);

        assert!(header.starts_with("Bearer "));
        assert!(header.contains("my-token"));
    }

    #[test]
    fn build_request_applies_basic_auth() {
        let auth = HttpSinkAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        let sink =
            HttpSink::new(HttpSinkConfig::new("https://api.example.com/ingest").auth(auth.clone()));

        let req = built_request(&sink, &auth);

        assert!(authorization_of(&req).starts_with("Basic "));
    }

    #[test]
    fn build_request_uses_configured_method() {
        let sink = HttpSink::new(
            HttpSinkConfig::new("https://api.example.com/ingest").method(reqwest::Method::PUT),
        );

        let req = built_request(&sink, &HttpSinkAuth::None);

        assert_eq!(req.method(), reqwest::Method::PUT);
    }
}