faucet-sink-http 0.2.0

# faucet-sink-http

[![Crates.io](https://img.shields.io/crates/v/faucet-sink-http.svg)](https://crates.io/crates/faucet-sink-http)
[![Docs.rs](https://docs.rs/faucet-sink-http/badge.svg)](https://docs.rs/faucet-sink-http)

HTTP POST sink connector for the [faucet-stream](https://github.com/PawanSikawat/faucet-stream) ecosystem.

Sends JSON records to an HTTP endpoint. Supports two batch modes: `Individual` (one request per record, executed concurrently) and `Array` (all records in a single request as a JSON array). Also provides configurable authentication, retries for transient failures, and semaphore-based concurrency control.

## Installation

```toml
[dependencies]
faucet-sink-http = "0.2"
tokio = { version = "1", features = ["full"] }
```

Or via the umbrella crate:

```toml
faucet-stream = { version = "0.2", features = ["sink-http"] }
```

## Quick Start

```rust
use faucet_sink_http::{HttpSink, HttpSinkConfig};
use faucet_core::Sink;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = HttpSinkConfig::new("https://api.example.com/ingest")
        .max_retries(3)
        .concurrency(10);

    let sink = HttpSink::new(config);

    let records = vec![
        json!({"user_id": "u123", "event": "signup"}),
        json!({"user_id": "u456", "event": "login"}),
    ];

    let written = sink.write_batch(&records).await?;
    println!("Sent {written} records");

    Ok(())
}
```

## Configuration

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `url` | `String` | *(required)* | Target endpoint URL |
| `method` | `reqwest::Method` | `POST` | HTTP method to use |
| `headers` | `HeaderMap` | empty | Additional request headers (not serializable; set via builder only) |
| `auth` | `HttpSinkAuth` | `None` | Authentication method (see below) |
| `batch_mode` | `HttpBatchMode` | `Individual` | How to batch records in requests (see below) |
| `max_retries` | `usize` | `0` | Number of retries on transient failures |
| `concurrency` | `usize` | `10` | Maximum number of concurrent requests in Individual mode |

### Authentication (`HttpSinkAuth`)

| Variant | Fields | Description |
|---------|--------|-------------|
| `None` | -- | No authentication |
| `Bearer` | `String` | Bearer token in the Authorization header |
| `Basic` | `username: String`, `password: String` | HTTP Basic authentication |
| `Custom` | `HeaderMap` | Custom headers for authentication (e.g. API keys). Not serializable; set via builder only. |

The `Debug` implementation masks tokens and passwords with `***` to prevent credential leakage in logs.
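
The masking described above can be pictured with a hand-written `Debug` impl. This is a minimal sketch on a hypothetical `Auth` enum, not the crate's actual `HttpSinkAuth` definition; in particular, leaving `username` visible is an assumption made for illustration.

```rust
use std::fmt;

// Hypothetical stand-in for an auth enum; NOT the crate's real type.
pub enum Auth {
    None,
    Bearer(String),
    Basic { username: String, password: String },
}

impl fmt::Debug for Auth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => f.write_str("None"),
            // Print a fixed placeholder instead of the token.
            Auth::Bearer(_) => f.debug_tuple("Bearer").field(&"***").finish(),
            // Assumption: the username is shown, only the password is masked.
            Auth::Basic { username, .. } => f
                .debug_struct("Basic")
                .field("username", username)
                .field("password", &"***")
                .finish(),
        }
    }
}
```

With this impl, `format!("{:?}", Auth::Bearer("my-token".into()))` never contains the token, so logging a config struct that embeds the enum with `{:?}` stays safe.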

### Batch Modes (`HttpBatchMode`)

| Mode | Description |
|------|-------------|
| `Individual` | Send one HTTP request per record. Requests are executed concurrently up to the `concurrency` limit using a semaphore. |
| `Array` | Send all records as a JSON array in a single HTTP request. |
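
To make the difference between the two modes concrete, the sketch below computes the request bodies each mode would produce for a batch. `BatchMode` and `request_bodies` are hypothetical names used only for illustration, and records are passed as already-serialized JSON strings to keep the example dependency-free.

```rust
/// Hypothetical mirror of the two batch modes; not the crate's `HttpBatchMode`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BatchMode {
    Individual,
    Array,
}

/// Returns one request body per HTTP request that the given mode would send.
/// `records` holds pre-serialized JSON objects.
pub fn request_bodies(mode: BatchMode, records: &[&str]) -> Vec<String> {
    match mode {
        // One body (and therefore one request) per record.
        BatchMode::Individual => records.iter().map(|r| r.to_string()).collect(),
        // A single body: all records joined into one JSON array.
        BatchMode::Array => vec![format!("[{}]", records.join(","))],
    }
}
```

For two records, `Individual` yields two bodies and `Array` yields one body of the form `[{...},{...}]`. How the real sink treats an empty batch is not covered by this sketch.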

### Retry Behavior

When `max_retries > 0`, the sink retries requests that fail with a retriable error, such as a network error or a 5xx status code. Retries are immediate, with no backoff between attempts. Non-retriable errors, such as 4xx client errors, are returned without retrying. Once all retries are exhausted, the last error is returned.
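
The retry policy can be sketched as a small loop. The code below is an illustration under stated assumptions, not the crate's implementation: `SendError`, `is_retriable`, and `send_with_retries` are hypothetical names, the sketch is synchronous, and it treats only network errors and 5xx statuses as retriable.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum SendError {
    /// Connection-level failure.
    Network,
    /// HTTP response with a non-success status code.
    Status(u16),
}

impl SendError {
    /// Assumption: network errors and 5xx responses are retriable.
    pub fn is_retriable(&self) -> bool {
        match self {
            SendError::Network => true,
            SendError::Status(code) => (500..600).contains(code),
        }
    }
}

/// Calls `send` once, then retries immediately (no backoff) up to
/// `max_retries` more times while the error is retriable.
pub fn send_with_retries<F>(max_retries: usize, mut send: F) -> Result<(), SendError>
where
    F: FnMut() -> Result<(), SendError>,
{
    let mut retries_used = 0;
    loop {
        match send() {
            Ok(()) => return Ok(()),
            // Retriable failure with budget left: try again right away.
            Err(e) if e.is_retriable() && retries_used < max_retries => retries_used += 1,
            // Non-retriable failure, or budget exhausted: return the last error.
            Err(e) => return Err(e),
        }
    }
}
```

Under these assumptions, `max_retries = 3` allows at most four attempts in total, and a 404 response is returned after the first attempt.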

### Builder Methods

```rust
use faucet_sink_http::{HttpSinkConfig, HttpSinkAuth, HttpBatchMode};

let config = HttpSinkConfig::new("https://api.example.com/ingest")
    .method(reqwest::Method::PUT)
    .auth(HttpSinkAuth::Bearer("my-token".into()))
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3)
    .concurrency(20);
```

## Config Loading

```rust
use faucet_core::config::{load_json, load_env_file};
use faucet_sink_http::HttpSinkConfig;

// From a JSON file
let config: HttpSinkConfig = load_json("config.json")?;

// From an .env file with a prefix
let config: HttpSinkConfig = load_env_file(".env", "HTTP_SINK")?;
```

Note: The `headers` field and the `Custom` auth variant use `HeaderMap`, which is not serializable. They cannot be loaded from JSON or `.env` files and must be set programmatically via builder methods.

### Example JSON config (Individual mode with Bearer auth)

```json
{
  "url": "https://api.example.com/ingest",
  "method": "POST",
  "auth": {
    "type": "Bearer",
    "0": "my-api-token"
  },
  "batch_mode": {
    "type": "Individual"
  },
  "max_retries": 3,
  "concurrency": 10
}
```

### Example JSON config (Array mode with Basic auth)

```json
{
  "url": "https://api.example.com/bulk",
  "method": "POST",
  "auth": {
    "type": "Basic",
    "username": "ingest-user",
    "password": "s3cret"
  },
  "batch_mode": {
    "type": "Array"
  },
  "max_retries": 2,
  "concurrency": 1
}
```

### Example .env file

```env
HTTP_SINK_URL=https://api.example.com/ingest
HTTP_SINK_METHOD=POST
HTTP_SINK_AUTH='{"type":"Bearer","0":"my-api-token"}'
HTTP_SINK_BATCH_MODE='{"type":"Individual"}'
HTTP_SINK_MAX_RETRIES=3
HTTP_SINK_CONCURRENCY=10
```

## Config Schema Introspection

```rust
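use faucet_core::Sink;
use faucet_sink_http::{HttpSink, HttpSinkConfig};

let config = HttpSinkConfig::new("https://api.example.com/ingest");
let sink = HttpSink::new(config);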
let schema = sink.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);
```

## Pipeline Usage

```rust
use faucet_core::Pipeline;
use faucet_source_rest::{RestStream, RestStreamConfig};
use faucet_sink_http::{HttpSink, HttpSinkConfig, HttpBatchMode};

let source = RestStream::new(
    RestStreamConfig::new("https://source-api.example.com", "/v1/events")
);

let sink = HttpSink::new(
    HttpSinkConfig::new("https://dest-api.example.com/ingest")
        .batch_mode(HttpBatchMode::Array)
        .max_retries(3)
);

let result = Pipeline::new(source, sink).run().await?;
println!("Forwarded {} records", result.records_written);
```

## Examples

### Individual mode -- one request per record with concurrency

```rust
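use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkAuth, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://webhooks.example.com/event")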
    .auth(HttpSinkAuth::Bearer("webhook-token".into()))
    .batch_mode(HttpBatchMode::Individual)
    .concurrency(20)
    .max_retries(2);

let sink = HttpSink::new(config);

let records = vec![
    json!({"event": "user.created", "user_id": "u1"}),
    json!({"event": "user.updated", "user_id": "u2"}),
    json!({"event": "order.placed", "order_id": "o1"}),
];

// All 3 records are sent concurrently (up to 20 at a time)
sink.write_batch(&records).await?;
```

### Array mode -- bulk send as JSON array

```rust
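use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkConfig};
use serde_json::json;

let config = HttpSinkConfig::new("https://api.example.com/bulk-ingest")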
    .batch_mode(HttpBatchMode::Array)
    .max_retries(3);

let sink = HttpSink::new(config);

let records = vec![
    json!({"metric": "cpu_usage", "value": 0.85}),
    json!({"metric": "memory_usage", "value": 0.72}),
];

// Sends: POST with body [{"metric":"cpu_usage","value":0.85},{"metric":"memory_usage","value":0.72}]
sink.write_batch(&records).await?;
```

### Custom HTTP method and headers

```rust
use faucet_sink_http::{HttpSink, HttpSinkAuth, HttpSinkConfig};
use reqwest::header::{HeaderMap, HeaderValue};

let mut headers = HeaderMap::new();
headers.insert("X-Custom-Header", HeaderValue::from_static("my-value"));

let config = HttpSinkConfig::new("https://api.example.com/data")
    .method(reqwest::Method::PUT)
    .headers(headers)
    .auth(HttpSinkAuth::Basic {
        username: "api-user".into(),
        password: "api-pass".into(),
    });

let sink = HttpSink::new(config);
```

## How It Works

- The HTTP client is created in `HttpSink::new()` and reused across all requests.
- In **Individual** mode, each record is sent as a separate HTTP request. Requests are executed concurrently using a `tokio::sync::Semaphore` to limit the number of in-flight requests to the `concurrency` value. All requests must succeed; the first error aborts the batch via `try_join_all`.
- In **Array** mode, all records are collected into a JSON array and sent as a single request body.
- Retry logic: on transient failures (network errors, retriable HTTP status codes), the request is retried up to `max_retries` times. Non-retriable errors (4xx client errors) are returned immediately.
- Authentication and custom headers are applied to every request.
- HTTP responses are validated using `check_http_response()` from `faucet-core`, which checks status codes and returns structured errors.
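
The concurrency limit in Individual mode can be illustrated with a standard-library analogue. The sketch below is not the crate's code: it uses threads and a hand-rolled counting semaphore instead of `tokio::sync::Semaphore`, and `Semaphore`, `send_all`, and the `send` callback are hypothetical names. Unlike `try_join_all`, it waits for every send to finish before reporting the first error.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore built from std primitives (illustration only).
pub struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    fn acquire(&self) {
        let mut free = self.permits.lock().unwrap();
        // Block until a permit is free, then take it.
        while *free == 0 {
            free = self.available.wait(free).unwrap();
        }
        *free -= 1;
    }

    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Calls `send` once per record with at most `limit` calls in flight.
/// Returns the number of records sent, or the first error in record order.
pub fn send_all<F>(records: &[String], limit: usize, send: F) -> Result<usize, String>
where
    F: Fn(&str) -> Result<(), String> + Sync,
{
    assert!(limit > 0, "limit must be at least 1");
    let sem = Semaphore::new(limit);
    let results: Vec<Result<(), String>> = thread::scope(|scope| {
        let handles: Vec<_> = records
            .iter()
            .map(|record| {
                let (sem, send) = (&sem, &send);
                scope.spawn(move || {
                    sem.acquire(); // wait for a free slot
                    let outcome = send(record.as_str());
                    sem.release(); // not reached if `send` panics
                    outcome
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    });
    let mut written = 0;
    for result in results {
        result?; // the first failure fails the whole batch
        written += 1;
    }
    Ok(written)
}
```

Calling `send_all(&records, 20, send)` mirrors a `concurrency` of 20: every record gets its own task, but only 20 calls to `send` run at any moment.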

## License

Licensed under MIT or Apache-2.0.