1use std::net::IpAddr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use helios_auth::OutboundAuthProvider;
26use reqwest::{Client, Url};
27use tracing::{debug, warn};
28
29use crate::channels::{ChannelDispatcher, DispatchResult};
30use crate::error::SubscriptionError;
31use crate::manager::{ActiveSubscription, PayloadContent};
32use crate::notification::{notification_type_from_bundle, wrap_as_message_bundle};
33
34pub struct MessagingChannel {
36 client: Client,
37 auth_provider: Arc<dyn OutboundAuthProvider>,
38 source_endpoint: String,
39 private_endpoints_allowed: bool,
42}
43
44impl MessagingChannel {
45 pub fn new(source_endpoint: String, auth_provider: Arc<dyn OutboundAuthProvider>) -> Self {
48 let client = Client::builder()
49 .timeout(Duration::from_secs(30))
50 .build()
51 .expect("failed to build HTTP client");
52
53 Self {
54 client,
55 auth_provider,
56 source_endpoint,
57 private_endpoints_allowed: false,
58 }
59 }
60
61 pub fn with_client(mut self, client: Client) -> Self {
63 self.client = client;
64 self
65 }
66
67 pub fn with_private_endpoints_allowed(mut self, allowed: bool) -> Self {
70 self.private_endpoints_allowed = allowed;
71 self
72 }
73
74 async fn send(
75 &self,
76 subscription: &ActiveSubscription,
77 bundle: &serde_json::Value,
78 ) -> Result<DispatchResult, SubscriptionError> {
79 let endpoint = subscription.channel.endpoint.as_deref().ok_or_else(|| {
80 SubscriptionError::InvalidEndpoint {
81 message: "messaging channel requires an endpoint".to_string(),
82 }
83 })?;
84
85 if subscription.channel.payload_content == PayloadContent::FullResource
87 && !endpoint.starts_with("https://")
88 {
89 return Ok(DispatchResult::PermanentError(
90 "full-resource payload requires HTTPS endpoint".to_string(),
91 ));
92 }
93
94 if !self.private_endpoints_allowed {
96 if let Some(reason) = check_endpoint_host(endpoint) {
97 return Ok(DispatchResult::PermanentError(reason));
98 }
99 }
100
101 let notification_type = notification_type_from_bundle(bundle);
103 let message_bundle = wrap_as_message_bundle(
104 bundle,
105 subscription,
106 &self.source_endpoint,
107 notification_type,
108 )?;
109
110 let mut request = self
111 .client
112 .post(endpoint)
113 .header("Content-Type", "application/fhir+json")
114 .json(&message_bundle);
115
116 let mut subscription_supplies_auth = false;
120 for header in &subscription.channel.headers {
121 if let Some((name, value)) = header.split_once(':') {
122 let name = name.trim();
123 let value = value.trim();
124 if name.eq_ignore_ascii_case("authorization") {
125 subscription_supplies_auth = true;
126 }
127 request = request.header(name, value);
128 }
129 }
130
131 if !subscription_supplies_auth {
132 request = self.auth_provider.authorize(request, endpoint).await?;
133 }
134
135 debug!(
136 subscription_id = %subscription.id,
137 endpoint,
138 "Dispatching messaging notification"
139 );
140
141 match request.send().await {
142 Ok(response) => {
143 let status = response.status().as_u16();
144 if response.status().is_success() {
145 debug!(
146 subscription_id = %subscription.id,
147 status,
148 "Message delivered successfully"
149 );
150 Ok(DispatchResult::Success)
151 } else if response.status().is_client_error() {
152 warn!(
153 subscription_id = %subscription.id,
154 status,
155 "Message delivery failed (client error)"
156 );
157 Ok(DispatchResult::PermanentError(format!("HTTP {status}")))
158 } else {
159 warn!(
160 subscription_id = %subscription.id,
161 status,
162 "Message delivery failed (server error)"
163 );
164 Ok(DispatchResult::RetryableError(format!("HTTP {status}")))
165 }
166 }
167 Err(e) => {
168 if e.is_timeout() {
169 warn!(
170 subscription_id = %subscription.id,
171 "Message delivery timed out"
172 );
173 Ok(DispatchResult::RetryableError("timeout".to_string()))
174 } else if e.is_connect() {
175 warn!(
176 subscription_id = %subscription.id,
177 error = %e,
178 "Connection failed"
179 );
180 Ok(DispatchResult::RetryableError(format!(
181 "connection error: {e}"
182 )))
183 } else {
184 warn!(
185 subscription_id = %subscription.id,
186 error = %e,
187 "Message delivery failed"
188 );
189 Ok(DispatchResult::RetryableError(e.to_string()))
190 }
191 }
192 }
193 }
194}
195
196fn check_endpoint_host(endpoint: &str) -> Option<String> {
200 let url = match Url::parse(endpoint) {
201 Ok(u) => u,
202 Err(_) => return Some(format!("invalid endpoint URL: {endpoint}")),
203 };
204
205 let host = url.host_str()?;
206 let ip: IpAddr = match host.parse() {
207 Ok(ip) => ip,
208 Err(_) => return None, };
210
211 let is_blocked = match ip {
212 IpAddr::V4(v4) => {
213 v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
214 }
215 IpAddr::V6(v6) => v6.is_loopback() || v6.is_unspecified() || is_v6_private_or_link(&v6),
216 };
217
218 if is_blocked {
219 Some(format!(
220 "endpoint host {ip} is a private/loopback IP; set HFS_SUBSCRIPTION_ALLOW_PRIVATE_ENDPOINTS=true to allow"
221 ))
222 } else {
223 None
224 }
225}
226
227fn is_v6_private_or_link(addr: &std::net::Ipv6Addr) -> bool {
228 let segments = addr.segments();
229 let unique_local = (segments[0] & 0xfe00) == 0xfc00;
231 let link_local = (segments[0] & 0xffc0) == 0xfe80;
233 unique_local || link_local
234}
235
236#[async_trait]
237impl ChannelDispatcher for MessagingChannel {
238 async fn dispatch(
239 &self,
240 subscription: &ActiveSubscription,
241 notification_bundle: &serde_json::Value,
242 ) -> Result<DispatchResult, SubscriptionError> {
243 self.send(subscription, notification_bundle).await
244 }
245
246 async fn handshake(
247 &self,
248 subscription: &ActiveSubscription,
249 handshake_bundle: &serde_json::Value,
250 ) -> Result<DispatchResult, SubscriptionError> {
251 self.send(subscription, handshake_bundle).await
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use super::*;
258 use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
259 use crate::notification::{NotificationType, build_event_notification, build_handshake};
260 use chrono::Utc;
261 use helios_auth::{NoOpOutboundAuthProvider, StaticBearerOutboundAuthProvider};
262 use helios_fhir::FhirVersion;
263 use serde_json::Value;
264 use wiremock::matchers::{header, method, path};
265 use wiremock::{Mock, MockServer, ResponseTemplate};
266
267 const SOURCE: &str = "https://hfs.example.org/fhir";
268
269 fn test_subscription(endpoint: &str, payload: PayloadContent) -> ActiveSubscription {
270 ActiveSubscription {
271 id: "sub-msg-1".to_string(),
272 topic_url: "http://example.org/topic/test".to_string(),
273 status: SubscriptionStatusCode::Active,
274 channel: ChannelConfig {
275 channel_type: ChannelType::Message,
276 endpoint: Some(endpoint.to_string()),
277 payload_mime_type: Some("application/fhir+json".to_string()),
278 payload_content: payload,
279 headers: vec![],
280 heartbeat_period: None,
281 timeout: None,
282 max_count: None,
283 },
284 filters: vec![],
285 fhir_version: FhirVersion::default(),
286 events_since_start: 0,
287 consecutive_failures: 0,
288 tenant_id: "test".to_string(),
289 }
290 }
291
292 fn channel_with_noop() -> MessagingChannel {
293 MessagingChannel::new(SOURCE.to_string(), Arc::new(NoOpOutboundAuthProvider))
294 .with_private_endpoints_allowed(true)
295 }
296
297 fn channel_with_bearer(token: &str) -> MessagingChannel {
298 MessagingChannel::new(
299 SOURCE.to_string(),
300 Arc::new(StaticBearerOutboundAuthProvider::new(token)),
301 )
302 .with_private_endpoints_allowed(true)
303 }
304
305 fn handshake_bundle(sub: &ActiveSubscription) -> Value {
306 build_handshake(sub, SOURCE).unwrap()
307 }
308
309 fn event_bundle(sub: &ActiveSubscription) -> Value {
310 let event = crate::notification::NotificationEventData {
311 event_number: 1,
312 timestamp: Utc::now(),
313 focus_reference: "Encounter/abc".to_string(),
314 };
315 let resource = serde_json::json!({"resourceType": "Encounter", "id": "abc"});
316 build_event_notification(sub, event, Some(&resource), SOURCE).unwrap()
317 }
318
319 #[tokio::test]
320 async fn successful_delivery() {
321 let server = MockServer::start().await;
322 Mock::given(method("POST"))
323 .and(path("/process-message"))
324 .respond_with(ResponseTemplate::new(200))
325 .mount(&server)
326 .await;
327
328 let channel = channel_with_noop();
329 let sub = test_subscription(
330 &format!("{}/process-message", server.uri()),
331 PayloadContent::IdOnly,
332 );
333 let result = channel
334 .dispatch(&sub, &handshake_bundle(&sub))
335 .await
336 .unwrap();
337
338 assert!(matches!(result, DispatchResult::Success));
339 }
340
341 #[tokio::test]
342 async fn server_error_is_retryable() {
343 let server = MockServer::start().await;
344 Mock::given(method("POST"))
345 .and(path("/x"))
346 .respond_with(ResponseTemplate::new(503))
347 .mount(&server)
348 .await;
349
350 let channel = channel_with_noop();
351 let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
352 let result = channel
353 .dispatch(&sub, &handshake_bundle(&sub))
354 .await
355 .unwrap();
356 assert!(matches!(result, DispatchResult::RetryableError(_)));
357 }
358
359 #[tokio::test]
360 async fn client_error_is_permanent() {
361 let server = MockServer::start().await;
362 Mock::given(method("POST"))
363 .and(path("/x"))
364 .respond_with(ResponseTemplate::new(404))
365 .mount(&server)
366 .await;
367
368 let channel = channel_with_noop();
369 let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
370 let result = channel
371 .dispatch(&sub, &handshake_bundle(&sub))
372 .await
373 .unwrap();
374 assert!(matches!(result, DispatchResult::PermanentError(_)));
375 }
376
377 #[tokio::test]
378 async fn outbound_bearer_attached_when_subscription_does_not_supply_one() {
379 let server = MockServer::start().await;
380 Mock::given(method("POST"))
381 .and(path("/x"))
382 .and(header("Authorization", "Bearer outbound-token"))
383 .and(header("Content-Type", "application/fhir+json"))
384 .respond_with(ResponseTemplate::new(200))
385 .mount(&server)
386 .await;
387
388 let channel = channel_with_bearer("outbound-token");
389 let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
390 let result = channel
391 .dispatch(&sub, &handshake_bundle(&sub))
392 .await
393 .unwrap();
394 assert!(matches!(result, DispatchResult::Success));
395 }
396
397 #[tokio::test]
398 async fn subscription_authorization_takes_precedence_over_outbound() {
399 let server = MockServer::start().await;
400 Mock::given(method("POST"))
401 .and(path("/x"))
402 .and(header("Authorization", "Bearer subscriber-token"))
403 .respond_with(ResponseTemplate::new(200))
404 .mount(&server)
405 .await;
406
407 let channel = channel_with_bearer("outbound-token");
408 let mut sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
409 sub.channel.headers = vec!["Authorization: Bearer subscriber-token".to_string()];
410 let result = channel
411 .dispatch(&sub, &handshake_bundle(&sub))
412 .await
413 .unwrap();
414 assert!(
415 matches!(result, DispatchResult::Success),
416 "subscription-supplied Authorization header should take precedence; got {result:?}"
417 );
418
419 let received = &server.received_requests().await.unwrap()[0];
422 let auth = received
423 .headers
424 .get("authorization")
425 .expect("authorization header present");
426 assert_eq!(auth.to_str().unwrap(), "Bearer subscriber-token");
427 }
428
429 #[tokio::test]
430 async fn body_is_message_bundle_with_message_header_first() {
431 let server = MockServer::start().await;
432 Mock::given(method("POST"))
433 .and(path("/x"))
434 .respond_with(ResponseTemplate::new(200))
435 .mount(&server)
436 .await;
437
438 let channel = channel_with_noop();
439 let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
440 let _ = channel
441 .dispatch(&sub, &handshake_bundle(&sub))
442 .await
443 .unwrap();
444
445 let received = &server.received_requests().await.unwrap()[0];
446 let body: Value = serde_json::from_slice(&received.body).unwrap();
447 assert_eq!(body["resourceType"], "Bundle");
448 assert_eq!(body["type"], "message");
449 assert_eq!(
450 body["entry"][0]["resource"]["resourceType"],
451 "MessageHeader"
452 );
453 }
454
455 #[tokio::test]
456 async fn handshake_bundle_carries_handshake_event_code() {
457 let server = MockServer::start().await;
458 Mock::given(method("POST"))
459 .and(path("/x"))
460 .respond_with(ResponseTemplate::new(200))
461 .mount(&server)
462 .await;
463
464 let channel = channel_with_noop();
465 let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
466 let _ = channel
467 .handshake(&sub, &handshake_bundle(&sub))
468 .await
469 .unwrap();
470
471 let received = &server.received_requests().await.unwrap()[0];
472 let body: Value = serde_json::from_slice(&received.body).unwrap();
473 let header = &body["entry"][0]["resource"];
474 let handshake_code = header
477 .get("eventCoding")
478 .and_then(|c| c.get("code"))
479 .and_then(Value::as_str);
480 let event_uri = header.get("eventUri").and_then(Value::as_str);
481 let nt = notification_type_from_bundle(&handshake_bundle(&sub));
482 assert_eq!(nt, NotificationType::Handshake);
483 if let Some(code) = handshake_code {
484 assert_eq!(code, "handshake");
485 } else {
486 assert!(event_uri.is_some(), "R4 event URI must be set");
487 }
488 }
489
490 #[tokio::test]
491 async fn tls_enforced_for_full_resource() {
492 let channel = channel_with_noop();
493 let sub = test_subscription(
494 "http://insecure.example.com/x",
495 PayloadContent::FullResource,
496 );
497 let result = channel.dispatch(&sub, &event_bundle(&sub)).await.unwrap();
498 assert!(matches!(result, DispatchResult::PermanentError(_)));
499 }
500
501 #[tokio::test]
502 async fn missing_endpoint_errors() {
503 let channel = channel_with_noop();
504 let mut sub = test_subscription("http://example.com/x", PayloadContent::IdOnly);
505 sub.channel.endpoint = None;
506 let result = channel.dispatch(&sub, &handshake_bundle(&sub)).await;
507 assert!(matches!(
508 result,
509 Err(SubscriptionError::InvalidEndpoint { .. })
510 ));
511 }
512
513 #[tokio::test]
514 async fn ssrf_guard_rejects_loopback_when_disabled() {
515 let channel = MessagingChannel::new(SOURCE.to_string(), Arc::new(NoOpOutboundAuthProvider));
517 let sub = test_subscription("http://127.0.0.1:9/x", PayloadContent::IdOnly);
518 let result = channel
519 .dispatch(&sub, &handshake_bundle(&sub))
520 .await
521 .unwrap();
522 match result {
523 DispatchResult::PermanentError(msg) => {
524 assert!(msg.contains("127.0.0.1"));
525 }
526 other => panic!("expected PermanentError, got {other:?}"),
527 }
528 }
529
530 #[tokio::test]
531 async fn ssrf_guard_rejects_private_v4_when_disabled() {
532 let channel = MessagingChannel::new(SOURCE.to_string(), Arc::new(NoOpOutboundAuthProvider));
533 let sub = test_subscription("http://10.0.0.1:9/x", PayloadContent::IdOnly);
534 let result = channel
535 .dispatch(&sub, &handshake_bundle(&sub))
536 .await
537 .unwrap();
538 assert!(matches!(result, DispatchResult::PermanentError(_)));
539 }
540
541 #[tokio::test]
542 async fn ssrf_guard_allows_loopback_when_enabled() {
543 let server = MockServer::start().await;
544 Mock::given(method("POST"))
545 .and(path("/x"))
546 .respond_with(ResponseTemplate::new(200))
547 .mount(&server)
548 .await;
549
550 let channel = channel_with_noop(); let sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
552 let result = channel
553 .dispatch(&sub, &handshake_bundle(&sub))
554 .await
555 .unwrap();
556 assert!(matches!(result, DispatchResult::Success));
557 }
558
559 #[tokio::test]
560 async fn ssrf_guard_passes_through_dns_names() {
561 let channel = MessagingChannel::new(SOURCE.to_string(), Arc::new(NoOpOutboundAuthProvider));
565 let sub = test_subscription(
566 "http://nonexistent.invalid.example.test/x",
567 PayloadContent::IdOnly,
568 );
569 let result = channel
570 .dispatch(&sub, &handshake_bundle(&sub))
571 .await
572 .unwrap();
573 match result {
576 DispatchResult::RetryableError(_) => {}
577 DispatchResult::PermanentError(msg) => {
578 assert!(
579 !msg.contains("private/loopback"),
580 "DNS host should not trip SSRF guard, got: {msg}"
581 );
582 }
583 other => panic!("unexpected: {other:?}"),
584 }
585 }
586
587 #[tokio::test]
588 async fn custom_subscription_headers_forwarded() {
589 let server = MockServer::start().await;
590 Mock::given(method("POST"))
591 .and(path("/x"))
592 .and(header("X-Trace-Id", "trace-123"))
593 .respond_with(ResponseTemplate::new(200))
594 .mount(&server)
595 .await;
596
597 let channel = channel_with_noop();
598 let mut sub = test_subscription(&format!("{}/x", server.uri()), PayloadContent::IdOnly);
599 sub.channel.headers = vec!["X-Trace-Id: trace-123".to_string()];
600 let result = channel
601 .dispatch(&sub, &handshake_bundle(&sub))
602 .await
603 .unwrap();
604 assert!(matches!(result, DispatchResult::Success));
605 }
606}