1pub mod retry;
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use tracing::{debug, error, info, warn};
13
14use crate::channels::email::EmailChannel;
15use crate::channels::messaging::MessagingChannel;
16use crate::channels::rest_hook::RestHookChannel;
17use crate::channels::websocket::WebSocketChannel;
18use crate::channels::ws_manager::WebSocketManager;
19use crate::channels::ws_token::WsBindingTokenManager;
20use crate::channels::{ChannelDispatcher, DispatchResult};
21use crate::config::SubscriptionConfig;
22use crate::evaluator::EventEvaluator;
23use crate::event::{ResourceEvent, ResourceEventType};
24use crate::manager::{
25 ActiveSubscription, ChannelType, SubscriptionManager, SubscriptionStatusCode,
26};
27use crate::notification::{self, NotificationEventData};
28use crate::topics::InMemoryTopicRegistry;
29use helios_auth::{NoOpOutboundAuthProvider, OutboundAuthProvider};
30
/// Coordinates subscription processing for incoming resource events.
///
/// Holds the topic registry, the subscription manager, the event evaluator and
/// one dispatcher per notification channel, together with the configuration
/// that drives handshake and retry behaviour.
pub struct SubscriptionEngine {
    /// Registry of known subscription topics; topics are added and removed by
    /// canonical URL.
    topic_registry: Arc<InMemoryTopicRegistry>,
    /// Maps `(tenant id, resource type, resource id)` of a stored topic resource
    /// to the canonical URL it registered, so a delete event without a payload
    /// can still be resolved to a topic.
    topic_resource_index: DashMap<(String, String, String), String>,
    /// Registered subscriptions and their status / counters.
    manager: Arc<SubscriptionManager>,
    /// Matches resource events against registered subscriptions.
    evaluator: EventEvaluator,
    /// Dispatcher used for `ChannelType::RestHook` subscriptions.
    rest_hook_channel: Arc<RestHookChannel>,
    /// WebSocket client bookkeeping shared with `ws_channel`.
    ws_manager: Arc<WebSocketManager>,
    /// Dispatcher used for `ChannelType::Websocket` subscriptions.
    ws_channel: Arc<WebSocketChannel>,
    /// Token manager built from `config.ws_token_lifetime_secs`.
    ws_token_manager: Arc<WsBindingTokenManager>,
    /// Email dispatcher; `None` when no SMTP settings are configured or the
    /// channel failed to initialise.
    email_channel: Option<Arc<EmailChannel>>,
    /// Messaging dispatcher; `None` when no messaging settings are configured.
    messaging_channel: Option<Arc<MessagingChannel>>,
    /// Engine configuration (retry limits, handshake delays, thresholds).
    config: SubscriptionConfig,
    /// Base URL handed to the notification builders.
    base_url: String,
}
49
50fn calculate_handshake_retry_delay(
51 config: &SubscriptionConfig,
52 attempt: u32,
53) -> std::time::Duration {
54 let exponent = attempt.saturating_sub(1) as i32;
55 let delay_secs = config.handshake_retry_initial_delay.as_secs_f64()
56 * config.retry_backoff_factor.powi(exponent);
57 let capped = delay_secs.min(config.handshake_retry_max_delay.as_secs_f64());
58
59 std::time::Duration::from_secs_f64(capped)
60}
61
62impl SubscriptionEngine {
63 pub fn new(config: SubscriptionConfig, base_url: String) -> Self {
65 Self::with_outbound_auth(config, base_url, Arc::new(NoOpOutboundAuthProvider))
66 }
67
68 pub fn with_outbound_auth(
75 config: SubscriptionConfig,
76 base_url: String,
77 outbound_auth: Arc<dyn OutboundAuthProvider>,
78 ) -> Self {
79 let topic_registry = Arc::new(InMemoryTopicRegistry::new());
80 let manager = Arc::new(SubscriptionManager::new(
81 Arc::clone(&topic_registry),
82 config.supported_channel_types.clone(),
83 ));
84 let evaluator = EventEvaluator::new(Arc::clone(&topic_registry), Arc::clone(&manager));
85 let rest_hook_channel = Arc::new(RestHookChannel::new());
86 let ws_manager = Arc::new(WebSocketManager::new());
87 let ws_channel = Arc::new(WebSocketChannel::new(Arc::clone(&ws_manager)));
88 let ws_token_manager = Arc::new(WsBindingTokenManager::new(config.ws_token_lifetime_secs));
89 let email_channel = match &config.smtp {
90 Some(settings) => match EmailChannel::new(settings.clone()) {
91 Ok(ch) => Some(Arc::new(ch)),
92 Err(e) => {
93 warn!(error = %e, "Failed to initialize email channel; email dispatch disabled");
94 None
95 }
96 },
97 None => None,
98 };
99 let messaging_channel = config.messaging.as_ref().map(|settings| {
100 Arc::new(
101 MessagingChannel::new(settings.source_endpoint.clone(), Arc::clone(&outbound_auth))
102 .with_private_endpoints_allowed(settings.allow_private_endpoints),
103 )
104 });
105
106 Self {
107 topic_registry,
108 topic_resource_index: DashMap::new(),
109 manager,
110 evaluator,
111 rest_hook_channel,
112 ws_manager,
113 ws_channel,
114 ws_token_manager,
115 email_channel,
116 messaging_channel,
117 config,
118 base_url,
119 }
120 }
121
    /// Returns the shared topic registry used by this engine.
    pub fn topic_registry(&self) -> &Arc<InMemoryTopicRegistry> {
        &self.topic_registry
    }
126
    /// Returns the shared subscription manager used by this engine.
    pub fn manager(&self) -> &Arc<SubscriptionManager> {
        &self.manager
    }
131
    /// Returns the shared WebSocket manager used by this engine.
    pub fn ws_manager(&self) -> &Arc<WebSocketManager> {
        &self.ws_manager
    }
136
    /// Returns the shared WebSocket binding-token manager used by this engine.
    pub fn ws_token_manager(&self) -> &Arc<WsBindingTokenManager> {
        &self.ws_token_manager
    }
141
142 pub async fn on_resource_event(&self, event: ResourceEvent) {
150 match event.resource_type.as_str() {
152 "Subscription" => {
153 self.handle_subscription_event(&event).await;
154 return;
155 }
156 "SubscriptionTopic" => {
157 self.handle_topic_event(&event).await;
158 return;
159 }
160 "Basic" => {
161 if self.handle_r4_basic_topic_event(&event).await {
162 return;
163 }
164 }
165 _ => {}
166 }
167
168 let matches = self.evaluator.evaluate(&event);
170 info!(
171 tenant_id = %event.tenant_id,
172 resource_type = %event.resource_type,
173 resource_id = %event.resource_id,
174 event_type = %event.event_type,
175 matched_subscriptions = matches.len(),
176 "Subscription event evaluated"
177 );
178 if matches.is_empty() {
179 return;
180 }
181
182 debug!(
183 resource_type = %event.resource_type,
184 resource_id = %event.resource_id,
185 matched_subscriptions = matches.len(),
186 "Event matched subscriptions"
187 );
188
189 for eval_match in matches {
191 let mut subscription = eval_match.subscription;
192
193 let event_number = self
195 .manager
196 .increment_event_count(&subscription.tenant_id, &subscription.id)
197 .unwrap_or(0);
198 subscription.events_since_start = event_number;
200
201 let focus_reference = format!("{}/{}", event.resource_type, event.resource_id);
202 let event_data = NotificationEventData {
203 event_number,
204 timestamp: event.timestamp,
205 focus_reference: focus_reference.clone(),
206 };
207
208 let bundle = match notification::build_event_notification(
210 &subscription,
211 event_data,
212 event.resource.as_ref(),
213 &self.base_url,
214 ) {
215 Ok(b) => b,
216 Err(e) => {
217 error!(
218 subscription_id = %subscription.id,
219 error = %e,
220 "Failed to build notification"
221 );
222 continue;
223 }
224 };
225
226 info!(
227 tenant_id = %subscription.tenant_id,
228 subscription_id = %subscription.id,
229 topic_url = %subscription.topic_url,
230 channel_type = %subscription.channel.channel_type.as_fhir_str(),
231 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
232 event_number,
233 focus_reference = %focus_reference,
234 "Dispatching subscription event notification"
235 );
236
237 self.dispatch_with_retry(
239 &subscription,
240 &bundle,
241 "event-notification",
242 Some(event_number),
243 )
244 .await;
245 }
246 }
247
248 async fn handle_subscription_event(&self, event: &ResourceEvent) {
250 let tenant_id = event.tenant_id.to_string();
251 let subscription_id = &event.resource_id;
252
253 info!(
254 tenant_id,
255 subscription_id,
256 event_type = %event.event_type,
257 fhir_version = %event.fhir_version,
258 "Handling Subscription resource event"
259 );
260
261 match event.event_type {
262 ResourceEventType::Delete => {
263 let removed = self.manager.deregister(&tenant_id, subscription_id);
264 self.ws_manager
265 .remove_all_clients(&tenant_id, subscription_id);
266 info!(
267 tenant_id,
268 subscription_id, removed, "Subscription deregistered"
269 );
270 }
271 ResourceEventType::Create | ResourceEventType::Update => {
272 if let Some(resource) = &event.resource {
273 match self.manager.register(
275 &tenant_id,
276 subscription_id,
277 resource,
278 event.fhir_version,
279 ) {
280 Ok(sub) => {
281 info!(
282 tenant_id = %sub.tenant_id,
283 subscription_id = %sub.id,
284 topic_url = %sub.topic_url,
285 channel_type = %sub.channel.channel_type.as_fhir_str(),
286 endpoint = sub.channel.endpoint.as_deref().unwrap_or(""),
287 status = %sub.status,
288 fhir_version = %sub.fhir_version,
289 "Subscription registered"
290 );
291 if sub.status == SubscriptionStatusCode::Requested {
293 info!(
294 tenant_id = %sub.tenant_id,
295 subscription_id = %sub.id,
296 channel_type = %sub.channel.channel_type.as_fhir_str(),
297 endpoint = sub.channel.endpoint.as_deref().unwrap_or(""),
298 "Subscription activation requested"
299 );
300 self.activate_subscription(&sub).await;
301 }
302 }
303 Err(e) => {
304 warn!(
305 tenant_id,
306 subscription_id,
307 error = %e,
308 "Failed to register subscription"
309 );
310 }
311 }
312 }
313 }
314 }
315 }
316
317 async fn handle_topic_event(&self, event: &ResourceEvent) {
319 let topic_key = (
320 event.tenant_id.to_string(),
321 event.resource_type.clone(),
322 event.resource_id.clone(),
323 );
324
325 match event.event_type {
326 ResourceEventType::Delete => {
327 let mut candidate_urls = Vec::new();
328
329 if let Some((_, indexed_url)) = self.topic_resource_index.remove(&topic_key) {
330 candidate_urls.push(indexed_url);
331 }
332
333 if let Some(resource) = &event.resource {
334 match InMemoryTopicRegistry::parse_topic_resource(resource) {
335 Ok(topic) => candidate_urls.push(topic.canonical_url),
336 Err(e) => {
337 warn!(
338 resource_id = %event.resource_id,
339 error = %e,
340 "Failed to parse SubscriptionTopic delete payload"
341 );
342 }
343 }
344 }
345
346 if let Some(previous_resource) = &event.previous_resource {
347 match InMemoryTopicRegistry::parse_topic_resource(previous_resource) {
348 Ok(topic) => candidate_urls.push(topic.canonical_url),
349 Err(e) => {
350 warn!(
351 resource_id = %event.resource_id,
352 error = %e,
353 "Failed to parse previous SubscriptionTopic state"
354 );
355 }
356 }
357 }
358
359 candidate_urls.sort();
360 candidate_urls.dedup();
361
362 if candidate_urls.is_empty() {
363 warn!(
364 resource_id = %event.resource_id,
365 "SubscriptionTopic deleted but canonical URL could not be resolved"
366 );
367 return;
368 }
369
370 for canonical_url in candidate_urls {
371 let removed = self.topic_registry.remove_topic(&canonical_url);
372 info!(
373 resource_id = %event.resource_id,
374 topic_url = %canonical_url,
375 removed,
376 "SubscriptionTopic deleted"
377 );
378 }
379 }
380 ResourceEventType::Create | ResourceEventType::Update => {
381 if let Some(resource) = &event.resource {
382 match InMemoryTopicRegistry::parse_topic_resource(resource) {
383 Ok(topic) => {
384 let canonical_url = topic.canonical_url.clone();
385 if let Some(previous_url) = self
386 .topic_resource_index
387 .insert(topic_key, canonical_url.clone())
388 .filter(|previous_url| previous_url != &canonical_url)
389 {
390 let _ = self.topic_registry.remove_topic(&previous_url);
391 }
392 info!(
393 topic_url = %canonical_url,
394 "Registered SubscriptionTopic"
395 );
396 self.topic_registry.add_topic(topic);
397 }
398 Err(e) => {
399 warn!(
400 resource_id = %event.resource_id,
401 error = %e,
402 "Failed to parse SubscriptionTopic"
403 );
404 }
405 }
406 }
407 }
408 }
409 }
410
411 async fn handle_r4_basic_topic_event(&self, event: &ResourceEvent) -> bool {
416 if event.fhir_version.as_str() != "R4" {
417 return false;
418 }
419
420 let topic_key = (
421 event.tenant_id.to_string(),
422 event.resource_type.clone(),
423 event.resource_id.clone(),
424 );
425
426 match event.event_type {
427 ResourceEventType::Delete => {
428 let mut candidate_urls = Vec::new();
429 let mut recognized_topic = false;
430
431 if let Some((_, indexed_url)) = self.topic_resource_index.remove(&topic_key) {
432 candidate_urls.push(indexed_url);
433 recognized_topic = true;
434 }
435
436 if let Some(resource) = &event.resource {
437 match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(resource) {
438 Ok(Some(topic)) => {
439 candidate_urls.push(topic.canonical_url);
440 recognized_topic = true;
441 }
442 Ok(None) => {}
443 Err(e) => {
444 warn!(
445 resource_id = %event.resource_id,
446 error = %e,
447 "Failed to parse R4 Basic SubscriptionTopic delete payload"
448 );
449 recognized_topic = true;
450 }
451 }
452 }
453
454 if let Some(previous_resource) = &event.previous_resource {
455 match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(
456 previous_resource,
457 ) {
458 Ok(Some(topic)) => {
459 candidate_urls.push(topic.canonical_url);
460 recognized_topic = true;
461 }
462 Ok(None) => {}
463 Err(e) => {
464 warn!(
465 resource_id = %event.resource_id,
466 error = %e,
467 "Failed to parse previous R4 Basic SubscriptionTopic state"
468 );
469 recognized_topic = true;
470 }
471 }
472 }
473
474 candidate_urls.sort();
475 candidate_urls.dedup();
476
477 for canonical_url in candidate_urls {
478 let removed = self.topic_registry.remove_topic(&canonical_url);
479 info!(
480 resource_id = %event.resource_id,
481 topic_url = %canonical_url,
482 removed,
483 "R4 Basic SubscriptionTopic deleted"
484 );
485 }
486
487 recognized_topic
488 }
489 ResourceEventType::Create | ResourceEventType::Update => {
490 if let Some(resource) = &event.resource {
491 match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(resource) {
492 Ok(Some(topic)) => {
493 let canonical_url = topic.canonical_url.clone();
494 if let Some(previous_url) = self
495 .topic_resource_index
496 .insert(topic_key, canonical_url.clone())
497 .filter(|previous_url| previous_url != &canonical_url)
498 {
499 let _ = self.topic_registry.remove_topic(&previous_url);
500 }
501 info!(
502 topic_url = %canonical_url,
503 "Registered R4 Basic SubscriptionTopic"
504 );
505 self.topic_registry.add_topic(topic);
506 true
507 }
508 Ok(None) => false,
509 Err(e) => {
510 warn!(
511 resource_id = %event.resource_id,
512 error = %e,
513 "Failed to parse R4 Basic SubscriptionTopic"
514 );
515 true
516 }
517 }
518 } else {
519 false
520 }
521 }
522 }
523 }
524
525 async fn activate_subscription(&self, subscription: &ActiveSubscription) {
527 let tenant_id = &subscription.tenant_id;
528 let sub_id = &subscription.id;
529 let handshake_max_attempts = self.config.handshake_max_attempts.max(1);
530
531 let handshake_bundle = match notification::build_handshake(subscription, &self.base_url) {
533 Ok(b) => b,
534 Err(e) => {
535 warn!(
536 tenant_id,
537 subscription_id = sub_id,
538 channel_type = %subscription.channel.channel_type.as_fhir_str(),
539 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
540 error = %e,
541 "Failed to build handshake"
542 );
543 let _ =
544 self.manager
545 .update_status(tenant_id, sub_id, SubscriptionStatusCode::Error);
546 return;
547 }
548 };
549
550 if !self.config.handshake_initial_delay.is_zero() {
551 info!(
552 tenant_id,
553 subscription_id = sub_id,
554 channel_type = %subscription.channel.channel_type.as_fhir_str(),
555 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
556 delay_ms = self.config.handshake_initial_delay.as_millis() as u64,
557 "Delaying subscription handshake"
558 );
559 tokio::time::sleep(self.config.handshake_initial_delay).await;
560 }
561
562 let mut attempt = 1;
563 loop {
564 info!(
565 tenant_id,
566 subscription_id = sub_id,
567 channel_type = %subscription.channel.channel_type.as_fhir_str(),
568 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
569 attempt,
570 max_attempts = handshake_max_attempts,
571 "Sending subscription handshake"
572 );
573
574 let result = match subscription.channel.channel_type {
576 ChannelType::RestHook => {
577 self.rest_hook_channel
578 .handshake(subscription, &handshake_bundle)
579 .await
580 }
581 ChannelType::Websocket => {
582 self.ws_channel
583 .handshake(subscription, &handshake_bundle)
584 .await
585 }
586 ChannelType::Email => match self.email_channel.as_ref() {
587 Some(ch) => ch.handshake(subscription, &handshake_bundle).await,
588 None => {
589 warn!(
590 tenant_id,
591 subscription_id = sub_id,
592 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
593 "Email channel requested but no SMTP settings configured"
594 );
595 let _ = self.manager.update_status(
596 tenant_id,
597 sub_id,
598 SubscriptionStatusCode::Error,
599 );
600 return;
601 }
602 },
603 ChannelType::Message => match self.messaging_channel.as_ref() {
604 Some(ch) => ch.handshake(subscription, &handshake_bundle).await,
605 None => {
606 warn!(
607 tenant_id,
608 subscription_id = sub_id,
609 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
610 "Messaging channel requested but messaging settings not configured"
611 );
612 let _ = self.manager.update_status(
613 tenant_id,
614 sub_id,
615 SubscriptionStatusCode::Error,
616 );
617 return;
618 }
619 },
620 _ => {
621 warn!(
622 tenant_id,
623 subscription_id = sub_id,
624 channel_type = subscription.channel.channel_type.as_fhir_str(),
625 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
626 "Handshake not supported for channel type"
627 );
628 return;
629 }
630 };
631
632 match result {
633 Ok(DispatchResult::Success) => {
634 info!(
635 tenant_id,
636 subscription_id = sub_id,
637 channel_type = %subscription.channel.channel_type.as_fhir_str(),
638 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
639 attempt,
640 "Handshake successful, activating subscription"
641 );
642 let _ = self.manager.update_status(
643 tenant_id,
644 sub_id,
645 SubscriptionStatusCode::Active,
646 );
647 return;
648 }
649 Ok(DispatchResult::PermanentError(msg)) => {
650 warn!(
651 tenant_id,
652 subscription_id = sub_id,
653 channel_type = %subscription.channel.channel_type.as_fhir_str(),
654 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
655 attempt,
656 error = %msg,
657 "Handshake failed with permanent error"
658 );
659 let _ = self.manager.update_status(
660 tenant_id,
661 sub_id,
662 SubscriptionStatusCode::Error,
663 );
664 return;
665 }
666 Ok(DispatchResult::RetryableError(msg)) => {
667 if attempt >= handshake_max_attempts {
668 warn!(
669 tenant_id,
670 subscription_id = sub_id,
671 channel_type = %subscription.channel.channel_type.as_fhir_str(),
672 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
673 attempts = attempt,
674 error = %msg,
675 "Handshake retries exhausted"
676 );
677 let _ = self.manager.update_status(
678 tenant_id,
679 sub_id,
680 SubscriptionStatusCode::Error,
681 );
682 return;
683 }
684
685 let delay = calculate_handshake_retry_delay(&self.config, attempt);
686 warn!(
687 tenant_id,
688 subscription_id = sub_id,
689 channel_type = %subscription.channel.channel_type.as_fhir_str(),
690 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
691 attempt,
692 next_attempt = attempt + 1,
693 delay_ms = delay.as_millis() as u64,
694 error = %msg,
695 "Retrying subscription handshake"
696 );
697 tokio::time::sleep(delay).await;
698 attempt += 1;
699 }
700 Err(e) => {
701 warn!(
702 tenant_id,
703 subscription_id = sub_id,
704 channel_type = %subscription.channel.channel_type.as_fhir_str(),
705 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
706 attempt,
707 error = %e,
708 "Handshake error"
709 );
710 let _ = self.manager.update_status(
711 tenant_id,
712 sub_id,
713 SubscriptionStatusCode::Error,
714 );
715 return;
716 }
717 }
718 }
719 }
720
721 async fn dispatch_with_retry(
723 &self,
724 subscription: &ActiveSubscription,
725 bundle: &serde_json::Value,
726 notification_type: &'static str,
727 event_number: Option<u64>,
728 ) {
729 let tenant_id = &subscription.tenant_id;
730 let sub_id = &subscription.id;
731
732 let dispatcher: &dyn ChannelDispatcher = match subscription.channel.channel_type {
733 ChannelType::RestHook => self.rest_hook_channel.as_ref(),
734 ChannelType::Websocket => self.ws_channel.as_ref(),
735 ChannelType::Email => match self.email_channel.as_deref() {
736 Some(ch) => ch,
737 None => {
738 warn!(
739 tenant_id,
740 subscription_id = sub_id,
741 notification_type,
742 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
743 "Email dispatch requested but no SMTP settings configured"
744 );
745 return;
746 }
747 },
748 ChannelType::Message => match self.messaging_channel.as_deref() {
749 Some(ch) => ch,
750 None => {
751 warn!(
752 tenant_id,
753 subscription_id = sub_id,
754 notification_type,
755 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
756 "Messaging dispatch requested but messaging settings not configured"
757 );
758 return;
759 }
760 },
761 _ => {
762 warn!(
763 tenant_id,
764 subscription_id = sub_id,
765 notification_type,
766 channel = subscription.channel.channel_type.as_fhir_str(),
767 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
768 "No dispatcher for channel type"
769 );
770 return;
771 }
772 };
773
774 let mut attempt: u32 = 0;
775 loop {
776 match dispatcher.dispatch(subscription, bundle).await {
777 Ok(DispatchResult::Success) => {
778 self.manager.reset_failures(tenant_id, sub_id);
779 info!(
780 tenant_id,
781 subscription_id = sub_id,
782 notification_type,
783 event_number,
784 channel_type = %subscription.channel.channel_type.as_fhir_str(),
785 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
786 "Subscription notification dispatched"
787 );
788 return;
789 }
790 Ok(DispatchResult::PermanentError(msg)) => {
791 warn!(
792 tenant_id,
793 subscription_id = sub_id,
794 notification_type,
795 event_number,
796 channel_type = %subscription.channel.channel_type.as_fhir_str(),
797 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
798 error = %msg,
799 "Permanent delivery error"
800 );
801 self.handle_delivery_failure(tenant_id, sub_id);
802 return;
803 }
804 Ok(DispatchResult::RetryableError(msg)) => {
805 attempt += 1;
806 if !retry::should_retry(&self.config, attempt) {
807 warn!(
808 tenant_id,
809 subscription_id = sub_id,
810 notification_type,
811 event_number,
812 channel_type = %subscription.channel.channel_type.as_fhir_str(),
813 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
814 attempts = attempt,
815 error = %msg,
816 "Max retries exhausted"
817 );
818 self.handle_delivery_failure(tenant_id, sub_id);
819 return;
820 }
821
822 let delay = retry::calculate_delay(&self.config, attempt);
823 debug!(
824 tenant_id,
825 subscription_id = sub_id,
826 notification_type,
827 event_number,
828 attempt,
829 delay_ms = delay.as_millis() as u64,
830 "Retrying delivery"
831 );
832 tokio::time::sleep(delay).await;
833 }
834 Err(e) => {
835 error!(
836 tenant_id,
837 subscription_id = sub_id,
838 notification_type,
839 event_number,
840 channel_type = %subscription.channel.channel_type.as_fhir_str(),
841 endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
842 error = %e,
843 "Dispatch error"
844 );
845 self.handle_delivery_failure(tenant_id, sub_id);
846 return;
847 }
848 }
849 }
850 }
851
852 fn handle_delivery_failure(&self, tenant_id: &str, subscription_id: &str) {
855 if let Some(failure_count) = self.manager.record_failure(tenant_id, subscription_id) {
856 if failure_count >= self.config.off_threshold {
857 warn!(
858 subscription_id,
859 failures = failure_count,
860 "Turning off subscription after repeated failures"
861 );
862 let _ = self.manager.update_status(
863 tenant_id,
864 subscription_id,
865 SubscriptionStatusCode::Off,
866 );
867 } else if failure_count >= self.config.error_threshold {
868 let _ = self.manager.update_status(
869 tenant_id,
870 subscription_id,
871 SubscriptionStatusCode::Error,
872 );
873 }
874 }
875 }
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use crate::event::ResourceEventType;
882 use crate::topics::{ResourceTrigger, TopicDefinition};
883 use chrono::Utc;
884 use helios_fhir::FhirVersion;
885 use helios_persistence::tenant::TenantId;
886 use serde_json::json;
887 use std::sync::{
888 Arc,
889 atomic::{AtomicUsize, Ordering},
890 };
891 use wiremock::matchers::method;
892 use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
893
    /// Builds an engine for tests with `max_retries = 2`, `error_threshold = 2`
    /// and `off_threshold = 4`; every other setting keeps its default.
    fn make_engine(base_url: &str) -> SubscriptionEngine {
        let config = SubscriptionConfig {
            max_retries: 2,
            error_threshold: 2,
            off_threshold: 4,
            ..Default::default()
        };
        SubscriptionEngine::new(config, base_url.to_string())
    }
903
    /// Topic fixture: triggers on `Encounter` create, with no FHIRPath
    /// criteria, filters or notification shape.
    fn encounter_topic() -> TopicDefinition {
        TopicDefinition {
            canonical_url: "http://example.org/topic/encounter-start".to_string(),
            title: Some("Encounter Start".to_string()),
            resource_triggers: vec![ResourceTrigger {
                resource_type: "Encounter".to_string(),
                interactions: vec![ResourceEventType::Create],
                fhirpath_criteria: None,
            }],
            can_filter_by: vec![],
            notification_shape: vec![],
        }
    }
917
    /// Event fixture: creation of `Encounter/enc-1` for tenant `t1`, the kind
    /// of event `encounter_topic()` triggers on.
    fn encounter_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Encounter".to_string(),
            resource_id: "enc-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "Encounter",
                "id": "enc-1",
                "status": "in-progress"
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }
935
936 #[derive(Clone)]
937 struct FailFirstHandshake {
938 attempts: Arc<AtomicUsize>,
939 }
940
941 impl Respond for FailFirstHandshake {
942 fn respond(&self, _request: &Request) -> ResponseTemplate {
943 if self.attempts.fetch_add(1, Ordering::SeqCst) == 0 {
944 ResponseTemplate::new(500)
945 } else {
946 ResponseTemplate::new(200)
947 }
948 }
949 }
950
    /// Event fixture: creation of `SubscriptionTopic/topic-1` for tenant `t1`
    /// with the same canonical URL as `encounter_topic()`.
    fn topic_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "SubscriptionTopic".to_string(),
            resource_id: "topic-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "SubscriptionTopic",
                "id": "topic-1",
                "url": "http://example.org/topic/encounter-start",
                "resourceTrigger": [{
                    "resource": "Encounter",
                    "supportedInteraction": ["create"]
                }]
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }
972
    /// Event fixture: creation of an R4 `Basic` resource that encodes a
    /// `SubscriptionTopic` through extensions (canonical URL plus one
    /// `Encounter` / `create` resource trigger).
    #[cfg(feature = "R4")]
    fn r4_basic_topic_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::R4,
            resource_type: "Basic".to_string(),
            resource_id: "topic-basic-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "Basic",
                "id": "topic-basic-1",
                "code": {
                    "coding": [{
                        "system": "http://hl7.org/fhir/fhir-types",
                        "code": "SubscriptionTopic"
                    }]
                },
                "extension": [{
                    "url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.url",
                    "valueUri": "http://example.org/topic/encounter-start-basic"
                }, {
                    "url": "http://hl7.org/fhir/4.3/StructureDefinition/extension-SubscriptionTopic.resourceTrigger",
                    "extension": [
                        { "url": "resource", "valueUri": "http://hl7.org/fhir/StructureDefinition/Encounter" },
                        { "url": "supportedInteraction", "valueCode": "create" }
                    ]
                }]
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }
1006
1007 #[tokio::test]
1008 async fn test_topic_event_registers_topic() {
1009 let engine = make_engine("http://localhost:8080");
1010
1011 assert!(engine.topic_registry().list_topics().is_empty());
1012
1013 engine.on_resource_event(topic_event()).await;
1014
1015 let topics = engine.topic_registry().list_topics();
1016 assert_eq!(topics.len(), 1);
1017 assert!(topics.contains(&"http://example.org/topic/encounter-start".to_string()));
1018 }
1019
1020 #[tokio::test]
1021 async fn test_topic_delete_event_removes_topic_without_payload() {
1022 let engine = make_engine("http://localhost:8080");
1023 engine.on_resource_event(topic_event()).await;
1024
1025 let delete_event = ResourceEvent {
1026 tenant_id: TenantId::new("t1"),
1027 fhir_version: FhirVersion::default(),
1028 resource_type: "SubscriptionTopic".to_string(),
1029 resource_id: "topic-1".to_string(),
1030 version_id: "2".to_string(),
1031 event_type: ResourceEventType::Delete,
1032 resource: None,
1033 previous_resource: None,
1034 timestamp: Utc::now(),
1035 };
1036
1037 engine.on_resource_event(delete_event).await;
1038
1039 let topics = engine.topic_registry().list_topics();
1040 assert!(topics.is_empty());
1041 }
1042
1043 #[cfg(feature = "R4")]
1044 #[tokio::test]
1045 async fn test_r4_basic_topic_event_registers_topic() {
1046 let engine = make_engine("http://localhost:8080");
1047
1048 assert!(engine.topic_registry().list_topics().is_empty());
1049
1050 engine.on_resource_event(r4_basic_topic_event()).await;
1051
1052 let topics = engine.topic_registry().list_topics();
1053 assert_eq!(topics.len(), 1);
1054 assert!(topics.contains(&"http://example.org/topic/encounter-start-basic".to_string()));
1055 }
1056
    /// A `Subscription` create event makes the subscription visible through the
    /// manager.
    #[tokio::test]
    async fn test_subscription_event_registers_subscription() {
        // Endpoint that accepts every POST, so the activation handshake succeeds.
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let engine = make_engine("http://localhost:8080");
        engine.topic_registry().add_topic(encounter_topic());

        let sub_resource = crate::manager::tests::build_subscription_json(
            "http://example.org/topic/encounter-start",
            "rest-hook",
            Some(&format!("{}/webhook", server.uri())),
        );

        let event = ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Subscription".to_string(),
            resource_id: "sub-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(sub_resource),
            previous_resource: None,
            timestamp: Utc::now(),
        };

        engine.on_resource_event(event).await;

        let sub = engine.manager().get_subscription("t1", "sub-1");
        assert!(sub.is_some());
    }
1093
    /// A handshake that fails once with HTTP 500 is retried, and the
    /// subscription becomes `Active` after the second attempt.
    #[tokio::test]
    async fn test_subscription_activation_retries_retryable_handshake_failure() {
        let server = MockServer::start().await;
        let attempts = Arc::new(AtomicUsize::new(0));

        // First POST answers 500, every later POST answers 200.
        Mock::given(method("POST"))
            .respond_with(FailFirstHandshake {
                attempts: Arc::clone(&attempts),
            })
            .mount(&server)
            .await;

        // Two attempts allowed; millisecond delays keep the test fast.
        let config = SubscriptionConfig {
            handshake_max_attempts: 2,
            handshake_retry_initial_delay: std::time::Duration::from_millis(1),
            handshake_retry_max_delay: std::time::Duration::from_millis(1),
            ..Default::default()
        };
        let engine = SubscriptionEngine::new(config, "http://localhost:8080".to_string());
        engine.topic_registry().add_topic(encounter_topic());

        let sub_resource = crate::manager::tests::build_subscription_json(
            "http://example.org/topic/encounter-start",
            "rest-hook",
            Some(&format!("{}/webhook", server.uri())),
        );

        let event = ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Subscription".to_string(),
            resource_id: "sub-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(sub_resource),
            previous_resource: None,
            timestamp: Utc::now(),
        };

        engine.on_resource_event(event).await;

        let sub = engine.manager().get_subscription("t1", "sub-1").unwrap();
        assert_eq!(sub.status, SubscriptionStatusCode::Active);
        // Exactly one failed and one successful request reached the server.
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }
1139
    /// End-to-end path: registering a rest-hook subscription and then sending a
    /// matching `Encounter` event results in two POSTs to the endpoint.
    #[tokio::test]
    async fn test_full_pipeline_event_to_dispatch() {
        let server = MockServer::start().await;
        // Two requests expected — presumably the activation handshake plus one
        // event notification.
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .expect(2)
            .mount(&server)
            .await;

        let engine = make_engine("http://localhost:8080");
        engine.topic_registry().add_topic(encounter_topic());

        let sub_resource = crate::manager::tests::build_subscription_json(
            "http://example.org/topic/encounter-start",
            "rest-hook",
            Some(&format!("{}/webhook", server.uri())),
        );

        let sub_event = ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Subscription".to_string(),
            resource_id: "sub-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(sub_resource),
            previous_resource: None,
            timestamp: Utc::now(),
        };

        engine.on_resource_event(sub_event).await;

        engine.on_resource_event(encounter_event()).await;

        // No explicit assertion here: the `.expect(2)` on the mock is the check.
    }
1179
1180 #[tokio::test]
1181 async fn test_no_dispatch_when_no_matching_subscriptions() {
1182 let engine = make_engine("http://localhost:8080");
1183 engine.topic_registry().add_topic(encounter_topic());
1184
1185 engine.on_resource_event(encounter_event()).await;
1187 }
1189
1190 #[tokio::test]
1191 async fn test_no_dispatch_for_non_matching_resource_type() {
1192 let server = MockServer::start().await;
1193 Mock::given(method("POST"))
1194 .respond_with(ResponseTemplate::new(200))
1195 .expect(1) .mount(&server)
1197 .await;
1198
1199 let engine = make_engine("http://localhost:8080");
1200 engine.topic_registry().add_topic(encounter_topic());
1201
1202 let sub_resource = crate::manager::tests::build_subscription_json(
1204 "http://example.org/topic/encounter-start",
1205 "rest-hook",
1206 Some(&format!("{}/webhook", server.uri())),
1207 );
1208
1209 let sub_event = ResourceEvent {
1210 tenant_id: TenantId::new("t1"),
1211 fhir_version: FhirVersion::default(),
1212 resource_type: "Subscription".to_string(),
1213 resource_id: "sub-1".to_string(),
1214 version_id: "1".to_string(),
1215 event_type: ResourceEventType::Create,
1216 resource: Some(sub_resource),
1217 previous_resource: None,
1218 timestamp: Utc::now(),
1219 };
1220
1221 engine.on_resource_event(sub_event).await;
1222
1223 let patient_event = ResourceEvent {
1225 tenant_id: TenantId::new("t1"),
1226 fhir_version: FhirVersion::default(),
1227 resource_type: "Patient".to_string(),
1228 resource_id: "pat-1".to_string(),
1229 version_id: "1".to_string(),
1230 event_type: ResourceEventType::Create,
1231 resource: Some(json!({"resourceType": "Patient", "id": "pat-1"})),
1232 previous_resource: None,
1233 timestamp: Utc::now(),
1234 };
1235
1236 engine.on_resource_event(patient_event).await;
1237 }
1239
1240 #[tokio::test]
1241 async fn test_delivery_failure_updates_status() {
1242 let server = MockServer::start().await;
1243
1244 Mock::given(method("POST"))
1246 .respond_with(ResponseTemplate::new(200))
1247 .up_to_n_times(1)
1248 .mount(&server)
1249 .await;
1250
1251 Mock::given(method("POST"))
1252 .respond_with(ResponseTemplate::new(500))
1253 .mount(&server)
1254 .await;
1255
1256 let config = SubscriptionConfig {
1257 max_retries: 1,
1258 error_threshold: 1,
1259 off_threshold: 3,
1260 ..Default::default()
1261 };
1262 let engine = SubscriptionEngine::new(config, "http://localhost:8080".to_string());
1263 engine.topic_registry().add_topic(encounter_topic());
1264
1265 let sub_resource = crate::manager::tests::build_subscription_json(
1267 "http://example.org/topic/encounter-start",
1268 "rest-hook",
1269 Some(&format!("{}/webhook", server.uri())),
1270 );
1271
1272 let sub_event = ResourceEvent {
1273 tenant_id: TenantId::new("t1"),
1274 fhir_version: FhirVersion::default(),
1275 resource_type: "Subscription".to_string(),
1276 resource_id: "sub-1".to_string(),
1277 version_id: "1".to_string(),
1278 event_type: ResourceEventType::Create,
1279 resource: Some(sub_resource),
1280 previous_resource: None,
1281 timestamp: Utc::now(),
1282 };
1283
1284 engine.on_resource_event(sub_event).await;
1285
1286 engine.on_resource_event(encounter_event()).await;
1288
1289 let sub = engine.manager().get_subscription("t1", "sub-1").unwrap();
1291 assert_eq!(sub.status, SubscriptionStatusCode::Error);
1292 }
1293
1294 #[tokio::test]
1295 async fn test_subscription_delete_event() {
1296 let engine = make_engine("http://localhost:8080");
1297 engine.topic_registry().add_topic(encounter_topic());
1298
1299 let resource = crate::manager::tests::default_subscription_json();
1301 engine
1302 .manager()
1303 .register("t1", "sub-1", &resource, FhirVersion::default())
1304 .unwrap();
1305
1306 assert!(engine.manager().get_subscription("t1", "sub-1").is_some());
1307
1308 let delete_event = ResourceEvent {
1310 tenant_id: TenantId::new("t1"),
1311 fhir_version: FhirVersion::default(),
1312 resource_type: "Subscription".to_string(),
1313 resource_id: "sub-1".to_string(),
1314 version_id: "2".to_string(),
1315 event_type: ResourceEventType::Delete,
1316 resource: None,
1317 previous_resource: None,
1318 timestamp: Utc::now(),
1319 };
1320
1321 engine.on_resource_event(delete_event).await;
1322
1323 assert!(engine.manager().get_subscription("t1", "sub-1").is_none());
1324 }
1325
1326 #[tokio::test]
1327 async fn test_tenant_isolation() {
1328 let server = MockServer::start().await;
1329 Mock::given(method("POST"))
1330 .respond_with(ResponseTemplate::new(200))
1331 .expect(2) .mount(&server)
1333 .await;
1334
1335 let engine = make_engine("http://localhost:8080");
1336 engine.topic_registry().add_topic(encounter_topic());
1337
1338 let sub_resource = crate::manager::tests::build_subscription_json(
1340 "http://example.org/topic/encounter-start",
1341 "rest-hook",
1342 Some(&format!("{}/webhook", server.uri())),
1343 );
1344
1345 let sub_event_a = ResourceEvent {
1346 tenant_id: TenantId::new("tenant-a"),
1347 fhir_version: FhirVersion::default(),
1348 resource_type: "Subscription".to_string(),
1349 resource_id: "sub-a".to_string(),
1350 version_id: "1".to_string(),
1351 event_type: ResourceEventType::Create,
1352 resource: Some(sub_resource),
1353 previous_resource: None,
1354 timestamp: Utc::now(),
1355 };
1356
1357 engine.on_resource_event(sub_event_a).await;
1358
1359 let mut event_a = encounter_event();
1361 event_a.tenant_id = TenantId::new("tenant-a");
1362 engine.on_resource_event(event_a).await;
1363
1364 let mut event_b = encounter_event();
1366 event_b.tenant_id = TenantId::new("tenant-b");
1367 engine.on_resource_event(event_b).await;
1368
1369 }
1371}