faucet-sink-http
HTTP POST sink connector for the faucet-stream ecosystem.
Sends JSON records to an HTTP endpoint. Supports two batch modes: Individual (one request per record with concurrent execution) and Array (all records in a single request as a JSON array). Includes configurable authentication, retry logic for transient failures, and concurrency control via semaphores.
Installation
[dependencies]
faucet-sink-http = "0.1"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
= { = "0.2", = ["sink-http"] }
Quick Start
use ;
use Sink;
use json;
async
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| `url` | `String` | (required) | Target endpoint URL |
| `method` | `reqwest::Method` | `POST` | HTTP method to use |
| `headers` | `HeaderMap` | empty | Additional request headers (not serializable; set via builder only) |
| `auth` | `HttpSinkAuth` | `None` | Authentication method (see below) |
| `batch_mode` | `HttpBatchMode` | `Individual` | How to batch records in requests (see below) |
| `max_retries` | `usize` | `0` | Number of retries on transient failures |
| `concurrency` | `usize` | `10` | Maximum number of concurrent requests in Individual mode |
Authentication (HttpSinkAuth)
| Variant | Fields | Description |
|---|---|---|
| `None` | -- | No authentication |
| `Bearer` | `String` | Bearer token in the `Authorization` header |
| `Basic` | `username: String`, `password: String` | HTTP Basic authentication |
| `Custom` | `HeaderMap` | Custom headers for authentication (e.g. API keys). Not serializable; set via builder only. |
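As a rough illustration of what the `Bearer` and `Basic` variants put on the wire, the sketch below builds the `Authorization` header value by hand using only the standard library. `AuthHeaderSketch`, `authorization_header`, and `base64_encode` are hypothetical names invented for this example; the crate itself most likely delegates this to its HTTP client, and the `Custom` variant is left out because `HeaderMap` is not a standard-library type.

```rust
/// Hypothetical stand-in for `HttpSinkAuth` (the `Custom` variant is omitted).
pub enum AuthHeaderSketch {
    None,
    Bearer(String),
    Basic { username: String, password: String },
}

const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard base64 with `=` padding, written out so the sketch needs no crates.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        out.push(B64[(n >> 18) as usize & 63] as char);
        out.push(B64[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { B64[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { B64[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Returns the `Authorization` header value, or `None` when no auth is configured.
pub fn authorization_header(auth: &AuthHeaderSketch) -> Option<String> {
    match auth {
        AuthHeaderSketch::None => None,
        AuthHeaderSketch::Bearer(token) => Some(format!("Bearer {token}")),
        AuthHeaderSketch::Basic { username, password } => {
            // HTTP Basic: base64 of "username:password".
            let credentials = format!("{username}:{password}");
            Some(format!("Basic {}", base64_encode(credentials.as_bytes())))
        }
    }
}
```

For instance, a `Bearer` value of `my-api-token` yields the header value `Bearer my-api-token`.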
The Debug implementation masks tokens and passwords with *** to prevent credential leakage in logs.
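One common way to get this kind of masking is a hand-written `Debug` implementation instead of `#[derive(Debug)]`. The following is a minimal sketch of that pattern, not the crate's actual code: `MaskedAuthSketch` is a hypothetical enum mirroring the variants above (without `Custom`), and the exact output format of the real type may differ.

```rust
use std::fmt;

/// Hypothetical stand-in for `HttpSinkAuth` (the `Custom` variant is omitted).
pub enum MaskedAuthSketch {
    None,
    Bearer(String),
    Basic { username: String, password: String },
}

impl fmt::Debug for MaskedAuthSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MaskedAuthSketch::None => f.write_str("None"),
            // The token is never read; a fixed placeholder is printed instead.
            MaskedAuthSketch::Bearer(_) => f.debug_tuple("Bearer").field(&"***").finish(),
            // The username is kept for diagnostics, the password is replaced.
            MaskedAuthSketch::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
        }
    }
}
```

With this in place, logging a value with `{:?}` shows the variant and username but never the secret itself.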
Batch Modes (HttpBatchMode)
| Mode | Description |
|---|---|
| `Individual` | Send one HTTP request per record. Requests are executed concurrently up to the `concurrency` limit using a semaphore. |
| `Array` | Send all records as a JSON array in a single HTTP request. |
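The difference between the two modes comes down to how many request bodies a batch turns into. The sketch below shows that mapping with records represented as already-serialized JSON strings. `BatchModeSketch` and `request_bodies` are hypothetical names for illustration, and how the real sink treats an empty batch in Array mode is not stated here (this sketch would produce `[]`).

```rust
/// Hypothetical stand-in for `HttpBatchMode`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchModeSketch {
    Individual,
    Array,
}

/// Returns the request bodies a batch would produce, where each record
/// is an already-serialized JSON value.
pub fn request_bodies(mode: BatchModeSketch, records: &[String]) -> Vec<String> {
    match mode {
        // One body, and therefore one request, per record.
        BatchModeSketch::Individual => records.to_vec(),
        // A single body holding every record as an element of one JSON array.
        BatchModeSketch::Array => vec![format!("[{}]", records.join(","))],
    }
}
```

Two records therefore result in two requests in Individual mode and one request in Array mode.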
Retry Behavior
When max_retries > 0, the sink retries requests that fail with retriable errors (network errors, 5xx status codes, etc.). Each retry is immediate (no backoff). After exhausting all retries, the last error is returned.
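The retry policy described above can be sketched as a small loop: one initial attempt, then up to `max_retries` immediate re-attempts for retriable errors only. Everything named here (`SendErrorSketch`, `is_retriable`, `retry_immediately`) is hypothetical, and the classification is an assumption that covers only network errors and 5xx responses; the crate's real rules may include other cases.

```rust
/// Hypothetical error type: a transport failure or an HTTP status code.
#[derive(Debug, PartialEq)]
pub enum SendErrorSketch {
    Network,
    Status(u16),
}

/// Assumed classification: network errors and 5xx responses are retriable.
pub fn is_retriable(err: &SendErrorSketch) -> bool {
    match err {
        SendErrorSketch::Network => true,
        SendErrorSketch::Status(code) => (500..=599).contains(code),
    }
}

/// Calls `op` once, then retries immediately (no backoff) up to `max_retries`
/// more times while the error is retriable. Returns the last error otherwise.
pub fn retry_immediately<T, E>(
    max_retries: usize,
    is_retriable: impl Fn(&E) -> bool,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut retries_used = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if retries_used < max_retries && is_retriable(&err) => retries_used += 1,
            Err(err) => return Err(err),
        }
    }
}
```

Under this reading, `max_retries = 3` means a request is attempted at most four times, while a 4xx response is returned after the first attempt.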
Builder Methods
use ;
let config = new
.method
.auth
.batch_mode
.max_retries
.concurrency;
Config Loading
use ;
use HttpSinkConfig;
// From a JSON file
let config: HttpSinkConfig = load_json?;
// From an .env file with a prefix
let config: HttpSinkConfig = load_env_file?;
Note: The headers field and Custom auth variant use HeaderMap which is not serializable. These must be set programmatically via builder methods.
Example JSON config (Individual mode with Bearer auth)
Example JSON config (Array mode with Basic auth)
Example .env file
HTTP_SINK_URL=https://api.example.com/ingest
HTTP_SINK_METHOD=POST
HTTP_SINK_AUTH='{"type":"Bearer","0":"my-api-token"}'
HTTP_SINK_BATCH_MODE='{"type":"Individual"}'
HTTP_SINK_MAX_RETRIES=3
HTTP_SINK_CONCURRENCY=10
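To make the naming convention in the file above concrete, the sketch below maps one `.env` line to a config field name and raw value. It assumes the prefix is joined to the upper-cased field name with `_` and that one pair of surrounding quotes is stripped, as the example suggests. `parse_env_line` is a hypothetical helper and does not describe how `load_env_file` is actually implemented.

```rust
/// Splits one `.env` line into a lowercase field name and its raw value,
/// keeping only keys of the form `<prefix>_<FIELD>`.
pub fn parse_env_line(line: &str, prefix: &str) -> Option<(String, String)> {
    let line = line.trim();
    // Skip blank lines and comments.
    if line.is_empty() || line.starts_with('#') {
        return None;
    }
    let (key, value) = line.split_once('=')?;
    let field = key.trim().strip_prefix(prefix)?.strip_prefix('_')?;
    let value = value.trim();
    // Drop one pair of matching surrounding quotes, as used around the JSON values.
    let unquoted = value
        .strip_prefix('\'')
        .and_then(|v| v.strip_suffix('\''))
        .or_else(|| value.strip_prefix('"').and_then(|v| v.strip_suffix('"')))
        .unwrap_or(value);
    Some((field.to_ascii_lowercase(), unquoted.to_string()))
}
```

With the prefix `HTTP_SINK`, the line `HTTP_SINK_MAX_RETRIES=3` maps to the field `max_retries` with the value `3`.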
Config Schema Introspection
use Sink;
let sink = new;
let schema = sink.config_schema;
println!;
Pipeline Usage
use Pipeline;
use ;
use ;
let source = new;
let sink = new;
let result = new.run.await?;
println!;
Examples
Individual mode -- one request per record with concurrency
let config = new
.auth
.batch_mode
.concurrency
.max_retries;
let sink = new;
let records = vec!;
// All 3 records are sent concurrently (up to 20 at a time)
sink.write_batch.await?;
Array mode -- bulk send as JSON array
let config = new
.batch_mode
.max_retries;
let sink = new;
let records = vec!;
// Sends: POST with body [{"metric":"cpu_usage","value":0.85},{"metric":"memory_usage","value":0.72}]
sink.write_batch.await?;
Custom HTTP method and headers
use ;
let mut headers = new;
headers.insert;
let config = new
.method
.headers
.auth;
let sink = new;
How It Works
- The HTTP client is created in `HttpSink::new()` and reused across all requests.
- In Individual mode, each record is sent as a separate HTTP request. Requests are executed concurrently using a `tokio::sync::Semaphore` to limit the number of in-flight requests to the `concurrency` value. All requests must succeed; the first error aborts the batch via `try_join_all`.
- In Array mode, all records are collected into a JSON array and sent as a single request body.
- Retry logic: on transient failures (network errors, retriable HTTP status codes), the request is retried up to `max_retries` times. Non-retriable errors (4xx client errors) are returned immediately.
- Authentication and custom headers are applied to every request.
- HTTP responses are validated using `check_http_response()` from `faucet-core`, which checks status codes and returns structured errors.
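The semaphore-based limit used in Individual mode can be illustrated without an async runtime. The sketch below is a standard-library analogue, not the crate's code: it uses scoped threads and a small hand-rolled counting semaphore in place of `tokio::sync::Semaphore`, and it waits for every send to finish before reporting the first error, whereas `try_join_all` stops at the first failure. `Semaphore`, `send_all`, and the `send` callback are hypothetical names; `concurrency` must be at least 1 or the sketch would block forever.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore (std-only stand-in for `tokio::sync::Semaphore`).
pub struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Sends every record on its own thread with at most `concurrency` sends in
/// flight. Returns the observed peak and the first error in record order.
pub fn send_all<F>(records: &[String], concurrency: usize, send: F) -> (usize, Result<(), String>)
where
    F: Fn(&str) -> Result<(), String> + Sync,
{
    let sem = Semaphore::new(concurrency);
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    let results: Vec<Result<(), String>> = thread::scope(|scope| {
        let handles: Vec<_> = records
            .iter()
            .map(|record| {
                let (sem, in_flight, peak, send) = (&sem, &in_flight, &peak, &send);
                scope.spawn(move || {
                    sem.acquire();
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    let outcome = send(record.as_str());
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    sem.release();
                    outcome
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    let first_error = results.into_iter().collect::<Result<Vec<()>, String>>().map(|_| ());
    (peak.load(Ordering::SeqCst), first_error)
}
```

The returned peak makes the limit observable: with `concurrency` set to 3, it never exceeds 3 regardless of how many records are in the batch.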
License
Licensed under MIT or Apache-2.0.