Skip to main content

hyperi_rustlib/dlq/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/http.rs
3// Purpose:   HTTP POST DLQ backend variant
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! HTTP backend variant for the DLQ enum.
10//!
11//! POSTs failed messages as NDJSON to a configured HTTP endpoint via
12//! reqwest. The reqwest client is async-native so no `spawn_blocking`
13//! is needed.
14
15use serde::{Deserialize, Serialize};
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
/// Configuration for the HTTP DLQ backend.
///
/// The container-level `#[serde(default)]` means any field missing from the
/// serialised form is filled in from this type's [`Default`] implementation,
/// so a partial (or empty) config object deserialises successfully.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpDlqConfig {
    /// Enable the HTTP backend.
    ///
    /// NOTE(review): this flag is not read anywhere in this file; presumably
    /// the code that selects a DLQ backend checks it -- confirm at the caller.
    pub enabled: bool,

    /// Endpoint URL to POST failed messages to.
    ///
    /// Must be non-empty for [`HttpDlqInner::new`] to succeed.
    pub endpoint: String,

    /// Request timeout in seconds.
    ///
    /// Applied as the reqwest client timeout when the backend is built.
    pub timeout_secs: u64,
}
33
34impl Default for HttpDlqConfig {
35    fn default() -> Self {
36        Self {
37            enabled: false,
38            endpoint: String::new(),
39            timeout_secs: 30,
40        }
41    }
42}
43
/// HTTP backend -- internal variant carried by [`super::DlqBackend::Http`].
///
/// Holds everything needed to POST a batch: a reqwest client and the target
/// endpoint. Instances are created through [`HttpDlqInner::new`].
#[derive(Debug)]
pub struct HttpDlqInner {
    /// reqwest client built in `new` with the configured request timeout.
    client: reqwest::Client,
    /// Endpoint URL that batches are POSTed to, copied from the config.
    endpoint: String,
}
50
51impl HttpDlqInner {
52    /// Build the HTTP backend.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if `endpoint` is empty or the reqwest client
57    /// cannot be built.
58    pub fn new(config: &HttpDlqConfig) -> Result<Self, DlqError> {
59        if config.endpoint.is_empty() {
60            return Err(DlqError::BackendError("HTTP DLQ endpoint is empty".into()));
61        }
62
63        let client = reqwest::Client::builder()
64            .timeout(std::time::Duration::from_secs(config.timeout_secs))
65            .build()
66            .map_err(|e| DlqError::BackendError(format!("failed to build HTTP DLQ client: {e}")))?;
67
68        Ok(Self {
69            client,
70            endpoint: config.endpoint.clone(),
71        })
72    }
73
74    /// Send a batch as `application/x-ndjson`.
75    pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
76        if batch.is_empty() {
77            return Ok(());
78        }
79
80        let mut body = Vec::with_capacity(batch.len() * 256);
81        for entry in batch {
82            serde_json::to_writer(&mut body, entry)
83                .map_err(|e| DlqError::Serialization(e.to_string()))?;
84            body.push(b'\n');
85        }
86
87        let resp = self
88            .client
89            .post(&self.endpoint)
90            .header("content-type", "application/x-ndjson")
91            .body(body)
92            .send()
93            .await
94            .map_err(|e| DlqError::BackendError(format!("HTTP DLQ batch send failed: {e}")))?;
95
96        if !resp.status().is_success() {
97            return Err(DlqError::BackendError(format!(
98                "HTTP DLQ endpoint returned {}",
99                resp.status()
100            )));
101        }
102
103        #[cfg(feature = "metrics")]
104        metrics::counter!("dfe_dlq_sent_total", "backend" => "http").increment(batch.len() as u64);
105
106        Ok(())
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config is disabled, has no endpoint, and times out after 30 s.
    #[test]
    fn config_defaults() {
        let HttpDlqConfig {
            enabled,
            endpoint,
            timeout_secs,
        } = HttpDlqConfig::default();

        assert!(!enabled);
        assert!(endpoint.is_empty());
        assert_eq!(timeout_secs, 30);
    }

    /// A fully specified JSON object overrides every default.
    #[test]
    fn config_deserialise() {
        let parsed: HttpDlqConfig = serde_json::from_str(
            r#"{"enabled":true,"endpoint":"http://dlq.example.com/ingest","timeout_secs":10}"#,
        )
        .unwrap();

        assert!(parsed.enabled);
        assert_eq!(parsed.endpoint, "http://dlq.example.com/ingest");
        assert_eq!(parsed.timeout_secs, 10);
    }

    /// Building the backend from the default (empty-endpoint) config must fail.
    #[test]
    fn rejects_empty_endpoint() {
        let outcome = HttpDlqInner::new(&HttpDlqConfig::default());
        assert!(outcome.is_err());
    }
}