use std::collections::HashMap;
use std::net::IpAddr;
use std::time::Duration;
use async_trait::async_trait;
use reqwest::Client;
use reqwest::Url;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::info;
use crate::common::http::AuthConfig;
use crate::common::message::SharedMessage;
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::transform::value::ValueSource;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldMapping {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub from: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<Value>,
}
struct CompiledField {
name: String,
source: ValueSource,
}
fn default_method() -> String {
"POST".to_string()
}
fn default_timeout() -> Duration {
Duration::from_secs(30)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpClientSinkConfig {
pub url: String,
#[serde(default = "default_method")]
pub method: String,
#[serde(default)]
pub headers: HashMap<String, String>,
#[serde(default = "default_timeout", with = "humantime_serde")]
pub timeout: Duration,
#[serde(default)]
pub fields: Option<Vec<FieldMapping>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub auth: Option<AuthConfig>,
}
impl Default for HttpClientSinkConfig {
fn default() -> Self {
Self {
url: String::new(),
method: "POST".to_string(),
headers: HashMap::new(),
timeout: Duration::from_secs(30),
fields: None,
auth: None,
}
}
}
impl HttpClientSinkConfig {
fn validate_and_normalize(&mut self) -> Result<Url> {
if self.url.is_empty() {
return Err(Error::config("http_client sink requires 'url' in config"));
}
self.method = self.method.trim().to_uppercase();
if self.method.is_empty() {
return Err(Error::config(
"http_client sink requires non-empty 'method'",
));
}
const SUPPORTED_METHODS: &[&str] = &["POST", "PUT", "PATCH"];
if !SUPPORTED_METHODS.contains(&self.method.as_str()) {
return Err(Error::config(format!(
"http_client sink has unsupported 'method': {} (supported: POST, PUT, PATCH)",
self.method
)));
}
Url::parse(&self.url)
.map_err(|e| Error::config(format!("http_client sink has invalid 'url': {}", e)))
}
fn build_headers(&self) -> Result<HeaderMap> {
let mut headers = HeaderMap::new();
for (key, value) in &self.headers {
let name = HeaderName::from_bytes(key.as_bytes()).map_err(|e| {
Error::config(format!(
"http_client sink has invalid header name '{}': {}",
key, e
))
})?;
let val = HeaderValue::from_str(value).map_err(|e| {
Error::config(format!(
"http_client sink has invalid header value for '{}': {}",
key, e
))
})?;
headers.insert(name, val);
}
if let Some(ref auth) = self.auth {
auth.apply_to_headers(&mut headers)?;
}
Ok(headers)
}
}
pub struct HttpClientSink {
id: String,
config: HttpClientSinkConfig,
client: Client,
headers: HeaderMap,
fields: Option<Vec<CompiledField>>,
}
impl std::fmt::Debug for HttpClientSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HttpClientSink")
.field("id", &self.id)
.field("config", &self.config)
.field("has_fields", &self.fields.is_some())
.finish_non_exhaustive()
}
}
impl HttpClientSink {
pub fn new(id: impl Into<String>, config: HttpClientSinkConfig) -> Result<Self> {
let mut config = config;
let parsed_url = config.validate_and_normalize()?;
let headers = config.build_headers()?;
let fields = if let Some(ref field_configs) = config.fields {
if field_configs.is_empty() {
return Err(Error::config(
"http_client sink 'fields' must not be empty when specified",
));
}
Some(Self::compile_fields(field_configs)?)
} else {
None
};
let bypass_proxy = parsed_url
.host_str()
.map(|host| {
if host.eq_ignore_ascii_case("localhost") {
return true;
}
host.parse::<IpAddr>().is_ok_and(|ip| ip.is_loopback())
})
.unwrap_or(false);
let mut builder = Client::builder().timeout(config.timeout);
if cfg!(test) || bypass_proxy {
builder = builder.no_proxy();
}
let client = builder
.build()
.map_err(|e| Error::sink(format!("Failed to create HTTP client: {}", e)))?;
let id = id.into();
info!(
sink_id = %id,
url = %config.url,
method = %config.method,
has_fields = fields.is_some(),
"HTTP client sink created"
);
Ok(Self {
id,
config,
client,
headers,
fields,
})
}
fn compile_fields(mappings: &[FieldMapping]) -> Result<Vec<CompiledField>> {
let mut fields = Vec::with_capacity(mappings.len());
for m in mappings {
let source = ValueSource::compile(m.from.as_deref(), m.value.as_ref())
.map_err(|e| Error::config(format!("Field '{}': {}", m.name, e)))?;
fields.push(CompiledField {
name: m.name.clone(),
source,
});
}
Ok(fields)
}
fn build_body(&self, msg: &SharedMessage) -> Value {
if let Some(ref fields) = self.fields {
let mut obj = serde_json::Map::new();
for field in fields {
let value = field.source.resolve(msg.as_ref());
if value.is_null() && field.source.should_skip_null() {
continue;
}
obj.insert(field.name.clone(), value);
}
Value::Object(obj)
} else {
msg.payload.clone()
}
}
async fn send(&self, body: Value) -> Result<()> {
tracing::debug!(
sink_id = %self.id,
method = %self.config.method,
url = %self.config.url,
"Sending HTTP request"
);
let mut request = match self.config.method.as_str() {
"POST" => self.client.post(&self.config.url),
"PUT" => self.client.put(&self.config.url),
"PATCH" => self.client.patch(&self.config.url),
other => unreachable!(
"Method '{}' should have been validated at construction",
other
),
};
if !self.headers.is_empty() {
request = request.headers(self.headers.clone());
}
request = request.json(&body);
let response = request.send().await.map_err(|e| {
tracing::debug!(
sink_id = %self.id,
method = %self.config.method,
url = %self.config.url,
error = %e,
result = "failed",
"HTTP request failed to send"
);
Error::sink(format!("HTTP request to {} failed: {}", self.config.url, e))
})?;
let status = response.status();
if !status.is_success() {
const MAX_ERROR_BODY_CHARS: usize = 2048;
let body = response
.text()
.await
.unwrap_or_else(|e| format!("<failed to read response body: {}>", e));
let body = truncate_for_error(&body, MAX_ERROR_BODY_CHARS);
tracing::debug!(
sink_id = %self.id,
method = %self.config.method,
url = %self.config.url,
status = %status,
result = "failed",
"HTTP request returned non-success status"
);
return Err(Error::sink(format!(
"HTTP request to {} returned status {}: {}",
self.config.url, status, body
)));
}
tracing::debug!(
sink_id = %self.id,
method = %self.config.method,
url = %self.config.url,
status = %status,
result = "success",
"HTTP request succeeded"
);
Ok(())
}
}
fn truncate_for_error(s: &str, max_chars: usize) -> String {
match s.char_indices().nth(max_chars) {
Some((idx, _)) => format!("{}...(truncated)", &s[..idx]),
None => s.to_string(),
}
}
#[async_trait]
impl Sink for HttpClientSink {
fn id(&self) -> &str {
&self.id
}
async fn process(&self, msg: SharedMessage) -> Result<()> {
let body = self.build_body(&msg);
self.send(body).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::message::Message;
use futures::FutureExt;
use serde_json::json;
use std::sync::Arc;
use wiremock::matchers::{body_json, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
async fn start_mock_server() -> Option<MockServer> {
let result = std::panic::AssertUnwindSafe(MockServer::start())
.catch_unwind()
.await;
match result {
Ok(server) => Some(server),
Err(_) => {
eprintln!("Skipping HTTP client sink test: mock server failed to bind");
None
}
}
}
#[test]
fn test_config_default() {
let cfg = HttpClientSinkConfig::default();
assert_eq!(cfg.method, "POST");
assert_eq!(cfg.timeout, Duration::from_secs(30));
assert!(cfg.headers.is_empty());
assert!(cfg.fields.is_none());
}
#[test]
fn test_new_requires_url() {
let cfg = HttpClientSinkConfig::default();
let result = HttpClientSink::new("test", cfg);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("url"));
}
#[test]
fn test_new_with_valid_url() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
..Default::default()
};
let result = HttpClientSink::new("test", cfg);
assert!(result.is_ok());
}
#[test]
fn test_new_normalizes_method() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
method: "put".to_string(),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
assert_eq!(sink.config.method, "PUT");
}
#[test]
fn test_new_rejects_invalid_method() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
method: "GET".to_string(), ..Default::default()
};
let result = HttpClientSink::new("test", cfg);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("unsupported"));
}
#[test]
fn test_new_rejects_empty_fields() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
fields: Some(vec![]),
..Default::default()
};
let result = HttpClientSink::new("test", cfg);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("empty"));
}
#[test]
fn test_config_deserialize_minimal() {
let yaml = r#"
url: "https://api.example.com/webhook"
"#;
let cfg: HttpClientSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.url, "https://api.example.com/webhook");
assert_eq!(cfg.method, "POST");
assert!(cfg.fields.is_none());
}
#[test]
fn test_config_deserialize_with_fields() {
let yaml = r#"
url: "https://api.example.com/webhook"
method: PUT
headers:
Authorization: "Bearer token"
fields:
- name: event_id
value: "$UUID"
- name: user_id
from: "$.data.user_id"
"#;
let cfg: HttpClientSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.method, "PUT");
assert!(cfg.fields.is_some());
let fields = cfg.fields.as_ref().unwrap();
assert_eq!(fields.len(), 2);
assert_eq!(fields[0].name, "event_id");
assert_eq!(fields[1].name, "user_id");
}
#[test]
fn test_build_body_without_fields() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({"key": "value"})));
let body = sink.build_body(&msg);
assert_eq!(body, json!({"key": "value"}));
}
#[test]
fn test_build_body_with_static_fields() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
fields: Some(vec![
FieldMapping {
name: "type".to_string(),
from: None,
value: Some(json!("event")),
},
FieldMapping {
name: "count".to_string(),
from: None,
value: Some(json!(42)),
},
]),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({})));
let body = sink.build_body(&msg);
assert_eq!(body, json!({"type": "event", "count": 42}));
}
#[test]
fn test_build_body_with_jsonpath_fields() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
fields: Some(vec![FieldMapping {
name: "user_id".to_string(),
from: Some("$.data.user.id".to_string()),
value: None,
}]),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new(
"source",
json!({"data": {"user": {"id": 123}}}),
));
let body = sink.build_body(&msg);
assert_eq!(body, json!({"user_id": 123}));
}
#[test]
fn test_build_body_skips_missing_jsonpath() {
let cfg = HttpClientSinkConfig {
url: "https://example.com".to_string(),
fields: Some(vec![
FieldMapping {
name: "exists".to_string(),
from: Some("$.exists".to_string()),
value: None,
},
FieldMapping {
name: "missing".to_string(),
from: Some("$.missing".to_string()),
value: None,
},
]),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({"exists": "here"})));
let body = sink.build_body(&msg);
assert_eq!(body, json!({"exists": "here"}));
}
#[tokio::test]
async fn test_send_post_success() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("POST"))
.and(path("/webhook"))
.and(body_json(json!({"test": "data"})))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let cfg = HttpClientSinkConfig {
url: format!("{}/webhook", mock_server.uri()),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({"test": "data"})));
let result = sink.process(msg).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_put_success() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("PUT"))
.and(path("/resource"))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let cfg = HttpClientSinkConfig {
url: format!("{}/resource", mock_server.uri()),
method: "PUT".to_string(),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({"data": "value"})));
let result = sink.process(msg).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_with_headers() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("POST"))
.and(path("/webhook"))
.and(header("Authorization", "Bearer secret"))
.and(header("X-Custom", "value"))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), "Bearer secret".to_string());
headers.insert("X-Custom".to_string(), "value".to_string());
let cfg = HttpClientSinkConfig {
url: format!("{}/webhook", mock_server.uri()),
headers,
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({})));
let result = sink.process(msg).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_error_status() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("POST"))
.and(path("/error"))
.respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
.mount(&mock_server)
.await;
let cfg = HttpClientSinkConfig {
url: format!("{}/error", mock_server.uri()),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({})));
let result = sink.process(msg).await;
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("500"));
assert!(error_msg.contains("Internal Server Error"));
}
#[tokio::test]
async fn test_send_with_field_mappings() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("POST"))
.and(path("/webhook"))
.and(body_json(json!({
"type": "event",
"user_id": 123
})))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let cfg = HttpClientSinkConfig {
url: format!("{}/webhook", mock_server.uri()),
fields: Some(vec![
FieldMapping {
name: "type".to_string(),
from: None,
value: Some(json!("event")),
},
FieldMapping {
name: "user_id".to_string(),
from: Some("$.data.user_id".to_string()),
value: None,
},
]),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({"data": {"user_id": 123}})));
let result = sink.process(msg).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_with_auth() {
let Some(mock_server) = start_mock_server().await else {
return;
};
Mock::given(method("POST"))
.and(path("/secure-sink"))
.and(header("Authorization", "Bearer secret-token"))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let cfg = HttpClientSinkConfig {
url: format!("{}/secure-sink", mock_server.uri()),
auth: Some(AuthConfig::Bearer {
token: "secret-token".to_string(),
}),
..Default::default()
};
let sink = HttpClientSink::new("test", cfg).unwrap();
let msg = Arc::new(Message::new("source", json!({})));
let result = sink.process(msg).await;
assert!(result.is_ok());
}
}