use std::sync::Arc;
use chrono::Utc;
use helios_auth::StaticBearerOutboundAuthProvider;
use helios_fhir::FhirVersion;
use helios_persistence::tenant::TenantId;
use helios_subscriptions::manager::SubscriptionStatusCode;
use helios_subscriptions::{
MessagingSettings, ResourceEvent, ResourceEventType, SubscriptionConfig, SubscriptionEngine,
SubscriptionError,
};
use serde_json::{Value, json};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};
const TENANT_ID: &str = "tenant-msg";
const TOPIC_URL: &str = "http://example.org/topic/encounter-start";
const SOURCE_ENDPOINT: &str = "http://localhost:8080/fhir";
fn current_fhir_version() -> FhirVersion {
match std::env::var("HFS_TEST_FHIR_VERSION").ok().as_deref() {
#[cfg(feature = "R4B")]
Some("R4B") => FhirVersion::R4B,
#[cfg(feature = "R5")]
Some("R5") => FhirVersion::R5,
#[cfg(feature = "R6")]
Some("R6") => FhirVersion::R6,
_ => FhirVersion::default(),
}
}
fn uses_backport_ig() -> bool {
current_fhir_version() == FhirVersion::R4
}
fn topic_resource() -> Value {
if uses_backport_ig() {
json!({
"resourceType": "Basic",
"id": "topic-msg",
"code": {
"coding": [{
"system": "http://hl7.org/fhir/fhir-types",
"code": "SubscriptionTopic"
}]
},
"extension": [{
"url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.url",
"valueUri": TOPIC_URL
}, {
"url": "http://hl7.org/fhir/4.3/StructureDefinition/extension-SubscriptionTopic.resourceTrigger",
"extension": [{
"url": "resource",
"valueUri": "http://hl7.org/fhir/StructureDefinition/Encounter"
}, {
"url": "supportedInteraction",
"valueCode": "create"
}]
}]
})
} else {
json!({
"resourceType": "SubscriptionTopic",
"id": "topic-msg",
"url": TOPIC_URL,
"status": "active",
"resourceTrigger": [{
"resource": "Encounter",
"supportedInteraction": ["create"]
}]
})
}
}
fn message_subscription_resource(endpoint: &str) -> Value {
if uses_backport_ig() {
json!({
"resourceType": "Subscription",
"id": "sub-msg",
"status": "requested",
"criteria": TOPIC_URL,
"channel": {
"type": "message",
"endpoint": endpoint,
"payload": "application/fhir+json"
}
})
} else {
json!({
"resourceType": "Subscription",
"id": "sub-msg",
"status": "requested",
"topic": TOPIC_URL,
"channelType": {
"system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
"code": "message"
},
"endpoint": endpoint,
"contentType": "application/fhir+json",
"content": "id-only"
})
}
}
fn topic_create_event() -> ResourceEvent {
let resource = topic_resource();
let resource_type = resource
.get("resourceType")
.and_then(Value::as_str)
.unwrap_or("SubscriptionTopic")
.to_string();
ResourceEvent {
tenant_id: TenantId::new(TENANT_ID),
fhir_version: current_fhir_version(),
resource_type,
resource_id: "topic-msg".to_string(),
version_id: "1".to_string(),
event_type: ResourceEventType::Create,
resource: Some(resource),
previous_resource: None,
timestamp: Utc::now(),
}
}
fn subscription_create_event(subscription_resource: Value) -> ResourceEvent {
ResourceEvent {
tenant_id: TenantId::new(TENANT_ID),
fhir_version: current_fhir_version(),
resource_type: "Subscription".to_string(),
resource_id: "sub-msg".to_string(),
version_id: "1".to_string(),
event_type: ResourceEventType::Create,
resource: Some(subscription_resource),
previous_resource: None,
timestamp: Utc::now(),
}
}
fn encounter_create_event() -> ResourceEvent {
ResourceEvent {
tenant_id: TenantId::new(TENANT_ID),
fhir_version: current_fhir_version(),
resource_type: "Encounter".to_string(),
resource_id: "enc-msg".to_string(),
version_id: "1".to_string(),
event_type: ResourceEventType::Create,
resource: Some(json!({
"resourceType": "Encounter",
"id": "enc-msg",
"status": "in-progress"
})),
previous_resource: None,
timestamp: Utc::now(),
}
}
fn header_value<'a>(request: &'a Request, name: &str) -> Option<&'a str> {
request
.headers
.get(name)
.and_then(|value| value.to_str().ok())
}
fn message_header_event_code(message_header: &Value) -> Option<&str> {
if let Some(uri) = message_header.get("eventUri").and_then(Value::as_str) {
if uri == TOPIC_URL {
return Some("topic-uri");
}
return Some(uri);
}
message_header
.get("eventCoding")
.and_then(|c| c.get("code"))
.and_then(Value::as_str)
}
fn engine_with_messaging_enabled() -> SubscriptionEngine {
let config = SubscriptionConfig {
max_retries: 1,
supported_channel_types: vec!["rest-hook".to_string(), "message".to_string()],
messaging: Some(MessagingSettings {
source_endpoint: SOURCE_ENDPOINT.to_string(),
allow_private_endpoints: true, }),
..Default::default()
};
SubscriptionEngine::with_outbound_auth(
config,
SOURCE_ENDPOINT.to_string(),
Arc::new(StaticBearerOutboundAuthProvider::new("integration-bearer")),
)
}
#[tokio::test]
async fn messaging_handshake_and_event_notifications_follow_expected_flow() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/process-message"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
let engine = engine_with_messaging_enabled();
engine.on_resource_event(topic_create_event()).await;
let endpoint = format!("{}/process-message", server.uri());
engine
.on_resource_event(subscription_create_event(message_subscription_resource(
&endpoint,
)))
.await;
let registered = engine
.manager()
.get_subscription(TENANT_ID, "sub-msg")
.expect("subscription should be registered");
assert_eq!(registered.status, SubscriptionStatusCode::Active);
engine.on_resource_event(encounter_create_event()).await;
let requests = server
.received_requests()
.await
.expect("should retrieve requests from mock server");
assert_eq!(
requests.len(),
2,
"expected handshake + event notification messages"
);
for request in &requests {
assert_eq!(request.method.as_str(), "POST");
assert_eq!(request.url.path(), "/process-message");
assert_eq!(
header_value(request, "content-type"),
Some("application/fhir+json")
);
assert_eq!(
header_value(request, "authorization"),
Some("Bearer integration-bearer"),
"outbound bearer token should be attached"
);
let body: Value =
serde_json::from_slice(&request.body).expect("request body should be JSON");
assert_eq!(body["resourceType"], "Bundle");
assert_eq!(
body["type"], "message",
"messaging channel must produce Bundle.type = message"
);
let entries = body["entry"]
.as_array()
.expect("bundle entry should be an array");
let header = &entries[0]["resource"];
assert_eq!(header["resourceType"], "MessageHeader");
assert_eq!(header["source"]["endpoint"], SOURCE_ENDPOINT);
assert_eq!(header["destination"][0]["endpoint"], endpoint);
let focus = header["focus"][0]["reference"]
.as_str()
.expect("focus reference present");
assert!(focus.starts_with("Subscription/"));
let event_code = message_header_event_code(header).expect("event identifier present");
assert!(
event_code == TOPIC_URL
|| event_code == "topic-uri"
|| event_code == "handshake"
|| event_code == "event-notification",
"unexpected event identifier: {event_code}"
);
assert!(
entries.len() >= 2,
"expected MessageHeader + at least the SubscriptionStatus entry"
);
}
let updated = engine
.manager()
.get_subscription(TENANT_ID, "sub-msg")
.expect("subscription still registered after dispatch");
assert_eq!(updated.events_since_start, 1);
}
#[tokio::test]
async fn subscription_authorization_header_takes_precedence() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/process-message"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
let engine = engine_with_messaging_enabled();
engine.on_resource_event(topic_create_event()).await;
let endpoint = format!("{}/process-message", server.uri());
let mut subscription_resource = message_subscription_resource(&endpoint);
if uses_backport_ig() {
subscription_resource["channel"]["header"] =
json!(["Authorization: Bearer subscriber-token"]);
} else {
subscription_resource["parameter"] = json!([
{"name": "Authorization", "value": "Bearer subscriber-token"}
]);
}
engine
.on_resource_event(subscription_create_event(subscription_resource))
.await;
engine.on_resource_event(encounter_create_event()).await;
let requests = server.received_requests().await.unwrap();
assert!(!requests.is_empty(), "at least one delivery expected");
for request in &requests {
assert_eq!(
header_value(request, "authorization"),
Some("Bearer subscriber-token"),
"subscription-supplied Authorization wins over outbound bearer"
);
}
}
#[tokio::test]
async fn message_channel_rejected_when_messaging_disabled() {
let engine =
SubscriptionEngine::new(SubscriptionConfig::default(), SOURCE_ENDPOINT.to_string());
engine.on_resource_event(topic_create_event()).await;
let result = engine.manager().register(
TENANT_ID,
"sub-msg-disabled",
&message_subscription_resource("https://receiver.example.org/process-message"),
current_fhir_version(),
);
assert!(
matches!(
&result,
Err(SubscriptionError::UnsupportedChannel { channel_type }) if channel_type == "message"
),
"expected UnsupportedChannel error, got {result:?}"
);
}
#[tokio::test]
async fn tenant_isolation_does_not_cross_dispatch_boundaries() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/process-message"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
let engine = engine_with_messaging_enabled();
engine.on_resource_event(topic_create_event()).await;
let endpoint = format!("{}/process-message", server.uri());
engine
.on_resource_event(subscription_create_event(message_subscription_resource(
&endpoint,
)))
.await;
let other_event = ResourceEvent {
tenant_id: TenantId::new("tenant-other"),
fhir_version: current_fhir_version(),
resource_type: "Encounter".to_string(),
resource_id: "enc-other".to_string(),
version_id: "1".to_string(),
event_type: ResourceEventType::Create,
resource: Some(json!({"resourceType": "Encounter", "id": "enc-other"})),
previous_resource: None,
timestamp: Utc::now(),
};
engine.on_resource_event(other_event).await;
let requests = server.received_requests().await.unwrap();
assert_eq!(
requests.len(),
1,
"no event delivery should cross tenant boundaries"
);
}