faucet-source-webhook 1.0.0

Webhook receiver source connector for the faucet-stream ecosystem

A webhook receiver source that starts a temporary HTTP server and collects incoming POST payloads as JSON records.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-webhook = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-webhook"] }

Quick Start

use faucet_source_webhook::{WebhookSource, WebhookSourceConfig};
use faucet_core::Source;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = WebhookSourceConfig::new()
        .listen_addr("0.0.0.0:8080")
        .path("/webhook")
        .max_payloads(10)
        .timeout_secs(60);

    let source = WebhookSource::new(config);
    // Resolves once timeout_secs elapses or max_payloads payloads have arrived
    let records = source.fetch_all().await?;

    println!("Received {} webhook payloads", records.len());
    Ok(())
}

How It Works

When fetch_all() is called, the source:

  1. Starts an HTTP server on the configured address using axum
  2. Listens for incoming POST requests on the configured path
  3. Parses each request body as JSON (falls back to a plain string if not valid JSON)
  4. Collects payloads until either the timeout expires or max_payloads is reached
  5. Shuts down the server and returns all collected payloads as records
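Steps 4 and 5 amount to a deadline-bounded collection loop. The sketch below models that loop with the standard library only, so it is not the crate's code. The crate uses axum and tokio and pushes into an in-process Vec. Here a std::sync::mpsc channel stands in for the HTTP handler, and `collect_payloads` is a hypothetical name.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Drains payloads from `rx` until `timeout` elapses or `max_payloads`
/// have been collected (`None` = no count limit), mirroring steps 4 and 5.
fn collect_payloads(
    rx: &Receiver<String>,
    timeout: Duration,
    max_payloads: Option<usize>,
) -> Vec<String> {
    let deadline = Instant::now() + timeout;
    let mut collected = Vec::new();
    loop {
        // Stop as soon as the count limit is reached.
        if let Some(max) = max_payloads {
            if collected.len() >= max {
                break;
            }
        }
        // Wait only for whatever is left of the receive window.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok(payload) => collected.push(payload),
            // Window closed, or the stand-in "handler" has hung up.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    collected
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Stand-in for the HTTP handler: forwards three payloads, then exits.
    let handler = thread::spawn(move || {
        for i in 0..3 {
            tx.send(format!("{{\"event\":{i}}}")).unwrap();
        }
    });
    // Like max_payloads(2) with timeout_secs(2).
    let payloads = collect_payloads(&rx, Duration::from_secs(2), Some(2));
    handler.join().unwrap();
    println!("collected {} payloads", payloads.len());
}
```

Whichever limit is hit first ends the window, which is why a small max_payloads can return long before timeout_secs.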

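The server responds with 200 OK to accepted requests, 400 Bad Request for non-UTF-8 bodies, 413 Payload Too Large for bodies over max_body_bytes, and 401 Unauthorized when auth_token is set and the Authorization header is missing or wrong.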
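The per-request decision can be sketched as a small pure function. This is a hypothetical model rather than the crate's handler: `Outcome` and `handle_body` are illustrative names, and it assumes the size limit is checked before UTF-8 decoding (in an axum service the body limit is usually enforced before the handler runs). JSON parsing is left out to keep the sketch dependency-free.

```rust
/// Hypothetical outcome of one POST, modeled on the status codes above.
#[derive(Debug, PartialEq)]
enum Outcome {
    Accepted(String), // 200 OK: body kept as a record
    Rejected(u16),    // error status, nothing stored
}

/// Sketch of the body checks. Assumption: size is checked before decoding.
fn handle_body(body: &[u8], max_body_bytes: usize) -> Outcome {
    if body.len() > max_body_bytes {
        return Outcome::Rejected(413); // Payload Too Large
    }
    match std::str::from_utf8(body) {
        // The crate would next try JSON and fall back to a plain string;
        // that step is omitted here.
        Ok(text) => Outcome::Accepted(text.to_owned()),
        Err(_) => Outcome::Rejected(400), // Bad Request: not valid UTF-8
    }
}

fn main() {
    let limit = 1_048_576; // default max_body_bytes (1 MiB)
    println!("{:?}", handle_body(br#"{"ok":true}"#, limit));
    println!("{:?}", handle_body(&[0xff, 0xfe], limit));
    println!("{:?}", handle_body(&vec![b'a'; limit + 1], limit));
}
```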

Configuration

WebhookSourceConfig

  • listen_addr (String, default "127.0.0.1:8080"): Address the HTTP server binds to. The loopback default keeps the server local; bind 0.0.0.0 only behind a trusted gateway.
  • path (String, default "/webhook"): Endpoint path for receiving webhooks.
  • max_payloads (Option<usize>, default None): Stop after receiving this many payloads. None means collect until the timeout expires.
  • timeout_secs (u64, default 30): How long to listen before returning, in seconds.
  • max_body_bytes (usize, default 1048576, i.e. 1 MiB): Maximum accepted request body size. Larger POSTs are rejected with 413, so a single huge request cannot exhaust memory.
  • auth_token (Option<String>, default None): Optional shared secret. When set, each request must send it in the Authorization header, either raw or as Bearer <token>; other requests get 401. The comparison is constant-time, so response timing does not reveal how much of the token matched. Strongly recommended whenever listen_addr is not loopback.
  • batch_size (usize, default 1000): Records per emitted StreamPage. 0 is the "no batching" sentinel: the full flush window is emitted as one page. See Streaming and batching.
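The auth_token check described above (raw or Bearer token, constant-time comparison) can be sketched as follows. This is an illustrative model, not the crate's implementation; `constant_time_eq` and `authorized` are hypothetical names. A plain loop like this is not guaranteed to stay constant-time after compiler optimization, so production code typically relies on a vetted crate such as `subtle`.

```rust
/// Compares two byte strings without exiting early on the first mismatch,
/// so timing does not reveal how long the matching prefix is.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // A length mismatch returns immediately; this leaks only the length.
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y; // accumulate differences instead of branching
    }
    diff == 0
}

/// Accepts the raw token or `Bearer <token>` from the Authorization header.
/// `expected == None` models an unset auth_token: every request passes.
fn authorized(header: Option<&str>, expected: Option<&str>) -> bool {
    let Some(token) = expected else {
        return true;
    };
    let Some(value) = header else {
        return false; // header missing: the server would answer 401
    };
    let presented = value.strip_prefix("Bearer ").unwrap_or(value);
    constant_time_eq(presented.as_bytes(), token.as_bytes())
}

fn main() {
    let token = Some("s3cret");
    println!("{}", authorized(Some("Bearer s3cret"), token));
    println!("{}", authorized(Some("s3cret"), token));
    println!("{}", authorized(Some("Bearer guess!"), token));
}
```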

Streaming and batching

The webhook source collects POST requests into a per-flush in-memory buffer during its receive window (bounded by timeout_secs and, optionally, max_payloads). It has no native streaming primitive: Source::stream_pages falls back to the default trait implementation, which buffers the full flush window and then chunks it into pages of batch_size records.

  • batch_size is honoured when chunking the flush that is handed downstream, but it does not change how POSTs are buffered server-side: the HTTP handler keeps pushing into the in-process Vec until the receive window closes.
  • For this source, batch_size = 0 behaves the same as any positive value at least as large as the number of payloads received: both emit a single page containing every collected record.
  • Flushes are bounded by the receive-window timer (timeout_secs) and by max_payloads, both configured separately on the source. Tune those (and max_body_bytes, which caps each request) if you need to bound memory while requests are being received; batch_size only shapes the downstream page size.
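The chunking step can be modeled in a few lines. This is a sketch of the behavior described above, not the default trait implementation itself; `chunk_pages` is a hypothetical name, and returning no pages for an empty window is an assumption.

```rust
/// Splits one flush window into pages of `batch_size` records,
/// treating 0 as "no batching" (a single page holding everything).
fn chunk_pages<T: Clone>(records: &[T], batch_size: usize) -> Vec<Vec<T>> {
    if records.is_empty() {
        return Vec::new(); // assumption: an empty window yields no pages
    }
    if batch_size == 0 {
        return vec![records.to_vec()];
    }
    records.chunks(batch_size).map(|page| page.to_vec()).collect()
}

fn main() {
    let flush: Vec<u32> = (0..2500).collect();
    let sizes: Vec<usize> = chunk_pages(&flush, 1000).iter().map(|p| p.len()).collect();
    println!("{:?}", sizes); // [1000, 1000, 500]
}
```

With 2500 collected records, batch_size = 0 and batch_size = 5000 both yield the same single page, which is the equivalence noted in the second bullet.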

Config Loading

use faucet_core::config::{load_env_file, load_json};
use faucet_source_webhook::WebhookSourceConfig;

// From a JSON file:
let config: WebhookSourceConfig = load_json("config.json")?;

// Or from a .env file, reading WEBHOOK_-prefixed variables:
let config: WebhookSourceConfig = load_env_file(".env", "WEBHOOK")?;

Example JSON config

{
  "listen_addr": "0.0.0.0:9090",
  "path": "/hooks/incoming",
  "max_payloads": 100,
  "timeout_secs": 120,
  "batch_size": 1000
}

Example .env file

WEBHOOK_LISTEN_ADDR=0.0.0.0:8080
WEBHOOK_PATH=/webhook
WEBHOOK_MAX_PAYLOADS=50
WEBHOOK_TIMEOUT_SECS=60
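Judging by the example, each WEBHOOK_-prefixed variable corresponds to the lowercase field name (WEBHOOK_LISTEN_ADDR to listen_addr). The sketch below models that mapping. It is an assumption inferred from the example, not load_env_file's actual code; `env_to_fields` is hypothetical, and values stay as strings instead of being parsed into u64, usize or Option.

```rust
use std::collections::HashMap;

/// Hypothetical prefix mapping: `WEBHOOK_LISTEN_ADDR=...` -> ("listen_addr", "...").
fn env_to_fields(contents: &str, prefix: &str) -> HashMap<String, String> {
    let wanted = format!("{prefix}_");
    let mut fields = HashMap::new();
    for line in contents.lines() {
        let line = line.trim();
        // Skip blank lines and comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let Some((key, value)) = line.split_once('=') else {
            continue; // not a KEY=VALUE line
        };
        // Keep only variables carrying the prefix; strip it and lowercase the rest.
        if let Some(field) = key.trim().strip_prefix(wanted.as_str()) {
            fields.insert(field.to_lowercase(), value.trim().to_string());
        }
    }
    fields
}

fn main() {
    let env = "WEBHOOK_LISTEN_ADDR=0.0.0.0:8080\nWEBHOOK_PATH=/webhook\n\
               WEBHOOK_MAX_PAYLOADS=50\nWEBHOOK_TIMEOUT_SECS=60\nOTHER_VAR=ignored\n";
    let fields = env_to_fields(env, "WEBHOOK");
    println!("{:?}", fields.get("listen_addr")); // Some("0.0.0.0:8080")
}
```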

Config Schema Introspection

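use faucet_core::Source;
use faucet_source_webhook::WebhookSource;

// `config` is a WebhookSourceConfig built or loaded as shown above
let source = WebhookSource::new(config);
let schema = source.config_schema();
// Pretty-print the schema (requires serde_json as a dependency)
println!("{}", serde_json::to_string_pretty(&schema)?);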

Examples

Collect a fixed number of webhooks

use faucet_source_webhook::{WebhookSource, WebhookSourceConfig};
use faucet_core::Source;

let config = WebhookSourceConfig::new()
    .listen_addr("127.0.0.1:8080")
    .path("/events")
    .max_payloads(5)
    .timeout_secs(300);

let source = WebhookSource::new(config);
// Server runs until 5 payloads are received or 5 minutes elapse
let events = source.fetch_all().await?;
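While fetch_all() is waiting, something has to POST to it. Any HTTP client works; the sketch below uses only the Rust standard library and speaks plain HTTP/1.1 without TLS. `build_post` and `send` are hypothetical helpers, not part of the crate. If auth_token is set, an `Authorization: Bearer <token>` header line would also be needed.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;

/// Builds a minimal HTTP/1.1 POST request carrying a JSON body.
fn build_post(host: &str, path: &str, json_body: &str) -> String {
    format!(
        "POST {path} HTTP/1.1\r\nHost: {host}\r\nContent-Type: application/json\r\n\
         Content-Length: {len}\r\nConnection: close\r\n\r\n{body}",
        len = json_body.len(), // byte length, as HTTP requires
        body = json_body,
    )
}

/// Sends one payload and returns the response status line.
fn send(addr: &str, path: &str, json_body: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    // Avoid hanging forever if the server never answers.
    stream.set_read_timeout(Some(Duration::from_secs(5)))?;
    stream.write_all(build_post(addr, path, json_body).as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?; // "Connection: close" ends the read
    Ok(response.lines().next().unwrap_or("").to_string())
}

fn main() {
    // Matches the listen_addr and path configured in the example above.
    for i in 0..5 {
        match send("127.0.0.1:8080", "/events", &format!("{{\"n\":{i}}}")) {
            Ok(status) => println!("{status}"),
            Err(e) => eprintln!("send failed: {e}"),
        }
    }
}
```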

Time-limited webhook collection

use faucet_source_webhook::{WebhookSource, WebhookSourceConfig};
use faucet_core::Source;

// Collect every webhook received within 30 seconds on the default
// listen_addr and path (127.0.0.1:8080, /webhook)
let config = WebhookSourceConfig::new()
    .timeout_secs(30);

let source = WebhookSource::new(config);
let payloads = source.fetch_all().await?;
println!("Collected {} payloads in 30 seconds", payloads.len());

Using with a Pipeline

use faucet_source_webhook::{WebhookSource, WebhookSourceConfig};
use faucet_core::{Pipeline, Source, Sink};

let config = WebhookSourceConfig::new()
    .listen_addr("0.0.0.0:9090")
    .path("/github-webhooks")
    .max_payloads(100)
    .timeout_secs(3600);

let source = WebhookSource::new(config);
// my_sink stands for any value implementing faucet_core::Sink (not shown)
let pipeline = Pipeline::new(Box::new(source), Box::new(my_sink));
let result = pipeline.run().await?;
println!("Processed {} webhook events", result.records_written);

Payload Handling

  • JSON bodies are parsed and stored as native JSON values
  • Plain text bodies (non-JSON) are stored as JSON strings
  • Non-UTF-8 bodies receive a 400 Bad Request response and are not stored

License

Licensed under MIT or Apache-2.0.