use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
use reqwest::header::HeaderMap;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "config", rename_all = "snake_case")]
pub enum HttpSinkAuth {
None,
Bearer {
token: String,
},
Basic {
username: String,
password: String,
},
Custom {
headers: HashMap<String, String>,
},
}
impl std::fmt::Debug for HttpSinkAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => write!(f, "None"),
Self::Bearer { .. } => f.debug_tuple("Bearer").field(&"***").finish(),
Self::Basic { username, .. } => f
.debug_struct("Basic")
.field("username", username)
.field("password", &"***")
.finish(),
Self::Custom { .. } => f.debug_tuple("Custom").field(&"***").finish(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum HttpBatchMode {
Individual,
Array,
}
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct HttpSinkConfig {
pub url: String,
#[serde(with = "crate::serde_helpers::http_method")]
#[schemars(with = "String")]
pub method: reqwest::Method,
#[serde(skip, default)]
pub headers: HeaderMap,
pub auth: AuthSpec<HttpSinkAuth>,
pub batch_mode: HttpBatchMode,
pub max_retries: usize,
pub concurrency: usize,
#[serde(default = "default_batch_size")]
pub batch_size: usize,
}
fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE
}
impl std::fmt::Debug for HttpSinkConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HttpSinkConfig")
.field("url", &self.url)
.field("method", &self.method)
.field("headers", &self.headers)
.field("auth", &self.auth)
.field("batch_mode", &self.batch_mode)
.field("max_retries", &self.max_retries)
.field("concurrency", &self.concurrency)
.field("batch_size", &self.batch_size)
.finish()
}
}
impl HttpSinkConfig {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
method: reqwest::Method::POST,
headers: HeaderMap::new(),
auth: AuthSpec::Inline(HttpSinkAuth::None),
batch_mode: HttpBatchMode::Individual,
max_retries: 0,
concurrency: 10,
batch_size: DEFAULT_BATCH_SIZE,
}
}
pub fn method(mut self, method: reqwest::Method) -> Self {
self.method = method;
self
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers;
self
}
pub fn auth(mut self, auth: HttpSinkAuth) -> Self {
self.auth = AuthSpec::Inline(auth);
self
}
pub fn batch_mode(mut self, mode: HttpBatchMode) -> Self {
self.batch_mode = mode;
self
}
pub fn max_retries(mut self, retries: usize) -> Self {
self.max_retries = retries;
self
}
pub fn concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_config() {
let config = HttpSinkConfig::new("https://api.example.com/ingest");
assert_eq!(config.url, "https://api.example.com/ingest");
assert_eq!(config.method, reqwest::Method::POST);
assert_eq!(config.max_retries, 0);
assert!(matches!(config.auth, AuthSpec::Inline(HttpSinkAuth::None)));
assert!(matches!(config.batch_mode, HttpBatchMode::Individual));
}
#[test]
fn builder_methods() {
let config = HttpSinkConfig::new("https://api.example.com/ingest")
.method(reqwest::Method::PUT)
.auth(HttpSinkAuth::Bearer {
token: "token123".into(),
})
.batch_mode(HttpBatchMode::Array)
.max_retries(3);
assert_eq!(config.method, reqwest::Method::PUT);
assert_eq!(config.max_retries, 3);
assert!(matches!(config.batch_mode, HttpBatchMode::Array));
}
#[test]
fn batch_size_defaults_to_default_batch_size() {
let config = HttpSinkConfig::new("https://api.example.com/ingest");
assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
}
#[test]
fn with_batch_size_overrides_default() {
let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(250);
assert_eq!(config.batch_size, 250);
}
#[test]
fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
let config = HttpSinkConfig::new("https://api.example.com/ingest").with_batch_size(0);
assert_eq!(config.batch_size, 0);
assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
}
#[test]
fn batch_size_above_max_is_rejected_by_validate_batch_size() {
let config = HttpSinkConfig::new("https://api.example.com/ingest")
.with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
}
#[test]
fn batch_size_deserializes_from_json() {
let json = r#"{
"url": "https://api.example.com/ingest",
"method": "POST",
"auth": {"type": "none"},
"batch_mode": {"type": "Array"},
"max_retries": 0,
"concurrency": 10,
"batch_size": 250
}"#;
let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.batch_size, 250);
}
#[test]
fn batch_size_defaults_when_absent_from_json() {
let json = r#"{
"url": "https://api.example.com/ingest",
"method": "POST",
"auth": {"type": "none"},
"batch_mode": {"type": "Array"},
"max_retries": 0,
"concurrency": 10
}"#;
let config: HttpSinkConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
}
#[test]
fn auth_debug_masks_secrets() {
let bearer = HttpSinkAuth::Bearer {
token: "secret-token".into(),
};
let debug = format!("{bearer:?}");
assert!(debug.contains("***"));
assert!(!debug.contains("secret-token"));
let basic = HttpSinkAuth::Basic {
username: "user".into(),
password: "secret-pass".into(),
};
let debug = format!("{basic:?}");
assert!(debug.contains("user"));
assert!(debug.contains("***"));
assert!(!debug.contains("secret-pass"));
let custom = HttpSinkAuth::Custom {
headers: HashMap::new(),
};
let debug = format!("{custom:?}");
assert!(debug.contains("***"));
}
}