Skip to main content

helios_subscriptions/channels/
rest_hook.rs

1//! REST-hook channel dispatcher.
2//!
3//! Delivers notification bundles via HTTP POST to the subscriber's endpoint.
4
5use async_trait::async_trait;
6use reqwest::Client;
7use std::time::Duration;
8use tracing::{debug, warn};
9
10use crate::channels::{ChannelDispatcher, DispatchResult};
11use crate::error::SubscriptionError;
12use crate::manager::{ActiveSubscription, PayloadContent};
13
14/// REST-hook channel implementation.
15///
16/// Uses an HTTP client to POST notification bundles to subscriber endpoints.
17/// Supports custom headers and TLS enforcement for full-resource payloads.
18pub struct RestHookChannel {
19    client: Client,
20}
21
22impl RestHookChannel {
23    /// Creates a new REST-hook channel with default HTTP client settings.
24    pub fn new() -> Self {
25        let client = Client::builder()
26            .timeout(Duration::from_secs(30))
27            .build()
28            .expect("failed to build HTTP client");
29
30        Self { client }
31    }
32
33    /// Creates a new REST-hook channel with a custom HTTP client.
34    pub fn with_client(client: Client) -> Self {
35        Self { client }
36    }
37
38    /// Sends a notification bundle to the given endpoint.
39    async fn send(
40        &self,
41        subscription: &ActiveSubscription,
42        bundle: &serde_json::Value,
43    ) -> Result<DispatchResult, SubscriptionError> {
44        let endpoint = subscription.channel.endpoint.as_deref().ok_or_else(|| {
45            SubscriptionError::InvalidEndpoint {
46                message: "rest-hook channel requires an endpoint".to_string(),
47            }
48        })?;
49
50        // Enforce TLS for full-resource payloads.
51        if subscription.channel.payload_content == PayloadContent::FullResource
52            && !endpoint.starts_with("https://")
53        {
54            return Ok(DispatchResult::PermanentError(
55                "full-resource payload requires HTTPS endpoint".to_string(),
56            ));
57        }
58
59        let mut request = self
60            .client
61            .post(endpoint)
62            .header("Content-Type", "application/fhir+json")
63            .json(bundle);
64
65        // Add custom headers from the subscription.
66        for header in &subscription.channel.headers {
67            if let Some((name, value)) = header.split_once(':') {
68                let name = name.trim();
69                let value = value.trim();
70                request = request.header(name, value);
71            }
72        }
73
74        debug!(
75            subscription_id = %subscription.id,
76            endpoint,
77            "Dispatching rest-hook notification"
78        );
79
80        match request.send().await {
81            Ok(response) => {
82                let status = response.status().as_u16();
83                if response.status().is_success() {
84                    debug!(
85                        subscription_id = %subscription.id,
86                        status,
87                        "Notification delivered successfully"
88                    );
89                    Ok(DispatchResult::Success)
90                } else if response.status().is_client_error() {
91                    warn!(
92                        subscription_id = %subscription.id,
93                        status,
94                        "Notification delivery failed (client error)"
95                    );
96                    Ok(DispatchResult::PermanentError(format!("HTTP {status}")))
97                } else {
98                    warn!(
99                        subscription_id = %subscription.id,
100                        status,
101                        "Notification delivery failed (server error)"
102                    );
103                    Ok(DispatchResult::RetryableError(format!("HTTP {status}")))
104                }
105            }
106            Err(e) => {
107                if e.is_timeout() {
108                    warn!(
109                        subscription_id = %subscription.id,
110                        "Notification delivery timed out"
111                    );
112                    Ok(DispatchResult::RetryableError("timeout".to_string()))
113                } else if e.is_connect() {
114                    warn!(
115                        subscription_id = %subscription.id,
116                        error = %e,
117                        "Connection failed"
118                    );
119                    Ok(DispatchResult::RetryableError(format!(
120                        "connection error: {e}"
121                    )))
122                } else {
123                    warn!(
124                        subscription_id = %subscription.id,
125                        error = %e,
126                        "Notification delivery failed"
127                    );
128                    Ok(DispatchResult::RetryableError(e.to_string()))
129                }
130            }
131        }
132    }
133}
134
135impl Default for RestHookChannel {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141#[async_trait]
142impl ChannelDispatcher for RestHookChannel {
143    async fn dispatch(
144        &self,
145        subscription: &ActiveSubscription,
146        notification_bundle: &serde_json::Value,
147    ) -> Result<DispatchResult, SubscriptionError> {
148        self.send(subscription, notification_bundle).await
149    }
150
151    async fn handshake(
152        &self,
153        subscription: &ActiveSubscription,
154        handshake_bundle: &serde_json::Value,
155    ) -> Result<DispatchResult, SubscriptionError> {
156        self.send(subscription, handshake_bundle).await
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
164    use helios_fhir::FhirVersion;
165    use serde_json::json;
166    use wiremock::matchers::{method, path};
167    use wiremock::{Mock, MockServer, ResponseTemplate};
168
169    fn test_subscription(endpoint: &str, payload: PayloadContent) -> ActiveSubscription {
170        ActiveSubscription {
171            id: "sub-1".to_string(),
172            topic_url: "http://example.org/topic/test".to_string(),
173            status: SubscriptionStatusCode::Active,
174            channel: ChannelConfig {
175                channel_type: ChannelType::RestHook,
176                endpoint: Some(endpoint.to_string()),
177                payload_mime_type: Some("application/fhir+json".to_string()),
178                payload_content: payload,
179                headers: vec!["Authorization: Bearer test-token".to_string()],
180                heartbeat_period: None,
181                timeout: None,
182                max_count: None,
183            },
184            filters: vec![],
185            fhir_version: FhirVersion::default(),
186            events_since_start: 0,
187            consecutive_failures: 0,
188            tenant_id: "test".to_string(),
189        }
190    }
191
192    fn test_bundle() -> serde_json::Value {
193        json!({"resourceType": "Bundle", "type": "history", "entry": []})
194    }
195
196    #[tokio::test]
197    async fn test_successful_delivery() {
198        let server = MockServer::start().await;
199        Mock::given(method("POST"))
200            .and(path("/webhook"))
201            .respond_with(ResponseTemplate::new(200))
202            .mount(&server)
203            .await;
204
205        let channel = RestHookChannel::new();
206        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
207        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
208
209        assert!(matches!(result, DispatchResult::Success));
210    }
211
212    #[tokio::test]
213    async fn test_server_error_retryable() {
214        let server = MockServer::start().await;
215        Mock::given(method("POST"))
216            .and(path("/webhook"))
217            .respond_with(ResponseTemplate::new(500))
218            .mount(&server)
219            .await;
220
221        let channel = RestHookChannel::new();
222        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
223        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
224
225        assert!(matches!(result, DispatchResult::RetryableError(_)));
226    }
227
228    #[tokio::test]
229    async fn test_client_error_permanent() {
230        let server = MockServer::start().await;
231        Mock::given(method("POST"))
232            .and(path("/webhook"))
233            .respond_with(ResponseTemplate::new(404))
234            .mount(&server)
235            .await;
236
237        let channel = RestHookChannel::new();
238        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
239        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
240
241        assert!(matches!(result, DispatchResult::PermanentError(_)));
242    }
243
244    #[tokio::test]
245    async fn test_custom_headers_sent() {
246        let server = MockServer::start().await;
247        Mock::given(method("POST"))
248            .and(path("/webhook"))
249            .and(wiremock::matchers::header(
250                "Authorization",
251                "Bearer test-token",
252            ))
253            .and(wiremock::matchers::header(
254                "Content-Type",
255                "application/fhir+json",
256            ))
257            .respond_with(ResponseTemplate::new(200))
258            .mount(&server)
259            .await;
260
261        let channel = RestHookChannel::new();
262        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
263        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
264
265        assert!(matches!(result, DispatchResult::Success));
266    }
267
268    #[tokio::test]
269    async fn test_tls_enforcement_for_full_resource() {
270        let channel = RestHookChannel::new();
271        let sub = test_subscription(
272            "http://insecure.example.com/webhook",
273            PayloadContent::FullResource,
274        );
275        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
276
277        assert!(matches!(result, DispatchResult::PermanentError(_)));
278        if let DispatchResult::PermanentError(msg) = result {
279            assert!(msg.contains("HTTPS"));
280        }
281    }
282
283    #[tokio::test]
284    async fn test_tls_not_required_for_id_only() {
285        let server = MockServer::start().await;
286        Mock::given(method("POST"))
287            .and(path("/webhook"))
288            .respond_with(ResponseTemplate::new(200))
289            .mount(&server)
290            .await;
291
292        let channel = RestHookChannel::new();
293        // MockServer uses HTTP, which is fine for id-only.
294        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
295        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
296
297        assert!(matches!(result, DispatchResult::Success));
298    }
299
300    #[tokio::test]
301    async fn test_handshake_success() {
302        let server = MockServer::start().await;
303        Mock::given(method("POST"))
304            .and(path("/webhook"))
305            .respond_with(ResponseTemplate::new(200))
306            .mount(&server)
307            .await;
308
309        let channel = RestHookChannel::new();
310        let sub = test_subscription(&format!("{}/webhook", server.uri()), PayloadContent::IdOnly);
311        let bundle = test_bundle();
312        let result = channel.handshake(&sub, &bundle).await.unwrap();
313
314        assert!(matches!(result, DispatchResult::Success));
315    }
316
317    #[tokio::test]
318    async fn test_connection_error() {
319        let channel = RestHookChannel::new();
320        // Use a port that nothing is listening on.
321        let sub = test_subscription("http://127.0.0.1:1/webhook", PayloadContent::IdOnly);
322        let result = channel.dispatch(&sub, &test_bundle()).await.unwrap();
323
324        assert!(matches!(result, DispatchResult::RetryableError(_)));
325    }
326
327    #[tokio::test]
328    async fn test_missing_endpoint() {
329        let channel = RestHookChannel::new();
330        let mut sub = test_subscription("http://example.com/webhook", PayloadContent::IdOnly);
331        sub.channel.endpoint = None;
332
333        let result = channel.dispatch(&sub, &test_bundle()).await;
334        assert!(result.is_err());
335    }
336}