use reqwest::header::HeaderMap;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum HttpSinkAuth {
None,
Bearer(String),
Basic {
username: String,
password: String,
},
#[serde(skip)]
Custom(HeaderMap),
}
impl std::fmt::Debug for HttpSinkAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => write!(f, "None"),
Self::Bearer(_) => f.debug_tuple("Bearer").field(&"***").finish(),
Self::Basic { username, .. } => f
.debug_struct("Basic")
.field("username", username)
.field("password", &"***")
.finish(),
Self::Custom(_) => f.debug_tuple("Custom").field(&"***").finish(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum HttpBatchMode {
Individual,
Array,
}
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct HttpSinkConfig {
pub url: String,
#[serde(with = "crate::serde_helpers::http_method")]
#[schemars(with = "String")]
pub method: reqwest::Method,
#[serde(skip, default)]
pub headers: HeaderMap,
pub auth: HttpSinkAuth,
pub batch_mode: HttpBatchMode,
pub max_retries: usize,
pub concurrency: usize,
}
impl std::fmt::Debug for HttpSinkConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HttpSinkConfig")
.field("url", &self.url)
.field("method", &self.method)
.field("headers", &self.headers)
.field("auth", &self.auth)
.field("batch_mode", &self.batch_mode)
.field("max_retries", &self.max_retries)
.field("concurrency", &self.concurrency)
.finish()
}
}
impl HttpSinkConfig {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
method: reqwest::Method::POST,
headers: HeaderMap::new(),
auth: HttpSinkAuth::None,
batch_mode: HttpBatchMode::Individual,
max_retries: 0,
concurrency: 10,
}
}
pub fn method(mut self, method: reqwest::Method) -> Self {
self.method = method;
self
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers;
self
}
pub fn auth(mut self, auth: HttpSinkAuth) -> Self {
self.auth = auth;
self
}
pub fn batch_mode(mut self, mode: HttpBatchMode) -> Self {
self.batch_mode = mode;
self
}
pub fn max_retries(mut self, retries: usize) -> Self {
self.max_retries = retries;
self
}
pub fn concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_config() {
let config = HttpSinkConfig::new("https://api.example.com/ingest");
assert_eq!(config.url, "https://api.example.com/ingest");
assert_eq!(config.method, reqwest::Method::POST);
assert_eq!(config.max_retries, 0);
assert!(matches!(config.auth, HttpSinkAuth::None));
assert!(matches!(config.batch_mode, HttpBatchMode::Individual));
}
#[test]
fn builder_methods() {
let config = HttpSinkConfig::new("https://api.example.com/ingest")
.method(reqwest::Method::PUT)
.auth(HttpSinkAuth::Bearer("token123".into()))
.batch_mode(HttpBatchMode::Array)
.max_retries(3);
assert_eq!(config.method, reqwest::Method::PUT);
assert_eq!(config.max_retries, 3);
assert!(matches!(config.batch_mode, HttpBatchMode::Array));
}
#[test]
fn auth_debug_masks_secrets() {
let bearer = HttpSinkAuth::Bearer("secret-token".into());
let debug = format!("{bearer:?}");
assert!(debug.contains("***"));
assert!(!debug.contains("secret-token"));
let basic = HttpSinkAuth::Basic {
username: "user".into(),
password: "secret-pass".into(),
};
let debug = format!("{basic:?}");
assert!(debug.contains("user"));
assert!(debug.contains("***"));
assert!(!debug.contains("secret-pass"));
let custom = HttpSinkAuth::Custom(HeaderMap::new());
let debug = format!("{custom:?}");
assert!(debug.contains("***"));
}
}