use super::error::{TransportError, TransportResult};
use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
#[cfg(feature = "http-server")]
use super::types::PayloadFormat;
use super::types::{Message, SendResult};
use serde::{Deserialize, Serialize};
#[cfg(feature = "http-server")]
use std::sync::Arc;
#[cfg(feature = "http-server")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, Ordering};
/// Commit token attached to messages handled by the HTTP transport.
///
/// It pairs a sequence number with the optional textual address of the peer
/// that delivered the message.
#[derive(Debug, Clone)]
pub struct HttpToken {
    /// Sequence number identifying the message.
    pub seq: u64,
    /// Address of the sender, when one was recorded.
    pub source_addr: Option<String>,
}

impl HttpToken {
    /// Builds a token that carries only a sequence number (no source address).
    #[must_use]
    pub fn new(seq: u64) -> Self {
        let source_addr = None;
        Self { seq, source_addr }
    }

    /// Builds a token that also records the sender address `addr`.
    #[must_use]
    pub fn with_source(seq: u64, addr: String) -> Self {
        let source_addr = Some(addr);
        Self { seq, source_addr }
    }
}
// Opts `HttpToken` into the `CommitToken` trait so it can serve as the token
// type of this transport's receiver; the impl body is empty, so nothing is
// overridden here.
impl CommitToken for HttpToken {}
/// Renders the token as `http:<addr>:<seq>` when a source address is present
/// and as `http:<seq>` otherwise.
impl std::fmt::Display for HttpToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(addr) = self.source_addr.as_deref() {
            write!(f, "http:{}:{}", addr, self.seq)
        } else {
            write!(f, "http:{}", self.seq)
        }
    }
}
/// Serde default for `HttpTransportConfig::recv_path`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
/// Serde default for `HttpTransportConfig::recv_buffer_size`.
fn default_buffer_size() -> usize {
    const DEFAULT_RECV_BUFFER_SIZE: usize = 10_000;
    DEFAULT_RECV_BUFFER_SIZE
}
/// Serde default for `HttpTransportConfig::recv_timeout_ms`, in milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
/// Configuration for [`HttpTransport`].
///
/// Every field carries a serde default, so an empty document deserializes to
/// the same values that the `Default` impl produces.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpTransportConfig {
    /// Base URL that `send` POSTs to. When unset, `send` returns a fatal
    /// configuration error.
    #[serde(default)]
    pub endpoint: Option<String>,
    /// Socket address (for example `0.0.0.0:8080`) the receiver binds to.
    /// When unset, no listener is started.
    #[serde(default)]
    pub listen: Option<String>,
    /// Route path that accepts POSTed payloads. Defaults to `/ingest`.
    #[serde(default = "default_recv_path")]
    pub recv_path: String,
    /// Capacity of the bounded channel between the HTTP handler and `recv`.
    /// Defaults to `10_000`.
    #[serde(default = "default_buffer_size")]
    pub recv_buffer_size: usize,
    /// How long, in milliseconds, `recv` waits for the first message of a
    /// batch; `0` makes `recv` poll without waiting. Defaults to `100`.
    #[serde(default = "default_recv_timeout_ms")]
    pub recv_timeout_ms: u64,
    /// Filter rules handed to the transport filter engine; judging by the
    /// name these are the rules applied to received messages.
    #[serde(default)]
    pub filters_in: Vec<super::filter::FilterRule>,
    /// Filter rules handed to the transport filter engine; judging by the
    /// name these are the rules applied to outgoing payloads.
    #[serde(default)]
    pub filters_out: Vec<super::filter::FilterRule>,
}
impl Default for HttpTransportConfig {
    /// Returns a configuration with no endpoint, no listener, no filters and
    /// the same receive defaults that serde applies to missing fields.
    fn default() -> Self {
        let recv_path = default_recv_path();
        let recv_buffer_size = default_buffer_size();
        let recv_timeout_ms = default_recv_timeout_ms();
        Self {
            endpoint: None,
            listen: None,
            recv_path,
            recv_buffer_size,
            recv_timeout_ms,
            filters_in: Vec::new(),
            filters_out: Vec::new(),
        }
    }
}
impl HttpTransportConfig {
#[must_use]
pub fn from_cascade() -> Self {
#[cfg(feature = "config")]
{
if let Some(cfg) = crate::config::try_get()
&& let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.http")
{
return tc;
}
}
Self::default()
}
#[must_use]
pub fn sender(endpoint: &str) -> Self {
Self {
endpoint: Some(endpoint.to_string()),
..Default::default()
}
}
#[must_use]
pub fn receiver(listen: &str) -> Self {
Self {
listen: Some(listen.to_string()),
..Default::default()
}
}
}
pub struct HttpTransport {
client: reqwest::Client,
endpoint: Option<String>,
#[cfg(feature = "http-server")]
receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
#[cfg(feature = "http-server")]
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
#[cfg(feature = "http-server")]
_server_handle: Option<tokio::task::JoinHandle<()>>,
closed: Arc<AtomicBool>,
#[cfg(feature = "http-server")]
recv_timeout_ms: u64,
filter_engine: super::filter::TransportFilterEngine,
filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
}
impl HttpTransport {
    /// Creates a transport from `config`.
    ///
    /// An HTTP client is always built. With the `http-server` feature and a
    /// configured `listen` address, a TCP listener is bound and an axum server
    /// is spawned that forwards POSTed bodies into a bounded channel drained
    /// by `recv`. Filters that fail to compile are replaced by an empty
    /// engine (filtering disabled) rather than failing construction. With the
    /// `health` feature, a `transport:http` health check is registered that
    /// reports unhealthy once the transport is closed.
    ///
    /// # Errors
    ///
    /// * `TransportError::Config` if the HTTP client cannot be built, if
    ///   `listen` is not a valid socket address, if `recv_buffer_size` is zero,
    ///   or if `recv_path` does not start with `/` (the last two are only
    ///   checked when a listener is requested).
    /// * `TransportError::Connection` if the listen address cannot be bound.
    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
        let client = reqwest::Client::builder()
            .build()
            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;

        #[cfg(feature = "http-server")]
        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
            let addr: std::net::SocketAddr = listen
                .parse()
                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;

            // `tokio::sync::mpsc::channel` panics on a zero capacity; report
            // the misconfiguration as an error instead.
            if config.recv_buffer_size == 0 {
                return Err(TransportError::Config(
                    "recv_buffer_size must be greater than zero".into(),
                ));
            }
            // axum's `Router::route` panics when the path does not start with
            // `/`; reject such paths up front.
            if !config.recv_path.starts_with('/') {
                return Err(TransportError::Config(format!(
                    "recv_path must start with '/': {:?}",
                    config.recv_path
                )));
            }

            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
            let sequence = std::sync::Arc::new(AtomicU64::new(0));
            let app = build_receiver_router(tx, sequence, &config.recv_path);
            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
            })?;
            let handle = tokio::spawn(async move {
                axum::serve(
                    listener,
                    // Connect info is required so the handler can record the
                    // peer address in the token.
                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
                )
                .with_graceful_shutdown(async {
                    // Resolves both on an explicit signal and when the sender
                    // is dropped.
                    sd_rx.await.ok();
                })
                .await
                .ok();
            });
            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
        } else {
            (None, None, None)
        };

        #[cfg(feature = "logger")]
        tracing::info!(
            endpoint = ?config.endpoint,
            listen = ?config.listen,
            "HTTP transport opened"
        );

        let filter_engine = super::filter::TransportFilterEngine::new(
            &config.filters_in,
            &config.filters_out,
            &crate::transport::filter::TransportFilterTierConfig::default(),
        )
        .unwrap_or_else(|e| {
            // NOTE(review): unlike the other tracing calls in this file, this
            // one is not gated on the `logger` feature — confirm `tracing` is
            // an unconditional dependency.
            tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
            super::filter::TransportFilterEngine::empty()
        });

        // `Arc` is fully qualified because the file-level import is only
        // compiled with the `http-server` feature, while this flag is needed
        // in every build.
        let closed = std::sync::Arc::new(AtomicBool::new(false));
        #[cfg(feature = "health")]
        {
            let h = std::sync::Arc::clone(&closed);
            crate::health::HealthRegistry::register("transport:http", move || {
                if h.load(Ordering::Relaxed) {
                    crate::health::HealthStatus::Unhealthy
                } else {
                    crate::health::HealthStatus::Healthy
                }
            });
        }

        Ok(Self {
            client,
            endpoint: config.endpoint.clone(),
            #[cfg(feature = "http-server")]
            receiver,
            #[cfg(feature = "http-server")]
            shutdown_tx,
            #[cfg(feature = "http-server")]
            _server_handle: server_handle,
            closed,
            #[cfg(feature = "http-server")]
            recv_timeout_ms: config.recv_timeout_ms,
            filter_engine,
            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
        })
    }
}
/// Builds the axum router for the receiver: a single POST route at
/// `recv_path` served by `ingest_handler`, with the channel sender and the
/// sequence counter as shared state.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
) -> axum::Router {
    axum::Router::new()
        .route(recv_path, axum::routing::post(ingest_handler))
        .with_state(ReceiverState { sender, sequence })
}
/// State shared with the ingest handler of the embedded HTTP listener.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Sending half of the bounded channel that `recv` drains.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter that supplies the sequence number for each accepted request.
    sequence: Arc<AtomicU64>,
}
/// Handles one POST to the receive path.
///
/// An empty body is rejected with `400 Bad Request`. Otherwise the body is
/// wrapped in a `Message` (fresh sequence number, detected payload format,
/// current UTC timestamp, peer address in the token) and offered to the
/// channel without waiting:
///
/// * accepted -> `200 OK`
/// * channel full -> `503 Service Unavailable`
/// * channel closed -> `410 Gone`
///
/// With the `otel` feature, a valid `traceparent` header is recorded on the
/// current tracing span.
#[cfg(feature = "http-server")]
async fn ingest_handler(
    axum::extract::State(state): axum::extract::State<ReceiverState>,
    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> axum::http::StatusCode {
    use axum::http::StatusCode;
    use tokio::sync::mpsc::error::TrySendError;

    if body.is_empty() {
        return StatusCode::BAD_REQUEST;
    }

    #[cfg(feature = "otel")]
    if let Some(tp) = headers
        .get(super::propagation::TRACEPARENT_HEADER)
        .and_then(|v| v.to_str().ok())
        && super::propagation::is_valid_traceparent(tp)
    {
        tracing::Span::current().record("traceparent", tp);
    }
    // Keeps `headers` "used" when trace propagation is compiled out.
    #[cfg(not(feature = "otel"))]
    let _ = &headers;

    // The sequence number is consumed even if the channel later rejects the
    // message.
    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
    let format = PayloadFormat::detect(&body);
    let received_at_ms = chrono::Utc::now().timestamp_millis();
    let token = HttpToken::with_source(seq, addr.to_string());

    let outcome = state.sender.try_send(Message {
        key: None,
        payload: body.to_vec(),
        token,
        timestamp_ms: Some(received_at_ms),
        format,
    });

    match outcome {
        Ok(()) => {
            // NOTE(review): this is the receive path but the counter is named
            // "sent" — confirm the metric name is intentional.
            #[cfg(feature = "metrics")]
            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
            StatusCode::OK
        }
        Err(TrySendError::Full(_)) => {
            #[cfg(feature = "metrics")]
            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
                .increment(1);
            StatusCode::SERVICE_UNAVAILABLE
        }
        Err(TrySendError::Closed(_)) => {
            #[cfg(feature = "metrics")]
            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
            StatusCode::GONE
        }
    }
}
impl TransportBase for HttpTransport {
    /// Marks the transport as closed so later `send`/`recv` calls are
    /// rejected. Only the flag is set here; the listener's shutdown signal is
    /// sent when the transport is dropped.
    async fn close(&self) -> TransportResult<()> {
        let flag = &self.closed;
        flag.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Reports `true` until `close` has been called.
    fn is_healthy(&self) -> bool {
        let closed = self.closed.load(Ordering::Relaxed);
        !closed
    }

    /// Static transport name.
    fn name(&self) -> &'static str {
        const NAME: &str = "http";
        NAME
    }
}
impl TransportSender for HttpTransport {
    /// POSTs `payload` to the configured endpoint.
    ///
    /// A non-empty `key` is appended to the endpoint as a path segment, with
    /// exactly one `/` between the two. The body is sent as
    /// `application/octet-stream`; with the `otel` feature the current
    /// traceparent, if any, is attached as a header.
    ///
    /// Outcomes:
    /// * transport closed or no endpoint configured -> `Fatal`
    /// * outbound filter drops the payload -> `Ok` (nothing is sent)
    /// * outbound filter routes it to the DLQ -> `FilteredDlq`
    /// * 2xx response -> `Ok`
    /// * 429 or 503 response -> `Backpressured`
    /// * any other response, or a request error -> `Fatal`
    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
        use super::filter::FilterDisposition;

        if self.closed.load(Ordering::Relaxed) {
            return SendResult::Fatal(TransportError::Closed);
        }

        if self.filter_engine.has_outbound_filters() {
            match self.filter_engine.apply_outbound(payload) {
                FilterDisposition::Pass => {}
                FilterDisposition::Drop => return SendResult::Ok,
                FilterDisposition::Dlq => return SendResult::FilteredDlq,
            }
        }

        let Some(base_url) = self.endpoint.as_deref() else {
            return SendResult::Fatal(TransportError::Config(
                "no endpoint configured for sending".into(),
            ));
        };

        let url = if key.is_empty() {
            base_url.to_owned()
        } else {
            format!(
                "{}/{}",
                base_url.trim_end_matches('/'),
                key.trim_start_matches('/')
            )
        };

        #[cfg(feature = "metrics")]
        let started = std::time::Instant::now();

        let request = self
            .client
            .post(&url)
            .header("content-type", "application/octet-stream");
        #[cfg(feature = "otel")]
        let request = match super::propagation::current_traceparent() {
            Some(tp) => request.header(super::propagation::TRACEPARENT_HEADER, tp),
            None => request,
        };

        // The body must be owned by the request, hence the copy.
        let outcome = match request.body(payload.to_vec()).send().await {
            Ok(resp) => {
                let status = resp.status();
                if status.is_success() {
                    #[cfg(feature = "logger")]
                    tracing::debug!(url = %url, bytes = payload.len(), "HTTP transport: POST sent");
                    #[cfg(feature = "metrics")]
                    metrics::counter!("dfe_transport_sent_total", "transport" => "http")
                        .increment(1);
                    SendResult::Ok
                } else if status == reqwest::StatusCode::TOO_MANY_REQUESTS
                    || status == reqwest::StatusCode::SERVICE_UNAVAILABLE
                {
                    #[cfg(feature = "logger")]
                    tracing::warn!(status = %status, url = %url, "HTTP transport: backpressure");
                    #[cfg(feature = "metrics")]
                    metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
                        .increment(1);
                    SendResult::Backpressured
                } else {
                    #[cfg(feature = "logger")]
                    tracing::warn!(status = %status, url = %url, "HTTP transport: send error");
                    #[cfg(feature = "metrics")]
                    metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
                        .increment(1);
                    SendResult::Fatal(TransportError::Send(format!(
                        "HTTP {} from {}",
                        status, url
                    )))
                }
            }
            Err(e) => {
                #[cfg(feature = "logger")]
                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
                #[cfg(feature = "metrics")]
                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
                    .increment(1);
                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
            }
        };

        #[cfg(feature = "metrics")]
        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
            .record(started.elapsed().as_secs_f64());

        outcome
    }
}
impl TransportReceiver for HttpTransport {
    type Token = HttpToken;

    /// Returns up to `max` messages accepted by the embedded listener.
    ///
    /// With a non-zero receive timeout, the call waits up to that long for
    /// the first message and then takes whatever else is already queued
    /// without waiting. With a timeout of `0` it never waits. Inbound filters
    /// are applied to the batch afterwards: dropped messages vanish, DLQ'd
    /// messages are stashed for `take_filtered_dlq_entries`.
    ///
    /// # Errors
    ///
    /// * `TransportError::Closed` if the transport was closed, or if the
    ///   channel is found closed while no message has been collected yet
    ///   (with a zero timeout: whenever a disconnect is observed).
    /// * `TransportError::Config` if no listen address was configured, or if
    ///   the crate was built without the `http-server` feature.
    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(TransportError::Closed);
        }
        #[cfg(feature = "http-server")]
        {
            use super::filter::{FilterDisposition, FilteredDlqEntry};
            use tokio::sync::mpsc::error::TryRecvError;

            let Some(queue) = self.receiver.as_ref() else {
                return Err(TransportError::Config(
                    "no listen address configured for receiving".into(),
                ));
            };
            let mut rx = queue.lock().await;
            // Cap the up-front allocation; the vector still grows to `max`.
            let mut batch = Vec::with_capacity(max.min(100));
            let wait_ms = self.recv_timeout_ms;

            // Every completed iteration appends exactly one message, so the
            // length doubles as the iteration counter.
            while batch.len() < max {
                let next = if wait_ms == 0 {
                    match rx.try_recv() {
                        Ok(msg) => msg,
                        Err(TryRecvError::Empty) => break,
                        Err(TryRecvError::Disconnected) => {
                            return Err(TransportError::Closed);
                        }
                    }
                } else if batch.is_empty() {
                    // Only the first message of a batch is worth waiting for.
                    let waited = tokio::time::timeout(
                        std::time::Duration::from_millis(wait_ms),
                        rx.recv(),
                    )
                    .await;
                    match waited {
                        Ok(Some(msg)) => msg,
                        Ok(None) => return Err(TransportError::Closed),
                        Err(_) => break,
                    }
                } else {
                    // A batch is already in hand: stop on empty *or*
                    // disconnected and hand back what was collected.
                    match rx.try_recv() {
                        Ok(msg) => msg,
                        Err(_) => break,
                    }
                };
                batch.push(next);
            }

            if self.filter_engine.has_inbound_filters() {
                let mut rejected: Vec<FilteredDlqEntry> = Vec::new();
                batch.retain(|msg| {
                    let disposition = self.filter_engine.apply_inbound(&msg.payload);
                    match disposition {
                        FilterDisposition::Pass => true,
                        FilterDisposition::Drop => false,
                        FilterDisposition::Dlq => {
                            rejected.push(FilteredDlqEntry {
                                payload: msg.payload.clone(),
                                key: msg.key.clone(),
                                reason: String::from("transport filter"),
                            });
                            false
                        }
                    }
                });
                if !rejected.is_empty() {
                    self.filtered_dlq_buffer.lock().extend(rejected);
                }
            }

            #[cfg(feature = "logger")]
            if !batch.is_empty() {
                tracing::debug!(messages = batch.len(), "HTTP transport: batch received");
            }
            Ok(batch)
        }
        #[cfg(not(feature = "http-server"))]
        {
            let _ = max;
            Err(TransportError::Config(
                "HTTP receive requires the 'http-server' feature".into(),
            ))
        }
    }

    /// Drains and returns the inbound messages that filters routed to the
    /// DLQ, leaving the internal buffer empty.
    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
        let mut buffered = self.filtered_dlq_buffer.lock();
        std::mem::take(&mut *buffered)
    }

    /// No-op: this transport performs no work on commit.
    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
        Ok(())
    }
}
impl Drop for HttpTransport {
    /// Signals the embedded listener, if one was started, to shut down
    /// gracefully. A failed send is ignored.
    fn drop(&mut self) {
        #[cfg(feature = "http-server")]
        {
            let Some(stop) = self.shutdown_tx.take() else {
                return;
            };
            let _ = stop.send(());
        }
    }
}
// Unit and loopback integration tests for the HTTP transport.
#[cfg(test)]
mod tests {
    use super::*;
    // A token without a source address renders as `http:<seq>`.
    #[test]
    fn http_token_display() {
        let token = HttpToken::new(42);
        assert_eq!(format!("{token}"), "http:42");
    }
    // A token with a source address renders as `http:<addr>:<seq>`.
    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
    }
    // `Default` yields no endpoint/listener and the documented receive defaults.
    #[test]
    fn config_defaults() {
        let config = HttpTransportConfig::default();
        assert!(config.endpoint.is_none());
        assert!(config.listen.is_none());
        assert_eq!(config.recv_path, "/ingest");
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
    }
    // `sender` sets only the endpoint.
    #[test]
    fn config_sender_helper() {
        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(
            config.endpoint.as_deref(),
            Some("http://localhost:8080/ingest")
        );
        assert!(config.listen.is_none());
    }
    // `receiver` sets only the listen address.
    #[test]
    fn config_receiver_helper() {
        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
    }
    // With no endpoint configured the transport is healthy, but sending is a
    // fatal error; committing an empty token slice succeeds.
    #[tokio::test]
    async fn send_only_transport() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");
        let result = transport.send("test", b"payload").await;
        assert!(result.is_fatal());
        transport.commit(&[]).await.unwrap();
    }
    // After `close`, the transport reports unhealthy and `send` is fatal.
    #[tokio::test]
    async fn close_prevents_send() {
        let config = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&config).await.unwrap();
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
        let result = transport.send("test", b"data").await;
        assert!(result.is_fatal());
    }
    // After `close`, `recv` returns an error.
    #[tokio::test]
    async fn close_prevents_recv() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();
        transport.close().await.unwrap();
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }
    // End-to-end over loopback: one transport listens, another POSTs to it,
    // and the payload comes back out of `recv` with a source address.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        // Ask the OS for a free port, then release it so the transport can
        // bind it. NOTE(review): the port is free between `drop` and the
        // rebind, so another process could take it in that window.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);
        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        // Short pause before the first request to let the spawned server task start.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let send_config =
            HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
        let sender = HttpTransport::new(&send_config).await.unwrap();
        // Empty key: the endpoint URL is used as-is.
        let result = sender.send("", b"{\"msg\":\"hello\"}").await;
        assert!(result.is_ok(), "send failed: {result:?}");
        let messages = receiver.recv(10).await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert!(messages[0].token.source_addr.is_some());
        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }
    // An empty POST body is answered with 400 and never reaches `recv`.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        // Same free-port trick as in the roundtrip test above.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);
        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let client = reqwest::Client::new();
        let resp = client
            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
        // `recv` times out after 200 ms and returns an empty batch.
        let messages = receiver.recv(10).await.unwrap();
        assert!(messages.is_empty());
        receiver.close().await.unwrap();
    }
    // A send-only transport has no receiver, so `recv` is a config error.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let config = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&config).await.unwrap();
        let result = transport.recv(10).await;
        assert!(result.is_err());
    }
    // The scalar config fields survive a JSON serialize/deserialize roundtrip.
    #[test]
    fn config_serde_roundtrip() {
        let config = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.endpoint, config.endpoint);
        assert_eq!(parsed.listen, config.listen);
        assert_eq!(parsed.recv_path, config.recv_path);
        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
    }
}