faucet-sink-http 1.0.1

HTTP POST sink connector for the faucet-stream ecosystem.

Sends JSON records to an HTTP endpoint. Supports two batch modes: Individual (one request per record, sent concurrently) and Array (records sent as a JSON array, up to batch_size records per request). Includes configurable authentication, retry logic for transient failures, and semaphore-based concurrency control.

Installation

[dependencies]
faucet-sink-http = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

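faucet-stream = { version = "1.0", features = ["sink-http"] }

The examples below also use serde_json (for the json! macro), the Sink trait from faucet-core, and reqwest types such as Method and HeaderMap, so add those crates to [dependencies] if your project does not already depend on them.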

Quick Start

use faucet_sink_http::{HttpSink, HttpSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = HttpSinkConfig::new("https://api.example.com/ingest")
        .max_retries(3)
        .concurrency(10);

    let sink = HttpSink::new(config);

    let records = vec![
        json!({"user_id": "u123", "event": "signup"}),
        json!({"user_id": "u456", "event": "login"}),
    ];

    let written = sink.write_batch(&records).await?;
    println!("Sent {written} records");

    Ok(())
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| url | String | (required) | Target endpoint URL |
| method | reqwest::Method | POST | HTTP method to use |
| headers | HeaderMap | empty | Additional request headers (not serializable; set via the builder only) |
| auth | HttpSinkAuth | None | Authentication method (see below) |
| batch_mode | HttpBatchMode | Individual | How records are grouped into requests (see below) |
| max_retries | usize | 0 | Number of retries on transient failures |
| concurrency | usize | 10 | Maximum number of concurrent requests in Individual mode |
| batch_size | usize | 1000 (faucet_core::DEFAULT_BATCH_SIZE) | Maximum records per outbound HTTP request. Re-chunks the upstream StreamPage in Array mode; no-op in Individual mode. 0 is the "no batching" sentinel (one POST per StreamPage). |

Authentication (HttpSinkAuth)

| Variant | Fields | Description |
|---|---|---|
| None | (none) | No authentication |
| Bearer | { token: String } | Bearer token sent in the Authorization header |
| Basic | { username: String, password: String } | HTTP Basic authentication |
| Custom | { headers: HashMap<String, String> } | Header name → value map attached to every request |

The Debug implementation masks tokens and passwords with *** to prevent credential leakage in logs.
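To illustrate both the masking and what ends up on the wire, here is a minimal sketch built on a hypothetical Auth enum that mirrors only the None, Bearer and Basic variants of HttpSinkAuth; it is not the crate's code. The header formats follow RFC 6750 (Bearer) and RFC 7617 (Basic: base64 of "username:password"); the hand-written base64 encoder exists only to keep the sketch dependency-free.

```rust
use std::fmt;

/// Hypothetical stand-in for HttpSinkAuth (Custom variant omitted).
enum Auth {
    None,
    Bearer { token: String },
    Basic { username: String, password: String },
}

impl fmt::Debug for Auth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => write!(f, "None"),
            // Secrets are replaced with *** so they never reach log output.
            Auth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
            Auth::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
        }
    }
}

impl Auth {
    /// Value for the Authorization header, or None when no auth is configured.
    fn authorization_header(&self) -> Option<String> {
        match self {
            Auth::None => None,
            Auth::Bearer { token } => Some(format!("Bearer {token}")),
            Auth::Basic { username, password } => {
                let credentials = format!("{username}:{password}");
                Some(format!("Basic {}", base64_encode(credentials.as_bytes())))
            }
        }
    }
}

/// Minimal standard-alphabet base64 encoder with '=' padding.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group (missing bytes are zero).
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let triple = (b0 << 16) | (b1 << 8) | b2;
        // Emit four 6-bit symbols, padding with '=' for absent input bytes.
        out.push(ALPHABET[(triple >> 18) as usize & 63] as char);
        out.push(ALPHABET[(triple >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { ALPHABET[(triple >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[triple as usize & 63] as char } else { '=' });
    }
    out
}
```

The Debug output reveals which variant is configured (and the username) without exposing the secret itself.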

Batch Modes (HttpBatchMode)

| Mode | Description |
|---|---|
| Individual | Send one HTTP request per record. Requests run concurrently, up to the concurrency limit, using a semaphore. |
| Array | Send records as a JSON array: one request per batch_size chunk, or a single request for all records when batch_size = 0. |
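The README does not say which semaphore type the crate uses internally. As a dependency-free illustration of the idea behind Individual mode, the sketch below uses std threads and a hand-rolled counting semaphore (Mutex plus Condvar) to cap in-flight "requests"; the function name send_individually is hypothetical and the HTTP call is replaced by a short sleep.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Counting semaphore built from a Mutex and a Condvar (std has no semaphore type).
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

/// Sends each record as its own simulated request with at most `concurrency`
/// (assumed >= 1) in flight. Returns the peak number of simultaneous requests.
fn send_individually(records: Vec<String>, concurrency: usize) -> usize {
    let sem = Arc::new(Semaphore::new(concurrency));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = records
        .into_iter()
        .map(|record| {
            let (sem, in_flight, peak) = (Arc::clone(&sem), Arc::clone(&in_flight), Arc::clone(&peak));
            thread::spawn(move || {
                sem.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Placeholder for the real HTTP request carrying `record`.
                let _body = record;
                thread::sleep(Duration::from_millis(10));
                in_flight.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Every record gets its own task, but only `concurrency` of them can hold a permit at once, which is why raising concurrency (not batch_size) is the throughput knob in this mode.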

Streaming and batching

The HTTP sink honours the workspace-wide batch_size contract. The exact effect on the wire depends on the configured batch_mode:

  • Individual mode: one HTTP request per record, executed concurrently up to concurrency via a semaphore. batch_size has no effect on wire framing in this mode (each record is already its own request); the field is accepted only for config-shape parity with other sinks and validated via faucet_core::validate_batch_size at load time. Use concurrency to tune throughput.
  • Array mode: the sink re-chunks the upstream StreamPage into batch_size-row slices and issues one POST request per chunk, with each request body a JSON array of up to batch_size records. With the default batch_size = 1000, a 2,500-record write_batch produces 3 POSTs (1000 + 1000 + 500). When batch_size = 0 (the "no batching" sentinel), the entire records slice is forwarded as a single JSON array, which is useful when the upstream source already chunks the stream to a size the destination endpoint accepts (e.g. a Postgres source with its own batch_size).
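The Array-mode chunking rule, including the 0 sentinel, can be sketched with slice::chunks. This is an illustration only: array_request_bodies is a hypothetical helper, records are assumed to be already-serialized JSON strings (the real sink works on serde_json values), and returning no bodies for an empty input is an assumption of the sketch.

```rust
/// Splits pre-serialized JSON records into Array-mode request bodies,
/// one JSON array per chunk of at most `batch_size` records.
fn array_request_bodies(records: &[&str], batch_size: usize) -> Vec<String> {
    // 0 is the "no batching" sentinel: the whole slice becomes one body.
    // max(1) keeps chunks() happy; an empty slice still yields no chunks.
    let chunk_len = if batch_size == 0 { records.len().max(1) } else { batch_size };
    records
        .chunks(chunk_len)
        .map(|chunk| format!("[{}]", chunk.join(",")))
        .collect()
}
```

With 2,500 records and batch_size = 1000 this yields three bodies of 1000, 1000 and 500 records, matching the example above; with batch_size = 0 it yields a single body.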

Recommended value for HTTP POST endpoints that accept arrays: match the destination's documented batch limit (commonly 100–1000 records per request). For per-record send semantics, prefer batch_mode: Individual over batch_size: 1 — the former is the more direct expression of intent and the only one that drives concurrent in-flight requests.

Retry Behavior

When max_retries > 0, the sink retries requests that fail with a retriable error, such as a network error or a 5xx status code. Retries are immediate (there is no backoff), so max_retries = 3 allows up to four attempts in total. After all retries are exhausted, the last error is returned.
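A minimal sketch of the immediate-retry loop, assuming a simplified error classification (network errors and 5xx statuses are retriable, everything else is not); the AttemptError type and send_with_retries function are hypothetical, and the crate's real classification may cover more cases.

```rust
/// Simplified outcome of one failed HTTP attempt.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AttemptError {
    /// Connection reset, timeout, DNS failure, ...
    Network,
    /// Non-success HTTP status code.
    Status(u16),
}

/// Network errors and 5xx responses are treated as transient.
fn is_retriable(err: AttemptError) -> bool {
    match err {
        AttemptError::Network => true,
        AttemptError::Status(code) => (500..600).contains(&code),
    }
}

/// Calls `attempt` once, then retries immediately (no backoff) up to
/// `max_retries` more times while the error is retriable.
/// Returns the final result and the number of attempts made.
fn send_with_retries<F>(max_retries: usize, mut attempt: F) -> (Result<(), AttemptError>, usize)
where
    F: FnMut() -> Result<(), AttemptError>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match attempt() {
            Ok(()) => return (Ok(()), attempts),
            // Still within budget: try again right away.
            Err(e) if is_retriable(e) && attempts <= max_retries => continue,
            // Non-retriable, or retries exhausted: surface the last error.
            Err(e) => return (Err(e), attempts),
        }
    }
}
```

Because there is no backoff, all retries hit the endpoint back to back; an endpoint that is briefly overloaded may still be failing when the budget runs out.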

Builder Methods

use faucet_sink_http::{HttpBatchMode, HttpSinkAuth, HttpSinkConfig};

let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .method(reqwest::Method::PUT)
    .auth(HttpSinkAuth::Bearer { token: "my-token".into() })
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3)
    // concurrency only bounds in-flight requests in Individual mode
    .concurrency(20)
    // In Array mode, each request body carries at most 500 records
    .with_batch_size(500);

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_sink_http::HttpSinkConfig;

// From a JSON file
let config: HttpSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: HttpSinkConfig = load_env_file(".env", "HTTP_SINK")?;

Note: The headers field on HttpSinkConfig is a HeaderMap marked #[serde(skip)], so it cannot be loaded from JSON or .env files; set it programmatically with the headers builder method. The Custom auth variant, by contrast, uses a HashMap<String, String> and round-trips through JSON/YAML.

Example JSON config (Individual mode with Bearer auth)

{
  "url": "https://api.example.com/ingest",
  "method": "POST",
  "auth": {
    "type": "bearer",
    "config": {
      "token": "my-api-token"
    }
  },
  "batch_mode": {
    "type": "Individual"
  },
  "max_retries": 3,
  "concurrency": 10
}

Example JSON config (Array mode with Basic auth)

{
  "url": "https://api.example.com/bulk",
  "method": "POST",
  "auth": {
    "type": "basic",
    "config": {
      "username": "ingest-user",
      "password": "s3cret"
    }
  },
  "batch_mode": {
    "type": "Array"
  },
  "max_retries": 2,
  "concurrency": 1,
  "batch_size": 500
}

Example .env file

HTTP_SINK_URL=https://api.example.com/ingest
HTTP_SINK_METHOD=POST
HTTP_SINK_AUTH='{"type":"bearer","config":{"token":"my-api-token"}}'
HTTP_SINK_BATCH_MODE='{"type":"Individual"}'
HTTP_SINK_MAX_RETRIES=3
HTTP_SINK_CONCURRENCY=10
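The example above suggests a naming convention: each field becomes PREFIX_ plus the upper-cased field name, and structured values (auth, batch_mode) are JSON strings wrapped in single quotes. The sketch below illustrates that convention only; fields_for_prefix is a hypothetical helper, not faucet_core::config::load_env_file, and how the real loader handles quoting and type conversion is an assumption.

```rust
use std::collections::HashMap;

/// Parses .env-style text and keeps the entries whose keys start with
/// `PREFIX_`, keyed by the lower-cased remainder
/// (e.g. HTTP_SINK_MAX_RETRIES -> max_retries). Values stay raw strings.
fn fields_for_prefix(env_text: &str, prefix: &str) -> HashMap<String, String> {
    let wanted = format!("{prefix}_");
    env_text
        .lines()
        .map(str::trim)
        // Skip blank lines and comments.
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        // Split at the first '=' only, so JSON values may contain '='.
        .filter_map(|line| line.split_once('='))
        .filter_map(|(key, value)| {
            let field = key.trim().strip_prefix(wanted.as_str())?;
            // Strip one pair of surrounding single quotes, as used for JSON values.
            let value = value.trim();
            let value = value
                .strip_prefix('\'')
                .and_then(|v| v.strip_suffix('\''))
                .unwrap_or(value);
            Some((field.to_lowercase(), value.to_string()))
        })
        .collect()
}
```

Keys without the prefix are ignored, which lets several connectors share one .env file under different prefixes.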

Config Schema Introspection

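use faucet_core::Sink;
use faucet_sink_http::{HttpSink, HttpSinkConfig};

let sink = HttpSink::new(HttpSinkConfig::new("https://api.example.com/ingest"));
// config_schema() is reached through the Sink trait imported above
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);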

Pipeline Usage

use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_http::{HttpSink, HttpSinkConfig, HttpBatchMode};

let source = RestStream::new(
    RestStreamConfig::new("https://source-api.example.com", "/v1/events")
);

let sink = HttpSink::new(
    HttpSinkConfig::new("https://dest-api.example.com/ingest")
        .batch_mode(HttpBatchMode::Array)
        .max_retries(3)
);

let result = Pipeline::new(source, sink).run().await?;
println!("Forwarded {} records", result.records_written);

Examples

Individual mode -- one request per record with concurrency

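use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkAuth, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://webhooks.example.com/event")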
    .auth(HttpSinkAuth::Bearer { token: "webhook-token".into() })
    .batch_mode(HttpBatchMode::Individual)
    .concurrency(20)
    .max_retries(2);

let sink = HttpSink::new(config);

let records = vec![
    json!({"event": "user.created", "user_id": "u1"}),
    json!({"event": "user.updated", "user_id": "u2"}),
    json!({"event": "order.placed", "order_id": "o1"}),
];

// All 3 records are sent concurrently (up to 20 at a time)
sink.write_batch(&records).await?;

Array mode -- bulk send as JSON array

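use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://api.example.com/bulk-ingest")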
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3);

let sink = HttpSink::new(config);

let records = vec![
    json!({"metric": "cpu_usage", "value": 0.85}),
    json!({"metric": "memory_usage", "value": 0.72}),
];

// Sends: POST with body [{"metric":"cpu_usage","value":0.85},{"metric":"memory_usage","value":0.72}]
sink.write_batch(&records).await?;

Custom HTTP method and headers

use faucet_sink_http::{HttpSink, HttpSinkAuth, HttpSinkConfig};
use reqwest::header::{HeaderMap, HeaderValue};

let mut headers = HeaderMap::new();
headers.insert("X-Custom-Header", HeaderValue::from_static("my-value"));

let config = HttpSinkConfig::new("https://api.example.com/data")
    .method(reqwest::Method::PUT)
    .headers(headers)
    .auth(HttpSinkAuth::Basic {
        username: "api-user".into(),
        password: "api-pass".into(),
    });

let sink = HttpSink::new(config);

How It Works

  • The HTTP client is created in HttpSink::new() and reused across all requests.
  • In Individual mode, each record is sent as a separate HTTP request, and requests run concurrently (bounded by concurrency). In write_batch, the first error aborts the batch. When a DLQ is configured, the sink instead reports per-row outcomes (write_batch_partial): it attempts every record and dead-letters only the ones that actually failed, so already-delivered rows are never duplicated against a non-idempotent endpoint.
  • In Array mode, records are serialized into JSON arrays (up to batch_size records each, or all of them when batch_size = 0) and each array is sent as one request body. A failure can't be attributed to specific rows, so the whole array surfaces as an error (the DLQ on_batch_error policy decides whether to abort or dead-letter the batch).
  • Retry logic: on transient failures (network errors, retriable HTTP status codes), the request is retried up to max_retries times. Non-retriable errors (4xx client errors) are returned immediately.
  • Authentication and custom headers are applied to every request.
  • HTTP responses are validated using check_http_response() from faucet-core, which checks status codes and returns structured errors.
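The difference between aborting on the first error and reporting per-row outcomes in Individual mode can be sketched sequentially. Both functions and the RowOutcome type are hypothetical; the real sink sends concurrently, so in abort mode some rows may already be in flight when the error is seen, a detail this simplified sketch ignores.

```rust
/// Per-row result, loosely modelled on the per-row outcomes described above.
#[derive(Debug, PartialEq)]
enum RowOutcome {
    Delivered,
    Failed(String),
}

/// No DLQ: the first failure aborts the batch and later rows are not attempted.
fn write_all_or_abort<F>(records: &[&str], mut send: F) -> Result<usize, String>
where
    F: FnMut(&str) -> Result<(), String>,
{
    for (i, &record) in records.iter().enumerate() {
        send(record).map_err(|e| format!("row {i}: {e}"))?;
    }
    Ok(records.len())
}

/// DLQ configured: attempt every record, return per-row outcomes and the
/// rows to dead-letter. Delivered rows are never re-sent.
fn write_partial<'a, F>(records: &[&'a str], mut send: F) -> (Vec<RowOutcome>, Vec<&'a str>)
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut outcomes = Vec::with_capacity(records.len());
    let mut dead_letters = Vec::new();
    for &record in records {
        match send(record) {
            Ok(()) => outcomes.push(RowOutcome::Delivered),
            Err(e) => {
                outcomes.push(RowOutcome::Failed(e));
                // Only the failed row goes to the DLQ.
                dead_letters.push(record);
            }
        }
    }
    (outcomes, dead_letters)
}
```

Replaying only the dead-lettered rows is what keeps a non-idempotent endpoint from receiving duplicates of rows that were already accepted.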

License

Licensed under MIT or Apache-2.0.