Skip to main content

helios_subscriptions/topics/
mod.rs

1//! Subscription topic registry.
2//!
3//! Manages `SubscriptionTopic` definitions and evaluates whether resource events
4//! match a topic's triggers.
5
6use std::collections::HashMap;
7use std::sync::RwLock;
8
9use crate::error::SubscriptionError;
10use crate::event::ResourceEventType;
11
/// Coding system expected on the `Basic.code` coding that marks a backport topic.
const FHIR_TYPES_SYSTEM: &str = "http://hl7.org/fhir/fhir-types";
/// Coding value expected (together with [`FHIR_TYPES_SYSTEM`]) on a backport topic.
const SUBSCRIPTION_TOPIC_CODE: &str = "SubscriptionTopic";
/// URL suffix of the extension that carries the topic's canonical URL.
const EXT_TOPIC_URL_SUFFIX: &str = "extension-SubscriptionTopic.url";
/// URL suffix of the extension that carries the topic's title.
const EXT_TOPIC_TITLE_SUFFIX: &str = "extension-SubscriptionTopic.title";
/// URL suffix of the (complex) extension that describes one resource trigger.
const EXT_TOPIC_RESOURCE_TRIGGER_SUFFIX: &str = "extension-SubscriptionTopic.resourceTrigger";
/// URL suffix of the (complex) extension that describes one `canFilterBy` entry.
const EXT_TOPIC_CAN_FILTER_BY_SUFFIX: &str = "extension-SubscriptionTopic.canFilterBy";
/// URL suffix of the (complex) extension that describes one notification shape.
const EXT_TOPIC_NOTIFICATION_SHAPE_SUFFIX: &str = "extension-SubscriptionTopic.notificationShape";
19
20/// A version-agnostic representation of a `SubscriptionTopic`.
21#[derive(Debug, Clone)]
22pub struct TopicDefinition {
23    /// The canonical URL of the topic (e.g., `http://example.org/topic/encounter-start`).
24    pub canonical_url: String,
25
26    /// Human-readable title.
27    pub title: Option<String>,
28
29    /// Resource triggers that define when this topic fires.
30    pub resource_triggers: Vec<ResourceTrigger>,
31
32    /// Filters that subscribers can use to narrow which events they receive.
33    pub can_filter_by: Vec<FilterDefinition>,
34
35    /// Notification shape: which resource types may appear in notifications.
36    pub notification_shape: Vec<NotificationShape>,
37}
38
39/// A trigger condition defined by a topic.
40#[derive(Debug, Clone)]
41pub struct ResourceTrigger {
42    /// The resource type this trigger monitors (e.g., "Encounter", "Observation").
43    pub resource_type: String,
44
45    /// Which interaction types trigger this (create, update, delete).
46    pub interactions: Vec<ResourceEventType>,
47
48    /// Optional FHIRPath expression for additional trigger criteria.
49    pub fhirpath_criteria: Option<String>,
50}
51
/// Defines a filter parameter that subscribers can use.
///
/// Values compare field by field (`PartialEq`/`Eq`), so parsed definitions can
/// be checked against expected ones directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FilterDefinition {
    /// The resource type this filter applies to, when the topic names one.
    pub resource_type: Option<String>,

    /// The filter parameter name (often a FHIR search parameter name).
    pub filter_parameter: String,

    /// Supported comparators (e.g., "eq", "in", "gt").
    pub comparators: Vec<String>,

    /// Supported modifiers (e.g., "missing", "exact").
    pub modifiers: Vec<String>,
}
67
/// Defines the shape of notifications for a topic.
///
/// Values compare field by field (`PartialEq`/`Eq`), so parsed shapes can be
/// checked against expected ones directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotificationShape {
    /// The resource type included in notifications.
    pub resource_type: String,

    /// Include references to follow (e.g., "Encounter:patient").
    pub include: Vec<String>,
}
77
/// Describes a topic whose trigger matched a resource event.
///
/// Values compare field by field (`PartialEq`/`Eq`), so match results can be
/// asserted on or de-duplicated as whole values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicMatch {
    /// Canonical URL of the matching topic.
    pub topic_url: String,

    /// The focus resource type that triggered the match.
    pub focus_resource_type: String,
}
87
88/// Registry for subscription topics.
89///
90/// Stores topic definitions and evaluates which topics match a given
91/// resource event.
92pub struct InMemoryTopicRegistry {
93    /// Topics keyed by canonical URL.
94    topics: RwLock<HashMap<String, TopicDefinition>>,
95}
96
97impl InMemoryTopicRegistry {
98    /// Creates an empty topic registry.
99    pub fn new() -> Self {
100        Self {
101            topics: RwLock::new(HashMap::new()),
102        }
103    }
104
105    /// Registers a topic definition.
106    pub fn add_topic(&self, topic: TopicDefinition) {
107        let mut topics = self.topics.write().unwrap();
108        topics.insert(topic.canonical_url.clone(), topic);
109    }
110
111    /// Removes a topic by canonical URL.
112    pub fn remove_topic(&self, canonical_url: &str) -> bool {
113        let mut topics = self.topics.write().unwrap();
114        topics.remove(canonical_url).is_some()
115    }
116
117    /// Returns all registered topic canonical URLs.
118    pub fn list_topics(&self) -> Vec<String> {
119        let topics = self.topics.read().unwrap();
120        topics.keys().cloned().collect()
121    }
122
123    /// Returns a topic definition by canonical URL.
124    pub fn get_topic(&self, canonical_url: &str) -> Option<TopicDefinition> {
125        let topics = self.topics.read().unwrap();
126        topics.get(canonical_url).cloned()
127    }
128
129    /// Evaluates which topics match a resource event.
130    ///
131    /// Checks all registered topics' resource triggers against the event's
132    /// resource type and interaction type.
133    pub fn matching_topics(
134        &self,
135        resource_type: &str,
136        event_type: ResourceEventType,
137    ) -> Vec<TopicMatch> {
138        let topics = self.topics.read().unwrap();
139        let mut matches = Vec::new();
140
141        for topic in topics.values() {
142            for trigger in &topic.resource_triggers {
143                if trigger.resource_type == resource_type
144                    && trigger.interactions.contains(&event_type)
145                {
146                    matches.push(TopicMatch {
147                        topic_url: topic.canonical_url.clone(),
148                        focus_resource_type: trigger.resource_type.clone(),
149                    });
150                    // Only match once per topic even if multiple triggers match.
151                    break;
152                }
153            }
154        }
155
156        matches
157    }
158
159    /// Parses a `SubscriptionTopic` FHIR resource (JSON) into a [`TopicDefinition`].
160    ///
161    /// Works for R4B, R5, and R6 native `SubscriptionTopic` resources.
162    pub fn parse_topic_resource(
163        resource: &serde_json::Value,
164    ) -> Result<TopicDefinition, SubscriptionError> {
165        let canonical_url = resource
166            .get("url")
167            .and_then(|v| v.as_str())
168            .ok_or_else(|| SubscriptionError::InvalidSubscription {
169                message: "SubscriptionTopic missing 'url' field".to_string(),
170            })?
171            .to_string();
172
173        let title = resource
174            .get("title")
175            .and_then(|v| v.as_str())
176            .map(|s| s.to_string());
177
178        let resource_triggers = parse_resource_triggers(resource)?;
179        let can_filter_by = parse_can_filter_by(resource);
180        let notification_shape = parse_notification_shape(resource);
181
182        Ok(TopicDefinition {
183            canonical_url,
184            title,
185            resource_triggers,
186            can_filter_by,
187            notification_shape,
188        })
189    }
190
191    /// Parses an R4 backport `Basic` topic into a [`TopicDefinition`].
192    ///
193    /// Returns:
194    /// - `Ok(Some(topic))` when the resource is a strict backport topic
195    /// - `Ok(None)` when the resource is `Basic` but not a topic
196    /// - `Err` when the resource is marked as a topic but malformed
197    pub fn parse_r4_backport_basic_topic_resource(
198        resource: &serde_json::Value,
199    ) -> Result<Option<TopicDefinition>, SubscriptionError> {
200        if resource.get("resourceType").and_then(|v| v.as_str()) != Some("Basic") {
201            return Ok(None);
202        }
203
204        if !has_subscription_topic_basic_code(resource) {
205            return Ok(None);
206        }
207
208        let canonical_url =
209            find_top_level_extension_value_string(resource, EXT_TOPIC_URL_SUFFIX, &["valueUri"])
210                .ok_or_else(|| SubscriptionError::InvalidSubscription {
211                    message: "R4 Basic SubscriptionTopic missing canonical url extension"
212                        .to_string(),
213                })?;
214
215        let title = find_top_level_extension_value_string(
216            resource,
217            EXT_TOPIC_TITLE_SUFFIX,
218            &["valueString"],
219        );
220
221        let resource_triggers = parse_r4_basic_resource_triggers(resource)?;
222        if resource_triggers.is_empty() {
223            return Err(SubscriptionError::InvalidSubscription {
224                message: "R4 Basic SubscriptionTopic missing resourceTrigger extension".to_string(),
225            });
226        }
227
228        let can_filter_by = parse_r4_basic_can_filter_by(resource);
229        let notification_shape = parse_r4_basic_notification_shape(resource);
230
231        Ok(Some(TopicDefinition {
232            canonical_url,
233            title,
234            resource_triggers,
235            can_filter_by,
236            notification_shape,
237        }))
238    }
239}
240
241impl Default for InMemoryTopicRegistry {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247/// Parse `resourceTrigger` array from a SubscriptionTopic JSON resource.
248fn parse_resource_triggers(
249    resource: &serde_json::Value,
250) -> Result<Vec<ResourceTrigger>, SubscriptionError> {
251    let triggers = match resource.get("resourceTrigger").and_then(|v| v.as_array()) {
252        Some(arr) => arr,
253        None => return Ok(Vec::new()),
254    };
255
256    let mut result = Vec::new();
257    for trigger in triggers {
258        let resource_type = trigger
259            .get("resource")
260            .and_then(|v| v.as_str())
261            .ok_or_else(|| SubscriptionError::InvalidSubscription {
262                message: "resourceTrigger missing 'resource' field".to_string(),
263            })?
264            .to_string();
265
266        let interactions = parse_interactions(trigger);
267
268        let fhirpath_criteria = trigger
269            .get("fhirPathCriteria")
270            .and_then(|v| v.as_str())
271            .map(|s| s.to_string());
272
273        result.push(ResourceTrigger {
274            resource_type,
275            interactions,
276            fhirpath_criteria,
277        });
278    }
279
280    Ok(result)
281}
282
283/// Parse `supportedInteraction` array from a trigger definition.
284fn parse_interactions(trigger: &serde_json::Value) -> Vec<ResourceEventType> {
285    let interactions = match trigger
286        .get("supportedInteraction")
287        .and_then(|v| v.as_array())
288    {
289        Some(arr) => arr,
290        None => {
291            // Default to all interactions if not specified.
292            return vec![
293                ResourceEventType::Create,
294                ResourceEventType::Update,
295                ResourceEventType::Delete,
296            ];
297        }
298    };
299
300    interactions
301        .iter()
302        .filter_map(|v| match v.as_str()? {
303            "create" => Some(ResourceEventType::Create),
304            "update" => Some(ResourceEventType::Update),
305            "delete" => Some(ResourceEventType::Delete),
306            _ => None,
307        })
308        .collect()
309}
310
311/// Parse `canFilterBy` array from a SubscriptionTopic JSON resource.
312fn parse_can_filter_by(resource: &serde_json::Value) -> Vec<FilterDefinition> {
313    let filters = match resource.get("canFilterBy").and_then(|v| v.as_array()) {
314        Some(arr) => arr,
315        None => return Vec::new(),
316    };
317
318    filters
319        .iter()
320        .filter_map(|f| {
321            let filter_parameter = f.get("filterParameter")?.as_str()?.to_string();
322            let resource_type = f.get("resource").and_then(|v| v.as_str()).map(String::from);
323
324            let comparators = f
325                .get("comparator")
326                .and_then(|v| v.as_array())
327                .map(|arr| {
328                    arr.iter()
329                        .filter_map(|v| v.as_str().map(String::from))
330                        .collect()
331                })
332                .unwrap_or_default();
333
334            let modifiers = f
335                .get("modifier")
336                .and_then(|v| v.as_array())
337                .map(|arr| {
338                    arr.iter()
339                        .filter_map(|v| v.as_str().map(String::from))
340                        .collect()
341                })
342                .unwrap_or_default();
343
344            Some(FilterDefinition {
345                resource_type,
346                filter_parameter,
347                comparators,
348                modifiers,
349            })
350        })
351        .collect()
352}
353
354/// Parse `notificationShape` array from a SubscriptionTopic JSON resource.
355fn parse_notification_shape(resource: &serde_json::Value) -> Vec<NotificationShape> {
356    let shapes = match resource.get("notificationShape").and_then(|v| v.as_array()) {
357        Some(arr) => arr,
358        None => return Vec::new(),
359    };
360
361    shapes
362        .iter()
363        .filter_map(|s| {
364            let resource_type = s.get("resource")?.as_str()?.to_string();
365            let include = s
366                .get("include")
367                .and_then(|v| v.as_array())
368                .map(|arr| {
369                    arr.iter()
370                        .filter_map(|v| v.as_str().map(String::from))
371                        .collect()
372                })
373                .unwrap_or_default();
374
375            Some(NotificationShape {
376                resource_type,
377                include,
378            })
379        })
380        .collect()
381}
382
383fn has_subscription_topic_basic_code(resource: &serde_json::Value) -> bool {
384    resource
385        .get("code")
386        .and_then(|v| v.get("coding"))
387        .and_then(|v| v.as_array())
388        .map(|coding| {
389            coding.iter().any(|entry| {
390                entry.get("system").and_then(|v| v.as_str()) == Some(FHIR_TYPES_SYSTEM)
391                    && entry.get("code").and_then(|v| v.as_str()) == Some(SUBSCRIPTION_TOPIC_CODE)
392            })
393        })
394        .unwrap_or(false)
395}
396
397fn extension_url_matches_suffix(ext: &serde_json::Value, suffix: &str) -> bool {
398    ext.get("url")
399        .and_then(|v| v.as_str())
400        .map(|url| url.ends_with(suffix))
401        .unwrap_or(false)
402}
403
404fn find_top_level_extension_value_string(
405    resource: &serde_json::Value,
406    suffix: &str,
407    value_keys: &[&str],
408) -> Option<String> {
409    resource
410        .get("extension")?
411        .as_array()?
412        .iter()
413        .find(|ext| extension_url_matches_suffix(ext, suffix))
414        .and_then(|ext| {
415            for key in value_keys {
416                if let Some(value) = ext.get(*key).and_then(|v| v.as_str()) {
417                    return Some(value.to_string());
418                }
419            }
420            None
421        })
422}
423
424fn find_nested_extension_value_string(
425    ext: &serde_json::Value,
426    key: &str,
427    value_keys: &[&str],
428) -> Option<String> {
429    ext.get("extension")?.as_array()?.iter().find_map(|nested| {
430        if nested.get("url").and_then(|v| v.as_str()) != Some(key) {
431            return None;
432        }
433
434        for value_key in value_keys {
435            if let Some(value) = nested.get(*value_key).and_then(|v| v.as_str()) {
436                return Some(value.to_string());
437            }
438        }
439        None
440    })
441}
442
443fn find_all_nested_extension_values_string(
444    ext: &serde_json::Value,
445    key: &str,
446    value_keys: &[&str],
447) -> Vec<String> {
448    ext.get("extension")
449        .and_then(|v| v.as_array())
450        .map(|nested| {
451            nested
452                .iter()
453                .filter(|item| item.get("url").and_then(|v| v.as_str()) == Some(key))
454                .filter_map(|item| {
455                    for value_key in value_keys {
456                        if let Some(value) = item.get(*value_key).and_then(|v| v.as_str()) {
457                            return Some(value.to_string());
458                        }
459                    }
460                    None
461                })
462                .collect()
463        })
464        .unwrap_or_default()
465}
466
/// Extracts the resource type name from a resource URI.
///
/// The type name is the final `/`-separated segment, so
/// `http://hl7.org/fhir/StructureDefinition/Encounter` becomes `Encounter`,
/// and a bare `Encounter` is returned unchanged. Trailing slashes are ignored
/// so that `.../Encounter/` still yields `Encounter` rather than an empty
/// string. If no non-empty segment remains (empty input, or only slashes),
/// the input is returned as-is.
fn parse_resource_type_from_uri(uri: &str) -> String {
    // `rsplit` always yields at least one item, so emptiness has to be checked
    // explicitly for the fallback to be reachable.
    match uri.trim_end_matches('/').rsplit('/').next() {
        Some(segment) if !segment.is_empty() => segment.to_string(),
        _ => uri.to_string(),
    }
}
473
474fn parse_r4_basic_resource_triggers(
475    resource: &serde_json::Value,
476) -> Result<Vec<ResourceTrigger>, SubscriptionError> {
477    let trigger_exts = resource
478        .get("extension")
479        .and_then(|v| v.as_array())
480        .map(|exts| {
481            exts.iter()
482                .filter(|ext| extension_url_matches_suffix(ext, EXT_TOPIC_RESOURCE_TRIGGER_SUFFIX))
483                .collect::<Vec<_>>()
484        })
485        .unwrap_or_default();
486
487    let mut triggers = Vec::new();
488    for trigger_ext in trigger_exts {
489        let resource_uri =
490            find_nested_extension_value_string(trigger_ext, "resource", &["valueUri"]).ok_or_else(
491                || SubscriptionError::InvalidSubscription {
492                    message: "R4 Basic SubscriptionTopic trigger missing resource".to_string(),
493                },
494            )?;
495        let resource_type = parse_resource_type_from_uri(&resource_uri);
496
497        let interactions = find_all_nested_extension_values_string(
498            trigger_ext,
499            "supportedInteraction",
500            &["valueCode"],
501        )
502        .iter()
503        .filter_map(|code| match code.as_str() {
504            "create" => Some(ResourceEventType::Create),
505            "update" => Some(ResourceEventType::Update),
506            "delete" => Some(ResourceEventType::Delete),
507            _ => None,
508        })
509        .collect::<Vec<_>>();
510
511        if interactions.is_empty() {
512            return Err(SubscriptionError::InvalidSubscription {
513                message: "R4 Basic SubscriptionTopic trigger missing supportedInteraction"
514                    .to_string(),
515            });
516        }
517
518        let fhirpath_criteria =
519            find_nested_extension_value_string(trigger_ext, "fhirPathCriteria", &["valueString"]);
520
521        triggers.push(ResourceTrigger {
522            resource_type,
523            interactions,
524            fhirpath_criteria,
525        });
526    }
527
528    Ok(triggers)
529}
530
531fn parse_r4_basic_can_filter_by(resource: &serde_json::Value) -> Vec<FilterDefinition> {
532    let can_filter_by_exts = resource
533        .get("extension")
534        .and_then(|v| v.as_array())
535        .map(|exts| {
536            exts.iter()
537                .filter(|ext| extension_url_matches_suffix(ext, EXT_TOPIC_CAN_FILTER_BY_SUFFIX))
538                .collect::<Vec<_>>()
539        })
540        .unwrap_or_default();
541
542    can_filter_by_exts
543        .iter()
544        .filter_map(|ext| {
545            let filter_parameter =
546                find_nested_extension_value_string(ext, "filterParameter", &["valueString"])?;
547
548            let resource_type =
549                find_nested_extension_value_string(ext, "resource", &["valueUri", "valueString"])
550                    .map(|value| parse_resource_type_from_uri(&value));
551
552            let comparators =
553                find_all_nested_extension_values_string(ext, "comparator", &["valueCode"]);
554            let modifiers =
555                find_all_nested_extension_values_string(ext, "modifier", &["valueCode"]);
556
557            Some(FilterDefinition {
558                resource_type,
559                filter_parameter,
560                comparators,
561                modifiers,
562            })
563        })
564        .collect()
565}
566
567fn parse_r4_basic_notification_shape(resource: &serde_json::Value) -> Vec<NotificationShape> {
568    let shape_exts = resource
569        .get("extension")
570        .and_then(|v| v.as_array())
571        .map(|exts| {
572            exts.iter()
573                .filter(|ext| {
574                    extension_url_matches_suffix(ext, EXT_TOPIC_NOTIFICATION_SHAPE_SUFFIX)
575                })
576                .collect::<Vec<_>>()
577        })
578        .unwrap_or_default();
579
580    shape_exts
581        .iter()
582        .filter_map(|ext| {
583            let resource_type =
584                find_nested_extension_value_string(ext, "resource", &["valueUri", "valueString"])
585                    .map(|value| parse_resource_type_from_uri(&value))?;
586            let include = find_all_nested_extension_values_string(ext, "include", &["valueString"]);
587
588            Some(NotificationShape {
589                resource_type,
590                include,
591            })
592        })
593        .collect()
594}
595
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    const ENCOUNTER_TOPIC_URL: &str = "http://example.org/topic/encounter-start";
    const LAB_RESULT_TOPIC_URL: &str = "http://example.org/topic/new-lab-result";

    /// Topic that fires when an `Encounter` is created.
    fn sample_encounter_topic() -> TopicDefinition {
        let trigger = ResourceTrigger {
            resource_type: "Encounter".to_string(),
            interactions: vec![ResourceEventType::Create],
            fhirpath_criteria: None,
        };
        let patient_filter = FilterDefinition {
            resource_type: Some("Encounter".to_string()),
            filter_parameter: "patient".to_string(),
            comparators: vec!["eq".to_string()],
            modifiers: vec![],
        };
        let shape = NotificationShape {
            resource_type: "Encounter".to_string(),
            include: vec!["Encounter:patient".to_string()],
        };

        TopicDefinition {
            canonical_url: ENCOUNTER_TOPIC_URL.to_string(),
            title: Some("Encounter Start".to_string()),
            resource_triggers: vec![trigger],
            can_filter_by: vec![patient_filter],
            notification_shape: vec![shape],
        }
    }

    /// Topic that fires when an `Observation` is created or updated.
    fn sample_observation_topic() -> TopicDefinition {
        let trigger = ResourceTrigger {
            resource_type: "Observation".to_string(),
            interactions: vec![ResourceEventType::Create, ResourceEventType::Update],
            fhirpath_criteria: None,
        };
        let code_filter = FilterDefinition {
            resource_type: Some("Observation".to_string()),
            filter_parameter: "code".to_string(),
            comparators: vec!["eq".to_string(), "in".to_string()],
            modifiers: vec![],
        };

        TopicDefinition {
            canonical_url: LAB_RESULT_TOPIC_URL.to_string(),
            title: Some("New Lab Result".to_string()),
            resource_triggers: vec![trigger],
            can_filter_by: vec![code_filter],
            notification_shape: vec![],
        }
    }

    /// Builds a registry pre-populated with `topics`.
    fn registry_with(topics: Vec<TopicDefinition>) -> InMemoryTopicRegistry {
        let registry = InMemoryTopicRegistry::new();
        for topic in topics {
            registry.add_topic(topic);
        }
        registry
    }

    #[test]
    fn test_add_and_list_topics() {
        let registry = InMemoryTopicRegistry::new();
        assert!(registry.list_topics().is_empty());

        registry.add_topic(sample_encounter_topic());

        let urls = registry.list_topics();
        assert_eq!(urls.len(), 1);
        assert!(urls.iter().any(|url| url.as_str() == ENCOUNTER_TOPIC_URL));
    }

    #[test]
    fn test_get_topic() {
        let registry = registry_with(vec![sample_encounter_topic()]);

        let found = registry.get_topic(ENCOUNTER_TOPIC_URL).unwrap();
        assert_eq!(found.title.as_deref(), Some("Encounter Start"));
        assert_eq!(found.resource_triggers.len(), 1);

        let missing = registry.get_topic("http://example.org/nonexistent");
        assert!(missing.is_none());
    }

    #[test]
    fn test_remove_topic() {
        let registry = registry_with(vec![sample_encounter_topic()]);

        assert!(registry.remove_topic(ENCOUNTER_TOPIC_URL));
        assert!(registry.list_topics().is_empty());

        // Removing an unknown topic reports that nothing was removed.
        assert!(!registry.remove_topic("http://example.org/nonexistent"));
    }

    #[test]
    fn test_matching_topics_by_resource_type_and_interaction() {
        let registry = registry_with(vec![sample_encounter_topic(), sample_observation_topic()]);

        // Encounter create should match the encounter topic.
        let encounter_hits = registry.matching_topics("Encounter", ResourceEventType::Create);
        assert_eq!(encounter_hits.len(), 1);
        assert_eq!(encounter_hits[0].topic_url, ENCOUNTER_TOPIC_URL);
        assert_eq!(encounter_hits[0].focus_resource_type, "Encounter");

        // Observation create should match the observation topic.
        let created_hits = registry.matching_topics("Observation", ResourceEventType::Create);
        assert_eq!(created_hits.len(), 1);
        assert_eq!(created_hits[0].topic_url, LAB_RESULT_TOPIC_URL);

        // Observation update should also match.
        let updated_hits = registry.matching_topics("Observation", ResourceEventType::Update);
        assert_eq!(updated_hits.len(), 1);
    }

    #[test]
    fn test_no_match_for_wrong_resource_type() {
        let registry = registry_with(vec![sample_encounter_topic()]);

        let hits = registry.matching_topics("Patient", ResourceEventType::Create);
        assert!(hits.is_empty());
    }

    #[test]
    fn test_no_match_for_wrong_interaction() {
        let registry = registry_with(vec![sample_encounter_topic()]);

        // The encounter topic only triggers on create, not on update or delete.
        let on_update = registry.matching_topics("Encounter", ResourceEventType::Update);
        assert!(on_update.is_empty());

        let on_delete = registry.matching_topics("Encounter", ResourceEventType::Delete);
        assert!(on_delete.is_empty());
    }

    #[test]
    fn test_multiple_topics_matching_same_event() {
        // Two topics that both trigger on Observation create.
        let vital_signs = TopicDefinition {
            canonical_url: "http://example.org/topic/vital-signs".to_string(),
            title: Some("Vital Signs".to_string()),
            resource_triggers: vec![ResourceTrigger {
                resource_type: "Observation".to_string(),
                interactions: vec![ResourceEventType::Create],
                fhirpath_criteria: None,
            }],
            can_filter_by: vec![],
            notification_shape: vec![],
        };
        let registry = registry_with(vec![sample_observation_topic(), vital_signs]);

        let hits = registry.matching_topics("Observation", ResourceEventType::Create);
        assert_eq!(hits.len(), 2);
    }

    #[test]
    fn test_parse_topic_resource() {
        let topic_json = json!({
            "resourceType": "SubscriptionTopic",
            "url": "http://example.org/topic/patient-admit",
            "title": "Patient Admission",
            "resourceTrigger": [{
                "resource": "Encounter",
                "supportedInteraction": ["create", "update"],
                "fhirPathCriteria": "(%previous.empty() | (%previous.status != 'in-progress')) and (%current.status = 'in-progress')"
            }],
            "canFilterBy": [{
                "resource": "Encounter",
                "filterParameter": "patient",
                "comparator": ["eq"]
            }],
            "notificationShape": [{
                "resource": "Encounter",
                "include": ["Encounter:patient", "Encounter:location"]
            }]
        });

        let parsed = InMemoryTopicRegistry::parse_topic_resource(&topic_json).unwrap();
        assert_eq!(parsed.canonical_url, "http://example.org/topic/patient-admit");
        assert_eq!(parsed.title.as_deref(), Some("Patient Admission"));

        assert_eq!(parsed.resource_triggers.len(), 1);
        let first_trigger = &parsed.resource_triggers[0];
        assert_eq!(first_trigger.resource_type, "Encounter");
        assert_eq!(first_trigger.interactions.len(), 2);
        for expected in [ResourceEventType::Create, ResourceEventType::Update] {
            assert!(first_trigger.interactions.contains(&expected));
        }
        assert!(first_trigger.fhirpath_criteria.is_some());

        assert_eq!(parsed.can_filter_by.len(), 1);
        assert_eq!(parsed.can_filter_by[0].filter_parameter, "patient");

        assert_eq!(parsed.notification_shape.len(), 1);
        assert_eq!(parsed.notification_shape[0].include.len(), 2);
    }

    #[test]
    fn test_parse_topic_resource_missing_url() {
        let topic_json = json!({
            "resourceType": "SubscriptionTopic",
            "title": "No URL"
        });

        assert!(InMemoryTopicRegistry::parse_topic_resource(&topic_json).is_err());
    }

    #[test]
    fn test_parse_topic_resource_minimal() {
        let topic_json = json!({
            "resourceType": "SubscriptionTopic",
            "url": "http://example.org/topic/minimal"
        });

        let parsed = InMemoryTopicRegistry::parse_topic_resource(&topic_json).unwrap();
        assert_eq!(parsed.canonical_url, "http://example.org/topic/minimal");
        assert!(parsed.resource_triggers.is_empty());
        assert!(parsed.can_filter_by.is_empty());
        assert!(parsed.notification_shape.is_empty());
    }

    #[test]
    fn test_parse_topic_default_interactions() {
        // Without supportedInteraction, all interactions should be assumed.
        let topic_json = json!({
            "resourceType": "SubscriptionTopic",
            "url": "http://example.org/topic/all-interactions",
            "resourceTrigger": [{
                "resource": "Patient"
            }]
        });

        let parsed = InMemoryTopicRegistry::parse_topic_resource(&topic_json).unwrap();
        assert_eq!(parsed.resource_triggers[0].interactions.len(), 3);
    }

    #[test]
    fn test_parse_r4_backport_basic_topic_resource() {
        let topic_json = json!({
            "resourceType": "Basic",
            "id": "topic-basic-1",
            "code": {
                "coding": [{
                    "system": "http://hl7.org/fhir/fhir-types",
                    "code": "SubscriptionTopic"
                }]
            },
            "extension": [
                {
                    "url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.url",
                    "valueUri": "http://example.org/topic/basic-encounter"
                },
                {
                    "url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.title",
                    "valueString": "Basic Encounter Topic"
                },
                {
                    "url": "http://hl7.org/fhir/4.3/StructureDefinition/extension-SubscriptionTopic.resourceTrigger",
                    "extension": [
                        { "url": "resource", "valueUri": "http://hl7.org/fhir/StructureDefinition/Encounter" },
                        { "url": "supportedInteraction", "valueCode": "create" }
                    ]
                }
            ]
        });

        let parsed = InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(&topic_json)
            .unwrap()
            .expect("basic topic should parse");

        assert_eq!(parsed.canonical_url, "http://example.org/topic/basic-encounter");
        assert_eq!(parsed.title.as_deref(), Some("Basic Encounter Topic"));
        assert_eq!(parsed.resource_triggers.len(), 1);

        let first_trigger = &parsed.resource_triggers[0];
        assert_eq!(first_trigger.resource_type, "Encounter");
        assert_eq!(first_trigger.interactions.len(), 1);
        assert!(first_trigger.interactions.contains(&ResourceEventType::Create));
    }

    #[test]
    fn test_parse_r4_backport_basic_topic_requires_subscriptiontopic_code() {
        let basic_json = json!({
            "resourceType": "Basic",
            "code": {
                "coding": [{
                    "system": "http://hl7.org/fhir/fhir-types",
                    "code": "OtherType"
                }]
            },
            "extension": [{
                "url": "http://hl7.org/fhir/5.0/StructureDefinition/extension-SubscriptionTopic.url",
                "valueUri": "http://example.org/topic/basic-encounter"
            }]
        });

        let outcome =
            InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(&basic_json).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn test_parse_r4_backport_basic_topic_missing_core_extensions_errors() {
        let malformed_topic = json!({
            "resourceType": "Basic",
            "code": {
                "coding": [{
                    "system": "http://hl7.org/fhir/fhir-types",
                    "code": "SubscriptionTopic"
                }]
            },
            "extension": []
        });

        let outcome =
            InMemoryTopicRegistry::parse_r4_backport_basic_topic_resource(&malformed_topic);
        assert!(outcome.is_err());
    }
}
925}