Skip to main content

helios_subscriptions/manager/
mod.rs

1//! Subscription manager.
2//!
3//! Manages the lifecycle of `Subscription` resources: CRUD, status tracking,
4//! filter validation, and in-memory indexing of active subscriptions.
5
6pub mod filters;
7pub mod status;
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use helios_fhir::FhirVersion;
13use tracing::{debug, warn};
14
15use crate::error::SubscriptionError;
16use crate::topics::InMemoryTopicRegistry;
17
18pub use filters::SubscriptionFilter;
19pub use status::SubscriptionStatusCode;
20
/// How much resource content a subscriber wants in each notification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadContent {
    /// The notification carries no resource content.
    Empty,
    /// The notification carries references only (fullUrl + request, no resource body).
    IdOnly,
    /// The notification carries the complete resource.
    FullResource,
}

impl PayloadContent {
    /// Converts a FHIR payload-content code into its variant.
    ///
    /// The comparison is exact (case-sensitive); any other input yields `None`.
    pub fn from_fhir_str(s: &str) -> Option<Self> {
        let level = match s {
            "empty" => Self::Empty,
            "id-only" => Self::IdOnly,
            "full-resource" => Self::FullResource,
            _ => return None,
        };
        Some(level)
    }

    /// Gives the FHIR code for this variant; the inverse of [`Self::from_fhir_str`].
    pub fn as_fhir_str(&self) -> &'static str {
        match *self {
            Self::FullResource => "full-resource",
            Self::IdOnly => "id-only",
            Self::Empty => "empty",
        }
    }
}
52
/// Delivery channel kind of a subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChannelType {
    /// FHIR code `rest-hook`.
    RestHook,
    /// FHIR code `websocket`.
    Websocket,
    /// FHIR code `email`.
    Email,
    /// FHIR code `message`.
    Message,
    /// Any code other than the four above, kept verbatim.
    Custom(String),
}

impl ChannelType {
    /// Converts a FHIR channel type code into a variant.
    ///
    /// Never fails: a code that is not one of the four built-in ones is
    /// preserved unchanged inside [`ChannelType::Custom`].
    pub fn from_fhir_str(s: &str) -> Self {
        let built_in = match s {
            "rest-hook" => Some(Self::RestHook),
            "websocket" => Some(Self::Websocket),
            "email" => Some(Self::Email),
            "message" => Some(Self::Message),
            _ => None,
        };
        built_in.unwrap_or_else(|| Self::Custom(s.to_owned()))
    }

    /// Gives the FHIR code for this channel type.
    ///
    /// For [`ChannelType::Custom`] this is the stored string itself.
    pub fn as_fhir_str(&self) -> &str {
        match self {
            Self::Custom(code) => code.as_str(),
            Self::Message => "message",
            Self::Email => "email",
            Self::Websocket => "websocket",
            Self::RestHook => "rest-hook",
        }
    }
}
86
/// Channel configuration for a subscription.
///
/// Built by `extract_channel_config`, which reads either the R4 `channel`
/// element plus backport extensions, or the R4B+ top-level Subscription fields.
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Delivery mechanism requested by the subscriber.
    pub channel_type: ChannelType,
    /// Delivery target, when the resource supplied one (R4 `channel.endpoint`,
    /// R4B+ `endpoint`).
    pub endpoint: Option<String>,
    /// MIME type for notification payloads (R4 `channel.payload`, R4B+ `contentType`).
    pub payload_mime_type: Option<String>,
    /// How much resource content each notification carries.
    pub payload_content: PayloadContent,
    /// Extra headers as raw strings: R4 `channel.header` entries verbatim, or
    /// `"name: value"` strings assembled from R4B+ `parameter` entries.
    pub headers: Vec<String>,
    /// Heartbeat period, when configured.
    /// NOTE(review): the unit is not established in this file (presumably seconds) — confirm.
    pub heartbeat_period: Option<u32>,
    /// Timeout, when configured.
    /// NOTE(review): the unit is not established in this file (presumably seconds) — confirm.
    pub timeout: Option<u32>,
    /// Maximum count, when configured (R4 `backport-max-count` extension, R4B+ `maxCount`).
    /// NOTE(review): presumably the maximum number of events per notification — confirm.
    pub max_count: Option<u32>,
}
99
/// An active subscription tracked by the manager.
///
/// This is a version-agnostic in-memory representation of the essential fields
/// needed for event evaluation and notification delivery. Values handed out by
/// the manager are clones; changing one does not change the manager's copy.
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    /// The subscription resource ID.
    pub id: String,

    /// The canonical URL of the SubscriptionTopic.
    pub topic_url: String,

    /// Current status. Set from the resource's `status` field at registration
    /// and changed afterwards through `SubscriptionManager::update_status`.
    pub status: SubscriptionStatusCode,

    /// Channel configuration.
    pub channel: ChannelConfig,

    /// Parsed filter criteria, already validated against the topic's
    /// `can_filter_by` definitions at registration.
    pub filters: Vec<SubscriptionFilter>,

    /// The FHIR version of the subscription resource.
    pub fhir_version: FhirVersion,

    /// Event counter. Starts at 0 on registration and is only ever incremented
    /// (by `SubscriptionManager::increment_event_count`).
    pub events_since_start: u64,

    /// Consecutive delivery failure count (for error/off transitions).
    /// Starts at 0, is bumped by `SubscriptionManager::record_failure`, and is
    /// zeroed by `SubscriptionManager::reset_failures`.
    pub consecutive_failures: u32,

    /// Tenant ID that owns this subscription.
    pub tenant_id: String,
}
133
/// Manages the lifecycle and in-memory indexing of active subscriptions.
///
/// All methods take `&self`; mutation goes through the concurrent map below.
pub struct SubscriptionManager {
    /// Active subscriptions keyed by (tenant_id, subscription_id).
    subscriptions: DashMap<(String, String), ActiveSubscription>,

    /// Topic registry for validation: a subscription can only be registered
    /// when its topic URL resolves in this registry.
    topic_registry: Arc<InMemoryTopicRegistry>,

    /// Supported channel types, as FHIR code strings (e.g. `"rest-hook"`).
    /// Compared against `ChannelType::as_fhir_str` during registration.
    supported_channels: Vec<String>,
}
145
146impl SubscriptionManager {
147    /// Creates a new subscription manager.
148    pub fn new(
149        topic_registry: Arc<InMemoryTopicRegistry>,
150        supported_channels: Vec<String>,
151    ) -> Self {
152        Self {
153            subscriptions: DashMap::new(),
154            topic_registry,
155            supported_channels,
156        }
157    }
158
159    /// Registers a subscription from a FHIR Subscription resource JSON.
160    ///
161    /// Validates the topic URL, channel type, and filter criteria.
162    /// On success, the subscription is stored in the in-memory index.
163    pub fn register(
164        &self,
165        tenant_id: &str,
166        subscription_id: &str,
167        resource: &serde_json::Value,
168        fhir_version: FhirVersion,
169    ) -> Result<ActiveSubscription, SubscriptionError> {
170        // Parse the subscription resource.
171        let topic_url = extract_topic_url(resource, fhir_version)?;
172        let channel = extract_channel_config(resource, fhir_version)?;
173        let filter_strings = extract_filter_criteria(resource, fhir_version);
174
175        // Validate topic exists.
176        let topic = self.topic_registry.get_topic(&topic_url).ok_or_else(|| {
177            SubscriptionError::TopicNotFound {
178                url: topic_url.clone(),
179            }
180        })?;
181
182        // Validate channel type is supported.
183        if !self
184            .supported_channels
185            .contains(&channel.channel_type.as_fhir_str().to_string())
186        {
187            return Err(SubscriptionError::UnsupportedChannel {
188                channel_type: channel.channel_type.as_fhir_str().to_string(),
189            });
190        }
191
192        // Validate rest-hook endpoint is present.
193        if channel.channel_type == ChannelType::RestHook && channel.endpoint.is_none() {
194            return Err(SubscriptionError::InvalidEndpoint {
195                message: "rest-hook channel requires an endpoint".to_string(),
196            });
197        }
198
199        // Validate email endpoint is a non-empty mailto: URI.
200        if channel.channel_type == ChannelType::Email {
201            let endpoint =
202                channel
203                    .endpoint
204                    .as_deref()
205                    .ok_or_else(|| SubscriptionError::InvalidEndpoint {
206                        message: "email channel requires a mailto: endpoint".to_string(),
207                    })?;
208            match endpoint.strip_prefix("mailto:") {
209                Some(rest) if !rest.trim().is_empty() => {}
210                _ => {
211                    return Err(SubscriptionError::InvalidEndpoint {
212                        message: "email endpoint must be a non-empty mailto: URI".to_string(),
213                    });
214                }
215            }
216        }
217
218        // Parse and validate filters.
219        let mut parsed_filters = Vec::new();
220        for filter_str in &filter_strings {
221            let filter = filters::parse_filter_string(filter_str)?;
222            parsed_filters.push(filter);
223        }
224        filters::validate_filters(&parsed_filters, &topic.can_filter_by)?;
225
226        let status = resource
227            .get("status")
228            .and_then(|v| v.as_str())
229            .and_then(SubscriptionStatusCode::from_fhir_str)
230            .unwrap_or(SubscriptionStatusCode::Requested);
231
232        let subscription = ActiveSubscription {
233            id: subscription_id.to_string(),
234            topic_url,
235            status,
236            channel,
237            filters: parsed_filters,
238            fhir_version,
239            events_since_start: 0,
240            consecutive_failures: 0,
241            tenant_id: tenant_id.to_string(),
242        };
243
244        debug!(
245            tenant = tenant_id,
246            subscription_id,
247            topic = %subscription.topic_url,
248            "Registered subscription"
249        );
250
251        let key = (tenant_id.to_string(), subscription_id.to_string());
252        self.subscriptions.insert(key, subscription.clone());
253
254        Ok(subscription)
255    }
256
257    /// Removes a subscription from the in-memory index.
258    pub fn deregister(&self, tenant_id: &str, subscription_id: &str) -> bool {
259        let key = (tenant_id.to_string(), subscription_id.to_string());
260        let removed = self.subscriptions.remove(&key).is_some();
261        if removed {
262            debug!(
263                tenant = tenant_id,
264                subscription_id, "Deregistered subscription"
265            );
266        }
267        removed
268    }
269
270    /// Returns all active subscriptions for a given topic URL within a tenant.
271    pub fn active_subscriptions_for_topic(
272        &self,
273        tenant_id: &str,
274        topic_url: &str,
275    ) -> Vec<ActiveSubscription> {
276        self.subscriptions
277            .iter()
278            .filter(|entry| {
279                let sub = entry.value();
280                sub.tenant_id == tenant_id
281                    && sub.topic_url == topic_url
282                    && sub.status == SubscriptionStatusCode::Active
283            })
284            .map(|entry| entry.value().clone())
285            .collect()
286    }
287
288    /// Gets the current status of a subscription.
289    pub fn get_subscription(
290        &self,
291        tenant_id: &str,
292        subscription_id: &str,
293    ) -> Option<ActiveSubscription> {
294        let key = (tenant_id.to_string(), subscription_id.to_string());
295        self.subscriptions.get(&key).map(|entry| entry.clone())
296    }
297
298    /// Updates the status of a subscription.
299    pub fn update_status(
300        &self,
301        tenant_id: &str,
302        subscription_id: &str,
303        new_status: SubscriptionStatusCode,
304    ) -> Result<SubscriptionStatusCode, SubscriptionError> {
305        let key = (tenant_id.to_string(), subscription_id.to_string());
306        let mut entry = self.subscriptions.get_mut(&key).ok_or_else(|| {
307            SubscriptionError::Internal(format!("subscription not found: {subscription_id}"))
308        })?;
309
310        let old_status = entry.status;
311        if !old_status.can_transition_to(new_status) {
312            return Err(SubscriptionError::InvalidStatusTransition {
313                from: old_status.to_string(),
314                to: new_status.to_string(),
315            });
316        }
317
318        entry.status = new_status;
319        debug!(
320            tenant = tenant_id,
321            subscription_id,
322            from = %old_status,
323            to = %new_status,
324            "Status transition"
325        );
326
327        Ok(old_status)
328    }
329
330    /// Increments the event counter and returns the new value.
331    pub fn increment_event_count(&self, tenant_id: &str, subscription_id: &str) -> Option<u64> {
332        let key = (tenant_id.to_string(), subscription_id.to_string());
333        let mut entry = self.subscriptions.get_mut(&key)?;
334        entry.events_since_start += 1;
335        Some(entry.events_since_start)
336    }
337
338    /// Records a delivery failure and returns the new consecutive failure count.
339    pub fn record_failure(&self, tenant_id: &str, subscription_id: &str) -> Option<u32> {
340        let key = (tenant_id.to_string(), subscription_id.to_string());
341        let mut entry = self.subscriptions.get_mut(&key)?;
342        entry.consecutive_failures += 1;
343        let count = entry.consecutive_failures;
344        warn!(
345            tenant = tenant_id,
346            subscription_id,
347            consecutive_failures = count,
348            "Delivery failure recorded"
349        );
350        Some(count)
351    }
352
353    /// Resets the consecutive failure counter after a successful delivery.
354    pub fn reset_failures(&self, tenant_id: &str, subscription_id: &str) {
355        let key = (tenant_id.to_string(), subscription_id.to_string());
356        if let Some(mut entry) = self.subscriptions.get_mut(&key) {
357            entry.consecutive_failures = 0;
358        }
359    }
360
361    /// Returns all subscriptions (for heartbeat checking, initialization, etc.).
362    pub fn all_subscriptions(&self) -> Vec<ActiveSubscription> {
363        self.subscriptions
364            .iter()
365            .map(|e| e.value().clone())
366            .collect()
367    }
368}
369
370// --- Version-aware field extraction ---
371
372/// Extract the topic canonical URL from a Subscription resource.
373fn extract_topic_url(
374    resource: &serde_json::Value,
375    fhir_version: FhirVersion,
376) -> Result<String, SubscriptionError> {
377    match fhir_version {
378        // R4: topic URL is in the `criteria` field (backport IG convention)
379        // or in the `backport-topic-canonical` extension.
380        #[cfg(feature = "R4")]
381        FhirVersion::R4 => {
382            // First try backport extension.
383            if let Some(url) = find_extension_value_url(
384                resource,
385                "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
386            ) {
387                return Ok(url);
388            }
389            // Fall back to criteria field.
390            resource
391                .get("criteria")
392                .and_then(|v| v.as_str())
393                .map(|s| s.to_string())
394                .ok_or_else(|| SubscriptionError::InvalidSubscription {
395                    message: "R4 Subscription missing topic URL (criteria or backport-topic-canonical extension)".to_string(),
396                })
397        }
398
399        // R4B+: topic URL is in the `topic` field (canonical reference).
400        #[allow(unreachable_patterns)]
401        _ => resource
402            .get("topic")
403            .and_then(|v| v.as_str())
404            .map(|s| s.to_string())
405            .ok_or_else(|| SubscriptionError::InvalidSubscription {
406                message: "Subscription missing 'topic' field".to_string(),
407            }),
408    }
409}
410
411/// Extract channel configuration from a Subscription resource.
412fn extract_channel_config(
413    resource: &serde_json::Value,
414    fhir_version: FhirVersion,
415) -> Result<ChannelConfig, SubscriptionError> {
416    match fhir_version {
417        // R4: channel is a nested object with type, endpoint, payload, header.
418        #[cfg(feature = "R4")]
419        FhirVersion::R4 => {
420            let channel =
421                resource
422                    .get("channel")
423                    .ok_or_else(|| SubscriptionError::InvalidSubscription {
424                        message: "R4 Subscription missing 'channel' field".to_string(),
425                    })?;
426
427            let channel_type_str = channel
428                .get("type")
429                .and_then(|v| v.as_str())
430                .unwrap_or("rest-hook");
431
432            let endpoint = channel
433                .get("endpoint")
434                .and_then(|v| v.as_str())
435                .map(String::from);
436
437            let payload_mime_type = channel
438                .get("payload")
439                .and_then(|v| v.as_str())
440                .map(String::from);
441
442            let headers = channel
443                .get("header")
444                .and_then(|v| v.as_array())
445                .map(|arr| {
446                    arr.iter()
447                        .filter_map(|v| v.as_str().map(String::from))
448                        .collect()
449                })
450                .unwrap_or_default();
451
452            // Payload content from backport extension.
453            // Prefer channel._payload extension (spec-compliant) and fall back
454            // to root extension for backward compatibility.
455            let payload_content = find_channel_payload_content_code(resource)
456                .or_else(|| {
457                    find_extension_value_code(
458                        resource,
459                        "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
460                    )
461                })
462            .and_then(|s| PayloadContent::from_fhir_str(&s))
463            .unwrap_or(PayloadContent::IdOnly);
464
465            let heartbeat_period = find_extension_value_unsigned_int(
466                resource,
467                "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-heartbeat-period",
468            );
469
470            let timeout = find_extension_value_unsigned_int(
471                resource,
472                "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-timeout",
473            );
474
475            let max_count = find_extension_value_unsigned_int(
476                resource,
477                "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-max-count",
478            );
479
480            Ok(ChannelConfig {
481                channel_type: ChannelType::from_fhir_str(channel_type_str),
482                endpoint,
483                payload_mime_type,
484                payload_content,
485                headers,
486                heartbeat_period,
487                timeout,
488                max_count,
489            })
490        }
491
492        // R4B+: channel type is `channelType` coding, endpoint, content, etc.
493        #[allow(unreachable_patterns)]
494        _ => {
495            let channel_type_str = resource
496                .get("channelType")
497                .and_then(|v| v.get("code"))
498                .and_then(|v| v.as_str())
499                .unwrap_or("rest-hook");
500
501            let endpoint = resource
502                .get("endpoint")
503                .and_then(|v| v.as_str())
504                .map(String::from);
505
506            let payload_content = resource
507                .get("content")
508                .and_then(|v| v.as_str())
509                .and_then(PayloadContent::from_fhir_str)
510                .unwrap_or(PayloadContent::IdOnly);
511
512            let payload_mime_type = resource
513                .get("contentType")
514                .and_then(|v| v.as_str())
515                .map(String::from);
516
517            let heartbeat_period = resource
518                .get("heartbeatPeriod")
519                .and_then(|v| v.as_u64())
520                .map(|v| v as u32);
521
522            let timeout = resource
523                .get("timeout")
524                .and_then(|v| v.as_u64())
525                .map(|v| v as u32);
526
527            let max_count = resource
528                .get("maxCount")
529                .and_then(|v| v.as_u64())
530                .map(|v| v as u32);
531
532            let headers = resource
533                .get("parameter")
534                .and_then(|v| v.as_array())
535                .map(|arr| {
536                    arr.iter()
537                        .filter_map(|p| {
538                            let name = p.get("name")?.as_str()?;
539                            let value = p.get("value")?.as_str()?;
540                            Some(format!("{name}: {value}"))
541                        })
542                        .collect()
543                })
544                .unwrap_or_default();
545
546            Ok(ChannelConfig {
547                channel_type: ChannelType::from_fhir_str(channel_type_str),
548                endpoint,
549                payload_mime_type,
550                payload_content,
551                headers,
552                heartbeat_period,
553                timeout,
554                max_count,
555            })
556        }
557    }
558}
559
560/// Extract filter criteria strings from a Subscription resource.
561fn extract_filter_criteria(resource: &serde_json::Value, fhir_version: FhirVersion) -> Vec<String> {
562    match fhir_version {
563        // R4: filter criteria from backport extension.
564        #[cfg(feature = "R4")]
565        FhirVersion::R4 => find_all_extension_values_string(
566            resource,
567            "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
568        ),
569
570        // R4B+: filterBy array with resourceType, filterParameter, comparator, value.
571        #[allow(unreachable_patterns)]
572        _ => {
573            let filter_by = match resource.get("filterBy").and_then(|v| v.as_array()) {
574                Some(arr) => arr,
575                None => return Vec::new(),
576            };
577
578            filter_by
579                .iter()
580                .filter_map(|f| {
581                    let param = f.get("filterParameter")?.as_str()?;
582                    let value = f.get("value")?.as_str()?;
583                    let resource_type = f.get("resourceType").and_then(|v| v.as_str());
584                    let comparator = f.get("comparator").and_then(|v| v.as_str());
585                    let value_with_comparator = comparator
586                        .filter(|c| !c.is_empty())
587                        .map(|c| format!("{c}:{value}"))
588                        .unwrap_or_else(|| value.to_string());
589
590                    let filter_str = if let Some(rt) = resource_type {
591                        format!("{rt}?{param}={value_with_comparator}")
592                    } else {
593                        format!("{param}={value_with_comparator}")
594                    };
595
596                    Some(filter_str)
597                })
598                .collect()
599        }
600    }
601}
602
603// --- Extension helpers for R4 backport ---
604
/// Looks up the first root-level extension whose `url` equals `url` and
/// returns its `valueUrl`, or its `valueCanonical` when `valueUrl` is absent.
///
/// Only the first matching extension is consulted; `None` is returned when
/// there is no match or the chosen value is not a string.
#[cfg(feature = "R4")]
fn find_extension_value_url(resource: &serde_json::Value, url: &str) -> Option<String> {
    let extensions = resource.get("extension")?.as_array()?;
    for ext in extensions {
        if ext.get("url").and_then(|v| v.as_str()) != Some(url) {
            continue;
        }
        let value = ext.get("valueUrl").or_else(|| ext.get("valueCanonical"))?;
        return value.as_str().map(String::from);
    }
    None
}
617
/// Looks up the first root-level extension whose `url` equals `url` and
/// returns its `valueCode`.
///
/// Only the first matching extension is consulted; `None` is returned when
/// there is no match or it has no string `valueCode`.
#[cfg(feature = "R4")]
fn find_extension_value_code(resource: &serde_json::Value, url: &str) -> Option<String> {
    let extensions = resource.get("extension")?.as_array()?;
    for ext in extensions {
        if ext.get("url").and_then(|v| v.as_str()) != Some(url) {
            continue;
        }
        let code = ext.get("valueCode")?.as_str()?;
        return Some(code.to_owned());
    }
    None
}
630
/// Reads the `backport-payload-content` code from the extensions attached to
/// `channel._payload`.
///
/// Only the first extension with that URL is consulted; `None` is returned
/// when any step of the path is missing or the extension has no string
/// `valueCode`.
#[cfg(feature = "R4")]
fn find_channel_payload_content_code(resource: &serde_json::Value) -> Option<String> {
    const PAYLOAD_CONTENT_EXT: &str =
        "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content";

    let extensions = resource
        .get("channel")?
        .get("_payload")?
        .get("extension")?
        .as_array()?;
    let matching = extensions
        .iter()
        .find(|ext| ext.get("url").and_then(|v| v.as_str()) == Some(PAYLOAD_CONTENT_EXT))?;
    let code = matching.get("valueCode")?.as_str()?;
    Some(code.to_owned())
}
649
/// Find a single extension with the given URL and return its `valueUnsignedInt`.
///
/// Only the first root-level extension whose `url` matches is consulted.
/// Returns `None` when there is no match, when the value is not an unsigned
/// integer, or when it does not fit in a `u32` (a plain `as u32` cast would
/// silently wrap such a value into an unrelated small number).
#[cfg(feature = "R4")]
fn find_extension_value_unsigned_int(resource: &serde_json::Value, url: &str) -> Option<u32> {
    resource
        .get("extension")?
        .as_array()?
        .iter()
        .find(|ext| ext.get("url").and_then(|v| v.as_str()) == Some(url))
        .and_then(|ext| ext.get("valueUnsignedInt"))
        .and_then(|v| v.as_u64())
        .and_then(|v| u32::try_from(v).ok())
}
662
/// Gathers the `valueString` of every root-level extension whose `url` equals
/// `url`, in document order.
///
/// Matching extensions without a string `valueString` are skipped. The result
/// is empty when the resource has no `extension` array.
#[cfg(feature = "R4")]
fn find_all_extension_values_string(resource: &serde_json::Value, url: &str) -> Vec<String> {
    let mut values = Vec::new();
    if let Some(extensions) = resource.get("extension").and_then(|v| v.as_array()) {
        for ext in extensions {
            if ext.get("url").and_then(|v| v.as_str()) != Some(url) {
                continue;
            }
            if let Some(text) = ext.get("valueString").and_then(|v| v.as_str()) {
                values.push(text.to_owned());
            }
        }
    }
    values
}
681
682#[cfg(test)]
683pub(crate) mod tests {
684    use super::*;
685    use crate::event::ResourceEventType;
686    use crate::topics::{FilterDefinition, ResourceTrigger, TopicDefinition};
687    use serde_json::json;
688
689    fn create_test_registry() -> Arc<InMemoryTopicRegistry> {
690        let registry = Arc::new(InMemoryTopicRegistry::new());
691        registry.add_topic(TopicDefinition {
692            canonical_url: "http://example.org/topic/encounter-start".to_string(),
693            title: Some("Encounter Start".to_string()),
694            resource_triggers: vec![ResourceTrigger {
695                resource_type: "Encounter".to_string(),
696                interactions: vec![ResourceEventType::Create],
697                fhirpath_criteria: None,
698            }],
699            can_filter_by: vec![FilterDefinition {
700                resource_type: Some("Encounter".to_string()),
701                filter_parameter: "patient".to_string(),
702                comparators: vec!["eq".to_string()],
703                modifiers: vec![],
704            }],
705            notification_shape: vec![],
706        });
707        registry
708    }
709
710    /// Returns a subscription JSON compatible with the default FHIR version.
711    /// R4 uses criteria + channel object; R4B+ uses topic + channelType.
712    pub(crate) fn default_subscription_json() -> serde_json::Value {
713        #[cfg(feature = "R4")]
714        {
715            json!({
716                "resourceType": "Subscription",
717                "id": "sub-1",
718                "status": "requested",
719                "criteria": "http://example.org/topic/encounter-start",
720                "channel": {
721                    "type": "rest-hook",
722                    "endpoint": "https://example.com/webhook",
723                    "payload": "application/fhir+json"
724                },
725                "extension": [
726                    {
727                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
728                        "valueCanonical": "http://example.org/topic/encounter-start"
729                    },
730                    {
731                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
732                        "valueCode": "id-only"
733                    }
734                ]
735            })
736        }
737        #[cfg(not(feature = "R4"))]
738        {
739            json!({
740                "resourceType": "Subscription",
741                "id": "sub-1",
742                "status": "requested",
743                "topic": "http://example.org/topic/encounter-start",
744                "channelType": {
745                    "system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
746                    "code": "rest-hook"
747                },
748                "endpoint": "https://example.com/webhook",
749                "content": "id-only",
750                "contentType": "application/fhir+json"
751            })
752        }
753    }
754
755    #[cfg(feature = "R4")]
756    fn r4_subscription_json() -> serde_json::Value {
757        json!({
758            "resourceType": "Subscription",
759            "id": "sub-r4-1",
760            "status": "requested",
761            "criteria": "http://example.org/topic/encounter-start",
762            "channel": {
763                "type": "rest-hook",
764                "endpoint": "https://example.com/webhook",
765                "payload": "application/fhir+json",
766                "header": ["Authorization: Bearer token123"]
767            },
768            "extension": [
769                {
770                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
771                    "valueCanonical": "http://example.org/topic/encounter-start"
772                },
773                {
774                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
775                    "valueCode": "id-only"
776                },
777                {
778                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-heartbeat-period",
779                    "valueUnsignedInt": 60
780                }
781            ]
782        })
783    }
784
785    #[cfg(feature = "R5")]
786    fn non_r4_test_version() -> FhirVersion {
787        FhirVersion::R5
788    }
789
790    #[cfg(all(not(feature = "R5"), feature = "R4B"))]
791    fn non_r4_test_version() -> FhirVersion {
792        FhirVersion::R4B
793    }
794
795    #[cfg(all(not(feature = "R5"), not(feature = "R4B"), feature = "R6"))]
796    fn non_r4_test_version() -> FhirVersion {
797        FhirVersion::R6
798    }
799
800    #[test]
801    fn test_register_r5_subscription() {
802        let registry = create_test_registry();
803        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
804
805        let resource = default_subscription_json();
806        let sub = manager
807            .register("tenant-1", "sub-1", &resource, FhirVersion::default())
808            .unwrap();
809
810        assert_eq!(sub.id, "sub-1");
811        assert_eq!(sub.topic_url, "http://example.org/topic/encounter-start");
812        assert_eq!(sub.channel.channel_type, ChannelType::RestHook);
813        assert_eq!(
814            sub.channel.endpoint.as_deref(),
815            Some("https://example.com/webhook")
816        );
817        assert_eq!(sub.channel.payload_content, PayloadContent::IdOnly);
818        assert_eq!(sub.status, SubscriptionStatusCode::Requested);
819        assert_eq!(sub.events_since_start, 0);
820    }
821
822    #[cfg(any(feature = "R4B", feature = "R5", feature = "R6"))]
823    #[test]
824    fn test_register_native_subscription_topic_and_channeltype_parsing() {
825        let registry = create_test_registry();
826        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
827
828        let resource = json!({
829            "resourceType": "Subscription",
830            "id": "sub-native-1",
831            "status": "requested",
832            "topic": "http://example.org/topic/encounter-start",
833            "channelType": {
834                "system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
835                "code": "rest-hook"
836            },
837            "endpoint": "https://example.com/webhook",
838            "contentType": "application/fhir+json",
839            "content": "id-only",
840            "parameter": [{
841                "name": "Authorization",
842                "value": "Bearer native-token"
843            }]
844        });
845
846        let sub = manager
847            .register("tenant-1", "sub-native-1", &resource, non_r4_test_version())
848            .unwrap();
849
850        assert_eq!(sub.topic_url, "http://example.org/topic/encounter-start");
851        assert_eq!(sub.channel.channel_type, ChannelType::RestHook);
852        assert_eq!(
853            sub.channel.endpoint.as_deref(),
854            Some("https://example.com/webhook")
855        );
856        assert_eq!(sub.channel.payload_content, PayloadContent::IdOnly);
857        assert_eq!(
858            sub.channel.headers,
859            vec!["Authorization: Bearer native-token".to_string()]
860        );
861    }
862
863    #[cfg(feature = "R4")]
864    #[test]
865    fn test_register_r4_subscription_with_backport_extensions() {
866        let registry = create_test_registry();
867        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
868
869        let resource = r4_subscription_json();
870        let sub = manager
871            .register("tenant-1", "sub-r4-1", &resource, FhirVersion::R4)
872            .unwrap();
873
874        assert_eq!(sub.topic_url, "http://example.org/topic/encounter-start");
875        assert_eq!(sub.channel.channel_type, ChannelType::RestHook);
876        assert_eq!(sub.channel.payload_content, PayloadContent::IdOnly);
877        assert_eq!(sub.channel.heartbeat_period, Some(60));
878        assert_eq!(sub.channel.headers.len(), 1);
879        assert!(sub.channel.headers[0].contains("Authorization"));
880    }
881
882    #[cfg(feature = "R4")]
883    #[test]
884    fn test_register_r4_subscription_payload_content_in_channel_payload_extension() {
885        let registry = create_test_registry();
886        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
887
888        let resource = json!({
889            "resourceType": "Subscription",
890            "id": "sub-r4-2",
891            "status": "requested",
892            "criteria": "http://example.org/topic/encounter-start",
893            "channel": {
894                "type": "rest-hook",
895                "endpoint": "https://example.com/webhook",
896                "payload": "application/fhir+json",
897                "_payload": {
898                    "extension": [{
899                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
900                        "valueCode": "empty"
901                    }]
902                }
903            }
904        });
905
906        let sub = manager
907            .register("tenant-1", "sub-r4-2", &resource, FhirVersion::R4)
908            .unwrap();
909
910        assert_eq!(sub.channel.payload_content, PayloadContent::Empty);
911    }
912
913    /// Build a version-appropriate subscription JSON with custom topic/channel.
914    pub(crate) fn build_subscription_json(
915        topic_url: &str,
916        channel_type: &str,
917        endpoint: Option<&str>,
918    ) -> serde_json::Value {
919        #[cfg(feature = "R4")]
920        {
921            let mut sub = json!({
922                "resourceType": "Subscription",
923                "status": "requested",
924                "criteria": topic_url,
925                "channel": {
926                    "type": channel_type
927                },
928                "extension": [{
929                    "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
930                    "valueCanonical": topic_url
931                }]
932            });
933            if let Some(ep) = endpoint {
934                sub["channel"]["endpoint"] = json!(ep);
935            }
936            sub
937        }
938        #[cfg(not(feature = "R4"))]
939        {
940            let mut sub = json!({
941                "resourceType": "Subscription",
942                "status": "requested",
943                "topic": topic_url,
944                "channelType": { "code": channel_type }
945            });
946            if let Some(ep) = endpoint {
947                sub["endpoint"] = json!(ep);
948            }
949            sub
950        }
951    }
952
953    #[test]
954    fn test_register_invalid_topic() {
955        let registry = create_test_registry();
956        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
957
958        let resource = build_subscription_json(
959            "http://nonexistent.org/topic/missing",
960            "rest-hook",
961            Some("https://example.com/webhook"),
962        );
963
964        let result = manager.register("t1", "s1", &resource, FhirVersion::default());
965        assert!(matches!(
966            result,
967            Err(SubscriptionError::TopicNotFound { .. })
968        ));
969    }
970
971    #[test]
972    fn test_register_unsupported_channel() {
973        let registry = create_test_registry();
974        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
975
976        let resource = build_subscription_json(
977            "http://example.org/topic/encounter-start",
978            "email",
979            Some("mailto:test@example.com"),
980        );
981
982        let result = manager.register("t1", "s1", &resource, FhirVersion::default());
983        assert!(matches!(
984            result,
985            Err(SubscriptionError::UnsupportedChannel { .. })
986        ));
987    }
988
989    #[test]
990    fn test_register_rest_hook_missing_endpoint() {
991        let registry = create_test_registry();
992        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
993
994        let resource = build_subscription_json(
995            "http://example.org/topic/encounter-start",
996            "rest-hook",
997            None, // No endpoint.
998        );
999
1000        let result = manager.register("t1", "s1", &resource, FhirVersion::default());
1001        assert!(matches!(
1002            result,
1003            Err(SubscriptionError::InvalidEndpoint { .. })
1004        ));
1005    }
1006
1007    #[test]
1008    fn test_deregister() {
1009        let registry = create_test_registry();
1010        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1011
1012        let resource = default_subscription_json();
1013        manager
1014            .register("t1", "sub-1", &resource, FhirVersion::default())
1015            .unwrap();
1016
1017        assert!(manager.deregister("t1", "sub-1"));
1018        assert!(manager.get_subscription("t1", "sub-1").is_none());
1019
1020        // Deregistering again returns false.
1021        assert!(!manager.deregister("t1", "sub-1"));
1022    }
1023
1024    #[test]
1025    fn test_active_subscriptions_for_topic() {
1026        let registry = create_test_registry();
1027        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1028
1029        let resource = default_subscription_json();
1030        manager
1031            .register("t1", "sub-1", &resource, FhirVersion::default())
1032            .unwrap();
1033
1034        // Subscription is in "requested" status, should not appear in active list.
1035        let active = manager
1036            .active_subscriptions_for_topic("t1", "http://example.org/topic/encounter-start");
1037        assert!(active.is_empty());
1038
1039        // Transition to active.
1040        manager
1041            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
1042            .unwrap();
1043
1044        let active = manager
1045            .active_subscriptions_for_topic("t1", "http://example.org/topic/encounter-start");
1046        assert_eq!(active.len(), 1);
1047        assert_eq!(active[0].id, "sub-1");
1048    }
1049
1050    #[test]
1051    fn test_status_transitions() {
1052        let registry = create_test_registry();
1053        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1054
1055        let resource = default_subscription_json();
1056        manager
1057            .register("t1", "sub-1", &resource, FhirVersion::default())
1058            .unwrap();
1059
1060        // Requested -> Active (valid).
1061        let old = manager
1062            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
1063            .unwrap();
1064        assert_eq!(old, SubscriptionStatusCode::Requested);
1065
1066        // Active -> Error (valid).
1067        manager
1068            .update_status("t1", "sub-1", SubscriptionStatusCode::Error)
1069            .unwrap();
1070
1071        // Error -> Active (valid, recovery).
1072        manager
1073            .update_status("t1", "sub-1", SubscriptionStatusCode::Active)
1074            .unwrap();
1075
1076        // Active -> Requested (invalid).
1077        let result = manager.update_status("t1", "sub-1", SubscriptionStatusCode::Requested);
1078        assert!(matches!(
1079            result,
1080            Err(SubscriptionError::InvalidStatusTransition { .. })
1081        ));
1082    }
1083
1084    #[test]
1085    fn test_increment_event_count() {
1086        let registry = create_test_registry();
1087        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1088
1089        let resource = default_subscription_json();
1090        manager
1091            .register("t1", "sub-1", &resource, FhirVersion::default())
1092            .unwrap();
1093
1094        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(1));
1095        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(2));
1096        assert_eq!(manager.increment_event_count("t1", "sub-1"), Some(3));
1097
1098        // Nonexistent subscription.
1099        assert!(manager.increment_event_count("t1", "nonexistent").is_none());
1100    }
1101
1102    #[test]
1103    fn test_record_and_reset_failures() {
1104        let registry = create_test_registry();
1105        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1106
1107        let resource = default_subscription_json();
1108        manager
1109            .register("t1", "sub-1", &resource, FhirVersion::default())
1110            .unwrap();
1111
1112        assert_eq!(manager.record_failure("t1", "sub-1"), Some(1));
1113        assert_eq!(manager.record_failure("t1", "sub-1"), Some(2));
1114
1115        manager.reset_failures("t1", "sub-1");
1116        let sub = manager.get_subscription("t1", "sub-1").unwrap();
1117        assert_eq!(sub.consecutive_failures, 0);
1118    }
1119
1120    #[test]
1121    fn test_tenant_isolation() {
1122        let registry = create_test_registry();
1123        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1124
1125        let resource = default_subscription_json();
1126        manager
1127            .register("tenant-a", "sub-1", &resource, FhirVersion::default())
1128            .unwrap();
1129        manager
1130            .register("tenant-b", "sub-1", &resource, FhirVersion::default())
1131            .unwrap();
1132
1133        // Each tenant sees their own subscription.
1134        assert!(manager.get_subscription("tenant-a", "sub-1").is_some());
1135        assert!(manager.get_subscription("tenant-b", "sub-1").is_some());
1136
1137        // Deregistering in one tenant doesn't affect the other.
1138        manager.deregister("tenant-a", "sub-1");
1139        assert!(manager.get_subscription("tenant-a", "sub-1").is_none());
1140        assert!(manager.get_subscription("tenant-b", "sub-1").is_some());
1141    }
1142
1143    #[test]
1144    fn test_register_with_filter_by() {
1145        let registry = create_test_registry();
1146        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1147
1148        #[cfg(feature = "R4")]
1149        let (resource, fhir_version) = (
1150            json!({
1151                "resourceType": "Subscription",
1152                "status": "requested",
1153                "criteria": "http://example.org/topic/encounter-start",
1154                "channel": {
1155                    "type": "rest-hook",
1156                    "endpoint": "https://example.com/webhook"
1157                },
1158                "extension": [
1159                    {
1160                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
1161                        "valueCanonical": "http://example.org/topic/encounter-start"
1162                    },
1163                    {
1164                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content",
1165                        "valueCode": "full-resource"
1166                    },
1167                    {
1168                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
1169                        "valueString": "Encounter?patient=Patient/123"
1170                    }
1171                ]
1172            }),
1173            FhirVersion::R4,
1174        );
1175        #[cfg(all(
1176            not(feature = "R4"),
1177            any(feature = "R4B", feature = "R5", feature = "R6")
1178        ))]
1179        let (resource, fhir_version) = (
1180            json!({
1181                "resourceType": "Subscription",
1182                "status": "requested",
1183                "topic": "http://example.org/topic/encounter-start",
1184                "channelType": { "code": "rest-hook" },
1185                "endpoint": "https://example.com/webhook",
1186                "content": "full-resource",
1187                "filterBy": [{
1188                    "resourceType": "Encounter",
1189                    "filterParameter": "patient",
1190                    "value": "Patient/123"
1191                }]
1192            }),
1193            non_r4_test_version(),
1194        );
1195
1196        let sub = manager
1197            .register("t1", "sub-1", &resource, fhir_version)
1198            .unwrap();
1199
1200        assert_eq!(sub.filters.len(), 1);
1201        assert_eq!(sub.filters[0].filter_parameter, "patient");
1202        assert_eq!(sub.filters[0].value, "Patient/123");
1203        assert_eq!(sub.channel.payload_content, PayloadContent::FullResource);
1204    }
1205
1206    #[cfg(any(feature = "R4B", feature = "R5", feature = "R6"))]
1207    #[test]
1208    fn test_register_with_native_filter_by_comparator() {
1209        let registry = create_test_registry();
1210        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1211
1212        let resource = json!({
1213            "resourceType": "Subscription",
1214            "status": "requested",
1215            "topic": "http://example.org/topic/encounter-start",
1216            "channelType": { "code": "rest-hook" },
1217            "endpoint": "https://example.com/webhook",
1218            "filterBy": [{
1219                "resourceType": "Encounter",
1220                "filterParameter": "patient",
1221                "comparator": "eq",
1222                "value": "Patient/123"
1223            }]
1224        });
1225
1226        let sub = manager
1227            .register("t1", "sub-1", &resource, non_r4_test_version())
1228            .unwrap();
1229
1230        assert_eq!(sub.filters.len(), 1);
1231        assert_eq!(sub.filters[0].filter_parameter, "patient");
1232        assert_eq!(sub.filters[0].comparator, "eq");
1233        assert_eq!(sub.filters[0].value, "Patient/123");
1234    }
1235
1236    #[test]
1237    fn test_register_with_invalid_filter() {
1238        let registry = create_test_registry();
1239        let manager = SubscriptionManager::new(registry, vec!["rest-hook".to_string()]);
1240
1241        #[cfg(feature = "R4")]
1242        let (resource, fhir_version) = (
1243            json!({
1244                "resourceType": "Subscription",
1245                "status": "requested",
1246                "criteria": "http://example.org/topic/encounter-start",
1247                "channel": {
1248                    "type": "rest-hook",
1249                    "endpoint": "https://example.com/webhook"
1250                },
1251                "extension": [
1252                    {
1253                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-topic-canonical",
1254                        "valueCanonical": "http://example.org/topic/encounter-start"
1255                    },
1256                    {
1257                        "url": "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
1258                        "valueString": "unknown-param=some-value"
1259                    }
1260                ]
1261            }),
1262            FhirVersion::R4,
1263        );
1264        #[cfg(all(
1265            not(feature = "R4"),
1266            any(feature = "R4B", feature = "R5", feature = "R6")
1267        ))]
1268        let (resource, fhir_version) = (
1269            json!({
1270                "resourceType": "Subscription",
1271                "status": "requested",
1272                "topic": "http://example.org/topic/encounter-start",
1273                "channelType": { "code": "rest-hook" },
1274                "endpoint": "https://example.com/webhook",
1275                "filterBy": [{
1276                    "filterParameter": "unknown-param",
1277                    "value": "some-value"
1278                }]
1279            }),
1280            non_r4_test_version(),
1281        );
1282
1283        let result = manager.register("t1", "sub-1", &resource, fhir_version);
1284        assert!(matches!(
1285            result,
1286            Err(SubscriptionError::InvalidFilter { .. })
1287        ));
1288    }
1289}