faucet-sink-http 0.2.0

HTTP POST sink connector for the faucet-stream ecosystem

Sends JSON records to an HTTP endpoint. Supports two batch modes: Individual (one request per record with concurrent execution) and Array (all records in a single request as a JSON array). Includes configurable authentication, retry logic for transient failures, and concurrency control via semaphores.

Installation

[dependencies]
faucet-sink-http = "0.2"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "0.2", features = ["sink-http"] }

Quick Start

use faucet_sink_http::{HttpSink, HttpSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = HttpSinkConfig::new("https://api.example.com/ingest")
        .max_retries(3)
        .concurrency(10);

    let sink = HttpSink::new(config);

    let records = vec![
        json!({"user_id": "u123", "event": "signup"}),
        json!({"user_id": "u456", "event": "login"}),
    ];

    let written = sink.write_batch(&records).await?;
    println!("Sent {written} records");

    Ok(())
}

Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| url | String | (required) | Target endpoint URL |
| method | reqwest::Method | POST | HTTP method to use |
| headers | HeaderMap | empty | Additional request headers (not serializable; set via builder only) |
| auth | HttpSinkAuth | None | Authentication method (see below) |
| batch_mode | HttpBatchMode | Individual | How to batch records in requests (see below) |
| max_retries | usize | 0 | Number of retries on transient failures |
| concurrency | usize | 10 | Maximum number of concurrent requests in Individual mode |

Authentication (HttpSinkAuth)

| Variant | Fields | Description |
| --- | --- | --- |
| None | -- | No authentication |
| Bearer | String | Bearer token in the Authorization header |
| Basic | username: String, password: String | HTTP Basic authentication |
| Custom | HeaderMap | Custom headers for authentication (e.g. API keys). Not serializable; set via builder only. |

The Debug implementation masks tokens and passwords with *** to prevent credential leakage in logs.
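
As an illustration of the masking described above, here is a minimal sketch of a hand-written Debug implementation. AuthSketch is a hypothetical stand-in for HttpSinkAuth (the Custom variant is omitted), and the exact output format of the real crate may differ; only the *** placeholder comes from this README.

```rust
use std::fmt;

// Hypothetical stand-in for the crate's auth enum, for illustration only.
#[allow(dead_code)]
enum AuthSketch {
    None,
    Bearer(String),
    Basic { username: String, password: String },
}

impl fmt::Debug for AuthSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AuthSketch::None => f.write_str("None"),
            // The token is never read; a fixed placeholder is printed instead.
            AuthSketch::Bearer(_) => f.debug_tuple("Bearer").field(&"***").finish(),
            // Assumption: the username is shown and only the password is masked.
            AuthSketch::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
        }
    }
}
```

Writing Debug by hand, rather than deriving it, is what keeps the secret out of any {:?} log line.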

Batch Modes (HttpBatchMode)

| Mode | Description |
| --- | --- |
| Individual | Send one HTTP request per record. Requests are executed concurrently up to the concurrency limit using a semaphore. |
| Array | Send all records as a JSON array in a single HTTP request. |
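
To make the difference between the two modes concrete, the sketch below computes the request bodies each mode would produce for a batch. BatchModeSketch and plan_bodies are hypothetical names, not part of the crate, and records are assumed to be already-serialized JSON strings so that the example needs only the standard library.

```rust
// Hypothetical mirror of the two batch modes; not the crate's own type.
#[derive(Clone, Copy)]
enum BatchModeSketch {
    Individual,
    Array,
}

/// Returns one entry per HTTP request body that would be sent for `records`.
/// Each record is assumed to be a valid, already-serialized JSON value.
fn plan_bodies(mode: BatchModeSketch, records: &[&str]) -> Vec<String> {
    match mode {
        // One request per record: each body is the record itself.
        BatchModeSketch::Individual => records.iter().map(|r| r.to_string()).collect(),
        // One request for the whole batch: the body is a JSON array of records.
        BatchModeSketch::Array => vec![format!("[{}]", records.join(","))],
    }
}
```

With two records, Individual yields two bodies and Array yields a single body holding both. How the real sink treats an empty batch is not specified here; this sketch would produce one request with an empty array in Array mode.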

Retry Behavior

When max_retries > 0, the sink retries requests that fail with a retriable error, such as a network error or a 5xx status code. Retries are issued immediately, with no backoff between attempts. Once all retries are exhausted, the last error is returned. With the default of 0, a failed request is not retried.
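
The retry policy can be sketched as a small loop. This is a synchronous, standard-library-only illustration, not the crate's implementation: SendError, is_retriable, and send_with_retries are hypothetical names, and treating exactly network errors and 5xx statuses as retriable is an assumption based on the description above.

```rust
// Hypothetical error type for the sketch.
#[derive(Debug, Clone, PartialEq)]
enum SendError {
    Network(String),
    Status(u16),
}

// Assumption: network errors and 5xx statuses are retriable, everything else is not.
fn is_retriable(err: &SendError) -> bool {
    match err {
        SendError::Network(_) => true,
        SendError::Status(code) => (500..600).contains(code),
    }
}

/// Calls `attempt` once, then up to `max_retries` more times on retriable errors.
fn send_with_retries<F>(max_retries: usize, mut attempt: F) -> Result<(), SendError>
where
    F: FnMut() -> Result<(), SendError>,
{
    let mut retries_left = max_retries;
    loop {
        match attempt() {
            Ok(()) => return Ok(()),
            Err(err) if is_retriable(&err) && retries_left > 0 => {
                // Immediate retry: no sleep or backoff between attempts.
                retries_left -= 1;
            }
            // Non-retriable error, or retries exhausted: return the last error.
            Err(err) => return Err(err),
        }
    }
}
```

Note that max_retries counts retries, not attempts, so max_retries = 3 allows up to four requests in this sketch.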

Builder Methods

use faucet_sink_http::{HttpSinkConfig, HttpSinkAuth, HttpBatchMode};

let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .method(reqwest::Method::PUT)
    .auth(HttpSinkAuth::Bearer("my-token".into()))
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3)
    .concurrency(20);

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_sink_http::HttpSinkConfig;

// From a JSON file
let config: HttpSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: HttpSinkConfig = load_env_file(".env", "HTTP_SINK")?;

Note: The headers field and the Custom auth variant use HeaderMap, which is not serializable, so they cannot be loaded from JSON or .env files. Set them programmatically via the builder methods.

Example JSON config (Individual mode with Bearer auth)

{
  "url": "https://api.example.com/ingest",
  "method": "POST",
  "auth": {
    "type": "Bearer",
    "0": "my-api-token"
  },
  "batch_mode": {
    "type": "Individual"
  },
  "max_retries": 3,
  "concurrency": 10
}

Example JSON config (Array mode with Basic auth)

{
  "url": "https://api.example.com/bulk",
  "method": "POST",
  "auth": {
    "type": "Basic",
    "username": "ingest-user",
    "password": "s3cret"
  },
  "batch_mode": {
    "type": "Array"
  },
  "max_retries": 2,
  "concurrency": 1
}

Example .env file

HTTP_SINK_URL=https://api.example.com/ingest
HTTP_SINK_METHOD=POST
HTTP_SINK_AUTH='{"type":"Bearer","0":"my-api-token"}'
HTTP_SINK_BATCH_MODE='{"type":"Individual"}'
HTTP_SINK_MAX_RETRIES=3
HTTP_SINK_CONCURRENCY=10
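
The variable names in the example above follow a PREFIX_FIELD pattern. The sketch below shows that naming convention only; env_key_to_field is a hypothetical helper, and how load_env_file actually parses keys and values is not documented here.

```rust
/// Maps an environment key such as "HTTP_SINK_MAX_RETRIES" to the config
/// field name "max_retries", given the prefix "HTTP_SINK".
/// Returns None when the key does not belong to the prefix.
fn env_key_to_field(prefix: &str, key: &str) -> Option<String> {
    // Strip the prefix, then the underscore that separates it from the field.
    let rest = key.strip_prefix(prefix)?.strip_prefix('_')?;
    if rest.is_empty() {
        return None;
    }
    // Assumption: field names are the lowercase form of the remaining key.
    Some(rest.to_ascii_lowercase())
}
```

Under this convention, keys from other components in the same .env file are ignored because they do not start with the sink's prefix.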

Config Schema Introspection

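use faucet_core::Sink;
use faucet_sink_http::{HttpSink, HttpSinkConfig};

let config = HttpSinkConfig::new("https://api.example.com/ingest");
let sink = HttpSink::new(config);

// Print the JSON schema describing the sink's configuration
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);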

Pipeline Usage

use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_http::{HttpSink, HttpSinkConfig, HttpBatchMode};

let source = RestStream::new(
    RestStreamConfig::new("https://source-api.example.com", "/v1/events")
);

let sink = HttpSink::new(
    HttpSinkConfig::new("https://dest-api.example.com/ingest")
        .batch_mode(HttpBatchMode::Array)
        .max_retries(3)
);

let result = Pipeline::new(source, sink).run().await?;
println!("Forwarded {} records", result.records_written);

Examples

Individual mode -- one request per record with concurrency

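use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkAuth, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://webhooks.example.com/event")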
    .auth(HttpSinkAuth::Bearer("webhook-token".into()))
    .batch_mode(HttpBatchMode::Individual)
    .concurrency(20)
    .max_retries(2);

let sink = HttpSink::new(config);

let records = vec![
    json!({"event": "user.created", "user_id": "u1"}),
    json!({"event": "user.updated", "user_id": "u2"}),
    json!({"event": "order.placed", "order_id": "o1"}),
];

// All 3 records are sent concurrently (up to 20 at a time)
sink.write_batch(&records).await?;

Array mode -- bulk send as JSON array

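use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://api.example.com/bulk-ingest")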
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3);

let sink = HttpSink::new(config);

let records = vec![
    json!({"metric": "cpu_usage", "value": 0.85}),
    json!({"metric": "memory_usage", "value": 0.72}),
];

// Sends: POST with body [{"metric":"cpu_usage","value":0.85},{"metric":"memory_usage","value":0.72}]
sink.write_batch(&records).await?;

Custom HTTP method and headers

use faucet_sink_http::{HttpSink, HttpSinkAuth, HttpSinkConfig};
use reqwest::header::{HeaderMap, HeaderValue};

let mut headers = HeaderMap::new();
headers.insert("X-Custom-Header", HeaderValue::from_static("my-value"));

let config = HttpSinkConfig::new("https://api.example.com/data")
    .method(reqwest::Method::PUT)
    .headers(headers)
    .auth(HttpSinkAuth::Basic {
        username: "api-user".into(),
        password: "api-pass".into(),
    });

let sink = HttpSink::new(config);

How It Works

  • The HTTP client is created in HttpSink::new() and reused across all requests.
  • In Individual mode, each record is sent as a separate HTTP request. Requests are executed concurrently using a tokio::sync::Semaphore to limit the number of in-flight requests to the concurrency value. All requests must succeed; the first error aborts the batch via try_join_all.
  • In Array mode, all records are collected into a JSON array and sent as a single request body.
  • Retry logic: on transient failures (network errors, retriable HTTP status codes), the request is retried up to max_retries times. Non-retriable errors (4xx client errors) are returned immediately.
  • Authentication and custom headers are applied to every request.
  • HTTP responses are validated using check_http_response() from faucet-core, which checks status codes and returns structured errors.
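
The bounded concurrency of Individual mode can be sketched without an async runtime. This example swaps in OS threads and a hand-rolled counting semaphore for the tokio::sync::Semaphore and try_join_all that the sink uses, so it shows the idea rather than the implementation; Semaphore, send_all, and the send callback are hypothetical names. Unlike try_join_all, it reports the first error in record order but does not cancel the remaining requests.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore; a stand-in for tokio::sync::Semaphore.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    // Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    // Returns a permit and wakes one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Sends each record on its own thread, with at most `concurrency` sends in flight.
/// Returns (records sent, peak in-flight count), or the first error in record order.
fn send_all<F>(records: &[String], concurrency: usize, send: F) -> Result<(usize, usize), String>
where
    F: Fn(&str) -> Result<(), String> + Sync,
{
    // Guard against a limit of 0, which would block every thread forever.
    let semaphore = Semaphore::new(concurrency.max(1));
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);

    let results: Vec<Result<(), String>> = thread::scope(|scope| {
        let handles: Vec<_> = records
            .iter()
            .map(|record| {
                let (semaphore, in_flight, peak, send) = (&semaphore, &in_flight, &peak, &send);
                scope.spawn(move || {
                    semaphore.acquire();
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    let result = send(record.as_str());
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    semaphore.release();
                    result
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Every send must succeed; surface the first failure.
    for result in results {
        result?;
    }
    Ok((records.len(), peak.load(Ordering::SeqCst)))
}
```

The peak counter exists only to make the limit observable: it never exceeds the concurrency value, however many records are in the batch.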

License

Licensed under MIT or Apache-2.0.