helios_subscriptions/channels/
rest_hook.rs1use async_trait::async_trait;
6use reqwest::Client;
7use std::time::Duration;
8use tracing::{debug, warn};
9
10use crate::channels::{ChannelDispatcher, DispatchResult};
11use crate::error::SubscriptionError;
12use crate::manager::{ActiveSubscription, PayloadContent};
13
14pub struct RestHookChannel {
19 client: Client,
20}
21
22impl RestHookChannel {
23 pub fn new() -> Self {
25 let client = Client::builder()
26 .timeout(Duration::from_secs(30))
27 .build()
28 .expect("failed to build HTTP client");
29
30 Self { client }
31 }
32
33 pub fn with_client(client: Client) -> Self {
35 Self { client }
36 }
37
38 async fn send(
40 &self,
41 subscription: &ActiveSubscription,
42 bundle: &serde_json::Value,
43 ) -> Result<DispatchResult, SubscriptionError> {
44 let endpoint = subscription.channel.endpoint.as_deref().ok_or_else(|| {
45 SubscriptionError::InvalidEndpoint {
46 message: "rest-hook channel requires an endpoint".to_string(),
47 }
48 })?;
49
50 if subscription.channel.payload_content == PayloadContent::FullResource
52 && !endpoint.starts_with("https://")
53 {
54 return Ok(DispatchResult::PermanentError(
55 "full-resource payload requires HTTPS endpoint".to_string(),
56 ));
57 }
58
59 let mut request = self
60 .client
61 .post(endpoint)
62 .header("Content-Type", "application/fhir+json")
63 .json(bundle);
64
65 for header in &subscription.channel.headers {
67 if let Some((name, value)) = header.split_once(':') {
68 let name = name.trim();
69 let value = value.trim();
70 request = request.header(name, value);
71 }
72 }
73
74 debug!(
75 subscription_id = %subscription.id,
76 endpoint,
77 "Dispatching rest-hook notification"
78 );
79
80 match request.send().await {
81 Ok(response) => {
82 let status = response.status().as_u16();
83 if response.status().is_success() {
84 debug!(
85 subscription_id = %subscription.id,
86 status,
87 "Notification delivered successfully"
88 );
89 Ok(DispatchResult::Success)
90 } else if response.status().is_client_error() {
91 warn!(
92 subscription_id = %subscription.id,
93 status,
94 "Notification delivery failed (client error)"
95 );
96 Ok(DispatchResult::PermanentError(format!("HTTP {status}")))
97 } else {
98 warn!(
99 subscription_id = %subscription.id,
100 status,
101 "Notification delivery failed (server error)"
102 );
103 Ok(DispatchResult::RetryableError(format!("HTTP {status}")))
104 }
105 }
106 Err(e) => {
107 if e.is_timeout() {
108 warn!(
109 subscription_id = %subscription.id,
110 "Notification delivery timed out"
111 );
112 Ok(DispatchResult::RetryableError("timeout".to_string()))
113 } else if e.is_connect() {
114 warn!(
115 subscription_id = %subscription.id,
116 error = %e,
117 "Connection failed"
118 );
119 Ok(DispatchResult::RetryableError(format!(
120 "connection error: {e}"
121 )))
122 } else {
123 warn!(
124 subscription_id = %subscription.id,
125 error = %e,
126 "Notification delivery failed"
127 );
128 Ok(DispatchResult::RetryableError(e.to_string()))
129 }
130 }
131 }
132 }
133}
134
135impl Default for RestHookChannel {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141#[async_trait]
142impl ChannelDispatcher for RestHookChannel {
143 async fn dispatch(
144 &self,
145 subscription: &ActiveSubscription,
146 notification_bundle: &serde_json::Value,
147 ) -> Result<DispatchResult, SubscriptionError> {
148 self.send(subscription, notification_bundle).await
149 }
150
151 async fn handshake(
152 &self,
153 subscription: &ActiveSubscription,
154 handshake_bundle: &serde_json::Value,
155 ) -> Result<DispatchResult, SubscriptionError> {
156 self.send(subscription, handshake_bundle).await
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
    use helios_fhir::FhirVersion;
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds an active rest-hook subscription that targets `endpoint`, uses
    /// the given payload content level and carries one bearer-token header.
    fn test_subscription(endpoint: &str, payload: PayloadContent) -> ActiveSubscription {
        let channel = ChannelConfig {
            channel_type: ChannelType::RestHook,
            endpoint: Some(endpoint.to_string()),
            payload_mime_type: Some("application/fhir+json".to_string()),
            payload_content: payload,
            headers: vec!["Authorization: Bearer test-token".to_string()],
            heartbeat_period: None,
            timeout: None,
            max_count: None,
        };

        ActiveSubscription {
            id: "sub-1".to_string(),
            topic_url: "http://example.org/topic/test".to_string(),
            status: SubscriptionStatusCode::Active,
            channel,
            filters: vec![],
            fhir_version: FhirVersion::default(),
            events_since_start: 0,
            consecutive_failures: 0,
            tenant_id: "test".to_string(),
        }
    }

    /// Minimal empty history bundle used as the request body in every test.
    fn test_bundle() -> serde_json::Value {
        json!({"resourceType": "Bundle", "type": "history", "entry": []})
    }

    /// Starts a mock server whose `POST /webhook` route answers with `status`.
    async fn webhook_server(status: u16) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .respond_with(ResponseTemplate::new(status))
            .mount(&server)
            .await;
        server
    }

    /// Full URL of the `/webhook` route on `server`.
    fn webhook_url(server: &MockServer) -> String {
        format!("{}/webhook", server.uri())
    }

    /// Dispatches the test bundle as an id-only notification to `server`.
    async fn dispatch_to(server: &MockServer) -> DispatchResult {
        let sub = test_subscription(&webhook_url(server), PayloadContent::IdOnly);
        RestHookChannel::new()
            .dispatch(&sub, &test_bundle())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_successful_delivery() {
        let server = webhook_server(200).await;

        let outcome = dispatch_to(&server).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_server_error_retryable() {
        let server = webhook_server(500).await;

        let outcome = dispatch_to(&server).await;

        assert!(matches!(outcome, DispatchResult::RetryableError(_)));
    }

    #[tokio::test]
    async fn test_client_error_permanent() {
        let server = webhook_server(404).await;

        let outcome = dispatch_to(&server).await;

        assert!(matches!(outcome, DispatchResult::PermanentError(_)));
    }

    #[tokio::test]
    async fn test_custom_headers_sent() {
        // The mock only matches when both the custom and the content-type
        // headers arrive; an unmatched request would not yield a 200.
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .and(header("Authorization", "Bearer test-token"))
            .and(header("Content-Type", "application/fhir+json"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let outcome = dispatch_to(&server).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_tls_enforcement_for_full_resource() {
        let sub = test_subscription(
            "http://insecure.example.com/webhook",
            PayloadContent::FullResource,
        );
        let channel = RestHookChannel::new();

        match channel.dispatch(&sub, &test_bundle()).await.unwrap() {
            DispatchResult::PermanentError(msg) => assert!(msg.contains("HTTPS")),
            _ => panic!("expected a permanent error for a non-HTTPS endpoint"),
        }
    }

    #[tokio::test]
    async fn test_tls_not_required_for_id_only() {
        // The mock server listens on plain HTTP; id-only payloads must still go through.
        let server = webhook_server(200).await;

        let outcome = dispatch_to(&server).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_handshake_success() {
        let server = webhook_server(200).await;
        let sub = test_subscription(&webhook_url(&server), PayloadContent::IdOnly);
        let bundle = test_bundle();

        let outcome = RestHookChannel::new()
            .handshake(&sub, &bundle)
            .await
            .unwrap();

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_connection_error() {
        // Port 1 on localhost is expected to have no listener.
        let sub = test_subscription("http://127.0.0.1:1/webhook", PayloadContent::IdOnly);
        let channel = RestHookChannel::new();

        let outcome = channel.dispatch(&sub, &test_bundle()).await.unwrap();

        assert!(matches!(outcome, DispatchResult::RetryableError(_)));
    }

    #[tokio::test]
    async fn test_missing_endpoint() {
        let mut sub = test_subscription("http://example.com/webhook", PayloadContent::IdOnly);
        sub.channel.endpoint = None;
        let channel = RestHookChannel::new();

        let outcome = channel.dispatch(&sub, &test_bundle()).await;

        assert!(outcome.is_err());
    }
}