Skip to main content

faucet_source_webhook/
config.rs

1//! Webhook source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the webhook receiver source.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(default)]
10pub struct WebhookSourceConfig {
11    /// Address to bind the HTTP server to (default: `"127.0.0.1:8080"`).
12    ///
13    /// **Security:** the default binds to loopback only. Binding to
14    /// `0.0.0.0` exposes the receiver to the whole network — only do so
15    /// behind a trusted gateway, and set `auth_token` to require a shared
16    /// secret on every request.
17    pub listen_addr: String,
18    /// Endpoint path for receiving webhooks (default: `"/webhook"`).
19    pub path: String,
20    /// Stop after receiving this many payloads.
21    pub max_payloads: Option<usize>,
22    /// How long to listen before returning, in seconds (default: 30).
23    pub timeout_secs: u64,
24    /// Maximum accepted request body size in bytes (default: 1 MiB). Larger
25    /// POSTs are rejected with `413 Payload Too Large` so a single huge
26    /// request can't exhaust memory.
27    pub max_body_bytes: usize,
28    /// Optional shared secret. When set, every request must carry it in the
29    /// `Authorization` header (either the raw token or `Bearer <token>`);
30    /// requests without it are rejected with `401 Unauthorized`. When `None`
31    /// (default) the endpoint is unauthenticated.
32    pub auth_token: Option<String>,
33    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). The webhook
34    /// source has no native streaming primitive — it accumulates incoming POSTs
35    /// into an in-memory buffer during the receive window, then the default
36    /// [`Source::stream_pages`](faucet_core::Source::stream_pages) impl chunks
37    /// that buffer into pages of this size. Defaults to [`DEFAULT_BATCH_SIZE`].
38    ///
39    /// `batch_size = 0` is the "no batching" sentinel: the entire flush window
40    /// is emitted in a single page. For this source it is functionally
41    /// equivalent to any positive value larger than the received payload count
42    /// — the server-side buffering behaviour does not change.
43    #[serde(default = "default_batch_size")]
44    pub batch_size: usize,
45}
46
47fn default_batch_size() -> usize {
48    DEFAULT_BATCH_SIZE
49}
50
51impl Default for WebhookSourceConfig {
52    fn default() -> Self {
53        Self {
54            listen_addr: "127.0.0.1:8080".into(),
55            path: "/webhook".into(),
56            max_payloads: None,
57            timeout_secs: 30,
58            max_body_bytes: 1024 * 1024,
59            auth_token: None,
60            batch_size: DEFAULT_BATCH_SIZE,
61        }
62    }
63}
64
65impl WebhookSourceConfig {
66    /// Create a new config with sensible defaults.
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Set the listen address.
72    pub fn listen_addr(mut self, addr: impl Into<String>) -> Self {
73        self.listen_addr = addr.into();
74        self
75    }
76
77    /// Set the webhook endpoint path.
78    pub fn path(mut self, path: impl Into<String>) -> Self {
79        self.path = path.into();
80        self
81    }
82
83    /// Stop after receiving this many payloads.
84    pub fn max_payloads(mut self, max: usize) -> Self {
85        self.max_payloads = Some(max);
86        self
87    }
88
89    /// Set the timeout in seconds.
90    pub fn timeout_secs(mut self, secs: u64) -> Self {
91        self.timeout_secs = secs;
92        self
93    }
94
95    /// Set the maximum accepted request body size in bytes.
96    pub fn max_body_bytes(mut self, bytes: usize) -> Self {
97        self.max_body_bytes = bytes;
98        self
99    }
100
101    /// Require a shared-secret token in the `Authorization` header.
102    pub fn auth_token(mut self, token: impl Into<String>) -> Self {
103        self.auth_token = Some(token.into());
104        self
105    }
106
107    /// Set the per-page record count for
108    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
109    ///
110    /// Pass `0` to opt out of batching — the entire flush window is emitted in
111    /// a single [`StreamPage`](faucet_core::StreamPage). The webhook source's
112    /// server-side buffering is unaffected; only the downstream chunking
113    /// changes.
114    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
115        self.batch_size = batch_size;
116        self
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123
124    #[test]
125    fn default_config() {
126        let config = WebhookSourceConfig::new();
127        assert_eq!(
128            config.listen_addr, "127.0.0.1:8080",
129            "default must bind loopback only, not 0.0.0.0"
130        );
131        assert_eq!(config.path, "/webhook");
132        assert!(config.max_payloads.is_none());
133        assert_eq!(config.timeout_secs, 30);
134        assert_eq!(config.max_body_bytes, 1024 * 1024);
135        assert!(config.auth_token.is_none());
136    }
137
138    #[test]
139    fn builder_methods() {
140        let config = WebhookSourceConfig::new()
141            .listen_addr("127.0.0.1:9090")
142            .path("/hooks/incoming")
143            .max_payloads(10)
144            .timeout_secs(60);
145        assert_eq!(config.listen_addr, "127.0.0.1:9090");
146        assert_eq!(config.path, "/hooks/incoming");
147        assert_eq!(config.max_payloads, Some(10));
148        assert_eq!(config.timeout_secs, 60);
149    }
150
151    #[test]
152    fn batch_size_defaults_to_default_batch_size() {
153        let config = WebhookSourceConfig::new();
154        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
155    }
156
157    #[test]
158    fn with_batch_size_overrides_default() {
159        let config = WebhookSourceConfig::new().with_batch_size(250);
160        assert_eq!(config.batch_size, 250);
161    }
162
163    #[test]
164    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
165        let config = WebhookSourceConfig::new().with_batch_size(0);
166        assert_eq!(config.batch_size, 0);
167        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
168    }
169
170    #[test]
171    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
172        let config = WebhookSourceConfig::new().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
173        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
174    }
175
176    #[test]
177    fn batch_size_deserializes_from_json() {
178        let json = r#"{
179            "listen_addr": "127.0.0.1:8080",
180            "path": "/webhook",
181            "timeout_secs": 30,
182            "batch_size": 500
183        }"#;
184        let config: WebhookSourceConfig = serde_json::from_str(json).unwrap();
185        assert_eq!(config.batch_size, 500);
186    }
187}