hyperi_rustlib/dlq/
http.rs1use serde::{Deserialize, Serialize};
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(default)]
23pub struct HttpDlqConfig {
24 pub enabled: bool,
26
27 pub endpoint: String,
29
30 pub timeout_secs: u64,
32}
33
34impl Default for HttpDlqConfig {
35 fn default() -> Self {
36 Self {
37 enabled: false,
38 endpoint: String::new(),
39 timeout_secs: 30,
40 }
41 }
42}
43
44#[derive(Debug)]
46pub struct HttpDlqInner {
47 client: reqwest::Client,
48 endpoint: String,
49}
50
51impl HttpDlqInner {
52 pub fn new(config: &HttpDlqConfig) -> Result<Self, DlqError> {
59 if config.endpoint.is_empty() {
60 return Err(DlqError::BackendError("HTTP DLQ endpoint is empty".into()));
61 }
62
63 let client = reqwest::Client::builder()
64 .timeout(std::time::Duration::from_secs(config.timeout_secs))
65 .build()
66 .map_err(|e| DlqError::BackendError(format!("failed to build HTTP DLQ client: {e}")))?;
67
68 Ok(Self {
69 client,
70 endpoint: config.endpoint.clone(),
71 })
72 }
73
74 pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
76 if batch.is_empty() {
77 return Ok(());
78 }
79
80 let mut body = Vec::with_capacity(batch.len() * 256);
81 for entry in batch {
82 serde_json::to_writer(&mut body, entry)
83 .map_err(|e| DlqError::Serialization(e.to_string()))?;
84 body.push(b'\n');
85 }
86
87 let resp = self
88 .client
89 .post(&self.endpoint)
90 .header("content-type", "application/x-ndjson")
91 .body(body)
92 .send()
93 .await
94 .map_err(|e| DlqError::BackendError(format!("HTTP DLQ batch send failed: {e}")))?;
95
96 if !resp.status().is_success() {
97 return Err(DlqError::BackendError(format!(
98 "HTTP DLQ endpoint returned {}",
99 resp.status()
100 )));
101 }
102
103 #[cfg(feature = "metrics")]
104 metrics::counter!("dfe_dlq_sent_total", "backend" => "http").increment(batch.len() as u64);
105
106 Ok(())
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is disabled, has no endpoint, and uses a
    /// 30-second timeout.
    #[test]
    fn config_defaults() {
        let HttpDlqConfig {
            enabled,
            endpoint,
            timeout_secs,
        } = HttpDlqConfig::default();

        assert!(!enabled);
        assert!(endpoint.is_empty());
        assert_eq!(timeout_secs, 30);
    }

    /// A fully specified JSON document populates every field.
    #[test]
    fn config_deserialise() {
        let raw =
            r#"{"enabled":true,"endpoint":"http://dlq.example.com/ingest","timeout_secs":10}"#;
        let parsed: HttpDlqConfig = serde_json::from_str(raw).unwrap();

        assert!(parsed.enabled);
        assert_eq!(parsed.endpoint, "http://dlq.example.com/ingest");
        assert_eq!(parsed.timeout_secs, 10);
    }

    /// Constructing a sender from a configuration with an empty endpoint fails.
    #[test]
    fn rejects_empty_endpoint() {
        let outcome = HttpDlqInner::new(&HttpDlqConfig::default());
        assert!(outcome.is_err());
    }
}