use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use super::backend::DlqBackend;
use super::entry::DlqEntry;
use super::error::DlqError;
/// Configuration for the HTTP DLQ backend.
///
/// Because of `#[serde(default)]`, any field missing from the serialized form
/// is filled in from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpDlqConfig {
/// URL that DLQ entries are POSTed to. `HttpDlq::new` rejects an empty value.
pub endpoint: String,
/// Timeout, in seconds, handed to the `reqwest` client builder's `timeout`.
pub timeout_secs: u64,
}
impl Default for HttpDlqConfig {
fn default() -> Self {
Self {
endpoint: String::new(),
timeout_secs: 30,
}
}
}
/// DLQ backend that delivers entries by POSTing them to an HTTP endpoint.
pub struct HttpDlq {
/// HTTP client built in `HttpDlq::new` with the configured timeout.
client: reqwest::Client,
/// Target URL, copied from `HttpDlqConfig::endpoint`.
endpoint: String,
}
impl HttpDlq {
    /// Creates an HTTP DLQ backend from `config`.
    ///
    /// # Errors
    ///
    /// Returns `DlqError::BackendError` when `config.endpoint` is empty or when
    /// the underlying `reqwest` client cannot be built.
    pub fn new(config: &HttpDlqConfig) -> Result<Self, DlqError> {
        let endpoint = config.endpoint.as_str();
        if endpoint.is_empty() {
            return Err(DlqError::BackendError("HTTP DLQ endpoint is empty".into()));
        }

        let request_timeout = std::time::Duration::from_secs(config.timeout_secs);
        let client = match reqwest::Client::builder().timeout(request_timeout).build() {
            Ok(built) => built,
            Err(e) => {
                return Err(DlqError::BackendError(format!(
                    "failed to build HTTP DLQ client: {e}"
                )));
            }
        };

        Ok(Self {
            client,
            endpoint: endpoint.to_owned(),
        })
    }
}
impl HttpDlq {
    /// POSTs `body` to the configured endpoint with the given content type.
    ///
    /// Shared by `send` and `send_batch` so that both paths translate transport
    /// failures and non-success statuses into `DlqError::BackendError` in exactly
    /// the same way. `failure_context` is the prefix used in the message for a
    /// transport-level failure.
    async fn post_payload(
        &self,
        content_type: &'static str,
        body: Vec<u8>,
        failure_context: &str,
    ) -> Result<(), DlqError> {
        let resp = self
            .client
            .post(&self.endpoint)
            .header("content-type", content_type)
            .body(body)
            .send()
            .await
            .map_err(|e| DlqError::BackendError(format!("{failure_context}: {e}")))?;

        // Any status outside the success range counts as a failed delivery.
        let status = resp.status();
        if !status.is_success() {
            return Err(DlqError::BackendError(format!(
                "HTTP DLQ endpoint returned {status}"
            )));
        }
        Ok(())
    }
}

#[async_trait]
impl DlqBackend for HttpDlq {
    /// Serializes `entry` as JSON and POSTs it as `application/json`.
    ///
    /// # Errors
    ///
    /// Returns `DlqError::Serialization` if the entry cannot be encoded, and
    /// `DlqError::BackendError` if the request fails or the endpoint answers
    /// with a non-success status.
    async fn send(&self, entry: &DlqEntry) -> Result<(), DlqError> {
        let body =
            serde_json::to_vec(entry).map_err(|e| DlqError::Serialization(e.to_string()))?;

        self.post_payload("application/json", body, "HTTP DLQ send failed")
            .await?;

        // Counted only after the endpoint acknowledged the entry.
        #[cfg(feature = "metrics")]
        metrics::counter!("dfe_dlq_sent_total", "backend" => "http").increment(1);
        Ok(())
    }

    /// Sends `entries` in a single request, one JSON document per line, as
    /// `application/x-ndjson`.
    ///
    /// An empty slice is a no-op and issues no request.
    ///
    /// # Errors
    ///
    /// Returns `DlqError::Serialization` if any entry cannot be encoded (nothing
    /// is sent in that case), and `DlqError::BackendError` if the request fails
    /// or the endpoint answers with a non-success status.
    async fn send_batch(&self, entries: &[DlqEntry]) -> Result<(), DlqError> {
        if entries.is_empty() {
            return Ok(());
        }

        let mut body = Vec::new();
        for entry in entries {
            serde_json::to_writer(&mut body, entry)
                .map_err(|e| DlqError::Serialization(e.to_string()))?;
            // Every record, including the last one, is newline-terminated.
            body.push(b'\n');
        }

        self.post_payload("application/x-ndjson", body, "HTTP DLQ batch send failed")
            .await?;

        // Counted only after the endpoint acknowledged the whole batch.
        #[cfg(feature = "metrics")]
        metrics::counter!("dfe_dlq_sent_total", "backend" => "http")
            .increment(entries.len() as u64);
        Ok(())
    }

    /// Returns the backend identifier, `"http"`.
    fn name(&self) -> &'static str {
        "http"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration has no endpoint and a 30 second timeout.
    #[test]
    fn config_defaults() {
        let HttpDlqConfig {
            endpoint,
            timeout_secs,
        } = HttpDlqConfig::default();

        assert!(endpoint.is_empty());
        assert_eq!(timeout_secs, 30);
    }

    /// Both fields are read from JSON when present.
    #[test]
    fn config_deserialise() {
        let parsed: HttpDlqConfig = serde_json::from_str(
            r#"{"endpoint":"http://dlq.example.com/ingest","timeout_secs":10}"#,
        )
        .unwrap();

        assert_eq!(parsed.endpoint, "http://dlq.example.com/ingest");
        assert_eq!(parsed.timeout_secs, 10);
    }

    /// Construction fails while the endpoint is still the empty default.
    #[test]
    fn rejects_empty_endpoint() {
        let outcome = HttpDlq::new(&HttpDlqConfig::default());
        assert!(outcome.is_err());
    }
}