1pub mod filters;
7pub mod status;
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use helios_fhir::FhirVersion;
13use tracing::{debug, warn};
14
15use crate::error::SubscriptionError;
16use crate::topics::InMemoryTopicRegistry;
17
18pub use filters::SubscriptionFilter;
19pub use status::SubscriptionStatusCode;
20
/// Payload content setting of a subscription channel, identified by the FHIR
/// codes `empty`, `id-only` and `full-resource`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadContent {
    /// FHIR code `empty`.
    Empty,
    /// FHIR code `id-only`.
    IdOnly,
    /// FHIR code `full-resource`.
    FullResource,
}

impl PayloadContent {
    /// Every variant, in declaration order; drives the lookup in
    /// [`PayloadContent::from_fhir_str`].
    const ALL: [PayloadContent; 3] = [
        PayloadContent::Empty,
        PayloadContent::IdOnly,
        PayloadContent::FullResource,
    ];

    /// Parses a FHIR payload-content code.
    ///
    /// The comparison is exact (case-sensitive, no trimming). Returns `None`
    /// for any string that is not one of the three known codes.
    pub fn from_fhir_str(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_fhir_str() == s)
    }

    /// Returns the FHIR code for this variant.
    pub fn as_fhir_str(&self) -> &'static str {
        match *self {
            PayloadContent::FullResource => "full-resource",
            PayloadContent::IdOnly => "id-only",
            PayloadContent::Empty => "empty",
        }
    }
}
52
/// Delivery channel of a subscription.
///
/// The four named variants correspond to the codes `rest-hook`, `websocket`,
/// `email` and `message`; every other code is preserved verbatim in
/// [`ChannelType::Custom`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChannelType {
    /// Code `rest-hook`.
    RestHook,
    /// Code `websocket`.
    Websocket,
    /// Code `email`.
    Email,
    /// Code `message`.
    Message,
    /// Any code other than the four above, stored exactly as received.
    Custom(String),
}

impl ChannelType {
    /// Converts a channel code into a `ChannelType`.
    ///
    /// Matching is exact and case-sensitive. This never fails: unrecognised
    /// codes (including the empty string) become [`ChannelType::Custom`].
    pub fn from_fhir_str(s: &str) -> Self {
        if s == "rest-hook" {
            return Self::RestHook;
        }
        if s == "websocket" {
            return Self::Websocket;
        }
        if s == "email" {
            return Self::Email;
        }
        if s == "message" {
            return Self::Message;
        }
        Self::Custom(s.to_string())
    }

    /// Returns the channel code; for [`ChannelType::Custom`] this is the
    /// stored string.
    pub fn as_fhir_str(&self) -> &str {
        match self {
            Self::Custom(code) => code.as_str(),
            Self::Message => "message",
            Self::Email => "email",
            Self::Websocket => "websocket",
            Self::RestHook => "rest-hook",
        }
    }
}
86
87#[derive(Debug, Clone)]
89pub struct ChannelConfig {
90 pub channel_type: ChannelType,
91 pub endpoint: Option<String>,
92 pub payload_mime_type: Option<String>,
93 pub payload_content: PayloadContent,
94 pub headers: Vec<String>,
95 pub heartbeat_period: Option<u32>,
96 pub timeout: Option<u32>,
97 pub max_count: Option<u32>,
98}
99
100#[derive(Debug, Clone)]
105pub struct ActiveSubscription {
106 pub id: String,
108
109 pub topic_url: String,
111
112 pub status: SubscriptionStatusCode,
114
115 pub channel: ChannelConfig,
117
118 pub filters: Vec<SubscriptionFilter>,
120
121 pub fhir_version: FhirVersion,
123
124 pub events_since_start: u64,
126
127 pub consecutive_failures: u32,
129
130 pub tenant_id: String,
132}
133
134pub struct SubscriptionManager {
136 subscriptions: DashMap<(String, String), ActiveSubscription>,
138
139 topic_registry: Arc<InMemoryTopicRegistry>,
141
142 supported_channels: Vec<String>,
144}
145
146impl SubscriptionManager {
147 pub fn new(
149 topic_registry: Arc<InMemoryTopicRegistry>,
150 supported_channels: Vec<String>,
151 ) -> Self {
152 Self {
153 subscriptions: DashMap::new(),
154 topic_registry,
155 supported_channels,
156 }
157 }
158
159 pub fn register(
164 &self,
165 tenant_id: &str,
166 subscription_id: &str,
167 resource: &serde_json::Value,
168 fhir_version: FhirVersion,
169 ) -> Result<ActiveSubscription, SubscriptionError> {
170 let topic_url = extract_topic_url(resource, fhir_version)?;
172 let channel = extract_channel_config(resource, fhir_version)?;
173 let filter_strings = extract_filter_criteria(resource, fhir_version);
174
175 let topic = self.topic_registry.get_topic(&topic_url).ok_or_else(|| {
177 SubscriptionError::TopicNotFound {
178 url: topic_url.clone(),
179 }
180 })?;
181
182 if !self
184 .supported_channels
185 .contains(&channel.channel_type.as_fhir_str().to_string())
186 {
187 return Err(SubscriptionError::UnsupportedChannel {
188 channel_type: channel.channel_type.as_fhir_str().to_string(),
189 });
190 }
191
192 if channel.channel_type == ChannelType::RestHook && channel.endpoint.is_none() {
194 return Err(SubscriptionError::InvalidEndpoint {
195 message: "rest-hook channel requires an endpoint".to_string(),
196 });
197 }
198
199 if channel.channel_type == ChannelType::Email {
201 let endpoint =
202 channel
203 .endpoint
204 .as_deref()
205 .ok_or_else(|| SubscriptionError::InvalidEndpoint {
206 message: "email channel requires a mailto: endpoint".to_string(),
207 })?;
208 match endpoint.strip_prefix("mailto:") {
209 Some(rest) if !rest.trim().is_empty() => {}
210 _ => {
211 return Err(SubscriptionError::InvalidEndpoint {
212 message: "email endpoint must be a non-empty mailto: URI".to_string(),
213 });
214 }
215 }
216 }
217
218 let mut parsed_filters = Vec::new();
220 for filter_str in &filter_strings {
221 let filter = filters::parse_filter_string(filter_str)?;
222 parsed_filters.push(filter);
223 }
224 filters::validate_filters(&parsed_filters, &topic.can_filter_by)?;
225
226 let status = resource
227 .get("status")
228 .and_then(|v| v.as_str())
229 .and_then(SubscriptionStatusCode::from_fhir_str)
230 .unwrap_or(SubscriptionStatusCode::Requested);
231
232 let subscription = ActiveSubscription {
233 id: subscription_id.to_string(),
234 topic_url,
235 status,
236 channel,
237 filters: parsed_filters,
238 fhir_version,
239 events_since_start: 0,
240 consecutive_failures: 0,
241 tenant_id: tenant_id.to_string(),
242 };
243
244 debug!(
245 tenant = tenant_id,
246 subscription_id,
247 topic = %subscription.topic_url,
248 "Registered subscription"
249 );
250
251 let key = (tenant_id.to_string(), subscription_id.to_string());
252 self.subscriptions.insert(key, subscription.clone());
253
254 Ok(subscription)
255 }
256
257 pub fn deregister(&self, tenant_id: &str, subscription_id: &str) -> bool {
259 let key = (tenant_id.to_string(), subscription_id.to_string());
260 let removed = self.subscriptions.remove(&key).is_some();
261 if removed {
262 debug!(
263 tenant = tenant_id,
264 subscription_id, "Deregistered subscription"
265 );
266 }
267 removed
268 }
269
270 pub fn active_subscriptions_for_topic(
272 &self,
273 tenant_id: &str,
274 topic_url: &str,
275 ) -> Vec<ActiveSubscription> {
276 self.subscriptions
277 .iter()
278 .filter(|entry| {
279 let sub = entry.value();
280 sub.tenant_id == tenant_id
281 && sub.topic_url == topic_url
282 && sub.status == SubscriptionStatusCode::Active
283 })
284 .map(|entry| entry.value().clone())
285 .collect()
286 }
287
288 pub fn get_subscription(
290 &self,
291 tenant_id: &str,
292 subscription_id: &str,
293 ) -> Option<ActiveSubscription> {
294 let key = (tenant_id.to_string(), subscription_id.to_string());
295 self.subscriptions.get(&key).map(|entry| entry.clone())
296 }
297
298 pub fn update_status(
300 &self,
301 tenant_id: &str,
302 subscription_id: &str,
303 new_status: SubscriptionStatusCode,
304 ) -> Result<SubscriptionStatusCode, SubscriptionError> {
305 let key = (tenant_id.to_string(), subscription_id.to_string());
306 let mut entry = self.subscriptions.get_mut(&key).ok_or_else(|| {
307 SubscriptionError::Internal(format!("subscription not found: {subscription_id}"))
308 })?;
309
310 let old_status = entry.status;
311 if !old_status.can_transition_to(new_status) {
312 return Err(SubscriptionError::InvalidStatusTransition {
313 from: old_status.to_string(),
314 to: new_status.to_string(),
315 });
316 }
317
318 entry.status = new_status;
319 debug!(
320 tenant = tenant_id,
321 subscription_id,
322 from = %old_status,
323 to = %new_status,
324 "Status transition"
325 );
326
327 Ok(old_status)
328 }
329
330 pub fn increment_event_count(&self, tenant_id: &str, subscription_id: &str) -> Option<u64> {
332 let key = (tenant_id.to_string(), subscription_id.to_string());
333 let mut entry = self.subscriptions.get_mut(&key)?;
334 entry.events_since_start += 1;
335 Some(entry.events_since_start)
336 }
337
338 pub fn record_failure(&self, tenant_id: &str, subscription_id: &str) -> Option<u32> {
340 let key = (tenant_id.to_string(), subscription_id.to_string());
341 let mut entry = self.subscriptions.get_mut(&key)?;
342 entry.consecutive_failures += 1;
343 let count = entry.consecutive_failures;
344 warn!(
345 tenant = tenant_id,
346 subscription_id,
347 consecutive_failures = count,
348 "Delivery failure recorded"
349 );
350 Some(count)
351 }
352
353 pub fn reset_failures(&self, tenant_id: &str, subscription_id: &str) {
355 let key = (tenant_id.to_string(), subscription_id.to_string());
356 if let Some(mut entry) = self.subscriptions.get_mut(&key) {
357 entry.consecutive_failures = 0;
358 }
359 }
360
361 pub fn all_subscriptions(&self) -> Vec<ActiveSubscription> {
363 self.subscriptions
364 .iter()
365 .map(|e| e.value().clone())
366 .collect()
367 }
368}
369
370fn extract_topic_url(
374 resource: &serde_json::Value,
375 fhir_version: FhirVersion,
376) -> Result<String, SubscriptionError> {
377 match fhir_version {
378 #[cfg(feature = "R4")]
381 FhirVersion::R4 => {
382 if let Some(url) = find_extension_value_url(
384 resource,
385 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
386 ) {
387 return Ok(url);
388 }
389 resource
391 .get("criteria")
392 .and_then(|v| v.as_str())
393 .map(|s| s.to_string())
394 .ok_or_else(|| SubscriptionError::InvalidSubscription {
395 message: "R4 Subscription missing topic URL (criteria or backport-topic-canonical extension)".to_string(),
396 })
397 }
398
399 #[allow(unreachable_patterns)]
401 _ => resource
402 .get("topic")
403 .and_then(|v| v.as_str())
404 .map(|s| s.to_string())
405 .ok_or_else(|| SubscriptionError::InvalidSubscription {
406 message: "Subscription missing 'topic' field".to_string(),
407 }),
408 }
409}
410
411fn extract_channel_config(
413 resource: &serde_json::Value,
414 fhir_version: FhirVersion,
415) -> Result<ChannelConfig, SubscriptionError> {
416 match fhir_version {
417 #[cfg(feature = "R4")]
419 FhirVersion::R4 => {
420 let channel =
421 resource
422 .get("channel")
423 .ok_or_else(|| SubscriptionError::InvalidSubscription {
424 message: "R4 Subscription missing 'channel' field".to_string(),
425 })?;
426
427 let channel_type_str = channel
428 .get("type")
429 .and_then(|v| v.as_str())
430 .unwrap_or("rest-hook");
431
432 let endpoint = channel
433 .get("endpoint")
434 .and_then(|v| v.as_str())
435 .map(String::from);
436
437 let payload_mime_type = channel
438 .get("payload")
439 .and_then(|v| v.as_str())
440 .map(String::from);
441
442 let headers = channel
443 .get("header")
444 .and_then(|v| v.as_array())
445 .map(|arr| {
446 arr.iter()
447 .filter_map(|v| v.as_str().map(String::from))
448 .collect()
449 })
450 .unwrap_or_default();
451
452 let payload_content = find_channel_payload_content_code(resource)
456 .or_else(|| {
457 find_extension_value_code(
458 resource,
459 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
460 )
461 })
462 .and_then(|s| PayloadContent::from_fhir_str(&s))
463 .unwrap_or(PayloadContent::IdOnly);
464
465 let heartbeat_period = find_extension_value_unsigned_int(
466 resource,
467 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-heartbeat-period",
468 );
469
470 let timeout = find_extension_value_unsigned_int(
471 resource,
472 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-timeout",
473 );
474
475 let max_count = find_extension_value_unsigned_int(
476 resource,
477 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-max-count",
478 );
479
480 Ok(ChannelConfig {
481 channel_type: ChannelType::from_fhir_str(channel_type_str),
482 endpoint,
483 payload_mime_type,
484 payload_content,
485 headers,
486 heartbeat_period,
487 timeout,
488 max_count,
489 })
490 }
491
492 #[allow(unreachable_patterns)]
494 _ => {
495 let channel_type_str = resource
496 .get("channelType")
497 .and_then(|v| v.get("code"))
498 .and_then(|v| v.as_str())
499 .unwrap_or("rest-hook");
500
501 let endpoint = resource
502 .get("endpoint")
503 .and_then(|v| v.as_str())
504 .map(String::from);
505
506 let payload_content = resource
507 .get("content")
508 .and_then(|v| v.as_str())
509 .and_then(PayloadContent::from_fhir_str)
510 .unwrap_or(PayloadContent::IdOnly);
511
512 let payload_mime_type = resource
513 .get("contentType")
514 .and_then(|v| v.as_str())
515 .map(String::from);
516
517 let heartbeat_period = resource
518 .get("heartbeatPeriod")
519 .and_then(|v| v.as_u64())
520 .map(|v| v as u32);
521
522 let timeout = resource
523 .get("timeout")
524 .and_then(|v| v.as_u64())
525 .map(|v| v as u32);
526
527 let max_count = resource
528 .get("maxCount")
529 .and_then(|v| v.as_u64())
530 .map(|v| v as u32);
531
532 let headers = resource
533 .get("parameter")
534 .and_then(|v| v.as_array())
535 .map(|arr| {
536 arr.iter()
537 .filter_map(|p| {
538 let name = p.get("name")?.as_str()?;
539 let value = p.get("value")?.as_str()?;
540 Some(format!("{name}: {value}"))
541 })
542 .collect()
543 })
544 .unwrap_or_default();
545
546 Ok(ChannelConfig {
547 channel_type: ChannelType::from_fhir_str(channel_type_str),
548 endpoint,
549 payload_mime_type,
550 payload_content,
551 headers,
552 heartbeat_period,
553 timeout,
554 max_count,
555 })
556 }
557 }
558}
559
560fn extract_filter_criteria(resource: &serde_json::Value, fhir_version: FhirVersion) -> Vec<String> {
562 match fhir_version {
563 #[cfg(feature = "R4")]
565 FhirVersion::R4 => find_all_extension_values_string(
566 resource,
567 "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
568 ),
569
570 #[allow(unreachable_patterns)]
572 _ => {
573 let filter_by = match resource.get("filterBy").and_then(|v| v.as_array()) {
574 Some(arr) => arr,
575 None => return Vec::new(),
576 };
577
578 filter_by
579 .iter()
580 .filter_map(|f| {
581 let param = f.get("filterParameter")?.as_str()?;
582 let value = f.get("value")?.as_str()?;
583 let resource_type = f.get("resourceType").and_then(|v| v.as_str());
584 let comparator = f.get("comparator").and_then(|v| v.as_str());
585 let value_with_comparator = comparator
586 .filter(|c| !c.is_empty())
587 .map(|c| format!("{c}:{value}"))
588 .unwrap_or_else(|| value.to_string());
589
590 let filter_str = if let Some(rt) = resource_type {
591 format!("{rt}?{param}={value_with_comparator}")
592 } else {
593 format!("{param}={value_with_comparator}")
594 };
595
596 Some(filter_str)
597 })
598 .collect()
599 }
600 }
601}
602
/// Returns the `valueUrl` (or, when that key is absent, `valueCanonical`)
/// string of the first top-level extension whose `url` equals `url`.
///
/// Only the first matching extension is examined; `None` is returned when it
/// carries no such string value.
#[cfg(feature = "R4")]
fn find_extension_value_url(resource: &serde_json::Value, url: &str) -> Option<String> {
    let extensions = resource.get("extension")?.as_array()?;
    let matched = extensions
        .iter()
        .find(|ext| ext.get("url").and_then(|v| v.as_str()) == Some(url))?;

    let value = match matched.get("valueUrl") {
        Some(value) => value,
        None => matched.get("valueCanonical")?,
    };
    value.as_str().map(str::to_owned)
}
617
/// Returns the `valueCode` string of the first top-level extension whose
/// `url` equals `url`.
///
/// Only the first matching extension is examined.
#[cfg(feature = "R4")]
fn find_extension_value_code(resource: &serde_json::Value, url: &str) -> Option<String> {
    let extensions = resource.get("extension")?.as_array()?;
    let matched = extensions
        .iter()
        .find(|ext| ext.get("url").and_then(|v| v.as_str()) == Some(url))?;

    let code = matched.get("valueCode")?.as_str()?;
    Some(code.to_owned())
}
630
/// Returns the `valueCode` of the first `backport-payload-content` extension
/// found under `channel._payload.extension`.
///
/// Only the first matching extension is examined.
#[cfg(feature = "R4")]
fn find_channel_payload_content_code(resource: &serde_json::Value) -> Option<String> {
    const PAYLOAD_CONTENT_URL: &str =
        "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content";

    let extensions = resource
        .get("channel")?
        .get("_payload")?
        .get("extension")?
        .as_array()?;

    for ext in extensions {
        if ext.get("url").and_then(|v| v.as_str()) == Some(PAYLOAD_CONTENT_URL) {
            return ext
                .get("valueCode")
                .and_then(|v| v.as_str())
                .map(String::from);
        }
    }
    None
}
649
/// Returns the `valueUnsignedInt` of the first top-level extension whose
/// `url` equals `url`.
///
/// Only the first matching extension is examined. `None` is returned when
/// the value is absent, is not an unsigned integer, or does not fit in a
/// `u32`.
#[cfg(feature = "R4")]
fn find_extension_value_unsigned_int(resource: &serde_json::Value, url: &str) -> Option<u32> {
    resource
        .get("extension")?
        .as_array()?
        .iter()
        .find(|ext| ext.get("url").and_then(|v| v.as_str()) == Some(url))
        .and_then(|ext| ext.get("valueUnsignedInt"))
        .and_then(|v| v.as_u64())
        // Checked narrowing: `as u32` would silently wrap values above
        // `u32::MAX` into an unrelated small number.
        .and_then(|v| u32::try_from(v).ok())
}
662
/// Returns the `valueString` of every top-level extension whose `url` equals
/// `url`, in document order.
///
/// Matching extensions without a string `valueString` are skipped. Returns
/// an empty vector when the resource has no `extension` array.
#[cfg(feature = "R4")]
fn find_all_extension_values_string(resource: &serde_json::Value, url: &str) -> Vec<String> {
    let extensions = match resource.get("extension").and_then(|v| v.as_array()) {
        Some(list) => list,
        None => return Vec::new(),
    };

    let mut values = Vec::new();
    for ext in extensions {
        if ext.get("url").and_then(|v| v.as_str()) != Some(url) {
            continue;
        }
        if let Some(text) = ext.get("valueString").and_then(|v| v.as_str()) {
            values.push(text.to_owned());
        }
    }
    values
}
681
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::event::ResourceEventType;
    use crate::topics::{FilterDefinition, ResourceTrigger, TopicDefinition};
    use serde_json::json;

    /// Canonical URL of the only topic registered by `create_test_registry`.
    const ENCOUNTER_START_TOPIC: &str = "http://example.org/topic/encounter-start";

    /// Registry holding a single "encounter start" topic that can be
    /// filtered by `Encounter?patient` with the `eq` comparator.
    fn create_test_registry() -> Arc<InMemoryTopicRegistry> {
        let encounter_created = ResourceTrigger {
            resource_type: "Encounter".to_string(),
            interactions: vec![ResourceEventType::Create],
            fhirpath_criteria: None,
        };
        let patient_filter = FilterDefinition {
            resource_type: Some("Encounter".to_string()),
            filter_parameter: "patient".to_string(),
            comparators: vec!["eq".to_string()],
            modifiers: vec![],
        };

        let registry = Arc::new(InMemoryTopicRegistry::new());
        registry.add_topic(TopicDefinition {
            canonical_url: "http://example.org/topic/encounter-start".to_string(),
            title: Some("Encounter Start".to_string()),
            resource_triggers: vec![encounter_created],
            can_filter_by: vec![patient_filter],
            notification_shape: vec![],
        });
        registry
    }

    /// Manager over the test registry that accepts only `rest-hook`.
    fn rest_hook_manager() -> SubscriptionManager {
        SubscriptionManager::new(create_test_registry(), vec!["rest-hook".to_string()])
    }

    /// Minimal valid rest-hook subscription: R4-shaped when the `R4` feature
    /// is enabled, native-shaped otherwise.
    pub(crate) fn default_subscription_json() -> serde_json::Value {
        #[cfg(feature = "R4")]
        {
            json!({
                "resourceType": "Subscription",
                "id": "sub-1",
                "status": "requested",
                "criteria": "http://example.org/topic/encounter-start",
                "channel": {
                    "type": "rest-hook",
                    "endpoint": "https://example.com/webhook",
                    "payload": "application/fhir+json"
                },
                "extension": [
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
                        "valueCanonical": "http://example.org/topic/encounter-start"
                    },
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
                        "valueCode": "id-only"
                    }
                ]
            })
        }
        #[cfg(not(feature = "R4"))]
        {
            json!({
                "resourceType": "Subscription",
                "id": "sub-1",
                "status": "requested",
                "topic": "http://example.org/topic/encounter-start",
                "channelType": {
                    "system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
                    "code": "rest-hook"
                },
                "endpoint": "https://example.com/webhook",
                "content": "id-only",
                "contentType": "application/fhir+json"
            })
        }
    }

    /// R4 subscription carrying a header and the topic, payload-content and
    /// heartbeat backport extensions.
    #[cfg(feature = "R4")]
    fn r4_subscription_json() -> serde_json::Value {
        json!({
            "resourceType": "Subscription",
            "id": "sub-r4-1",
            "status": "requested",
            "criteria": "http://example.org/topic/encounter-start",
            "channel": {
                "type": "rest-hook",
                "endpoint": "https://example.com/webhook",
                "payload": "application/fhir+json",
                "header": ["Authorization: Bearer token123"]
            },
            "extension": [
                {
                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
                    "valueCanonical": "http://example.org/topic/encounter-start"
                },
                {
                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
                    "valueCode": "id-only"
                },
                {
                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-heartbeat-period",
                    "valueUnsignedInt": 60
                }
            ]
        })
    }

    /// Picks an enabled non-R4 version, preferring R5, then R4B, then R6.
    #[cfg(any(feature = "R4B", feature = "R5", feature = "R6"))]
    fn non_r4_test_version() -> FhirVersion {
        #[cfg(feature = "R5")]
        {
            FhirVersion::R5
        }
        #[cfg(all(not(feature = "R5"), feature = "R4B"))]
        {
            FhirVersion::R4B
        }
        #[cfg(all(not(feature = "R5"), not(feature = "R4B"), feature = "R6"))]
        {
            FhirVersion::R6
        }
    }

    // Registers the default fixture under `FhirVersion::default()`.
    #[test]
    fn test_register_r5_subscription() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();

        let registered = manager
            .register("tenant-1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        assert_eq!(registered.id, "sub-1");
        assert_eq!(registered.topic_url, ENCOUNTER_START_TOPIC);
        assert_eq!(registered.channel.channel_type, ChannelType::RestHook);
        assert_eq!(
            registered.channel.endpoint.as_deref(),
            Some("https://example.com/webhook")
        );
        assert_eq!(registered.channel.payload_content, PayloadContent::IdOnly);
        assert_eq!(registered.status, SubscriptionStatusCode::Requested);
        assert_eq!(registered.events_since_start, 0);
    }

    #[cfg(any(feature = "R4B", feature = "R5", feature = "R6"))]
    #[test]
    fn test_register_native_subscription_topic_and_channeltype_parsing() {
        let manager = rest_hook_manager();

        let fixture = json!({
            "resourceType": "Subscription",
            "id": "sub-native-1",
            "status": "requested",
            "topic": "http://example.org/topic/encounter-start",
            "channelType": {
                "system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
                "code": "rest-hook"
            },
            "endpoint": "https://example.com/webhook",
            "contentType": "application/fhir+json",
            "content": "id-only",
            "parameter": [{
                "name": "Authorization",
                "value": "Bearer native-token"
            }]
        });

        let registered = manager
            .register("tenant-1", "sub-native-1", &fixture, non_r4_test_version())
            .unwrap();

        assert_eq!(registered.topic_url, ENCOUNTER_START_TOPIC);
        assert_eq!(registered.channel.channel_type, ChannelType::RestHook);
        assert_eq!(
            registered.channel.endpoint.as_deref(),
            Some("https://example.com/webhook")
        );
        assert_eq!(registered.channel.payload_content, PayloadContent::IdOnly);
        assert_eq!(
            registered.channel.headers,
            vec!["Authorization: Bearer native-token".to_string()]
        );
    }

    #[cfg(feature = "R4")]
    #[test]
    fn test_register_r4_subscription_with_backport_extensions() {
        let manager = rest_hook_manager();
        let fixture = r4_subscription_json();

        let registered = manager
            .register("tenant-1", "sub-r4-1", &fixture, FhirVersion::R4)
            .unwrap();

        assert_eq!(registered.topic_url, ENCOUNTER_START_TOPIC);
        assert_eq!(registered.channel.channel_type, ChannelType::RestHook);
        assert_eq!(registered.channel.payload_content, PayloadContent::IdOnly);
        assert_eq!(registered.channel.heartbeat_period, Some(60));
        assert_eq!(registered.channel.headers.len(), 1);
        assert!(registered.channel.headers[0].contains("Authorization"));
    }

    #[cfg(feature = "R4")]
    #[test]
    fn test_register_r4_subscription_payload_content_in_channel_payload_extension() {
        let manager = rest_hook_manager();

        let fixture = json!({
            "resourceType": "Subscription",
            "id": "sub-r4-2",
            "status": "requested",
            "criteria": "http://example.org/topic/encounter-start",
            "channel": {
                "type": "rest-hook",
                "endpoint": "https://example.com/webhook",
                "payload": "application/fhir+json",
                "_payload": {
                    "extension": [{
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
                        "valueCode": "empty"
                    }]
                }
            }
        });

        let registered = manager
            .register("tenant-1", "sub-r4-2", &fixture, FhirVersion::R4)
            .unwrap();

        assert_eq!(registered.channel.payload_content, PayloadContent::Empty);
    }

    /// Builds a bare subscription for the given topic and channel code; the
    /// endpoint is only set when `endpoint` is `Some`. R4-shaped when the
    /// `R4` feature is enabled, native-shaped otherwise.
    pub(crate) fn build_subscription_json(
        topic_url: &str,
        channel_type: &str,
        endpoint: Option<&str>,
    ) -> serde_json::Value {
        #[cfg(feature = "R4")]
        {
            let mut subscription = json!({
                "resourceType": "Subscription",
                "status": "requested",
                "criteria": topic_url,
                "channel": {
                    "type": channel_type
                },
                "extension": [{
                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
                    "valueCanonical": topic_url
                }]
            });
            if let Some(target) = endpoint {
                subscription["channel"]["endpoint"] = json!(target);
            }
            subscription
        }
        #[cfg(not(feature = "R4"))]
        {
            let mut subscription = json!({
                "resourceType": "Subscription",
                "status": "requested",
                "topic": topic_url,
                "channelType": { "code": channel_type }
            });
            if let Some(target) = endpoint {
                subscription["endpoint"] = json!(target);
            }
            subscription
        }
    }

    #[test]
    fn test_register_invalid_topic() {
        let manager = rest_hook_manager();

        let fixture = build_subscription_json(
            "http://nonexistent.org/topic/missing",
            "rest-hook",
            Some("https://example.com/webhook"),
        );

        let outcome = manager.register("t1", "s1", &fixture, FhirVersion::default());
        assert!(matches!(
            outcome,
            Err(SubscriptionError::TopicNotFound { .. })
        ));
    }

    #[test]
    fn test_register_unsupported_channel() {
        let manager = rest_hook_manager();

        let fixture = build_subscription_json(
            "http://example.org/topic/encounter-start",
            "email",
            Some("mailto:test@example.com"),
        );

        let outcome = manager.register("t1", "s1", &fixture, FhirVersion::default());
        assert!(matches!(
            outcome,
            Err(SubscriptionError::UnsupportedChannel { .. })
        ));
    }

    #[test]
    fn test_register_rest_hook_missing_endpoint() {
        let manager = rest_hook_manager();

        let fixture = build_subscription_json(
            "http://example.org/topic/encounter-start",
            "rest-hook",
            None,
        );

        let outcome = manager.register("t1", "s1", &fixture, FhirVersion::default());
        assert!(matches!(
            outcome,
            Err(SubscriptionError::InvalidEndpoint { .. })
        ));
    }

    #[test]
    fn test_deregister() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("t1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        assert!(manager.deregister("t1", "sub-1"));
        assert!(manager.get_subscription("t1", "sub-1").is_none());

        // A second removal finds nothing.
        assert!(!manager.deregister("t1", "sub-1"));
    }

    #[test]
    fn test_active_subscriptions_for_topic() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("t1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        // Still `requested`, so nothing is reported as active.
        let before = manager.active_subscriptions_for_topic("t1", ENCOUNTER_START_TOPIC);
        assert!(before.is_empty());

        manager
            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
            .unwrap();

        let after = manager.active_subscriptions_for_topic("t1", ENCOUNTER_START_TOPIC);
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].id, "sub-1");
    }

    #[test]
    fn test_status_transitions() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("t1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        let previous = manager
            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
            .unwrap();
        assert_eq!(previous, SubscriptionStatusCode::Requested);

        manager
            .update_status("t1", "sub-1", SubscriptionStatusCode::Error)
            .unwrap();

        manager
            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
            .unwrap();

        // Going back to `requested` is rejected.
        let outcome = manager.update_status("t1", "sub-1", SubscriptionStatusCode::Requested);
        assert!(matches!(
            outcome,
            Err(SubscriptionError::InvalidStatusTransition { .. })
        ));
    }

    #[test]
    fn test_increment_event_count() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("t1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(1));
        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(2));
        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(3));

        assert!(manager.increment_event_count("t1", "nonexistent").is_none());
    }

    #[test]
    fn test_record_and_reset_failures() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("t1", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        assert_eq!(manager.record_failure("t1", "sub-1"), Some(1));
        assert_eq!(manager.record_failure("t1", "sub-1"), Some(2));

        manager.reset_failures("t1", "sub-1");
        let stored = manager.get_subscription("t1", "sub-1").unwrap();
        assert_eq!(stored.consecutive_failures, 0);
    }

    #[test]
    fn test_tenant_isolation() {
        let manager = rest_hook_manager();
        let fixture = default_subscription_json();
        manager
            .register("tenant-a", "sub-1", &fixture, FhirVersion::default())
            .unwrap();
        manager
            .register("tenant-b", "sub-1", &fixture, FhirVersion::default())
            .unwrap();

        assert!(manager.get_subscription("tenant-a", "sub-1").is_some());
        assert!(manager.get_subscription("tenant-b", "sub-1").is_some());

        // Removing one tenant's entry leaves the other tenant's untouched.
        manager.deregister("tenant-a", "sub-1");
        assert!(manager.get_subscription("tenant-a", "sub-1").is_none());
        assert!(manager.get_subscription("tenant-b", "sub-1").is_some());
    }

    #[test]
    fn test_register_with_filter_by() {
        let manager = rest_hook_manager();

        #[cfg(feature = "R4")]
        let (fixture, version) = (
            json!({
                "resourceType": "Subscription",
                "status": "requested",
                "criteria": "http://example.org/topic/encounter-start",
                "channel": {
                    "type": "rest-hook",
                    "endpoint": "https://example.com/webhook"
                },
                "extension": [
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
                        "valueCanonical": "http://example.org/topic/encounter-start"
                    },
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
                        "valueCode": "full-resource"
                    },
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
                        "valueString": "Encounter?patient=Patient/123"
                    }
                ]
            }),
            FhirVersion::R4,
        );
        #[cfg(all(
            not(feature = "R4"),
            any(feature = "R4B", feature = "R5", feature = "R6")
        ))]
        let (fixture, version) = (
            json!({
                "resourceType": "Subscription",
                "status": "requested",
                "topic": "http://example.org/topic/encounter-start",
                "channelType": { "code": "rest-hook" },
                "endpoint": "https://example.com/webhook",
                "content": "full-resource",
                "filterBy": [{
                    "resourceType": "Encounter",
                    "filterParameter": "patient",
                    "value": "Patient/123"
                }]
            }),
            non_r4_test_version(),
        );

        let registered = manager
            .register("t1", "sub-1", &fixture, version)
            .unwrap();

        assert_eq!(registered.filters.len(), 1);
        assert_eq!(registered.filters[0].filter_parameter, "patient");
        assert_eq!(registered.filters[0].value, "Patient/123");
        assert_eq!(
            registered.channel.payload_content,
            PayloadContent::FullResource
        );
    }

    #[cfg(any(feature = "R4B", feature = "R5", feature = "R6"))]
    #[test]
    fn test_register_with_native_filter_by_comparator() {
        let manager = rest_hook_manager();

        let fixture = json!({
            "resourceType": "Subscription",
            "status": "requested",
            "topic": "http://example.org/topic/encounter-start",
            "channelType": { "code": "rest-hook" },
            "endpoint": "https://example.com/webhook",
            "filterBy": [{
                "resourceType": "Encounter",
                "filterParameter": "patient",
                "comparator": "eq",
                "value": "Patient/123"
            }]
        });

        let registered = manager
            .register("t1", "sub-1", &fixture, non_r4_test_version())
            .unwrap();

        assert_eq!(registered.filters.len(), 1);
        assert_eq!(registered.filters[0].filter_parameter, "patient");
        assert_eq!(registered.filters[0].comparator, "eq");
        assert_eq!(registered.filters[0].value, "Patient/123");
    }

    #[test]
    fn test_register_with_invalid_filter() {
        let manager = rest_hook_manager();

        #[cfg(feature = "R4")]
        let (fixture, version) = (
            json!({
                "resourceType": "Subscription",
                "status": "requested",
                "criteria": "http://example.org/topic/encounter-start",
                "channel": {
                    "type": "rest-hook",
                    "endpoint": "https://example.com/webhook"
                },
                "extension": [
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
                        "valueCanonical": "http://example.org/topic/encounter-start"
                    },
                    {
                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
                        "valueString": "unknown-param=some-value"
                    }
                ]
            }),
            FhirVersion::R4,
        );
        #[cfg(all(
            not(feature = "R4"),
            any(feature = "R4B", feature = "R5", feature = "R6")
        ))]
        let (fixture, version) = (
            json!({
                "resourceType": "Subscription",
                "status": "requested",
                "topic": "http://example.org/topic/encounter-start",
                "channelType": { "code": "rest-hook" },
                "endpoint": "https://example.com/webhook",
                "filterBy": [{
                    "filterParameter": "unknown-param",
                    "value": "some-value"
                }]
            }),
            non_r4_test_version(),
        );

        let outcome = manager.register("t1", "sub-1", &fixture, version);
        assert!(matches!(
            outcome,
            Err(SubscriptionError::InvalidFilter { .. })
        ));
    }
}
1289}