faucet-sink-http
HTTP POST sink connector for the faucet-stream ecosystem.
Sends JSON records to an HTTP endpoint. Supports two batch modes: Individual (one request per record, sent concurrently) and Array (records sent as a JSON array, up to `batch_size` records per request). Includes configurable authentication, retry logic for transient failures, and concurrency control via semaphores.
Installation
```toml
[dependencies]
faucet-sink-http = "1.0"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:
= { = "1.0", = ["sink-http"] }
Quick Start
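```rust
use faucet_sink_http::{HttpSink, HttpSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint and record.
    let sink = HttpSink::new(HttpSinkConfig::new("https://api.example.com/ingest"));
    let records = vec![json!({"id": 1, "event": "signup"})];
    sink.write_batch(&records).await?;
    Ok(())
}
```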
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| `url` | `String` | (required) | Target endpoint URL |
| `method` | `reqwest::Method` | `POST` | HTTP method to use |
| `headers` | `HeaderMap` | empty | Additional request headers (not serializable; set via builder only) |
| `auth` | `HttpSinkAuth` | `None` | Authentication method (see below) |
| `batch_mode` | `HttpBatchMode` | `Individual` | How to batch records in requests (see below) |
| `max_retries` | `usize` | `0` | Number of retries on transient failures |
| `concurrency` | `usize` | `10` | Maximum number of concurrent requests in Individual mode |
| `batch_size` | `usize` | `1000` (`faucet_core::DEFAULT_BATCH_SIZE`) | Maximum records per outbound HTTP request. Re-chunks the upstream `StreamPage` in Array mode; no-op in Individual mode. `0` is the "no batching" sentinel (one POST per `StreamPage`). |
Authentication (HttpSinkAuth)
| Variant | Fields | Description |
|---|---|---|
| `None` | none | No authentication |
| `Bearer { token }` | `String` | Bearer token in the `Authorization` header |
| `Basic { username, password }` | `String`, `String` | HTTP Basic authentication |
| `Custom { headers }` | `HashMap<String, String>` | Header name → value map attached to every request |
The Debug implementation masks tokens and passwords with *** to prevent credential leakage in logs.
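As a rough, std-only sketch of what these variants put on the wire, the snippet below mirrors `HttpSinkAuth` with a hypothetical `Auth` enum. Bearer and Basic each become an `Authorization` header, where Basic is the base64 encoding of `username:password` as defined by RFC 7617. Custom headers pass through unchanged, and a hand-written `Debug` masks secrets with `***`. The enum, its `headers()` method and the inline base64 encoder are illustrative assumptions, not the crate's implementation.

```rust
use std::collections::HashMap;
use std::fmt;

/// Hypothetical stand-in for `HttpSinkAuth` (same variants as the table above).
pub enum Auth {
    None,
    Bearer { token: String },
    Basic { username: String, password: String },
    Custom { headers: HashMap<String, String> },
}

const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard padded base64, the encoding HTTP Basic auth uses (RFC 7617).
fn base64(input: &[u8]) -> String {
    let mut out = String::new();
    for chunk in input.chunks(3) {
        // Pack up to 3 bytes into a 24-bit group, zero-filling missing bytes.
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (chunk[0] as u32) << 16 | (b1 as u32) << 8 | b2 as u32;
        // One 6-bit symbol per input byte plus one; the rest is '=' padding.
        for i in 0..4usize {
            if i <= chunk.len() {
                out.push(B64[(n >> (18 - 6 * i)) as usize & 63] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

impl Auth {
    /// Headers this variant would attach to every request.
    pub fn headers(&self) -> Vec<(String, String)> {
        match self {
            Auth::None => Vec::new(),
            Auth::Bearer { token } => vec![("Authorization".into(), format!("Bearer {token}"))],
            Auth::Basic { username, password } => {
                let creds = format!("{username}:{password}");
                vec![("Authorization".into(), format!("Basic {}", base64(creds.as_bytes())))]
            }
            Auth::Custom { headers } => {
                headers.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
            }
        }
    }
}

impl fmt::Debug for Auth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => f.write_str("None"),
            Auth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
            Auth::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
            // Sketch choice: print header names only, never their values.
            Auth::Custom { headers } => f
                .debug_struct("Custom")
                .field("headers", &headers.keys().collect::<Vec<_>>())
                .finish(),
        }
    }
}
```

Writing `Debug` by hand instead of deriving it is what keeps the secret fields out of `{:?}` output and therefore out of logs. The table above does not say how the real implementation treats `Custom` header values, so the names-only choice here is just one option.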
Batch Modes (HttpBatchMode)
| Mode | Description |
|---|---|
| `Individual` | Send one HTTP request per record. Requests are executed concurrently, up to the `concurrency` limit, using a semaphore. |
| `Array` | Send records as a JSON array in the request body, with up to `batch_size` records per request (`batch_size = 0` sends all records of the page in one request). |
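To make the two framings concrete, here is a minimal sketch that builds request bodies from records already serialized to JSON strings. `BatchMode` and `request_bodies` are hypothetical names. The sketch also ignores the `batch_size` re-chunking that Array mode applies on top of this.

```rust
/// Hypothetical mirror of `HttpBatchMode`.
#[derive(Clone, Copy, Debug)]
pub enum BatchMode {
    Individual,
    Array,
}

/// Request bodies for already-serialized JSON records (no `batch_size` chunking).
pub fn request_bodies(records: &[&str], mode: BatchMode) -> Vec<String> {
    match mode {
        // One request per record: each body is a single JSON value.
        BatchMode::Individual => records.iter().map(|r| r.to_string()).collect(),
        // One request for the batch: the values joined into a JSON array.
        BatchMode::Array => vec![format!("[{}]", records.join(","))],
    }
}
```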
Streaming and batching
The HTTP sink honours the workspace-wide batch_size contract. The exact effect on the wire depends on the configured batch_mode:
- Individual mode: one HTTP request per record, executed concurrently up to `concurrency` via a semaphore. `batch_size` has no effect on wire framing in this mode (each record is already its own request); the field is accepted only for config-shape parity with other sinks and is validated via `faucet_core::validate_batch_size` at load time. Use `concurrency` to tune throughput.
- Array mode: the sink re-chunks the upstream `StreamPage` into `batch_size`-row slices and issues one POST request per chunk, each request body being a JSON array of up to `batch_size` records. With the default `batch_size = 1000`, a 2,500-record `write_batch` produces 3 POSTs (1000 + 1000 + 500). When `batch_size = 0` (the "no batching" sentinel), the entire records slice is forwarded as a single JSON array. This is useful when the upstream source already chunks the stream to a size the destination endpoint accepts (e.g. a Postgres source with its own `batch_size`).
Recommended value for HTTP POST endpoints that accept arrays: match the destination's documented batch limit (commonly 100–1000 records per request). For per-record send semantics, prefer batch_mode: Individual over batch_size: 1 — the former is the more direct expression of intent and the only one that drives concurrent in-flight requests.
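The Array-mode chunking arithmetic can be checked with a small sketch over plain slices. `array_chunks` is a hypothetical helper, not part of the crate, and it assumes that an empty page produces no request at all. The explicit `0` branch matters in Rust as well, because `slice::chunks(0)` panics.

```rust
/// Split a page of records the way Array mode is documented to: `batch_size`-row
/// chunks, one POST per chunk, with `0` meaning "send the whole slice as one array".
pub fn array_chunks<T>(records: &[T], batch_size: usize) -> Vec<&[T]> {
    if records.is_empty() {
        return Vec::new(); // assumption: nothing to send, so no request
    }
    if batch_size == 0 {
        // "No batching" sentinel; also avoids the panic in `chunks(0)`.
        return vec![records];
    }
    records.chunks(batch_size).collect()
}
```

With 2,500 records and the default `batch_size` of 1000, this yields three chunks of 1000, 1000 and 500 records, one POST each.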
Retry Behavior
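When `max_retries > 0`, the sink retries requests that fail with retriable errors (network errors, 5xx status codes, etc.), so each request is attempted at most `max_retries + 1` times. Retries are immediate (no backoff). Non-retriable errors such as 4xx responses are returned without retrying. Once all retries are exhausted, the last error is returned.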
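A minimal synchronous sketch of this policy follows. `SendError`, `is_retriable` and `send_with_retry` are hypothetical names. The classification (network errors and 5xx retriable, everything else not) is a simplification that stands in for the "etc." above.

```rust
/// Hypothetical error shape for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum SendError {
    Network(String),
    Status(u16),
}

impl SendError {
    /// Assumed classification: network errors and 5xx are transient; 4xx and others are not.
    pub fn is_retriable(&self) -> bool {
        match self {
            SendError::Network(_) => true,
            SendError::Status(code) => (500..=599).contains(code),
        }
    }
}

/// Immediate retries with no backoff: at most `max_retries + 1` attempts in total.
pub fn send_with_retry<F>(max_retries: usize, mut attempt: F) -> Result<(), SendError>
where
    F: FnMut() -> Result<(), SendError>,
{
    let mut retries_left = max_retries;
    loop {
        match attempt() {
            Ok(()) => return Ok(()),
            // Transient failure with budget left: try again right away.
            Err(e) if e.is_retriable() && retries_left > 0 => retries_left -= 1,
            // Non-retriable, or retries exhausted: surface this (the last) error.
            Err(e) => return Err(e),
        }
    }
}
```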
Builder Methods
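```rust
use faucet_sink_http::{HttpBatchMode, HttpSinkAuth, HttpSinkConfig};

// Argument values are illustrative.
let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .method(reqwest::Method::PUT)
    .auth(HttpSinkAuth::Bearer { token: "my-api-token".into() })
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3)
    .concurrency(20)
    .with_batch_size(500);
```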
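Each setter consumes the config and returns it, which is what lets the calls chain. The sketch below reproduces that consuming-builder shape on a hypothetical, trimmed-down `SinkConfig`. It uses a plain `String` instead of `reqwest::Method`, leaves out auth and batch mode, and copies its defaults from the Configuration table.

```rust
/// Hypothetical config used only to illustrate the builder shape and defaults.
#[derive(Debug, Clone, PartialEq)]
pub struct SinkConfig {
    pub url: String,
    pub method: String,
    pub max_retries: usize,
    pub concurrency: usize,
    pub batch_size: usize,
}

impl SinkConfig {
    /// Only the URL is required; everything else starts at the documented default.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            method: "POST".into(),
            max_retries: 0,
            concurrency: 10,
            batch_size: 1000,
        }
    }

    // Each setter takes `self` by value and hands it back, so calls chain.
    pub fn method(mut self, method: &str) -> Self {
        self.method = method.to_string();
        self
    }
    pub fn max_retries(mut self, n: usize) -> Self {
        self.max_retries = n;
        self
    }
    pub fn concurrency(mut self, n: usize) -> Self {
        self.concurrency = n;
        self
    }
    pub fn with_batch_size(mut self, n: usize) -> Self {
        self.batch_size = n;
        self
    }
}
```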
Config Loading
use ;
use HttpSinkConfig;
// From a JSON file
let config: HttpSinkConfig = load_json?;
// From an .env file with a prefix
let config: HttpSinkConfig = load_env_file?;
Note: The headers field on HttpSinkConfig is HeaderMap and remains #[serde(skip)] — set it programmatically. The Custom auth variant uses a HashMap<String, String> and round-trips through JSON/YAML.
Example JSON config (Individual mode with Bearer auth)
Example JSON config (Array mode with Basic auth)
Example .env file
```env
HTTP_SINK_URL=https://api.example.com/ingest
HTTP_SINK_METHOD=POST
HTTP_SINK_AUTH='{"type":"bearer","config":{"token":"my-api-token"}}'
HTTP_SINK_BATCH_MODE='{"type":"Individual"}'
HTTP_SINK_MAX_RETRIES=3
HTTP_SINK_CONCURRENCY=10
```
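Judging from this example, each `HTTP_SINK_`-prefixed key corresponds to a config field (`HTTP_SINK_MAX_RETRIES` to `max_retries`). The sketch below covers only that key-mapping step. It assumes one field per key and strips one pair of surrounding quotes. `fields_with_prefix` is a hypothetical helper. The real `load_env_file` presumably goes on to deserialize the values, such as the JSON in `HTTP_SINK_AUTH`, into typed fields.

```rust
use std::collections::HashMap;

/// Collect `PREFIX_FIELD=value` lines into `field -> value`, lowercasing the field
/// name and stripping one pair of surrounding single or double quotes.
pub fn fields_with_prefix(env_text: &str, prefix: &str) -> HashMap<String, String> {
    let mut out = HashMap::new();
    for line in env_text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue; // skip blanks and comments
        }
        let Some((key, value)) = line.split_once('=') else { continue };
        let Some(field) = key.trim().strip_prefix(prefix) else { continue };
        let value = value.trim();
        let unquoted = value
            .strip_prefix('\'')
            .and_then(|v| v.strip_suffix('\''))
            .or_else(|| value.strip_prefix('"').and_then(|v| v.strip_suffix('"')))
            .unwrap_or(value);
        out.insert(field.to_lowercase(), unquoted.to_string());
    }
    out
}
```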
Config Schema Introspection
```rust
use faucet_core::Sink;

let sink = HttpSink::new(config);
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);
```
Pipeline Usage
use Pipeline;
use ;
use ;
let source = new;
let sink = new;
let result = new.run.await?;
println!;
Examples
Individual mode -- one request per record with concurrency
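```rust
let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .auth(HttpSinkAuth::Bearer { token: "my-api-token".into() })
    .batch_mode(HttpBatchMode::Individual)
    .concurrency(20)
    .max_retries(3);
let sink = HttpSink::new(config);
let records = vec![
    json!({"id": 1, "event": "signup"}),
    json!({"id": 2, "event": "login"}),
    json!({"id": 3, "event": "logout"}),
];
// All 3 records are sent concurrently (up to 20 at a time)
sink.write_batch(&records).await?;
```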
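The "up to 20 at a time" bound comes from the semaphore that Individual mode uses. The sink itself is async. The std-only sketch below models the same bounding with threads and a hand-rolled counting semaphore, since std has none, and it records the peak number of sends in flight. `Semaphore` and `send_all_bounded` are hypothetical, and the 5 ms sleep stands in for an HTTP round trip.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from a Mutex and a Condvar.
struct Semaphore {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), freed: Condvar::new() }
    }

    /// Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.freed.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

/// "Send" `records` items, one thread each, with at most `limit` in flight.
/// Returns the peak number of concurrent sends that was observed.
pub fn send_all_bounded(records: usize, limit: usize) -> usize {
    assert!(limit > 0, "this sketch needs at least one permit");
    let sem = Arc::new(Semaphore::new(limit));
    let stats = Arc::new(Mutex::new((0usize, 0usize))); // (in flight now, peak)
    let handles: Vec<_> = (0..records)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                sem.acquire();
                {
                    let mut s = stats.lock().unwrap();
                    s.0 += 1;
                    s.1 = s.1.max(s.0);
                }
                thread::sleep(Duration::from_millis(5)); // stand-in for the request
                {
                    let mut s = stats.lock().unwrap();
                    s.0 -= 1;
                }
                sem.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let peak = stats.lock().unwrap().1;
    peak
}
```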
Array mode -- bulk send as JSON array
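```rust
let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3);
let sink = HttpSink::new(config);
let records = vec![
    json!({"metric": "cpu_usage", "value": 0.85}),
    json!({"metric": "memory_usage", "value": 0.72}),
];
// Sends: POST with body [{"metric":"cpu_usage","value":0.85},{"metric":"memory_usage","value":0.72}]
sink.write_batch(&records).await?;
```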
Custom HTTP method and headers
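```rust
use reqwest::header::{HeaderMap, HeaderValue};

let mut headers = HeaderMap::new();
headers.insert("x-request-source", HeaderValue::from_static("faucet"));

let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .method(reqwest::Method::PUT)
    .headers(headers)
    .auth(HttpSinkAuth::Bearer { token: "my-api-token".into() });
let sink = HttpSink::new(config);
```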
How It Works
- The HTTP client is created in `HttpSink::new()` and reused across all requests.
- In Individual mode, each record is sent as a separate HTTP request, and requests are executed concurrently (bounded by `concurrency`). In `write_batch` the first error aborts the batch. When a DLQ is configured, the sink instead reports per-row outcomes (`write_batch_partial`): it attempts every record and dead-letters only the ones that actually failed, so already-delivered rows are never duplicated against a non-idempotent endpoint.
- In Array mode, records are collected into a JSON array (up to `batch_size` per request) and sent as a single request body. A failure can't be attributed to specific rows, so the whole array surfaces as an error; the DLQ `on_batch_error` policy decides whether to abort or dead-letter the batch.
- Retry logic: on transient failures (network errors, retriable HTTP status codes), the request is retried up to `max_retries` times. Non-retriable errors (4xx client errors) are returned immediately.
- Authentication and custom headers are applied to every request.
- HTTP responses are validated using `check_http_response()` from `faucet-core`, which checks status codes and returns structured errors.
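The difference between the abort-on-first-error path and the per-row path used with a DLQ can be sketched sequentially. `write_all_or_abort`, `write_all_partial` and `RowOutcome` are hypothetical names. The real Individual mode sends concurrently, and its partial-write API may differ from this shape.

```rust
/// Per-record result of a partial write (hypothetical shape).
#[derive(Debug, PartialEq)]
pub enum RowOutcome {
    Delivered,
    Failed(String),
}

/// `write_batch`-style: stop at the first failing record and return its error.
pub fn write_all_or_abort<F>(records: &[&str], mut send: F) -> Result<usize, String>
where
    F: FnMut(&str) -> Result<(), String>,
{
    for (i, &record) in records.iter().enumerate() {
        send(record).map_err(|e| format!("record {i}: {e}"))?;
    }
    Ok(records.len())
}

/// DLQ-style partial write: attempt every record once and report each outcome,
/// so only the failed rows are dead-lettered and delivered rows are never re-sent.
pub fn write_all_partial<F>(records: &[&str], mut send: F) -> Vec<RowOutcome>
where
    F: FnMut(&str) -> Result<(), String>,
{
    records
        .iter()
        .map(|&record| match send(record) {
            Ok(()) => RowOutcome::Delivered,
            Err(e) => RowOutcome::Failed(e),
        })
        .collect()
}
```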
License
Licensed under MIT or Apache-2.0.