Skip to main content

helios_subscriptions/engine/
mod.rs

1//! Subscription engine — the orchestrator.
2//!
3//! Ties together topic registry, subscription manager, event evaluator,
4//! notification builder, and channel dispatchers. Invoked asynchronously
5//! after each resource write.
6
7pub mod retry;
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use tracing::{debug, error, info, warn};
13
14use crate::channels::email::EmailChannel;
15use crate::channels::messaging::MessagingChannel;
16use crate::channels::rest_hook::RestHookChannel;
17use crate::channels::websocket::WebSocketChannel;
18use crate::channels::ws_manager::WebSocketManager;
19use crate::channels::ws_token::WsBindingTokenManager;
20use crate::channels::{ChannelDispatcher, DispatchResult};
21use crate::config::SubscriptionConfig;
22use crate::evaluator::EventEvaluator;
23use crate::event::{ResourceEvent, ResourceEventType};
24use crate::manager::{
25    ActiveSubscription, ChannelType, SubscriptionManager, SubscriptionStatusCode,
26};
27use crate::notification::{self, NotificationEventData};
28use crate::topics::InMemoryTopicRegistry;
29use helios_auth::{NoOpOutboundAuthProvider, OutboundAuthProvider};
30
/// The subscription engine orchestrates the entire subscription pipeline.
///
/// It is designed to be invoked asynchronously after resource writes
/// (fire-and-forget via `tokio::spawn`), mirroring the audit middleware pattern.
pub struct SubscriptionEngine {
    /// Registry of known subscription topics; the same `Arc` is handed to the
    /// subscription manager and the event evaluator at construction time.
    topic_registry: Arc<InMemoryTopicRegistry>,
    /// Maps `(tenant_id, resource_type, resource_id)` of a topic-defining
    /// resource to the canonical URL it was last registered under, so that a
    /// later update or delete can retire the right registry entry.
    topic_resource_index: DashMap<(String, String, String), String>,
    /// Tracks registered subscriptions, their status and event counters.
    manager: Arc<SubscriptionManager>,
    /// Decides which subscriptions a resource event matches.
    evaluator: EventEvaluator,
    /// Dispatcher for `rest-hook` subscriptions.
    rest_hook_channel: Arc<RestHookChannel>,
    /// WebSocket client bookkeeping; shared with `ws_channel`.
    ws_manager: Arc<WebSocketManager>,
    /// Dispatcher for `websocket` subscriptions.
    ws_channel: Arc<WebSocketChannel>,
    /// Issues WebSocket binding tokens; built from `config.ws_token_lifetime_secs`.
    ws_token_manager: Arc<WsBindingTokenManager>,
    /// Email dispatcher; `None` when no SMTP settings are configured or the
    /// channel failed to initialize.
    email_channel: Option<Arc<EmailChannel>>,
    /// FHIR Messaging dispatcher; `None` when messaging settings are absent.
    messaging_channel: Option<Arc<MessagingChannel>>,
    /// Engine configuration (handshake delays, retry limits, channel settings).
    config: SubscriptionConfig,
    /// Base URL passed to the notification builders.
    base_url: String,
}
49
50fn calculate_handshake_retry_delay(
51    config: &SubscriptionConfig,
52    attempt: u32,
53) -> std::time::Duration {
54    let exponent = attempt.saturating_sub(1) as i32;
55    let delay_secs = config.handshake_retry_initial_delay.as_secs_f64()
56        * config.retry_backoff_factor.powi(exponent);
57    let capped = delay_secs.min(config.handshake_retry_max_delay.as_secs_f64());
58
59    std::time::Duration::from_secs_f64(capped)
60}
61
62impl SubscriptionEngine {
63    /// Creates a new subscription engine with a no-op outbound auth provider.
64    pub fn new(config: SubscriptionConfig, base_url: String) -> Self {
65        Self::with_outbound_auth(config, base_url, Arc::new(NoOpOutboundAuthProvider))
66    }
67
68    /// Creates a new subscription engine with a custom outbound auth provider.
69    ///
70    /// The provider is used by channels that initiate outbound HTTP requests
71    /// (currently the FHIR Messaging channel) to attach server-side
72    /// credentials when the subscription does not supply its own
73    /// `Authorization` header.
74    pub fn with_outbound_auth(
75        config: SubscriptionConfig,
76        base_url: String,
77        outbound_auth: Arc<dyn OutboundAuthProvider>,
78    ) -> Self {
79        let topic_registry = Arc::new(InMemoryTopicRegistry::new());
80        let manager = Arc::new(SubscriptionManager::new(
81            Arc::clone(&topic_registry),
82            config.supported_channel_types.clone(),
83        ));
84        let evaluator = EventEvaluator::new(Arc::clone(&topic_registry), Arc::clone(&manager));
85        let rest_hook_channel = Arc::new(RestHookChannel::new());
86        let ws_manager = Arc::new(WebSocketManager::new());
87        let ws_channel = Arc::new(WebSocketChannel::new(Arc::clone(&ws_manager)));
88        let ws_token_manager = Arc::new(WsBindingTokenManager::new(config.ws_token_lifetime_secs));
89        let email_channel = match &config.smtp {
90            Some(settings) => match EmailChannel::new(settings.clone()) {
91                Ok(ch) => Some(Arc::new(ch)),
92                Err(e) => {
93                    warn!(error = %e, "Failed to initialize email channel; email dispatch disabled");
94                    None
95                }
96            },
97            None => None,
98        };
99        let messaging_channel = config.messaging.as_ref().map(|settings| {
100            Arc::new(
101                MessagingChannel::new(settings.source_endpoint.clone(), Arc::clone(&outbound_auth))
102                    .with_private_endpoints_allowed(settings.allow_private_endpoints),
103            )
104        });
105
106        Self {
107            topic_registry,
108            topic_resource_index: DashMap::new(),
109            manager,
110            evaluator,
111            rest_hook_channel,
112            ws_manager,
113            ws_channel,
114            ws_token_manager,
115            email_channel,
116            messaging_channel,
117            config,
118            base_url,
119        }
120    }
121
122    /// Returns a reference to the topic registry.
123    pub fn topic_registry(&self) -> &Arc<InMemoryTopicRegistry> {
124        &self.topic_registry
125    }
126
127    /// Returns a reference to the subscription manager.
128    pub fn manager(&self) -> &Arc<SubscriptionManager> {
129        &self.manager
130    }
131
132    /// Returns a reference to the WebSocket manager.
133    pub fn ws_manager(&self) -> &Arc<WebSocketManager> {
134        &self.ws_manager
135    }
136
137    /// Returns a reference to the WebSocket binding token manager.
138    pub fn ws_token_manager(&self) -> &Arc<WsBindingTokenManager> {
139        &self.ws_token_manager
140    }
141
142    /// Called after a resource write has been committed.
143    ///
144    /// This method:
145    /// 1. Handles subscription/topic lifecycle events (if the written resource
146    ///    is a Subscription, SubscriptionTopic, or an R4 backport Basic topic).
147    /// 2. Evaluates the event against all active subscriptions.
148    /// 3. Builds and dispatches notifications.
149    pub async fn on_resource_event(&self, event: ResourceEvent) {
150        // Handle subscription lifecycle events.
151        match event.resource_type.as_str() {
152            "Subscription" => {
153                self.handle_subscription_event(&event).await;
154                return;
155            }
156            "SubscriptionTopic" => {
157                self.handle_topic_event(&event).await;
158                return;
159            }
160            "Basic" => {
161                if self.handle_r4_basic_topic_event(&event).await {
162                    return;
163                }
164            }
165            _ => {}
166        }
167
168        // Evaluate which subscriptions match this event.
169        let matches = self.evaluator.evaluate(&event);
170        info!(
171            tenant_id = %event.tenant_id,
172            resource_type = %event.resource_type,
173            resource_id = %event.resource_id,
174            event_type = %event.event_type,
175            matched_subscriptions = matches.len(),
176            "Subscription event evaluated"
177        );
178        if matches.is_empty() {
179            return;
180        }
181
182        debug!(
183            resource_type = %event.resource_type,
184            resource_id = %event.resource_id,
185            matched_subscriptions = matches.len(),
186            "Event matched subscriptions"
187        );
188
189        // Build and dispatch notifications for each match.
190        for eval_match in matches {
191            let mut subscription = eval_match.subscription;
192
193            // Increment event counter.
194            let event_number = self
195                .manager
196                .increment_event_count(&subscription.tenant_id, &subscription.id)
197                .unwrap_or(0);
198            // Ensure notification metadata reflects the event being emitted.
199            subscription.events_since_start = event_number;
200
201            let focus_reference = format!("{}/{}", event.resource_type, event.resource_id);
202            let event_data = NotificationEventData {
203                event_number,
204                timestamp: event.timestamp,
205                focus_reference: focus_reference.clone(),
206            };
207
208            // Build notification bundle.
209            let bundle = match notification::build_event_notification(
210                &subscription,
211                event_data,
212                event.resource.as_ref(),
213                &self.base_url,
214            ) {
215                Ok(b) => b,
216                Err(e) => {
217                    error!(
218                        subscription_id = %subscription.id,
219                        error = %e,
220                        "Failed to build notification"
221                    );
222                    continue;
223                }
224            };
225
226            info!(
227                tenant_id = %subscription.tenant_id,
228                subscription_id = %subscription.id,
229                topic_url = %subscription.topic_url,
230                channel_type = %subscription.channel.channel_type.as_fhir_str(),
231                endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
232                event_number,
233                focus_reference = %focus_reference,
234                "Dispatching subscription event notification"
235            );
236
237            // Dispatch with retry.
238            self.dispatch_with_retry(
239                &subscription,
240                &bundle,
241                "event-notification",
242                Some(event_number),
243            )
244            .await;
245        }
246    }
247
248    /// Handle a Subscription resource event (create/update/delete).
249    async fn handle_subscription_event(&self, event: &ResourceEvent) {
250        let tenant_id = event.tenant_id.to_string();
251        let subscription_id = &event.resource_id;
252
253        info!(
254            tenant_id,
255            subscription_id,
256            event_type = %event.event_type,
257            fhir_version = %event.fhir_version,
258            "Handling Subscription resource event"
259        );
260
261        match event.event_type {
262            ResourceEventType::Delete => {
263                let removed = self.manager.deregister(&tenant_id, subscription_id);
264                self.ws_manager
265                    .remove_all_clients(&tenant_id, subscription_id);
266                info!(
267                    tenant_id,
268                    subscription_id, removed, "Subscription deregistered"
269                );
270            }
271            ResourceEventType::Create | ResourceEventType::Update => {
272                if let Some(resource) = &event.resource {
273                    // Register (or re-register) the subscription.
274                    match self.manager.register(
275                        &tenant_id,
276                        subscription_id,
277                        resource,
278                        event.fhir_version,
279                    ) {
280                        Ok(sub) => {
281                            info!(
282                                tenant_id = %sub.tenant_id,
283                                subscription_id = %sub.id,
284                                topic_url = %sub.topic_url,
285                                channel_type = %sub.channel.channel_type.as_fhir_str(),
286                                endpoint = sub.channel.endpoint.as_deref().unwrap_or(""),
287                                status = %sub.status,
288                                fhir_version = %sub.fhir_version,
289                                "Subscription registered"
290                            );
291                            // If status is requested, perform handshake and activate.
292                            if sub.status == SubscriptionStatusCode::Requested {
293                                info!(
294                                    tenant_id = %sub.tenant_id,
295                                    subscription_id = %sub.id,
296                                    channel_type = %sub.channel.channel_type.as_fhir_str(),
297                                    endpoint = sub.channel.endpoint.as_deref().unwrap_or(""),
298                                    "Subscription activation requested"
299                                );
300                                self.activate_subscription(&sub).await;
301                            }
302                        }
303                        Err(e) => {
304                            warn!(
305                                tenant_id,
306                                subscription_id,
307                                error = %e,
308                                "Failed to register subscription"
309                            );
310                        }
311                    }
312                }
313            }
314        }
315    }
316
317    /// Handle a SubscriptionTopic resource event.
318    async fn handle_topic_event(&self, event: &ResourceEvent) {
319        let topic_key = (
320            event.tenant_id.to_string(),
321            event.resource_type.clone(),
322            event.resource_id.clone(),
323        );
324
325        match event.event_type {
326            ResourceEventType::Delete => {
327                let mut candidate_urls = Vec::new();
328
329                if let Some((_, indexed_url)) = self.topic_resource_index.remove(&topic_key) {
330                    candidate_urls.push(indexed_url);
331                }
332
333                if let Some(resource) = &event.resource {
334                    match InMemoryTopicRegistry::parse_topic_resource(resource) {
335                        Ok(topic) => candidate_urls.push(topic.canonical_url),
336                        Err(e) => {
337                            warn!(
338                                resource_id = %event.resource_id,
339                                error = %e,
340                                "Failed to parse SubscriptionTopic delete payload"
341                            );
342                        }
343                    }
344                }
345
346                if let Some(previous_resource) = &event.previous_resource {
347                    match InMemoryTopicRegistry::parse_topic_resource(previous_resource) {
348                        Ok(topic) => candidate_urls.push(topic.canonical_url),
349                        Err(e) => {
350                            warn!(
351                                resource_id = %event.resource_id,
352                                error = %e,
353                                "Failed to parse previous SubscriptionTopic state"
354                            );
355                        }
356                    }
357                }
358
359                candidate_urls.sort();
360                candidate_urls.dedup();
361
362                if candidate_urls.is_empty() {
363                    warn!(
364                        resource_id = %event.resource_id,
365                        "SubscriptionTopic deleted but canonical URL could not be resolved"
366                    );
367                    return;
368                }
369
370                for canonical_url in candidate_urls {
371                    let removed = self.topic_registry.remove_topic(&canonical_url);
372                    info!(
373                        resource_id = %event.resource_id,
374                        topic_url = %canonical_url,
375                        removed,
376                        "SubscriptionTopic deleted"
377                    );
378                }
379            }
380            ResourceEventType::Create | ResourceEventType::Update => {
381                if let Some(resource) = &event.resource {
382                    match InMemoryTopicRegistry::parse_topic_resource(resource) {
383                        Ok(topic) => {
384                            let canonical_url = topic.canonical_url.clone();
385                            if let Some(previous_url) = self
386                                .topic_resource_index
387                                .insert(topic_key, canonical_url.clone())
388                                .filter(|previous_url| previous_url != &canonical_url)
389                            {
390                                let _ = self.topic_registry.remove_topic(&previous_url);
391                            }
392                            info!(
393                                topic_url = %canonical_url,
394                                "Registered SubscriptionTopic"
395                            );
396                            self.topic_registry.add_topic(topic);
397                        }
398                        Err(e) => {
399                            warn!(
400                                resource_id = %event.resource_id,
401                                error = %e,
402                                "Failed to parse SubscriptionTopic"
403                            );
404                        }
405                    }
406                }
407            }
408        }
409    }
410
411    /// Handle an R4 backport `Basic` topic event.
412    ///
413    /// Returns `true` when the `Basic` resource was recognized as a topic lifecycle
414    /// event (including malformed topic candidates), `false` otherwise.
415    async fn handle_r4_basic_topic_event(&self, event: &ResourceEvent) -> bool {
416        if event.fhir_version.as_str() != "R4" {
417            return false;
418        }
419
420        let topic_key = (
421            event.tenant_id.to_string(),
422            event.resource_type.clone(),
423            event.resource_id.clone(),
424        );
425
426        match event.event_type {
427            ResourceEventType::Delete => {
428                let mut candidate_urls = Vec::new();
429                let mut recognized_topic = false;
430
431                if let Some((_, indexed_url)) = self.topic_resource_index.remove(&topic_key) {
432                    candidate_urls.push(indexed_url);
433                    recognized_topic = true;
434                }
435
436                if let Some(resource) = &event.resource {
437                    match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(resource) {
438                        Ok(Some(topic)) => {
439                            candidate_urls.push(topic.canonical_url);
440                            recognized_topic = true;
441                        }
442                        Ok(None) => {}
443                        Err(e) => {
444                            warn!(
445                                resource_id = %event.resource_id,
446                                error = %e,
447                                "Failed to parse R4 Basic SubscriptionTopic delete payload"
448                            );
449                            recognized_topic = true;
450                        }
451                    }
452                }
453
454                if let Some(previous_resource) = &event.previous_resource {
455                    match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(
456                        previous_resource,
457                    ) {
458                        Ok(Some(topic)) => {
459                            candidate_urls.push(topic.canonical_url);
460                            recognized_topic = true;
461                        }
462                        Ok(None) => {}
463                        Err(e) => {
464                            warn!(
465                                resource_id = %event.resource_id,
466                                error = %e,
467                                "Failed to parse previous R4 Basic SubscriptionTopic state"
468                            );
469                            recognized_topic = true;
470                        }
471                    }
472                }
473
474                candidate_urls.sort();
475                candidate_urls.dedup();
476
477                for canonical_url in candidate_urls {
478                    let removed = self.topic_registry.remove_topic(&canonical_url);
479                    info!(
480                        resource_id = %event.resource_id,
481                        topic_url = %canonical_url,
482                        removed,
483                        "R4 Basic SubscriptionTopic deleted"
484                    );
485                }
486
487                recognized_topic
488            }
489            ResourceEventType::Create | ResourceEventType::Update => {
490                if let Some(resource) = &event.resource {
491                    match InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(resource) {
492                        Ok(Some(topic)) => {
493                            let canonical_url = topic.canonical_url.clone();
494                            if let Some(previous_url) = self
495                                .topic_resource_index
496                                .insert(topic_key, canonical_url.clone())
497                                .filter(|previous_url| previous_url != &canonical_url)
498                            {
499                                let _ = self.topic_registry.remove_topic(&previous_url);
500                            }
501                            info!(
502                                topic_url = %canonical_url,
503                                "Registered R4 Basic SubscriptionTopic"
504                            );
505                            self.topic_registry.add_topic(topic);
506                            true
507                        }
508                        Ok(None) => false,
509                        Err(e) => {
510                            warn!(
511                                resource_id = %event.resource_id,
512                                error = %e,
513                                "Failed to parse R4 Basic SubscriptionTopic"
514                            );
515                            true
516                        }
517                    }
518                } else {
519                    false
520                }
521            }
522        }
523    }
524
525    /// Activate a subscription by performing the handshake.
526    async fn activate_subscription(&self, subscription: &ActiveSubscription) {
527        let tenant_id = &subscription.tenant_id;
528        let sub_id = &subscription.id;
529        let handshake_max_attempts = self.config.handshake_max_attempts.max(1);
530
531        // Build handshake notification.
532        let handshake_bundle = match notification::build_handshake(subscription, &self.base_url) {
533            Ok(b) => b,
534            Err(e) => {
535                warn!(
536                    tenant_id,
537                    subscription_id = sub_id,
538                    channel_type = %subscription.channel.channel_type.as_fhir_str(),
539                    endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
540                    error = %e,
541                    "Failed to build handshake"
542                );
543                let _ =
544                    self.manager
545                        .update_status(tenant_id, sub_id, SubscriptionStatusCode::Error);
546                return;
547            }
548        };
549
550        if !self.config.handshake_initial_delay.is_zero() {
551            info!(
552                tenant_id,
553                subscription_id = sub_id,
554                channel_type = %subscription.channel.channel_type.as_fhir_str(),
555                endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
556                delay_ms = self.config.handshake_initial_delay.as_millis() as u64,
557                "Delaying subscription handshake"
558            );
559            tokio::time::sleep(self.config.handshake_initial_delay).await;
560        }
561
562        let mut attempt = 1;
563        loop {
564            info!(
565                tenant_id,
566                subscription_id = sub_id,
567                channel_type = %subscription.channel.channel_type.as_fhir_str(),
568                endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
569                attempt,
570                max_attempts = handshake_max_attempts,
571                "Sending subscription handshake"
572            );
573
574            // Perform handshake.
575            let result = match subscription.channel.channel_type {
576                ChannelType::RestHook => {
577                    self.rest_hook_channel
578                        .handshake(subscription, &handshake_bundle)
579                        .await
580                }
581                ChannelType::Websocket => {
582                    self.ws_channel
583                        .handshake(subscription, &handshake_bundle)
584                        .await
585                }
586                ChannelType::Email => match self.email_channel.as_ref() {
587                    Some(ch) => ch.handshake(subscription, &handshake_bundle).await,
588                    None => {
589                        warn!(
590                            tenant_id,
591                            subscription_id = sub_id,
592                            endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
593                            "Email channel requested but no SMTP settings configured"
594                        );
595                        let _ = self.manager.update_status(
596                            tenant_id,
597                            sub_id,
598                            SubscriptionStatusCode::Error,
599                        );
600                        return;
601                    }
602                },
603                ChannelType::Message => match self.messaging_channel.as_ref() {
604                    Some(ch) => ch.handshake(subscription, &handshake_bundle).await,
605                    None => {
606                        warn!(
607                            tenant_id,
608                            subscription_id = sub_id,
609                            endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
610                            "Messaging channel requested but messaging settings not configured"
611                        );
612                        let _ = self.manager.update_status(
613                            tenant_id,
614                            sub_id,
615                            SubscriptionStatusCode::Error,
616                        );
617                        return;
618                    }
619                },
620                _ => {
621                    warn!(
622                        tenant_id,
623                        subscription_id = sub_id,
624                        channel_type = subscription.channel.channel_type.as_fhir_str(),
625                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
626                        "Handshake not supported for channel type"
627                    );
628                    return;
629                }
630            };
631
632            match result {
633                Ok(DispatchResult::Success) => {
634                    info!(
635                        tenant_id,
636                        subscription_id = sub_id,
637                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
638                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
639                        attempt,
640                        "Handshake successful, activating subscription"
641                    );
642                    let _ = self.manager.update_status(
643                        tenant_id,
644                        sub_id,
645                        SubscriptionStatusCode::Active,
646                    );
647                    return;
648                }
649                Ok(DispatchResult::PermanentError(msg)) => {
650                    warn!(
651                        tenant_id,
652                        subscription_id = sub_id,
653                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
654                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
655                        attempt,
656                        error = %msg,
657                        "Handshake failed with permanent error"
658                    );
659                    let _ = self.manager.update_status(
660                        tenant_id,
661                        sub_id,
662                        SubscriptionStatusCode::Error,
663                    );
664                    return;
665                }
666                Ok(DispatchResult::RetryableError(msg)) => {
667                    if attempt >= handshake_max_attempts {
668                        warn!(
669                            tenant_id,
670                            subscription_id = sub_id,
671                            channel_type = %subscription.channel.channel_type.as_fhir_str(),
672                            endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
673                            attempts = attempt,
674                            error = %msg,
675                            "Handshake retries exhausted"
676                        );
677                        let _ = self.manager.update_status(
678                            tenant_id,
679                            sub_id,
680                            SubscriptionStatusCode::Error,
681                        );
682                        return;
683                    }
684
685                    let delay = calculate_handshake_retry_delay(&self.config, attempt);
686                    warn!(
687                        tenant_id,
688                        subscription_id = sub_id,
689                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
690                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
691                        attempt,
692                        next_attempt = attempt + 1,
693                        delay_ms = delay.as_millis() as u64,
694                        error = %msg,
695                        "Retrying subscription handshake"
696                    );
697                    tokio::time::sleep(delay).await;
698                    attempt += 1;
699                }
700                Err(e) => {
701                    warn!(
702                        tenant_id,
703                        subscription_id = sub_id,
704                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
705                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
706                        attempt,
707                        error = %e,
708                        "Handshake error"
709                    );
710                    let _ = self.manager.update_status(
711                        tenant_id,
712                        sub_id,
713                        SubscriptionStatusCode::Error,
714                    );
715                    return;
716                }
717            }
718        }
719    }
720
721    /// Dispatch a notification with retry logic.
722    async fn dispatch_with_retry(
723        &self,
724        subscription: &ActiveSubscription,
725        bundle: &serde_json::Value,
726        notification_type: &'static str,
727        event_number: Option<u64>,
728    ) {
729        let tenant_id = &subscription.tenant_id;
730        let sub_id = &subscription.id;
731
732        let dispatcher: &dyn ChannelDispatcher = match subscription.channel.channel_type {
733            ChannelType::RestHook => self.rest_hook_channel.as_ref(),
734            ChannelType::Websocket => self.ws_channel.as_ref(),
735            ChannelType::Email => match self.email_channel.as_deref() {
736                Some(ch) => ch,
737                None => {
738                    warn!(
739                        tenant_id,
740                        subscription_id = sub_id,
741                        notification_type,
742                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
743                        "Email dispatch requested but no SMTP settings configured"
744                    );
745                    return;
746                }
747            },
748            ChannelType::Message => match self.messaging_channel.as_deref() {
749                Some(ch) => ch,
750                None => {
751                    warn!(
752                        tenant_id,
753                        subscription_id = sub_id,
754                        notification_type,
755                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
756                        "Messaging dispatch requested but messaging settings not configured"
757                    );
758                    return;
759                }
760            },
761            _ => {
762                warn!(
763                    tenant_id,
764                    subscription_id = sub_id,
765                    notification_type,
766                    channel = subscription.channel.channel_type.as_fhir_str(),
767                    endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
768                    "No dispatcher for channel type"
769                );
770                return;
771            }
772        };
773
774        let mut attempt: u32 = 0;
775        loop {
776            match dispatcher.dispatch(subscription, bundle).await {
777                Ok(DispatchResult::Success) => {
778                    self.manager.reset_failures(tenant_id, sub_id);
779                    info!(
780                        tenant_id,
781                        subscription_id = sub_id,
782                        notification_type,
783                        event_number,
784                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
785                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
786                        "Subscription notification dispatched"
787                    );
788                    return;
789                }
790                Ok(DispatchResult::PermanentError(msg)) => {
791                    warn!(
792                        tenant_id,
793                        subscription_id = sub_id,
794                        notification_type,
795                        event_number,
796                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
797                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
798                        error = %msg,
799                        "Permanent delivery error"
800                    );
801                    self.handle_delivery_failure(tenant_id, sub_id);
802                    return;
803                }
804                Ok(DispatchResult::RetryableError(msg)) => {
805                    attempt += 1;
806                    if !retry::should_retry(&self.config, attempt) {
807                        warn!(
808                            tenant_id,
809                            subscription_id = sub_id,
810                            notification_type,
811                            event_number,
812                            channel_type = %subscription.channel.channel_type.as_fhir_str(),
813                            endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
814                            attempts = attempt,
815                            error = %msg,
816                            "Max retries exhausted"
817                        );
818                        self.handle_delivery_failure(tenant_id, sub_id);
819                        return;
820                    }
821
822                    let delay = retry::calculate_delay(&self.config, attempt);
823                    debug!(
824                        tenant_id,
825                        subscription_id = sub_id,
826                        notification_type,
827                        event_number,
828                        attempt,
829                        delay_ms = delay.as_millis() as u64,
830                        "Retrying delivery"
831                    );
832                    tokio::time::sleep(delay).await;
833                }
834                Err(e) => {
835                    error!(
836                        tenant_id,
837                        subscription_id = sub_id,
838                        notification_type,
839                        event_number,
840                        channel_type = %subscription.channel.channel_type.as_fhir_str(),
841                        endpoint = subscription.channel.endpoint.as_deref().unwrap_or(""),
842                        error = %e,
843                        "Dispatch error"
844                    );
845                    self.handle_delivery_failure(tenant_id, sub_id);
846                    return;
847                }
848            }
849        }
850    }
851
852    /// Handle a delivery failure: increment failure count and potentially
853    /// transition status to error or off.
854    fn handle_delivery_failure(&self, tenant_id: &str, subscription_id: &str) {
855        if let Some(failure_count) = self.manager.record_failure(tenant_id, subscription_id) {
856            if failure_count >= self.config.off_threshold {
857                warn!(
858                    subscription_id,
859                    failures = failure_count,
860                    "Turning off subscription after repeated failures"
861                );
862                let _ = self.manager.update_status(
863                    tenant_id,
864                    subscription_id,
865                    SubscriptionStatusCode::Off,
866                );
867            } else if failure_count >= self.config.error_threshold {
868                let _ = self.manager.update_status(
869                    tenant_id,
870                    subscription_id,
871                    SubscriptionStatusCode::Error,
872                );
873            }
874        }
875    }
876}
877
#[cfg(test)]
mod tests {
    //! End-to-end tests for the subscription engine, using `wiremock` as the
    //! rest-hook endpoint.

    use super::*;
    use crate::event::ResourceEventType;
    use crate::topics::{ResourceTrigger, TopicDefinition};
    use chrono::Utc;
    use helios_fhir::FhirVersion;
    use helios_persistence::tenant::TenantId;
    use serde_json::json;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };
    use wiremock::matchers::method;
    use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};

    /// Canonical URL of the topic used by most tests.
    const ENCOUNTER_TOPIC_URL: &str = "http://example.org/topic/encounter-start";
    /// Base URL handed to the engine under test.
    const ENGINE_BASE_URL: &str = "http://localhost:8080";

    /// Engine with small retry and threshold settings.
    fn make_engine(base_url: &str) -> SubscriptionEngine {
        let config = SubscriptionConfig {
            max_retries: 2,
            error_threshold: 2,
            off_threshold: 4,
            ..Default::default()
        };
        SubscriptionEngine::new(config, base_url.to_string())
    }

    /// Topic that triggers on `Encounter` creation with no filters.
    fn encounter_topic() -> TopicDefinition {
        TopicDefinition {
            canonical_url: ENCOUNTER_TOPIC_URL.to_string(),
            title: Some("Encounter Start".to_string()),
            resource_triggers: vec![ResourceTrigger {
                resource_type: "Encounter".to_string(),
                interactions: vec![ResourceEventType::Create],
                fhirpath_criteria: None,
            }],
            can_filter_by: vec![],
            notification_shape: vec![],
        }
    }

    /// `Encounter` create event for tenant `t1`.
    fn encounter_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Encounter".to_string(),
            resource_id: "enc-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "Encounter",
                "id": "enc-1",
                "status": "in-progress"
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }

    /// Responder that answers 500 to the first request and 200 afterwards.
    #[derive(Clone)]
    struct FailFirstHandshake {
        attempts: Arc<AtomicUsize>,
    }

    impl Respond for FailFirstHandshake {
        fn respond(&self, _request: &Request) -> ResponseTemplate {
            // `fetch_add` returns the previous value, so 0 means "first call".
            if self.attempts.fetch_add(1, Ordering::SeqCst) == 0 {
                ResponseTemplate::new(500)
            } else {
                ResponseTemplate::new(200)
            }
        }
    }

    /// `SubscriptionTopic` create event for tenant `t1`.
    fn topic_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "SubscriptionTopic".to_string(),
            resource_id: "topic-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "SubscriptionTopic",
                "id": "topic-1",
                "url": ENCOUNTER_TOPIC_URL,
                "resourceTrigger": [{
                    "resource": "Encounter",
                    "supportedInteraction": ["create"]
                }]
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }

    /// R4 `Basic` resource carrying a topic definition through extensions.
    #[cfg(feature = "R4")]
    fn r4_basic_topic_event() -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::R4,
            resource_type: "Basic".to_string(),
            resource_id: "topic-basic-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({
                "resourceType": "Basic",
                "id": "topic-basic-1",
                "code": {
                    "coding": [{
                        "system": "http://hl7.org/fhir/fhir-types",
                        "code": "SubscriptionTopic"
                    }]
                },
                "extension": [{
                    "url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.url",
                    "valueUri": "http://example.org/topic/encounter-start-basic"
                }, {
                    "url": "http://hl7.org/fhir/4.3/StructureDefinition/extension-SubscriptionTopic.resourceTrigger",
                    "extension": [
                        { "url": "resource", "valueUri": "http://hl7.org/fhir/StructureDefinition/Encounter" },
                        { "url": "supportedInteraction", "valueCode": "create" }
                    ]
                }]
            })),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }

    /// Rest-hook Subscription resource for the encounter topic whose endpoint
    /// is `<server uri>/webhook`.
    fn rest_hook_subscription(server: &MockServer) -> serde_json::Value {
        crate::manager::tests::build_subscription_json(
            ENCOUNTER_TOPIC_URL,
            "rest-hook",
            Some(&format!("{}/webhook", server.uri())),
        )
    }

    /// `Subscription` create event (version 1) carrying `resource`.
    fn subscription_create_event(
        tenant: &str,
        subscription_id: &str,
        resource: serde_json::Value,
    ) -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new(tenant),
            fhir_version: FhirVersion::default(),
            resource_type: "Subscription".to_string(),
            resource_id: subscription_id.to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(resource),
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }

    /// Delete event (version 2, tenant `t1`) without any resource payload.
    fn delete_event(resource_type: &str, resource_id: &str) -> ResourceEvent {
        ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: resource_type.to_string(),
            resource_id: resource_id.to_string(),
            version_id: "2".to_string(),
            event_type: ResourceEventType::Delete,
            resource: None,
            previous_resource: None,
            timestamp: Utc::now(),
        }
    }

    /// Feed the engine a create event for a rest-hook subscription that
    /// points at `server`.
    async fn submit_rest_hook_subscription(
        engine: &SubscriptionEngine,
        server: &MockServer,
        tenant: &str,
        subscription_id: &str,
    ) {
        let resource = rest_hook_subscription(server);
        engine
            .on_resource_event(subscription_create_event(tenant, subscription_id, resource))
            .await;
    }

    #[tokio::test]
    async fn test_topic_event_registers_topic() {
        let engine = make_engine(ENGINE_BASE_URL);

        assert!(engine.topic_registry().list_topics().is_empty());

        engine.on_resource_event(topic_event()).await;

        let topics = engine.topic_registry().list_topics();
        assert_eq!(topics.len(), 1);
        assert!(topics.contains(&ENCOUNTER_TOPIC_URL.to_string()));
    }

    #[tokio::test]
    async fn test_topic_delete_event_removes_topic_without_payload() {
        let engine = make_engine(ENGINE_BASE_URL);
        engine.on_resource_event(topic_event()).await;
        // Guard against a vacuous pass: the topic must exist before deletion.
        assert_eq!(engine.topic_registry().list_topics().len(), 1);

        engine
            .on_resource_event(delete_event("SubscriptionTopic", "topic-1"))
            .await;

        let topics = engine.topic_registry().list_topics();
        assert!(topics.is_empty());
    }

    #[cfg(feature = "R4")]
    #[tokio::test]
    async fn test_r4_basic_topic_event_registers_topic() {
        let engine = make_engine(ENGINE_BASE_URL);

        assert!(engine.topic_registry().list_topics().is_empty());

        engine.on_resource_event(r4_basic_topic_event()).await;

        let topics = engine.topic_registry().list_topics();
        assert_eq!(topics.len(), 1);
        assert!(topics.contains(&"http://example.org/topic/encounter-start-basic".to_string()));
    }

    #[tokio::test]
    async fn test_subscription_event_registers_subscription() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        submit_rest_hook_subscription(&engine, &server, "t1", "sub-1").await;

        // Subscription should be registered.
        let sub = engine.manager().get_subscription("t1", "sub-1");
        assert!(sub.is_some());
    }

    #[tokio::test]
    async fn test_subscription_activation_retries_retryable_handshake_failure() {
        let server = MockServer::start().await;
        let attempts = Arc::new(AtomicUsize::new(0));

        Mock::given(method("POST"))
            .respond_with(FailFirstHandshake {
                attempts: Arc::clone(&attempts),
            })
            .mount(&server)
            .await;

        // Millisecond delays keep the retry path fast.
        let config = SubscriptionConfig {
            handshake_max_attempts: 2,
            handshake_retry_initial_delay: std::time::Duration::from_millis(1),
            handshake_retry_max_delay: std::time::Duration::from_millis(1),
            ..Default::default()
        };
        let engine = SubscriptionEngine::new(config, ENGINE_BASE_URL.to_string());
        engine.topic_registry().add_topic(encounter_topic());

        submit_rest_hook_subscription(&engine, &server, "t1", "sub-1").await;

        let sub = engine.manager().get_subscription("t1", "sub-1").unwrap();
        assert_eq!(sub.status, SubscriptionStatusCode::Active);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_full_pipeline_event_to_dispatch() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .expect(2) // handshake + event notification
            .mount(&server)
            .await;

        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        // Register and activate a subscription.
        submit_rest_hook_subscription(&engine, &server, "t1", "sub-1").await;

        // Fire a matching event.
        engine.on_resource_event(encounter_event()).await;

        // Verify the mock received the requests.
        // (wiremock's expect(2) will panic on drop if not satisfied)
    }

    #[tokio::test]
    async fn test_no_dispatch_when_no_matching_subscriptions() {
        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        // Fire an event with no subscriptions registered.
        engine.on_resource_event(encounter_event()).await;
        // Should complete without error.
    }

    #[tokio::test]
    async fn test_no_dispatch_for_non_matching_resource_type() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .expect(1) // Only the handshake
            .mount(&server)
            .await;

        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        // Register subscription for Encounter topic.
        submit_rest_hook_subscription(&engine, &server, "t1", "sub-1").await;

        // Fire a Patient event (doesn't match Encounter topic).
        let patient_event = ResourceEvent {
            tenant_id: TenantId::new("t1"),
            fhir_version: FhirVersion::default(),
            resource_type: "Patient".to_string(),
            resource_id: "pat-1".to_string(),
            version_id: "1".to_string(),
            event_type: ResourceEventType::Create,
            resource: Some(json!({"resourceType": "Patient", "id": "pat-1"})),
            previous_resource: None,
            timestamp: Utc::now(),
        };

        engine.on_resource_event(patient_event).await;
        // Only 1 mock call (handshake), not 2.
    }

    #[tokio::test]
    async fn test_delivery_failure_updates_status() {
        let server = MockServer::start().await;

        // First request (handshake) succeeds, subsequent fail.
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .up_to_n_times(1)
            .mount(&server)
            .await;

        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(500))
            .mount(&server)
            .await;

        let config = SubscriptionConfig {
            max_retries: 1,
            error_threshold: 1,
            off_threshold: 3,
            ..Default::default()
        };
        let engine = SubscriptionEngine::new(config, ENGINE_BASE_URL.to_string());
        engine.topic_registry().add_topic(encounter_topic());

        // Register subscription.
        submit_rest_hook_subscription(&engine, &server, "t1", "sub-1").await;

        // Fire a matching event (will fail delivery).
        engine.on_resource_event(encounter_event()).await;

        // Check subscription status changed to error.
        let sub = engine.manager().get_subscription("t1", "sub-1").unwrap();
        assert_eq!(sub.status, SubscriptionStatusCode::Error);
    }

    #[tokio::test]
    async fn test_subscription_delete_event() {
        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        // Register directly.
        let resource = crate::manager::tests::default_subscription_json();
        engine
            .manager()
            .register("t1", "sub-1", &resource, FhirVersion::default())
            .unwrap();

        assert!(engine.manager().get_subscription("t1", "sub-1").is_some());

        // Delete event.
        engine
            .on_resource_event(delete_event("Subscription", "sub-1"))
            .await;

        assert!(engine.manager().get_subscription("t1", "sub-1").is_none());
    }

    #[tokio::test]
    async fn test_tenant_isolation() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .expect(2) // 1 handshake + 1 event notification (only tenant-a)
            .mount(&server)
            .await;

        let engine = make_engine(ENGINE_BASE_URL);
        engine.topic_registry().add_topic(encounter_topic());

        // Register subscription for tenant-a.
        submit_rest_hook_subscription(&engine, &server, "tenant-a", "sub-a").await;

        // Fire event from tenant-a (should match).
        let mut event_a = encounter_event();
        event_a.tenant_id = TenantId::new("tenant-a");
        engine.on_resource_event(event_a).await;

        // Fire event from tenant-b (should NOT match).
        let mut event_b = encounter_event();
        event_b.tenant_id = TenantId::new("tenant-b");
        engine.on_resource_event(event_b).await;

        // wiremock expects exactly 2 calls (handshake + 1 notification).
    }
}