faucet-source-webhook
A webhook receiver source that starts a temporary HTTP server and collects incoming POST payloads as JSON records.
Part of the faucet-stream ecosystem.
Installation
[dependencies]
faucet-source-webhook = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["source-webhook"] }
Quick Start
use ;
use Source;
async
How It Works
When fetch_all() is called, the source:
- Starts an HTTP server on the configured address using axum
- Listens for incoming POST requests on the configured path
- Parses each request body as JSON (falls back to a plain string if not valid JSON)
- Collects payloads until either the timeout expires or max_payloads is reached
- Shuts down the server and returns all collected payloads as records
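The server responds with 200 OK to accepted requests and 400 Bad Request for non-UTF-8 bodies. Bodies larger than max_body_bytes are rejected with 413 Payload Too Large, and, when auth_token is set, requests without a matching Authorization header get 401 Unauthorized.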
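The collection loop in the steps above can be modeled without a web framework. In this minimal std-only sketch, an `mpsc` channel stands in for the hand-off from the axum POST handler to the collector (an assumption about the internal design, not the crate's code), and `collect_until` is a hypothetical name. Server shutdown is not modeled.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Drain payloads from `rx` until `timeout` has elapsed or `max_payloads`
/// have been collected, whichever comes first. `None` means "no cap".
fn collect_until<T>(rx: &Receiver<T>, timeout: Duration, max_payloads: Option<usize>) -> Vec<T> {
    // One fixed deadline for the whole receive window.
    let deadline = Instant::now() + timeout;
    let mut collected = Vec::new();
    loop {
        // Stop as soon as the optional cap is reached.
        if max_payloads.map_or(false, |max| collected.len() >= max) {
            break;
        }
        // Wait only for whatever is left of the window.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok(payload) => collected.push(payload),
            // Window closed while waiting.
            Err(RecvTimeoutError::Timeout) => break,
            // Every sender is gone, so nothing more can arrive.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    collected
}
```

Computing the remaining time against a fixed deadline, rather than restarting a full timeout after each receive, keeps the total listen window at `timeout_secs` no matter how many payloads arrive.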
Configuration
WebhookSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `listen_addr` | `String` | `"127.0.0.1:8080"` | Address to bind the HTTP server to. Defaults to loopback; bind `0.0.0.0` only behind a trusted gateway. |
| `path` | `String` | `"/webhook"` | Endpoint path for receiving webhooks. |
| `max_payloads` | `Option<usize>` | `None` | Stop after receiving this many payloads. `None` means collect until the timeout expires. |
| `timeout_secs` | `u64` | `30` | How long to listen before returning, in seconds. |
| `max_body_bytes` | `usize` | `1048576` (1 MiB) | Maximum accepted request body size. Larger POSTs are rejected with `413` so one huge request can't exhaust memory. |
| `auth_token` | `Option<String>` | `None` | Optional shared secret. When set, requests must send it in the `Authorization` header (raw or as `Bearer <token>`); all others get `401`. The comparison is constant-time (no timing side channel). Strongly recommended whenever `listen_addr` isn't loopback. |
| `batch_size` | `usize` | `1000` | Records per emitted `StreamPage`. `0` is the "no batching" sentinel: the full flush window is emitted in one page. See Streaming and batching. |
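The `auth_token` rule in the table can be sketched as follows. The crate's actual implementation is not shown in this README; `is_authorized` and `constant_time_eq` are hypothetical helpers, and treating the `Bearer ` prefix as case-sensitive is an assumption. The hand-rolled comparison only illustrates the idea: the optimizer gives no constant-time guarantee, so production code would normally use a vetted crate such as `subtle` (`ConstantTimeEq`).

```rust
/// Compare two byte strings without short-circuiting on the first mismatch.
/// The early length check still reveals whether the lengths differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; any difference leaves a set bit.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Decide whether a request may proceed (false would map to 401).
/// `auth_token == None` disables the check entirely.
fn is_authorized(auth_token: Option<&str>, authorization: Option<&str>) -> bool {
    let Some(expected) = auth_token else {
        return true; // no shared secret configured
    };
    let Some(value) = authorization else {
        return false; // secret configured but header missing
    };
    // Accept the token either raw or as `Bearer <token>`.
    let presented = value.strip_prefix("Bearer ").unwrap_or(value);
    constant_time_eq(presented.as_bytes(), expected.as_bytes())
}
```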
Streaming and batching
The webhook source collects POST payloads into a per-flush in-memory buffer during its receive window (bounded by timeout_secs and, optionally, max_payloads). It has no native streaming primitive, so Source::stream_pages falls back to the default trait implementation, which buffers the full flush window and then chunks it into pages of batch_size records.
- batch_size is honoured when chunking the flush that is handed downstream, but it does not change how POSTs are buffered server-side: the HTTP handler always pushes into the in-process Vec until the receive window closes. For this source, batch_size = 0 behaves the same as any positive value larger than the number of received payloads: both emit one page containing every collected record.
- Flushes are bounded by the receive-window timer (timeout_secs) and by max_payloads, both configured separately on the source. Tune those if you need to bound memory while requests are being received; batch_size only shapes the downstream page size.
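The paging behaviour described above can be sketched as a plain function over one flush window. `into_pages` is a hypothetical helper, not the crate's default `stream_pages`; it shows the `batch_size = 0` sentinel and why it matches any batch size larger than the window. Whether an empty window yields zero pages or one empty page is not stated here; this sketch assumes zero.

```rust
/// Split one flush window into pages of at most `batch_size` records.
/// `batch_size == 0` is the "no batching" sentinel: one page with everything.
fn into_pages<T>(records: Vec<T>, batch_size: usize) -> Vec<Vec<T>> {
    if records.is_empty() {
        return Vec::new(); // assumption: an empty window emits no pages
    }
    if batch_size == 0 {
        return vec![records];
    }
    let mut pages: Vec<Vec<T>> = Vec::new();
    let mut rest = records.into_iter().peekable();
    // Move up to `batch_size` records into each page until none remain.
    while rest.peek().is_some() {
        pages.push(rest.by_ref().take(batch_size).collect());
    }
    pages
}
```

With 2,500 buffered payloads and the default `batch_size` of 1000, downstream sees pages of 1000, 1000 and 500 records; with `batch_size = 0` or `batch_size = 5000` it sees a single page of 2,500.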
Config Loading
use ;
use WebhookSourceConfig;
let config: WebhookSourceConfig = load_json?;
let config: WebhookSourceConfig = load_env_file?;
Example JSON config
Example .env file
WEBHOOK_LISTEN_ADDR=0.0.0.0:8080
WEBHOOK_PATH=/webhook
WEBHOOK_MAX_PAYLOADS=50
WEBHOOK_TIMEOUT_SECS=60
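To show how the four keys in the example above could map onto the fields in the configuration table, here is a std-only parser. It is not the crate's `load_env_file`: `WebhookConfigSketch` and `parse_env` are hypothetical, only the four keys shown are handled (the env names for the other fields are not documented here), and falling back to the table's defaults for missing keys is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of WebhookSourceConfig, using the documented defaults.
#[derive(Debug, Clone, PartialEq)]
struct WebhookConfigSketch {
    listen_addr: String,
    path: String,
    max_payloads: Option<usize>,
    timeout_secs: u64,
    max_body_bytes: usize,
    auth_token: Option<String>,
    batch_size: usize,
}

impl Default for WebhookConfigSketch {
    fn default() -> Self {
        Self {
            listen_addr: "127.0.0.1:8080".to_string(),
            path: "/webhook".to_string(),
            max_payloads: None,
            timeout_secs: 30,
            max_body_bytes: 1_048_576,
            auth_token: None,
            batch_size: 1000,
        }
    }
}

/// Parse `KEY=VALUE` lines (blank lines and `#` comments skipped) and
/// override the defaults for the four WEBHOOK_* keys from the example.
fn parse_env(text: &str) -> Result<WebhookConfigSketch, String> {
    let vars: HashMap<&str, &str> = text
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty() && !l.starts_with('#'))
        .filter_map(|l| l.split_once('='))
        .map(|(k, v)| (k.trim(), v.trim()))
        .collect();

    let mut cfg = WebhookConfigSketch::default();
    if let Some(v) = vars.get("WEBHOOK_LISTEN_ADDR") {
        cfg.listen_addr = v.to_string();
    }
    if let Some(v) = vars.get("WEBHOOK_PATH") {
        cfg.path = v.to_string();
    }
    if let Some(v) = vars.get("WEBHOOK_MAX_PAYLOADS") {
        let n = v.parse::<usize>().map_err(|e| format!("WEBHOOK_MAX_PAYLOADS: {e}"))?;
        cfg.max_payloads = Some(n);
    }
    if let Some(v) = vars.get("WEBHOOK_TIMEOUT_SECS") {
        cfg.timeout_secs = v.parse::<u64>().map_err(|e| format!("WEBHOOK_TIMEOUT_SECS: {e}"))?;
    }
    Ok(cfg)
}
```

Note that the example binds `0.0.0.0`, which the configuration table recommends only behind a trusted gateway, ideally with `auth_token` set.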
Config Schema Introspection
use Source;
let source = new;
let schema = source.config_schema;
println!;
Examples
Collect a fixed number of webhooks
use ;
use Source;
let config = new
.listen_addr
.path
.max_payloads
.timeout_secs;
let source = new;
// Server runs until 5 payloads are received or 5 minutes elapse
let events = source.fetch_all.await?;
Time-limited webhook collection
use ;
use Source;
// Collect all webhooks received within 30 seconds
let config = new
.timeout_secs;
let source = new;
let payloads = source.fetch_all.await?;
println!;
Using with a Pipeline
use ;
use ;
let config = new
.listen_addr
.path
.max_payloads
.timeout_secs;
let source = new;
let pipeline = new;
let result = pipeline.run.await?;
println!;
Payload Handling
- JSON bodies are parsed and stored as native JSON values
- Plain text bodies (non-JSON) are stored as JSON strings
- Non-UTF-8 bodies receive a 400 Bad Request response and are not stored
- Bodies larger than max_body_bytes are rejected with 413 and are not stored
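The body-handling rules above can be modeled as one classification step. This is a std-only sketch, not the crate's axum handler: `Payload` and `classify` are hypothetical, checking the size limit before UTF-8 validity is an assumption, and authentication is left out. To avoid an external dependency, the JSON parser is passed in as a closure; in a real build it might be something like `serde_json::from_str(s).ok()`, while the test below uses a toy integer parser as a stand-in.

```rust
/// Outcome for one accepted POST body. `J` is whatever the JSON parser
/// produces (for example a JSON value type).
#[derive(Debug, PartialEq)]
enum Payload<J> {
    Json(J),      // body parsed as JSON
    Text(String), // valid UTF-8 but not JSON: stored as a string
}

/// Classify one request body: `Ok` with the stored payload, or `Err` with
/// the HTTP status used to reject it.
fn classify<J>(
    body: &[u8],
    max_body_bytes: usize,
    parse_json: impl Fn(&str) -> Option<J>,
) -> Result<Payload<J>, u16> {
    if body.len() > max_body_bytes {
        return Err(413); // Payload Too Large
    }
    let text = std::str::from_utf8(body).map_err(|_| 400u16)?; // Bad Request
    Ok(match parse_json(text) {
        Some(value) => Payload::Json(value),
        None => Payload::Text(text.to_string()),
    })
}
```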
License
Licensed under MIT or Apache-2.0.