Skip to main content

helios_subscriptions/channels/
messaging.rs

//! FHIR Messaging channel dispatcher.
//!
//! Wraps notification bundles in a FHIR `Bundle.type = "message"` (with a
//! `MessageHeader` first entry) and POSTs them to the subscriber's
//! `$process-message` endpoint.
//!
//! # Security
//!
//! Three layers of protection:
//!
//! 1. **TLS enforcement** — full-resource payloads are rejected over plain
//!    HTTP (matches the rest-hook channel).
//! 2. **SSRF guard** — receiver endpoints whose host is a literal loopback /
//!    private / link-local / unspecified IP are rejected by default. Operators
//!    set `HFS_SUBSCRIPTION_ALLOW_PRIVATE_ENDPOINTS=true` for local development.
//! 3. **Outbound auth** — when `HFS_OUTBOUND_BEARER_TOKEN` is configured, an
//!    `Authorization: Bearer <token>` header is attached to every outgoing
//!    request. Subscription-supplied `Authorization` headers take precedence.
19
20use std::net::IpAddr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use helios_auth::OutboundAuthProvider;
26use reqwest::{Client, Url};
27use tracing::{debug, warn};
28
29use crate::channels::{ChannelDispatcher, DispatchResult};
30use crate::error::SubscriptionError;
31use crate::manager::{ActiveSubscription, PayloadContent};
32use crate::notification::{notification_type_from_bundle, wrap_as_message_bundle};
33
34/// FHIR Messaging channel implementation.
35pub struct MessagingChannel {
36    client: Client,
37    auth_provider: Arc<dyn OutboundAuthProvider>,
38    source_endpoint: String,
39    /// When `true`, dispatch to private/loopback IPs is allowed. Default
40    /// `false`; tests and local dev opt in.
41    private_endpoints_allowed: bool,
42}
43
44impl MessagingChannel {
45    /// Create a messaging channel with the given source endpoint (HFS base
46    /// URL) and outbound auth provider.
47    pub fn new(source_endpoint: String, auth_provider: Arc<dyn OutboundAuthProvider>) -> Self {
48        let client = Client::builder()
49            .timeout(Duration::from_secs(30))
50            .build()
51            .expect("failed to build HTTP client");
52
53        Self {
54            client,
55            auth_provider,
56            source_endpoint,
57            private_endpoints_allowed: false,
58        }
59    }
60
61    /// Replace the HTTP client (used by tests).
62    pub fn with_client(mut self, client: Client) -> Self {
63        self.client = client;
64        self
65    }
66
67    /// Enable or disable dispatch to private/loopback IPs. Tests typically
68    /// enable this to dispatch to `127.0.0.1` mock servers.
69    pub fn with_private_endpoints_allowed(mut self, allowed: bool) -> Self {
70        self.private_endpoints_allowed = allowed;
71        self
72    }
73
74    async fn send(
75        &self,
76        subscription: &ActiveSubscription,
77        bundle: &serde_json::Value,
78    ) -> Result<DispatchResult, SubscriptionError> {
79        let endpoint = subscription.channel.endpoint.as_deref().ok_or_else(|| {
80            SubscriptionError::InvalidEndpoint {
81                message: "messaging channel requires an endpoint".to_string(),
82            }
83        })?;
84
85        // Enforce TLS for full-resource payloads.
86        if subscription.channel.payload_content == PayloadContent::FullResource
87            && !endpoint.starts_with("https://")
88        {
89            return Ok(DispatchResult::PermanentError(
90                "full-resource payload requires HTTPS endpoint".to_string(),
91            ));
92        }
93
94        // SSRF guard.
95        if !self.private_endpoints_allowed {
96            if let Some(reason) = check_endpoint_host(endpoint) {
97                return Ok(DispatchResult::PermanentError(reason));
98            }
99        }
100
101        // Wrap the prebuilt notification bundle as a message bundle.
102        let notification_type = notification_type_from_bundle(bundle);
103        let message_bundle = wrap_as_message_bundle(
104            bundle,
105            subscription,
106            &self.source_endpoint,
107            notification_type,
108        )?;
109
110        let mut request = self
111            .client
112            .post(endpoint)
113            .header("Content-Type", "application/fhir+json")
114            .json(&message_bundle);
115
116        // Track whether the subscription supplies an Authorization header
117        // BEFORE we attach the outbound provider's token — subscription
118        // wins.
119        let mut subscription_supplies_auth = false;
120        for header in &subscription.channel.headers {
121            if let Some((name, value)) = header.split_once(':') {
122                let name = name.trim();
123                let value = value.trim();
124                if name.eq_ignore_ascii_case("authorization") {
125                    subscription_supplies_auth = true;
126                }
127                request = request.header(name, value);
128            }
129        }
130
131        if !subscription_supplies_auth {
132            request = self.auth_provider.authorize(request, endpoint).await?;
133        }
134
135        debug!(
136            subscription_id = %subscription.id,
137            endpoint,
138            "Dispatching messaging notification"
139        );
140
141        match request.send().await {
142            Ok(response) => {
143                let status = response.status().as_u16();
144                if response.status().is_success() {
145                    debug!(
146                        subscription_id = %subscription.id,
147                        status,
148                        "Message delivered successfully"
149                    );
150                    Ok(DispatchResult::Success)
151                } else if response.status().is_client_error() {
152                    warn!(
153                        subscription_id = %subscription.id,
154                        status,
155                        "Message delivery failed (client error)"
156                    );
157                    Ok(DispatchResult::PermanentError(format!("HTTP {status}")))
158                } else {
159                    warn!(
160                        subscription_id = %subscription.id,
161                        status,
162                        "Message delivery failed (server error)"
163                    );
164                    Ok(DispatchResult::RetryableError(format!("HTTP {status}")))
165                }
166            }
167            Err(e) => {
168                if e.is_timeout() {
169                    warn!(
170                        subscription_id = %subscription.id,
171                        "Message delivery timed out"
172                    );
173                    Ok(DispatchResult::RetryableError("timeout".to_string()))
174                } else if e.is_connect() {
175                    warn!(
176                        subscription_id = %subscription.id,
177                        error = %e,
178                        "Connection failed"
179                    );
180                    Ok(DispatchResult::RetryableError(format!(
181                        "connection error: {e}"
182                    )))
183                } else {
184                    warn!(
185                        subscription_id = %subscription.id,
186                        error = %e,
187                        "Message delivery failed"
188                    );
189                    Ok(DispatchResult::RetryableError(e.to_string()))
190                }
191            }
192        }
193    }
194}
195
196/// Inspect the receiver URL and return a rejection reason if its host is a
197/// literal private/loopback/link-local/unspecified IP. DNS-resolved private
198/// targets are not blocked by this check.
199fn check_endpoint_host(endpoint: &str) -> Option<String> {
200    let url = match Url::parse(endpoint) {
201        Ok(u) => u,
202        Err(_) => return Some(format!("invalid endpoint URL: {endpoint}")),
203    };
204
205    let host = url.host_str()?;
206    let ip: IpAddr = match host.parse() {
207        Ok(ip) => ip,
208        Err(_) => return None, // hostname — not an SSRF concern at this layer
209    };
210
211    let is_blocked = match ip {
212        IpAddr::V4(v4) => {
213            v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
214        }
215        IpAddr::V6(v6) => v6.is_loopback() || v6.is_unspecified() || is_v6_private_or_link(&v6),
216    };
217
218    if is_blocked {
219        Some(format!(
220            "endpoint host {ip} is a private/loopback IP; set HFS_SUBSCRIPTION_ALLOW_PRIVATE_ENDPOINTS=true to allow"
221        ))
222    } else {
223        None
224    }
225}
226
/// Report whether `addr` lies in one of the two non-global IPv6 ranges the
/// SSRF guard cares about:
///
/// - `fc00::/7` — Unique Local Addresses (RFC 4193), i.e. a first segment in
///   `0xfc00..=0xfdff`
/// - `fe80::/10` — Link-Local Addresses, i.e. a first segment in
///   `0xfe80..=0xfebf`
///
/// Only the first 16-bit segment is examined.
fn is_v6_private_or_link(addr: &std::net::Ipv6Addr) -> bool {
    matches!(addr.segments()[0], 0xfc00..=0xfdff | 0xfe80..=0xfebf)
}
235
236#[async_trait]
237impl ChannelDispatcher for MessagingChannel {
238    async fn dispatch(
239        &self,
240        subscription: &ActiveSubscription,
241        notification_bundle: &serde_json::Value,
242    ) -> Result<DispatchResult, SubscriptionError> {
243        self.send(subscription, notification_bundle).await
244    }
245
246    async fn handshake(
247        &self,
248        subscription: &ActiveSubscription,
249        handshake_bundle: &serde_json::Value,
250    ) -> Result<DispatchResult, SubscriptionError> {
251        self.send(subscription, handshake_bundle).await
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
    use crate::notification::{NotificationType, build_event_notification, build_handshake};
    use chrono::Utc;
    use helios_auth::{NoOpOutboundAuthProvider, StaticBearerOutboundAuthProvider};
    use helios_fhir::FhirVersion;
    use serde_json::Value;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Source endpoint (HFS base URL) used for every channel and bundle here.
    const SOURCE: &str = "https://hfs.example.org/fhir";

    // ---- fixtures -------------------------------------------------------

    /// Active `Message`-channel subscription pointing at `endpoint` with the
    /// requested payload content and no custom headers.
    fn test_subscription(endpoint: &str, payload: PayloadContent) -> ActiveSubscription {
        let channel = ChannelConfig {
            channel_type: ChannelType::Message,
            endpoint: Some(endpoint.to_string()),
            payload_mime_type: Some("application/fhir+json".to_string()),
            payload_content: payload,
            headers: vec![],
            heartbeat_period: None,
            timeout: None,
            max_count: None,
        };

        ActiveSubscription {
            id: "sub-msg-1".to_string(),
            topic_url: "http://example.org/topic/test".to_string(),
            status: SubscriptionStatusCode::Active,
            channel,
            filters: vec![],
            fhir_version: FhirVersion::default(),
            events_since_start: 0,
            consecutive_failures: 0,
            tenant_id: "test".to_string(),
        }
    }

    /// Id-only subscription whose endpoint is `route` on the mock `server`.
    fn id_only_subscription(server: &MockServer, route: &str) -> ActiveSubscription {
        test_subscription(&format!("{}{route}", server.uri()), PayloadContent::IdOnly)
    }

    /// Channel with the no-op auth provider and the SSRF guard left at its
    /// default (private endpoints NOT allowed).
    fn guarded_channel() -> MessagingChannel {
        MessagingChannel::new(SOURCE.to_string(), Arc::new(NoOpOutboundAuthProvider))
    }

    /// Channel with the no-op auth provider that may reach loopback mocks.
    fn channel_with_noop() -> MessagingChannel {
        guarded_channel().with_private_endpoints_allowed(true)
    }

    /// Channel with a static bearer provider that may reach loopback mocks.
    fn channel_with_bearer(token: &str) -> MessagingChannel {
        let provider = Arc::new(StaticBearerOutboundAuthProvider::new(token));
        MessagingChannel::new(SOURCE.to_string(), provider).with_private_endpoints_allowed(true)
    }

    /// Handshake notification bundle for `sub`.
    fn handshake_bundle(sub: &ActiveSubscription) -> Value {
        build_handshake(sub, SOURCE).unwrap()
    }

    /// Event notification bundle for `sub` carrying one `Encounter/abc` event.
    fn event_bundle(sub: &ActiveSubscription) -> Value {
        let resource = serde_json::json!({"resourceType": "Encounter", "id": "abc"});
        let event = crate::notification::NotificationEventData {
            event_number: 1,
            timestamp: Utc::now(),
            focus_reference: "Encounter/abc".to_string(),
        };
        build_event_notification(sub, event, Some(&resource), SOURCE).unwrap()
    }

    /// Mock server answering `POST <route>` with `status`.
    async fn server_replying(route: &str, status: u16) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path(route))
            .respond_with(ResponseTemplate::new(status))
            .mount(&server)
            .await;
        server
    }

    /// Dispatch a handshake bundle for `sub` and unwrap the dispatch result.
    async fn dispatch_handshake(
        channel: &MessagingChannel,
        sub: &ActiveSubscription,
    ) -> DispatchResult {
        channel.dispatch(sub, &handshake_bundle(sub)).await.unwrap()
    }

    // ---- status mapping -------------------------------------------------

    #[tokio::test]
    async fn successful_delivery() {
        let server = server_replying("/process-message", 200).await;
        let sub = id_only_subscription(&server, "/process-message");

        let outcome = dispatch_handshake(&channel_with_noop(), &sub).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn server_error_is_retryable() {
        let server = server_replying("/x", 503).await;
        let sub = id_only_subscription(&server, "/x");

        let outcome = dispatch_handshake(&channel_with_noop(), &sub).await;

        assert!(matches!(outcome, DispatchResult::RetryableError(_)));
    }

    #[tokio::test]
    async fn client_error_is_permanent() {
        let server = server_replying("/x", 404).await;
        let sub = id_only_subscription(&server, "/x");

        let outcome = dispatch_handshake(&channel_with_noop(), &sub).await;

        assert!(matches!(outcome, DispatchResult::PermanentError(_)));
    }

    // ---- outbound auth --------------------------------------------------

    #[tokio::test]
    async fn outbound_bearer_attached_when_subscription_does_not_supply_one() {
        // The mock only matches when both headers arrive as expected.
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/x"))
            .and(header("Authorization", "Bearer outbound-token"))
            .and(header("Content-Type", "application/fhir+json"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        let sub = id_only_subscription(&server, "/x");

        let outcome = dispatch_handshake(&channel_with_bearer("outbound-token"), &sub).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn subscription_authorization_takes_precedence_over_outbound() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/x"))
            .and(header("Authorization", "Bearer subscriber-token"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        let mut sub = id_only_subscription(&server, "/x");
        sub.channel.headers = vec!["Authorization: Bearer subscriber-token".to_string()];

        let result = dispatch_handshake(&channel_with_bearer("outbound-token"), &sub).await;
        assert!(
            matches!(result, DispatchResult::Success),
            "subscription-supplied Authorization header should take precedence; got {result:?}"
        );

        // The token seen on the wire must be the subscriber's, not the
        // outbound provider's.
        let requests = server.received_requests().await.unwrap();
        let wire_auth = requests[0]
            .headers
            .get("authorization")
            .expect("authorization header present");
        assert_eq!(wire_auth.to_str().unwrap(), "Bearer subscriber-token");
    }

    // ---- message body ---------------------------------------------------

    #[tokio::test]
    async fn body_is_message_bundle_with_message_header_first() {
        let server = server_replying("/x", 200).await;
        let sub = id_only_subscription(&server, "/x");

        let _ = dispatch_handshake(&channel_with_noop(), &sub).await;

        let requests = server.received_requests().await.unwrap();
        let body: Value = serde_json::from_slice(&requests[0].body).unwrap();
        assert_eq!(body["resourceType"], "Bundle");
        assert_eq!(body["type"], "message");
        assert_eq!(
            body["entry"][0]["resource"]["resourceType"],
            "MessageHeader"
        );
    }

    #[tokio::test]
    async fn handshake_bundle_carries_handshake_event_code() {
        let server = server_replying("/x", 200).await;
        let sub = id_only_subscription(&server, "/x");

        let _ = channel_with_noop()
            .handshake(&sub, &handshake_bundle(&sub))
            .await
            .unwrap();

        let requests = server.received_requests().await.unwrap();
        let body: Value = serde_json::from_slice(&requests[0].body).unwrap();
        let message_header = &body["entry"][0]["resource"];
        // R4 emits eventUri, R4B+ emits eventCoding; one of them must
        // identify this as a handshake.
        let coding_code = message_header
            .get("eventCoding")
            .and_then(|coding| coding.get("code"))
            .and_then(Value::as_str);
        let event_uri = message_header.get("eventUri").and_then(Value::as_str);

        assert_eq!(
            notification_type_from_bundle(&handshake_bundle(&sub)),
            NotificationType::Handshake
        );
        match coding_code {
            Some(code) => assert_eq!(code, "handshake"),
            None => assert!(event_uri.is_some(), "R4 event URI must be set"),
        }
    }

    // ---- pre-flight checks ----------------------------------------------

    #[tokio::test]
    async fn tls_enforced_for_full_resource() {
        let sub = test_subscription(
            "http://insecure.example.com/x",
            PayloadContent::FullResource,
        );

        let outcome = channel_with_noop()
            .dispatch(&sub, &event_bundle(&sub))
            .await
            .unwrap();

        assert!(matches!(outcome, DispatchResult::PermanentError(_)));
    }

    #[tokio::test]
    async fn missing_endpoint_errors() {
        let mut sub = test_subscription("http://example.com/x", PayloadContent::IdOnly);
        sub.channel.endpoint = None;

        let outcome = channel_with_noop()
            .dispatch(&sub, &handshake_bundle(&sub))
            .await;

        assert!(matches!(
            outcome,
            Err(SubscriptionError::InvalidEndpoint { .. })
        ));
    }

    // ---- SSRF guard -----------------------------------------------------

    #[tokio::test]
    async fn ssrf_guard_rejects_loopback_when_disabled() {
        // Default: private_endpoints_allowed = false.
        let sub = test_subscription("http://127.0.0.1:9/x", PayloadContent::IdOnly);

        let reason = match dispatch_handshake(&guarded_channel(), &sub).await {
            DispatchResult::PermanentError(msg) => msg,
            other => panic!("expected PermanentError, got {other:?}"),
        };

        assert!(reason.contains("127.0.0.1"));
    }

    #[tokio::test]
    async fn ssrf_guard_rejects_private_v4_when_disabled() {
        let sub = test_subscription("http://10.0.0.1:9/x", PayloadContent::IdOnly);

        let outcome = dispatch_handshake(&guarded_channel(), &sub).await;

        assert!(matches!(outcome, DispatchResult::PermanentError(_)));
    }

    #[tokio::test]
    async fn ssrf_guard_allows_loopback_when_enabled() {
        let server = server_replying("/x", 200).await;
        let sub = id_only_subscription(&server, "/x");

        // `channel_with_noop` already has private_endpoints_allowed=true.
        let outcome = dispatch_handshake(&channel_with_noop(), &sub).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn ssrf_guard_passes_through_dns_names() {
        // DNS-resolved hosts are not blocked at this layer; network policy is
        // relied on instead. The hostname below won't resolve, so a
        // connection error is expected — which still proves the guard did
        // not trip.
        let sub = test_subscription(
            "http://nonexistent.invalid.example.test/x",
            PayloadContent::IdOnly,
        );

        // A connection error is retryable; the SSRF guard would have produced
        // a PermanentError with a recognizable message.
        match dispatch_handshake(&guarded_channel(), &sub).await {
            DispatchResult::PermanentError(msg) => {
                assert!(
                    !msg.contains("private/loopback"),
                    "DNS host should not trip SSRF guard, got: {msg}"
                );
            }
            DispatchResult::RetryableError(_) => {}
            other => panic!("unexpected: {other:?}"),
        }
    }

    // ---- custom headers -------------------------------------------------

    #[tokio::test]
    async fn custom_subscription_headers_forwarded() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/x"))
            .and(header("X-Trace-Id", "trace-123"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        let mut sub = id_only_subscription(&server, "/x");
        sub.channel.headers = vec!["X-Trace-Id: trace-123".to_string()];

        let outcome = dispatch_handshake(&channel_with_noop(), &sub).await;

        assert!(matches!(outcome, DispatchResult::Success));
    }
}